Files
status/modules/queue/tasks/ping.go

93 lines
2.1 KiB
Go

package tasks
import (
"context"
"encoding/json"
"time"
"git.ostiwe.com/ostiwe-com/status/model"
"git.ostiwe.com/ostiwe-com/status/modules/log"
"git.ostiwe.com/ostiwe-com/status/modules/queue/dto"
rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit"
"git.ostiwe.com/ostiwe-com/status/repository"
"git.ostiwe.com/ostiwe-com/status/service"
amqp "github.com/rabbitmq/amqp091-go"
)
var PingTask = Task{
Name: "ping",
Handle: func(ctx context.Context, msgBody []byte) error {
logger := log.Global.Get(log.QUEUE)
var msg dto.PingMessage
err := json.Unmarshal(msgBody, &msg)
if err != nil {
return err
}
logger.Info("Received new ping task for service - ", msg.ServiceID)
serviceRepository := repository.NewServiceRepository()
srv, err := serviceRepository.Find(ctx, msg.ServiceID)
if err != nil {
return err
}
var lastStatus = &model.Status{Status: model.StatusWarn}
if len(srv.Statuses) > 0 {
lastStatus = &srv.Statuses[0]
}
checker := service.NewCheck()
err = checker.Observe(ctx, srv)
if err != nil {
newStatus := model.StatusWarn
if lastStatus.Status == model.StatusWarn && srv.CountLastStatuses(model.StatusWarn) >= 10 {
newStatus = model.StatusFailed
}
if lastStatus.Status == model.StatusFailed {
newStatus = model.StatusFailed
}
logger.Info("Observe service with id ", msg.ServiceID, " failed - ", err, " setting [", newStatus, "] status")
regErr := checker.RegisterStatus(ctx, msg.ServiceID, newStatus)
if regErr != nil {
logger.Info("Setting status for service with id ", msg.ServiceID, " failed - ", err)
return regErr
}
return nil
}
return checker.RegisterStatus(ctx, msg.ServiceID, model.StatusOK)
},
AfterHandle: func(ctx context.Context, msgBody []byte) error {
time.Sleep(time.Second * 30)
payload := dto.TaskMessage{
Task: "ping",
Payload: string(msgBody),
}
payloadMsg, err := json.Marshal(payload)
if err != nil {
return err
}
return rabbitModule.Channel.Publish(
"",
"tasks",
false,
false,
amqp.Publishing{
Body: payloadMsg,
},
)
},
}