mirror of
https://github.com/lorsanstand/HomeOps-Hub.git
synced 2026-06-19 17:45:17 +03:00
refactor: errors moved to errors.go
This commit is contained in:
@@ -20,21 +20,22 @@ type statusAgent interface {
|
|||||||
|
|
||||||
// использовать sync.Pool что бы переиспользвоать этот обьект
|
// использовать sync.Pool что бы переиспользвоать этот обьект
|
||||||
type AgentConnection struct {
|
type AgentConnection struct {
|
||||||
stream streamConn
|
stream streamConn
|
||||||
heartbeat heartbeatStore
|
heartbeat heartbeatStore
|
||||||
log zerolog.Logger
|
log zerolog.Logger
|
||||||
status statusAgent
|
status statusAgent
|
||||||
AgentID string
|
AgentID string
|
||||||
response *store.ResponseStore
|
response *store.ResponseStore
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
|
heartbeatTimeoutMS int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, logger zerolog.Logger) *AgentConnection {
|
func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, heartbeatTimeoutMS int, logger zerolog.Logger) *AgentConnection {
|
||||||
response := store.NewResponseStore()
|
response := store.NewResponseStore()
|
||||||
logger = logger.With().Str("agentID", agentID).Logger()
|
logger = logger.With().Str("agentID", agentID).Logger()
|
||||||
ctx, cancel := context.WithCancel(stream.Context())
|
ctx, cancel := context.WithCancel(stream.Context())
|
||||||
return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status, ctx: ctx, cancel: cancel}
|
return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status, ctx: ctx, cancel: cancel, heartbeatTimeoutMS: heartbeatTimeoutMS}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *AgentConnection) Listen() error {
|
func (a *AgentConnection) Listen() error {
|
||||||
@@ -83,14 +84,14 @@ func (a *AgentConnection) Listen() error {
|
|||||||
|
|
||||||
func (a *AgentConnection) listenHeartbeat(heartbeats <-chan domainHub.CreateHeartbeatModel) {
|
func (a *AgentConnection) listenHeartbeat(heartbeats <-chan domainHub.CreateHeartbeatModel) {
|
||||||
lastHeartbeat := 0
|
lastHeartbeat := 0
|
||||||
timer := time.NewTicker(5 * time.Second)
|
timer := time.NewTicker(time.Duration(a.heartbeatTimeoutMS) * time.Millisecond)
|
||||||
defer timer.Stop()
|
defer timer.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
if lastHeartbeat < 30 {
|
if lastHeartbeat < 4 {
|
||||||
lastHeartbeat += 5
|
lastHeartbeat += 1
|
||||||
a.status.Offline()
|
a.status.Offline()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -134,9 +135,9 @@ func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRe
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case <-a.ctx.Done():
|
case <-a.ctx.Done():
|
||||||
return domainHub.AgentResponse{}, fmt.Errorf("connection close")
|
return domainHub.AgentResponse{}, ConnectionCloseErr
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return domainHub.AgentResponse{}, fmt.Errorf("request timeout")
|
return domainHub.AgentResponse{}, ctx.Err()
|
||||||
case response := <-ch:
|
case response := <-ch:
|
||||||
a.log.Info().Str("requestID", requestID).Msg("received response")
|
a.log.Info().Str("requestID", requestID).Msg("received response")
|
||||||
return response, nil
|
return response, nil
|
||||||
|
|||||||
@@ -0,0 +1,5 @@
|
|||||||
|
package connection_manager
|
||||||
|
|
||||||
|
import "errors"
|
||||||
|
|
||||||
|
var ConnectionCloseErr error = errors.New("connection close")
|
||||||
Reference in New Issue
Block a user