From e9b9cf21bff789ca1ab0641c50764228e91bd49b Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:21:22 +0300 Subject: [PATCH] feat(rabbitmq): Add tasks for processing --- modules/queue/tasks/collection.go | 24 ++++++++ modules/queue/tasks/ping.go | 92 +++++++++++++++++++++++++++++++ modules/queue/tasks/tasks.go | 42 ++++++++++++++ 3 files changed, 158 insertions(+) create mode 100644 modules/queue/tasks/collection.go create mode 100644 modules/queue/tasks/ping.go create mode 100644 modules/queue/tasks/tasks.go diff --git a/modules/queue/tasks/collection.go b/modules/queue/tasks/collection.go new file mode 100644 index 0000000..770eef3 --- /dev/null +++ b/modules/queue/tasks/collection.go @@ -0,0 +1,24 @@ +package tasks + +func Collection() []Task { + return []Task{ + PingTask, + } +} + +func CollectionMap() map[string]Task { + collectionMap := make(map[string]Task) + + for _, task := range Collection() { + if task.Fallback == nil { + task.Fallback = DefaultFallbackFn + } + if task.AfterHandle == nil { + task.AfterHandle = DefaultAfterHandleFn + } + + collectionMap[task.Name] = task + } + + return collectionMap +} diff --git a/modules/queue/tasks/ping.go b/modules/queue/tasks/ping.go new file mode 100644 index 0000000..5b8c6d4 --- /dev/null +++ b/modules/queue/tasks/ping.go @@ -0,0 +1,92 @@ +package tasks + +import ( + "context" + "encoding/json" + "time" + + "git.ostiwe.com/ostiwe-com/status/model" + "git.ostiwe.com/ostiwe-com/status/modules/log" + "git.ostiwe.com/ostiwe-com/status/modules/queue/dto" + rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit" + "git.ostiwe.com/ostiwe-com/status/repository" + "git.ostiwe.com/ostiwe-com/status/service" + amqp "github.com/rabbitmq/amqp091-go" +) + +var PingTask = Task{ + Name: "ping", + Handle: func(ctx context.Context, msgBody []byte) error { + logger := log.Global.Get(log.QUEUE) + var msg dto.PingMessage + + err := json.Unmarshal(msgBody, &msg) + if err != nil { + return err + } + + logger.Info("Received new ping task for service - ", msg.ServiceID) + serviceRepository := repository.NewServiceRepository() + + srv, err := serviceRepository.Find(ctx, msg.ServiceID) + if err != nil { + return err + } + + var lastStatus = &model.Status{Status: model.StatusWarn} + if len(srv.Statuses) > 0 { + lastStatus = &srv.Statuses[0] + } + + checker := service.NewCheck() + + err = checker.Observe(ctx, srv) + if err != nil { + newStatus := model.StatusWarn + + if lastStatus.Status == model.StatusWarn && srv.CountLastStatuses(model.StatusWarn) >= 10 { + newStatus = model.StatusFailed + } + + if lastStatus.Status == model.StatusFailed { + newStatus = model.StatusFailed + } + + logger.Info("Observe service with id ", msg.ServiceID, " failed - ", err, " setting [", newStatus, "] status") + + regErr := checker.RegisterStatus(ctx, msg.ServiceID, newStatus) + if regErr != nil { + logger.Info("Setting status for service with id ", msg.ServiceID, " failed - ", err) + + return regErr + } + + return nil + } + + return checker.RegisterStatus(ctx, msg.ServiceID, model.StatusOK) + }, + AfterHandle: func(ctx context.Context, msgBody []byte) error { + time.Sleep(time.Second * 30) + + payload := dto.TaskMessage{ + Task: "ping", + Payload: string(msgBody), + } + + payloadMsg, err := json.Marshal(payload) + if err != nil { + return err + } + + return rabbitModule.Channel.Publish( + "", + "tasks", + false, + false, + amqp.Publishing{ + Body: payloadMsg, + }, + ) + }, +} diff --git a/modules/queue/tasks/tasks.go b/modules/queue/tasks/tasks.go new file mode 100644 index 0000000..05845fb --- /dev/null +++ b/modules/queue/tasks/tasks.go @@ -0,0 +1,42 @@ +package tasks + +import ( + "context" + + "git.ostiwe.com/ostiwe-com/status/modules/log" +) + +var ( + DefaultFallbackFn = func(_ context.Context, _ error) (bool, error) { + log.Global.Get(log.QUEUE).Info("Default fallback function triggered") + + return true, nil + } + DefaultAfterHandleFn = func(_ context.Context, _ []byte) error { + log.Global.Get(log.QUEUE).Info("Default after handler function triggered") + + return nil + } +) + +// Task represents a task that can be executed in the queue. +// +// Name returns the name of the task, used for logging and identification purposes. +// +// Handle processes a message from the queue. It takes a context and the raw message body ([]byte) as arguments. +// If handling fails, it should return an error that can be caught in the Fallback method. +// +// AfterHandle performs some action after successfully processing the message with Handle. +// It takes the same arguments as Handle: a context and the raw message body ([]byte). +// Default is DefaultAfterHandleFn +// +// Fallback handles errors that occur during the execution of the Handle method. +// It takes a context and an error as arguments. The method should return a boolean indicating whether the error was handled, +// and error for additional logging or debugging information. +// Default is DefaultFallbackFn +type Task struct { + Name string + Handle func(ctx context.Context, msgBody []byte) error + AfterHandle func(ctx context.Context, msgBody []byte) error + Fallback func(ctx context.Context, err error) (bool, error) +}