From 86ff71f33c95ec11479b49bb4d0e8282fbda33da Mon Sep 17 00:00:00 2001 From: lorsan Date: Fri, 15 May 2026 19:51:21 +0300 Subject: [PATCH] refactor: errors moved to errors.go --- .../service/connection_manager/agent.go | 31 ++++++++++--------- .../service/connection_manager/errors.go | 5 +++ 2 files changed, 21 insertions(+), 15 deletions(-) create mode 100644 hub/internal/service/connection_manager/errors.go diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index c932fce..74d93e5 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -20,21 +20,22 @@ type statusAgent interface { // использовать sync.Pool что бы переиспользвоать этот обьект type AgentConnection struct { - stream streamConn - heartbeat heartbeatStore - log zerolog.Logger - status statusAgent - AgentID string - response *store.ResponseStore - ctx context.Context - cancel context.CancelFunc + stream streamConn + heartbeat heartbeatStore + log zerolog.Logger + status statusAgent + AgentID string + response *store.ResponseStore + ctx context.Context + cancel context.CancelFunc + heartbeatTimeoutMS int } -func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, logger zerolog.Logger) *AgentConnection { +func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, heartbeatTimeoutMS int, logger zerolog.Logger) *AgentConnection { response := store.NewResponseStore() logger = logger.With().Str("agentID", agentID).Logger() ctx, cancel := context.WithCancel(stream.Context()) - return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status, ctx: ctx, cancel: cancel} + return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status, ctx: ctx, cancel: cancel, heartbeatTimeoutMS: heartbeatTimeoutMS} } func (a *AgentConnection) Listen() error { @@ -83,14 +84,14 @@ func (a *AgentConnection) Listen() error { func (a *AgentConnection) listenHeartbeat(heartbeats <-chan domainHub.CreateHeartbeatModel) { lastHeartbeat := 0 - timer := time.NewTicker(5 * time.Second) + timer := time.NewTicker(time.Duration(a.heartbeatTimeoutMS) * time.Millisecond) defer timer.Stop() for { select { case <-timer.C: - if lastHeartbeat < 30 { - lastHeartbeat += 5 + if lastHeartbeat < 4 { + lastHeartbeat += 1 a.status.Offline() continue } @@ -134,9 +135,9 @@ func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRe select { case <-a.ctx.Done(): - return domainHub.AgentResponse{}, fmt.Errorf("connection close") + return domainHub.AgentResponse{}, ConnectionCloseErr case <-ctx.Done(): - return domainHub.AgentResponse{}, fmt.Errorf("request timeout") + return domainHub.AgentResponse{}, ctx.Err() case response := <-ch: a.log.Info().Str("requestID", requestID).Msg("received response") return response, nil diff --git a/hub/internal/service/connection_manager/errors.go b/hub/internal/service/connection_manager/errors.go new file mode 100644 index 0000000..56d2a88 --- /dev/null +++ b/hub/internal/service/connection_manager/errors.go @@ -0,0 +1,5 @@ +package connection_manager + +import "errors" + +var ConnectionCloseErr error = errors.New("connection close")