refactor(observer): Refactor observer service
This commit is contained in:
114
service/check.go
114
service/check.go
@@ -3,11 +3,7 @@ package service
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"net"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
|
||||||
"slices"
|
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.ostiwe.com/ostiwe-com/status/model"
|
"git.ostiwe.com/ostiwe-com/status/model"
|
||||||
@@ -26,109 +22,35 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Check interface {
|
type Check interface {
|
||||||
Start(interval time.Duration)
|
Observe(ctx context.Context, srv *model.Service) error
|
||||||
|
RegisterStatus(ctx context.Context, serviceID int, status model.StatusCode) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type check struct {
|
type check struct {
|
||||||
serviceRepository repository.Service
|
serviceRepository repository.Service
|
||||||
maxServicesPerRoutine int
|
|
||||||
statusRepository repository.Status
|
statusRepository repository.Status
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCheck(maxServicesPerRoutine int) Check {
|
func NewCheck() Check {
|
||||||
if maxServicesPerRoutine <= 0 {
|
|
||||||
maxServicesPerRoutine = 15
|
|
||||||
}
|
|
||||||
|
|
||||||
return &check{
|
return &check{
|
||||||
maxServicesPerRoutine: maxServicesPerRoutine,
|
|
||||||
serviceRepository: repository.NewServiceRepository(),
|
serviceRepository: repository.NewServiceRepository(),
|
||||||
statusRepository: repository.NewStatusRepository(),
|
statusRepository: repository.NewStatusRepository(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c check) Start(interval time.Duration) {
|
func (c check) Observe(ctx context.Context, srv *model.Service) error {
|
||||||
for {
|
return c.observeHttp(srv)
|
||||||
time.Sleep(interval)
|
|
||||||
|
|
||||||
services, err := c.serviceRepository.All(context.Background(), -1, 0, false)
|
|
||||||
if err != nil {
|
|
||||||
log.Global.Get(log.CRON).Error(err)
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wg := new(sync.WaitGroup)
|
func (c check) RegisterStatus(ctx context.Context, serviceID int, status model.StatusCode) error {
|
||||||
|
return c.statusRepository.Add(ctx, model.Status{
|
||||||
chunked := slices.Chunk(services, c.maxServicesPerRoutine)
|
ServiceID: serviceID,
|
||||||
|
Status: status,
|
||||||
for i := range chunked {
|
CreatedAt: time.Now(),
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
go c.observeChunk(wg, i)
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c check) observeChunk(wg *sync.WaitGroup, chunk []model.Service) {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
for _, item := range chunk {
|
|
||||||
switch item.Type {
|
|
||||||
case "http":
|
|
||||||
err := c.observeHttp(item, 0)
|
|
||||||
if err == nil {
|
|
||||||
addErr := c.statusRepository.Add(model.Status{
|
|
||||||
ServiceID: item.ID,
|
|
||||||
Status: model.StatusOK,
|
|
||||||
})
|
})
|
||||||
if addErr != nil {
|
|
||||||
log.Global.Get(log.CRON).Error("Add status to status repository fail", item.ID, addErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if errors.Is(err, httpObserveFailed) {
|
|
||||||
addErr := c.statusRepository.Add(model.Status{
|
|
||||||
ServiceID: item.ID,
|
|
||||||
Status: model.StatusFailed,
|
|
||||||
})
|
|
||||||
if addErr != nil {
|
|
||||||
log.Global.Get(log.CRON).Error("Add status to status repository fail", item.ID, addErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Global.Get(log.CRON).Error("Unexpected observe fail", item.ID, err)
|
|
||||||
|
|
||||||
break
|
|
||||||
case "tcp":
|
|
||||||
log.Global.Get(log.CRON).Warn("Cannot handle service with id: ", item.ID, ", because a tcp observer not implemented.")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c check) observeHttp(service model.Service, attempt int) error {
|
|
||||||
if attempt >= 20 {
|
|
||||||
return httpObserveMaxTries
|
|
||||||
}
|
|
||||||
|
|
||||||
if attempt > 5 {
|
|
||||||
err := c.statusRepository.Add(model.Status{
|
|
||||||
ServiceID: service.ID,
|
|
||||||
Status: model.StatusWarn,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c check) observeHttp(service *model.Service) error {
|
||||||
if service.TypeConfig == nil || service.TypeConfig.HTTPConfig == nil {
|
if service.TypeConfig == nil || service.TypeConfig.HTTPConfig == nil {
|
||||||
return errors.New("service has no config for http")
|
return errors.New("service has no config for http")
|
||||||
}
|
}
|
||||||
@@ -150,23 +72,9 @@ func (c check) observeHttp(service model.Service, attempt int) error {
|
|||||||
|
|
||||||
response, err := client.Do(request)
|
response, err := client.Do(request)
|
||||||
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
|
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
|
||||||
var (
|
|
||||||
e *url.Error
|
|
||||||
opErr *net.OpError
|
|
||||||
)
|
|
||||||
if errors.As(err, &e) && errors.As(e.Err, &opErr) {
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
|
|
||||||
return c.observeHttp(service, attempt+1)
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if errors.Is(err, context.DeadlineExceeded) {
|
|
||||||
return c.observeHttp(service, attempt+1)
|
|
||||||
}
|
|
||||||
|
|
||||||
if response.StatusCode != http.StatusOK {
|
if response.StatusCode != http.StatusOK {
|
||||||
return httpObserveFailed
|
return httpObserveFailed
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user