feat(queue): Add queue module
This commit is contained in:
187
modules/queue/queue.go
Normal file
187
modules/queue/queue.go
Normal file
@@ -0,0 +1,187 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/queue/dto"
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/queue/tasks"
|
||||
rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit"
|
||||
"git.ostiwe.com/ostiwe-com/status/repository"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func InitQueues() {
|
||||
rabbitModule.InitConnection()
|
||||
|
||||
err := flushQueues()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err = createTasksForServices(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
log.Global.Get(log.SYSTEM).Info("Wait 5 seconds before start listen task queue")
|
||||
time.Sleep(time.Second * 5)
|
||||
|
||||
if err = listenTaskQueue(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func createTasksForServices() error {
|
||||
log.Global.Get(log.SYSTEM).Info("Create tasks ping for services")
|
||||
|
||||
serviceRepository := repository.NewServiceRepository()
|
||||
|
||||
services, err := serviceRepository.All(context.Background(), -1, 0, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := range services {
|
||||
pingPayload, marshallErr := json.Marshal(dto.PingMessage{ServiceID: services[i].ID})
|
||||
if marshallErr != nil {
|
||||
return marshallErr
|
||||
}
|
||||
|
||||
payload := dto.TaskMessage{
|
||||
Task: "ping",
|
||||
Payload: string(pingPayload),
|
||||
}
|
||||
|
||||
payloadMsg, marshallErr := json.Marshal(payload)
|
||||
if marshallErr != nil {
|
||||
return marshallErr
|
||||
}
|
||||
|
||||
publishErr := rabbitModule.Channel.Publish(
|
||||
"",
|
||||
"tasks",
|
||||
false,
|
||||
false,
|
||||
amqp.Publishing{Body: payloadMsg},
|
||||
)
|
||||
if publishErr != nil {
|
||||
return publishErr
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func flushQueues() error {
|
||||
log.Global.Get(log.SYSTEM).Info("Flush queues")
|
||||
|
||||
_, err := rabbitModule.Channel.QueuePurge("tasks", false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func listenTaskQueue() error {
|
||||
ctx := context.Background()
|
||||
|
||||
consume, err := rabbitModule.Channel.Consume(
|
||||
"tasks",
|
||||
"tasks-consumer",
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
taskCollection := tasks.CollectionMap()
|
||||
|
||||
for delivery := range consume {
|
||||
go handleQueueMessage(ctx, delivery, taskCollection)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleQueueMessage(ctx context.Context, delivery amqp.Delivery, taskCollection map[string]tasks.Task) {
|
||||
queueLog := log.Global.Get(log.QUEUE)
|
||||
|
||||
var taskMsg dto.TaskMessage
|
||||
unmarshalErr := json.Unmarshal(delivery.Body, &taskMsg)
|
||||
if unmarshalErr != nil {
|
||||
queueLog.WithField("body", string(delivery.Body)).Error("Failed unmarshal queue message - ", unmarshalErr)
|
||||
|
||||
rejectErr := delivery.Reject(false)
|
||||
if rejectErr != nil {
|
||||
queueLog.Error("Failed reject message - ", rejectErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
task, ok := taskCollection[taskMsg.Task]
|
||||
|
||||
if !ok {
|
||||
queueLog.Error("Undefined task with name - ", taskMsg.Task)
|
||||
rejectErr := delivery.Reject(false)
|
||||
if rejectErr != nil {
|
||||
queueLog.Error("Failed reject message - ", rejectErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
payload := []byte(taskMsg.Payload)
|
||||
|
||||
handleErr := task.Handle(ctx, payload)
|
||||
if handleErr != nil {
|
||||
queueLog.Error("Task handler return error - ", handleErr)
|
||||
|
||||
fallback, fallbackErr := task.Fallback(ctx, handleErr)
|
||||
if fallbackErr != nil {
|
||||
queueLog.Error("Task failed with error - ", handleErr, "; Task fallback handler also return error - ", fallbackErr, "; Reject message.")
|
||||
|
||||
rejectErr := delivery.Reject(false)
|
||||
if rejectErr != nil {
|
||||
queueLog.Error("Failed reject message - ", rejectErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if !fallback {
|
||||
queueLog.Error("Task fallback unsuccessful, reject message")
|
||||
|
||||
rejectErr := delivery.Reject(false)
|
||||
if rejectErr != nil {
|
||||
queueLog.Error("Failed reject message - ", rejectErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ackErr := delivery.Ack(false)
|
||||
if ackErr != nil {
|
||||
queueLog.Error("Failed acknowledge message - ", ackErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ackErr := delivery.Ack(false)
|
||||
if ackErr != nil {
|
||||
queueLog.Error("Failed acknowledge message - ", ackErr)
|
||||
}
|
||||
|
||||
afterHandleErr := task.AfterHandle(ctx, payload)
|
||||
if afterHandleErr != nil {
|
||||
queueLog.Error("Task after handler hook failed with error - ", afterHandleErr)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user