add support stream in hub

Create connection manager for stream
This commit is contained in:
Станислав
2026-05-23 14:16:18 +03:00
committed by lorsan
30 changed files with 1255 additions and 83 deletions
+1 -2
View File
@@ -37,7 +37,7 @@
Проект состоит из трех основных компонентов:
1. **Hub (The Brain):** Центральный сервис на Go/Python. Хранит конфигурации, управляет состоянием, оркестрирует Runbooks и управляет Telegram-ботом. База данных — Postgres.
1. **Hub (The Brain):** Центральный сервис на Go/Python. Хранит конфигурации, управляет состоянием, оркестрирует Runbooks и управляет Telegram-ботом. База данных — SQLite.
2. **Agent (The Executor):** Легковесный бинарный файл, работающий непосредственно на целевом хосте. Связь с Хабом через **gRPC Stream**.
3. **Telegram Interface:** Основной UI для оперативного управления с использованием Inline-кнопок для быстрых действий (Ack, Mute, Restart).
@@ -66,7 +66,6 @@
## 🔒 Безопасность
* **No Inbound Ports:** Агент не слушает порты. Соединение всегда инициируется изнутри вашей сети.
* **mTLS / Token Auth:** Весь трафик между Агентом и Хабом зашифрован.
* **Hardened Commands:** Невозможно выполнить `rm -rf /` — доступны только те команды, что описаны в коде агента.
*Created with ❤️ for Stasik.*
+9 -1
View File
@@ -2,6 +2,7 @@ package rpc
import (
"context"
"fmt"
pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops"
"github.com/lorsanstand/HomeOps-Hub/shared/domain"
@@ -28,5 +29,12 @@ func (c *Connection) Hub() pb.HubClient {
func (c *Connection) RegisterAgent(ctx context.Context, RegisterData domain.RegisterAgentRequest) (domain.RegisterAgentResponse, error) {
ResponseData, err := c.Hub().RegisterAgent(ctx, new(domain.ToGRPCAgentRequest(RegisterData)))
return domain.ToDomainAgentResponse(ResponseData), err
if err != nil {
return domain.RegisterAgentResponse{}, fmt.Errorf("send register agent: %w", err)
}
response, err := domain.ToDomainAgentResponse(ResponseData)
if err != nil {
return domain.RegisterAgentResponse{}, fmt.Errorf("casting response: %w", err)
}
return response, nil
}
@@ -1,24 +0,0 @@
services:
postgres-db:
image: postgres:latest
volumes:
- pgdata:/var/lib/postgresql
environment:
POSTGRES_DB: "${DB_NAME}"
POSTGRES_USER: "${DB_USER}"
POSTGRES_PASSWORD: "${DB_PASS}"
networks:
- homeops-dev
ports:
- "${DB_PORT}:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${DB_USER}"]
interval: 5s
retries: 5
restart: always
volumes:
pgdata:
networks:
homeops-dev:
+6 -2
View File
@@ -5,12 +5,15 @@ go 1.26.1
require (
github.com/docker/docker v28.5.2+incompatible
github.com/golang-migrate/migrate/v4 v4.19.1
github.com/google/uuid v1.6.0
github.com/ilyakaznacheev/cleanenv v1.5.0
github.com/mattn/go-sqlite3 v1.14.44
github.com/rs/zerolog v1.35.1
github.com/stretchr/testify v1.11.1
google.golang.org/grpc v1.81.0
google.golang.org/protobuf v1.36.11
gopkg.in/yaml.v3 v3.0.1
gotest.tools/v3 v3.5.2
)
require (
@@ -20,13 +23,14 @@ require (
github.com/containerd/errdefs v1.0.0 // indirect
github.com/containerd/errdefs/pkg v0.3.0 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/go-connections v0.7.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/lib/pq v1.12.3 // indirect
@@ -39,6 +43,7 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 // indirect
@@ -54,7 +59,6 @@ require (
golang.org/x/time v0.15.0 // indirect
golang.org/x/tools v0.43.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260504160031-60b97b32f348 // indirect
gotest.tools/v3 v3.5.2 // indirect
lukechampine.com/uint128 v1.2.0 // indirect
modernc.org/cc/v3 v3.36.3 // indirect
modernc.org/ccgo/v3 v3.16.9 // indirect
+24 -13
View File
@@ -5,12 +5,15 @@ import (
"fmt"
standartlog "log"
"net"
"os"
hubdir "github.com/lorsanstand/HomeOps-Hub/hub/internal"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/migrator"
grpcserv "github.com/lorsanstand/HomeOps-Hub/hub/internal/rpc"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/service/hub_service"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/store"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/utils/notifier"
"github.com/lorsanstand/HomeOps-Hub/shared/config"
"github.com/lorsanstand/HomeOps-Hub/shared/log"
_ "github.com/mattn/go-sqlite3"
@@ -39,7 +42,7 @@ func (a *App) Run() {
DBConn, err := sql.Open("sqlite", "database.db")
if err != nil {
a.log.Error().Err(err).Msg("failed to connect to the database")
return
os.Exit(1)
}
defer func() {
@@ -48,34 +51,31 @@ func (a *App) Run() {
}
}()
mgrt, err := migrator.NewMigrator(hubdir.MigrationsFS, "migrations")
if err != nil {
a.log.Error().Err(err).Msg("failed to create migrator")
return
}
a.log.Info().Msg("applying database migrations")
if err = mgrt.ApplyMigrations(DBConn); err != nil {
a.log.Error().Err(err).Msg("migrations failed to apply")
return
if err := applyMigrations(DBConn); err != nil {
a.log.Error().Err(err).Msg("")
os.Exit(1)
}
a.log.Info().Msg("migrations applied successfully")
hubStore := store.NewHubStore(DBConn)
hubService := hub_service.NewHubService(hubStore, a.log)
statusNotifier := notifier.NewStatusNotifier()
connManger := connection_manager.NewConnectionManager(hubStore, statusNotifier, a.log)
a.log.Info().Msg("starting hub service")
err = a.hubServe(hubService)
err = a.hubServe(hubService, connManger)
if err != nil {
a.log.Error().Err(err).Msg("hub service failed to start")
return
}
}
func (a *App) hubServe(hubService *hub_service.HubService) error {
func (a *App) hubServe(hubService *hub_service.HubService, manager *connection_manager.ConnectionManager) error {
address := fmt.Sprintf("0.0.0.0:%v", a.cfg.Port)
server := grpcserv.NewHubHandler(hubService, a.log)
server := grpcserv.NewHubHandler(hubService, manager, a.log)
lis, err := net.Listen("tcp", address)
if err != nil {
@@ -93,3 +93,14 @@ func (a *App) hubServe(hubService *hub_service.HubService) error {
return nil
}
func applyMigrations(db *sql.DB) error {
mgrt, err := migrator.NewMigrator(hubdir.MigrationsFS, "migrations")
if err != nil {
return fmt.Errorf("failed to create migrator: %w", err)
}
if err = mgrt.ApplyMigrations(db); err != nil {
return fmt.Errorf("migrations failed to apply: %w", err)
}
return nil
}
+42
View File
@@ -0,0 +1,42 @@
package domain
import "time"
type AgentRequest struct {
Name string
Args map[string]string
TimeOut int
}
type AgentResponse struct {
Success bool
Output string
Error string
ExecTimeMS int
}
type AgentAlert struct {
Timestamp int
Level string
Title string
Description string
}
type SystemMetrics struct {
CpuUsage float64
MemoryUsage float64
DiskUsage float64
}
type HeartbeatModel struct {
ID int
AgentID string
Timestamp time.Time
Metrics SystemMetrics
}
type CreateHeartbeatModel struct {
AgentID string
Timestamp time.Time
Metrics SystemMetrics
}
@@ -0,0 +1,3 @@
DROP INDEX idx_heartbeat_agent_id;
DROP TABLE if exists heartbeats;
@@ -0,0 +1,11 @@
CREATE TABLE heartbeats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_id VARCHAR(32) NOT NULL,
cpu_usage FLOAT NOT NULL ,
memory_usage FLOAT NOT NULL ,
disk_usage FLOAT NOT NULL ,
heartbeat_timestamp TIMESTAMP NOT NULL,
FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE
);
CREATE UNIQUE INDEX idx_heartbeat_agent_id ON heartbeats (agent_id);
+5
View File
@@ -0,0 +1,5 @@
package rpc
import "fmt"
var ErrFailedRegister = fmt.Errorf("failed register agent")
+24 -5
View File
@@ -2,8 +2,10 @@ package rpc
import (
"context"
"fmt"
pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager"
"github.com/lorsanstand/HomeOps-Hub/shared/domain"
"github.com/rs/zerolog"
"google.golang.org/grpc"
@@ -14,15 +16,20 @@ type HubService interface {
RegisterAgent(ctx context.Context, data domain.RegisterAgentRequest) (domain.RegisterAgentResponse, error)
}
type ConnectionManager interface {
NewConnection(stream connection_manager.StreamConn) error
}
type HubHandler struct {
pb.UnimplementedHubServer
log zerolog.Logger
GrpcServer *grpc.Server
hub HubService
streamManager ConnectionManager
}
func NewHubHandler(HubServ HubService, logger zerolog.Logger) *HubHandler {
hub := &HubHandler{log: logger, hub: HubServ}
func NewHubHandler(HubServ HubService, manager ConnectionManager, logger zerolog.Logger) *HubHandler {
hub := &HubHandler{log: logger, hub: HubServ, streamManager: manager}
grpcServer := grpc.NewServer()
pb.RegisterHubServer(grpcServer, hub)
@@ -39,12 +46,24 @@ func (h *HubHandler) Ping(ctx context.Context, _ *emptypb.Empty) (*pb.PongRespon
func (h *HubHandler) RegisterAgent(ctx context.Context, request *pb.RegisterAgentRequest) (*pb.RegisterAgentResponse, error) {
h.log.Debug().Str("agentID", request.AgentId).Str("agentName", request.AgentName).Msg("register agent request received")
data := domain.ToDomainAgentRequest(request)
data, err := domain.ToDomainAgentRequest(request)
if err != nil {
h.log.Error().Err(err).Msg("failed to casting request")
return &pb.RegisterAgentResponse{}, ErrFailedRegister
}
resp, err := h.hub.RegisterAgent(ctx, data)
if err != nil {
h.log.Error().Err(err).Str("agentID", request.AgentId).Msg("register agent request failed")
return domain.ToGRPCAgentResponse(resp), err
h.log.Error().Err(err).Msg("failed register agent")
return domain.ToGRPCAgentResponse(resp), ErrFailedRegister
}
h.log.Info().Str("agentID", resp.AgentID).Msg("register agent request completed")
return domain.ToGRPCAgentResponse(resp), nil
}
func (h *HubHandler) StreamConnection(stream grpc.BidiStreamingServer[pb.AgentEvent, pb.ServerCommandRequest]) error {
err := h.streamManager.NewConnection(stream)
if err != nil {
return fmt.Errorf("accept connection: %w", err)
}
return nil
}
@@ -0,0 +1,157 @@
package connection_manager
import (
"context"
"fmt"
"io"
"time"
"github.com/google/uuid"
pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops"
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
"github.com/rs/zerolog"
)
type AgentConnection struct {
stream StreamConn
heartbeat heartbeatStore
log zerolog.Logger
status StatusAgent
AgentID string
response *ResponseStore
ctx context.Context
cancel context.CancelFunc
heartbeatTimeoutMS int
}
func newAgentConnection(agentID string, stream StreamConn, heartbeat heartbeatStore, status StatusAgent, heartbeatTimeoutMS int, logger zerolog.Logger) *AgentConnection {
response := NewResponseStore()
logger = logger.With().Str("agentID", agentID).Logger()
ctx, cancel := context.WithCancel(stream.Context())
return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status, ctx: ctx, cancel: cancel, heartbeatTimeoutMS: heartbeatTimeoutMS}
}
func (a *AgentConnection) Listen() error {
heartbeatsCh := make(chan domainHub.CreateHeartbeatModel, 5)
streamRecvCh := make(chan *pb.AgentEvent, 5)
go a.listenHeartbeat(heartbeatsCh)
go a.listenStream(streamRecvCh)
defer func() {
a.status.Offline()
close(heartbeatsCh)
a.Close()
}()
for {
select {
case <-a.ctx.Done():
return a.ctx.Err()
case msg, ok := <-streamRecvCh:
if !ok {
return nil
}
switch x := msg.Event.(type) {
case *pb.AgentEvent_Heartbeat:
heartbeat := toCreateHeartbeatModel(a.AgentID, x)
heartbeatsCh <- heartbeat
case *pb.AgentEvent_CommandResponse:
ch, ok := a.response.Read(x.CommandResponse.RequestId)
if !ok {
a.log.Warn().Str("requestID", x.CommandResponse.RequestId).Msg("not found channel for send response")
continue
}
response := toAgentResponse(x)
ch <- response
}
default:
}
}
}
func (a *AgentConnection) listenStream(ch chan *pb.AgentEvent) {
defer close(ch)
for {
agentEvent, err := a.stream.Recv()
if err == io.EOF {
return
}
if err != nil {
a.log.Warn().Err(err).Msg("close stream")
return
}
ch <- agentEvent
}
}
func (a *AgentConnection) listenHeartbeat(heartbeats <-chan domainHub.CreateHeartbeatModel) {
lastHeartbeat := 0
timer := time.NewTicker(time.Duration(a.heartbeatTimeoutMS) * time.Millisecond)
defer timer.Stop()
for {
select {
case <-timer.C:
if lastHeartbeat < 4 {
lastHeartbeat += 1
a.status.Offline()
continue
}
a.log.Warn().Msg("agent not send heartbeat")
a.Close()
return
case heartbeat, ok := <-heartbeats:
if !ok {
return
}
a.log.Debug().
Float64("cpu usage", heartbeat.Metrics.CpuUsage).
Float64("disk usage", heartbeat.Metrics.DiskUsage).
Float64("memory usage", heartbeat.Metrics.MemoryUsage).Msg("")
a.status.Online()
lastHeartbeat = 0
err := a.heartbeat.CreateHeartbeat(a.ctx, heartbeat)
if err != nil {
a.log.Error().Err(err).Msg("failed to write heartbeat")
}
case <-a.ctx.Done():
return
}
}
}
func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRequest) (domainHub.AgentResponse, error) {
requestID := uuid.New().String()
ch := make(chan domainHub.AgentResponse, 1)
defer close(ch)
a.response.Write(requestID, ch)
defer a.response.Delete(requestID)
err := a.stream.Send(new(toGRPCCommandRequest(requestID, request)))
if err != nil {
return domainHub.AgentResponse{}, fmt.Errorf("execute command: %w", err)
}
a.log.Info().Str("requestID", requestID).Str("command", request.Name).Msg("send command")
select {
case <-a.ctx.Done():
return domainHub.AgentResponse{}, ErrConnectionClose
case <-ctx.Done():
return domainHub.AgentResponse{}, ctx.Err()
case response := <-ch:
a.log.Info().Str("requestID", requestID).Msg("received response")
return response, nil
}
}
func (a *AgentConnection) Close() {
a.cancel()
}
@@ -0,0 +1,324 @@
package connection_manager
import (
"context"
"errors"
"sync"
"testing"
"time"
pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops"
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
"github.com/rs/zerolog"
"gotest.tools/v3/assert"
)
type agentTestHarness struct {
ctx context.Context
cancel context.CancelFunc
stream *streamMock
heartbeat *heartBeatMock
status *statusMock
agent *AgentConnection
recvCh chan *pb.AgentEvent
sendCh chan *pb.ServerCommandRequest
}
func newAgentTestHarness(t *testing.T, heartbeatTimeoutMS int) *agentTestHarness {
t.Helper()
sendStream := make(chan *pb.ServerCommandRequest, 4)
recvStream := make(chan *pb.AgentEvent, 4)
ctx, cancel := context.WithCancel(context.Background())
stream := &streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx}
heartbeat := &heartBeatMock{doneCh: make(chan struct{}, 2)}
status := &statusMock{doneCh: make(chan struct{}, 2)}
agent := newAgentConnection("123", stream, heartbeat, status, heartbeatTimeoutMS, zerolog.New(nil))
t.Cleanup(func() {
cancel()
})
return &agentTestHarness{
ctx: ctx, cancel: cancel, stream: stream, heartbeat: heartbeat, status: status,
agent: agent, recvCh: recvStream, sendCh: sendStream,
}
}
func waitFor(t *testing.T, ch <-chan struct{}, timeout time.Duration, message string) {
t.Helper()
select {
case <-ch:
case <-time.After(timeout):
t.Fatal(message)
}
}
func commandResponseEvent(requestID, output string) *pb.AgentEvent {
return &pb.AgentEvent{
AgentId: "agent-1",
Event: &pb.AgentEvent_CommandResponse{
CommandResponse: &pb.CommandResponse{
RequestId: requestID,
Success: true,
Output: output,
},
},
}
}
func TestAgentConnection_Heartbeat(t *testing.T) {
h := newAgentTestHarness(t, 5000)
done := make(chan struct{})
go func() {
_ = h.agent.Listen()
close(done)
}()
h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{
Heartbeat: &pb.Heartbeat{
Timestamp: time.Now().Unix(),
Metrics: &pb.SystemMetrics{CpuUsage: 0.5, MemoryUsage: 0.3, DiskUsage: 0.7},
}}}
waitFor(t, h.heartbeat.doneCh, 500*time.Millisecond, "timeout waiting for heartbeat")
waitFor(t, h.status.doneCh, 500*time.Millisecond, "timeout waiting for status online")
h.heartbeat.mu.Lock()
count := h.heartbeat.countUse
h.heartbeat.mu.Unlock()
assert.Equal(t, count, 1)
assert.Equal(t, h.status.IsOnline(), true)
h.cancel()
waitFor(t, done, 500*time.Millisecond, "timeout waiting for listen stop")
assert.Equal(t, h.status.IsOnline(), false)
}
func TestAgentConnection_Execute(t *testing.T) {
h := newAgentTestHarness(t, 5000)
go h.agent.Listen()
// Данные для проверки
requestID := make(chan domainHub.AgentResponse)
output := "test output"
name := "test name"
go func() {
response, _ := h.agent.Execute(h.ctx, domainHub.AgentRequest{
Name: name,
Args: nil,
TimeOut: 0,
})
requestID <- response
}()
request := <-h.sendCh
assert.Equal(t, name, request.Name)
h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_CommandResponse{
CommandResponse: &pb.CommandResponse{
RequestId: request.RequestId,
Success: true,
Output: output,
}}}
select {
case response := <-requestID:
assert.Equal(t, output, response.Output)
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for response")
}
}
func TestAgentConnection_HeartbeatTimeout(t *testing.T) {
h := newAgentTestHarness(t, 200)
listenDone := make(chan error, 1)
execDone := make(chan error, 1)
go func() {
listenDone <- h.agent.Listen()
}()
go func() {
_, err := h.agent.Execute(h.ctx, domainHub.AgentRequest{
Name: "test",
Args: nil,
TimeOut: 0,
})
execDone <- err
}()
timeout := time.After(2 * time.Second)
gotListen := false
gotExec := false
for !(gotListen && gotExec) {
select {
case err := <-listenDone:
assert.ErrorIs(t, err, context.Canceled)
gotListen = true
case err := <-execDone:
assert.ErrorIs(t, err, ErrConnectionClose)
gotExec = true
case <-timeout:
h.cancel()
t.Fatal("timeout waiting for heartbeat timeout")
}
}
}
func TestAgentConnection_ConnectionClose(t *testing.T) {
h := newAgentTestHarness(t, 5000)
var wg sync.WaitGroup
wg.Add(2)
go func() {
err := h.agent.Listen()
assert.ErrorIs(t, err, context.Canceled)
wg.Done()
}()
go func() {
_, err := h.agent.Execute(context.Background(), domainHub.AgentRequest{
Name: "test",
Args: nil,
TimeOut: 0,
})
assert.ErrorIs(t, err, ErrConnectionClose)
wg.Done()
}()
h.cancel()
wg.Wait()
}
func TestAgentConnection_ExecuteClose(t *testing.T) {
h := newAgentTestHarness(t, 5000)
ctxExecute, cancelExecute := context.WithCancel(context.Background())
t.Cleanup(cancelExecute)
executeCh := make(chan struct{})
go h.agent.Listen()
go func() {
_, err := h.agent.Execute(ctxExecute, domainHub.AgentRequest{
Name: "test",
Args: nil,
TimeOut: 0,
})
assert.ErrorIs(t, err, context.Canceled)
executeCh <- struct{}{}
}()
cancelExecute()
waitFor(t, executeCh, 500*time.Millisecond, "timeout waiting for execute close")
}
func TestAgentConnection_ListenEOF(t *testing.T) {
h := newAgentTestHarness(t, 5000)
h.stream.CloseRecv()
err := h.agent.Listen()
assert.NilError(t, err)
}
func TestAgentConnection_ExecuteSendError(t *testing.T) {
h := newAgentTestHarness(t, 5000)
h.stream.mu.Lock()
h.stream.sendErr = errors.New("send failure")
h.stream.mu.Unlock()
_, err := h.agent.Execute(h.ctx, domainHub.AgentRequest{Name: "test"})
assert.ErrorContains(t, err, "execute command")
}
func TestAgentConnection_ExecuteContextCanceled(t *testing.T) {
h := newAgentTestHarness(t, 5000)
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := h.agent.Execute(ctx, domainHub.AgentRequest{Name: "test"})
assert.ErrorIs(t, err, context.Canceled)
}
func TestAgentConnection_ExecuteConnectionCanceled(t *testing.T) {
h := newAgentTestHarness(t, 5000)
h.cancel()
_, err := h.agent.Execute(context.Background(), domainHub.AgentRequest{Name: "test"})
assert.ErrorIs(t, err, ErrConnectionClose)
}
func TestAgentConnection_UnknownResponseID(t *testing.T) {
h := newAgentTestHarness(t, 5000)
done := make(chan struct{})
go func() {
_ = h.agent.Listen()
close(done)
}()
h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_CommandResponse{
CommandResponse: &pb.CommandResponse{
RequestId: "unknown",
Success: true,
Output: "ok",
}}}
h.cancel()
waitFor(t, done, 500*time.Millisecond, "timeout waiting for listen stop")
}
func TestAgentConnection_HeartbeatErrorDoesNotStop(t *testing.T) {
h := newAgentTestHarness(t, 5000)
h.heartbeat.mu.Lock()
h.heartbeat.err = errors.New("db error")
h.heartbeat.mu.Unlock()
go h.agent.Listen()
h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{
Heartbeat: &pb.Heartbeat{
Timestamp: time.Now().Unix(),
Metrics: &pb.SystemMetrics{CpuUsage: 0.2, MemoryUsage: 0.1, DiskUsage: 0.3},
}}}
waitFor(t, h.heartbeat.doneCh, 500*time.Millisecond, "timeout waiting for heartbeat")
h.cancel()
}
func TestAgentConnection_ConcurrentExecute(t *testing.T) {
h := newAgentTestHarness(t, 5000)
go h.agent.Listen()
responses := make(chan domainHub.AgentResponse, 2)
go func() {
resp, _ := h.agent.Execute(h.ctx, domainHub.AgentRequest{Name: "cmd-1"})
responses <- resp
}()
go func() {
resp, _ := h.agent.Execute(h.ctx, domainHub.AgentRequest{Name: "cmd-2"})
responses <- resp
}()
first := <-h.sendCh
second := <-h.sendCh
// ответы приходят в обратном порядке
h.recvCh <- commandResponseEvent(second.RequestId, "second")
h.recvCh <- commandResponseEvent(first.RequestId, "first")
resp1 := <-responses
resp2 := <-responses
assert.Assert(t, resp1.Output == "first" || resp1.Output == "second")
assert.Assert(t, resp2.Output == "first" || resp2.Output == "second")
assert.Assert(t, resp1.Output != resp2.Output)
h.cancel()
}
@@ -0,0 +1,7 @@
package connection_manager
import "errors"
var ErrConnectionClose = errors.New("connection close")
var ErrNotFoundConn = errors.New("agent connection not found")
@@ -0,0 +1,27 @@
package connection_manager
import (
"context"
pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops"
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
)
type StreamConn interface {
Send(request *pb.ServerCommandRequest) error
Recv() (*pb.AgentEvent, error)
Context() context.Context
}
type heartbeatStore interface {
CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error
}
type StatusAgent interface {
Offline()
Online()
}
type statusNotifier interface {
New(agentID string) StatusAgent
}
@@ -0,0 +1,69 @@
package connection_manager
import (
"context"
"fmt"
"github.com/rs/zerolog"
"google.golang.org/grpc/metadata"
)
const heartbeatTimeoutMS = 6000
type ConnectionManager struct {
heartbeat heartbeatStore
log zerolog.Logger
status statusNotifier
agentConnStore *AgentConnStore
}
func NewConnectionManager(heartbeat heartbeatStore, status statusNotifier, logger zerolog.Logger) *ConnectionManager {
return &ConnectionManager{heartbeat: heartbeat, log: logger, status: status, agentConnStore: NewAgentConnStore()}
}
func (c *ConnectionManager) NewConnection(stream StreamConn) error {
AgentID, err := agentIDFromMetadata(stream.Context())
if err != nil {
c.log.Error().Err(err).Msg("missing agent id in metadata")
return fmt.Errorf("get agent id: %w", err)
}
c.log.Info().Str("agentID", AgentID).Msg("connection accepted")
status := c.status.New(AgentID)
agent := newAgentConnection(AgentID, stream, c.heartbeat, status, heartbeatTimeoutMS, c.log)
c.agentConnStore.Add(AgentID, agent)
defer c.agentConnStore.Delete(AgentID)
c.log.Debug().Str("agentID", AgentID).Msg("start listening")
return agent.Listen()
}
func (c *ConnectionManager) GetConnection(AgentID string) (*AgentConnection, error) {
agent := c.agentConnStore.Get(AgentID)
if agent == nil {
return nil, ErrNotFoundConn
}
return agent, nil
}
func (c *ConnectionManager) GetAllAgentID() []string {
return c.agentConnStore.GetAllAgentID()
}
func agentIDFromMetadata(ctx context.Context) (string, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", fmt.Errorf("metadata not found")
}
values := md.Get("agent-id")
if len(values) == 0 || values[0] == "" {
return "", fmt.Errorf("agent-id not found")
}
return values[0], nil
}
@@ -0,0 +1,116 @@
package connection_manager
import (
"context"
"testing"
"time"
pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"
"gotest.tools/v3/assert"
)
type connectionManagerTestHarness struct {
heartbeat *heartBeatMock
status *statusNotifierMock
manager *ConnectionManager
}
func newConnectionManagerTestHarness(t *testing.T) *connectionManagerTestHarness {
t.Helper()
heartbeat := &heartBeatMock{doneCh: make(chan struct{}, 2)}
status := &statusNotifierMock{agentIDCh: make(chan string, 1)}
manager := NewConnectionManager(heartbeat, status, zerolog.New(nil))
return &connectionManagerTestHarness{manager: manager, status: status, heartbeat: heartbeat}
}
func newMetadataAgentID(t *testing.T, agentID string) metadata.MD {
t.Helper()
return metadata.New(map[string]string{"agent-id": agentID})
}
func TestNewConnectionManager_NewConnection(t *testing.T) {
h := newConnectionManagerTestHarness(t)
agentID := "123"
ctx := metadata.NewIncomingContext(context.Background(), newMetadataAgentID(t, agentID))
stream := streamMock{ctx: ctx,
recvCh: make(chan *pb.AgentEvent, 1),
sendCh: make(chan *pb.ServerCommandRequest, 1),
}
go func() {
_ = h.manager.NewConnection(&stream)
}()
select {
case ID := <-h.status.agentIDCh:
require.Equal(t, agentID, ID)
case <-time.After(200 * time.Millisecond):
t.Fatalf("get agent id for notifier")
}
agentIDs := h.manager.GetAllAgentID()
assert.Equal(t, agentID, agentIDs[0])
agent, err := h.manager.GetConnection(agentID)
assert.NilError(t, err)
require.NotNil(t, agent)
}
func TestNewConnectionManager_NewConnectionNotAgentID(t *testing.T) {
h := newConnectionManagerTestHarness(t)
stream := streamMock{ctx: context.Background(),
recvCh: make(chan *pb.AgentEvent, 1),
sendCh: make(chan *pb.ServerCommandRequest, 1),
}
wait := make(chan struct{})
go func() {
err := h.manager.NewConnection(&stream)
assert.ErrorContains(t, err, "get agent id")
wait <- struct{}{}
}()
waitFor(t, wait, 5000, "timeout new connection")
}
func TestNewConnectionManager_AgentNotFound(t *testing.T) {
h := newConnectionManagerTestHarness(t)
_, err := h.manager.GetConnection("123")
assert.ErrorIs(t, ErrNotFoundConn, err)
agentIDs := h.manager.GetAllAgentID()
assert.Equal(t, len(agentIDs), 0)
}
func Test_agentIDFromMetadata(t *testing.T) {
agentID := "123"
ctx := metadata.NewIncomingContext(context.Background(), newMetadataAgentID(t, agentID))
id, err := agentIDFromMetadata(ctx)
assert.NilError(t, err)
assert.Equal(t, id, agentID)
}
func Test_agentIDFromMetadata_MetadataNotFound(t *testing.T) {
ctx := context.Background()
_, err := agentIDFromMetadata(ctx)
assert.ErrorContains(t, err, "metadata not found")
}
func Test_agentIDFromMetadata_AgentIDNotFound(t *testing.T) {
ctx := metadata.NewIncomingContext(context.Background(), newMetadataAgentID(t, ""))
_, err := agentIDFromMetadata(ctx)
assert.ErrorContains(t, err, "agent-id not found")
}
@@ -0,0 +1,40 @@
package connection_manager
import (
"time"
pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops"
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
)
func toCreateHeartbeatModel(agentID string, heartbeat *pb.AgentEvent_Heartbeat) domainHub.CreateHeartbeatModel {
timestamp := time.Unix(heartbeat.Heartbeat.Timestamp, 0)
return domainHub.CreateHeartbeatModel{
AgentID: agentID,
Timestamp: timestamp,
Metrics: domainHub.SystemMetrics{
MemoryUsage: float64(heartbeat.Heartbeat.Metrics.MemoryUsage),
CpuUsage: float64(heartbeat.Heartbeat.Metrics.CpuUsage),
DiskUsage: float64(heartbeat.Heartbeat.Metrics.DiskUsage),
},
}
}
func toGRPCCommandRequest(requestID string, request domainHub.AgentRequest) pb.ServerCommandRequest {
return pb.ServerCommandRequest{
RequestId: requestID,
Name: request.Name,
TimeoutSeconds: int64(request.TimeOut),
Args: request.Args,
}
}
func toAgentResponse(response *pb.AgentEvent_CommandResponse) domainHub.AgentResponse {
return domainHub.AgentResponse{
Success: response.CommandResponse.Success,
Error: response.CommandResponse.Error,
Output: response.CommandResponse.Output,
ExecTimeMS: int(response.CommandResponse.ExecTimeMs),
}
}
@@ -0,0 +1,119 @@
package connection_manager
import (
"context"
"io"
"sync"
pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops"
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
)
type streamMock struct {
recvCh chan *pb.AgentEvent
sendCh chan *pb.ServerCommandRequest
ctx context.Context
mu sync.Mutex
sendErr error
recvErr error
closeOnce sync.Once
}
func (f *streamMock) Context() context.Context {
return f.ctx
}
func (f *streamMock) Send(request *pb.ServerCommandRequest) error {
f.mu.Lock()
err := f.sendErr
f.mu.Unlock()
if err != nil {
return err
}
f.sendCh <- request
return nil
}
func (f *streamMock) Recv() (*pb.AgentEvent, error) {
f.mu.Lock()
recvErr := f.recvErr
f.mu.Unlock()
if recvErr != nil {
return nil, recvErr
}
select {
case msg, ok := <-f.recvCh:
if !ok {
return nil, io.EOF
}
return msg, nil
case <-f.ctx.Done():
return nil, f.ctx.Err()
}
}
func (f *streamMock) CloseRecv() {
f.closeOnce.Do(func() {
close(f.recvCh)
})
}
type heartBeatMock struct {
mu sync.Mutex
countUse int
doneCh chan struct{}
err error
}
func (h *heartBeatMock) CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error {
h.mu.Lock()
h.countUse += 1
err := h.err
h.mu.Unlock()
select {
case h.doneCh <- struct{}{}:
default:
}
return err
}
type statusMock struct {
mu sync.Mutex
online bool
doneCh chan struct{}
}
func (s *statusMock) Offline() {
s.mu.Lock()
s.online = false
s.mu.Unlock()
}
func (s *statusMock) Online() {
s.mu.Lock()
s.online = true
s.mu.Unlock()
select {
case s.doneCh <- struct{}{}:
default:
}
}
func (s *statusMock) IsOnline() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.online
}
type statusNotifierMock struct {
agentIDCh chan string
}
func (s *statusNotifierMock) New(AgentID string) StatusAgent {
select {
case s.agentIDCh <- AgentID:
default:
}
return &statusMock{doneCh: make(chan struct{}, 2)}
}
@@ -0,0 +1,82 @@
package connection_manager
import (
"sync"
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
)
type AgentConnStore struct {
mutex sync.RWMutex
store map[string]*AgentConnection
}
func NewAgentConnStore() *AgentConnStore {
return &AgentConnStore{store: make(map[string]*AgentConnection)}
}
func (a *AgentConnStore) Get(agentID string) *AgentConnection {
a.mutex.RLock()
defer a.mutex.RUnlock()
return a.store[agentID]
}
func (a *AgentConnStore) Add(agentID string, agentConn *AgentConnection) {
a.mutex.Lock()
defer a.mutex.Unlock()
a.store[agentID] = agentConn
}
func (a *AgentConnStore) Delete(agentID string) {
a.mutex.Lock()
defer a.mutex.Unlock()
delete(a.store, agentID)
}
func (a *AgentConnStore) Pop(agentID string) *AgentConnection {
a.mutex.Lock()
defer a.mutex.Unlock()
agent := a.store[agentID]
delete(a.store, agentID)
return agent
}
func (a *AgentConnStore) GetAllAgentID() []string {
a.mutex.RLock()
defer a.mutex.RUnlock()
var IDs []string
for ID := range a.store {
IDs = append(IDs, ID)
}
return IDs
}
type ResponseStore struct {
store map[string]chan domainHub.AgentResponse
mutex sync.RWMutex
}
func NewResponseStore() *ResponseStore {
data := make(map[string]chan domainHub.AgentResponse)
return &ResponseStore{store: data}
}
func (r *ResponseStore) Write(responseID string, channel chan domainHub.AgentResponse) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.store[responseID] = channel
}
func (r *ResponseStore) Read(responseID string) (chan domainHub.AgentResponse, bool) {
r.mutex.RLock()
defer r.mutex.RUnlock()
ch, ok := r.store[responseID]
return ch, ok
}
func (r *ResponseStore) Delete(responseID string) {
r.mutex.Lock()
defer r.mutex.Unlock()
delete(r.store, responseID)
}
+24
View File
@@ -76,3 +76,27 @@ func toDomainCapabilities(caps []byte) []domain.Capability {
}
return capabilities
}
func toDBHeartbeat(heartbeat domainHub.CreateHeartbeatModel) gen2.InsertHeartbeatParams {
return gen2.InsertHeartbeatParams{
AgentID: heartbeat.AgentID,
HeartbeatTimestamp: heartbeat.Timestamp,
CpuUsage: heartbeat.Metrics.CpuUsage,
DiskUsage: heartbeat.Metrics.DiskUsage,
MemoryUsage: heartbeat.Metrics.MemoryUsage,
}
}
func toHeartBeatModel(heartbeat gen2.Heartbeat) domainHub.HeartbeatModel {
return domainHub.HeartbeatModel{
Timestamp: heartbeat.HeartbeatTimestamp,
AgentID: heartbeat.AgentID,
ID: int(heartbeat.ID),
Metrics: domainHub.SystemMetrics{
CpuUsage: heartbeat.CpuUsage,
DiskUsage: heartbeat.DiskUsage,
MemoryUsage: heartbeat.MemoryUsage,
},
}
}
+1 -1
View File
@@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// sqlc v1.31.1
// source: agent.sql
package gen
+1 -1
View File
@@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// sqlc v1.31.1
package gen
@@ -0,0 +1,75 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.31.1
// source: heartbeat.sql
package gen
import (
"context"
"time"
)
const insertHeartbeat = `-- name: InsertHeartbeat :exec
INSERT INTO heartbeats (agent_id, cpu_usage, memory_usage, disk_usage, heartbeat_timestamp)
VALUES (?1, ?2, ?3, ?4, ?5)
`
type InsertHeartbeatParams struct {
AgentID string
CpuUsage float64
MemoryUsage float64
DiskUsage float64
HeartbeatTimestamp time.Time
}
func (q *Queries) InsertHeartbeat(ctx context.Context, arg InsertHeartbeatParams) error {
_, err := q.db.ExecContext(ctx, insertHeartbeat,
arg.AgentID,
arg.CpuUsage,
arg.MemoryUsage,
arg.DiskUsage,
arg.HeartbeatTimestamp,
)
return err
}
const selectHeartbeatsAfter = `-- name: SelectHeartbeatsAfter :many
SELECT id, agent_id, cpu_usage, memory_usage, disk_usage, heartbeat_timestamp FROM heartbeats
WHERE agent_id = ?1 AND heartbeat_timestamp > ?2
`
type SelectHeartbeatsAfterParams struct {
AgentID string
Timestamp time.Time
}
func (q *Queries) SelectHeartbeatsAfter(ctx context.Context, arg SelectHeartbeatsAfterParams) ([]Heartbeat, error) {
rows, err := q.db.QueryContext(ctx, selectHeartbeatsAfter, arg.AgentID, arg.Timestamp)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Heartbeat
for rows.Next() {
var i Heartbeat
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.CpuUsage,
&i.MemoryUsage,
&i.DiskUsage,
&i.HeartbeatTimestamp,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
+10 -1
View File
@@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// sqlc v1.31.1
package gen
@@ -19,3 +19,12 @@ type Agent struct {
Capabilities *string
RegisteredAt time.Time
}
type Heartbeat struct {
ID int64
AgentID string
CpuUsage float64
MemoryUsage float64
DiskUsage float64
HeartbeatTimestamp time.Time
}
@@ -0,0 +1,7 @@
-- name: InsertHeartbeat :exec
INSERT INTO heartbeats (agent_id, cpu_usage, memory_usage, disk_usage, heartbeat_timestamp)
VALUES (:agent_id, :cpu_usage, :memory_usage, :disk_usage, :heartbeat_timestamp);
-- name: SelectHeartbeatsAfter :many
SELECT * FROM heartbeats
WHERE agent_id = :agent_id AND heartbeat_timestamp > :timestamp;
+24 -2
View File
@@ -3,6 +3,7 @@ package store
import (
"context"
"database/sql"
"time"
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/store/sqlc/gen"
@@ -21,8 +22,8 @@ func (h *HubStore) NewAgent(ctx context.Context, agent domainHub.CreateAgentMode
return h.queries.CreateAgent(ctx, toDBAgent(agent))
}
func (h *HubStore) GetAgentByAgentID(ctx context.Context, AgentID string) (domainHub.AgentModel, error) {
data, err := h.queries.GetAgentByAgentID(ctx, AgentID)
func (h *HubStore) GetAgentByAgentID(ctx context.Context, agentID string) (domainHub.AgentModel, error) {
data, err := h.queries.GetAgentByAgentID(ctx, agentID)
if err != nil {
return domainHub.AgentModel{}, err
}
@@ -34,3 +35,24 @@ func (h *HubStore) UpdateAgentByID(ctx context.Context, ID int, updateAgent doma
data.ID = int64(ID)
return h.queries.UpdateAgentByID(ctx, data)
}
func (h *HubStore) CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error {
data := toDBHeartbeat(heartbeat)
return h.queries.InsertHeartbeat(ctx, data)
}
func (h *HubStore) GetHeartbeatsByIDAfter(ctx context.Context, agentID string, timestamp time.Time) ([]domainHub.HeartbeatModel, error) {
data := gen.SelectHeartbeatsAfterParams{AgentID: agentID, Timestamp: timestamp}
heartbeats, err := h.queries.SelectHeartbeatsAfter(ctx, data)
if err != nil {
return []domainHub.HeartbeatModel{}, err
}
heartbeatsModel := make([]domainHub.HeartbeatModel, len(heartbeats))
for i, heartbeat := range heartbeats {
heartbeatsModel[i] = toHeartBeatModel(heartbeat)
}
return heartbeatsModel, nil
}
+28
View File
@@ -0,0 +1,28 @@
package notifier
import "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager"
// Временная заглушка
type StatusNotifier struct {
}
type Status struct {
agentID string
online bool
}
func NewStatusNotifier() *StatusNotifier {
return &StatusNotifier{}
}
func (s *StatusNotifier) New(agentID string) connection_manager.StatusAgent {
return &Status{agentID: agentID}
}
func (s *Status) Online() {
s.online = true
}
func (s *Status) Offline() {
s.online = false
}
-15
View File
@@ -9,11 +9,6 @@ import (
)
type Config struct {
DBHost string `env:"DB_HOST"`
DBPort int `env:"DB_PORT"`
DBPassword string `env:"DB_PASS"`
DBUser string `env:"DB_USER"`
DBName string `env:"DB_NAME"`
LogLevel string `env:"LOG_LEVEL" env-default:"INFO"`
Mode string `env:"MODE" env-default:"DEV"`
Port int `env:"PORT" env-default:"9000"`
@@ -33,16 +28,6 @@ func NewConfig() (*Config, error) {
return &cfg, nil
}
func (c *Config) GetURLPostgres() string {
return fmt.Sprintf(
"postgres://%v:%v@%v:%v/%v?sslmode=disable",
c.DBUser,
c.DBPassword,
c.DBHost,
c.DBPort,
c.DBName)
}
func (c *Config) GetLogLevel() zerolog.Level {
level, err := zerolog.ParseLevel(c.LogLevel)
if err != nil {
+9 -6
View File
@@ -1,15 +1,18 @@
package domain
import (
"fmt"
pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops"
)
func ToDomainAgentRequest(request *pb.RegisterAgentRequest) RegisterAgentRequest {
func ToDomainAgentRequest(request *pb.RegisterAgentRequest) (RegisterAgentRequest, error) {
if request == nil {
return RegisterAgentRequest{}
return RegisterAgentRequest{}, fmt.Errorf("request is empty")
}
return RegisterAgentRequest{
AgentVersion: request.Version,
AgentID: request.AgentId,
AgentName: request.AgentName,
Host: HostInfo{
@@ -18,18 +21,18 @@ func ToDomainAgentRequest(request *pb.RegisterAgentRequest) RegisterAgentRequest
Arch: request.Host.Arch,
},
Capabilities: ToDomainCapabilities(request.Capability),
}
}, nil
}
func ToDomainAgentResponse(response *pb.RegisterAgentResponse) RegisterAgentResponse {
func ToDomainAgentResponse(response *pb.RegisterAgentResponse) (RegisterAgentResponse, error) {
if response == nil {
return RegisterAgentResponse{}
return RegisterAgentResponse{}, fmt.Errorf("request is empty")
}
return RegisterAgentResponse{
AgentID: response.AgentId,
Heartbeat: int(response.HeartbeatIntervalSecond),
}
}, nil
}
func ToDomainCapabilities(capability []*pb.Capability) []Capability {