-
-
Save palash25/6d90978264224ea73a183b49b0eb318b to your computer and use it in GitHub Desktop.
rabbitmq client in golang.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package client | |
import ( | |
"context" | |
"crypto/md5" | |
"crypto/tls" | |
"encoding/json" | |
"errors" | |
"fmt" | |
"net" | |
"strings" | |
"sync" | |
"sync/atomic" | |
"time" | |
"github.com/streadway/amqp" | |
) | |
var ( | |
// beatChan = make(chan bool, 16) | |
beatTime = time.Second * 30 | |
pubTime = time.Second * 16 | |
tickTime = time.Second * 8 | |
messageTTL = int64(time.Hour / time.Millisecond) // TTL for message in queue | |
queueExpire = int64(time.Hour * 24 * 7 / time.Millisecond) // expire time for unused queue | |
errAck = errors.New("ack") | |
errNack = errors.New("nack") | |
errFull = errors.New("full") | |
errCancel = errors.New("cancel") | |
errTimeout = errors.New("timeout") | |
) | |
// Client for mq | |
type Client struct { | |
server *broker | |
device string | |
conn *amqp.Connection | |
ch *amqp.Channel | |
que amqp.Queue | |
msgs <-chan amqp.Delivery | |
destructor sync.Once | |
confirm chan amqp.Confirmation | |
pubChan chan *publishMsg | |
ctx context.Context | |
cancel context.CancelFunc | |
onPublish int32 | |
} | |
type topic struct { | |
chanName string | |
chanType string | |
keyPrefix string | |
} | |
type broker struct { | |
server string | |
ssl bool | |
vhost string | |
userPass string | |
ca string | |
cert string | |
key string | |
quePrefix string | |
topics []topic | |
} | |
// Close the client | |
func (clt *Client) Close() { | |
clt.destructor.Do(func() { | |
if clt.conn != nil { | |
clt.conn.Close() | |
clt.conn = nil | |
clt.ch = nil | |
} | |
clt.cancel() | |
}) | |
} | |
func (clt *Client) connInit(tlsConfig *tls.Config, server *broker, proxyAddr string) (err error) { | |
protocol := "amqp" | |
if server.ssl { | |
protocol = "amqps" | |
} | |
var conn *amqp.Connection | |
if len(proxyAddr) > 0 { | |
conn, err = amqp.DialConfig(protocol+"://"+server.userPass+"@"+server.server+"/"+server.vhost, | |
amqp.Config{ | |
Heartbeat: 10 * time.Second, | |
TLSClientConfig: tlsConfig, | |
Locale: "en_US", | |
Dial: func(network, addr string) (net.Conn, error) { | |
return net.Dial("tcp4", proxyAddr) | |
}}) | |
} else { | |
conn, err = amqp.DialTLS(protocol+"://"+server.userPass+"@"+server.server+"/"+server.vhost, tlsConfig) | |
} | |
if err != nil { | |
return | |
} | |
ch, err := conn.Channel() | |
if err != nil { | |
return | |
} | |
ch.Confirm(false) | |
clt.confirm = make(chan amqp.Confirmation, 16) | |
clt.confirm = ch.NotifyPublish(clt.confirm) | |
for _, topic := range server.topics { | |
err = ch.ExchangeDeclare( | |
topic.chanName, // name | |
topic.chanType, // type | |
true, // durable | |
false, // auto-deleted | |
false, // internal | |
false, // no-wait | |
nil, // arguments | |
) | |
if err != nil { | |
return | |
} | |
} | |
clt.conn = conn | |
clt.ch = ch | |
return | |
} | |
func (clt *Client) queInit(server *broker, ifFresh bool) (err error) { | |
var num int | |
ch := clt.ch | |
if ifFresh { | |
num, err = ch.QueueDelete( | |
server.quePrefix+"."+clt.device, | |
false, | |
false, | |
false, | |
) | |
if err != nil { | |
return | |
} | |
log.Println("[RABBITMQ_CLIENT]", clt.device, "queue deleted with", num, "message purged") | |
} | |
args := make(amqp.Table) | |
args["x-message-ttl"] = messageTTL | |
args["x-expires"] = queueExpire | |
q, err := ch.QueueDeclare( | |
server.quePrefix+"."+clt.device, // name | |
true, // durable | |
false, // delete when usused | |
false, // exclusive | |
false, // no-wait | |
args, // arguments | |
) | |
if err != nil { | |
return | |
} | |
for _, topic := range server.topics { | |
err = ch.QueueBind( | |
q.Name, | |
topic.keyPrefix+"."+clt.device, | |
topic.chanName, | |
false, | |
nil, | |
) | |
if err != nil { | |
return | |
} | |
} | |
clt.que = q | |
return | |
} | |
type publishMsg struct { | |
topicId int | |
keySuffix string | |
msg []byte | |
expire time.Duration | |
startTime time.Time | |
ctx context.Context | |
cancel context.CancelFunc | |
ackErr error | |
// ackChan chan error | |
} | |
func (clt *Client) sendPublish(topicId int, keySuffix string, msg []byte, expire time.Duration) error { | |
topic := &clt.server.topics[topicId] | |
if expire <= 0 { | |
return errors.New("Expiration parameter error") | |
} | |
return clt.ch.Publish(topic.chanName, topic.keyPrefix+"."+keySuffix, false, false, amqp.Publishing{ | |
ContentType: "text/plain", | |
Body: msg, | |
Expiration: fmt.Sprintf("%d", int64(expire/time.Millisecond)), | |
}) | |
} | |
// heartBeat publish beatmsg to topic through mq client. | |
// check the beatChan to identify whether the client connection is health. | |
func (clt *Client) heartBeat(topicId int, keySuffix string, beatChan chan bool) bool { | |
pubTimer := time.NewTimer(time.Second * 8) | |
var beatMsg struct { | |
T string `json:"t"` | |
Expired int64 `json:"expired"` | |
Msid string `json:"msid"` | |
} | |
beatMsg.T = "heartbeat" | |
beatMsg.Expired = time.Now().Add(beatTime).Unix() | |
uuid := md5.Sum([]byte("heartbeat-" + fmt.Sprintf("%d", beatMsg.Expired) + clt.device)) | |
beatMsg.Msid = fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:16]) | |
beatStr, _ := json.Marshal(beatMsg) | |
for i := 0; i < 3; i++ { | |
pubTimer.Reset(pubTime) | |
err := clt.Publish(topicId, true, keySuffix, beatTime, beatStr) | |
if err != nil { | |
log.Println("[RABBITMQ_CLIENT]", "heartBeat publish error:", err) | |
continue | |
} | |
select { | |
case <-beatChan: | |
return true | |
case <-pubTimer.C: | |
} | |
} | |
return false | |
} | |
func newConsumer(parent context.Context, msgProcess func(topic string, message []byte), server string, device string) *Client { | |
clt := new(Client) | |
clt.ctx, clt.cancel = context.WithCancel(parent) | |
clt.server = server | |
clt.device = device | |
tlsConfig, err := clt.server.getTLSconfig() | |
if err != nil { | |
log.Println("[RABBITMQ_CLIENT]", "tls ERROR:", err) | |
return nil | |
} | |
err = clt.connInit(tlsConfig, clt.server) | |
if err != nil { | |
log.Println("[RABBITMQ_CLIENT]", "connInit ERROR:", err) | |
return nil | |
} | |
err = clt.queInit(clt.server, false) | |
if err != nil { | |
clt.Close() | |
err = clt.connInit(tlsConfig, clt.server) | |
if err != nil { | |
log.Println("[RABBITMQ_CLIENT]", "connInit ERROR:", err) | |
return nil | |
} | |
err = clt.queInit(clt.server, true) | |
} | |
if err != nil { | |
clt.Close() | |
log.Println("[RABBITMQ_CLIENT]", "queInit ERROR:", err) | |
return nil | |
} | |
msgs, err := clt.ch.Consume( | |
clt.que.Name, // queue | |
clt.device, // consumer | |
false, // auto ack | |
false, // exclusive | |
false, // no local | |
false, // no wait | |
nil, // args | |
) | |
if err != nil { | |
clt.Close() | |
log.Println("[RABBITMQ_CLIENT]", "Start consume ERROR:", err) | |
return nil | |
} | |
clt.msgs = msgs | |
clt.pubChan = make(chan *publishMsg, 4) | |
go func() { | |
cc := make(chan *amqp.Error) | |
e := <-clt.ch.NotifyClose(cc) | |
log.Println("[RABBITMQ_CLIENT]", "channel close error:", e.Error()) | |
clt.cancel() | |
}() | |
go func() { | |
for d := range msgs { | |
msg := d.Body | |
msgProcess(d.Exchange, msg) | |
d.Ack(false) | |
} | |
}() | |
return clt | |
} | |
func (clt *Client) publishProc() { | |
ticker := time.NewTicker(tickTime) | |
deliveryMap := make(map[uint64]*publishMsg) | |
defer func() { | |
atomic.AddInt32(&clt.onPublish, -1) | |
ticker.Stop() | |
for _, msg := range deliveryMap { | |
msg.ackErr = errCancel | |
msg.cancel() | |
} | |
}() | |
var deliveryTag uint64 = 1 | |
var ackTag uint64 = 1 | |
var pMsg *publishMsg | |
for { | |
select { | |
case <-clt.ctx.Done(): | |
return | |
case pMsg = <-clt.pubChan: | |
pMsg.startTime = time.Now() | |
err := clt.sendPublish(pMsg.topicId, pMsg.keySuffix, pMsg.msg, pMsg.expire) | |
if err != nil { | |
pMsg.ackErr = err | |
pMsg.cancel() | |
} | |
deliveryMap[deliveryTag] = pMsg | |
deliveryTag++ | |
case c, ok := <-clt.confirm: | |
if !ok { | |
log.Println("[RABBITMQ_CLIENT]", "client Publish notify channel error") | |
return | |
} | |
pMsg = deliveryMap[c.DeliveryTag] | |
// fmt.Println("DeliveryTag:", c.DeliveryTag) | |
delete(deliveryMap, c.DeliveryTag) | |
if c.Ack { | |
pMsg.ackErr = nil | |
pMsg.cancel() | |
} else { | |
pMsg.ackErr = errNack | |
pMsg.cancel() | |
} | |
case <-ticker.C: | |
now := time.Now() | |
for { | |
if len(deliveryMap) == 0 { | |
break | |
} | |
pMsg = deliveryMap[ackTag] | |
if pMsg != nil { | |
if now.Sub(pMsg.startTime.Add(pubTime)) > 0 { | |
pMsg.ackErr = errTimeout | |
pMsg.cancel() | |
delete(deliveryMap, ackTag) | |
} else { | |
break | |
} | |
} | |
ackTag++ | |
} | |
} | |
} | |
} | |
// Publish used to send message to topic in messageQueue. | |
func (clt *Client) Publish(topicID int, keySuffix string, expire time.Duration, msg []byte) (err error) { | |
pMsg := publishMsg{ | |
topicId: topicID, | |
keySuffix: keySuffix, | |
expire: expire, | |
msg: msg, | |
} | |
// In the client, pubchan is pre-built, but only publishIc is created when there is publish | |
// If the exception occurs during the sending process, publishProc exits, at which point onPublish is set to zero, you can create a new publishProc again. | |
// There may be a residual msg in pubChan. If the new publishProc is not started in time, the processing of these messages is useless. | |
// If the current client is closed, pubchan will not close immediately (waiting for gc), and the publish that has entered the sending process will wait for a timeout. | |
if atomic.AddInt32(&clt.onPublish, 1) == 1 { | |
go clt.publishProc() | |
} else { | |
atomic.AddInt32(&clt.onPublish, -1) | |
} | |
timer := time.NewTimer(pubTime) | |
defer timer.Stop() | |
pMsg.ctx, pMsg.cancel = context.WithCancel(context.Background()) | |
defer pMsg.cancel() | |
select { | |
case <-timer.C: | |
err = errFull | |
return | |
case clt.pubChan <- &pMsg: | |
} | |
timer.Reset(pubTime) | |
select { | |
case <-pMsg.ctx.Done(): | |
err = pMsg.ackErr | |
break | |
case <-timer.C: | |
err = errTimeout | |
break | |
case <-clt.ctx.Done(): | |
err = errCancel | |
break | |
} | |
return | |
} | |
// NewPublisher init a Publisher of rabbitmq client | |
func NewPublisher(parent context.Context, server string) *Client { | |
clt := new(Client) | |
clt.ctx, clt.cancel = context.WithCancel(parent) | |
clt.server = server | |
tlsConfig, err := clt.server.getTLSconfig() | |
if err != nil { | |
log.Println("[RABBITMQ_CLIENT]", "tls ERROR:", err) | |
return nil | |
} | |
err = clt.connInit(tlsConfig, clt.server) | |
if err != nil { | |
log.Println("[RABBITMQ_CLIENT]", "connInit ERROR:", err) | |
return nil | |
} | |
return clt | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment