188 lines
3.9 KiB
Go
188 lines
3.9 KiB
Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
|
"git.ostiwe.com/ostiwe-com/status/modules/queue/dto"
|
|
"git.ostiwe.com/ostiwe-com/status/modules/queue/tasks"
|
|
rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit"
|
|
"git.ostiwe.com/ostiwe-com/status/repository"
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
)
|
|
|
|
func InitQueues() {
|
|
rabbitModule.InitConnection()
|
|
|
|
err := flushQueues()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if err = createTasksForServices(); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
log.Global.Get(log.SYSTEM).Info("Wait 5 seconds before start listen task queue")
|
|
time.Sleep(time.Second * 5)
|
|
|
|
if err = listenTaskQueue(); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func createTasksForServices() error {
|
|
log.Global.Get(log.SYSTEM).Info("Create tasks ping for services")
|
|
|
|
serviceRepository := repository.NewServiceRepository()
|
|
|
|
services, err := serviceRepository.All(context.Background(), -1, 0, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for i := range services {
|
|
pingPayload, marshallErr := json.Marshal(dto.PingMessage{ServiceID: services[i].ID})
|
|
if marshallErr != nil {
|
|
return marshallErr
|
|
}
|
|
|
|
payload := dto.TaskMessage{
|
|
Task: "ping",
|
|
Payload: string(pingPayload),
|
|
}
|
|
|
|
payloadMsg, marshallErr := json.Marshal(payload)
|
|
if marshallErr != nil {
|
|
return marshallErr
|
|
}
|
|
|
|
publishErr := rabbitModule.Channel.Publish(
|
|
"",
|
|
"tasks",
|
|
false,
|
|
false,
|
|
amqp.Publishing{Body: payloadMsg},
|
|
)
|
|
if publishErr != nil {
|
|
return publishErr
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func flushQueues() error {
|
|
log.Global.Get(log.SYSTEM).Info("Flush queues")
|
|
|
|
_, err := rabbitModule.Channel.QueuePurge("tasks", false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func listenTaskQueue() error {
|
|
ctx := context.Background()
|
|
|
|
consume, err := rabbitModule.Channel.Consume(
|
|
"tasks",
|
|
"tasks-consumer",
|
|
false,
|
|
false,
|
|
false,
|
|
false,
|
|
nil,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
taskCollection := tasks.CollectionMap()
|
|
|
|
for delivery := range consume {
|
|
go handleQueueMessage(ctx, delivery, taskCollection)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func handleQueueMessage(ctx context.Context, delivery amqp.Delivery, taskCollection map[string]tasks.Task) {
|
|
queueLog := log.Global.Get(log.QUEUE)
|
|
|
|
var taskMsg dto.TaskMessage
|
|
unmarshalErr := json.Unmarshal(delivery.Body, &taskMsg)
|
|
if unmarshalErr != nil {
|
|
queueLog.WithField("body", string(delivery.Body)).Error("Failed unmarshal queue message - ", unmarshalErr)
|
|
|
|
rejectErr := delivery.Reject(false)
|
|
if rejectErr != nil {
|
|
queueLog.Error("Failed reject message - ", rejectErr)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
task, ok := taskCollection[taskMsg.Task]
|
|
|
|
if !ok {
|
|
queueLog.Error("Undefined task with name - ", taskMsg.Task)
|
|
rejectErr := delivery.Reject(false)
|
|
if rejectErr != nil {
|
|
queueLog.Error("Failed reject message - ", rejectErr)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
payload := []byte(taskMsg.Payload)
|
|
|
|
handleErr := task.Handle(ctx, payload)
|
|
if handleErr != nil {
|
|
queueLog.Error("Task handler return error - ", handleErr)
|
|
|
|
fallback, fallbackErr := task.Fallback(ctx, handleErr)
|
|
if fallbackErr != nil {
|
|
queueLog.Error("Task failed with error - ", handleErr, "; Task fallback handler also return error - ", fallbackErr, "; Reject message.")
|
|
|
|
rejectErr := delivery.Reject(false)
|
|
if rejectErr != nil {
|
|
queueLog.Error("Failed reject message - ", rejectErr)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
if !fallback {
|
|
queueLog.Error("Task fallback unsuccessful, reject message")
|
|
|
|
rejectErr := delivery.Reject(false)
|
|
if rejectErr != nil {
|
|
queueLog.Error("Failed reject message - ", rejectErr)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
ackErr := delivery.Ack(false)
|
|
if ackErr != nil {
|
|
queueLog.Error("Failed acknowledge message - ", ackErr)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
ackErr := delivery.Ack(false)
|
|
if ackErr != nil {
|
|
queueLog.Error("Failed acknowledge message - ", ackErr)
|
|
}
|
|
|
|
afterHandleErr := task.AfterHandle(ctx, payload)
|
|
if afterHandleErr != nil {
|
|
queueLog.Error("Task after handler hook failed with error - ", afterHandleErr)
|
|
}
|
|
}
|