From 0972a7b29051d07e83a8f1098e578a4828dc8be0 Mon Sep 17 00:00:00 2001 From: lorsan Date: Fri, 15 May 2026 20:54:54 +0300 Subject: [PATCH] feat: new agent connections test Heartbeat timeout --- .../service/connection_manager/agent_test.go | 51 +++++++++++++++++-- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/hub/internal/service/connection_manager/agent_test.go b/hub/internal/service/connection_manager/agent_test.go index 799572b..dca76c7 100644 --- a/hub/internal/service/connection_manager/agent_test.go +++ b/hub/internal/service/connection_manager/agent_test.go @@ -3,6 +3,7 @@ package connection_manager import ( "context" "io" + "sync" "testing" "time" @@ -43,6 +44,7 @@ func (f *streamMock) Recv() (*pb.AgentEvent, error) { func (f *streamMock) Close() error { select { case f.closeCh <- struct{}{}: + close(f.recvCh) default: } return nil @@ -82,7 +84,7 @@ func (s *statusMock) Online() { func TestAgentConnection_Heartbeat(t *testing.T) { // Создаем вся поля для Agent Connection // Нужно как то вынести в отдельную функцию - sendStream := make(chan *pb.ServerCommandRequest) + sendStream := make(chan *pb.ServerCommandRequest, 1) recvStream := make(chan *pb.AgentEvent) ctx, cancel := context.WithCancel(context.Background()) @@ -90,7 +92,7 @@ func TestAgentConnection_Heartbeat(t *testing.T) { heartbeat := heartBeatMock{doneCh: make(chan struct{}, 1)} status := statusMock{doneCh: make(chan struct{}, 1)} - agent := newAgentConnection("123", &stream, &heartbeat, &status, zerolog.New(nil)) + agent := newAgentConnection("123", &stream, &heartbeat, &status, 5000, zerolog.New(nil)) go agent.Listen() recvStream <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{ @@ -126,7 +128,7 @@ func TestAgentConnection_Heartbeat(t *testing.T) { } func TestAgentConnection_Execute(t *testing.T) { - sendStream := make(chan *pb.ServerCommandRequest) + sendStream := make(chan *pb.ServerCommandRequest, 1) recvStream := make(chan *pb.AgentEvent) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -135,7 +137,7 @@ func TestAgentConnection_Execute(t *testing.T) { heartbeat := heartBeatMock{doneCh: make(chan struct{}, 1)} status := statusMock{doneCh: make(chan struct{}, 1)} - agent := newAgentConnection("123", &stream, &heartbeat, &status, zerolog.New(nil)) + agent := newAgentConnection("123", &stream, &heartbeat, &status, 5000, zerolog.New(nil)) go agent.Listen() // Данные для проверки @@ -172,5 +174,44 @@ func TestAgentConnection_Execute(t *testing.T) { } } -//Написать тест когда heartbeat не приходит и все закрывается +// Написать тест когда heartbeat не приходит и все закрывается +func TestAgentConnection_HeartbeatTimeout(t *testing.T) { + sendStream := make(chan *pb.ServerCommandRequest, 1) + recvStream := make(chan *pb.AgentEvent) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + stream := streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx, closeCh: make(chan struct{}, 1)} + heartbeat := heartBeatMock{doneCh: make(chan struct{}, 1)} + status := statusMock{doneCh: make(chan struct{}, 1)} + var wg sync.WaitGroup + + agent := newAgentConnection("123", &stream, &heartbeat, &status, 200, zerolog.New(nil)) + + wg.Add(2) + go func() { + err := agent.Listen() + assert.NilError(t, err) + wg.Done() + }() + + go func() { + _, err := agent.Execute(ctx, domainHub.AgentRequest{ + Name: "test", + Args: nil, + TimeOut: 0, + }) + assert.ErrorIs(t, err, ConnectionCloseErr) + wg.Done() + }() + + wg.Wait() + + select { + case <-stream.closeCh: + case <-time.After(500 * time.Millisecond): + t.Fatal("timeout waiting for close") + } +} + //Написать тест при закрытии соединения Execute завершается