-
-
Save kovetskiy/3948faf8c9d83e10a91fa06056674e3c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
sudo systemctl stop kafka; sudo rm -rf /var/lib/kafka/my-topic-0; sudo systemctl start kafka; AMOUNT=20000 THREADS=5 CYCLES=10 go run main.go | |
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused | |
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused | |
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused | |
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused | |
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused | |
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused | |
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused | |
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused | |
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused | |
2018/07/18 15:02:48 dial tcp [::1]:9092: connect: connection refused | |
starting producing messages, messages: 20000 ; threads: 5 | |
PRODUCE (cycle: 1): 20000 messages: 6.248497373s (3200.77 m/s) | |
PRODUCE (cycle: 2): 20000 messages: 2.599712804s (7693.16 m/s) | |
PRODUCE (cycle: 3): 20000 messages: 1.655028401s (12084.38 m/s) | |
PRODUCE (cycle: 4): 20000 messages: 1.65300486s (12099.18 m/s) | |
PRODUCE (cycle: 5): 20000 messages: 1.667484248s (11994.12 m/s) | |
PRODUCE (cycle: 6): 20000 messages: 1.687667123s (11850.68 m/s) | |
PRODUCE (cycle: 7): 20000 messages: 1.733858764s (11534.96 m/s) | |
PRODUCE (cycle: 8): 20000 messages: 1.705781828s (11724.83 m/s) | |
PRODUCE (cycle: 9): 20000 messages: 1.691839654s (11821.45 m/s) | |
PRODUCE (cycle: 10): 20000 messages: 1.7095852s (11698.74 m/s) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"context" | |
"encoding/binary" | |
"fmt" | |
"log" | |
"os" | |
"strconv" | |
"sync" | |
"time" | |
"github.com/kovetskiy/goa/uuid" | |
"github.com/segmentio/kafka-go" | |
) | |
type Packet struct { | |
ID int64 `json:"id"` | |
AccountDebit uuid.UUID `json:"account_debit" binding:"required"` | |
AccountCredit uuid.UUID `json:"account_credit" binding:"required"` | |
Status int64 `json:"status,omitempty"` | |
Side int64 `json:"side"` | |
Kind int64 `json:"kind"` | |
Market [10]byte `json:"market"` | |
Amount int64 `json:"amount"` | |
Price int64 `json:"price"` | |
CreatedAt int64 `json:"created_at"` | |
UpdatedAt int64 `json:"updated_at,omitempty"` | |
} | |
func main() { | |
topic := "my-topic" | |
partition := 0 | |
var err error | |
var conn *kafka.Conn | |
for { | |
conn, err = kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition) | |
if err != nil { | |
log.Println(err) | |
time.Sleep(time.Millisecond * 100) | |
continue | |
} | |
break | |
} | |
buffer := bytes.NewBuffer(nil) | |
err = binary.Write(buffer, binary.BigEndian, &Packet{}) | |
if err != nil { | |
panic(err) | |
} | |
amount := mustGetEnvInt("AMOUNT") | |
threads := mustGetEnvInt("THREADS") | |
cycles := mustGetEnvInt("CYCLES") | |
fmt.Println( | |
"starting producing messages, messages:", | |
amount, | |
"; threads:", | |
threads, | |
) | |
for retry := 0; retry < cycles; retry++ { | |
produceStarted := time.Now() | |
wg := &sync.WaitGroup{} | |
for t := 0; t < threads; t++ { | |
wg.Add(1) | |
go func() { | |
for i := 0; i < amount/threads; i++ { | |
_, err = conn.WriteMessages( | |
kafka.Message{Value: buffer.Bytes()}, | |
) | |
if err != nil { | |
panic(err) | |
} | |
} | |
wg.Done() | |
}() | |
} | |
wg.Wait() | |
produceFinished := time.Now() | |
duration := produceFinished.Sub(produceStarted) | |
fmt.Printf( | |
"PRODUCE (cycle: %d): %d messages: %s (%.2f m/s)\n", | |
retry+1, amount, duration, | |
float64(amount)/float64(duration.Seconds()), | |
) | |
} | |
conn.Close() | |
} | |
func mustGetEnvInt(key string) int { | |
value := os.Getenv(key) | |
if value == "" { | |
panic("no env value " + key + " specified") | |
} | |
number, err := strconv.Atoi(value) | |
if err != nil { | |
panic(err) | |
} | |
return number | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment