Files
HomeOps-Hub/hub/internal/service/connection_manager/agent_test.go
T

333 lines
8.0 KiB
Go

package connection_manager
import (
"context"
"errors"
"sync"
"testing"
"time"
pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops"
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
"github.com/rs/zerolog"
"gotest.tools/v3/assert"
)
type agentTestHarness struct {
ctx context.Context
cancel context.CancelFunc
stream *streamMock
heartbeat *heartBeatMock
status *statusMock
agent *AgentConnection
recvCh chan *pb.AgentEvent
sendCh chan *pb.ServerCommandRequest
}
func newAgentTestHarness(t *testing.T, heartbeatTimeoutMS int) *agentTestHarness {
t.Helper()
sendStream := make(chan *pb.ServerCommandRequest, 4)
recvStream := make(chan *pb.AgentEvent, 4)
ctx, cancel := context.WithCancel(context.Background())
stream := &streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx}
heartbeat := &heartBeatMock{doneCh: make(chan struct{}, 2)}
status := &statusMock{doneCh: make(chan struct{}, 2)}
agent := newAgentConnection("123", stream, heartbeat, status, heartbeatTimeoutMS, zerolog.New(nil))
t.Cleanup(func() {
cancel()
})
return &agentTestHarness{
ctx: ctx, cancel: cancel, stream: stream, heartbeat: heartbeat, status: status,
agent: agent, recvCh: recvStream, sendCh: sendStream,
}
}
func waitFor(t *testing.T, ch <-chan struct{}, timeout time.Duration, message string) {
t.Helper()
select {
case <-ch:
case <-time.After(timeout):
t.Fatal(message)
}
}
func commandResponseEvent(requestID, output string) *pb.AgentEvent {
return &pb.AgentEvent{
AgentId: "agent-1",
Event: &pb.AgentEvent_CommandResponse{
CommandResponse: &pb.CommandResponse{
RequestId: requestID,
Success: true,
Output: output,
},
},
}
}
func TestAgentConnection_Heartbeat(t *testing.T) {
h := newAgentTestHarness(t, 5000)
done := make(chan struct{})
go func() {
_ = h.agent.Listen()
close(done)
}()
h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{
Heartbeat: &pb.Heartbeat{
Timestamp: time.Now().Unix(),
Metrics: &pb.SystemMetrics{CpuUsage: 0.5, MemoryUsage: 0.3, DiskUsage: 0.7},
}}}
waitFor(t, h.heartbeat.doneCh, 500*time.Millisecond, "timeout waiting for heartbeat")
waitFor(t, h.status.doneCh, 500*time.Millisecond, "timeout waiting for status online")
h.heartbeat.mu.Lock()
count := h.heartbeat.countUse
h.heartbeat.mu.Unlock()
assert.Equal(t, count, 1)
assert.Equal(t, h.status.IsOnline(), true)
h.cancel()
waitFor(t, done, 500*time.Millisecond, "timeout waiting for listen stop")
assert.Equal(t, h.status.IsOnline(), false)
}
func TestAgentConnection_Execute(t *testing.T) {
h := newAgentTestHarness(t, 5000)
go func() {
_ = h.agent.Listen()
}()
// Данные для проверки
requestID := make(chan domainHub.AgentResponse)
output := "test output"
name := "test name"
go func() {
response, _ := h.agent.Execute(h.ctx, domainHub.AgentRequest{
Name: name,
Args: nil,
TimeOut: 0,
})
requestID <- response
}()
request := <-h.sendCh
assert.Equal(t, name, request.Name)
h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_CommandResponse{
CommandResponse: &pb.CommandResponse{
RequestId: request.RequestId,
Success: true,
Output: output,
}}}
select {
case response := <-requestID:
assert.Equal(t, output, response.Output)
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for response")
}
}
func TestAgentConnection_HeartbeatTimeout(t *testing.T) {
h := newAgentTestHarness(t, 200)
listenDone := make(chan error, 1)
execDone := make(chan error, 1)
go func() {
listenDone <- h.agent.Listen()
}()
go func() {
_, err := h.agent.Execute(h.ctx, domainHub.AgentRequest{
Name: "test",
Args: nil,
TimeOut: 0,
})
execDone <- err
}()
timeout := time.After(2 * time.Second)
gotListen := false
gotExec := false
for !(gotListen && gotExec) {
select {
case err := <-listenDone:
assert.ErrorIs(t, err, context.Canceled)
gotListen = true
case err := <-execDone:
assert.ErrorIs(t, err, ErrConnectionClose)
gotExec = true
case <-timeout:
h.cancel()
t.Fatal("timeout waiting for heartbeat timeout")
}
}
}
func TestAgentConnection_ConnectionClose(t *testing.T) {
h := newAgentTestHarness(t, 5000)
var wg sync.WaitGroup
wg.Add(2)
go func() {
err := h.agent.Listen()
assert.ErrorIs(t, err, context.Canceled)
wg.Done()
}()
go func() {
_, err := h.agent.Execute(context.Background(), domainHub.AgentRequest{
Name: "test",
Args: nil,
TimeOut: 0,
})
assert.ErrorIs(t, err, ErrConnectionClose)
wg.Done()
}()
h.cancel()
wg.Wait()
}
func TestAgentConnection_ExecuteClose(t *testing.T) {
h := newAgentTestHarness(t, 5000)
ctxExecute, cancelExecute := context.WithCancel(context.Background())
t.Cleanup(cancelExecute)
executeCh := make(chan struct{})
go func() {
_ = h.agent.Listen()
}()
go func() {
_, err := h.agent.Execute(ctxExecute, domainHub.AgentRequest{
Name: "test",
Args: nil,
TimeOut: 0,
})
assert.ErrorIs(t, err, context.Canceled)
executeCh <- struct{}{}
}()
cancelExecute()
waitFor(t, executeCh, 500*time.Millisecond, "timeout waiting for execute close")
}
func TestAgentConnection_ListenEOF(t *testing.T) {
h := newAgentTestHarness(t, 5000)
h.stream.CloseRecv()
err := h.agent.Listen()
assert.NilError(t, err)
}
func TestAgentConnection_ExecuteSendError(t *testing.T) {
h := newAgentTestHarness(t, 5000)
h.stream.mu.Lock()
h.stream.sendErr = errors.New("send failure")
h.stream.mu.Unlock()
_, err := h.agent.Execute(h.ctx, domainHub.AgentRequest{Name: "test"})
assert.ErrorContains(t, err, "execute command")
}
func TestAgentConnection_ExecuteContextCanceled(t *testing.T) {
h := newAgentTestHarness(t, 5000)
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := h.agent.Execute(ctx, domainHub.AgentRequest{Name: "test"})
assert.ErrorIs(t, err, context.Canceled)
}
func TestAgentConnection_ExecuteConnectionCanceled(t *testing.T) {
h := newAgentTestHarness(t, 5000)
h.cancel()
_, err := h.agent.Execute(context.Background(), domainHub.AgentRequest{Name: "test"})
assert.ErrorIs(t, err, ErrConnectionClose)
}
func TestAgentConnection_UnknownResponseID(t *testing.T) {
h := newAgentTestHarness(t, 5000)
done := make(chan struct{})
go func() {
_ = h.agent.Listen()
close(done)
}()
h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_CommandResponse{
CommandResponse: &pb.CommandResponse{
RequestId: "unknown",
Success: true,
Output: "ok",
}}}
h.cancel()
waitFor(t, done, 500*time.Millisecond, "timeout waiting for listen stop")
}
func TestAgentConnection_HeartbeatErrorDoesNotStop(t *testing.T) {
h := newAgentTestHarness(t, 5000)
h.heartbeat.mu.Lock()
h.heartbeat.err = errors.New("db error")
h.heartbeat.mu.Unlock()
go func() {
_ = h.agent.Listen()
}()
h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{
Heartbeat: &pb.Heartbeat{
Timestamp: time.Now().Unix(),
Metrics: &pb.SystemMetrics{CpuUsage: 0.2, MemoryUsage: 0.1, DiskUsage: 0.3},
}}}
waitFor(t, h.heartbeat.doneCh, 500*time.Millisecond, "timeout waiting for heartbeat")
h.cancel()
}
func TestAgentConnection_ConcurrentExecute(t *testing.T) {
h := newAgentTestHarness(t, 5000)
go func() {
_ = h.agent.Listen()
}()
responses := make(chan domainHub.AgentResponse, 2)
go func() {
resp, _ := h.agent.Execute(h.ctx, domainHub.AgentRequest{Name: "cmd-1"})
responses <- resp
}()
go func() {
resp, _ := h.agent.Execute(h.ctx, domainHub.AgentRequest{Name: "cmd-2"})
responses <- resp
}()
first := <-h.sendCh
second := <-h.sendCh
// ответы приходят в обратном порядке
h.recvCh <- commandResponseEvent(second.RequestId, "second")
h.recvCh <- commandResponseEvent(first.RequestId, "first")
resp1 := <-responses
resp2 := <-responses
assert.Assert(t, resp1.Output == "first" || resp1.Output == "second")
assert.Assert(t, resp2.Output == "first" || resp2.Output == "second")
assert.Assert(t, resp1.Output != resp2.Output)
h.cancel()
}