diff --git a/modules/queue/queue.go b/modules/queue/queue.go new file mode 100644 index 0000000..eb44fd7 --- /dev/null +++ b/modules/queue/queue.go @@ -0,0 +1,187 @@ +package queue + +import ( + "context" + "encoding/json" + "time" + + "git.ostiwe.com/ostiwe-com/status/modules/log" + "git.ostiwe.com/ostiwe-com/status/modules/queue/dto" + "git.ostiwe.com/ostiwe-com/status/modules/queue/tasks" + rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit" + "git.ostiwe.com/ostiwe-com/status/repository" + amqp "github.com/rabbitmq/amqp091-go" +) + +func InitQueues() { + rabbitModule.InitConnection() + + err := flushQueues() + if err != nil { + panic(err) + } + + if err = createTasksForServices(); err != nil { + panic(err) + } + + log.Global.Get(log.SYSTEM).Info("Wait 5 seconds before start listen task queue") + time.Sleep(time.Second * 5) + + if err = listenTaskQueue(); err != nil { + panic(err) + } +} + +func createTasksForServices() error { + log.Global.Get(log.SYSTEM).Info("Create tasks ping for services") + + serviceRepository := repository.NewServiceRepository() + + services, err := serviceRepository.All(context.Background(), -1, 0, false) + if err != nil { + return err + } + + for i := range services { + pingPayload, marshallErr := json.Marshal(dto.PingMessage{ServiceID: services[i].ID}) + if marshallErr != nil { + return marshallErr + } + + payload := dto.TaskMessage{ + Task: "ping", + Payload: string(pingPayload), + } + + payloadMsg, marshallErr := json.Marshal(payload) + if marshallErr != nil { + return marshallErr + } + + publishErr := rabbitModule.Channel.Publish( + "", + "tasks", + false, + false, + amqp.Publishing{Body: payloadMsg}, + ) + if publishErr != nil { + return publishErr + } + } + + return nil +} + +func flushQueues() error { + log.Global.Get(log.SYSTEM).Info("Flush queues") + + _, err := rabbitModule.Channel.QueuePurge("tasks", false) + if err != nil { + return err + } + + return nil +} + +func listenTaskQueue() error { + ctx := context.Background() + + consume, err := rabbitModule.Channel.Consume( + "tasks", + "tasks-consumer", + false, + false, + false, + false, + nil, + ) + if err != nil { + return err + } + + taskCollection := tasks.CollectionMap() + + for delivery := range consume { + go handleQueueMessage(ctx, delivery, taskCollection) + } + + return nil +} + +func handleQueueMessage(ctx context.Context, delivery amqp.Delivery, taskCollection map[string]tasks.Task) { + queueLog := log.Global.Get(log.QUEUE) + + var taskMsg dto.TaskMessage + unmarshalErr := json.Unmarshal(delivery.Body, &taskMsg) + if unmarshalErr != nil { + queueLog.WithField("body", string(delivery.Body)).Error("Failed unmarshal queue message - ", unmarshalErr) + + rejectErr := delivery.Reject(false) + if rejectErr != nil { + queueLog.Error("Failed reject message - ", rejectErr) + } + + return + } + + task, ok := taskCollection[taskMsg.Task] + + if !ok { + queueLog.Error("Undefined task with name - ", taskMsg.Task) + rejectErr := delivery.Reject(false) + if rejectErr != nil { + queueLog.Error("Failed reject message - ", rejectErr) + } + + return + } + + payload := []byte(taskMsg.Payload) + + handleErr := task.Handle(ctx, payload) + if handleErr != nil { + queueLog.Error("Task handler return error - ", handleErr) + + fallback, fallbackErr := task.Fallback(ctx, handleErr) + if fallbackErr != nil { + queueLog.Error("Task failed with error - ", handleErr, "; Task fallback handler also return error - ", fallbackErr, "; Reject message.") + + rejectErr := delivery.Reject(false) + if rejectErr != nil { + queueLog.Error("Failed reject message - ", rejectErr) + } + + return + } + + if !fallback { + queueLog.Error("Task fallback unsuccessful, reject message") + + rejectErr := delivery.Reject(false) + if rejectErr != nil { + queueLog.Error("Failed reject message - ", rejectErr) + } + + return + } + + ackErr := delivery.Ack(false) + if ackErr != nil { + queueLog.Error("Failed acknowledge message - ", ackErr) + } + + return + } + + ackErr := delivery.Ack(false) + if ackErr != nil { + queueLog.Error("Failed acknowledge message - ", ackErr) + } + + afterHandleErr := task.AfterHandle(ctx, payload) + if afterHandleErr != nil { + queueLog.Error("Task after handler hook failed with error - ", afterHandleErr) + } +}