refactor: Remove unused code
Rethinking queue operation, switching to cron
This commit is contained in:
@@ -1,5 +0,0 @@
|
|||||||
package dto
|
|
||||||
|
|
||||||
type PingMessage struct {
|
|
||||||
ServiceID int `json:"serviceID"`
|
|
||||||
}
|
|
||||||
@@ -1,6 +0,0 @@
|
|||||||
package dto
|
|
||||||
|
|
||||||
type TaskMessage struct {
|
|
||||||
Task string `json:"task"`
|
|
||||||
Payload string `json:"payload"`
|
|
||||||
}
|
|
||||||
@@ -1,187 +0,0 @@
|
|||||||
package queue
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
|
||||||
"git.ostiwe.com/ostiwe-com/status/modules/queue/dto"
|
|
||||||
"git.ostiwe.com/ostiwe-com/status/modules/queue/tasks"
|
|
||||||
rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit"
|
|
||||||
"git.ostiwe.com/ostiwe-com/status/repository"
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
|
||||||
|
|
||||||
func InitQueues() {
|
|
||||||
rabbitModule.InitConnection()
|
|
||||||
|
|
||||||
err := flushQueues()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = createTasksForServices(); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Global.Get(log.SYSTEM).Info("Wait 5 seconds before start listen task queue")
|
|
||||||
time.Sleep(time.Second * 5)
|
|
||||||
|
|
||||||
if err = listenTaskQueue(); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func createTasksForServices() error {
|
|
||||||
log.Global.Get(log.SYSTEM).Info("Create tasks ping for services")
|
|
||||||
|
|
||||||
serviceRepository := repository.NewServiceRepository()
|
|
||||||
|
|
||||||
services, err := serviceRepository.All(context.Background(), -1, 0, false)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range services {
|
|
||||||
pingPayload, marshallErr := json.Marshal(dto.PingMessage{ServiceID: services[i].ID})
|
|
||||||
if marshallErr != nil {
|
|
||||||
return marshallErr
|
|
||||||
}
|
|
||||||
|
|
||||||
payload := dto.TaskMessage{
|
|
||||||
Task: "ping",
|
|
||||||
Payload: string(pingPayload),
|
|
||||||
}
|
|
||||||
|
|
||||||
payloadMsg, marshallErr := json.Marshal(payload)
|
|
||||||
if marshallErr != nil {
|
|
||||||
return marshallErr
|
|
||||||
}
|
|
||||||
|
|
||||||
publishErr := rabbitModule.Channel.Publish(
|
|
||||||
"",
|
|
||||||
"tasks",
|
|
||||||
false,
|
|
||||||
false,
|
|
||||||
amqp.Publishing{Body: payloadMsg},
|
|
||||||
)
|
|
||||||
if publishErr != nil {
|
|
||||||
return publishErr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func flushQueues() error {
|
|
||||||
log.Global.Get(log.SYSTEM).Info("Flush queues")
|
|
||||||
|
|
||||||
_, err := rabbitModule.Channel.QueuePurge("tasks", false)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func listenTaskQueue() error {
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
consume, err := rabbitModule.Channel.Consume(
|
|
||||||
"tasks",
|
|
||||||
"tasks-consumer",
|
|
||||||
false,
|
|
||||||
false,
|
|
||||||
false,
|
|
||||||
false,
|
|
||||||
nil,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
taskCollection := tasks.CollectionMap()
|
|
||||||
|
|
||||||
for delivery := range consume {
|
|
||||||
go handleQueueMessage(ctx, delivery, taskCollection)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleQueueMessage(ctx context.Context, delivery amqp.Delivery, taskCollection map[string]tasks.Task) {
|
|
||||||
queueLog := log.Global.Get(log.QUEUE)
|
|
||||||
|
|
||||||
var taskMsg dto.TaskMessage
|
|
||||||
unmarshalErr := json.Unmarshal(delivery.Body, &taskMsg)
|
|
||||||
if unmarshalErr != nil {
|
|
||||||
queueLog.WithField("body", string(delivery.Body)).Error("Failed unmarshal queue message - ", unmarshalErr)
|
|
||||||
|
|
||||||
rejectErr := delivery.Reject(false)
|
|
||||||
if rejectErr != nil {
|
|
||||||
queueLog.Error("Failed reject message - ", rejectErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
task, ok := taskCollection[taskMsg.Task]
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
queueLog.Error("Undefined task with name - ", taskMsg.Task)
|
|
||||||
rejectErr := delivery.Reject(false)
|
|
||||||
if rejectErr != nil {
|
|
||||||
queueLog.Error("Failed reject message - ", rejectErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
payload := []byte(taskMsg.Payload)
|
|
||||||
|
|
||||||
handleErr := task.Handle(ctx, payload)
|
|
||||||
if handleErr != nil {
|
|
||||||
queueLog.Error("Task handler return error - ", handleErr)
|
|
||||||
|
|
||||||
fallback, fallbackErr := task.Fallback(ctx, handleErr)
|
|
||||||
if fallbackErr != nil {
|
|
||||||
queueLog.Error("Task failed with error - ", handleErr, "; Task fallback handler also return error - ", fallbackErr, "; Reject message.")
|
|
||||||
|
|
||||||
rejectErr := delivery.Reject(false)
|
|
||||||
if rejectErr != nil {
|
|
||||||
queueLog.Error("Failed reject message - ", rejectErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if !fallback {
|
|
||||||
queueLog.Error("Task fallback unsuccessful, reject message")
|
|
||||||
|
|
||||||
rejectErr := delivery.Reject(false)
|
|
||||||
if rejectErr != nil {
|
|
||||||
queueLog.Error("Failed reject message - ", rejectErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ackErr := delivery.Ack(false)
|
|
||||||
if ackErr != nil {
|
|
||||||
queueLog.Error("Failed acknowledge message - ", ackErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ackErr := delivery.Ack(false)
|
|
||||||
if ackErr != nil {
|
|
||||||
queueLog.Error("Failed acknowledge message - ", ackErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
afterHandleErr := task.AfterHandle(ctx, payload)
|
|
||||||
if afterHandleErr != nil {
|
|
||||||
queueLog.Error("Task after handler hook failed with error - ", afterHandleErr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,24 +0,0 @@
|
|||||||
package tasks
|
|
||||||
|
|
||||||
func Collection() []Task {
|
|
||||||
return []Task{
|
|
||||||
PingTask,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func CollectionMap() map[string]Task {
|
|
||||||
collectionMap := make(map[string]Task)
|
|
||||||
|
|
||||||
for _, task := range Collection() {
|
|
||||||
if task.Fallback == nil {
|
|
||||||
task.Fallback = DefaultFallbackFn
|
|
||||||
}
|
|
||||||
if task.AfterHandle == nil {
|
|
||||||
task.AfterHandle = DefaultAfterHandleFn
|
|
||||||
}
|
|
||||||
|
|
||||||
collectionMap[task.Name] = task
|
|
||||||
}
|
|
||||||
|
|
||||||
return collectionMap
|
|
||||||
}
|
|
||||||
@@ -1,92 +0,0 @@
|
|||||||
package tasks
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.ostiwe.com/ostiwe-com/status/model"
|
|
||||||
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
|
||||||
"git.ostiwe.com/ostiwe-com/status/modules/queue/dto"
|
|
||||||
rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit"
|
|
||||||
"git.ostiwe.com/ostiwe-com/status/repository"
|
|
||||||
"git.ostiwe.com/ostiwe-com/status/service"
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
|
||||||
|
|
||||||
var PingTask = Task{
|
|
||||||
Name: "ping",
|
|
||||||
Handle: func(ctx context.Context, msgBody []byte) error {
|
|
||||||
logger := log.Global.Get(log.QUEUE)
|
|
||||||
var msg dto.PingMessage
|
|
||||||
|
|
||||||
err := json.Unmarshal(msgBody, &msg)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Info("Received new ping task for service - ", msg.ServiceID)
|
|
||||||
serviceRepository := repository.NewServiceRepository()
|
|
||||||
|
|
||||||
srv, err := serviceRepository.Find(ctx, msg.ServiceID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var lastStatus = &model.Status{Status: model.StatusWarn}
|
|
||||||
if len(srv.Statuses) > 0 {
|
|
||||||
lastStatus = &srv.Statuses[0]
|
|
||||||
}
|
|
||||||
|
|
||||||
checker := service.NewCheck()
|
|
||||||
|
|
||||||
err = checker.Observe(ctx, srv)
|
|
||||||
if err != nil {
|
|
||||||
newStatus := model.StatusWarn
|
|
||||||
|
|
||||||
if lastStatus.Status == model.StatusWarn && srv.CountLastStatuses(model.StatusWarn) >= 10 {
|
|
||||||
newStatus = model.StatusFailed
|
|
||||||
}
|
|
||||||
|
|
||||||
if lastStatus.Status == model.StatusFailed {
|
|
||||||
newStatus = model.StatusFailed
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Info("Observe service with id ", msg.ServiceID, " failed - ", err, " setting [", newStatus, "] status")
|
|
||||||
|
|
||||||
regErr := checker.RegisterStatus(ctx, msg.ServiceID, newStatus)
|
|
||||||
if regErr != nil {
|
|
||||||
logger.Info("Setting status for service with id ", msg.ServiceID, " failed - ", err)
|
|
||||||
|
|
||||||
return regErr
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return checker.RegisterStatus(ctx, msg.ServiceID, model.StatusOK)
|
|
||||||
},
|
|
||||||
AfterHandle: func(ctx context.Context, msgBody []byte) error {
|
|
||||||
time.Sleep(time.Second * 30)
|
|
||||||
|
|
||||||
payload := dto.TaskMessage{
|
|
||||||
Task: "ping",
|
|
||||||
Payload: string(msgBody),
|
|
||||||
}
|
|
||||||
|
|
||||||
payloadMsg, err := json.Marshal(payload)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return rabbitModule.Channel.Publish(
|
|
||||||
"",
|
|
||||||
"tasks",
|
|
||||||
false,
|
|
||||||
false,
|
|
||||||
amqp.Publishing{
|
|
||||||
Body: payloadMsg,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
@@ -1,42 +0,0 @@
|
|||||||
package tasks
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
DefaultFallbackFn = func(_ context.Context, _ error) (bool, error) {
|
|
||||||
log.Global.Get(log.QUEUE).Info("Default fallback function triggered")
|
|
||||||
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
DefaultAfterHandleFn = func(_ context.Context, _ []byte) error {
|
|
||||||
log.Global.Get(log.QUEUE).Info("Default after handler function triggered")
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
// Task represents a task that can be executed in the queue.
|
|
||||||
//
|
|
||||||
// Name returns the name of the task, used for logging and identification purposes.
|
|
||||||
//
|
|
||||||
// Handle processes a message from the queue. It takes a context and the raw message body ([]byte) as arguments.
|
|
||||||
// If handling fails, it should return an error that can be caught in the Fallback method.
|
|
||||||
//
|
|
||||||
// AfterHandle performs some action after successfully processing the message with Handle.
|
|
||||||
// It takes the same arguments as Handle: a context and the raw message body ([]byte).
|
|
||||||
// Default is DefaultAfterHandleFn
|
|
||||||
//
|
|
||||||
// Fallback handles errors that occur during the execution of the Handle method.
|
|
||||||
// It takes a context and an error as arguments. The method should return a boolean indicating whether the error was handled,
|
|
||||||
// and error for additional logging or debugging information.
|
|
||||||
// Default is DefaultFallbackFn
|
|
||||||
type Task struct {
|
|
||||||
Name string
|
|
||||||
Handle func(ctx context.Context, msgBody []byte) error
|
|
||||||
AfterHandle func(ctx context.Context, msgBody []byte) error
|
|
||||||
Fallback func(ctx context.Context, err error) (bool, error)
|
|
||||||
}
|
|
||||||
@@ -1,66 +0,0 @@
|
|||||||
package rabbit
|
|
||||||
|
|
||||||
import (
|
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
|
||||||
"git.ostiwe.com/ostiwe-com/status/pkg/rabbit"
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
)
|
|
||||||
|
|
||||||
var Channel *amqp.Channel
|
|
||||||
|
|
||||||
func InitConnection() {
|
|
||||||
log.Global.Put(log.RABBIT, logrus.New())
|
|
||||||
log.Global.Put(log.QUEUE, logrus.New())
|
|
||||||
|
|
||||||
rConn, err := rabbit.NewConnection(rabbit.ConnectionArgs{
|
|
||||||
Host: os.Getenv("RABBIT_HOST"),
|
|
||||||
Password: os.Getenv("RABBIT_PASSWORD"),
|
|
||||||
User: os.Getenv("RABBIT_USER"),
|
|
||||||
Port: os.Getenv("RABBIT_PORT"),
|
|
||||||
ReconnectInterval: time.Second * 5,
|
|
||||||
Logger: log.Global.Get(log.RABBIT),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
rConn.IsAlive()
|
|
||||||
|
|
||||||
if err = declareQueues(rConn); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
channel, err := rConn.Channel()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
Channel = channel
|
|
||||||
}
|
|
||||||
|
|
||||||
func declareQueues(conn *rabbit.Connection) error {
|
|
||||||
channel, err := conn.Channel()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = channel.QueueDeclare(
|
|
||||||
"tasks",
|
|
||||||
true,
|
|
||||||
false,
|
|
||||||
false,
|
|
||||||
false,
|
|
||||||
nil,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
@@ -1,5 +0,0 @@
|
|||||||
package slice
|
|
||||||
|
|
||||||
func ChunkSlice[T any](slice []T, chunkSize int) [][]T {
|
|
||||||
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user