Created
January 29, 2019 20:32
-
-
Save mantzas/dbd5a26c91bcadc2f778d8aa05c907f7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"os" | |
"os/signal" | |
"syscall" | |
"github.com/confluentinc/confluent-kafka-go/kafka" | |
beatkafka "github.com/taxibeat/go-toolkit/benchmarks/kafka" | |
) | |
func main() { | |
sigchan := make(chan os.Signal, 1) | |
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) | |
cnt := beatkafka.NewCounter() | |
topic := "test_topic" | |
c, err := kafka.NewConsumer(&kafka.ConfigMap{ | |
"bootstrap.servers": "localhost:9092", | |
"session.timeout.ms": 6000, | |
"go.events.channel.enable": true, | |
"go.application.rebalance.enable": true, | |
"group.id": "test-id", | |
"default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"}}) | |
if err != nil { | |
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) | |
os.Exit(1) | |
} | |
fmt.Printf("Created Consumer %v\n", c) | |
err = c.SubscribeTopics([]string{topic}, nil) | |
run := true | |
for run == true { | |
select { | |
case sig := <-sigchan: | |
fmt.Printf("Caught signal %v: terminating\n", sig) | |
run = false | |
case ev := <-c.Events(): | |
switch e := ev.(type) { | |
case kafka.AssignedPartitions: | |
fmt.Fprintf(os.Stderr, "%% %v\n", e) | |
c.Assign(e.Partitions) | |
case kafka.RevokedPartitions: | |
fmt.Fprintf(os.Stderr, "%% %v\n", e) | |
c.Unassign() | |
case *kafka.Message: | |
cnt.Inc() | |
case kafka.PartitionEOF: | |
fmt.Printf("%% Reached %v\n", e) | |
cnt.PrintStats() | |
case kafka.Error: | |
fmt.Fprintf(os.Stderr, "%% Error: %v\n", e) | |
run = false | |
} | |
} | |
} | |
fmt.Printf("Closing consumer\n") | |
c.Close() | |
cnt.PrintStats() | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"log" | |
"os" | |
"os/signal" | |
"sync" | |
"syscall" | |
"time" | |
"github.com/confluentinc/confluent-kafka-go/kafka" | |
beatkafka "github.com/taxibeat/go-toolkit/benchmarks/kafka" | |
) | |
func main() { | |
cnt := beatkafka.NewCounter() | |
broker := "localhost:9092" | |
topic := "test_topic" | |
msg := kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte("test")} | |
sigchan := make(chan os.Signal, 1) | |
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) | |
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) | |
if err != nil { | |
log.Fatalf("failed to create producer: %v", err) | |
} | |
terminate := false | |
wg := sync.WaitGroup{} | |
go func(t *bool) { | |
wg.Add(1) | |
for !*t { | |
p.ProduceChannel() <- &msg | |
} | |
wg.Done() | |
}(&terminate) | |
go func(t *bool) { | |
wg.Add(1) | |
for !*t { | |
e := <-p.Events() | |
switch ev := e.(type) { | |
case *kafka.Message: | |
if ev.TopicPartition.Error == nil { | |
cnt.Inc() | |
} | |
} | |
} | |
wg.Done() | |
}(&terminate) | |
for { | |
select { | |
case sig := <-sigchan: | |
log.Printf("Caught signal %v: terminating", sig) | |
terminate = true | |
wg.Wait() | |
p.Flush(1000) | |
//p.Close() this one blocks forever | |
cnt.PrintStats() | |
os.Exit(0) | |
case <-time.NewTimer(time.Second).C: | |
cnt.PrintStats() | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package kafka | |
import ( | |
"log" | |
"sync" | |
"sync/atomic" | |
"time" | |
) | |
// Counter counts processed messages and reports the throughput since it was
// created. Inc and PrintStats may be called from different goroutines.
type Counter struct {
	// cnt is accessed only through sync/atomic. It is the first field so that
	// it is 64-bit aligned on 32-bit platforms, as sync/atomic requires.
	cnt int64
	// Mutex is kept embedded so Lock/Unlock stay available to existing
	// callers; the counter itself no longer relies on it.
	sync.Mutex
	start time.Time
}

// NewCounter returns a Counter starting at zero whose rate measurement begins now.
func NewCounter() *Counter {
	return &Counter{cnt: 0, start: time.Now()}
}

// Inc atomically adds one to the counter.
func (c *Counter) Inc() {
	atomic.AddInt64(&c.cnt, 1)
}

// PrintStats logs the number of counted messages and the average rate in
// messages per second since the Counter was created.
func (c *Counter) PrintStats() {
	// Read with an atomic load to pair with the atomic add in Inc, and use
	// this one snapshot for both the rate and the logged count.
	count := atomic.LoadInt64(&c.cnt)
	dur := time.Since(c.start)
	rate := float64(count) / dur.Seconds()
	log.Printf("Processed %d message with a rate of %f msg/sec", count, rate)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"log" | |
"os" | |
"os/signal" | |
"syscall" | |
"github.com/Shopify/sarama" | |
beatkafka "github.com/taxibeat/go-toolkit/benchmarks/kafka" | |
) | |
type consumerGroupHandler struct { | |
cnt *beatkafka.Counter | |
sigChan <-chan os.Signal | |
} | |
func (*consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } | |
func (*consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } | |
func (cgh *consumerGroupHandler) ConsumeClaim(ses sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { | |
for { | |
select { | |
case sig := <-cgh.sigChan: | |
fmt.Printf("Caught signal %v: terminating\n", sig) | |
return nil | |
case <-claim.Messages(): | |
cgh.cnt.Inc() | |
} | |
} | |
} | |
func main() { | |
sigchan := make(chan os.Signal, 1) | |
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) | |
cnt := beatkafka.NewCounter() | |
cfg := sarama.NewConfig() | |
cfg.Version = sarama.V1_0_0_0 | |
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest | |
cfg.Consumer.Return.Errors = true | |
gr, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "sarama-test", cfg) | |
if err != nil { | |
log.Fatalf("failed to create consumer group: %v", err) | |
} | |
defer func() { _ = gr.Close() }() | |
topics := []string{"test_topic"} | |
handler := consumerGroupHandler{cnt: cnt, sigChan: sigchan} | |
err = gr.Consume(context.Background(), topics, &handler) | |
if err != nil { | |
log.Fatalf("failed to consume group: %v", err) | |
} | |
cnt.PrintStats() | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"log" | |
"os" | |
"os/signal" | |
"sync" | |
"syscall" | |
"time" | |
"github.com/Shopify/sarama" | |
beatkafka "github.com/taxibeat/go-toolkit/benchmarks/kafka" | |
) | |
func main() { | |
cnt := beatkafka.NewCounter() | |
brokers := []string{"localhost:9092"} | |
topic := "test_topic" | |
msg := sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder("test")} | |
sigchan := make(chan os.Signal, 1) | |
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) | |
config := sarama.NewConfig() | |
config.Version = sarama.V0_11_0_0 | |
config.Producer.Return.Successes = true | |
prod, err := sarama.NewAsyncProducer(brokers, config) | |
if err != nil { | |
log.Fatalf("failed to create producer: %v", err) | |
} | |
terminate := false | |
wg := sync.WaitGroup{} | |
go func(t *bool) { | |
wg.Add(1) | |
for !*t { | |
prod.Input() <- &msg | |
} | |
wg.Done() | |
}(&terminate) | |
go func(t *bool) { | |
wg.Add(1) | |
for !*t { | |
<-prod.Successes() | |
cnt.Inc() | |
} | |
wg.Done() | |
}(&terminate) | |
for { | |
select { | |
case sig := <-sigchan: | |
log.Printf("Caught signal %v: terminating", sig) | |
terminate = true | |
wg.Wait() | |
prod.Close() | |
cnt.PrintStats() | |
os.Exit(0) | |
case <-time.NewTimer(time.Second).C: | |
cnt.PrintStats() | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment