diff --git a/README.md b/README.md index b31d324..9731669 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ Проект состоит из трех основных компонентов: -1. **Hub (The Brain):** Центральный сервис на Go/Python. Хранит конфигурации, управляет состоянием, оркестрирует Runbooks и управляет Telegram-ботом. База данных — Postgres. +1. **Hub (The Brain):** Центральный сервис на Go/Python. Хранит конфигурации, управляет состоянием, оркестрирует Runbooks и управляет Telegram-ботом. База данных — SQLite. 2. **Agent (The Executor):** Легковесный бинарный файл, работающий непосредственно на целевом хосте. Связь с Хабом через **gRPC Stream**. 3. **Telegram Interface:** Основной UI для оперативного управления с использованием Inline-кнопок для быстрых действий (Ack, Mute, Restart). @@ -66,7 +66,6 @@ ## 🔒 Безопасность * **No Inbound Ports:** Агент не слушает порты. Соединение всегда инициируется изнутри вашей сети. -* **mTLS / Token Auth:** Весь трафик между Агентом и Хабом зашифрован. * **Hardened Commands:** Невозможно выполнить `rm -rf /` — доступны только те команды, что описаны в коде агента. *Created with ❤️ for Stasik.* \ No newline at end of file diff --git a/agent/internal/rpc/client.go b/agent/internal/rpc/client.go index 23f9ea3..a59766f 100644 --- a/agent/internal/rpc/client.go +++ b/agent/internal/rpc/client.go @@ -2,6 +2,7 @@ package rpc import ( "context" + "fmt" pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" "github.com/lorsanstand/HomeOps-Hub/shared/domain" @@ -28,5 +29,12 @@ func (c *Connection) Hub() pb.HubClient { func (c *Connection) RegisterAgent(ctx context.Context, RegisterData domain.RegisterAgentRequest) (domain.RegisterAgentResponse, error) { ResponseData, err := c.Hub().RegisterAgent(ctx, new(domain.ToGRPCAgentRequest(RegisterData))) - return domain.ToDomainAgentResponse(ResponseData), err + if err != nil { + return domain.RegisterAgentResponse{}, fmt.Errorf("send register agent: %w", err) + } + response, err := domain.ToDomainAgentResponse(ResponseData) + if err != nil { + return domain.RegisterAgentResponse{}, fmt.Errorf("casting response: %w", err) + } + return response, nil } diff --git a/deployments/compose/dev/hub.docker-compose.yaml b/deployments/compose/dev/hub.docker-compose.yaml index 709e12d..e69de29 100644 --- a/deployments/compose/dev/hub.docker-compose.yaml +++ b/deployments/compose/dev/hub.docker-compose.yaml @@ -1,24 +0,0 @@ -services: - postgres-db: - image: postgres:latest - volumes: - - pgdata:/var/lib/postgresql - environment: - POSTGRES_DB: "${DB_NAME}" - POSTGRES_USER: "${DB_USER}" - POSTGRES_PASSWORD: "${DB_PASS}" - networks: - - homeops-dev - ports: - - "${DB_PORT}:5432" - healthcheck: - test: ["CMD-SHELL", "pg_isready -U ${DB_USER}"] - interval: 5s - retries: 5 - restart: always - -volumes: - pgdata: - -networks: - homeops-dev: \ No newline at end of file diff --git a/go.mod b/go.mod index 292ad1a..041d5c0 100644 --- a/go.mod +++ b/go.mod @@ -5,12 +5,15 @@ go 1.26.1 require ( github.com/docker/docker v28.5.2+incompatible github.com/golang-migrate/migrate/v4 v4.19.1 + github.com/google/uuid v1.6.0 github.com/ilyakaznacheev/cleanenv v1.5.0 github.com/mattn/go-sqlite3 v1.14.44 github.com/rs/zerolog v1.35.1 + github.com/stretchr/testify v1.11.1 google.golang.org/grpc v1.81.0 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 + gotest.tools/v3 v3.5.2 ) require ( @@ -20,13 +23,14 @@ require ( github.com/containerd/errdefs v1.0.0 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect github.com/containerd/log v0.1.0 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/distribution/reference v0.6.0 // indirect github.com/docker/go-connections v0.7.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/google/uuid v1.6.0 // indirect + github.com/google/go-cmp v0.7.0 // indirect github.com/joho/godotenv v1.5.1 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/lib/pq v1.12.3 // indirect @@ -39,6 +43,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 // indirect @@ -54,7 +59,6 @@ require ( golang.org/x/time v0.15.0 // indirect golang.org/x/tools v0.43.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260504160031-60b97b32f348 // indirect - gotest.tools/v3 v3.5.2 // indirect lukechampine.com/uint128 v1.2.0 // indirect modernc.org/cc/v3 v3.36.3 // indirect modernc.org/ccgo/v3 v3.16.9 // indirect diff --git a/hub/internal/app/app.go b/hub/internal/app/app.go index 0c90402..4f7eb4d 100644 --- a/hub/internal/app/app.go +++ b/hub/internal/app/app.go @@ -5,12 +5,15 @@ import ( "fmt" standartlog "log" "net" + "os" hubdir "github.com/lorsanstand/HomeOps-Hub/hub/internal" "github.com/lorsanstand/HomeOps-Hub/hub/internal/migrator" grpcserv "github.com/lorsanstand/HomeOps-Hub/hub/internal/rpc" + "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager" "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/hub_service" "github.com/lorsanstand/HomeOps-Hub/hub/internal/store" + "github.com/lorsanstand/HomeOps-Hub/hub/internal/utils/notifier" "github.com/lorsanstand/HomeOps-Hub/shared/config" "github.com/lorsanstand/HomeOps-Hub/shared/log" _ "github.com/mattn/go-sqlite3" @@ -39,7 +42,7 @@ func (a *App) Run() { DBConn, err := sql.Open("sqlite", "database.db") if err != nil { a.log.Error().Err(err).Msg("failed to connect to the database") - return + os.Exit(1) } defer func() { @@ -48,34 +51,31 @@ func (a *App) Run() { } }() - mgrt, err := migrator.NewMigrator(hubdir.MigrationsFS, "migrations") - if err != nil { - a.log.Error().Err(err).Msg("failed to create migrator") - return - } - a.log.Info().Msg("applying database migrations") - if err = mgrt.ApplyMigrations(DBConn); err != nil { - a.log.Error().Err(err).Msg("migrations failed to apply") - return + if err := applyMigrations(DBConn); err != nil { + a.log.Error().Err(err).Msg("") + os.Exit(1) } a.log.Info().Msg("migrations applied successfully") hubStore := store.NewHubStore(DBConn) hubService := hub_service.NewHubService(hubStore, a.log) + statusNotifier := notifier.NewStatusNotifier() + connManger := connection_manager.NewConnectionManager(hubStore, statusNotifier, a.log) a.log.Info().Msg("starting hub service") - err = a.hubServe(hubService) + + err = a.hubServe(hubService, connManger) if err != nil { a.log.Error().Err(err).Msg("hub service failed to start") return } } -func (a *App) hubServe(hubService *hub_service.HubService) error { +func (a *App) hubServe(hubService *hub_service.HubService, manager *connection_manager.ConnectionManager) error { address := fmt.Sprintf("0.0.0.0:%v", a.cfg.Port) - server := grpcserv.NewHubHandler(hubService, a.log) + server := grpcserv.NewHubHandler(hubService, manager, a.log) lis, err := net.Listen("tcp", address) if err != nil { @@ -93,3 +93,14 @@ func (a *App) hubServe(hubService *hub_service.HubService) error { return nil } + +func applyMigrations(db *sql.DB) error { + mgrt, err := migrator.NewMigrator(hubdir.MigrationsFS, "migrations") + if err != nil { + return fmt.Errorf("failed to create migrator: %w", err) + } + if err = mgrt.ApplyMigrations(db); err != nil { + return fmt.Errorf("migrations failed to apply: %w", err) + } + return nil +} diff --git a/hub/internal/domain/structure.go b/hub/internal/domain/agent.go similarity index 100% rename from hub/internal/domain/structure.go rename to hub/internal/domain/agent.go diff --git a/hub/internal/domain/stream.go b/hub/internal/domain/stream.go new file mode 100644 index 0000000..8870ae4 --- /dev/null +++ b/hub/internal/domain/stream.go @@ -0,0 +1,42 @@ +package domain + +import "time" + +type AgentRequest struct { + Name string + Args map[string]string + TimeOut int +} + +type AgentResponse struct { + Success bool + Output string + Error string + ExecTimeMS int +} + +type AgentAlert struct { + Timestamp int + Level string + Title string + Description string +} + +type SystemMetrics struct { + CpuUsage float64 + MemoryUsage float64 + DiskUsage float64 +} + +type HeartbeatModel struct { + ID int + AgentID string + Timestamp time.Time + Metrics SystemMetrics +} + +type CreateHeartbeatModel struct { + AgentID string + Timestamp time.Time + Metrics SystemMetrics +} diff --git a/hub/internal/migrations/20260506182346_create_heartbeat_table.down.sql b/hub/internal/migrations/20260506182346_create_heartbeat_table.down.sql new file mode 100644 index 0000000..d8cc83d --- /dev/null +++ b/hub/internal/migrations/20260506182346_create_heartbeat_table.down.sql @@ -0,0 +1,3 @@ +DROP INDEX idx_heartbeat_agent_id; + +DROP TABLE if exists heartbeats; \ No newline at end of file diff --git a/hub/internal/migrations/20260506182346_create_heartbeat_table.up.sql b/hub/internal/migrations/20260506182346_create_heartbeat_table.up.sql new file mode 100644 index 0000000..e329974 --- /dev/null +++ b/hub/internal/migrations/20260506182346_create_heartbeat_table.up.sql @@ -0,0 +1,11 @@ +CREATE TABLE heartbeats ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent_id VARCHAR(32) NOT NULL, + cpu_usage FLOAT NOT NULL , + memory_usage FLOAT NOT NULL , + disk_usage FLOAT NOT NULL , + heartbeat_timestamp TIMESTAMP NOT NULL, + FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE +); + +CREATE UNIQUE INDEX idx_heartbeat_agent_id ON heartbeats (agent_id); \ No newline at end of file diff --git a/hub/internal/rpc/errors.go b/hub/internal/rpc/errors.go new file mode 100644 index 0000000..b755049 --- /dev/null +++ b/hub/internal/rpc/errors.go @@ -0,0 +1,5 @@ +package rpc + +import "fmt" + +var ErrFailedRegister = fmt.Errorf("failed register agent") diff --git a/hub/internal/rpc/server.go b/hub/internal/rpc/server.go index d9de289..286b521 100644 --- a/hub/internal/rpc/server.go +++ b/hub/internal/rpc/server.go @@ -2,8 +2,10 @@ package rpc import ( "context" + "fmt" pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager" "github.com/lorsanstand/HomeOps-Hub/shared/domain" "github.com/rs/zerolog" "google.golang.org/grpc" @@ -14,15 +16,20 @@ type HubService interface { RegisterAgent(ctx context.Context, data domain.RegisterAgentRequest) (domain.RegisterAgentResponse, error) } -type HubHandler struct { - pb.UnimplementedHubServer - log zerolog.Logger - GrpcServer *grpc.Server - hub HubService +type ConnectionManager interface { + NewConnection(stream connection_manager.StreamConn) error } -func NewHubHandler(HubServ HubService, logger zerolog.Logger) *HubHandler { - hub := &HubHandler{log: logger, hub: HubServ} +type HubHandler struct { + pb.UnimplementedHubServer + log zerolog.Logger + GrpcServer *grpc.Server + hub HubService + streamManager ConnectionManager +} + +func NewHubHandler(HubServ HubService, manager ConnectionManager, logger zerolog.Logger) *HubHandler { + hub := &HubHandler{log: logger, hub: HubServ, streamManager: manager} grpcServer := grpc.NewServer() pb.RegisterHubServer(grpcServer, hub) @@ -39,12 +46,24 @@ func (h *HubHandler) Ping(ctx context.Context, _ *emptypb.Empty) (*pb.PongRespon func (h *HubHandler) RegisterAgent(ctx context.Context, request *pb.RegisterAgentRequest) (*pb.RegisterAgentResponse, error) { h.log.Debug().Str("agentID", request.AgentId).Str("agentName", request.AgentName).Msg("register agent request received") - data := domain.ToDomainAgentRequest(request) + data, err := domain.ToDomainAgentRequest(request) + if err != nil { + h.log.Error().Err(err).Msg("failed to casting request") + return &pb.RegisterAgentResponse{}, ErrFailedRegister + } resp, err := h.hub.RegisterAgent(ctx, data) if err != nil { - h.log.Error().Err(err).Str("agentID", request.AgentId).Msg("register agent request failed") - return domain.ToGRPCAgentResponse(resp), err + h.log.Error().Err(err).Msg("failed register agent") + return domain.ToGRPCAgentResponse(resp), ErrFailedRegister } h.log.Info().Str("agentID", resp.AgentID).Msg("register agent request completed") return domain.ToGRPCAgentResponse(resp), nil } + +func (h *HubHandler) StreamConnection(stream grpc.BidiStreamingServer[pb.AgentEvent, pb.ServerCommandRequest]) error { + err := h.streamManager.NewConnection(stream) + if err != nil { + return fmt.Errorf("accept connection: %w", err) + } + return nil +} diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go new file mode 100644 index 0000000..a391b98 --- /dev/null +++ b/hub/internal/service/connection_manager/agent.go @@ -0,0 +1,157 @@ +package connection_manager + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/google/uuid" + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" + "github.com/rs/zerolog" +) + +type AgentConnection struct { + stream StreamConn + heartbeat heartbeatStore + log zerolog.Logger + status StatusAgent + AgentID string + response *ResponseStore + ctx context.Context + cancel context.CancelFunc + heartbeatTimeoutMS int +} + +func newAgentConnection(agentID string, stream StreamConn, heartbeat heartbeatStore, status StatusAgent, heartbeatTimeoutMS int, logger zerolog.Logger) *AgentConnection { + response := NewResponseStore() + logger = logger.With().Str("agentID", agentID).Logger() + ctx, cancel := context.WithCancel(stream.Context()) + return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status, ctx: ctx, cancel: cancel, heartbeatTimeoutMS: heartbeatTimeoutMS} +} + +func (a *AgentConnection) Listen() error { + heartbeatsCh := make(chan domainHub.CreateHeartbeatModel, 5) + streamRecvCh := make(chan *pb.AgentEvent, 5) + + go a.listenHeartbeat(heartbeatsCh) + go a.listenStream(streamRecvCh) + + defer func() { + a.status.Offline() + close(heartbeatsCh) + a.Close() + }() + + for { + select { + case <-a.ctx.Done(): + return a.ctx.Err() + case msg, ok := <-streamRecvCh: + if !ok { + return nil + } + + switch x := msg.Event.(type) { + case *pb.AgentEvent_Heartbeat: + heartbeat := toCreateHeartbeatModel(a.AgentID, x) + heartbeatsCh <- heartbeat + case *pb.AgentEvent_CommandResponse: + ch, ok := a.response.Read(x.CommandResponse.RequestId) + if !ok { + a.log.Warn().Str("requestID", x.CommandResponse.RequestId).Msg("not found channel for send response") + continue + } + response := toAgentResponse(x) + ch <- response + } + default: + } + } +} + +func (a *AgentConnection) listenStream(ch chan *pb.AgentEvent) { + defer close(ch) + for { + agentEvent, err := a.stream.Recv() + if err == io.EOF { + return + } + if err != nil { + a.log.Warn().Err(err).Msg("close stream") + return + } + + ch <- agentEvent + } +} + +func (a *AgentConnection) listenHeartbeat(heartbeats <-chan domainHub.CreateHeartbeatModel) { + lastHeartbeat := 0 + timer := time.NewTicker(time.Duration(a.heartbeatTimeoutMS) * time.Millisecond) + defer timer.Stop() + + for { + select { + case <-timer.C: + if lastHeartbeat < 4 { + lastHeartbeat += 1 + a.status.Offline() + continue + } + + a.log.Warn().Msg("agent not send heartbeat") + a.Close() + return + case heartbeat, ok := <-heartbeats: + if !ok { + return + } + a.log.Debug(). + Float64("cpu usage", heartbeat.Metrics.CpuUsage). + Float64("disk usage", heartbeat.Metrics.DiskUsage). + Float64("memory usage", heartbeat.Metrics.MemoryUsage).Msg("") + + a.status.Online() + lastHeartbeat = 0 + + err := a.heartbeat.CreateHeartbeat(a.ctx, heartbeat) + if err != nil { + a.log.Error().Err(err).Msg("failed to write heartbeat") + } + case <-a.ctx.Done(): + return + } + } +} + +func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRequest) (domainHub.AgentResponse, error) { + requestID := uuid.New().String() + ch := make(chan domainHub.AgentResponse, 1) + defer close(ch) + + a.response.Write(requestID, ch) + defer a.response.Delete(requestID) + + err := a.stream.Send(new(toGRPCCommandRequest(requestID, request))) + if err != nil { + return domainHub.AgentResponse{}, fmt.Errorf("execute command: %w", err) + } + + a.log.Info().Str("requestID", requestID).Str("command", request.Name).Msg("send command") + + select { + case <-a.ctx.Done(): + return domainHub.AgentResponse{}, ErrConnectionClose + case <-ctx.Done(): + return domainHub.AgentResponse{}, ctx.Err() + case response := <-ch: + a.log.Info().Str("requestID", requestID).Msg("received response") + return response, nil + } +} + +func (a *AgentConnection) Close() { + a.cancel() +} diff --git a/hub/internal/service/connection_manager/agent_test.go b/hub/internal/service/connection_manager/agent_test.go new file mode 100644 index 0000000..484ab85 --- /dev/null +++ b/hub/internal/service/connection_manager/agent_test.go @@ -0,0 +1,324 @@ +package connection_manager + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" + "github.com/rs/zerolog" + "gotest.tools/v3/assert" +) + +type agentTestHarness struct { + ctx context.Context + cancel context.CancelFunc + stream *streamMock + heartbeat *heartBeatMock + status *statusMock + agent *AgentConnection + recvCh chan *pb.AgentEvent + sendCh chan *pb.ServerCommandRequest +} + +func newAgentTestHarness(t *testing.T, heartbeatTimeoutMS int) *agentTestHarness { + t.Helper() + sendStream := make(chan *pb.ServerCommandRequest, 4) + recvStream := make(chan *pb.AgentEvent, 4) + ctx, cancel := context.WithCancel(context.Background()) + + stream := &streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx} + heartbeat := &heartBeatMock{doneCh: make(chan struct{}, 2)} + status := &statusMock{doneCh: make(chan struct{}, 2)} + + agent := newAgentConnection("123", stream, heartbeat, status, heartbeatTimeoutMS, zerolog.New(nil)) + + t.Cleanup(func() { + cancel() + }) + + return &agentTestHarness{ + ctx: ctx, cancel: cancel, stream: stream, heartbeat: heartbeat, status: status, + agent: agent, recvCh: recvStream, sendCh: sendStream, + } +} + +func waitFor(t *testing.T, ch <-chan struct{}, timeout time.Duration, message string) { + t.Helper() + select { + case <-ch: + case <-time.After(timeout): + t.Fatal(message) + } +} + +func commandResponseEvent(requestID, output string) *pb.AgentEvent { + return &pb.AgentEvent{ + AgentId: "agent-1", + Event: &pb.AgentEvent_CommandResponse{ + CommandResponse: &pb.CommandResponse{ + RequestId: requestID, + Success: true, + Output: output, + }, + }, + } +} + +func TestAgentConnection_Heartbeat(t *testing.T) { + h := newAgentTestHarness(t, 5000) + done := make(chan struct{}) + go func() { + _ = h.agent.Listen() + close(done) + }() + + h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{ + Heartbeat: &pb.Heartbeat{ + Timestamp: time.Now().Unix(), + Metrics: &pb.SystemMetrics{CpuUsage: 0.5, MemoryUsage: 0.3, DiskUsage: 0.7}, + }}} + + waitFor(t, h.heartbeat.doneCh, 500*time.Millisecond, "timeout waiting for heartbeat") + waitFor(t, h.status.doneCh, 500*time.Millisecond, "timeout waiting for status online") + + h.heartbeat.mu.Lock() + count := h.heartbeat.countUse + h.heartbeat.mu.Unlock() + + assert.Equal(t, count, 1) + assert.Equal(t, h.status.IsOnline(), true) + + h.cancel() + waitFor(t, done, 500*time.Millisecond, "timeout waiting for listen stop") + assert.Equal(t, h.status.IsOnline(), false) +} + +func TestAgentConnection_Execute(t *testing.T) { + h := newAgentTestHarness(t, 5000) + go h.agent.Listen() + + // Данные для проверки + requestID := make(chan domainHub.AgentResponse) + output := "test output" + name := "test name" + + go func() { + response, _ := h.agent.Execute(h.ctx, domainHub.AgentRequest{ + Name: name, + Args: nil, + TimeOut: 0, + }) + + requestID <- response + }() + + request := <-h.sendCh + assert.Equal(t, name, request.Name) + + h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_CommandResponse{ + CommandResponse: &pb.CommandResponse{ + RequestId: request.RequestId, + Success: true, + Output: output, + }}} + + select { + case response := <-requestID: + assert.Equal(t, output, response.Output) + + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for response") + } +} + +func TestAgentConnection_HeartbeatTimeout(t *testing.T) { + h := newAgentTestHarness(t, 200) + listenDone := make(chan error, 1) + execDone := make(chan error, 1) + + go func() { + listenDone <- h.agent.Listen() + }() + + go func() { + _, err := h.agent.Execute(h.ctx, domainHub.AgentRequest{ + Name: "test", + Args: nil, + TimeOut: 0, + }) + execDone <- err + }() + + timeout := time.After(2 * time.Second) + gotListen := false + gotExec := false + for !(gotListen && gotExec) { + select { + case err := <-listenDone: + assert.ErrorIs(t, err, context.Canceled) + gotListen = true + case err := <-execDone: + assert.ErrorIs(t, err, ErrConnectionClose) + gotExec = true + case <-timeout: + h.cancel() + t.Fatal("timeout waiting for heartbeat timeout") + } + } +} + +func TestAgentConnection_ConnectionClose(t *testing.T) { + h := newAgentTestHarness(t, 5000) + var wg sync.WaitGroup + + wg.Add(2) + go func() { + err := h.agent.Listen() + assert.ErrorIs(t, err, context.Canceled) + wg.Done() + }() + + go func() { + _, err := h.agent.Execute(context.Background(), domainHub.AgentRequest{ + Name: "test", + Args: nil, + TimeOut: 0, + }) + assert.ErrorIs(t, err, ErrConnectionClose) + wg.Done() + }() + + h.cancel() + + wg.Wait() +} + +func TestAgentConnection_ExecuteClose(t *testing.T) { + h := newAgentTestHarness(t, 5000) + ctxExecute, cancelExecute := context.WithCancel(context.Background()) + t.Cleanup(cancelExecute) + + executeCh := make(chan struct{}) + go h.agent.Listen() + + go func() { + _, err := h.agent.Execute(ctxExecute, domainHub.AgentRequest{ + Name: "test", + Args: nil, + TimeOut: 0, + }) + + assert.ErrorIs(t, err, context.Canceled) + executeCh <- struct{}{} + }() + + cancelExecute() + waitFor(t, executeCh, 500*time.Millisecond, "timeout waiting for execute close") +} + +func TestAgentConnection_ListenEOF(t *testing.T) { + h := newAgentTestHarness(t, 5000) + h.stream.CloseRecv() + + err := h.agent.Listen() + assert.NilError(t, err) +} + +func TestAgentConnection_ExecuteSendError(t *testing.T) { + h := newAgentTestHarness(t, 5000) + h.stream.mu.Lock() + h.stream.sendErr = errors.New("send failure") + h.stream.mu.Unlock() + + _, err := h.agent.Execute(h.ctx, domainHub.AgentRequest{Name: "test"}) + assert.ErrorContains(t, err, "execute command") +} + +func TestAgentConnection_ExecuteContextCanceled(t *testing.T) { + h := newAgentTestHarness(t, 5000) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err := h.agent.Execute(ctx, domainHub.AgentRequest{Name: "test"}) + assert.ErrorIs(t, err, context.Canceled) +} + +func TestAgentConnection_ExecuteConnectionCanceled(t *testing.T) { + h := newAgentTestHarness(t, 5000) + h.cancel() + + _, err := h.agent.Execute(context.Background(), domainHub.AgentRequest{Name: "test"}) + assert.ErrorIs(t, err, ErrConnectionClose) +} + +func TestAgentConnection_UnknownResponseID(t *testing.T) { + h := newAgentTestHarness(t, 5000) + done := make(chan struct{}) + go func() { + _ = h.agent.Listen() + close(done) + }() + + h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_CommandResponse{ + CommandResponse: &pb.CommandResponse{ + RequestId: "unknown", + Success: true, + Output: "ok", + }}} + + h.cancel() + waitFor(t, done, 500*time.Millisecond, "timeout waiting for listen stop") +} + +func TestAgentConnection_HeartbeatErrorDoesNotStop(t *testing.T) { + h := newAgentTestHarness(t, 5000) + h.heartbeat.mu.Lock() + h.heartbeat.err = errors.New("db error") + h.heartbeat.mu.Unlock() + + go h.agent.Listen() + h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{ + Heartbeat: &pb.Heartbeat{ + Timestamp: time.Now().Unix(), + Metrics: &pb.SystemMetrics{CpuUsage: 0.2, MemoryUsage: 0.1, DiskUsage: 0.3}, + }}} + + waitFor(t, h.heartbeat.doneCh, 500*time.Millisecond, "timeout waiting for heartbeat") + h.cancel() +} + +func TestAgentConnection_ConcurrentExecute(t *testing.T) { + h := newAgentTestHarness(t, 5000) + go h.agent.Listen() + + responses := make(chan domainHub.AgentResponse, 2) + + go func() { + resp, _ := h.agent.Execute(h.ctx, domainHub.AgentRequest{Name: "cmd-1"}) + responses <- resp + }() + go func() { + resp, _ := h.agent.Execute(h.ctx, domainHub.AgentRequest{Name: "cmd-2"}) + responses <- resp + }() + + first := <-h.sendCh + second := <-h.sendCh + + // ответы приходят в обратном порядке + h.recvCh <- commandResponseEvent(second.RequestId, "second") + h.recvCh <- commandResponseEvent(first.RequestId, "first") + + resp1 := <-responses + resp2 := <-responses + + assert.Assert(t, resp1.Output == "first" || resp1.Output == "second") + assert.Assert(t, resp2.Output == "first" || resp2.Output == "second") + assert.Assert(t, resp1.Output != resp2.Output) + + h.cancel() +} diff --git a/hub/internal/service/connection_manager/errors.go b/hub/internal/service/connection_manager/errors.go new file mode 100644 index 0000000..0d8ef9a --- /dev/null +++ b/hub/internal/service/connection_manager/errors.go @@ -0,0 +1,7 @@ +package connection_manager + +import "errors" + +var ErrConnectionClose = errors.New("connection close") + +var ErrNotFoundConn = errors.New("agent connection not found") diff --git a/hub/internal/service/connection_manager/interface.go b/hub/internal/service/connection_manager/interface.go new file mode 100644 index 0000000..a6844b9 --- /dev/null +++ b/hub/internal/service/connection_manager/interface.go @@ -0,0 +1,27 @@ +package connection_manager + +import ( + "context" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" +) + +type StreamConn interface { + Send(request *pb.ServerCommandRequest) error + Recv() (*pb.AgentEvent, error) + Context() context.Context +} + +type heartbeatStore interface { + CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error +} + +type StatusAgent interface { + Offline() + Online() +} + +type statusNotifier interface { + New(agentID string) StatusAgent +} diff --git a/hub/internal/service/connection_manager/manager.go b/hub/internal/service/connection_manager/manager.go new file mode 100644 index 0000000..13acc81 --- /dev/null +++ b/hub/internal/service/connection_manager/manager.go @@ -0,0 +1,69 @@ +package connection_manager + +import ( + "context" + "fmt" + + "github.com/rs/zerolog" + "google.golang.org/grpc/metadata" +) + +const heartbeatTimeoutMS = 6000 + +type ConnectionManager struct { + heartbeat heartbeatStore + log zerolog.Logger + status statusNotifier + agentConnStore *AgentConnStore +} + +func NewConnectionManager(heartbeat heartbeatStore, status statusNotifier, logger zerolog.Logger) *ConnectionManager { + return &ConnectionManager{heartbeat: heartbeat, log: logger, status: status, agentConnStore: NewAgentConnStore()} +} + +func (c *ConnectionManager) NewConnection(stream StreamConn) error { + AgentID, err := agentIDFromMetadata(stream.Context()) + if err != nil { + c.log.Error().Err(err).Msg("missing agent id in metadata") + return fmt.Errorf("get agent id: %w", err) + } + + c.log.Info().Str("agentID", AgentID).Msg("connection accepted") + + status := c.status.New(AgentID) + + agent := newAgentConnection(AgentID, stream, c.heartbeat, status, heartbeatTimeoutMS, c.log) + c.agentConnStore.Add(AgentID, agent) + defer c.agentConnStore.Delete(AgentID) + + c.log.Debug().Str("agentID", AgentID).Msg("start listening") + + return agent.Listen() +} + +func (c *ConnectionManager) GetConnection(AgentID string) (*AgentConnection, error) { + agent := c.agentConnStore.Get(AgentID) + if agent == nil { + return nil, ErrNotFoundConn + } + + return agent, nil +} + +func (c *ConnectionManager) GetAllAgentID() []string { + return c.agentConnStore.GetAllAgentID() +} + +func agentIDFromMetadata(ctx context.Context) (string, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return "", fmt.Errorf("metadata not found") + } + + values := md.Get("agent-id") + if len(values) == 0 || values[0] == "" { + return "", fmt.Errorf("agent-id not found") + } + + return values[0], nil +} diff --git a/hub/internal/service/connection_manager/manager_test.go b/hub/internal/service/connection_manager/manager_test.go new file mode 100644 index 0000000..61a16c6 --- /dev/null +++ b/hub/internal/service/connection_manager/manager_test.go @@ -0,0 +1,116 @@ +package connection_manager + +import ( + "context" + "testing" + "time" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/metadata" + "gotest.tools/v3/assert" +) + +type connectionManagerTestHarness struct { + heartbeat *heartBeatMock + status *statusNotifierMock + manager *ConnectionManager +} + +func newConnectionManagerTestHarness(t *testing.T) *connectionManagerTestHarness { + t.Helper() + + heartbeat := &heartBeatMock{doneCh: make(chan struct{}, 2)} + status := &statusNotifierMock{agentIDCh: make(chan string, 1)} + + manager := NewConnectionManager(heartbeat, status, zerolog.New(nil)) + + return &connectionManagerTestHarness{manager: manager, status: status, heartbeat: heartbeat} +} + +func newMetadataAgentID(t *testing.T, agentID string) metadata.MD { + t.Helper() + + return metadata.New(map[string]string{"agent-id": agentID}) +} + +func TestNewConnectionManager_NewConnection(t *testing.T) { + h := newConnectionManagerTestHarness(t) + agentID := "123" + + ctx := metadata.NewIncomingContext(context.Background(), newMetadataAgentID(t, agentID)) + + stream := streamMock{ctx: ctx, + recvCh: make(chan *pb.AgentEvent, 1), + sendCh: make(chan *pb.ServerCommandRequest, 1), + } + + go func() { + _ = h.manager.NewConnection(&stream) + }() + + select { + case ID := <-h.status.agentIDCh: + require.Equal(t, agentID, ID) + case <-time.After(200 * time.Millisecond): + t.Fatalf("get agent id for notifier") + } + + agentIDs := h.manager.GetAllAgentID() + assert.Equal(t, agentID, agentIDs[0]) + + agent, err := h.manager.GetConnection(agentID) + assert.NilError(t, err) + require.NotNil(t, agent) + +} + +func TestNewConnectionManager_NewConnectionNotAgentID(t *testing.T) { + h := newConnectionManagerTestHarness(t) + + stream := streamMock{ctx: context.Background(), + recvCh: make(chan *pb.AgentEvent, 1), + sendCh: make(chan *pb.ServerCommandRequest, 1), + } + + wait := make(chan struct{}) + + go func() { + err := h.manager.NewConnection(&stream) + assert.ErrorContains(t, err, "get agent id") + wait <- struct{}{} + }() + + waitFor(t, wait, 5000, "timeout new connection") +} + +func TestNewConnectionManager_AgentNotFound(t *testing.T) { + h := newConnectionManagerTestHarness(t) + _, err := h.manager.GetConnection("123") + assert.ErrorIs(t, ErrNotFoundConn, err) + + agentIDs := h.manager.GetAllAgentID() + assert.Equal(t, len(agentIDs), 0) +} + +func Test_agentIDFromMetadata(t *testing.T) { + agentID := "123" + + ctx := metadata.NewIncomingContext(context.Background(), newMetadataAgentID(t, agentID)) + id, err := agentIDFromMetadata(ctx) + assert.NilError(t, err) + assert.Equal(t, id, agentID) +} + +func Test_agentIDFromMetadata_MetadataNotFound(t *testing.T) { + ctx := context.Background() + _, err := agentIDFromMetadata(ctx) + assert.ErrorContains(t, err, "metadata not found") +} + +func Test_agentIDFromMetadata_AgentIDNotFound(t *testing.T) { + ctx := metadata.NewIncomingContext(context.Background(), newMetadataAgentID(t, "")) + _, err := agentIDFromMetadata(ctx) + assert.ErrorContains(t, err, "agent-id not found") +} diff --git a/hub/internal/service/connection_manager/mapper.go b/hub/internal/service/connection_manager/mapper.go new file mode 100644 index 0000000..68fa0ac --- /dev/null +++ b/hub/internal/service/connection_manager/mapper.go @@ -0,0 +1,40 @@ +package connection_manager + +import ( + "time" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" +) + +func toCreateHeartbeatModel(agentID string, heartbeat *pb.AgentEvent_Heartbeat) domainHub.CreateHeartbeatModel { + timestamp := time.Unix(heartbeat.Heartbeat.Timestamp, 0) + + return domainHub.CreateHeartbeatModel{ + AgentID: agentID, + Timestamp: timestamp, + Metrics: domainHub.SystemMetrics{ + MemoryUsage: float64(heartbeat.Heartbeat.Metrics.MemoryUsage), + CpuUsage: float64(heartbeat.Heartbeat.Metrics.CpuUsage), + DiskUsage: float64(heartbeat.Heartbeat.Metrics.DiskUsage), + }, + } +} + +func toGRPCCommandRequest(requestID string, request domainHub.AgentRequest) pb.ServerCommandRequest { + return pb.ServerCommandRequest{ + RequestId: requestID, + Name: request.Name, + TimeoutSeconds: int64(request.TimeOut), + Args: request.Args, + } +} + +func toAgentResponse(response *pb.AgentEvent_CommandResponse) domainHub.AgentResponse { + return domainHub.AgentResponse{ + Success: response.CommandResponse.Success, + Error: response.CommandResponse.Error, + Output: response.CommandResponse.Output, + ExecTimeMS: int(response.CommandResponse.ExecTimeMs), + } +} diff --git a/hub/internal/service/connection_manager/mock_test.go b/hub/internal/service/connection_manager/mock_test.go new file mode 100644 index 0000000..25237ad --- /dev/null +++ b/hub/internal/service/connection_manager/mock_test.go @@ -0,0 +1,119 @@ +package connection_manager + +import ( + "context" + "io" + "sync" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" +) + +type streamMock struct { + recvCh chan *pb.AgentEvent + sendCh chan *pb.ServerCommandRequest + ctx context.Context + mu sync.Mutex + sendErr error + recvErr error + closeOnce sync.Once +} + +func (f *streamMock) Context() context.Context { + return f.ctx +} + +func (f *streamMock) Send(request *pb.ServerCommandRequest) error { + f.mu.Lock() + err := f.sendErr + f.mu.Unlock() + if err != nil { + return err + } + f.sendCh <- request + return nil +} + +func (f *streamMock) Recv() (*pb.AgentEvent, error) { + f.mu.Lock() + recvErr := f.recvErr + f.mu.Unlock() + if recvErr != nil { + return nil, recvErr + } + select { + case msg, ok := <-f.recvCh: + if !ok { + return nil, io.EOF + } + return msg, nil + case <-f.ctx.Done(): + return nil, f.ctx.Err() + } +} + +func (f *streamMock) CloseRecv() { + f.closeOnce.Do(func() { + close(f.recvCh) + }) +} + +type heartBeatMock struct { + mu sync.Mutex + countUse int + doneCh chan struct{} + err error +} + +func (h *heartBeatMock) CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error { + h.mu.Lock() + h.countUse += 1 + err := h.err + h.mu.Unlock() + select { + case h.doneCh <- struct{}{}: + default: + } + return err +} + +type statusMock struct { + mu sync.Mutex + online bool + doneCh chan struct{} +} + +func (s *statusMock) Offline() { + s.mu.Lock() + s.online = false + s.mu.Unlock() +} + +func (s *statusMock) Online() { + s.mu.Lock() + s.online = true + s.mu.Unlock() + select { + case s.doneCh <- struct{}{}: + default: + } +} + +func (s *statusMock) IsOnline() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.online +} + +type statusNotifierMock struct { + agentIDCh chan string +} + +func (s *statusNotifierMock) New(AgentID string) StatusAgent { + select { + case s.agentIDCh <- AgentID: + default: + + } + return &statusMock{doneCh: make(chan struct{}, 2)} +} diff --git a/hub/internal/service/connection_manager/store.go b/hub/internal/service/connection_manager/store.go new file mode 100644 index 0000000..3e5ffa8 --- /dev/null +++ b/hub/internal/service/connection_manager/store.go @@ -0,0 +1,82 @@ +package connection_manager + +import ( + "sync" + + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" +) + +type AgentConnStore struct { + mutex sync.RWMutex + store map[string]*AgentConnection +} + +func NewAgentConnStore() *AgentConnStore { + return &AgentConnStore{store: make(map[string]*AgentConnection)} +} + +func (a *AgentConnStore) Get(agentID string) *AgentConnection { + a.mutex.RLock() + defer a.mutex.RUnlock() + return a.store[agentID] +} + +func (a *AgentConnStore) Add(agentID string, agentConn *AgentConnection) { + a.mutex.Lock() + defer a.mutex.Unlock() + a.store[agentID] = agentConn +} + +func (a *AgentConnStore) Delete(agentID string) { + a.mutex.Lock() + defer a.mutex.Unlock() + delete(a.store, agentID) +} + +func (a *AgentConnStore) Pop(agentID string) *AgentConnection { + a.mutex.Lock() + defer a.mutex.Unlock() + agent := a.store[agentID] + delete(a.store, agentID) + return agent +} + +func (a *AgentConnStore) GetAllAgentID() []string { + a.mutex.RLock() + defer a.mutex.RUnlock() + + var IDs []string + for ID := range a.store { + IDs = append(IDs, ID) + } + return IDs +} + +type ResponseStore struct { + store map[string]chan domainHub.AgentResponse + mutex sync.RWMutex +} + +func NewResponseStore() *ResponseStore { + data := make(map[string]chan domainHub.AgentResponse) + return &ResponseStore{store: data} +} + +func (r *ResponseStore) Write(responseID string, channel chan domainHub.AgentResponse) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.store[responseID] = channel +} + +func (r *ResponseStore) Read(responseID string) (chan domainHub.AgentResponse, bool) { + r.mutex.RLock() + defer r.mutex.RUnlock() + ch, ok := r.store[responseID] + return ch, ok +} + +func (r *ResponseStore) Delete(responseID string) { + r.mutex.Lock() + defer r.mutex.Unlock() + delete(r.store, responseID) +} diff --git a/hub/internal/store/mapper.go b/hub/internal/store/mapper.go index 6fc40b7..e78c9e4 100644 --- a/hub/internal/store/mapper.go +++ b/hub/internal/store/mapper.go @@ -76,3 +76,27 @@ func toDomainCapabilities(caps []byte) []domain.Capability { } return capabilities } + +func toDBHeartbeat(heartbeat domainHub.CreateHeartbeatModel) gen2.InsertHeartbeatParams { + + return gen2.InsertHeartbeatParams{ + AgentID: heartbeat.AgentID, + HeartbeatTimestamp: heartbeat.Timestamp, + CpuUsage: heartbeat.Metrics.CpuUsage, + DiskUsage: heartbeat.Metrics.DiskUsage, + MemoryUsage: heartbeat.Metrics.MemoryUsage, + } +} + +func toHeartBeatModel(heartbeat gen2.Heartbeat) domainHub.HeartbeatModel { + return domainHub.HeartbeatModel{ + Timestamp: heartbeat.HeartbeatTimestamp, + AgentID: heartbeat.AgentID, + ID: int(heartbeat.ID), + Metrics: domainHub.SystemMetrics{ + CpuUsage: heartbeat.CpuUsage, + DiskUsage: heartbeat.DiskUsage, + MemoryUsage: heartbeat.MemoryUsage, + }, + } +} diff --git a/hub/internal/store/sqlc/gen/agent.sql.go b/hub/internal/store/sqlc/gen/agent.sql.go index 704434b..636304f 100644 --- a/hub/internal/store/sqlc/gen/agent.sql.go +++ b/hub/internal/store/sqlc/gen/agent.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.31.1 // source: agent.sql package gen diff --git a/hub/internal/store/sqlc/gen/db.go b/hub/internal/store/sqlc/gen/db.go index d577e39..b6fcf6b 100644 --- a/hub/internal/store/sqlc/gen/db.go +++ b/hub/internal/store/sqlc/gen/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.31.1 package gen diff --git a/hub/internal/store/sqlc/gen/heartbeat.sql.go b/hub/internal/store/sqlc/gen/heartbeat.sql.go new file mode 100644 index 0000000..b98736f --- /dev/null +++ b/hub/internal/store/sqlc/gen/heartbeat.sql.go @@ -0,0 +1,75 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.31.1 +// source: heartbeat.sql + +package gen + +import ( + "context" + "time" +) + +const insertHeartbeat = `-- name: InsertHeartbeat :exec +INSERT INTO heartbeats (agent_id, cpu_usage, memory_usage, disk_usage, heartbeat_timestamp) +VALUES (?1, ?2, ?3, ?4, ?5) +` + +type InsertHeartbeatParams struct { + AgentID string + CpuUsage float64 + MemoryUsage float64 + DiskUsage float64 + HeartbeatTimestamp time.Time +} + +func (q *Queries) InsertHeartbeat(ctx context.Context, arg InsertHeartbeatParams) error { + _, err := q.db.ExecContext(ctx, insertHeartbeat, + arg.AgentID, + arg.CpuUsage, + arg.MemoryUsage, + arg.DiskUsage, + arg.HeartbeatTimestamp, + ) + return err +} + +const selectHeartbeatsAfter = `-- name: SelectHeartbeatsAfter :many +SELECT id, agent_id, cpu_usage, memory_usage, disk_usage, heartbeat_timestamp FROM heartbeats +WHERE agent_id = ?1 AND heartbeat_timestamp > ?2 +` + +type SelectHeartbeatsAfterParams struct { + AgentID string + Timestamp time.Time +} + +func (q *Queries) SelectHeartbeatsAfter(ctx context.Context, arg SelectHeartbeatsAfterParams) ([]Heartbeat, error) { + rows, err := q.db.QueryContext(ctx, selectHeartbeatsAfter, arg.AgentID, arg.Timestamp) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Heartbeat + for rows.Next() { + var i Heartbeat + if err := rows.Scan( + &i.ID, + &i.AgentID, + &i.CpuUsage, + &i.MemoryUsage, + &i.DiskUsage, + &i.HeartbeatTimestamp, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/hub/internal/store/sqlc/gen/models.go b/hub/internal/store/sqlc/gen/models.go index b40fd8b..19c5aef 100644 --- a/hub/internal/store/sqlc/gen/models.go +++ b/hub/internal/store/sqlc/gen/models.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.31.1 package gen @@ -19,3 +19,12 @@ type Agent struct { Capabilities *string RegisteredAt time.Time } + +type Heartbeat struct { + ID int64 + AgentID string + CpuUsage float64 + MemoryUsage float64 + DiskUsage float64 + HeartbeatTimestamp time.Time +} diff --git a/hub/internal/store/sqlc/queries/heartbeat.sql b/hub/internal/store/sqlc/queries/heartbeat.sql new file mode 100644 index 0000000..936bae2 --- /dev/null +++ b/hub/internal/store/sqlc/queries/heartbeat.sql @@ -0,0 +1,7 @@ +-- name: InsertHeartbeat :exec +INSERT INTO heartbeats (agent_id, cpu_usage, memory_usage, disk_usage, heartbeat_timestamp) +VALUES (:agent_id, :cpu_usage, :memory_usage, :disk_usage, :heartbeat_timestamp); + +-- name: SelectHeartbeatsAfter :many +SELECT * FROM heartbeats +WHERE agent_id = :agent_id AND heartbeat_timestamp > :timestamp; \ No newline at end of file diff --git a/hub/internal/store/store.go b/hub/internal/store/store.go index 3cfa60b..9d9fd94 100644 --- a/hub/internal/store/store.go +++ b/hub/internal/store/store.go @@ -3,6 +3,7 @@ package store import ( "context" "database/sql" + "time" domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" "github.com/lorsanstand/HomeOps-Hub/hub/internal/store/sqlc/gen" @@ -21,8 +22,8 @@ func (h *HubStore) NewAgent(ctx context.Context, agent domainHub.CreateAgentMode return h.queries.CreateAgent(ctx, toDBAgent(agent)) } -func (h *HubStore) GetAgentByAgentID(ctx context.Context, AgentID string) (domainHub.AgentModel, error) { - data, err := h.queries.GetAgentByAgentID(ctx, AgentID) +func (h *HubStore) GetAgentByAgentID(ctx context.Context, agentID string) (domainHub.AgentModel, error) { + data, err := h.queries.GetAgentByAgentID(ctx, agentID) if err != nil { return domainHub.AgentModel{}, err } @@ -34,3 +35,24 @@ func (h *HubStore) UpdateAgentByID(ctx context.Context, ID int, updateAgent doma data.ID = int64(ID) return h.queries.UpdateAgentByID(ctx, data) } + +func (h *HubStore) CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error { + data := toDBHeartbeat(heartbeat) + return h.queries.InsertHeartbeat(ctx, data) +} + +func (h *HubStore) GetHeartbeatsByIDAfter(ctx context.Context, agentID string, timestamp time.Time) ([]domainHub.HeartbeatModel, error) { + data := gen.SelectHeartbeatsAfterParams{AgentID: agentID, Timestamp: timestamp} + heartbeats, err := h.queries.SelectHeartbeatsAfter(ctx, data) + if err != nil { + return []domainHub.HeartbeatModel{}, err + } + + heartbeatsModel := make([]domainHub.HeartbeatModel, len(heartbeats)) + + for i, heartbeat := range heartbeats { + heartbeatsModel[i] = toHeartBeatModel(heartbeat) + } + + return heartbeatsModel, nil +} diff --git a/hub/internal/utils/notifier/status.go b/hub/internal/utils/notifier/status.go new file mode 100644 index 0000000..08f2156 --- /dev/null +++ b/hub/internal/utils/notifier/status.go @@ -0,0 +1,28 @@ +package notifier + +import "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager" + +// Временная заглушка +type StatusNotifier struct { +} + +type Status struct { + agentID string + online bool +} + +func NewStatusNotifier() *StatusNotifier { + return &StatusNotifier{} +} + +func (s *StatusNotifier) New(agentID string) connection_manager.StatusAgent { + return &Status{agentID: agentID} +} + +func (s *Status) Online() { + s.online = true +} + +func (s *Status) Offline() { + s.online = false +} diff --git a/shared/config/config.go b/shared/config/config.go index 5ce002d..d7667c7 100644 --- a/shared/config/config.go +++ b/shared/config/config.go @@ -9,14 +9,9 @@ import ( ) type Config struct { - DBHost string `env:"DB_HOST"` - DBPort int `env:"DB_PORT"` - DBPassword string `env:"DB_PASS"` - DBUser string `env:"DB_USER"` - DBName string `env:"DB_NAME"` - LogLevel string `env:"LOG_LEVEL" env-default:"INFO"` - Mode string `env:"MODE" env-default:"DEV"` - Port int `env:"PORT" env-default:"9000"` + LogLevel string `env:"LOG_LEVEL" env-default:"INFO"` + Mode string `env:"MODE" env-default:"DEV"` + Port int `env:"PORT" env-default:"9000"` } func NewConfig() (*Config, error) { @@ -33,16 +28,6 @@ func NewConfig() (*Config, error) { return &cfg, nil } -func (c *Config) GetURLPostgres() string { - return fmt.Sprintf( - "postgres://%v:%v@%v:%v/%v?sslmode=disable", - c.DBUser, - c.DBPassword, - c.DBHost, - c.DBPort, - c.DBName) -} - func (c *Config) GetLogLevel() zerolog.Level { level, err := zerolog.ParseLevel(c.LogLevel) if err != nil { diff --git a/shared/domain/mapper.go b/shared/domain/mapper.go index 587d361..6a4e0f6 100644 --- a/shared/domain/mapper.go +++ b/shared/domain/mapper.go @@ -1,35 +1,38 @@ package domain import ( + "fmt" + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" ) -func ToDomainAgentRequest(request *pb.RegisterAgentRequest) RegisterAgentRequest { +func ToDomainAgentRequest(request *pb.RegisterAgentRequest) (RegisterAgentRequest, error) { if request == nil { - return RegisterAgentRequest{} + return RegisterAgentRequest{}, fmt.Errorf("request is empty") } return RegisterAgentRequest{ - AgentID: request.AgentId, - AgentName: request.AgentName, + AgentVersion: request.Version, + AgentID: request.AgentId, + AgentName: request.AgentName, Host: HostInfo{ System: request.Host.System, Hostname: request.Host.Hostname, Arch: request.Host.Arch, }, Capabilities: ToDomainCapabilities(request.Capability), - } + }, nil } -func ToDomainAgentResponse(response *pb.RegisterAgentResponse) RegisterAgentResponse { +func ToDomainAgentResponse(response *pb.RegisterAgentResponse) (RegisterAgentResponse, error) { if response == nil { - return RegisterAgentResponse{} + return RegisterAgentResponse{}, fmt.Errorf("request is empty") } return RegisterAgentResponse{ AgentID: response.AgentId, Heartbeat: int(response.HeartbeatIntervalSecond), - } + }, nil } func ToDomainCapabilities(capability []*pb.Capability) []Capability {