From c41cbc3c2f42492eb3d610696f8003f4ee4adc43 Mon Sep 17 00:00:00 2001 From: lorsan Date: Sat, 23 May 2026 14:03:43 +0300 Subject: [PATCH] feat: add connect stream to connection manager --- hub/internal/app/app.go | 10 +++++-- hub/internal/rpc/server.go | 30 ++++++++++++++----- .../service/connection_manager/agent.go | 6 ++-- .../service/connection_manager/interface.go | 6 ++-- .../service/connection_manager/manager.go | 2 +- .../service/connection_manager/mock_test.go | 2 +- hub/internal/utils/notifier/status.go | 28 +++++++++++++++++ 7 files changed, 65 insertions(+), 19 deletions(-) create mode 100644 hub/internal/utils/notifier/status.go diff --git a/hub/internal/app/app.go b/hub/internal/app/app.go index 0c90402..5d5072c 100644 --- a/hub/internal/app/app.go +++ b/hub/internal/app/app.go @@ -9,8 +9,10 @@ import ( hubdir "github.com/lorsanstand/HomeOps-Hub/hub/internal" "github.com/lorsanstand/HomeOps-Hub/hub/internal/migrator" grpcserv "github.com/lorsanstand/HomeOps-Hub/hub/internal/rpc" + "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager" "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/hub_service" "github.com/lorsanstand/HomeOps-Hub/hub/internal/store" + "github.com/lorsanstand/HomeOps-Hub/hub/internal/utils/notifier" "github.com/lorsanstand/HomeOps-Hub/shared/config" "github.com/lorsanstand/HomeOps-Hub/shared/log" _ "github.com/mattn/go-sqlite3" @@ -63,19 +65,21 @@ func (a *App) Run() { hubStore := store.NewHubStore(DBConn) hubService := hub_service.NewHubService(hubStore, a.log) + statusNotifier := notifier.NewStatusNotifier() + connManger := connection_manager.NewConnectionManager(hubStore, statusNotifier, a.log) a.log.Info().Msg("starting hub service") - err = a.hubServe(hubService) + err = a.hubServe(hubService, connManger) if err != nil { a.log.Error().Err(err).Msg("hub service failed to start") return } } -func (a *App) hubServe(hubService *hub_service.HubService) error { +func (a *App) hubServe(hubService *hub_service.HubService, manager *connection_manager.ConnectionManager) error { address := fmt.Sprintf("0.0.0.0:%v", a.cfg.Port) - server := grpcserv.NewHubHandler(hubService, a.log) + server := grpcserv.NewHubHandler(hubService, manager, a.log) lis, err := net.Listen("tcp", address) if err != nil { diff --git a/hub/internal/rpc/server.go b/hub/internal/rpc/server.go index d9de289..242d39f 100644 --- a/hub/internal/rpc/server.go +++ b/hub/internal/rpc/server.go @@ -2,8 +2,10 @@ package rpc import ( "context" + "fmt" pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager" "github.com/lorsanstand/HomeOps-Hub/shared/domain" "github.com/rs/zerolog" "google.golang.org/grpc" @@ -14,15 +16,20 @@ type HubService interface { RegisterAgent(ctx context.Context, data domain.RegisterAgentRequest) (domain.RegisterAgentResponse, error) } -type HubHandler struct { - pb.UnimplementedHubServer - log zerolog.Logger - GrpcServer *grpc.Server - hub HubService +type ConnectionManager interface { + NewConnection(stream connection_manager.StreamConn) error } -func NewHubHandler(HubServ HubService, logger zerolog.Logger) *HubHandler { - hub := &HubHandler{log: logger, hub: HubServ} +type HubHandler struct { + pb.UnimplementedHubServer + log zerolog.Logger + GrpcServer *grpc.Server + hub HubService + streamManager ConnectionManager +} + +func NewHubHandler(HubServ HubService, manager ConnectionManager, logger zerolog.Logger) *HubHandler { + hub := &HubHandler{log: logger, hub: HubServ, streamManager: manager} grpcServer := grpc.NewServer() pb.RegisterHubServer(grpcServer, hub) @@ -42,9 +49,16 @@ func (h *HubHandler) RegisterAgent(ctx context.Context, request *pb.RegisterAgen data := domain.ToDomainAgentRequest(request) resp, err := h.hub.RegisterAgent(ctx, data) if err != nil { - h.log.Error().Err(err).Str("agentID", request.AgentId).Msg("register agent request failed") return domain.ToGRPCAgentResponse(resp), err } h.log.Info().Str("agentID", resp.AgentID).Msg("register agent request completed") return domain.ToGRPCAgentResponse(resp), nil } + +func (h *HubHandler) StreamConnection(stream grpc.BidiStreamingServer[pb.AgentEvent, pb.ServerCommandRequest]) error { + err := h.streamManager.NewConnection(stream) + if err != nil { + return fmt.Errorf("accept connection: %w", err) + } + return nil +} diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index fea989c..a391b98 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -13,10 +13,10 @@ import ( ) type AgentConnection struct { - stream streamConn + stream StreamConn heartbeat heartbeatStore log zerolog.Logger - status statusAgent + status StatusAgent AgentID string response *ResponseStore ctx context.Context @@ -24,7 +24,7 @@ type AgentConnection struct { heartbeatTimeoutMS int } -func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, heartbeatTimeoutMS int, logger zerolog.Logger) *AgentConnection { +func newAgentConnection(agentID string, stream StreamConn, heartbeat heartbeatStore, status StatusAgent, heartbeatTimeoutMS int, logger zerolog.Logger) *AgentConnection { response := NewResponseStore() logger = logger.With().Str("agentID", agentID).Logger() ctx, cancel := context.WithCancel(stream.Context()) diff --git a/hub/internal/service/connection_manager/interface.go b/hub/internal/service/connection_manager/interface.go index 70c2a95..a6844b9 100644 --- a/hub/internal/service/connection_manager/interface.go +++ b/hub/internal/service/connection_manager/interface.go @@ -7,7 +7,7 @@ import ( domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" ) -type streamConn interface { +type StreamConn interface { Send(request *pb.ServerCommandRequest) error Recv() (*pb.AgentEvent, error) Context() context.Context @@ -17,11 +17,11 @@ type heartbeatStore interface { CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error } -type statusAgent interface { +type StatusAgent interface { Offline() Online() } type statusNotifier interface { - New(AgentID string) statusAgent + New(agentID string) StatusAgent } diff --git a/hub/internal/service/connection_manager/manager.go b/hub/internal/service/connection_manager/manager.go index 09d78eb..13acc81 100644 --- a/hub/internal/service/connection_manager/manager.go +++ b/hub/internal/service/connection_manager/manager.go @@ -21,7 +21,7 @@ func NewConnectionManager(heartbeat heartbeatStore, status statusNotifier, logge return &ConnectionManager{heartbeat: heartbeat, log: logger, status: status, agentConnStore: NewAgentConnStore()} } -func (c *ConnectionManager) NewConnection(stream streamConn) error { +func (c *ConnectionManager) NewConnection(stream StreamConn) error { AgentID, err := agentIDFromMetadata(stream.Context()) if err != nil { c.log.Error().Err(err).Msg("missing agent id in metadata") diff --git a/hub/internal/service/connection_manager/mock_test.go b/hub/internal/service/connection_manager/mock_test.go index 5ae528b..25237ad 100644 --- a/hub/internal/service/connection_manager/mock_test.go +++ b/hub/internal/service/connection_manager/mock_test.go @@ -109,7 +109,7 @@ type statusNotifierMock struct { agentIDCh chan string } -func (s *statusNotifierMock) New(AgentID string) statusAgent { +func (s *statusNotifierMock) New(AgentID string) StatusAgent { select { case s.agentIDCh <- AgentID: default: diff --git a/hub/internal/utils/notifier/status.go b/hub/internal/utils/notifier/status.go new file mode 100644 index 0000000..08f2156 --- /dev/null +++ b/hub/internal/utils/notifier/status.go @@ -0,0 +1,28 @@ +package notifier + +import "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager" + +// Временная заглушка +type StatusNotifier struct { +} + +type Status struct { + agentID string + online bool +} + +func NewStatusNotifier() *StatusNotifier { + return &StatusNotifier{} +} + +func (s *StatusNotifier) New(agentID string) connection_manager.StatusAgent { + return &Status{agentID: agentID} +} + +func (s *Status) Online() { + s.online = true +} + +func (s *Status) Offline() { + s.online = false +}