Merge pull request 'Backend: Update Application Version and Implement Queue Module with RabbitMQ Integration' (#1) from dev into main
Reviewed-on: #1
This commit is contained in:
5
.env
5
.env
@@ -4,3 +4,8 @@ DATABASE_USER=
|
||||
DATABASE_PASS=
|
||||
DATABASE_DB=
|
||||
DATABASE_TZ=Europe/Moscow
|
||||
|
||||
RABBIT_USER=user
|
||||
RABBIT_PASSWORD=user
|
||||
RABBIT_HOST=localhost
|
||||
RABBIT_PORT=5672
|
||||
|
||||
@@ -12,5 +12,14 @@ services:
|
||||
ports:
|
||||
- "5444:5432"
|
||||
|
||||
rabbitmq:
|
||||
image: rabbitmq:4.1.2-management
|
||||
environment:
|
||||
RABBITMQ_DEFAULT_USER: user
|
||||
RABBITMQ_DEFAULT_PASS: user
|
||||
ports:
|
||||
- "5672:5672"
|
||||
- "15672:15672"
|
||||
|
||||
volumes:
|
||||
database_postgres:
|
||||
1
go.mod
1
go.mod
@@ -7,6 +7,7 @@ require (
|
||||
github.com/go-andiamo/chioas v1.16.4
|
||||
github.com/go-chi/chi/v5 v5.2.2
|
||||
github.com/joho/godotenv v1.5.1
|
||||
github.com/rabbitmq/amqp091-go v1.10.0
|
||||
github.com/sirupsen/logrus v1.9.3
|
||||
go.uber.org/mock v0.5.2
|
||||
gorm.io/driver/postgres v1.6.0
|
||||
|
||||
4
go.sum
4
go.sum
@@ -35,6 +35,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
|
||||
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
@@ -45,6 +47,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
go.uber.org/mock v0.5.2 h1:LbtPTcP8A5k9WPXj54PPPbjcI4Y6lhyOZXn+VS7wNko=
|
||||
go.uber.org/mock v0.5.2/go.mod h1:wLlUxC2vVTPTaE3UD51E0BGOAElKrILxhVSDYQLld5o=
|
||||
golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM=
|
||||
|
||||
71
main.go
71
main.go
@@ -4,72 +4,30 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"git.ostiwe.com/ostiwe-com/status/model"
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/db"
|
||||
appLog "git.ostiwe.com/ostiwe-com/status/modules/log"
|
||||
"git.ostiwe.com/ostiwe-com/status/pkg/args"
|
||||
"git.ostiwe.com/ostiwe-com/status/router"
|
||||
"git.ostiwe.com/ostiwe-com/status/version"
|
||||
"git.ostiwe.com/ostiwe-com/status/server"
|
||||
_ "git.ostiwe.com/ostiwe-com/status/settings"
|
||||
"github.com/alexflint/go-arg"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
_ "git.ostiwe.com/ostiwe-com/status/settings"
|
||||
)
|
||||
|
||||
type ServerCmd struct {
|
||||
Port string `arg:"-p,--port" help:"Port to listen on" default:"8080"`
|
||||
}
|
||||
|
||||
type ServerDocumentationCmd struct {
|
||||
Port string `arg:"-p,--port" help:"Port to listen on" default:"8081"`
|
||||
Plain bool `arg:"--plain" help:"Enable plain text output" default:"true"`
|
||||
PlainFormat string `arg:"--plain-format" help:"Set format for output (json, yaml)" default:"yaml"`
|
||||
}
|
||||
|
||||
type appArgs struct {
|
||||
Server *ServerCmd `arg:"subcommand:server" help:"Start the api server"`
|
||||
ServerDocumentation *ServerDocumentationCmd `arg:"subcommand:server-docs" help:"Generate documentation for api server"`
|
||||
}
|
||||
|
||||
var args appArgs
|
||||
|
||||
func (appArgs) Version() string {
|
||||
return version.AppVersion()
|
||||
}
|
||||
var appArgs args.AppArgs
|
||||
|
||||
func main() {
|
||||
arg.MustParse(&args)
|
||||
arg.MustParse(&appArgs)
|
||||
|
||||
if args.Server != nil {
|
||||
connect, err := db.Connect()
|
||||
if err != nil {
|
||||
appLog.Global.Get(appLog.SYSTEM).Error(fmt.Sprintf("Startup server error, failed connect to database: %v", err))
|
||||
if appArgs.Server != nil {
|
||||
server.Run(appArgs.Server)
|
||||
return
|
||||
}
|
||||
|
||||
db.SetGlobal(connect)
|
||||
appLog.Global.Get(appLog.SYSTEM).Info("Run db migration")
|
||||
if err = runMigrate(); err != nil {
|
||||
appLog.Global.Get(appLog.SYSTEM).Error(fmt.Sprintf("Migration failed, error: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
appLog.Global.Put(appLog.SERVER, logrus.New())
|
||||
appLog.Global.Get(appLog.SERVER).Info("Startup server on port: ", args.Server.Port)
|
||||
|
||||
err = http.ListenAndServe(fmt.Sprintf(":%s", args.Server.Port), router.InitRoutes())
|
||||
if err != nil {
|
||||
appLog.Global.Get(appLog.SERVER).Error(fmt.Sprintf("Startup server error: %v", err))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if args.ServerDocumentation != nil {
|
||||
if appArgs.ServerDocumentation != nil {
|
||||
appLog.Global.Get(appLog.SYSTEM).Info("Collect documentation")
|
||||
|
||||
docs := router.Documentate()
|
||||
if !args.ServerDocumentation.Plain {
|
||||
if !appArgs.ServerDocumentation.Plain {
|
||||
chiRouter := chi.NewRouter()
|
||||
|
||||
err := docs.SetupRoutes(chiRouter, docs)
|
||||
@@ -78,8 +36,8 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
appLog.Global.Get(appLog.SYSTEM).Info(fmt.Sprintf("Start documentation server on port: %s", args.ServerDocumentation.Port))
|
||||
err = http.ListenAndServe(fmt.Sprintf(":%s", args.ServerDocumentation.Port), chiRouter)
|
||||
appLog.Global.Get(appLog.SYSTEM).Info(fmt.Sprintf("Start documentation server on port: %s", appArgs.ServerDocumentation.Port))
|
||||
err = http.ListenAndServe(fmt.Sprintf(":%s", appArgs.ServerDocumentation.Port), chiRouter)
|
||||
if err != nil {
|
||||
appLog.Global.Get(appLog.SYSTEM).Error(fmt.Sprintf("Startup server error: %v", err))
|
||||
}
|
||||
@@ -91,10 +49,3 @@ func main() {
|
||||
|
||||
appLog.Global.Get(appLog.SYSTEM).Info("Exit from application")
|
||||
}
|
||||
|
||||
func runMigrate() error {
|
||||
return db.Global.AutoMigrate(
|
||||
model.Service{},
|
||||
model.Status{},
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
package model
|
||||
|
||||
type HTTPConfig struct {
|
||||
Authorization string `json:"authorization"`
|
||||
Method string `json:"method"`
|
||||
Headers map[string]string `json:"headers"`
|
||||
}
|
||||
|
||||
type ServiceTypeCheckConfig struct {
|
||||
@@ -32,3 +33,17 @@ type Service struct {
|
||||
func (Service) TableName() string {
|
||||
return "service"
|
||||
}
|
||||
|
||||
func (s Service) CountLastStatuses(status StatusCode) uint {
|
||||
var count uint = 0
|
||||
|
||||
for i := range s.Statuses {
|
||||
if s.Statuses[i].Status != status {
|
||||
break
|
||||
}
|
||||
|
||||
count++
|
||||
}
|
||||
|
||||
return count
|
||||
}
|
||||
|
||||
@@ -6,10 +6,18 @@ import (
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type StatusCode string
|
||||
|
||||
const (
|
||||
StatusOK StatusCode = "ok" // Means - response ok, service is alive
|
||||
StatusFailed StatusCode = "failed" // Means - response failed, all tries failed, service down
|
||||
StatusWarn StatusCode = "warn" // Means - response failed after N tries and still watched
|
||||
)
|
||||
|
||||
type Status struct {
|
||||
ID int `gorm:"primary_key;auto_increment" json:"-"`
|
||||
ServiceID int `gorm:"one" json:"-"`
|
||||
Status string `gorm:"size:255;not null" json:"status"`
|
||||
Status StatusCode `gorm:"size:255;not null" json:"status"`
|
||||
Description *string `gorm:"size:255" json:"description"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
}
|
||||
|
||||
@@ -10,6 +10,10 @@ const (
|
||||
SERVER = "server"
|
||||
SYSTEM = "system"
|
||||
DATABASE = "database"
|
||||
CRON = "cron"
|
||||
QUEUE = "queue"
|
||||
RABBIT = "rabbit"
|
||||
internal = "internal"
|
||||
)
|
||||
|
||||
var Global *LoggerManager
|
||||
@@ -24,6 +28,9 @@ type LoggerManager struct {
|
||||
}
|
||||
|
||||
func (m *LoggerManager) Get(name string) *logrus.Logger {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
if logger, ok := m.loggers[name]; ok {
|
||||
return logger
|
||||
}
|
||||
@@ -35,13 +42,32 @@ func (m *LoggerManager) Put(name string, logger *logrus.Logger) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
if _, ok := m.loggers[name]; ok {
|
||||
m.Get(internal).Errorf("Logger with name '%s' already exists. Use PutForce to replace it.", name)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
logger.Formatter = NewTextFormatter(name)
|
||||
|
||||
m.loggers[name] = logger
|
||||
}
|
||||
|
||||
func (m *LoggerManager) PutForce(name string, logger *logrus.Logger) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
logger.Formatter = NewTextFormatter(name)
|
||||
|
||||
m.loggers[name] = logger
|
||||
}
|
||||
|
||||
func NewManager() *LoggerManager {
|
||||
return &LoggerManager{
|
||||
manager := &LoggerManager{
|
||||
loggers: make(map[string]*logrus.Logger),
|
||||
}
|
||||
|
||||
manager.Put(internal, logrus.New())
|
||||
|
||||
return manager
|
||||
}
|
||||
|
||||
5
modules/queue/dto/ping.go
Normal file
5
modules/queue/dto/ping.go
Normal file
@@ -0,0 +1,5 @@
|
||||
package dto
|
||||
|
||||
type PingMessage struct {
|
||||
ServiceID int `json:"serviceID"`
|
||||
}
|
||||
6
modules/queue/dto/task.go
Normal file
6
modules/queue/dto/task.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package dto
|
||||
|
||||
type TaskMessage struct {
|
||||
Task string `json:"task"`
|
||||
Payload string `json:"payload"`
|
||||
}
|
||||
187
modules/queue/queue.go
Normal file
187
modules/queue/queue.go
Normal file
@@ -0,0 +1,187 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/queue/dto"
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/queue/tasks"
|
||||
rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit"
|
||||
"git.ostiwe.com/ostiwe-com/status/repository"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func InitQueues() {
|
||||
rabbitModule.InitConnection()
|
||||
|
||||
err := flushQueues()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err = createTasksForServices(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
log.Global.Get(log.SYSTEM).Info("Wait 5 seconds before start listen task queue")
|
||||
time.Sleep(time.Second * 5)
|
||||
|
||||
if err = listenTaskQueue(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func createTasksForServices() error {
|
||||
log.Global.Get(log.SYSTEM).Info("Create tasks ping for services")
|
||||
|
||||
serviceRepository := repository.NewServiceRepository()
|
||||
|
||||
services, err := serviceRepository.All(context.Background(), -1, 0, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := range services {
|
||||
pingPayload, marshallErr := json.Marshal(dto.PingMessage{ServiceID: services[i].ID})
|
||||
if marshallErr != nil {
|
||||
return marshallErr
|
||||
}
|
||||
|
||||
payload := dto.TaskMessage{
|
||||
Task: "ping",
|
||||
Payload: string(pingPayload),
|
||||
}
|
||||
|
||||
payloadMsg, marshallErr := json.Marshal(payload)
|
||||
if marshallErr != nil {
|
||||
return marshallErr
|
||||
}
|
||||
|
||||
publishErr := rabbitModule.Channel.Publish(
|
||||
"",
|
||||
"tasks",
|
||||
false,
|
||||
false,
|
||||
amqp.Publishing{Body: payloadMsg},
|
||||
)
|
||||
if publishErr != nil {
|
||||
return publishErr
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func flushQueues() error {
|
||||
log.Global.Get(log.SYSTEM).Info("Flush queues")
|
||||
|
||||
_, err := rabbitModule.Channel.QueuePurge("tasks", false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func listenTaskQueue() error {
|
||||
ctx := context.Background()
|
||||
|
||||
consume, err := rabbitModule.Channel.Consume(
|
||||
"tasks",
|
||||
"tasks-consumer",
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
taskCollection := tasks.CollectionMap()
|
||||
|
||||
for delivery := range consume {
|
||||
go handleQueueMessage(ctx, delivery, taskCollection)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleQueueMessage(ctx context.Context, delivery amqp.Delivery, taskCollection map[string]tasks.Task) {
|
||||
queueLog := log.Global.Get(log.QUEUE)
|
||||
|
||||
var taskMsg dto.TaskMessage
|
||||
unmarshalErr := json.Unmarshal(delivery.Body, &taskMsg)
|
||||
if unmarshalErr != nil {
|
||||
queueLog.WithField("body", string(delivery.Body)).Error("Failed unmarshal queue message - ", unmarshalErr)
|
||||
|
||||
rejectErr := delivery.Reject(false)
|
||||
if rejectErr != nil {
|
||||
queueLog.Error("Failed reject message - ", rejectErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
task, ok := taskCollection[taskMsg.Task]
|
||||
|
||||
if !ok {
|
||||
queueLog.Error("Undefined task with name - ", taskMsg.Task)
|
||||
rejectErr := delivery.Reject(false)
|
||||
if rejectErr != nil {
|
||||
queueLog.Error("Failed reject message - ", rejectErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
payload := []byte(taskMsg.Payload)
|
||||
|
||||
handleErr := task.Handle(ctx, payload)
|
||||
if handleErr != nil {
|
||||
queueLog.Error("Task handler return error - ", handleErr)
|
||||
|
||||
fallback, fallbackErr := task.Fallback(ctx, handleErr)
|
||||
if fallbackErr != nil {
|
||||
queueLog.Error("Task failed with error - ", handleErr, "; Task fallback handler also return error - ", fallbackErr, "; Reject message.")
|
||||
|
||||
rejectErr := delivery.Reject(false)
|
||||
if rejectErr != nil {
|
||||
queueLog.Error("Failed reject message - ", rejectErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if !fallback {
|
||||
queueLog.Error("Task fallback unsuccessful, reject message")
|
||||
|
||||
rejectErr := delivery.Reject(false)
|
||||
if rejectErr != nil {
|
||||
queueLog.Error("Failed reject message - ", rejectErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ackErr := delivery.Ack(false)
|
||||
if ackErr != nil {
|
||||
queueLog.Error("Failed acknowledge message - ", ackErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ackErr := delivery.Ack(false)
|
||||
if ackErr != nil {
|
||||
queueLog.Error("Failed acknowledge message - ", ackErr)
|
||||
}
|
||||
|
||||
afterHandleErr := task.AfterHandle(ctx, payload)
|
||||
if afterHandleErr != nil {
|
||||
queueLog.Error("Task after handler hook failed with error - ", afterHandleErr)
|
||||
}
|
||||
}
|
||||
24
modules/queue/tasks/collection.go
Normal file
24
modules/queue/tasks/collection.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package tasks
|
||||
|
||||
func Collection() []Task {
|
||||
return []Task{
|
||||
PingTask,
|
||||
}
|
||||
}
|
||||
|
||||
func CollectionMap() map[string]Task {
|
||||
collectionMap := make(map[string]Task)
|
||||
|
||||
for _, task := range Collection() {
|
||||
if task.Fallback == nil {
|
||||
task.Fallback = DefaultFallbackFn
|
||||
}
|
||||
if task.AfterHandle == nil {
|
||||
task.AfterHandle = DefaultAfterHandleFn
|
||||
}
|
||||
|
||||
collectionMap[task.Name] = task
|
||||
}
|
||||
|
||||
return collectionMap
|
||||
}
|
||||
92
modules/queue/tasks/ping.go
Normal file
92
modules/queue/tasks/ping.go
Normal file
@@ -0,0 +1,92 @@
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"git.ostiwe.com/ostiwe-com/status/model"
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/queue/dto"
|
||||
rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit"
|
||||
"git.ostiwe.com/ostiwe-com/status/repository"
|
||||
"git.ostiwe.com/ostiwe-com/status/service"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
var PingTask = Task{
|
||||
Name: "ping",
|
||||
Handle: func(ctx context.Context, msgBody []byte) error {
|
||||
logger := log.Global.Get(log.QUEUE)
|
||||
var msg dto.PingMessage
|
||||
|
||||
err := json.Unmarshal(msgBody, &msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Info("Received new ping task for service - ", msg.ServiceID)
|
||||
serviceRepository := repository.NewServiceRepository()
|
||||
|
||||
srv, err := serviceRepository.Find(ctx, msg.ServiceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var lastStatus = &model.Status{Status: model.StatusWarn}
|
||||
if len(srv.Statuses) > 0 {
|
||||
lastStatus = &srv.Statuses[0]
|
||||
}
|
||||
|
||||
checker := service.NewCheck()
|
||||
|
||||
err = checker.Observe(ctx, srv)
|
||||
if err != nil {
|
||||
newStatus := model.StatusWarn
|
||||
|
||||
if lastStatus.Status == model.StatusWarn && srv.CountLastStatuses(model.StatusWarn) >= 10 {
|
||||
newStatus = model.StatusFailed
|
||||
}
|
||||
|
||||
if lastStatus.Status == model.StatusFailed {
|
||||
newStatus = model.StatusFailed
|
||||
}
|
||||
|
||||
logger.Info("Observe service with id ", msg.ServiceID, " failed - ", err, " setting [", newStatus, "] status")
|
||||
|
||||
regErr := checker.RegisterStatus(ctx, msg.ServiceID, newStatus)
|
||||
if regErr != nil {
|
||||
logger.Info("Setting status for service with id ", msg.ServiceID, " failed - ", err)
|
||||
|
||||
return regErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return checker.RegisterStatus(ctx, msg.ServiceID, model.StatusOK)
|
||||
},
|
||||
AfterHandle: func(ctx context.Context, msgBody []byte) error {
|
||||
time.Sleep(time.Second * 30)
|
||||
|
||||
payload := dto.TaskMessage{
|
||||
Task: "ping",
|
||||
Payload: string(msgBody),
|
||||
}
|
||||
|
||||
payloadMsg, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return rabbitModule.Channel.Publish(
|
||||
"",
|
||||
"tasks",
|
||||
false,
|
||||
false,
|
||||
amqp.Publishing{
|
||||
Body: payloadMsg,
|
||||
},
|
||||
)
|
||||
},
|
||||
}
|
||||
42
modules/queue/tasks/tasks.go
Normal file
42
modules/queue/tasks/tasks.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
||||
)
|
||||
|
||||
var (
|
||||
DefaultFallbackFn = func(_ context.Context, _ error) (bool, error) {
|
||||
log.Global.Get(log.QUEUE).Info("Default fallback function triggered")
|
||||
|
||||
return true, nil
|
||||
}
|
||||
DefaultAfterHandleFn = func(_ context.Context, _ []byte) error {
|
||||
log.Global.Get(log.QUEUE).Info("Default after handler function triggered")
|
||||
|
||||
return nil
|
||||
}
|
||||
)
|
||||
|
||||
// Task represents a task that can be executed in the queue.
|
||||
//
|
||||
// Name returns the name of the task, used for logging and identification purposes.
|
||||
//
|
||||
// Handle processes a message from the queue. It takes a context and the raw message body ([]byte) as arguments.
|
||||
// If handling fails, it should return an error that can be caught in the Fallback method.
|
||||
//
|
||||
// AfterHandle performs some action after successfully processing the message with Handle.
|
||||
// It takes the same arguments as Handle: a context and the raw message body ([]byte).
|
||||
// Default is DefaultAfterHandleFn
|
||||
//
|
||||
// Fallback handles errors that occur during the execution of the Handle method.
|
||||
// It takes a context and an error as arguments. The method should return a boolean indicating whether the error was handled,
|
||||
// and error for additional logging or debugging information.
|
||||
// Default is DefaultFallbackFn
|
||||
type Task struct {
|
||||
Name string
|
||||
Handle func(ctx context.Context, msgBody []byte) error
|
||||
AfterHandle func(ctx context.Context, msgBody []byte) error
|
||||
Fallback func(ctx context.Context, err error) (bool, error)
|
||||
}
|
||||
66
modules/rabbit/rabbit.go
Normal file
66
modules/rabbit/rabbit.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package rabbit
|
||||
|
||||
import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
||||
"git.ostiwe.com/ostiwe-com/status/pkg/rabbit"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var Channel *amqp.Channel
|
||||
|
||||
func InitConnection() {
|
||||
log.Global.Put(log.RABBIT, logrus.New())
|
||||
log.Global.Put(log.QUEUE, logrus.New())
|
||||
|
||||
rConn, err := rabbit.NewConnection(rabbit.ConnectionArgs{
|
||||
Host: os.Getenv("RABBIT_HOST"),
|
||||
Password: os.Getenv("RABBIT_PASSWORD"),
|
||||
User: os.Getenv("RABBIT_USER"),
|
||||
Port: os.Getenv("RABBIT_PORT"),
|
||||
ReconnectInterval: time.Second * 5,
|
||||
Logger: log.Global.Get(log.RABBIT),
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
rConn.IsAlive()
|
||||
|
||||
if err = declareQueues(rConn); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
channel, err := rConn.Channel()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
Channel = channel
|
||||
}
|
||||
|
||||
func declareQueues(conn *rabbit.Connection) error {
|
||||
channel, err := conn.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = channel.QueueDeclare(
|
||||
"tasks",
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
22
pkg/args/args.go
Normal file
22
pkg/args/args.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package args
|
||||
|
||||
import "git.ostiwe.com/ostiwe-com/status/version"
|
||||
|
||||
type ServerCmd struct {
|
||||
Port string `arg:"-p,--port" help:"Port to listen on" default:"8080"`
|
||||
}
|
||||
|
||||
type ServerDocumentationCmd struct {
|
||||
Port string `arg:"-p,--port" help:"Port to listen on" default:"8081"`
|
||||
Plain bool `arg:"--plain" help:"Enable plain text output" default:"true"`
|
||||
PlainFormat string `arg:"--plain-format" help:"Set format for output (json, yaml)" default:"yaml"`
|
||||
}
|
||||
|
||||
type AppArgs struct {
|
||||
Server *ServerCmd `arg:"subcommand:server" help:"Start the api server"`
|
||||
ServerDocumentation *ServerDocumentationCmd `arg:"subcommand:server-docs" help:"Generate documentation for api server"`
|
||||
}
|
||||
|
||||
func (AppArgs) Version() string {
|
||||
return version.AppVersion()
|
||||
}
|
||||
98
pkg/rabbit/connection.go
Normal file
98
pkg/rabbit/connection.go
Normal file
@@ -0,0 +1,98 @@
|
||||
package rabbit
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type ConnectionArgs struct {
|
||||
Host string
|
||||
Password string
|
||||
User string
|
||||
Port string
|
||||
ReconnectInterval time.Duration
|
||||
Logger *logrus.Logger
|
||||
}
|
||||
|
||||
type Connection struct {
|
||||
conn *amqp.Connection
|
||||
connString string
|
||||
|
||||
isClosed bool
|
||||
closeNotify chan *amqp.Error
|
||||
logger *logrus.Logger
|
||||
reconnectInterval time.Duration
|
||||
}
|
||||
|
||||
func NewConnection(args ConnectionArgs) (*Connection, error) {
|
||||
connString := fmt.Sprintf(
|
||||
"amqp://%s:%s@%s:%s/",
|
||||
args.User,
|
||||
args.Password,
|
||||
args.Host,
|
||||
args.Port,
|
||||
)
|
||||
|
||||
dial, err := amqp.Dial(connString)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := &Connection{
|
||||
conn: dial,
|
||||
connString: connString,
|
||||
closeNotify: dial.NotifyClose(make(chan *amqp.Error)),
|
||||
logger: args.Logger,
|
||||
reconnectInterval: args.ReconnectInterval,
|
||||
}
|
||||
|
||||
if args.ReconnectInterval > 0 {
|
||||
go c.listenCloseNotify()
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *Connection) Channel() (*amqp.Channel, error) {
|
||||
return c.conn.Channel()
|
||||
}
|
||||
|
||||
func (c *Connection) IsAlive() bool {
|
||||
return c.isClosed
|
||||
}
|
||||
|
||||
func (c *Connection) listenCloseNotify() {
|
||||
if c.reconnectInterval <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
c.logger.Info(fmt.Sprintf(
|
||||
"Start listening close notify channel, with %s interval",
|
||||
c.reconnectInterval.String(),
|
||||
))
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.closeNotify:
|
||||
c.logger.Info("Trying to reconnect to rabbit")
|
||||
|
||||
dial, dialErr := amqp.Dial(c.connString)
|
||||
if dialErr != nil {
|
||||
c.logger.Error("Error during reconnect try - ", dialErr)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
c.conn = dial
|
||||
|
||||
c.logger.Info("Rabbit connection stabilized")
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(c.reconnectInterval)
|
||||
}
|
||||
}
|
||||
5
pkg/slice/chunk.go
Normal file
5
pkg/slice/chunk.go
Normal file
@@ -0,0 +1,5 @@
|
||||
package slice
|
||||
|
||||
func ChunkSlice[T any](slice []T, chunkSize int) [][]T {
|
||||
|
||||
}
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
|
||||
type Service interface {
|
||||
All(ctx context.Context, limit, offset int, publicOnly bool) ([]model.Service, error)
|
||||
Find(ctx context.Context, serviceID int) (*model.Service, error)
|
||||
}
|
||||
|
||||
type service struct {
|
||||
@@ -48,3 +49,15 @@ func (s *service) All(ctx context.Context, limit, offset int, publicOnly bool) (
|
||||
return items, query.
|
||||
Error
|
||||
}
|
||||
|
||||
func (s *service) Find(ctx context.Context, serviceID int) (*model.Service, error) {
|
||||
item := &model.Service{}
|
||||
|
||||
return item, s.db.
|
||||
WithContext(ctx).
|
||||
Preload("Statuses", func(db *gorm.DB) *gorm.DB {
|
||||
return db.Order("created_at desc")
|
||||
}).
|
||||
First(&item, "id", serviceID).
|
||||
Error
|
||||
}
|
||||
|
||||
26
repository/status.go
Normal file
26
repository/status.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.ostiwe.com/ostiwe-com/status/model"
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/db"
|
||||
)
|
||||
|
||||
type Status interface {
|
||||
Add(ctx context.Context, status model.Status) error
|
||||
}
|
||||
|
||||
type status struct {
|
||||
repository
|
||||
}
|
||||
|
||||
func NewStatusRepository() Status {
|
||||
return &status{
|
||||
repository: repository{db: db.Global},
|
||||
}
|
||||
}
|
||||
|
||||
func (s status) Add(ctx context.Context, status model.Status) error {
|
||||
return s.db.WithContext(ctx).Create(&status).Error
|
||||
}
|
||||
47
server/server.go
Normal file
47
server/server.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"git.ostiwe.com/ostiwe-com/status/model"
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/db"
|
||||
appLog "git.ostiwe.com/ostiwe-com/status/modules/log"
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/queue"
|
||||
"git.ostiwe.com/ostiwe-com/status/pkg/args"
|
||||
"git.ostiwe.com/ostiwe-com/status/router"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func Run(serverArgs *args.ServerCmd) {
|
||||
connect, err := db.Connect()
|
||||
if err != nil {
|
||||
appLog.Global.Get(appLog.SYSTEM).Error(fmt.Sprintf("Startup server error, failed connect to database: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
db.SetGlobal(connect)
|
||||
appLog.Global.Get(appLog.SYSTEM).Info("Run db migration")
|
||||
if err = runMigrate(); err != nil {
|
||||
appLog.Global.Get(appLog.SYSTEM).Error(fmt.Sprintf("Migration failed, error: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
appLog.Global.Get(appLog.SYSTEM).Info("Start service observer")
|
||||
go queue.InitQueues()
|
||||
|
||||
appLog.Global.Put(appLog.SERVER, logrus.New())
|
||||
appLog.Global.Get(appLog.SERVER).Info("Startup server on port: ", serverArgs.Port)
|
||||
|
||||
err = http.ListenAndServe(fmt.Sprintf(":%s", serverArgs.Port), router.InitRoutes())
|
||||
if err != nil {
|
||||
appLog.Global.Get(appLog.SERVER).Error(fmt.Sprintf("Startup server error: %v", err))
|
||||
}
|
||||
}
|
||||
|
||||
func runMigrate() error {
|
||||
return db.Global.AutoMigrate(
|
||||
model.Service{},
|
||||
model.Status{},
|
||||
)
|
||||
}
|
||||
83
service/check.go
Normal file
83
service/check.go
Normal file
@@ -0,0 +1,83 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"git.ostiwe.com/ostiwe-com/status/model"
|
||||
"git.ostiwe.com/ostiwe-com/status/modules/log"
|
||||
"git.ostiwe.com/ostiwe-com/status/repository"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var (
|
||||
httpObserveFailed = errors.New("http observe fail")
|
||||
httpObserveMaxTries = errors.New("http observe max tries")
|
||||
)
|
||||
|
||||
func init() {
|
||||
log.Global.Put(log.CRON, logrus.New())
|
||||
}
|
||||
|
||||
type Check interface {
|
||||
Observe(ctx context.Context, srv *model.Service) error
|
||||
RegisterStatus(ctx context.Context, serviceID int, status model.StatusCode) error
|
||||
}
|
||||
|
||||
type check struct {
|
||||
serviceRepository repository.Service
|
||||
statusRepository repository.Status
|
||||
}
|
||||
|
||||
func NewCheck() Check {
|
||||
return &check{
|
||||
serviceRepository: repository.NewServiceRepository(),
|
||||
statusRepository: repository.NewStatusRepository(),
|
||||
}
|
||||
}
|
||||
|
||||
func (c check) Observe(ctx context.Context, srv *model.Service) error {
|
||||
return c.observeHttp(srv)
|
||||
}
|
||||
|
||||
func (c check) RegisterStatus(ctx context.Context, serviceID int, status model.StatusCode) error {
|
||||
return c.statusRepository.Add(ctx, model.Status{
|
||||
ServiceID: serviceID,
|
||||
Status: status,
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
}
|
||||
|
||||
func (c check) observeHttp(service *model.Service) error {
|
||||
if service.TypeConfig == nil || service.TypeConfig.HTTPConfig == nil {
|
||||
return errors.New("service has no config for http")
|
||||
}
|
||||
|
||||
conf := service.TypeConfig.HTTPConfig
|
||||
|
||||
client := &http.Client{
|
||||
Timeout: time.Second * 5,
|
||||
}
|
||||
|
||||
request, err := http.NewRequest(conf.Method, service.Host, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for s := range conf.Headers {
|
||||
request.Header.Set(s, conf.Headers[s])
|
||||
}
|
||||
|
||||
response, err := client.Do(request)
|
||||
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
|
||||
return err
|
||||
}
|
||||
|
||||
if response.StatusCode != http.StatusOK {
|
||||
return httpObserveFailed
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -10,7 +10,6 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
AppVersion string
|
||||
AppStartTime time.Time
|
||||
)
|
||||
|
||||
@@ -18,10 +17,6 @@ func init() {
|
||||
appLog.SetGlobalManager(appLog.NewManager())
|
||||
appLog.Global.Put(appLog.SYSTEM, logrus.New())
|
||||
|
||||
if AppVersion == "" {
|
||||
AppVersion = "dev"
|
||||
}
|
||||
|
||||
AppStartTime = time.Now()
|
||||
|
||||
env := os.Getenv("APP_ENV")
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
package version
|
||||
|
||||
func AppVersion() string {
|
||||
return "0.0.1"
|
||||
return "1.0.0"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user