From 87defbf3913dd19998787d501b5ce96858b080eb Mon Sep 17 00:00:00 2001 From: ostiwe Date: Mon, 21 Jul 2025 01:59:03 +0300 Subject: [PATCH 01/16] feat: Add check service (WIP) --- main.go | 5 ++ model/service.go | 3 +- model/status.go | 6 ++ modules/log/manager.go | 1 + pkg/slice/chunk.go | 5 ++ repository/status.go | 24 ++++++ service/check.go | 175 +++++++++++++++++++++++++++++++++++++++++ settings/settings.go | 5 -- version/version.go | 2 +- 9 files changed, 219 insertions(+), 7 deletions(-) create mode 100644 pkg/slice/chunk.go create mode 100644 repository/status.go create mode 100644 service/check.go diff --git a/main.go b/main.go index bd8414f..705e64c 100644 --- a/main.go +++ b/main.go @@ -3,11 +3,13 @@ package main import ( "fmt" "net/http" + "time" "git.ostiwe.com/ostiwe-com/status/model" "git.ostiwe.com/ostiwe-com/status/modules/db" appLog "git.ostiwe.com/ostiwe-com/status/modules/log" "git.ostiwe.com/ostiwe-com/status/router" + "git.ostiwe.com/ostiwe-com/status/service" "git.ostiwe.com/ostiwe-com/status/version" "github.com/alexflint/go-arg" "github.com/go-chi/chi/v5" @@ -54,6 +56,9 @@ func main() { return } + appLog.Global.Get(appLog.SYSTEM).Info("Start service observer") + go service.NewCheck(20).Start(time.Second * 10) + appLog.Global.Put(appLog.SERVER, logrus.New()) appLog.Global.Get(appLog.SERVER).Info("Startup server on port: ", args.Server.Port) diff --git a/model/service.go b/model/service.go index b721924..428cf45 100644 --- a/model/service.go +++ b/model/service.go @@ -1,7 +1,8 @@ package model type HTTPConfig struct { - Authorization string `json:"authorization"` + Method string `json:"method"` + Headers map[string]string `json:"headers"` } type ServiceTypeCheckConfig struct { diff --git a/model/status.go b/model/status.go index 2b98e10..c7b2cbe 100644 --- a/model/status.go +++ b/model/status.go @@ -6,6 +6,12 @@ import ( "gorm.io/gorm" ) +const ( + StatusOK = "ok" // Means - response ok, service is alive + StatusFailed = "failed" // Means - response failed, all tries failed, service down + StatusWarn = "warn" // Means - response failed after N tries and still watched +) + type Status struct { ID int `gorm:"primary_key;auto_increment" json:"-"` ServiceID int `gorm:"one" json:"-"` diff --git a/modules/log/manager.go b/modules/log/manager.go index 6e507ed..0414956 100644 --- a/modules/log/manager.go +++ b/modules/log/manager.go @@ -10,6 +10,7 @@ const ( SERVER = "server" SYSTEM = "system" DATABASE = "database" + CRON = "cron" ) var Global *LoggerManager diff --git a/pkg/slice/chunk.go b/pkg/slice/chunk.go new file mode 100644 index 0000000..5bdc100 --- /dev/null +++ b/pkg/slice/chunk.go @@ -0,0 +1,5 @@ +package slice + +func ChunkSlice[T any](slice []T, chunkSize int) [][]T { + +} diff --git a/repository/status.go b/repository/status.go new file mode 100644 index 0000000..8cc9f27 --- /dev/null +++ b/repository/status.go @@ -0,0 +1,24 @@ +package repository + +import ( + "git.ostiwe.com/ostiwe-com/status/model" + "git.ostiwe.com/ostiwe-com/status/modules/db" +) + +type Status interface { + Add(status model.Status) error +} + +type status struct { + repository +} + +func NewStatusRepository() Status { + return &status{ + repository: repository{db: db.Global}, + } +} + +func (s status) Add(status model.Status) error { + return s.db.Create(&status).Error +} diff --git a/service/check.go b/service/check.go new file mode 100644 index 0000000..58e6e1f --- /dev/null +++ b/service/check.go @@ -0,0 +1,175 @@ +package service + +import ( + "context" + "errors" + "net" + "net/http" + "net/url" + "slices" + "sync" + "time" + + "git.ostiwe.com/ostiwe-com/status/model" + "git.ostiwe.com/ostiwe-com/status/modules/log" + "git.ostiwe.com/ostiwe-com/status/repository" + "github.com/sirupsen/logrus" +) + +var ( + httpObserveFailed = errors.New("http observe fail") + httpObserveMaxTries = errors.New("http observe max tries") +) + +func init() { + log.Global.Put(log.CRON, logrus.New()) +} + +type Check interface { + Start(interval time.Duration) +} + +type check struct { + serviceRepository repository.Service + maxServicesPerRoutine int + statusRepository repository.Status +} + +func NewCheck(maxServicesPerRoutine int) Check { + if maxServicesPerRoutine <= 0 { + maxServicesPerRoutine = 15 + } + + return &check{ + maxServicesPerRoutine: maxServicesPerRoutine, + serviceRepository: repository.NewServiceRepository(), + statusRepository: repository.NewStatusRepository(), + } +} + +func (c check) Start(interval time.Duration) { + for { + time.Sleep(interval) + + services, err := c.serviceRepository.All(context.Background(), -1, 0, false) + if err != nil { + log.Global.Get(log.CRON).Error(err) + + continue + } + + wg := new(sync.WaitGroup) + + chunked := slices.Chunk(services, c.maxServicesPerRoutine) + + for i := range chunked { + wg.Add(1) + + go c.observeChunk(wg, i) + } + + wg.Wait() + + } +} + +func (c check) observeChunk(wg *sync.WaitGroup, chunk []model.Service) { + defer wg.Done() + + for _, item := range chunk { + switch item.Type { + case "http": + err := c.observeHttp(item, 0) + if err == nil { + addErr := c.statusRepository.Add(model.Status{ + ServiceID: item.ID, + Status: model.StatusOK, + }) + if addErr != nil { + log.Global.Get(log.CRON).Error("Add status to status repository fail", item.ID, addErr) + } + + continue + } + + if errors.Is(err, httpObserveFailed) { + addErr := c.statusRepository.Add(model.Status{ + ServiceID: item.ID, + Status: model.StatusFailed, + }) + if addErr != nil { + log.Global.Get(log.CRON).Error("Add status to status repository fail", item.ID, addErr) + } + + continue + } + + log.Global.Get(log.CRON).Error("Unexpected observe fail", item.ID, err) + + break + case "tcp": + log.Global.Get(log.CRON).Warn("Cannot handle service with id: ", item.ID, ", because a tcp observer not implemented.") + continue + } + } +} + +func (c check) observeHttp(service model.Service, attempt int) error { + if attempt >= 20 { + return httpObserveMaxTries + } + + if attempt > 5 { + err := c.statusRepository.Add(model.Status{ + ServiceID: service.ID, + Status: model.StatusWarn, + }) + if err != nil { + return err + } + } + + if service.TypeConfig == nil || service.TypeConfig.HTTPConfig == nil { + return errors.New("service has no config for http") + } + + conf := service.TypeConfig.HTTPConfig + + client := &http.Client{ + Timeout: time.Second * 5, + } + + request, err := http.NewRequest(conf.Method, service.Host, nil) + if err != nil { + return err + } + + for s := range conf.Headers { + request.Header.Set(s, conf.Headers[s]) + } + + response, err := client.Do(request) + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + var ( + e *url.Error + opErr *net.OpError + ) + if errors.As(err, &e) && errors.As(e.Err, &opErr) { + time.Sleep(2 * time.Second) + + return c.observeHttp(service, attempt+1) + } + + return err + } + + if errors.Is(err, context.DeadlineExceeded) { + return c.observeHttp(service, attempt+1) + } + + if response.StatusCode != http.StatusOK { + return httpObserveFailed + } + + return nil +} diff --git a/settings/settings.go b/settings/settings.go index cd4c758..df26b7d 100644 --- a/settings/settings.go +++ b/settings/settings.go @@ -10,7 +10,6 @@ import ( ) var ( - AppVersion string AppStartTime time.Time ) @@ -18,10 +17,6 @@ func init() { appLog.SetGlobalManager(appLog.NewManager()) appLog.Global.Put(appLog.SYSTEM, logrus.New()) - if AppVersion == "" { - AppVersion = "dev" - } - AppStartTime = time.Now() env := os.Getenv("APP_ENV") diff --git a/version/version.go b/version/version.go index c7878da..29add40 100644 --- a/version/version.go +++ b/version/version.go @@ -1,5 +1,5 @@ package version func AppVersion() string { - return "0.0.1" + return "0.0.3" } From 4603668956fd2a180b5e0bc27a6c8bd19a2f0fdb Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:10:44 +0300 Subject: [PATCH 02/16] feat(rabbitmq): Add rabbitmq --- .env | 7 ++++++- docker-compose.yml | 9 +++++++++ go.mod | 1 + go.sum | 4 ++++ 4 files changed, 20 insertions(+), 1 deletion(-) diff --git a/.env b/.env index 538702c..f2597c5 100644 --- a/.env +++ b/.env @@ -3,4 +3,9 @@ DATABASE_PORT= DATABASE_USER= DATABASE_PASS= DATABASE_DB= -DATABASE_TZ=Europe/Moscow \ No newline at end of file +DATABASE_TZ=Europe/Moscow + +RABBIT_USER=user +RABBIT_PASSWORD=user +RABBIT_HOST=localhost +RABBIT_PORT=5672 diff --git a/docker-compose.yml b/docker-compose.yml index 39e60bc..bb3771a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,5 +12,14 @@ services: ports: - "5444:5432" + rabbitmq: + image: rabbitmq:4.1.2-management + environment: + RABBITMQ_DEFAULT_USER: user + RABBITMQ_DEFAULT_PASS: user + ports: + - "5672:5672" + - "15672:15672" + volumes: database_postgres: \ No newline at end of file diff --git a/go.mod b/go.mod index fc5979f..dbfad2e 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-andiamo/chioas v1.16.4 github.com/go-chi/chi/v5 v5.2.2 github.com/joho/godotenv v1.5.1 + github.com/rabbitmq/amqp091-go v1.10.0 github.com/sirupsen/logrus v1.9.3 go.uber.org/mock v0.5.2 gorm.io/driver/postgres v1.6.0 diff --git a/go.sum b/go.sum index 16a9eae..5fddf93 100644 --- a/go.sum +++ b/go.sum @@ -35,6 +35,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= @@ -45,6 +47,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.5.2 h1:LbtPTcP8A5k9WPXj54PPPbjcI4Y6lhyOZXn+VS7wNko= go.uber.org/mock v0.5.2/go.mod h1:wLlUxC2vVTPTaE3UD51E0BGOAElKrILxhVSDYQLld5o= golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= From 68c98bb038244a7c32539785e25aa88614127c91 Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:13:42 +0300 Subject: [PATCH 03/16] refactor(server): Move server to package --- main.go | 76 +++++++----------------------------------------- pkg/args/args.go | 22 ++++++++++++++ server/server.go | 47 ++++++++++++++++++++++++++++++ 3 files changed, 80 insertions(+), 65 deletions(-) create mode 100644 pkg/args/args.go create mode 100644 server/server.go diff --git a/main.go b/main.go index 705e64c..e6cc9d3 100644 --- a/main.go +++ b/main.go @@ -3,78 +3,31 @@ package main import ( "fmt" "net/http" - "time" - "git.ostiwe.com/ostiwe-com/status/model" - "git.ostiwe.com/ostiwe-com/status/modules/db" appLog "git.ostiwe.com/ostiwe-com/status/modules/log" + "git.ostiwe.com/ostiwe-com/status/pkg/args" "git.ostiwe.com/ostiwe-com/status/router" - "git.ostiwe.com/ostiwe-com/status/service" - "git.ostiwe.com/ostiwe-com/status/version" + "git.ostiwe.com/ostiwe-com/status/server" + _ "git.ostiwe.com/ostiwe-com/status/settings" "github.com/alexflint/go-arg" "github.com/go-chi/chi/v5" - "github.com/sirupsen/logrus" - - _ "git.ostiwe.com/ostiwe-com/status/settings" ) -type ServerCmd struct { - Port string `arg:"-p,--port" help:"Port to listen on" default:"8080"` -} - -type ServerDocumentationCmd struct { - Port string `arg:"-p,--port" help:"Port to listen on" default:"8081"` - Plain bool `arg:"--plain" help:"Enable plain text output" default:"true"` - PlainFormat string `arg:"--plain-format" help:"Set format for output (json, yaml)" default:"yaml"` -} - -type appArgs struct { - Server *ServerCmd `arg:"subcommand:server" help:"Start the api server"` - ServerDocumentation *ServerDocumentationCmd `arg:"subcommand:server-docs" help:"Generate documentation for api server"` -} - -var args appArgs - -func (appArgs) Version() string { - return version.AppVersion() -} +var appArgs args.AppArgs func main() { - arg.MustParse(&args) - - if args.Server != nil { - connect, err := db.Connect() - if err != nil { - appLog.Global.Get(appLog.SYSTEM).Error(fmt.Sprintf("Startup server error, failed connect to database: %v", err)) - return - } - - db.SetGlobal(connect) - appLog.Global.Get(appLog.SYSTEM).Info("Run db migration") - if err = runMigrate(); err != nil { - appLog.Global.Get(appLog.SYSTEM).Error(fmt.Sprintf("Migration failed, error: %v", err)) - return - } - - appLog.Global.Get(appLog.SYSTEM).Info("Start service observer") - go service.NewCheck(20).Start(time.Second * 10) - - appLog.Global.Put(appLog.SERVER, logrus.New()) - appLog.Global.Get(appLog.SERVER).Info("Startup server on port: ", args.Server.Port) - - err = http.ListenAndServe(fmt.Sprintf(":%s", args.Server.Port), router.InitRoutes()) - if err != nil { - appLog.Global.Get(appLog.SERVER).Error(fmt.Sprintf("Startup server error: %v", err)) - } + arg.MustParse(&appArgs) + if appArgs.Server != nil { + server.Run(appArgs.Server) return } - if args.ServerDocumentation != nil { + if appArgs.ServerDocumentation != nil { appLog.Global.Get(appLog.SYSTEM).Info("Collect documentation") docs := router.Documentate() - if !args.ServerDocumentation.Plain { + if !appArgs.ServerDocumentation.Plain { chiRouter := chi.NewRouter() err := docs.SetupRoutes(chiRouter, docs) @@ -83,8 +36,8 @@ func main() { return } - appLog.Global.Get(appLog.SYSTEM).Info(fmt.Sprintf("Start documentation server on port: %s", args.ServerDocumentation.Port)) - err = http.ListenAndServe(fmt.Sprintf(":%s", args.ServerDocumentation.Port), chiRouter) + appLog.Global.Get(appLog.SYSTEM).Info(fmt.Sprintf("Start documentation server on port: %s", appArgs.ServerDocumentation.Port)) + err = http.ListenAndServe(fmt.Sprintf(":%s", appArgs.ServerDocumentation.Port), chiRouter) if err != nil { appLog.Global.Get(appLog.SYSTEM).Error(fmt.Sprintf("Startup server error: %v", err)) } @@ -96,10 +49,3 @@ func main() { appLog.Global.Get(appLog.SYSTEM).Info("Exit from application") } - -func runMigrate() error { - return db.Global.AutoMigrate( - model.Service{}, - model.Status{}, - ) -} diff --git a/pkg/args/args.go b/pkg/args/args.go new file mode 100644 index 0000000..8775d4f --- /dev/null +++ b/pkg/args/args.go @@ -0,0 +1,22 @@ +package args + +import "git.ostiwe.com/ostiwe-com/status/version" + +type ServerCmd struct { + Port string `arg:"-p,--port" help:"Port to listen on" default:"8080"` +} + +type ServerDocumentationCmd struct { + Port string `arg:"-p,--port" help:"Port to listen on" default:"8081"` + Plain bool `arg:"--plain" help:"Enable plain text output" default:"true"` + PlainFormat string `arg:"--plain-format" help:"Set format for output (json, yaml)" default:"yaml"` +} + +type AppArgs struct { + Server *ServerCmd `arg:"subcommand:server" help:"Start the api server"` + ServerDocumentation *ServerDocumentationCmd `arg:"subcommand:server-docs" help:"Generate documentation for api server"` +} + +func (AppArgs) Version() string { + return version.AppVersion() +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..243c8ae --- /dev/null +++ b/server/server.go @@ -0,0 +1,47 @@ +package server + +import ( + "fmt" + "net/http" + + "git.ostiwe.com/ostiwe-com/status/model" + "git.ostiwe.com/ostiwe-com/status/modules/db" + appLog "git.ostiwe.com/ostiwe-com/status/modules/log" + "git.ostiwe.com/ostiwe-com/status/modules/queue" + "git.ostiwe.com/ostiwe-com/status/pkg/args" + "git.ostiwe.com/ostiwe-com/status/router" + "github.com/sirupsen/logrus" +) + +func Run(serverArgs *args.ServerCmd) { + connect, err := db.Connect() + if err != nil { + appLog.Global.Get(appLog.SYSTEM).Error(fmt.Sprintf("Startup server error, failed connect to database: %v", err)) + return + } + + db.SetGlobal(connect) + appLog.Global.Get(appLog.SYSTEM).Info("Run db migration") + if err = runMigrate(); err != nil { + appLog.Global.Get(appLog.SYSTEM).Error(fmt.Sprintf("Migration failed, error: %v", err)) + return + } + + appLog.Global.Get(appLog.SYSTEM).Info("Start service observer") + go queue.InitQueues() + + appLog.Global.Put(appLog.SERVER, logrus.New()) + appLog.Global.Get(appLog.SERVER).Info("Startup server on port: ", serverArgs.Port) + + err = http.ListenAndServe(fmt.Sprintf(":%s", serverArgs.Port), router.InitRoutes()) + if err != nil { + appLog.Global.Get(appLog.SERVER).Error(fmt.Sprintf("Startup server error: %v", err)) + } +} + +func runMigrate() error { + return db.Global.AutoMigrate( + model.Service{}, + model.Status{}, + ) +} From d22aca00b5c2db5835f2457bebb5e28ca756991b Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:16:18 +0300 Subject: [PATCH 04/16] refactor(observer): Refactor observer service --- service/check.go | 124 ++++++----------------------------------------- 1 file changed, 16 insertions(+), 108 deletions(-) diff --git a/service/check.go b/service/check.go index 58e6e1f..5447dd5 100644 --- a/service/check.go +++ b/service/check.go @@ -3,11 +3,7 @@ package service import ( "context" "errors" - "net" "net/http" - "net/url" - "slices" - "sync" "time" "git.ostiwe.com/ostiwe-com/status/model" @@ -26,109 +22,35 @@ func init() { } type Check interface { - Start(interval time.Duration) + Observe(ctx context.Context, srv *model.Service) error + RegisterStatus(ctx context.Context, serviceID int, status model.StatusCode) error } type check struct { - serviceRepository repository.Service - maxServicesPerRoutine int - statusRepository repository.Status + serviceRepository repository.Service + statusRepository repository.Status } -func NewCheck(maxServicesPerRoutine int) Check { - if maxServicesPerRoutine <= 0 { - maxServicesPerRoutine = 15 - } - +func NewCheck() Check { return &check{ - maxServicesPerRoutine: maxServicesPerRoutine, - serviceRepository: repository.NewServiceRepository(), - statusRepository: repository.NewStatusRepository(), + serviceRepository: repository.NewServiceRepository(), + statusRepository: repository.NewStatusRepository(), } } -func (c check) Start(interval time.Duration) { - for { - time.Sleep(interval) - - services, err := c.serviceRepository.All(context.Background(), -1, 0, false) - if err != nil { - log.Global.Get(log.CRON).Error(err) - - continue - } - - wg := new(sync.WaitGroup) - - chunked := slices.Chunk(services, c.maxServicesPerRoutine) - - for i := range chunked { - wg.Add(1) - - go c.observeChunk(wg, i) - } - - wg.Wait() - - } +func (c check) Observe(ctx context.Context, srv *model.Service) error { + return c.observeHttp(srv) } -func (c check) observeChunk(wg *sync.WaitGroup, chunk []model.Service) { - defer wg.Done() - - for _, item := range chunk { - switch item.Type { - case "http": - err := c.observeHttp(item, 0) - if err == nil { - addErr := c.statusRepository.Add(model.Status{ - ServiceID: item.ID, - Status: model.StatusOK, - }) - if addErr != nil { - log.Global.Get(log.CRON).Error("Add status to status repository fail", item.ID, addErr) - } - - continue - } - - if errors.Is(err, httpObserveFailed) { - addErr := c.statusRepository.Add(model.Status{ - ServiceID: item.ID, - Status: model.StatusFailed, - }) - if addErr != nil { - log.Global.Get(log.CRON).Error("Add status to status repository fail", item.ID, addErr) - } - - continue - } - - log.Global.Get(log.CRON).Error("Unexpected observe fail", item.ID, err) - - break - case "tcp": - log.Global.Get(log.CRON).Warn("Cannot handle service with id: ", item.ID, ", because a tcp observer not implemented.") - continue - } - } +func (c check) RegisterStatus(ctx context.Context, serviceID int, status model.StatusCode) error { + return c.statusRepository.Add(ctx, model.Status{ + ServiceID: serviceID, + Status: status, + CreatedAt: time.Now(), + }) } -func (c check) observeHttp(service model.Service, attempt int) error { - if attempt >= 20 { - return httpObserveMaxTries - } - - if attempt > 5 { - err := c.statusRepository.Add(model.Status{ - ServiceID: service.ID, - Status: model.StatusWarn, - }) - if err != nil { - return err - } - } - +func (c check) observeHttp(service *model.Service) error { if service.TypeConfig == nil || service.TypeConfig.HTTPConfig == nil { return errors.New("service has no config for http") } @@ -150,23 +72,9 @@ func (c check) observeHttp(service model.Service, attempt int) error { response, err := client.Do(request) if err != nil && !errors.Is(err, context.DeadlineExceeded) { - var ( - e *url.Error - opErr *net.OpError - ) - if errors.As(err, &e) && errors.As(e.Err, &opErr) { - time.Sleep(2 * time.Second) - - return c.observeHttp(service, attempt+1) - } - return err } - if errors.Is(err, context.DeadlineExceeded) { - return c.observeHttp(service, attempt+1) - } - if response.StatusCode != http.StatusOK { return httpObserveFailed } From 37906d51a80629077f251de962130f8c210d7d98 Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:16:34 +0300 Subject: [PATCH 05/16] refactor(router): Rename file --- router/{init.go => server.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename router/{init.go => server.go} (100%) diff --git a/router/init.go b/router/server.go similarity index 100% rename from router/init.go rename to router/server.go From bdcf65276e2f31c43046815f972e0085e8744505 Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:17:18 +0300 Subject: [PATCH 06/16] feat(repository): Add Find method for service repository --- repository/service.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/repository/service.go b/repository/service.go index c8cac92..1dbf258 100644 --- a/repository/service.go +++ b/repository/service.go @@ -13,6 +13,7 @@ import ( type Service interface { All(ctx context.Context, limit, offset int, publicOnly bool) ([]model.Service, error) + Find(ctx context.Context, serviceID int) (*model.Service, error) } type service struct { @@ -48,3 +49,15 @@ func (s *service) All(ctx context.Context, limit, offset int, publicOnly bool) ( return items, query. Error } + +func (s *service) Find(ctx context.Context, serviceID int) (*model.Service, error) { + item := &model.Service{} + + return item, s.db. + WithContext(ctx). + Preload("Statuses", func(db *gorm.DB) *gorm.DB { + return db.Order("created_at desc") + }). + First(&item, "id", serviceID). + Error +} From 5df15fb40ae772ca931662c0a3ca2defd95a6921 Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:17:41 +0300 Subject: [PATCH 07/16] feat(repository): Add method for create status by observer --- repository/status.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/repository/status.go b/repository/status.go index 8cc9f27..5a9af3c 100644 --- a/repository/status.go +++ b/repository/status.go @@ -1,12 +1,14 @@ package repository import ( + "context" + "git.ostiwe.com/ostiwe-com/status/model" "git.ostiwe.com/ostiwe-com/status/modules/db" ) type Status interface { - Add(status model.Status) error + Add(ctx context.Context, status model.Status) error } type status struct { @@ -19,6 +21,6 @@ func NewStatusRepository() Status { } } -func (s status) Add(status model.Status) error { - return s.db.Create(&status).Error +func (s status) Add(ctx context.Context, status model.Status) error { + return s.db.WithContext(ctx).Create(&status).Error } From 73b791615f1b45d7231f63780555d5324205c914 Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:18:24 +0300 Subject: [PATCH 08/16] feat(rabbitmq): Add rabbitmq connection with reconnect feature --- pkg/rabbit/connection.go | 98 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 pkg/rabbit/connection.go diff --git a/pkg/rabbit/connection.go b/pkg/rabbit/connection.go new file mode 100644 index 0000000..63c5fc1 --- /dev/null +++ b/pkg/rabbit/connection.go @@ -0,0 +1,98 @@ +package rabbit + +import ( + "fmt" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/sirupsen/logrus" +) + +type ConnectionArgs struct { + Host string + Password string + User string + Port string + ReconnectInterval time.Duration + Logger *logrus.Logger +} + +type Connection struct { + conn *amqp.Connection + connString string + + isClosed bool + closeNotify chan *amqp.Error + logger *logrus.Logger + reconnectInterval time.Duration +} + +func NewConnection(args ConnectionArgs) (*Connection, error) { + connString := fmt.Sprintf( + "amqp://%s:%s@%s:%s/", + args.User, + args.Password, + args.Host, + args.Port, + ) + + dial, err := amqp.Dial(connString) + if err != nil { + return nil, err + } + + c := &Connection{ + conn: dial, + connString: connString, + closeNotify: dial.NotifyClose(make(chan *amqp.Error)), + logger: args.Logger, + reconnectInterval: args.ReconnectInterval, + } + + if args.ReconnectInterval > 0 { + go c.listenCloseNotify() + } + + return c, nil +} + +func (c *Connection) Channel() (*amqp.Channel, error) { + return c.conn.Channel() +} + +func (c *Connection) IsAlive() bool { + return c.isClosed +} + +func (c *Connection) listenCloseNotify() { + if c.reconnectInterval <= 0 { + return + } + + c.logger.Info(fmt.Sprintf( + "Start listening close notify channel, with %s interval", + c.reconnectInterval.String(), + )) + + for { + select { + case <-c.closeNotify: + c.logger.Info("Trying to reconnect to rabbit") + + dial, dialErr := amqp.Dial(c.connString) + if dialErr != nil { + c.logger.Error("Error during reconnect try - ", dialErr) + + continue + } + + c.conn = dial + + c.logger.Info("Rabbit connection stabilized") + + break + } + + time.Sleep(c.reconnectInterval) + } +} From 76d03fb426c3988a16f63dafd8817884f3fee650 Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:18:47 +0300 Subject: [PATCH 09/16] refactor(logger): Refactor logger --- modules/log/manager.go | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/modules/log/manager.go b/modules/log/manager.go index 0414956..66e8ed2 100644 --- a/modules/log/manager.go +++ b/modules/log/manager.go @@ -11,6 +11,9 @@ const ( SYSTEM = "system" DATABASE = "database" CRON = "cron" + QUEUE = "queue" + RABBIT = "rabbit" + internal = "internal" ) var Global *LoggerManager @@ -25,6 +28,9 @@ type LoggerManager struct { } func (m *LoggerManager) Get(name string) *logrus.Logger { + m.mu.Lock() + defer m.mu.Unlock() + if logger, ok := m.loggers[name]; ok { return logger } @@ -36,13 +42,32 @@ func (m *LoggerManager) Put(name string, logger *logrus.Logger) { m.mu.Lock() defer m.mu.Unlock() + if _, ok := m.loggers[name]; ok { + m.Get(internal).Errorf("Logger with name '%s' already exists. Use PutForce to replace it.", name) + + return + } + + logger.Formatter = NewTextFormatter(name) + + m.loggers[name] = logger +} + +func (m *LoggerManager) PutForce(name string, logger *logrus.Logger) { + m.mu.Lock() + defer m.mu.Unlock() + logger.Formatter = NewTextFormatter(name) m.loggers[name] = logger } func NewManager() *LoggerManager { - return &LoggerManager{ + manager := &LoggerManager{ loggers: make(map[string]*logrus.Logger), } + + manager.Put(internal, logrus.New()) + + return manager } From 23874ef97534b341de3d4c1e70a2b9f105e284e9 Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:19:33 +0300 Subject: [PATCH 10/16] feat(rabbitmq): Add rabbitmq module --- modules/rabbit/rabbit.go | 66 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 modules/rabbit/rabbit.go diff --git a/modules/rabbit/rabbit.go b/modules/rabbit/rabbit.go new file mode 100644 index 0000000..7484c63 --- /dev/null +++ b/modules/rabbit/rabbit.go @@ -0,0 +1,66 @@ +package rabbit + +import ( + "os" + "time" + + "git.ostiwe.com/ostiwe-com/status/modules/log" + "git.ostiwe.com/ostiwe-com/status/pkg/rabbit" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/sirupsen/logrus" +) + +var Channel *amqp.Channel + +func InitConnection() { + log.Global.Put(log.RABBIT, logrus.New()) + log.Global.Put(log.QUEUE, logrus.New()) + + rConn, err := rabbit.NewConnection(rabbit.ConnectionArgs{ + Host: os.Getenv("RABBIT_HOST"), + Password: os.Getenv("RABBIT_PASSWORD"), + User: os.Getenv("RABBIT_USER"), + Port: os.Getenv("RABBIT_PORT"), + ReconnectInterval: time.Second * 5, + Logger: log.Global.Get(log.RABBIT), + }, + ) + + if err != nil { + panic(err) + } + + rConn.IsAlive() + + if err = declareQueues(rConn); err != nil { + panic(err) + } + + channel, err := rConn.Channel() + if err != nil { + panic(err) + } + + Channel = channel +} + +func declareQueues(conn *rabbit.Connection) error { + channel, err := conn.Channel() + if err != nil { + return err + } + + _, err = channel.QueueDeclare( + "tasks", + true, + false, + false, + false, + nil, + ) + if err != nil { + return err + } + + return nil +} From 57c8c86c3acce35646055d7b7ac36d8a917c0cf9 Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:19:53 +0300 Subject: [PATCH 11/16] refactor(model): Refactor Status model --- model/status.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/model/status.go b/model/status.go index c7b2cbe..655d9b2 100644 --- a/model/status.go +++ b/model/status.go @@ -6,18 +6,20 @@ import ( "gorm.io/gorm" ) +type StatusCode string + const ( - StatusOK = "ok" // Means - response ok, service is alive - StatusFailed = "failed" // Means - response failed, all tries failed, service down - StatusWarn = "warn" // Means - response failed after N tries and still watched + StatusOK StatusCode = "ok" // Means - response ok, service is alive + StatusFailed StatusCode = "failed" // Means - response failed, all tries failed, service down + StatusWarn StatusCode = "warn" // Means - response failed after N tries and still watched ) type Status struct { - ID int `gorm:"primary_key;auto_increment" json:"-"` - ServiceID int `gorm:"one" json:"-"` - Status string `gorm:"size:255;not null" json:"status"` - Description *string `gorm:"size:255" json:"description"` - CreatedAt time.Time `json:"created_at"` + ID int `gorm:"primary_key;auto_increment" json:"-"` + ServiceID int `gorm:"one" json:"-"` + Status StatusCode `gorm:"size:255;not null" json:"status"` + Description *string `gorm:"size:255" json:"description"` + CreatedAt time.Time `json:"created_at"` } func (Status) TableName() string { From 7cfa8cc52b79ee0b41b40435937ff75c59c26cb4 Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:20:41 +0300 Subject: [PATCH 12/16] feat(model): Add method for getting count of last statuses --- model/service.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/model/service.go b/model/service.go index 428cf45..bcf9e7c 100644 --- a/model/service.go +++ b/model/service.go @@ -33,3 +33,17 @@ type Service struct { func (Service) TableName() string { return "service" } + +func (s Service) CountLastStatuses(status StatusCode) uint { + var count uint = 0 + + for i := range s.Statuses { + if s.Statuses[i].Status != status { + break + } + + count++ + } + + return count +} From b75dfc984929426f163fc107edddbfc4323344ab Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:21:06 +0300 Subject: [PATCH 13/16] feat(rabbitmq): Add dto for messages --- modules/queue/dto/ping.go | 5 +++++ modules/queue/dto/task.go | 6 ++++++ 2 files changed, 11 insertions(+) create mode 100644 modules/queue/dto/ping.go create mode 100644 modules/queue/dto/task.go diff --git a/modules/queue/dto/ping.go b/modules/queue/dto/ping.go new file mode 100644 index 0000000..373b436 --- /dev/null +++ b/modules/queue/dto/ping.go @@ -0,0 +1,5 @@ +package dto + +type PingMessage struct { + ServiceID int `json:"serviceID"` +} diff --git a/modules/queue/dto/task.go b/modules/queue/dto/task.go new file mode 100644 index 0000000..27a1902 --- /dev/null +++ b/modules/queue/dto/task.go @@ -0,0 +1,6 @@ +package dto + +type TaskMessage struct { + Task string `json:"task"` + Payload string `json:"payload"` +} From e9b9cf21bff789ca1ab0641c50764228e91bd49b Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:21:22 +0300 Subject: [PATCH 14/16] feat(rabbitmq): Add tasks for processing --- modules/queue/tasks/collection.go | 24 ++++++++ modules/queue/tasks/ping.go | 92 +++++++++++++++++++++++++++++++ modules/queue/tasks/tasks.go | 42 ++++++++++++++ 3 files changed, 158 insertions(+) create mode 100644 modules/queue/tasks/collection.go create mode 100644 modules/queue/tasks/ping.go create mode 100644 modules/queue/tasks/tasks.go diff --git a/modules/queue/tasks/collection.go b/modules/queue/tasks/collection.go new file mode 100644 index 0000000..770eef3 --- /dev/null +++ b/modules/queue/tasks/collection.go @@ -0,0 +1,24 @@ +package tasks + +func Collection() []Task { + return []Task{ + PingTask, + } +} + +func CollectionMap() map[string]Task { + collectionMap := make(map[string]Task) + + for _, task := range Collection() { + if task.Fallback == nil { + task.Fallback = DefaultFallbackFn + } + if task.AfterHandle == nil { + task.AfterHandle = DefaultAfterHandleFn + } + + collectionMap[task.Name] = task + } + + return collectionMap +} diff --git a/modules/queue/tasks/ping.go b/modules/queue/tasks/ping.go new file mode 100644 index 0000000..5b8c6d4 --- /dev/null +++ b/modules/queue/tasks/ping.go @@ -0,0 +1,92 @@ +package tasks + +import ( + "context" + "encoding/json" + "time" + + "git.ostiwe.com/ostiwe-com/status/model" + "git.ostiwe.com/ostiwe-com/status/modules/log" + "git.ostiwe.com/ostiwe-com/status/modules/queue/dto" + rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit" + "git.ostiwe.com/ostiwe-com/status/repository" + "git.ostiwe.com/ostiwe-com/status/service" + amqp "github.com/rabbitmq/amqp091-go" +) + +var PingTask = Task{ + Name: "ping", + Handle: func(ctx context.Context, msgBody []byte) error { + logger := log.Global.Get(log.QUEUE) + var msg dto.PingMessage + + err := json.Unmarshal(msgBody, &msg) + if err != nil { + return err + } + + logger.Info("Received new ping task for service - ", msg.ServiceID) + serviceRepository := repository.NewServiceRepository() + + srv, err := serviceRepository.Find(ctx, msg.ServiceID) + if err != nil { + return err + } + + var lastStatus = &model.Status{Status: model.StatusWarn} + if len(srv.Statuses) > 0 { + lastStatus = &srv.Statuses[0] + } + + checker := service.NewCheck() + + err = checker.Observe(ctx, srv) + if err != nil { + newStatus := model.StatusWarn + + if lastStatus.Status == model.StatusWarn && srv.CountLastStatuses(model.StatusWarn) >= 10 { + newStatus = model.StatusFailed + } + + if lastStatus.Status == model.StatusFailed { + newStatus = model.StatusFailed + } + + logger.Info("Observe service with id ", msg.ServiceID, " failed - ", err, " setting [", newStatus, "] status") + + regErr := checker.RegisterStatus(ctx, msg.ServiceID, newStatus) + if regErr != nil { + logger.Info("Setting status for service with id ", msg.ServiceID, " failed - ", err) + + return regErr + } + + return nil + } + + return checker.RegisterStatus(ctx, msg.ServiceID, model.StatusOK) + }, + AfterHandle: func(ctx context.Context, msgBody []byte) error { + time.Sleep(time.Second * 30) + + payload := dto.TaskMessage{ + Task: "ping", + Payload: string(msgBody), + } + + payloadMsg, err := json.Marshal(payload) + if err != nil { + return err + } + + return rabbitModule.Channel.Publish( + "", + "tasks", + false, + false, + amqp.Publishing{ + Body: payloadMsg, + }, + ) + }, +} diff --git a/modules/queue/tasks/tasks.go b/modules/queue/tasks/tasks.go new file mode 100644 index 0000000..05845fb --- /dev/null +++ b/modules/queue/tasks/tasks.go @@ -0,0 +1,42 @@ +package tasks + +import ( + "context" + + "git.ostiwe.com/ostiwe-com/status/modules/log" +) + +var ( + DefaultFallbackFn = func(_ context.Context, _ error) (bool, error) { + log.Global.Get(log.QUEUE).Info("Default fallback function triggered") + + return true, nil + } + DefaultAfterHandleFn = func(_ context.Context, _ []byte) error { + log.Global.Get(log.QUEUE).Info("Default after handler function triggered") + + return nil + } +) + +// Task represents a task that can be executed in the queue. +// +// Name returns the name of the task, used for logging and identification purposes. +// +// Handle processes a message from the queue. It takes a context and the raw message body ([]byte) as arguments. +// If handling fails, it should return an error that can be caught in the Fallback method. +// +// AfterHandle performs some action after successfully processing the message with Handle. +// It takes the same arguments as Handle: a context and the raw message body ([]byte). +// Default is DefaultAfterHandleFn +// +// Fallback handles errors that occur during the execution of the Handle method. +// It takes a context and an error as arguments. The method should return a boolean indicating whether the error was handled, +// and error for additional logging or debugging information. +// Default is DefaultFallbackFn +type Task struct { + Name string + Handle func(ctx context.Context, msgBody []byte) error + AfterHandle func(ctx context.Context, msgBody []byte) error + Fallback func(ctx context.Context, err error) (bool, error) +} From 58c21338d3a32547b0ab0f58c1525cfc6dfc26cd Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:21:40 +0300 Subject: [PATCH 15/16] feat(queue): Add queue module --- modules/queue/queue.go | 187 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) create mode 100644 modules/queue/queue.go diff --git a/modules/queue/queue.go b/modules/queue/queue.go new file mode 100644 index 0000000..eb44fd7 --- /dev/null +++ b/modules/queue/queue.go @@ -0,0 +1,187 @@ +package queue + +import ( + "context" + "encoding/json" + "time" + + "git.ostiwe.com/ostiwe-com/status/modules/log" + "git.ostiwe.com/ostiwe-com/status/modules/queue/dto" + "git.ostiwe.com/ostiwe-com/status/modules/queue/tasks" + rabbitModule "git.ostiwe.com/ostiwe-com/status/modules/rabbit" + "git.ostiwe.com/ostiwe-com/status/repository" + amqp "github.com/rabbitmq/amqp091-go" +) + +func InitQueues() { + rabbitModule.InitConnection() + + err := flushQueues() + if err != nil { + panic(err) + } + + if err = createTasksForServices(); err != nil { + panic(err) + } + + log.Global.Get(log.SYSTEM).Info("Wait 5 seconds before start listen task queue") + time.Sleep(time.Second * 5) + + if err = listenTaskQueue(); err != nil { + panic(err) + } +} + +func createTasksForServices() error { + log.Global.Get(log.SYSTEM).Info("Create tasks ping for services") + + serviceRepository := repository.NewServiceRepository() + + services, err := serviceRepository.All(context.Background(), -1, 0, false) + if err != nil { + return err + } + + for i := range services { + pingPayload, marshallErr := json.Marshal(dto.PingMessage{ServiceID: services[i].ID}) + if marshallErr != nil { + return marshallErr + } + + payload := dto.TaskMessage{ + Task: "ping", + Payload: string(pingPayload), + } + + payloadMsg, marshallErr := json.Marshal(payload) + if marshallErr != nil { + return marshallErr + } + + publishErr := rabbitModule.Channel.Publish( + "", + "tasks", + false, + false, + amqp.Publishing{Body: payloadMsg}, + ) + if publishErr != nil { + return publishErr + } + } + + return nil +} + +func flushQueues() error { + log.Global.Get(log.SYSTEM).Info("Flush queues") + + _, err := rabbitModule.Channel.QueuePurge("tasks", false) + if err != nil { + return err + } + + return nil +} + +func listenTaskQueue() error { + ctx := context.Background() + + consume, err := rabbitModule.Channel.Consume( + "tasks", + "tasks-consumer", + false, + false, + false, + false, + nil, + ) + if err != nil { + return err + } + + taskCollection := tasks.CollectionMap() + + for delivery := range consume { + go handleQueueMessage(ctx, delivery, taskCollection) + } + + return nil +} + +func handleQueueMessage(ctx context.Context, delivery amqp.Delivery, taskCollection map[string]tasks.Task) { + queueLog := log.Global.Get(log.QUEUE) + + var taskMsg dto.TaskMessage + unmarshalErr := json.Unmarshal(delivery.Body, &taskMsg) + if unmarshalErr != nil { + queueLog.WithField("body", string(delivery.Body)).Error("Failed unmarshal queue message - ", unmarshalErr) + + rejectErr := delivery.Reject(false) + if rejectErr != nil { + queueLog.Error("Failed reject message - ", rejectErr) + } + + return + } + + task, ok := taskCollection[taskMsg.Task] + + if !ok { + queueLog.Error("Undefined task with name - ", taskMsg.Task) + rejectErr := delivery.Reject(false) + if rejectErr != nil { + queueLog.Error("Failed reject message - ", rejectErr) + } + + return + } + + payload := []byte(taskMsg.Payload) + + handleErr := task.Handle(ctx, payload) + if handleErr != nil { + queueLog.Error("Task handler return error - ", handleErr) + + fallback, fallbackErr := task.Fallback(ctx, handleErr) + if fallbackErr != nil { + queueLog.Error("Task failed with error - ", handleErr, "; Task fallback handler also return error - ", fallbackErr, "; Reject message.") + + rejectErr := delivery.Reject(false) + if rejectErr != nil { + queueLog.Error("Failed reject message - ", rejectErr) + } + + return + } + + if !fallback { + queueLog.Error("Task fallback unsuccessful, reject message") + + rejectErr := delivery.Reject(false) + if rejectErr != nil { + queueLog.Error("Failed reject message - ", rejectErr) + } + + return + } + + ackErr := delivery.Ack(false) + if ackErr != nil { + queueLog.Error("Failed acknowledge message - ", ackErr) + } + + return + } + + ackErr := delivery.Ack(false) + if ackErr != nil { + queueLog.Error("Failed acknowledge message - ", ackErr) + } + + afterHandleErr := task.AfterHandle(ctx, payload) + if afterHandleErr != nil { + queueLog.Error("Task after handler hook failed with error - ", afterHandleErr) + } +} From b98ec252c428c1b88c8e9d86abaf9341efc79388 Mon Sep 17 00:00:00 2001 From: ostiwe Date: Sun, 10 Aug 2025 22:31:18 +0300 Subject: [PATCH 16/16] feat: Update version --- version/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version/version.go b/version/version.go index 29add40..07a3901 100644 --- a/version/version.go +++ b/version/version.go @@ -1,5 +1,5 @@ package version func AppVersion() string { - return "0.0.3" + return "1.0.0" }