From 54ed8fd0d2056a736eb4991b4c1524974c8c3ea6 Mon Sep 17 00:00:00 2001 From: lorsan Date: Mon, 11 May 2026 09:07:32 +0300 Subject: [PATCH] feat: create heartbeat processing system --- .../service/connection_manager/agent.go | 98 +++++++++++++++++++ .../service/connection_manager/interface.go | 19 ++++ .../service/connection_manager/mapper.go | 22 +++++ 3 files changed, 139 insertions(+) create mode 100644 hub/internal/service/connection_manager/agent.go create mode 100644 hub/internal/service/connection_manager/interface.go create mode 100644 hub/internal/service/connection_manager/mapper.go diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go new file mode 100644 index 0000000..ab7fac6 --- /dev/null +++ b/hub/internal/service/connection_manager/agent.go @@ -0,0 +1,98 @@ +package connection_manager + +import ( + "context" + "fmt" + "io" + "time" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" + "github.com/rs/zerolog" +) + +type statusAgent interface { + Offline() + Online() +} + +// использовать sync.Pool что бы переиспользвоать этот обьект +type AgentConnection struct { + stream streamConn + store heartbeatStore + log zerolog.Logger + status statusAgent + AgentID string + // Не безопасно, если нужно будет добавлять новый канал и брать какой то все сломается. Исправить + responseChan map[string]chan domainHub.AgentResponse +} + +func newAgentConnection(agentID string, stream streamConn, store heartbeatStore, status statusAgent, logger zerolog.Logger) *AgentConnection { + response := make(map[string]chan domainHub.AgentResponse) + return &AgentConnection{stream: stream, responseChan: response, store: store, log: logger, AgentID: agentID, status: status} +} + +func (a *AgentConnection) Listen() error { + ctx := a.stream.Context() + defer a.status.Offline() + + heartbeatsChan := make(chan domainHub.CreateHeartbeatModel, 5) + go a.listenHeartbeat(ctx, heartbeatsChan) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + agentEvent, err := a.stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return fmt.Errorf("stream error: %w", err) + } + + switch x := agentEvent.Event.(type) { + case *pb.AgentEvent_Heartbeat: + heartbeat := toCreateHeartbeatModel(a.AgentID, x) + + a.log.Debug(). + Str("agentID", heartbeat.AgentID). + Float64("cpu usage", heartbeat.Metrics.CpuUsage). + Float64("disk usage", heartbeat.Metrics.DiskUsage). + Float64("memory usage", heartbeat.Metrics.MemoryUsage).Msg("") + + heartbeatsChan <- heartbeat + } + } + } +} + +func (a *AgentConnection) listenHeartbeat(ctx context.Context, heartbeats <-chan domainHub.CreateHeartbeatModel) { + lastHeartbeat := 0 + timer := time.NewTicker(5 * time.Second) + defer timer.Stop() + + for { + select { + case <-timer.C: + if lastHeartbeat < 30 { + lastHeartbeat += 5 + a.status.Offline() + continue + } + a.log.Warn().Str("agentID", a.AgentID).Msg("agent did not send heartbeat") + a.stream.Close() + return + case heartbeat := <-heartbeats: + a.status.Online() + lastHeartbeat = 0 + err := a.store.CreateHeartbeat(ctx, heartbeat) + if err != nil { + a.log.Error().Err(err).Str("agentID", heartbeat.AgentID).Msg("failed to write heartbeat") + } + case <-ctx.Done(): + return + } + } +} diff --git a/hub/internal/service/connection_manager/interface.go b/hub/internal/service/connection_manager/interface.go new file mode 100644 index 0000000..c21a83f --- /dev/null +++ b/hub/internal/service/connection_manager/interface.go @@ -0,0 +1,19 @@ +package connection_manager + +import ( + "context" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" +) + +type streamConn interface { + Send(request *pb.ServerCommandRequest, err error) + Recv() (*pb.AgentEvent, error) + Context() context.Context + Close() error +} + +type heartbeatStore interface { + CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error +} diff --git a/hub/internal/service/connection_manager/mapper.go b/hub/internal/service/connection_manager/mapper.go new file mode 100644 index 0000000..2abd303 --- /dev/null +++ b/hub/internal/service/connection_manager/mapper.go @@ -0,0 +1,22 @@ +package connection_manager + +import ( + "time" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" +) + +func toCreateHeartbeatModel(agentID string, heartbeat *pb.AgentEvent_Heartbeat) domainHub.CreateHeartbeatModel { + timestamp := time.Unix(heartbeat.Heartbeat.Timestamp, 0) + + return domainHub.CreateHeartbeatModel{ + AgentID: agentID, + Timestamp: timestamp, + Metrics: domainHub.SystemMetrics{ + MemoryUsage: float64(heartbeat.Heartbeat.Metrics.MemoryUsage), + CpuUsage: float64(heartbeat.Heartbeat.Metrics.CpuUsage), + DiskUsage: float64(heartbeat.Heartbeat.Metrics.DiskUsage), + }, + } +}