diff --git a/hub/internal/service/connection_manager/agent_test.go b/hub/internal/service/connection_manager/agent_test.go index dca76c7..5e68497 100644 --- a/hub/internal/service/connection_manager/agent_test.go +++ b/hub/internal/service/connection_manager/agent_test.go @@ -2,6 +2,7 @@ package connection_manager import ( "context" + "errors" "io" "sync" "testing" @@ -18,6 +19,10 @@ type streamMock struct { sendCh chan *pb.ServerCommandRequest closeCh chan struct{} ctx context.Context + mu sync.Mutex + sendErr error + recvErr error + closeOnce sync.Once } func (f *streamMock) Context() context.Context { @@ -25,11 +30,23 @@ func (f *streamMock) Context() context.Context { } func (f *streamMock) Send(request *pb.ServerCommandRequest) error { + f.mu.Lock() + err := f.sendErr + f.mu.Unlock() + if err != nil { + return err + } f.sendCh <- request return nil } func (f *streamMock) Recv() (*pb.AgentEvent, error) { + f.mu.Lock() + recvErr := f.recvErr + f.mu.Unlock() + if recvErr != nil { + return nil, recvErr + } select { case msg, ok := <-f.recvCh: if !ok { @@ -44,101 +61,153 @@ func (f *streamMock) Recv() (*pb.AgentEvent, error) { func (f *streamMock) Close() error { select { case f.closeCh <- struct{}{}: - close(f.recvCh) default: } + f.closeOnce.Do(func() { + close(f.recvCh) + }) return nil } type heartBeatMock struct { + mu sync.Mutex countUse int doneCh chan struct{} + err error } func (h *heartBeatMock) CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error { + h.mu.Lock() h.countUse += 1 + err := h.err + h.mu.Unlock() select { case h.doneCh <- struct{}{}: default: } - return nil + return err } type statusMock struct { + mu sync.Mutex online bool doneCh chan struct{} } func (s *statusMock) Offline() { + s.mu.Lock() s.online = false + s.mu.Unlock() } func (s *statusMock) Online() { + s.mu.Lock() s.online = true + s.mu.Unlock() select { case s.doneCh <- struct{}{}: default: } } -func TestAgentConnection_Heartbeat(t *testing.T) { - // Создаем вся поля для Agent Connection - // Нужно как то вынести в отдельную функцию - sendStream := make(chan *pb.ServerCommandRequest, 1) - recvStream := make(chan *pb.AgentEvent) +func (s *statusMock) IsOnline() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.online +} + +type agentTestHarness struct { + ctx context.Context + cancel context.CancelFunc + stream *streamMock + heartbeat *heartBeatMock + status *statusMock + agent *AgentConnection + recvCh chan *pb.AgentEvent + sendCh chan *pb.ServerCommandRequest +} + +func newAgentTestHarness(t *testing.T, heartbeatTimeoutMS int) *agentTestHarness { + t.Helper() + sendStream := make(chan *pb.ServerCommandRequest, 4) + recvStream := make(chan *pb.AgentEvent, 4) ctx, cancel := context.WithCancel(context.Background()) - stream := streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx, closeCh: make(chan struct{}, 1)} - heartbeat := heartBeatMock{doneCh: make(chan struct{}, 1)} - status := statusMock{doneCh: make(chan struct{}, 1)} + stream := &streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx, closeCh: make(chan struct{}, 1)} + heartbeat := &heartBeatMock{doneCh: make(chan struct{}, 2)} + status := &statusMock{doneCh: make(chan struct{}, 2)} - agent := newAgentConnection("123", &stream, &heartbeat, &status, 5000, zerolog.New(nil)) - go agent.Listen() + agent := newAgentConnection("123", stream, heartbeat, status, heartbeatTimeoutMS, zerolog.New(nil)) - recvStream <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{ + t.Cleanup(func() { + cancel() + }) + + return &agentTestHarness{ + ctx: ctx, cancel: cancel, stream: stream, heartbeat: heartbeat, status: status, + agent: agent, recvCh: recvStream, sendCh: sendStream, + } +} + +func waitFor(t *testing.T, ch <-chan struct{}, timeout time.Duration, message string) { + t.Helper() + select { + case <-ch: + case <-time.After(timeout): + t.Fatal(message) + } +} + +func waitForClose(t *testing.T, closeCh <-chan struct{}, timeout time.Duration) { + t.Helper() + select { + case <-closeCh: + case <-time.After(timeout): + t.Fatal("timeout waiting for close") + } +} + +func commandResponseEvent(requestID, output string) *pb.AgentEvent { + return &pb.AgentEvent{ + AgentId: "agent-1", + Event: &pb.AgentEvent_CommandResponse{ + CommandResponse: &pb.CommandResponse{ + RequestId: requestID, + Success: true, + Output: output, + }, + }, + } +} + +func TestAgentConnection_Heartbeat(t *testing.T) { + h := newAgentTestHarness(t, 5000) + go h.agent.Listen() + + h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{ Heartbeat: &pb.Heartbeat{ Timestamp: time.Now().Unix(), Metrics: &pb.SystemMetrics{CpuUsage: 0.5, MemoryUsage: 0.3, DiskUsage: 0.7}, }}} - select { - case <-heartbeat.doneCh: - case <-time.After(500 * time.Millisecond): - t.Fatal("timeout waiting for heartbeat") - } + waitFor(t, h.heartbeat.doneCh, 500*time.Millisecond, "timeout waiting for heartbeat") + waitFor(t, h.status.doneCh, 500*time.Millisecond, "timeout waiting for status online") - select { - case <-status.doneCh: - case <-time.After(500 * time.Millisecond): - t.Fatal("timeout waiting for status online") - } + h.heartbeat.mu.Lock() + count := h.heartbeat.countUse + h.heartbeat.mu.Unlock() - assert.Equal(t, heartbeat.countUse, 1) - assert.Equal(t, status.online, true) + assert.Equal(t, count, 1) + assert.Equal(t, h.status.IsOnline(), true) - cancel() - - select { - case <-stream.closeCh: - case <-time.After(500 * time.Millisecond): - t.Fatal("timeout waiting for close") - } - - assert.Equal(t, status.online, false) + h.cancel() + waitForClose(t, h.stream.closeCh, 500*time.Millisecond) + assert.Equal(t, h.status.IsOnline(), false) } func TestAgentConnection_Execute(t *testing.T) { - sendStream := make(chan *pb.ServerCommandRequest, 1) - recvStream := make(chan *pb.AgentEvent) - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - stream := streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx} - heartbeat := heartBeatMock{doneCh: make(chan struct{}, 1)} - status := statusMock{doneCh: make(chan struct{}, 1)} - - agent := newAgentConnection("123", &stream, &heartbeat, &status, 5000, zerolog.New(nil)) - go agent.Listen() + h := newAgentTestHarness(t, 5000) + go h.agent.Listen() // Данные для проверки requestID := make(chan domainHub.AgentResponse) @@ -146,7 +215,7 @@ func TestAgentConnection_Execute(t *testing.T) { name := "test name" go func() { - response, _ := agent.Execute(ctx, domainHub.AgentRequest{ + response, _ := h.agent.Execute(h.ctx, domainHub.AgentRequest{ Name: name, Args: nil, TimeOut: 0, @@ -155,10 +224,10 @@ func TestAgentConnection_Execute(t *testing.T) { requestID <- response }() - request := <-sendStream + request := <-h.sendCh assert.Equal(t, name, request.Name) - recvStream <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_CommandResponse{ + h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_CommandResponse{ CommandResponse: &pb.CommandResponse{ RequestId: request.RequestId, Success: true, @@ -174,29 +243,19 @@ func TestAgentConnection_Execute(t *testing.T) { } } -// Написать тест когда heartbeat не приходит и все закрывается func TestAgentConnection_HeartbeatTimeout(t *testing.T) { - sendStream := make(chan *pb.ServerCommandRequest, 1) - recvStream := make(chan *pb.AgentEvent) - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - stream := streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx, closeCh: make(chan struct{}, 1)} - heartbeat := heartBeatMock{doneCh: make(chan struct{}, 1)} - status := statusMock{doneCh: make(chan struct{}, 1)} + h := newAgentTestHarness(t, 200) var wg sync.WaitGroup - agent := newAgentConnection("123", &stream, &heartbeat, &status, 200, zerolog.New(nil)) - wg.Add(2) go func() { - err := agent.Listen() + err := h.agent.Listen() assert.NilError(t, err) wg.Done() }() go func() { - _, err := agent.Execute(ctx, domainHub.AgentRequest{ + _, err := h.agent.Execute(h.ctx, domainHub.AgentRequest{ Name: "test", Args: nil, TimeOut: 0, @@ -207,11 +266,168 @@ func TestAgentConnection_HeartbeatTimeout(t *testing.T) { wg.Wait() - select { - case <-stream.closeCh: - case <-time.After(500 * time.Millisecond): - t.Fatal("timeout waiting for close") - } + waitForClose(t, h.stream.closeCh, 500*time.Millisecond) } -//Написать тест при закрытии соединения Execute завершается +func TestAgentConnection_ConnectionClose(t *testing.T) { + h := newAgentTestHarness(t, 5000) + var wg sync.WaitGroup + + wg.Add(2) + go func() { + err := h.agent.Listen() + assert.ErrorIs(t, err, context.Canceled) + wg.Done() + }() + + go func() { + _, err := h.agent.Execute(context.Background(), domainHub.AgentRequest{ + Name: "test", + Args: nil, + TimeOut: 0, + }) + assert.ErrorIs(t, err, ConnectionCloseErr) + wg.Done() + }() + + h.cancel() + + wg.Wait() + + waitForClose(t, h.stream.closeCh, 500*time.Millisecond) +} + +func TestAgentConnection_ExecuteClose(t *testing.T) { + h := newAgentTestHarness(t, 5000) + ctxExecute, cancelExecute := context.WithCancel(context.Background()) + t.Cleanup(cancelExecute) + + executeCh := make(chan struct{}) + go h.agent.Listen() + + go func() { + _, err := h.agent.Execute(ctxExecute, domainHub.AgentRequest{ + Name: "test", + Args: nil, + TimeOut: 0, + }) + + assert.ErrorIs(t, err, context.Canceled) + executeCh <- struct{}{} + }() + + cancelExecute() + waitFor(t, executeCh, 500*time.Millisecond, "timeout waiting for execute close") +} + +func TestAgentConnection_ListenEOF(t *testing.T) { + h := newAgentTestHarness(t, 5000) + h.stream.Close() + + err := h.agent.Listen() + assert.NilError(t, err) + waitForClose(t, h.stream.closeCh, 500*time.Millisecond) +} + +func TestAgentConnection_ListenRecvError(t *testing.T) { + h := newAgentTestHarness(t, 5000) + + recvErr := errors.New("recv failure") + h.stream.mu.Lock() + h.stream.recvErr = recvErr + h.stream.mu.Unlock() + + err := h.agent.Listen() + assert.ErrorIs(t, err, recvErr) +} + +func TestAgentConnection_ExecuteSendError(t *testing.T) { + h := newAgentTestHarness(t, 5000) + h.stream.mu.Lock() + h.stream.sendErr = errors.New("send failure") + h.stream.mu.Unlock() + + _, err := h.agent.Execute(h.ctx, domainHub.AgentRequest{Name: "test"}) + assert.ErrorContains(t, err, "execute command") +} + +func TestAgentConnection_ExecuteContextCanceled(t *testing.T) { + h := newAgentTestHarness(t, 5000) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err := h.agent.Execute(ctx, domainHub.AgentRequest{Name: "test"}) + assert.ErrorIs(t, err, context.Canceled) +} + +func TestAgentConnection_ExecuteConnectionCanceled(t *testing.T) { + h := newAgentTestHarness(t, 5000) + h.cancel() + + _, err := h.agent.Execute(context.Background(), domainHub.AgentRequest{Name: "test"}) + assert.ErrorIs(t, err, ConnectionCloseErr) +} + +func TestAgentConnection_UnknownResponseID(t *testing.T) { + h := newAgentTestHarness(t, 5000) + go h.agent.Listen() + + h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_CommandResponse{ + CommandResponse: &pb.CommandResponse{ + RequestId: "unknown", + Success: true, + Output: "ok", + }}} + + h.cancel() + waitForClose(t, h.stream.closeCh, 500*time.Millisecond) +} + +func TestAgentConnection_HeartbeatErrorDoesNotStop(t *testing.T) { + h := newAgentTestHarness(t, 5000) + h.heartbeat.mu.Lock() + h.heartbeat.err = errors.New("db error") + h.heartbeat.mu.Unlock() + + go h.agent.Listen() + h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{ + Heartbeat: &pb.Heartbeat{ + Timestamp: time.Now().Unix(), + Metrics: &pb.SystemMetrics{CpuUsage: 0.2, MemoryUsage: 0.1, DiskUsage: 0.3}, + }}} + + waitFor(t, h.heartbeat.doneCh, 500*time.Millisecond, "timeout waiting for heartbeat") + h.cancel() +} + +func TestAgentConnection_ConcurrentExecute(t *testing.T) { + h := newAgentTestHarness(t, 5000) + go h.agent.Listen() + + responses := make(chan domainHub.AgentResponse, 2) + + go func() { + resp, _ := h.agent.Execute(h.ctx, domainHub.AgentRequest{Name: "cmd-1"}) + responses <- resp + }() + go func() { + resp, _ := h.agent.Execute(h.ctx, domainHub.AgentRequest{Name: "cmd-2"}) + responses <- resp + }() + + first := <-h.sendCh + second := <-h.sendCh + + // ответы приходят в обратном порядке + h.recvCh <- commandResponseEvent(second.RequestId, "second") + h.recvCh <- commandResponseEvent(first.RequestId, "first") + + resp1 := <-responses + resp2 := <-responses + + assert.Assert(t, resp1.Output == "first" || resp1.Output == "second") + assert.Assert(t, resp2.Output == "first" || resp2.Output == "second") + assert.Assert(t, resp1.Output != resp2.Output) + + h.cancel() +}