diff --git a/pkg/rabbit/connection.go b/pkg/rabbit/connection.go new file mode 100644 index 0000000..63c5fc1 --- /dev/null +++ b/pkg/rabbit/connection.go @@ -0,0 +1,98 @@ +package rabbit + +import ( + "fmt" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/sirupsen/logrus" +) + +type ConnectionArgs struct { + Host string + Password string + User string + Port string + ReconnectInterval time.Duration + Logger *logrus.Logger +} + +type Connection struct { + conn *amqp.Connection + connString string + + isClosed bool + closeNotify chan *amqp.Error + logger *logrus.Logger + reconnectInterval time.Duration +} + +func NewConnection(args ConnectionArgs) (*Connection, error) { + connString := fmt.Sprintf( + "amqp://%s:%s@%s:%s/", + args.User, + args.Password, + args.Host, + args.Port, + ) + + dial, err := amqp.Dial(connString) + if err != nil { + return nil, err + } + + c := &Connection{ + conn: dial, + connString: connString, + closeNotify: dial.NotifyClose(make(chan *amqp.Error)), + logger: args.Logger, + reconnectInterval: args.ReconnectInterval, + } + + if args.ReconnectInterval > 0 { + go c.listenCloseNotify() + } + + return c, nil +} + +func (c *Connection) Channel() (*amqp.Channel, error) { + return c.conn.Channel() +} + +func (c *Connection) IsAlive() bool { + return c.isClosed +} + +func (c *Connection) listenCloseNotify() { + if c.reconnectInterval <= 0 { + return + } + + c.logger.Info(fmt.Sprintf( + "Start listening close notify channel, with %s interval", + c.reconnectInterval.String(), + )) + + for { + select { + case <-c.closeNotify: + c.logger.Info("Trying to reconnect to rabbit") + + dial, dialErr := amqp.Dial(c.connString) + if dialErr != nil { + c.logger.Error("Error during reconnect try - ", dialErr) + + continue + } + + c.conn = dial + + c.logger.Info("Rabbit connection stabilized") + + break + } + + time.Sleep(c.reconnectInterval) + } +}