Created
April 9, 2021 10:28
-
-
Save ms2008/e291ffedbb094e59057689606b87f885 to your computer and use it in GitHub Desktop.
rabbitmq producer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package rabbitmq | |
import ( | |
"log" | |
"time" | |
"github.com/pborman/uuid" | |
"github.com/streadway/amqp" | |
) | |
type Producer struct { | |
conn *amqp.Connection | |
channel *amqp.Channel | |
connNotify chan *amqp.Error | |
channelNotify chan *amqp.Error | |
quit chan struct{} | |
addr string | |
exchange string | |
routingKey string | |
} | |
func NewProducer(addr, exchange string) *Producer { | |
p := &Producer{ | |
addr: addr, | |
exchange: exchange, | |
routingKey: "", | |
quit: make(chan struct{}), | |
} | |
return p | |
} | |
func (p *Producer) Start() error { | |
if err := p.Run(); err != nil { | |
return err | |
} | |
go p.ReConnect() | |
return nil | |
} | |
func (p *Producer) Stop() { | |
close(p.quit) | |
if !p.conn.IsClosed() { | |
if err := p.conn.Close(); err != nil { | |
log.Println("rabbitmq producer - connection close failed: ", err) | |
} | |
} | |
} | |
func (p *Producer) Run() error { | |
var err error | |
if p.conn, err = amqp.Dial(p.addr); err != nil { | |
return err | |
} | |
if p.channel, err = p.conn.Channel(); err != nil { | |
p.conn.Close() | |
return err | |
} | |
p.connNotify = p.conn.NotifyClose(make(chan *amqp.Error)) | |
p.channelNotify = p.channel.NotifyClose(make(chan *amqp.Error)) | |
return err | |
} | |
func (p *Producer) ReConnect() { | |
for { | |
select { | |
case err := <-p.connNotify: | |
if err != nil { | |
log.Println("rabbitmq producer - connection NotifyClose: ", err) | |
} | |
case err := <-p.channelNotify: | |
if err != nil { | |
log.Println("rabbitmq producer - channel NotifyClose: ", err) | |
} | |
case <-p.quit: | |
return | |
} | |
// backstop | |
if !p.conn.IsClosed() { | |
if err := p.conn.Close(); err != nil { | |
log.Println("rabbitmq producer - connection close failed: ", err) | |
} | |
} | |
// IMPORTANT: 必须清空 Notify,否则死连接不会释放 | |
for err := range p.channelNotify { | |
log.Println(err) | |
} | |
for err := range p.connNotify { | |
log.Println(err) | |
} | |
quit: | |
for { | |
select { | |
case <-p.quit: | |
return | |
default: | |
log.Println("rabbitmq producer - reconnect") | |
if err := p.Run(); err != nil { | |
log.Println("rabbitmq producer - failCheck: ", err) | |
// sleep 5s reconnect | |
time.Sleep(time.Second * 5) | |
continue | |
} | |
break quit | |
} | |
} | |
} | |
} | |
func (p *Producer) Publish(msg []byte) error { | |
return p.channel.Publish( | |
p.exchange, // exchange | |
p.routingKey, // routing key | |
false, // mandatory | |
false, // immediate | |
amqp.Publishing{ | |
ContentType: "text/plain", | |
MessageId: uuid.New(), | |
Type: "", | |
Body: msg, | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Stop() may not be right:
it calls close(p.quit) first,
and then, when conn.Close() runs, the send on the conn.NotifyClose channel may block.