-
-
Save glycerine/cbdd58e889b8805a7101 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
// Simple example showing how to process one message at a time with NSQ using the go-nsq client library.
// See the configuration in the var block below to play around with different scenarios.
import ( | |
"log" | |
"os" | |
"os/signal" | |
"strconv" | |
"syscall" | |
"time" | |
"github.com/nsqio/go-nsq" | |
) | |
// Tunable settings for the example; adjust them to try different scenarios.
var (
	// msgTimeout is assigned to each consumer's MsgTimeout: how long to
	// wait for a message to be processed.
	msgTimeout = 7 * time.Second
	// jobRunTime is the fake amount of time it takes to process a message.
	jobRunTime = 6 * time.Second
	// numMessages is how many messages the producer publishes.
	numMessages = 10
	// numConsumers is the number of individual consumers to start.
	numConsumers = 10
	// nsqdTCPAddrs lists the nsqd TCP addresses to connect to; the producer
	// uses only the first entry.
	nsqdTCPAddrs = []string{"localhost:4150"}
	// topic and channel name the NSQ topic published to and the channel
	// the consumers subscribe on.
	topic   = "t"
	channel = "c"
)
type JobHandler struct { | |
consumerId int | |
consumer *nsq.Consumer | |
timeToRun time.Duration | |
} | |
func (jh *JobHandler) HandleMessage(m *nsq.Message) error { | |
// key line here ... we'll tell the other end when we are done with this message | |
m.DisableAutoResponse() | |
s := string(m.Body) | |
time.Sleep(jh.timeToRun) // do some work. if this takes longer than our MsgTimeout, though, we have a problem | |
log.Printf("consumer %d: finished processing %s", jh.consumerId, s) | |
// ok, all finished processing. stop the consumer and let the nsqd know | |
jh.consumer.Stop() | |
m.Finish() | |
return nil | |
} | |
func main() { | |
stopChan := make(chan bool) | |
termChan := make(chan os.Signal, 1) | |
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) | |
// produce some messages | |
go func() { | |
producer, err := nsq.NewProducer(nsqdTCPAddrs[0], nsq.NewConfig()) | |
if err != nil { | |
log.Fatal(err) | |
} | |
bs := make([][]byte, 0) | |
for i := 1; i <= numMessages; i++ { | |
b := []byte(strconv.Itoa(i)) | |
bs = append(bs, b) | |
} | |
err = producer.MultiPublish(topic, bs) | |
log.Println("producer: published", len(bs), "messages") | |
if err != nil { | |
log.Fatal(err) | |
} | |
}() | |
// fire up some consumers to process the messages | |
go func() { | |
for i := 1; i <= numConsumers; i++ { | |
cfg := nsq.NewConfig() | |
cfg.MaxInFlight = 1 | |
cfg.MsgTimeout = msgTimeout | |
consumer, err := nsq.NewConsumer(topic, channel, cfg) | |
if err != nil { | |
log.Fatal(err) | |
} | |
consumer.AddHandler(&JobHandler{timeToRun: jobRunTime, consumer: consumer, consumerId: i}) | |
err = consumer.ConnectToNSQDs(nsqdTCPAddrs) | |
if err != nil { | |
log.Fatal(err) | |
} | |
} | |
}() | |
// hang around so stuff can happen | |
select { | |
case <-termChan: | |
case <-stopChan: | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment