Last active
May 24, 2020 02:53
-
-
Save tranphuoctien/ed08acdf42ee5698146519ce6c7f6a11 to your computer and use it in GitHub Desktop.
Auto reconnect rabbitMQ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package consumers | |
import ( | |
"errors" | |
"fmt" | |
"os" | |
"runtime" | |
"sync/atomic" | |
"time" | |
"git.ecomeasy.asia/ece/services/config" | |
_ "git.ecomeasy.asia/ece/services/utils" | |
"github.com/labstack/gommon/log" | |
"github.com/manucorporat/try" | |
"github.com/streadway/amqp" | |
) | |
type Consumer struct { | |
conn *amqp.Connection | |
channel *amqp.Channel | |
done chan error | |
consumerTag string // Name that consumer identifies itself to the server with | |
uri string // uri of the rabbitmq server | |
exchange string // exchange that we will bind to | |
exchangeType string // topic, direct, etc... | |
lastRecoverTime int64 | |
//track service current status | |
currentStatus atomic.Value | |
} | |
const RECOVER_INTERVAL_TIME = 6 * 60 | |
// NewConsumer returns a Consumer struct that has been initialized properly | |
// essentially don't touch conn, channel, or done and you can create Consumer manually | |
func newConsumer(consumerTag, uri, exchange, exchangeType string) *Consumer { | |
name, err := os.Hostname() | |
if err != nil { | |
name = "ece" | |
} | |
consumer := &Consumer{ | |
consumerTag: fmt.Sprintf("%s_%s", name, consumerTag), | |
uri: uri, | |
exchange: exchange, | |
exchangeType: exchangeType, | |
done: make(chan error), | |
lastRecoverTime: time.Now().Unix(), | |
} | |
consumer.currentStatus.Store(true) | |
return consumer | |
} | |
func maxParallelism() int { | |
maxProcs := runtime.GOMAXPROCS(0) | |
numCPU := runtime.NumCPU() | |
if maxProcs < numCPU { | |
return maxProcs | |
} | |
return numCPU | |
} | |
func RunConsumer(rabbitUri, consumerTag, exchange, exchangeType, queueName, routingKey string, handler func([]byte) bool) { | |
consumer := newConsumer(consumerTag, rabbitUri, exchange, exchangeType) | |
if err := consumer.Connect(); err != nil { | |
fmt.Sprintf("[%s]connect error", consumerTag) | |
} | |
deliveries, _ := consumer.AnnounceQueue(queueName, routingKey) | |
//fmt.Sprintf("[%s]Error when calling AnnounceQueue()", consumerTag) | |
consumer.Handle(deliveries, handler, maxParallelism(), queueName, routingKey) | |
} | |
func (c *Consumer) ReConnect(queueName, routingKey string, retryTime int) (<-chan amqp.Delivery, error) { | |
c.Close() | |
time.Sleep(time.Duration(config.Config.RabbitMq.TimeOutRetry) * time.Second) | |
log.Info("Try ReConnect with times:", retryTime) | |
if err := c.Connect(); err != nil { | |
return nil, err | |
} | |
deliveries, err := c.AnnounceQueue(queueName, routingKey) | |
if err != nil { | |
return deliveries, errors.New("Couldn't connect") | |
} | |
return deliveries, nil | |
} | |
// Connect to RabbitMQ server | |
func (c *Consumer) Connect() error { | |
var err error | |
log.Info("dialing: ", c.uri) | |
c.conn, err = amqp.Dial(c.uri) | |
if err != nil { | |
return fmt.Errorf("Dial: %s", err) | |
} | |
go func() { | |
// Waits here for the channel to be closed | |
log.Info("closing: ", <-c.conn.NotifyClose(make(chan *amqp.Error))) | |
// Let Handle know it's not time to reconnect | |
c.done <- errors.New("Channel Closed") | |
}() | |
log.Info("got Connection, getting Channel") | |
c.channel, err = c.conn.Channel() | |
if err != nil { | |
return fmt.Errorf("Channel: %s", err) | |
} | |
log.Info("got Channel, declaring Exchange ", c.exchange) | |
if err = c.channel.ExchangeDeclare( | |
c.exchange, // name of the exchange | |
c.exchangeType, // type | |
true, // durable | |
false, // delete when complete | |
false, // internal | |
false, // noWait | |
nil, // arguments | |
); err != nil { | |
return fmt.Errorf("Exchange Declare: %s", err) | |
} | |
return nil | |
} | |
// AnnounceQueue sets the queue that will be listened to for this | |
// connection... | |
func (c *Consumer) AnnounceQueue(queueName, routingKey string) (<-chan amqp.Delivery, error) { | |
log.Info("declared Exchange, declaring Queue:", queueName) | |
queue, err := c.channel.QueueDeclare( | |
queueName, // name of the queue | |
true, // durable | |
false, // delete when usused | |
false, // exclusive | |
false, // noWait | |
nil, // arguments | |
) | |
if err != nil { | |
return nil, fmt.Errorf("Queue Declare: %s", err) | |
} | |
log.Info(fmt.Sprintf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)", | |
queue.Name, queue.Messages, queue.Consumers, routingKey)) | |
err = c.channel.Qos(50, 0, false) | |
if err != nil { | |
return nil, fmt.Errorf("Error setting qos: %s", err) | |
} | |
if err = c.channel.QueueBind( | |
queue.Name, // name of the queue | |
routingKey, // routingKey | |
c.exchange, // sourceExchange | |
false, // noWait | |
nil, // arguments | |
); err != nil { | |
return nil, fmt.Errorf("Queue Bind: %s", err) | |
} | |
log.Info("Queue bound to Exchange, starting Consume consumer tag:", c.consumerTag) | |
deliveries, err := c.channel.Consume( | |
queue.Name, // name | |
c.consumerTag, // consumerTag, | |
false, // noAck | |
false, // exclusive | |
false, // noLocal | |
false, // noWait | |
nil, // arguments | |
) | |
if err != nil { | |
return nil, fmt.Errorf("Queue Consume: %s", err) | |
} | |
return deliveries, nil | |
} | |
func (c *Consumer) Close() { | |
if c.channel != nil { | |
c.channel.Close() | |
c.channel = nil | |
} | |
if c.conn != nil { | |
c.conn.Close() | |
c.conn = nil | |
} | |
} | |
func (c *Consumer) Handle( | |
deliveries <-chan amqp.Delivery, | |
fn func([]byte) bool, | |
threads int, | |
queue string, | |
routingKey string) { | |
var err error | |
for { | |
log.Info("Enter for busy loop with thread:", threads) | |
for i := 0; i < threads; i++ { | |
go func() { | |
log.Info("Enter go with thread with deliveries", deliveries) | |
for msg := range deliveries { | |
log.Info("Enter deliver") | |
ret := false | |
try.This(func() { | |
body := msg.Body[:] | |
ret = fn(body) | |
/* var data map[string]interface{} | |
if err := json.Unmarshal(msg.Body, &data); err != nil { | |
panic(err) | |
} | |
isSuccess := true | |
//isSuccess := utils.AddService2k(data["order_code"].(string)) | |
log.Info("Checking order #", data["order_code"].(string), "OK: ", isSuccess) */ | |
}).Finally(func() { | |
if ret == true { | |
msg.Ack(false) | |
currentTime := time.Now().Unix() | |
if currentTime-c.lastRecoverTime > RECOVER_INTERVAL_TIME && !c.currentStatus.Load().(bool) { | |
log.Info("Try to Recover Unack Messages!") | |
c.currentStatus.Store(true) | |
c.lastRecoverTime = currentTime | |
c.channel.Recover(true) | |
} | |
} else { | |
// this really a litter dangerous. if the worker is panic very quickly, | |
// it will ddos our sentry server......plz, add [retry-ttl] in header. | |
//msg.Nack(false, true) | |
c.currentStatus.Store(false) | |
} | |
}).Catch(func(e try.E) { | |
log.Error(e) | |
}) | |
} | |
}() | |
} | |
// Go into reconnect loop when | |
// c.done is passed non nil values | |
if <-c.done != nil { | |
c.currentStatus.Store(false) | |
retryTime := 1 | |
for { | |
deliveries, err = c.ReConnect(queue, routingKey, retryTime) | |
if err != nil { | |
log.Error("Reconnecting Error") | |
retryTime += 1 | |
} else { | |
break | |
} | |
} | |
} | |
log.Info("Reconnected!!!") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment