// Package queue bootstraps the RabbitMQ-backed task queue: it purges the
// queue, publishes one ping task per known service and then consumes and
// dispatches tasks to their handlers.
package queue

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"git.ostiwe.com/ostiwe-com/status/modules/log"
	"git.ostiwe.com/ostiwe-com/status/modules/queue/dto"
	"git.ostiwe.com/ostiwe-com/status/modules/queue/tasks"
	rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit"
	"git.ostiwe.com/ostiwe-com/status/repository"
	amqp "github.com/rabbitmq/amqp091-go"
)

const (
	// taskQueueName is the queue that is purged, published to and consumed
	// from; all three operations must agree on it.
	taskQueueName = "tasks"

	// taskConsumerTag identifies this process's consumer on the task queue.
	taskConsumerTag = "tasks-consumer"

	// pingTaskName is the task name placed in the messages published for
	// every service at startup.
	pingTaskName = "ping"

	// listenStartDelay is the pause between publishing the initial tasks and
	// starting the consumer.
	listenStartDelay = 5 * time.Second
)

// InitQueues initialises the RabbitMQ connection, purges the task queue,
// publishes a ping task for every service and then starts consuming.
//
// It panics if any of those steps fails. Because listenTaskQueue ranges over
// the delivery channel, this call blocks until that channel is closed.
func InitQueues() {
	rabbitModule.InitConnection()

	if err := flushQueues(); err != nil {
		panic(err)
	}

	if err := createTasksForServices(); err != nil {
		panic(err)
	}

	log.Global.Get(log.SYSTEM).Info("Wait 5 seconds before start listen task queue")
	time.Sleep(listenStartDelay)

	if err := listenTaskQueue(); err != nil {
		panic(err)
	}
}

// createTasksForServices loads every service from the repository and
// publishes one "ping" task message per service to the task queue.
//
// It stops at, and returns, the first error encountered; messages published
// before the failure are not rolled back.
func createTasksForServices() error {
	log.Global.Get(log.SYSTEM).Info("Create tasks ping for services")

	ctx := context.Background()

	serviceRepository := repository.NewServiceRepository()
	services, err := serviceRepository.All(ctx, -1, 0, false)
	if err != nil {
		return fmt.Errorf("load services: %w", err)
	}

	for i := range services {
		serviceID := services[i].ID

		// The task envelope carries the task-specific payload as a JSON
		// string, so the ping message is marshalled first and embedded.
		pingPayload, marshalErr := json.Marshal(dto.PingMessage{ServiceID: serviceID})
		if marshalErr != nil {
			return fmt.Errorf("marshal ping payload for service %v: %w", serviceID, marshalErr)
		}

		body, marshalErr := json.Marshal(dto.TaskMessage{
			Task:    pingTaskName,
			Payload: string(pingPayload),
		})
		if marshalErr != nil {
			return fmt.Errorf("marshal task message for service %v: %w", serviceID, marshalErr)
		}

		// PublishWithContext replaces the deprecated Channel.Publish.
		publishErr := rabbitModule.Channel.PublishWithContext(
			ctx,
			"",            // exchange (empty string selects the default exchange)
			taskQueueName, // routing key
			false,         // mandatory
			false,         // immediate
			amqp.Publishing{Body: body},
		)
		if publishErr != nil {
			return fmt.Errorf("publish ping task for service %v: %w", serviceID, publishErr)
		}
	}

	return nil
}

// flushQueues purges all pending messages from the task queue.
func flushQueues() error {
	log.Global.Get(log.SYSTEM).Info("Flush queues")

	if _, err := rabbitModule.Channel.QueuePurge(taskQueueName, false); err != nil {
		return fmt.Errorf("purge queue %q: %w", taskQueueName, err)
	}

	return nil
}

// listenTaskQueue consumes the task queue with manual acknowledgement and
// hands each delivery to handleQueueMessage in its own goroutine.
//
// It returns an error only if the consumer cannot be registered. Once the
// delivery channel is closed the loop ends and nil is returned.
//
// NOTE(review): one goroutine is started per delivery and nothing in this
// file limits how many run at once; confirm whether a prefetch/QoS limit is
// configured elsewhere.
func listenTaskQueue() error {
	ctx := context.Background()

	deliveries, err := rabbitModule.Channel.Consume(
		taskQueueName,
		taskConsumerTag,
		false, // autoAck: deliveries are acked/rejected by the handler
		false, // exclusive
		false, // noLocal
		false, // noWait
		nil,   // args
	)
	if err != nil {
		return fmt.Errorf("consume queue %q: %w", taskQueueName, err)
	}

	taskCollection := tasks.CollectionMap()

	for delivery := range deliveries {
		go handleQueueMessage(ctx, delivery, taskCollection)
	}

	return nil
}

// handleQueueMessage decodes one delivery, runs the matching task and
// acknowledges or rejects the delivery according to the outcome:
//
//   - undecodable body or unknown task name: reject;
//   - handler succeeds: ack, then run the task's AfterHandle hook;
//   - handler fails and the fallback reports success: ack;
//   - handler fails and the fallback fails or reports false: reject.
//
// Rejections never requeue the message. Failures of Ack, Reject or
// AfterHandle are logged and otherwise ignored.
func handleQueueMessage(ctx context.Context, delivery amqp.Delivery, taskCollection map[string]tasks.Task) {
	queueLog := log.Global.Get(log.QUEUE)

	// reject discards the delivery without requeueing it.
	reject := func() {
		if err := delivery.Reject(false); err != nil {
			queueLog.Error("Failed reject message - ", err)
		}
	}

	// ack acknowledges this single delivery.
	ack := func() {
		if err := delivery.Ack(false); err != nil {
			queueLog.Error("Failed acknowledge message - ", err)
		}
	}

	var taskMsg dto.TaskMessage
	if err := json.Unmarshal(delivery.Body, &taskMsg); err != nil {
		queueLog.WithField("body", string(delivery.Body)).Error("Failed unmarshal queue message - ", err)
		reject()
		return
	}

	task, ok := taskCollection[taskMsg.Task]
	if !ok {
		queueLog.Error("Undefined task with name - ", taskMsg.Task)
		reject()
		return
	}

	payload := []byte(taskMsg.Payload)

	if handleErr := task.Handle(ctx, payload); handleErr != nil {
		queueLog.Error("Task handler return error - ", handleErr)

		recovered, fallbackErr := task.Fallback(ctx, handleErr)
		if fallbackErr != nil {
			queueLog.Error("Task failed with error - ", handleErr, "; Task fallback handler also return error - ", fallbackErr, "; Reject message.")
			reject()
			return
		}

		if !recovered {
			queueLog.Error("Task fallback unsuccessful, reject message")
			reject()
			return
		}

		// The fallback handled the failure, so the message is acknowledged.
		// AfterHandle is not run on this path.
		ack()
		return
	}

	// Acknowledge before running the hook so a hook failure cannot affect
	// the delivery's outcome.
	ack()

	if err := task.AfterHandle(ctx, payload); err != nil {
		queueLog.Error("Task after handler hook failed with error - ", err)
	}
}