feat(rabbitmq): Add tasks for processing
This commit is contained in:
24
modules/queue/tasks/collection.go
Normal file
24
modules/queue/tasks/collection.go
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
package tasks
|
||||||
|
|
||||||
|
func Collection() []Task {
|
||||||
|
return []Task{
|
||||||
|
PingTask,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func CollectionMap() map[string]Task {
|
||||||
|
collectionMap := make(map[string]Task)
|
||||||
|
|
||||||
|
for _, task := range Collection() {
|
||||||
|
if task.Fallback == nil {
|
||||||
|
task.Fallback = DefaultFallbackFn
|
||||||
|
}
|
||||||
|
if task.AfterHandle == nil {
|
||||||
|
task.AfterHandle = DefaultAfterHandleFn
|
||||||
|
}
|
||||||
|
|
||||||
|
collectionMap[task.Name] = task
|
||||||
|
}
|
||||||
|
|
||||||
|
return collectionMap
|
||||||
|
}
|
||||||
92
modules/queue/tasks/ping.go
Normal file
92
modules/queue/tasks/ping.go
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
package tasks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.ostiwe.com/ostiwe-com/status/model"
|
||||||
|
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
||||||
|
"git.ostiwe.com/ostiwe-com/status/modules/queue/dto"
|
||||||
|
rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit"
|
||||||
|
"git.ostiwe.com/ostiwe-com/status/repository"
|
||||||
|
"git.ostiwe.com/ostiwe-com/status/service"
|
||||||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
var PingTask = Task{
|
||||||
|
Name: "ping",
|
||||||
|
Handle: func(ctx context.Context, msgBody []byte) error {
|
||||||
|
logger := log.Global.Get(log.QUEUE)
|
||||||
|
var msg dto.PingMessage
|
||||||
|
|
||||||
|
err := json.Unmarshal(msgBody, &msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("Received new ping task for service - ", msg.ServiceID)
|
||||||
|
serviceRepository := repository.NewServiceRepository()
|
||||||
|
|
||||||
|
srv, err := serviceRepository.Find(ctx, msg.ServiceID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var lastStatus = &model.Status{Status: model.StatusWarn}
|
||||||
|
if len(srv.Statuses) > 0 {
|
||||||
|
lastStatus = &srv.Statuses[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
checker := service.NewCheck()
|
||||||
|
|
||||||
|
err = checker.Observe(ctx, srv)
|
||||||
|
if err != nil {
|
||||||
|
newStatus := model.StatusWarn
|
||||||
|
|
||||||
|
if lastStatus.Status == model.StatusWarn && srv.CountLastStatuses(model.StatusWarn) >= 10 {
|
||||||
|
newStatus = model.StatusFailed
|
||||||
|
}
|
||||||
|
|
||||||
|
if lastStatus.Status == model.StatusFailed {
|
||||||
|
newStatus = model.StatusFailed
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("Observe service with id ", msg.ServiceID, " failed - ", err, " setting [", newStatus, "] status")
|
||||||
|
|
||||||
|
regErr := checker.RegisterStatus(ctx, msg.ServiceID, newStatus)
|
||||||
|
if regErr != nil {
|
||||||
|
logger.Info("Setting status for service with id ", msg.ServiceID, " failed - ", err)
|
||||||
|
|
||||||
|
return regErr
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return checker.RegisterStatus(ctx, msg.ServiceID, model.StatusOK)
|
||||||
|
},
|
||||||
|
AfterHandle: func(ctx context.Context, msgBody []byte) error {
|
||||||
|
time.Sleep(time.Second * 30)
|
||||||
|
|
||||||
|
payload := dto.TaskMessage{
|
||||||
|
Task: "ping",
|
||||||
|
Payload: string(msgBody),
|
||||||
|
}
|
||||||
|
|
||||||
|
payloadMsg, err := json.Marshal(payload)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rabbitModule.Channel.Publish(
|
||||||
|
"",
|
||||||
|
"tasks",
|
||||||
|
false,
|
||||||
|
false,
|
||||||
|
amqp.Publishing{
|
||||||
|
Body: payloadMsg,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
},
|
||||||
|
}
|
||||||
42
modules/queue/tasks/tasks.go
Normal file
42
modules/queue/tasks/tasks.go
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
package tasks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
DefaultFallbackFn = func(_ context.Context, _ error) (bool, error) {
|
||||||
|
log.Global.Get(log.QUEUE).Info("Default fallback function triggered")
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
DefaultAfterHandleFn = func(_ context.Context, _ []byte) error {
|
||||||
|
log.Global.Get(log.QUEUE).Info("Default after handler function triggered")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
// Task represents a task that can be executed in the queue.
|
||||||
|
//
|
||||||
|
// Name returns the name of the task, used for logging and identification purposes.
|
||||||
|
//
|
||||||
|
// Handle processes a message from the queue. It takes a context and the raw message body ([]byte) as arguments.
|
||||||
|
// If handling fails, it should return an error that can be caught in the Fallback method.
|
||||||
|
//
|
||||||
|
// AfterHandle performs some action after successfully processing the message with Handle.
|
||||||
|
// It takes the same arguments as Handle: a context and the raw message body ([]byte).
|
||||||
|
// Default is DefaultAfterHandleFn
|
||||||
|
//
|
||||||
|
// Fallback handles errors that occur during the execution of the Handle method.
|
||||||
|
// It takes a context and an error as arguments. The method should return a boolean indicating whether the error was handled,
|
||||||
|
// and error for additional logging or debugging information.
|
||||||
|
// Default is DefaultFallbackFn
|
||||||
|
type Task struct {
|
||||||
|
Name string
|
||||||
|
Handle func(ctx context.Context, msgBody []byte) error
|
||||||
|
AfterHandle func(ctx context.Context, msgBody []byte) error
|
||||||
|
Fallback func(ctx context.Context, err error) (bool, error)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user