diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index ab7fac6..624681b 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -8,6 +8,7 @@ import ( pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" + "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager/store" "github.com/rs/zerolog" ) @@ -18,18 +19,17 @@ type statusAgent interface { // использовать sync.Pool что бы переиспользвоать этот обьект type AgentConnection struct { - stream streamConn - store heartbeatStore - log zerolog.Logger - status statusAgent - AgentID string - // Не безопасно, если нужно будет добавлять новый канал и брать какой то все сломается. Исправить - responseChan map[string]chan domainHub.AgentResponse + stream streamConn + heartbeat heartbeatStore + log zerolog.Logger + status statusAgent + AgentID string + response *store.ResponseStore } -func newAgentConnection(agentID string, stream streamConn, store heartbeatStore, status statusAgent, logger zerolog.Logger) *AgentConnection { - response := make(map[string]chan domainHub.AgentResponse) - return &AgentConnection{stream: stream, responseChan: response, store: store, log: logger, AgentID: agentID, status: status} +func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, logger zerolog.Logger) *AgentConnection { + response := store.NewResponseStore() + return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status} } func (a *AgentConnection) Listen() error { @@ -87,7 +87,7 @@ func (a *AgentConnection) listenHeartbeat(ctx context.Context, heartbeats <-chan case heartbeat := <-heartbeats: a.status.Online() lastHeartbeat = 0 - err := a.store.CreateHeartbeat(ctx, heartbeat) + err := a.heartbeat.CreateHeartbeat(ctx, heartbeat) if err != nil { a.log.Error().Err(err).Str("agentID", heartbeat.AgentID).Msg("failed to write heartbeat") } diff --git a/hub/internal/service/connection_manager/store/agent.go b/hub/internal/service/connection_manager/store/agent.go new file mode 100644 index 0000000..730d96c --- /dev/null +++ b/hub/internal/service/connection_manager/store/agent.go @@ -0,0 +1,35 @@ +package store + +import ( + "sync" + + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" +) + +type ResponseStore struct { + store map[string]chan domainHub.AgentResponse + mutex sync.RWMutex +} + +func NewResponseStore() *ResponseStore { + data := make(map[string]chan domainHub.AgentResponse) + return &ResponseStore{store: data} +} + +func (r *ResponseStore) Write(responseID string, channel chan domainHub.AgentResponse) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.store[responseID] = channel +} + +func (r *ResponseStore) Read(responseID string) chan domainHub.AgentResponse { + r.mutex.RLock() + defer r.mutex.RUnlock() + return r.store[responseID] +} + +func (r *ResponseStore) Delete(responseID string) { + r.mutex.Lock() + defer r.mutex.Unlock() + delete(r.store, responseID) +}