From 1958d9b3d94fe32d7c4fcefb66acb0cd475d98ec Mon Sep 17 00:00:00 2001 From: ostiwe Date: Tue, 28 Oct 2025 00:47:21 +0300 Subject: [PATCH] refactor: Remove unused code Rethinking queue operation, switching to cron --- modules/queue/dto/ping.go | 5 - modules/queue/dto/task.go | 6 - modules/queue/queue.go | 187 ------------------------------ modules/queue/tasks/collection.go | 24 ---- modules/queue/tasks/ping.go | 92 --------------- modules/queue/tasks/tasks.go | 42 ------- modules/rabbit/rabbit.go | 66 ----------- pkg/slice/chunk.go | 5 - 8 files changed, 427 deletions(-) delete mode 100644 modules/queue/dto/ping.go delete mode 100644 modules/queue/dto/task.go delete mode 100644 modules/queue/queue.go delete mode 100644 modules/queue/tasks/collection.go delete mode 100644 modules/queue/tasks/ping.go delete mode 100644 modules/queue/tasks/tasks.go delete mode 100644 modules/rabbit/rabbit.go delete mode 100644 pkg/slice/chunk.go diff --git a/modules/queue/dto/ping.go b/modules/queue/dto/ping.go deleted file mode 100644 index 373b436..0000000 --- a/modules/queue/dto/ping.go +++ /dev/null @@ -1,5 +0,0 @@ -package dto - -type PingMessage struct { - ServiceID int `json:"serviceID"` -} diff --git a/modules/queue/dto/task.go b/modules/queue/dto/task.go deleted file mode 100644 index 27a1902..0000000 --- a/modules/queue/dto/task.go +++ /dev/null @@ -1,6 +0,0 @@ -package dto - -type TaskMessage struct { - Task string `json:"task"` - Payload string `json:"payload"` -} diff --git a/modules/queue/queue.go b/modules/queue/queue.go deleted file mode 100644 index eb44fd7..0000000 --- a/modules/queue/queue.go +++ /dev/null @@ -1,187 +0,0 @@ -package queue - -import ( - "context" - "encoding/json" - "time" - - "git.ostiwe.com/ostiwe-com/status/modules/log" - "git.ostiwe.com/ostiwe-com/status/modules/queue/dto" - "git.ostiwe.com/ostiwe-com/status/modules/queue/tasks" - rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit" - "git.ostiwe.com/ostiwe-com/status/repository" - amqp "github.com/rabbitmq/amqp091-go" -) - -func InitQueues() { - rabbitModule.InitConnection() - - err := flushQueues() - if err != nil { - panic(err) - } - - if err = createTasksForServices(); err != nil { - panic(err) - } - - log.Global.Get(log.SYSTEM).Info("Wait 5 seconds before start listen task queue") - time.Sleep(time.Second * 5) - - if err = listenTaskQueue(); err != nil { - panic(err) - } -} - -func createTasksForServices() error { - log.Global.Get(log.SYSTEM).Info("Create tasks ping for services") - - serviceRepository := repository.NewServiceRepository() - - services, err := serviceRepository.All(context.Background(), -1, 0, false) - if err != nil { - return err - } - - for i := range services { - pingPayload, marshallErr := json.Marshal(dto.PingMessage{ServiceID: services[i].ID}) - if marshallErr != nil { - return marshallErr - } - - payload := dto.TaskMessage{ - Task: "ping", - Payload: string(pingPayload), - } - - payloadMsg, marshallErr := json.Marshal(payload) - if marshallErr != nil { - return marshallErr - } - - publishErr := rabbitModule.Channel.Publish( - "", - "tasks", - false, - false, - amqp.Publishing{Body: payloadMsg}, - ) - if publishErr != nil { - return publishErr - } - } - - return nil -} - -func flushQueues() error { - log.Global.Get(log.SYSTEM).Info("Flush queues") - - _, err := rabbitModule.Channel.QueuePurge("tasks", false) - if err != nil { - return err - } - - return nil -} - -func listenTaskQueue() error { - ctx := context.Background() - - consume, err := rabbitModule.Channel.Consume( - "tasks", - "tasks-consumer", - false, - false, - false, - false, - nil, - ) - if err != nil { - return err - } - - taskCollection := tasks.CollectionMap() - - for delivery := range consume { - go handleQueueMessage(ctx, delivery, taskCollection) - } - - return nil -} - -func handleQueueMessage(ctx context.Context, delivery amqp.Delivery, taskCollection map[string]tasks.Task) { - queueLog := log.Global.Get(log.QUEUE) - - var taskMsg dto.TaskMessage - unmarshalErr := json.Unmarshal(delivery.Body, &taskMsg) - if unmarshalErr != nil { - queueLog.WithField("body", string(delivery.Body)).Error("Failed unmarshal queue message - ", unmarshalErr) - - rejectErr := delivery.Reject(false) - if rejectErr != nil { - queueLog.Error("Failed reject message - ", rejectErr) - } - - return - } - - task, ok := taskCollection[taskMsg.Task] - - if !ok { - queueLog.Error("Undefined task with name - ", taskMsg.Task) - rejectErr := delivery.Reject(false) - if rejectErr != nil { - queueLog.Error("Failed reject message - ", rejectErr) - } - - return - } - - payload := []byte(taskMsg.Payload) - - handleErr := task.Handle(ctx, payload) - if handleErr != nil { - queueLog.Error("Task handler return error - ", handleErr) - - fallback, fallbackErr := task.Fallback(ctx, handleErr) - if fallbackErr != nil { - queueLog.Error("Task failed with error - ", handleErr, "; Task fallback handler also return error - ", fallbackErr, "; Reject message.") - - rejectErr := delivery.Reject(false) - if rejectErr != nil { - queueLog.Error("Failed reject message - ", rejectErr) - } - - return - } - - if !fallback { - queueLog.Error("Task fallback unsuccessful, reject message") - - rejectErr := delivery.Reject(false) - if rejectErr != nil { - queueLog.Error("Failed reject message - ", rejectErr) - } - - return - } - - ackErr := delivery.Ack(false) - if ackErr != nil { - queueLog.Error("Failed acknowledge message - ", ackErr) - } - - return - } - - ackErr := delivery.Ack(false) - if ackErr != nil { - queueLog.Error("Failed acknowledge message - ", ackErr) - } - - afterHandleErr := task.AfterHandle(ctx, payload) - if afterHandleErr != nil { - queueLog.Error("Task after handler hook failed with error - ", afterHandleErr) - } -} diff --git a/modules/queue/tasks/collection.go b/modules/queue/tasks/collection.go deleted file mode 100644 index 770eef3..0000000 --- a/modules/queue/tasks/collection.go +++ /dev/null @@ -1,24 +0,0 @@ -package tasks - -func Collection() []Task { - return []Task{ - PingTask, - } -} - -func CollectionMap() map[string]Task { - collectionMap := make(map[string]Task) - - for _, task := range Collection() { - if task.Fallback == nil { - task.Fallback = DefaultFallbackFn - } - if task.AfterHandle == nil { - task.AfterHandle = DefaultAfterHandleFn - } - - collectionMap[task.Name] = task - } - - return collectionMap -} diff --git a/modules/queue/tasks/ping.go b/modules/queue/tasks/ping.go deleted file mode 100644 index 5b8c6d4..0000000 --- a/modules/queue/tasks/ping.go +++ /dev/null @@ -1,92 +0,0 @@ -package tasks - -import ( - "context" - "encoding/json" - "time" - - "git.ostiwe.com/ostiwe-com/status/model" - "git.ostiwe.com/ostiwe-com/status/modules/log" - "git.ostiwe.com/ostiwe-com/status/modules/queue/dto" - rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit" - "git.ostiwe.com/ostiwe-com/status/repository" - "git.ostiwe.com/ostiwe-com/status/service" - amqp "github.com/rabbitmq/amqp091-go" -) - -var PingTask = Task{ - Name: "ping", - Handle: func(ctx context.Context, msgBody []byte) error { - logger := log.Global.Get(log.QUEUE) - var msg dto.PingMessage - - err := json.Unmarshal(msgBody, &msg) - if err != nil { - return err - } - - logger.Info("Received new ping task for service - ", msg.ServiceID) - serviceRepository := repository.NewServiceRepository() - - srv, err := serviceRepository.Find(ctx, msg.ServiceID) - if err != nil { - return err - } - - var lastStatus = &model.Status{Status: model.StatusWarn} - if len(srv.Statuses) > 0 { - lastStatus = &srv.Statuses[0] - } - - checker := service.NewCheck() - - err = checker.Observe(ctx, srv) - if err != nil { - newStatus := model.StatusWarn - - if lastStatus.Status == model.StatusWarn && srv.CountLastStatuses(model.StatusWarn) >= 10 { - newStatus = model.StatusFailed - } - - if lastStatus.Status == model.StatusFailed { - newStatus = model.StatusFailed - } - - logger.Info("Observe service with id ", msg.ServiceID, " failed - ", err, " setting [", newStatus, "] status") - - regErr := checker.RegisterStatus(ctx, msg.ServiceID, newStatus) - if regErr != nil { - logger.Info("Setting status for service with id ", msg.ServiceID, " failed - ", err) - - return regErr - } - - return nil - } - - return checker.RegisterStatus(ctx, msg.ServiceID, model.StatusOK) - }, - AfterHandle: func(ctx context.Context, msgBody []byte) error { - time.Sleep(time.Second * 30) - - payload := dto.TaskMessage{ - Task: "ping", - Payload: string(msgBody), - } - - payloadMsg, err := json.Marshal(payload) - if err != nil { - return err - } - - return rabbitModule.Channel.Publish( - "", - "tasks", - false, - false, - amqp.Publishing{ - Body: payloadMsg, - }, - ) - }, -} diff --git a/modules/queue/tasks/tasks.go b/modules/queue/tasks/tasks.go deleted file mode 100644 index 05845fb..0000000 --- a/modules/queue/tasks/tasks.go +++ /dev/null @@ -1,42 +0,0 @@ -package tasks - -import ( - "context" - - "git.ostiwe.com/ostiwe-com/status/modules/log" -) - -var ( - DefaultFallbackFn = func(_ context.Context, _ error) (bool, error) { - log.Global.Get(log.QUEUE).Info("Default fallback function triggered") - - return true, nil - } - DefaultAfterHandleFn = func(_ context.Context, _ []byte) error { - log.Global.Get(log.QUEUE).Info("Default after handler function triggered") - - return nil - } -) - -// Task represents a task that can be executed in the queue. -// -// Name returns the name of the task, used for logging and identification purposes. -// -// Handle processes a message from the queue. It takes a context and the raw message body ([]byte) as arguments. -// If handling fails, it should return an error that can be caught in the Fallback method. -// -// AfterHandle performs some action after successfully processing the message with Handle. -// It takes the same arguments as Handle: a context and the raw message body ([]byte). -// Default is DefaultAfterHandleFn -// -// Fallback handles errors that occur during the execution of the Handle method. -// It takes a context and an error as arguments. The method should return a boolean indicating whether the error was handled, -// and error for additional logging or debugging information. -// Default is DefaultFallbackFn -type Task struct { - Name string - Handle func(ctx context.Context, msgBody []byte) error - AfterHandle func(ctx context.Context, msgBody []byte) error - Fallback func(ctx context.Context, err error) (bool, error) -} diff --git a/modules/rabbit/rabbit.go b/modules/rabbit/rabbit.go deleted file mode 100644 index 7484c63..0000000 --- a/modules/rabbit/rabbit.go +++ /dev/null @@ -1,66 +0,0 @@ -package rabbit - -import ( - "os" - "time" - - "git.ostiwe.com/ostiwe-com/status/modules/log" - "git.ostiwe.com/ostiwe-com/status/pkg/rabbit" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/sirupsen/logrus" -) - -var Channel *amqp.Channel - -func InitConnection() { - log.Global.Put(log.RABBIT, logrus.New()) - log.Global.Put(log.QUEUE, logrus.New()) - - rConn, err := rabbit.NewConnection(rabbit.ConnectionArgs{ - Host: os.Getenv("RABBIT_HOST"), - Password: os.Getenv("RABBIT_PASSWORD"), - User: os.Getenv("RABBIT_USER"), - Port: os.Getenv("RABBIT_PORT"), - ReconnectInterval: time.Second * 5, - Logger: log.Global.Get(log.RABBIT), - }, - ) - - if err != nil { - panic(err) - } - - rConn.IsAlive() - - if err = declareQueues(rConn); err != nil { - panic(err) - } - - channel, err := rConn.Channel() - if err != nil { - panic(err) - } - - Channel = channel -} - -func declareQueues(conn *rabbit.Connection) error { - channel, err := conn.Channel() - if err != nil { - return err - } - - _, err = channel.QueueDeclare( - "tasks", - true, - false, - false, - false, - nil, - ) - if err != nil { - return err - } - - return nil -} diff --git a/pkg/slice/chunk.go b/pkg/slice/chunk.go deleted file mode 100644 index 5bdc100..0000000 --- a/pkg/slice/chunk.go +++ /dev/null @@ -1,5 +0,0 @@ -package slice - -func ChunkSlice[T any](slice []T, chunkSize int) [][]T { - -}