mirror of
https://github.com/lorsanstand/HomeOps-Hub.git
synced 2026-06-19 14:25:16 +03:00
refactor: switch response chan map for store
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops"
|
pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops"
|
||||||
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
|
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
|
||||||
|
"github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager/store"
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -18,18 +19,17 @@ type statusAgent interface {
|
|||||||
|
|
||||||
// использовать sync.Pool что бы переиспользвоать этот обьект
|
// использовать sync.Pool что бы переиспользвоать этот обьект
|
||||||
type AgentConnection struct {
|
type AgentConnection struct {
|
||||||
stream streamConn
|
stream streamConn
|
||||||
store heartbeatStore
|
heartbeat heartbeatStore
|
||||||
log zerolog.Logger
|
log zerolog.Logger
|
||||||
status statusAgent
|
status statusAgent
|
||||||
AgentID string
|
AgentID string
|
||||||
// Не безопасно, если нужно будет добавлять новый канал и брать какой то все сломается. Исправить
|
response *store.ResponseStore
|
||||||
responseChan map[string]chan domainHub.AgentResponse
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newAgentConnection(agentID string, stream streamConn, store heartbeatStore, status statusAgent, logger zerolog.Logger) *AgentConnection {
|
func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, logger zerolog.Logger) *AgentConnection {
|
||||||
response := make(map[string]chan domainHub.AgentResponse)
|
response := store.NewResponseStore()
|
||||||
return &AgentConnection{stream: stream, responseChan: response, store: store, log: logger, AgentID: agentID, status: status}
|
return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *AgentConnection) Listen() error {
|
func (a *AgentConnection) Listen() error {
|
||||||
@@ -87,7 +87,7 @@ func (a *AgentConnection) listenHeartbeat(ctx context.Context, heartbeats <-chan
|
|||||||
case heartbeat := <-heartbeats:
|
case heartbeat := <-heartbeats:
|
||||||
a.status.Online()
|
a.status.Online()
|
||||||
lastHeartbeat = 0
|
lastHeartbeat = 0
|
||||||
err := a.store.CreateHeartbeat(ctx, heartbeat)
|
err := a.heartbeat.CreateHeartbeat(ctx, heartbeat)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Error().Err(err).Str("agentID", heartbeat.AgentID).Msg("failed to write heartbeat")
|
a.log.Error().Err(err).Str("agentID", heartbeat.AgentID).Msg("failed to write heartbeat")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,35 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ResponseStore struct {
|
||||||
|
store map[string]chan domainHub.AgentResponse
|
||||||
|
mutex sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewResponseStore() *ResponseStore {
|
||||||
|
data := make(map[string]chan domainHub.AgentResponse)
|
||||||
|
return &ResponseStore{store: data}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ResponseStore) Write(responseID string, channel chan domainHub.AgentResponse) {
|
||||||
|
r.mutex.Lock()
|
||||||
|
defer r.mutex.Unlock()
|
||||||
|
r.store[responseID] = channel
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ResponseStore) Read(responseID string) chan domainHub.AgentResponse {
|
||||||
|
r.mutex.RLock()
|
||||||
|
defer r.mutex.RUnlock()
|
||||||
|
return r.store[responseID]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ResponseStore) Delete(responseID string) {
|
||||||
|
r.mutex.Lock()
|
||||||
|
defer r.mutex.Unlock()
|
||||||
|
delete(r.store, responseID)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user