diff --git a/go.mod b/go.mod index 292ad1a..bda85b7 100644 --- a/go.mod +++ b/go.mod @@ -5,12 +5,14 @@ go 1.26.1 require ( github.com/docker/docker v28.5.2+incompatible github.com/golang-migrate/migrate/v4 v4.19.1 + github.com/google/uuid v1.6.0 github.com/ilyakaznacheev/cleanenv v1.5.0 github.com/mattn/go-sqlite3 v1.14.44 github.com/rs/zerolog v1.35.1 google.golang.org/grpc v1.81.0 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 + gotest.tools/v3 v3.5.2 ) require ( @@ -26,7 +28,7 @@ require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/google/uuid v1.6.0 // indirect + github.com/google/go-cmp v0.7.0 // indirect github.com/joho/godotenv v1.5.1 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/lib/pq v1.12.3 // indirect @@ -54,7 +56,6 @@ require ( golang.org/x/time v0.15.0 // indirect golang.org/x/tools v0.43.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260504160031-60b97b32f348 // indirect - gotest.tools/v3 v3.5.2 // indirect lukechampine.com/uint128 v1.2.0 // indirect modernc.org/cc/v3 v3.36.3 // indirect modernc.org/ccgo/v3 v3.16.9 // indirect diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index 34a008b..ca736a4 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -9,30 +9,23 @@ import ( "github.com/google/uuid" pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" - "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager/store" "github.com/rs/zerolog" ) -type statusAgent interface { - Offline() - Online() -} - -// использовать sync.Pool что бы переиспользвоать этот обьект type AgentConnection struct { stream streamConn heartbeat heartbeatStore log zerolog.Logger status statusAgent AgentID string - response *store.ResponseStore + response *ResponseStore ctx context.Context cancel context.CancelFunc heartbeatTimeoutMS int } func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, heartbeatTimeoutMS int, logger zerolog.Logger) *AgentConnection { - response := store.NewResponseStore() + response := NewResponseStore() logger = logger.With().Str("agentID", agentID).Logger() ctx, cancel := context.WithCancel(stream.Context()) return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status, ctx: ctx, cancel: cancel, heartbeatTimeoutMS: heartbeatTimeoutMS} @@ -135,7 +128,7 @@ func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRe select { case <-a.ctx.Done(): - return domainHub.AgentResponse{}, ConnectionCloseErr + return domainHub.AgentResponse{}, ErrConnectionClose case <-ctx.Done(): return domainHub.AgentResponse{}, ctx.Err() case response := <-ch: diff --git a/hub/internal/service/connection_manager/agent_test.go b/hub/internal/service/connection_manager/agent_test.go index 5e68497..2f3bc3c 100644 --- a/hub/internal/service/connection_manager/agent_test.go +++ b/hub/internal/service/connection_manager/agent_test.go @@ -15,13 +15,13 @@ import ( ) type streamMock struct { - recvCh chan *pb.AgentEvent - sendCh chan *pb.ServerCommandRequest - closeCh chan struct{} - ctx context.Context - mu sync.Mutex - sendErr error - recvErr error + recvCh chan *pb.AgentEvent + sendCh chan *pb.ServerCommandRequest + closeCh chan struct{} + ctx context.Context + mu sync.Mutex + sendErr error + recvErr error closeOnce sync.Once } @@ -260,7 +260,7 @@ func TestAgentConnection_HeartbeatTimeout(t *testing.T) { Args: nil, TimeOut: 0, }) - assert.ErrorIs(t, err, ConnectionCloseErr) + assert.ErrorIs(t, err, ErrConnectionClose) wg.Done() }() @@ -286,7 +286,7 @@ func TestAgentConnection_ConnectionClose(t *testing.T) { Args: nil, TimeOut: 0, }) - assert.ErrorIs(t, err, ConnectionCloseErr) + assert.ErrorIs(t, err, ErrConnectionClose) wg.Done() }() @@ -365,7 +365,7 @@ func TestAgentConnection_ExecuteConnectionCanceled(t *testing.T) { h.cancel() _, err := h.agent.Execute(context.Background(), domainHub.AgentRequest{Name: "test"}) - assert.ErrorIs(t, err, ConnectionCloseErr) + assert.ErrorIs(t, err, ErrConnectionClose) } func TestAgentConnection_UnknownResponseID(t *testing.T) { diff --git a/hub/internal/service/connection_manager/domain.go b/hub/internal/service/connection_manager/domain.go deleted file mode 100644 index bdab261..0000000 --- a/hub/internal/service/connection_manager/domain.go +++ /dev/null @@ -1,6 +0,0 @@ -package connection_manager - -type agentStatus struct { - AgentID string - Online bool -} diff --git a/hub/internal/service/connection_manager/errors.go b/hub/internal/service/connection_manager/errors.go index 56d2a88..0d8ef9a 100644 --- a/hub/internal/service/connection_manager/errors.go +++ b/hub/internal/service/connection_manager/errors.go @@ -2,4 +2,6 @@ package connection_manager import "errors" -var ConnectionCloseErr error = errors.New("connection close") +var ErrConnectionClose = errors.New("connection close") + +var ErrNotFoundConn = errors.New("agent connection not found") diff --git a/hub/internal/service/connection_manager/interface.go b/hub/internal/service/connection_manager/interface.go index 2ce54c8..19cc737 100644 --- a/hub/internal/service/connection_manager/interface.go +++ b/hub/internal/service/connection_manager/interface.go @@ -17,3 +17,12 @@ type streamConn interface { type heartbeatStore interface { CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error } + +type statusAgent interface { + Offline() + Online() +} + +type statusNotifier interface { + New(AgentID string) statusAgent +} diff --git a/hub/internal/service/connection_manager/manager.go b/hub/internal/service/connection_manager/manager.go new file mode 100644 index 0000000..baacfd6 --- /dev/null +++ b/hub/internal/service/connection_manager/manager.go @@ -0,0 +1,71 @@ +package connection_manager + +import ( + "context" + "fmt" + + "github.com/rs/zerolog" + "google.golang.org/grpc/metadata" +) + +const heartbeatTimeoutMS = 6000 + +type ConnectionManager struct { + heartbeat heartbeatStore + log zerolog.Logger + status statusNotifier + agentConnStore *AgentConnStore +} + +func NewConnectionManager(heartbeat heartbeatStore, status statusNotifier, logger zerolog.Logger) *ConnectionManager { + return &ConnectionManager{heartbeat: heartbeat, log: logger, status: status, agentConnStore: NewAgentConnStore()} +} + +func (c *ConnectionManager) NewConnection(stream streamConn) { + AgentID, err := agentIDFromMetadata(stream.Context()) + if err != nil { + c.log.Error().Err(err).Msg("missing agent id in metadata") + return + } + c.log.Info().Str("agentID", AgentID).Msg("connection accepted") + + status := c.status.New(AgentID) + + agent := newAgentConnection(AgentID, stream, c.heartbeat, status, heartbeatTimeoutMS, c.log) + c.agentConnStore.Add(AgentID, agent) + go func() { + c.log.Debug().Str("agentID", AgentID).Msg("start listening") + err := agent.Listen() + if err != nil { + c.log.Error().Err(err).Msg("listening agent stopped") + } + c.agentConnStore.Delete(AgentID) + }() +} + +func (c *ConnectionManager) GetConnection(AgentID string) (*AgentConnection, error) { + agent := c.agentConnStore.Get(AgentID) + if agent == nil { + return nil, ErrNotFoundConn + } + + return agent, nil +} + +func (c *ConnectionManager) GetAllAgentID() []string { + return c.agentConnStore.GetAllAgentID() +} + +func agentIDFromMetadata(ctx context.Context) (string, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return "", fmt.Errorf("metadata not found") + } + + values := md.Get("agent-id") + if len(values) == 0 || values[0] == "" { + return "", fmt.Errorf("agent-id not found") + } + + return values[0], nil +} diff --git a/hub/internal/service/connection_manager/store.go b/hub/internal/service/connection_manager/store.go new file mode 100644 index 0000000..3e5ffa8 --- /dev/null +++ b/hub/internal/service/connection_manager/store.go @@ -0,0 +1,82 @@ +package connection_manager + +import ( + "sync" + + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" +) + +type AgentConnStore struct { + mutex sync.RWMutex + store map[string]*AgentConnection +} + +func NewAgentConnStore() *AgentConnStore { + return &AgentConnStore{store: make(map[string]*AgentConnection)} +} + +func (a *AgentConnStore) Get(agentID string) *AgentConnection { + a.mutex.RLock() + defer a.mutex.RUnlock() + return a.store[agentID] +} + +func (a *AgentConnStore) Add(agentID string, agentConn *AgentConnection) { + a.mutex.Lock() + defer a.mutex.Unlock() + a.store[agentID] = agentConn +} + +func (a *AgentConnStore) Delete(agentID string) { + a.mutex.Lock() + defer a.mutex.Unlock() + delete(a.store, agentID) +} + +func (a *AgentConnStore) Pop(agentID string) *AgentConnection { + a.mutex.Lock() + defer a.mutex.Unlock() + agent := a.store[agentID] + delete(a.store, agentID) + return agent +} + +func (a *AgentConnStore) GetAllAgentID() []string { + a.mutex.RLock() + defer a.mutex.RUnlock() + + var IDs []string + for ID := range a.store { + IDs = append(IDs, ID) + } + return IDs +} + +type ResponseStore struct { + store map[string]chan domainHub.AgentResponse + mutex sync.RWMutex +} + +func NewResponseStore() *ResponseStore { + data := make(map[string]chan domainHub.AgentResponse) + return &ResponseStore{store: data} +} + +func (r *ResponseStore) Write(responseID string, channel chan domainHub.AgentResponse) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.store[responseID] = channel +} + +func (r *ResponseStore) Read(responseID string) (chan domainHub.AgentResponse, bool) { + r.mutex.RLock() + defer r.mutex.RUnlock() + ch, ok := r.store[responseID] + return ch, ok +} + +func (r *ResponseStore) Delete(responseID string) { + r.mutex.Lock() + defer r.mutex.Unlock() + delete(r.store, responseID) +} diff --git a/hub/internal/service/connection_manager/store/agent.go b/hub/internal/service/connection_manager/store/agent.go deleted file mode 100644 index a7ddd32..0000000 --- a/hub/internal/service/connection_manager/store/agent.go +++ /dev/null @@ -1,36 +0,0 @@ -package store - -import ( - "sync" - - domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" -) - -type ResponseStore struct { - store map[string]chan domainHub.AgentResponse - mutex sync.RWMutex -} - -func NewResponseStore() *ResponseStore { - data := make(map[string]chan domainHub.AgentResponse) - return &ResponseStore{store: data} -} - -func (r *ResponseStore) Write(responseID string, channel chan domainHub.AgentResponse) { - r.mutex.Lock() - defer r.mutex.Unlock() - r.store[responseID] = channel -} - -func (r *ResponseStore) Read(responseID string) (chan domainHub.AgentResponse, bool) { - r.mutex.RLock() - defer r.mutex.RUnlock() - ch, ok := r.store[responseID] - return ch, ok -} - -func (r *ResponseStore) Delete(responseID string) { - r.mutex.Lock() - defer r.mutex.Unlock() - delete(r.store, responseID) -} diff --git a/hub/internal/service/connection_manager/store/manager.go b/hub/internal/service/connection_manager/store/manager.go deleted file mode 100644 index b690f36..0000000 --- a/hub/internal/service/connection_manager/store/manager.go +++ /dev/null @@ -1,42 +0,0 @@ -package store - -import ( - "sync" - - "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager" -) - -type AgentConnStore struct { - mutex sync.RWMutex - store map[string]*connection_manager.AgentConnection -} - -func NewAgentConnStore() *AgentConnStore { - return &AgentConnStore{store: make(map[string]*connection_manager.AgentConnection)} -} - -func (a *AgentConnStore) Get(agentID string) *connection_manager.AgentConnection { - a.mutex.RLock() - defer a.mutex.RUnlock() - return a.store[agentID] -} - -func (a *AgentConnStore) Add(agentConn *connection_manager.AgentConnection) { - a.mutex.Lock() - defer a.mutex.Unlock() - a.store[agentConn.AgentID] = agentConn -} - -func (a *AgentConnStore) Delete(agentID string) { - a.mutex.Lock() - defer a.mutex.Unlock() - delete(a.store, agentID) -} - -func (a *AgentConnStore) Pop(agentID string) *connection_manager.AgentConnection { - a.mutex.Lock() - defer a.mutex.Unlock() - agent := a.store[agentID] - delete(a.store, agentID) - return agent -}