// Package rabbit wraps an AMQP connection with optional automatic reconnection.
package rabbit

import (
	"fmt"
	"sync"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/sirupsen/logrus"
)

// ConnectionArgs holds the parameters used by NewConnection.
type ConnectionArgs struct {
	// Host, Password, User and Port are inserted verbatim into the AMQP URI
	// ("amqp://User:Password@Host:Port/"), so values containing reserved URI
	// characters must already be percent-encoded by the caller.
	Host     string
	Password string
	User     string
	Port     string

	// ReconnectInterval enables background reconnection when greater than
	// zero; it is the pause between failed reconnect attempts.
	ReconnectInterval time.Duration

	// Logger receives reconnect progress messages. When nil,
	// logrus.StandardLogger() is used.
	Logger *logrus.Logger
}

// Connection is an AMQP connection that can transparently redial the broker
// after the underlying connection is lost.
type Connection struct {
	// mu guards conn and isClosed, which are written by the reconnect
	// goroutine and read by Channel and IsAlive.
	mu sync.RWMutex

	conn       *amqp.Connection
	connString string
	isClosed   bool

	// closeNotify belongs to the listener goroutine once it has been started.
	closeNotify chan *amqp.Error

	logger            *logrus.Logger
	reconnectInterval time.Duration
}

// NewConnection dials the broker described by args and returns the wrapped
// connection. If args.ReconnectInterval is positive, a background goroutine is
// started that redials the broker whenever the connection is lost.
//
// The error returned by amqp.Dial is passed through unchanged.
func NewConnection(args ConnectionArgs) (*Connection, error) {
	connString := fmt.Sprintf(
		"amqp://%s:%s@%s:%s/",
		args.User,
		args.Password,
		args.Host,
		args.Port,
	)

	dial, err := amqp.Dial(connString)
	if err != nil {
		return nil, err
	}

	logger := args.Logger
	if logger == nil {
		// Avoid a nil-pointer panic inside the background goroutine.
		logger = logrus.StandardLogger()
	}

	c := &Connection{
		conn:       dial,
		connString: connString,
		// Buffered so the library never blocks while delivering the close
		// error during shutdown.
		closeNotify:       dial.NotifyClose(make(chan *amqp.Error, 1)),
		logger:            logger,
		reconnectInterval: args.ReconnectInterval,
	}

	if args.ReconnectInterval > 0 {
		go c.listenCloseNotify()
	}

	return c, nil
}

// Channel opens a new AMQP channel on the current underlying connection.
func (c *Connection) Channel() (*amqp.Channel, error) {
	c.mu.RLock()
	conn := c.conn
	c.mu.RUnlock()

	return conn.Channel()
}

// IsAlive reports whether the underlying connection is currently open.
// It returns false while a reconnect is in progress and after a graceful close.
func (c *Connection) IsAlive() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return !c.isClosed && c.conn != nil && !c.conn.IsClosed()
}

// listenCloseNotify waits for close notifications and redials the broker after
// every abnormal close. It returns when the connection is closed gracefully
// (the notification channel is closed without an error being delivered), or
// immediately when reconnection is disabled.
func (c *Connection) listenCloseNotify() {
	if c.reconnectInterval <= 0 {
		return
	}

	c.logger.Infof(
		"Start listening close notify channel, with %s interval",
		c.reconnectInterval.String(),
	)

	for {
		closeErr, ok := <-c.closeNotify
		if !ok {
			// No error was sent before the channel closed: this was a
			// deliberate Close, so do not resurrect the connection.
			c.mu.Lock()
			c.isClosed = true
			c.mu.Unlock()

			c.logger.Info("Rabbit connection closed, stop listening close notify channel")
			return
		}

		c.logger.Error("Rabbit connection lost - ", closeErr)
		c.reconnect()
	}
}

// reconnect redials the broker until it succeeds, pausing reconnectInterval
// between failed attempts, then installs the new connection and subscribes to
// its close notifications.
func (c *Connection) reconnect() {
	for {
		c.logger.Info("Trying to reconnect to rabbit")

		dial, dialErr := amqp.Dial(c.connString)
		if dialErr == nil {
			c.mu.Lock()
			c.conn = dial
			// The previous channel is tied to the dead connection; listen on
			// a fresh one registered with the new connection.
			c.closeNotify = dial.NotifyClose(make(chan *amqp.Error, 1))
			c.mu.Unlock()

			c.logger.Info("Rabbit connection stabilized")
			return
		}

		c.logger.Error("Error during reconnect try - ", dialErr)
		time.Sleep(c.reconnectInterval)
	}
}