refactor: Remove unused code
Rethinking queue operation, switching to cron
This commit is contained in:
@@ -1,5 +0,0 @@
|
||||
package dto
|
||||
|
||||
type PingMessage struct {
|
||||
ServiceID int `json:"serviceID"`
|
||||
}
|
||||
@@ -1,6 +0,0 @@
|
||||
package dto
|
||||
|
||||
type TaskMessage struct {
|
||||
Task string `json:"task"`
|
||||
Payload string `json:"payload"`
|
||||
}
|
||||
@@ -1,187 +0,0 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/queue/dto"
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/queue/tasks"
|
||||
rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit"
|
||||
"git.ostiwe.com/ostiwe-com/status/repository"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func InitQueues() {
|
||||
rabbitModule.InitConnection()
|
||||
|
||||
err := flushQueues()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err = createTasksForServices(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
log.Global.Get(log.SYSTEM).Info("Wait 5 seconds before start listen task queue")
|
||||
time.Sleep(time.Second * 5)
|
||||
|
||||
if err = listenTaskQueue(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func createTasksForServices() error {
|
||||
log.Global.Get(log.SYSTEM).Info("Create tasks ping for services")
|
||||
|
||||
serviceRepository := repository.NewServiceRepository()
|
||||
|
||||
services, err := serviceRepository.All(context.Background(), -1, 0, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := range services {
|
||||
pingPayload, marshallErr := json.Marshal(dto.PingMessage{ServiceID: services[i].ID})
|
||||
if marshallErr != nil {
|
||||
return marshallErr
|
||||
}
|
||||
|
||||
payload := dto.TaskMessage{
|
||||
Task: "ping",
|
||||
Payload: string(pingPayload),
|
||||
}
|
||||
|
||||
payloadMsg, marshallErr := json.Marshal(payload)
|
||||
if marshallErr != nil {
|
||||
return marshallErr
|
||||
}
|
||||
|
||||
publishErr := rabbitModule.Channel.Publish(
|
||||
"",
|
||||
"tasks",
|
||||
false,
|
||||
false,
|
||||
amqp.Publishing{Body: payloadMsg},
|
||||
)
|
||||
if publishErr != nil {
|
||||
return publishErr
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func flushQueues() error {
|
||||
log.Global.Get(log.SYSTEM).Info("Flush queues")
|
||||
|
||||
_, err := rabbitModule.Channel.QueuePurge("tasks", false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func listenTaskQueue() error {
|
||||
ctx := context.Background()
|
||||
|
||||
consume, err := rabbitModule.Channel.Consume(
|
||||
"tasks",
|
||||
"tasks-consumer",
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
taskCollection := tasks.CollectionMap()
|
||||
|
||||
for delivery := range consume {
|
||||
go handleQueueMessage(ctx, delivery, taskCollection)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleQueueMessage(ctx context.Context, delivery amqp.Delivery, taskCollection map[string]tasks.Task) {
|
||||
queueLog := log.Global.Get(log.QUEUE)
|
||||
|
||||
var taskMsg dto.TaskMessage
|
||||
unmarshalErr := json.Unmarshal(delivery.Body, &taskMsg)
|
||||
if unmarshalErr != nil {
|
||||
queueLog.WithField("body", string(delivery.Body)).Error("Failed unmarshal queue message - ", unmarshalErr)
|
||||
|
||||
rejectErr := delivery.Reject(false)
|
||||
if rejectErr != nil {
|
||||
queueLog.Error("Failed reject message - ", rejectErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
task, ok := taskCollection[taskMsg.Task]
|
||||
|
||||
if !ok {
|
||||
queueLog.Error("Undefined task with name - ", taskMsg.Task)
|
||||
rejectErr := delivery.Reject(false)
|
||||
if rejectErr != nil {
|
||||
queueLog.Error("Failed reject message - ", rejectErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
payload := []byte(taskMsg.Payload)
|
||||
|
||||
handleErr := task.Handle(ctx, payload)
|
||||
if handleErr != nil {
|
||||
queueLog.Error("Task handler return error - ", handleErr)
|
||||
|
||||
fallback, fallbackErr := task.Fallback(ctx, handleErr)
|
||||
if fallbackErr != nil {
|
||||
queueLog.Error("Task failed with error - ", handleErr, "; Task fallback handler also return error - ", fallbackErr, "; Reject message.")
|
||||
|
||||
rejectErr := delivery.Reject(false)
|
||||
if rejectErr != nil {
|
||||
queueLog.Error("Failed reject message - ", rejectErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if !fallback {
|
||||
queueLog.Error("Task fallback unsuccessful, reject message")
|
||||
|
||||
rejectErr := delivery.Reject(false)
|
||||
if rejectErr != nil {
|
||||
queueLog.Error("Failed reject message - ", rejectErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ackErr := delivery.Ack(false)
|
||||
if ackErr != nil {
|
||||
queueLog.Error("Failed acknowledge message - ", ackErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ackErr := delivery.Ack(false)
|
||||
if ackErr != nil {
|
||||
queueLog.Error("Failed acknowledge message - ", ackErr)
|
||||
}
|
||||
|
||||
afterHandleErr := task.AfterHandle(ctx, payload)
|
||||
if afterHandleErr != nil {
|
||||
queueLog.Error("Task after handler hook failed with error - ", afterHandleErr)
|
||||
}
|
||||
}
|
||||
@@ -1,24 +0,0 @@
|
||||
package tasks
|
||||
|
||||
func Collection() []Task {
|
||||
return []Task{
|
||||
PingTask,
|
||||
}
|
||||
}
|
||||
|
||||
func CollectionMap() map[string]Task {
|
||||
collectionMap := make(map[string]Task)
|
||||
|
||||
for _, task := range Collection() {
|
||||
if task.Fallback == nil {
|
||||
task.Fallback = DefaultFallbackFn
|
||||
}
|
||||
if task.AfterHandle == nil {
|
||||
task.AfterHandle = DefaultAfterHandleFn
|
||||
}
|
||||
|
||||
collectionMap[task.Name] = task
|
||||
}
|
||||
|
||||
return collectionMap
|
||||
}
|
||||
@@ -1,92 +0,0 @@
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"git.ostiwe.com/ostiwe-com/status/model"
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/queue/dto"
|
||||
rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit"
|
||||
"git.ostiwe.com/ostiwe-com/status/repository"
|
||||
"git.ostiwe.com/ostiwe-com/status/service"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
var PingTask = Task{
|
||||
Name: "ping",
|
||||
Handle: func(ctx context.Context, msgBody []byte) error {
|
||||
logger := log.Global.Get(log.QUEUE)
|
||||
var msg dto.PingMessage
|
||||
|
||||
err := json.Unmarshal(msgBody, &msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Info("Received new ping task for service - ", msg.ServiceID)
|
||||
serviceRepository := repository.NewServiceRepository()
|
||||
|
||||
srv, err := serviceRepository.Find(ctx, msg.ServiceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var lastStatus = &model.Status{Status: model.StatusWarn}
|
||||
if len(srv.Statuses) > 0 {
|
||||
lastStatus = &srv.Statuses[0]
|
||||
}
|
||||
|
||||
checker := service.NewCheck()
|
||||
|
||||
err = checker.Observe(ctx, srv)
|
||||
if err != nil {
|
||||
newStatus := model.StatusWarn
|
||||
|
||||
if lastStatus.Status == model.StatusWarn && srv.CountLastStatuses(model.StatusWarn) >= 10 {
|
||||
newStatus = model.StatusFailed
|
||||
}
|
||||
|
||||
if lastStatus.Status == model.StatusFailed {
|
||||
newStatus = model.StatusFailed
|
||||
}
|
||||
|
||||
logger.Info("Observe service with id ", msg.ServiceID, " failed - ", err, " setting [", newStatus, "] status")
|
||||
|
||||
regErr := checker.RegisterStatus(ctx, msg.ServiceID, newStatus)
|
||||
if regErr != nil {
|
||||
logger.Info("Setting status for service with id ", msg.ServiceID, " failed - ", err)
|
||||
|
||||
return regErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return checker.RegisterStatus(ctx, msg.ServiceID, model.StatusOK)
|
||||
},
|
||||
AfterHandle: func(ctx context.Context, msgBody []byte) error {
|
||||
time.Sleep(time.Second * 30)
|
||||
|
||||
payload := dto.TaskMessage{
|
||||
Task: "ping",
|
||||
Payload: string(msgBody),
|
||||
}
|
||||
|
||||
payloadMsg, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return rabbitModule.Channel.Publish(
|
||||
"",
|
||||
"tasks",
|
||||
false,
|
||||
false,
|
||||
amqp.Publishing{
|
||||
Body: payloadMsg,
|
||||
},
|
||||
)
|
||||
},
|
||||
}
|
||||
@@ -1,42 +0,0 @@
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultFallbackFn = func(_ context.Context, _ error) (bool, error) {
|
||||
log.Global.Get(log.QUEUE).Info("Default fallback function triggered")
|
||||
|
||||
return true, nil
|
||||
}
|
||||
DefaultAfterHandleFn = func(_ context.Context, _ []byte) error {
|
||||
log.Global.Get(log.QUEUE).Info("Default after handler function triggered")
|
||||
|
||||
return nil
|
||||
}
|
||||
)
|
||||
|
||||
// Task represents a task that can be executed in the queue.
|
||||
//
|
||||
// Name returns the name of the task, used for logging and identification purposes.
|
||||
//
|
||||
// Handle processes a message from the queue. It takes a context and the raw message body ([]byte) as arguments.
|
||||
// If handling fails, it should return an error that can be caught in the Fallback method.
|
||||
//
|
||||
// AfterHandle performs some action after successfully processing the message with Handle.
|
||||
// It takes the same arguments as Handle: a context and the raw message body ([]byte).
|
||||
// Default is DefaultAfterHandleFn
|
||||
//
|
||||
// Fallback handles errors that occur during the execution of the Handle method.
|
||||
// It takes a context and an error as arguments. The method should return a boolean indicating whether the error was handled,
|
||||
// and error for additional logging or debugging information.
|
||||
// Default is DefaultFallbackFn
|
||||
type Task struct {
|
||||
Name string
|
||||
Handle func(ctx context.Context, msgBody []byte) error
|
||||
AfterHandle func(ctx context.Context, msgBody []byte) error
|
||||
Fallback func(ctx context.Context, err error) (bool, error)
|
||||
}
|
||||
@@ -1,66 +0,0 @@
|
||||
package rabbit
|
||||
|
||||
import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
||||
"git.ostiwe.com/ostiwe-com/status/pkg/rabbit"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var Channel *amqp.Channel
|
||||
|
||||
func InitConnection() {
|
||||
log.Global.Put(log.RABBIT, logrus.New())
|
||||
log.Global.Put(log.QUEUE, logrus.New())
|
||||
|
||||
rConn, err := rabbit.NewConnection(rabbit.ConnectionArgs{
|
||||
Host: os.Getenv("RABBIT_HOST"),
|
||||
Password: os.Getenv("RABBIT_PASSWORD"),
|
||||
User: os.Getenv("RABBIT_USER"),
|
||||
Port: os.Getenv("RABBIT_PORT"),
|
||||
ReconnectInterval: time.Second * 5,
|
||||
Logger: log.Global.Get(log.RABBIT),
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
rConn.IsAlive()
|
||||
|
||||
if err = declareQueues(rConn); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
channel, err := rConn.Channel()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
Channel = channel
|
||||
}
|
||||
|
||||
func declareQueues(conn *rabbit.Connection) error {
|
||||
channel, err := conn.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = channel.QueueDeclare(
|
||||
"tasks",
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1,5 +0,0 @@
|
||||
package slice
|
||||
|
||||
func ChunkSlice[T any](slice []T, chunkSize int) [][]T {
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user