diff --git a/hub/internal/service/connection_manager/agent_test.go b/hub/internal/service/connection_manager/agent_test.go index 2f3bc3c..e1cad49 100644 --- a/hub/internal/service/connection_manager/agent_test.go +++ b/hub/internal/service/connection_manager/agent_test.go @@ -3,7 +3,6 @@ package connection_manager import ( "context" "errors" - "io" "sync" "testing" "time" @@ -14,108 +13,6 @@ import ( "gotest.tools/v3/assert" ) -type streamMock struct { - recvCh chan *pb.AgentEvent - sendCh chan *pb.ServerCommandRequest - closeCh chan struct{} - ctx context.Context - mu sync.Mutex - sendErr error - recvErr error - closeOnce sync.Once -} - -func (f *streamMock) Context() context.Context { - return f.ctx -} - -func (f *streamMock) Send(request *pb.ServerCommandRequest) error { - f.mu.Lock() - err := f.sendErr - f.mu.Unlock() - if err != nil { - return err - } - f.sendCh <- request - return nil -} - -func (f *streamMock) Recv() (*pb.AgentEvent, error) { - f.mu.Lock() - recvErr := f.recvErr - f.mu.Unlock() - if recvErr != nil { - return nil, recvErr - } - select { - case msg, ok := <-f.recvCh: - if !ok { - return nil, io.EOF - } - return msg, nil - case <-f.ctx.Done(): - return nil, f.ctx.Err() - } -} - -func (f *streamMock) Close() error { - select { - case f.closeCh <- struct{}{}: - default: - } - f.closeOnce.Do(func() { - close(f.recvCh) - }) - return nil -} - -type heartBeatMock struct { - mu sync.Mutex - countUse int - doneCh chan struct{} - err error -} - -func (h *heartBeatMock) CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error { - h.mu.Lock() - h.countUse += 1 - err := h.err - h.mu.Unlock() - select { - case h.doneCh <- struct{}{}: - default: - } - return err -} - -type statusMock struct { - mu sync.Mutex - online bool - doneCh chan struct{} -} - -func (s *statusMock) Offline() { - s.mu.Lock() - s.online = false - s.mu.Unlock() -} - -func (s *statusMock) Online() { - s.mu.Lock() - s.online = true - s.mu.Unlock() - select { - case s.doneCh <- struct{}{}: - default: - } -} - -func (s *statusMock) IsOnline() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.online -} - type agentTestHarness struct { ctx context.Context cancel context.CancelFunc diff --git a/hub/internal/service/connection_manager/manager.go b/hub/internal/service/connection_manager/manager.go index baacfd6..acd4159 100644 --- a/hub/internal/service/connection_manager/manager.go +++ b/hub/internal/service/connection_manager/manager.go @@ -21,11 +21,11 @@ func NewConnectionManager(heartbeat heartbeatStore, status statusNotifier, logge return &ConnectionManager{heartbeat: heartbeat, log: logger, status: status, agentConnStore: NewAgentConnStore()} } -func (c *ConnectionManager) NewConnection(stream streamConn) { +func (c *ConnectionManager) NewConnection(stream streamConn) error { AgentID, err := agentIDFromMetadata(stream.Context()) if err != nil { c.log.Error().Err(err).Msg("missing agent id in metadata") - return + return fmt.Errorf("get agent id: %w", err) } c.log.Info().Str("agentID", AgentID).Msg("connection accepted") @@ -41,6 +41,8 @@ func (c *ConnectionManager) NewConnection(stream streamConn) { } c.agentConnStore.Delete(AgentID) }() + + return nil } func (c *ConnectionManager) GetConnection(AgentID string) (*AgentConnection, error) { diff --git a/hub/internal/service/connection_manager/manager_test.go b/hub/internal/service/connection_manager/manager_test.go index dc1ee8e..3bd443d 100644 --- a/hub/internal/service/connection_manager/manager_test.go +++ b/hub/internal/service/connection_manager/manager_test.go @@ -1 +1,110 @@ package connection_manager + +import ( + "context" + "testing" + "time" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/metadata" + "gotest.tools/v3/assert" +) + +type connectionManagerTestHarness struct { + heartbeat *heartBeatMock + status *statusNotifierMock + manager *ConnectionManager +} + +func newConnectionManagerTestHarness(t *testing.T) *connectionManagerTestHarness { + t.Helper() + + heartbeat := &heartBeatMock{doneCh: make(chan struct{}, 2)} + status := &statusNotifierMock{agentIDCh: make(chan string, 1)} + + manager := NewConnectionManager(heartbeat, status, zerolog.New(nil)) + + return &connectionManagerTestHarness{manager: manager, status: status, heartbeat: heartbeat} +} + +func newMetadataAgentID(t *testing.T, agentID string) metadata.MD { + t.Helper() + + return metadata.New(map[string]string{"agent-id": agentID}) +} + +func TestNewConnectionManager_NewConnection(t *testing.T) { + h := newConnectionManagerTestHarness(t) + agentID := "123" + + ctx := metadata.NewIncomingContext(context.Background(), newMetadataAgentID(t, agentID)) + + stream := streamMock{ctx: ctx, + recvCh: make(chan *pb.AgentEvent, 1), + sendCh: make(chan *pb.ServerCommandRequest, 1), + closeCh: make(chan struct{}, 1), + } + + err := h.manager.NewConnection(&stream) + assert.NilError(t, err) + + select { + case ID := <-h.status.agentIDCh: + require.Equal(t, agentID, ID) + case <-time.After(200 * time.Millisecond): + t.Fatalf("get agent id for notifier") + } + + agentIDs := h.manager.GetAllAgentID() + assert.Equal(t, agentID, agentIDs[0]) + + agent, err := h.manager.GetConnection(agentID) + assert.NilError(t, err) + require.NotNil(t, agent) + +} + +func TestNewConnectionManager_NewConnectionNotAgentID(t *testing.T) { + h := newConnectionManagerTestHarness(t) + + stream := streamMock{ctx: context.Background(), + recvCh: make(chan *pb.AgentEvent, 1), + sendCh: make(chan *pb.ServerCommandRequest, 1), + closeCh: make(chan struct{}, 1), + } + + err := h.manager.NewConnection(&stream) + assert.ErrorContains(t, err, "get agent id") +} + +func TestNewConnectionManager_AgentNotFound(t *testing.T) { + h := newConnectionManagerTestHarness(t) + _, err := h.manager.GetConnection("123") + assert.ErrorIs(t, ErrNotFoundConn, err) + + agentIDs := h.manager.GetAllAgentID() + assert.Equal(t, len(agentIDs), 0) +} + +func Test_agentIDFromMetadata(t *testing.T) { + agentID := "123" + + ctx := metadata.NewIncomingContext(context.Background(), newMetadataAgentID(t, agentID)) + id, err := agentIDFromMetadata(ctx) + assert.NilError(t, err) + assert.Equal(t, id, agentID) +} + +func Test_agentIDFromMetadata_MetadataNotFound(t *testing.T) { + ctx := context.Background() + _, err := agentIDFromMetadata(ctx) + assert.ErrorContains(t, err, "metadata not found") +} + +func Test_agentIDFromMetadata_AgentIDNotFound(t *testing.T) { + ctx := metadata.NewIncomingContext(context.Background(), newMetadataAgentID(t, "")) + _, err := agentIDFromMetadata(ctx) + assert.ErrorContains(t, err, "agent-id not found") +} diff --git a/hub/internal/service/connection_manager/mock_test.go b/hub/internal/service/connection_manager/mock_test.go new file mode 100644 index 0000000..881eb5b --- /dev/null +++ b/hub/internal/service/connection_manager/mock_test.go @@ -0,0 +1,125 @@ +package connection_manager + +import ( + "context" + "io" + "sync" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" +) + +type streamMock struct { + recvCh chan *pb.AgentEvent + sendCh chan *pb.ServerCommandRequest + closeCh chan struct{} + ctx context.Context + mu sync.Mutex + sendErr error + recvErr error + closeOnce sync.Once +} + +func (f *streamMock) Context() context.Context { + return f.ctx +} + +func (f *streamMock) Send(request *pb.ServerCommandRequest) error { + f.mu.Lock() + err := f.sendErr + f.mu.Unlock() + if err != nil { + return err + } + f.sendCh <- request + return nil +} + +func (f *streamMock) Recv() (*pb.AgentEvent, error) { + f.mu.Lock() + recvErr := f.recvErr + f.mu.Unlock() + if recvErr != nil { + return nil, recvErr + } + select { + case msg, ok := <-f.recvCh: + if !ok { + return nil, io.EOF + } + return msg, nil + case <-f.ctx.Done(): + return nil, f.ctx.Err() + } +} + +func (f *streamMock) Close() error { + select { + case f.closeCh <- struct{}{}: + default: + } + f.closeOnce.Do(func() { + close(f.recvCh) + }) + return nil +} + +type heartBeatMock struct { + mu sync.Mutex + countUse int + doneCh chan struct{} + err error +} + +func (h *heartBeatMock) CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error { + h.mu.Lock() + h.countUse += 1 + err := h.err + h.mu.Unlock() + select { + case h.doneCh <- struct{}{}: + default: + } + return err +} + +type statusMock struct { + mu sync.Mutex + online bool + doneCh chan struct{} +} + +func (s *statusMock) Offline() { + s.mu.Lock() + s.online = false + s.mu.Unlock() +} + +func (s *statusMock) Online() { + s.mu.Lock() + s.online = true + s.mu.Unlock() + select { + case s.doneCh <- struct{}{}: + default: + } +} + +func (s *statusMock) IsOnline() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.online +} + +type statusNotifierMock struct { + agentIDCh chan string +} + +func (s *statusNotifierMock) New(AgentID string) statusAgent { + select { + case s.agentIDCh <- AgentID: + default: + + } + return &statusMock{doneCh: make(chan struct{}, 2)} +}