feat: add connect stream to connection manager

This commit is contained in:
2026-05-23 14:03:43 +03:00
parent b157d7c32d
commit c41cbc3c2f
7 changed files with 65 additions and 19 deletions
+7 -3
View File
@@ -9,8 +9,10 @@ import (
hubdir "github.com/lorsanstand/HomeOps-Hub/hub/internal"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/migrator"
grpcserv "github.com/lorsanstand/HomeOps-Hub/hub/internal/rpc"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/service/hub_service"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/store"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/utils/notifier"
"github.com/lorsanstand/HomeOps-Hub/shared/config"
"github.com/lorsanstand/HomeOps-Hub/shared/log"
_ "github.com/mattn/go-sqlite3"
@@ -63,19 +65,21 @@ func (a *App) Run() {
hubStore := store.NewHubStore(DBConn)
hubService := hub_service.NewHubService(hubStore, a.log)
statusNotifier := notifier.NewStatusNotifier()
connManger := connection_manager.NewConnectionManager(hubStore, statusNotifier, a.log)
a.log.Info().Msg("starting hub service")
err = a.hubServe(hubService)
err = a.hubServe(hubService, connManger)
if err != nil {
a.log.Error().Err(err).Msg("hub service failed to start")
return
}
}
func (a *App) hubServe(hubService *hub_service.HubService) error {
func (a *App) hubServe(hubService *hub_service.HubService, manager *connection_manager.ConnectionManager) error {
address := fmt.Sprintf("0.0.0.0:%v", a.cfg.Port)
server := grpcserv.NewHubHandler(hubService, a.log)
server := grpcserv.NewHubHandler(hubService, manager, a.log)
lis, err := net.Listen("tcp", address)
if err != nil {
+17 -3
View File
@@ -2,8 +2,10 @@ package rpc
import (
"context"
"fmt"
pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager"
"github.com/lorsanstand/HomeOps-Hub/shared/domain"
"github.com/rs/zerolog"
"google.golang.org/grpc"
@@ -14,15 +16,20 @@ type HubService interface {
RegisterAgent(ctx context.Context, data domain.RegisterAgentRequest) (domain.RegisterAgentResponse, error)
}
type ConnectionManager interface {
NewConnection(stream connection_manager.StreamConn) error
}
type HubHandler struct {
pb.UnimplementedHubServer
log zerolog.Logger
GrpcServer *grpc.Server
hub HubService
streamManager ConnectionManager
}
func NewHubHandler(HubServ HubService, logger zerolog.Logger) *HubHandler {
hub := &HubHandler{log: logger, hub: HubServ}
func NewHubHandler(HubServ HubService, manager ConnectionManager, logger zerolog.Logger) *HubHandler {
hub := &HubHandler{log: logger, hub: HubServ, streamManager: manager}
grpcServer := grpc.NewServer()
pb.RegisterHubServer(grpcServer, hub)
@@ -42,9 +49,16 @@ func (h *HubHandler) RegisterAgent(ctx context.Context, request *pb.RegisterAgen
data := domain.ToDomainAgentRequest(request)
resp, err := h.hub.RegisterAgent(ctx, data)
if err != nil {
h.log.Error().Err(err).Str("agentID", request.AgentId).Msg("register agent request failed")
return domain.ToGRPCAgentResponse(resp), err
}
h.log.Info().Str("agentID", resp.AgentID).Msg("register agent request completed")
return domain.ToGRPCAgentResponse(resp), nil
}
func (h *HubHandler) StreamConnection(stream grpc.BidiStreamingServer[pb.AgentEvent, pb.ServerCommandRequest]) error {
err := h.streamManager.NewConnection(stream)
if err != nil {
return fmt.Errorf("accept connection: %w", err)
}
return nil
}
@@ -13,10 +13,10 @@ import (
)
type AgentConnection struct {
stream streamConn
stream StreamConn
heartbeat heartbeatStore
log zerolog.Logger
status statusAgent
status StatusAgent
AgentID string
response *ResponseStore
ctx context.Context
@@ -24,7 +24,7 @@ type AgentConnection struct {
heartbeatTimeoutMS int
}
func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, heartbeatTimeoutMS int, logger zerolog.Logger) *AgentConnection {
func newAgentConnection(agentID string, stream StreamConn, heartbeat heartbeatStore, status StatusAgent, heartbeatTimeoutMS int, logger zerolog.Logger) *AgentConnection {
response := NewResponseStore()
logger = logger.With().Str("agentID", agentID).Logger()
ctx, cancel := context.WithCancel(stream.Context())
@@ -7,7 +7,7 @@ import (
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
)
type streamConn interface {
type StreamConn interface {
Send(request *pb.ServerCommandRequest) error
Recv() (*pb.AgentEvent, error)
Context() context.Context
@@ -17,11 +17,11 @@ type heartbeatStore interface {
CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error
}
type statusAgent interface {
type StatusAgent interface {
Offline()
Online()
}
type statusNotifier interface {
New(AgentID string) statusAgent
New(agentID string) StatusAgent
}
@@ -21,7 +21,7 @@ func NewConnectionManager(heartbeat heartbeatStore, status statusNotifier, logge
return &ConnectionManager{heartbeat: heartbeat, log: logger, status: status, agentConnStore: NewAgentConnStore()}
}
func (c *ConnectionManager) NewConnection(stream streamConn) error {
func (c *ConnectionManager) NewConnection(stream StreamConn) error {
AgentID, err := agentIDFromMetadata(stream.Context())
if err != nil {
c.log.Error().Err(err).Msg("missing agent id in metadata")
@@ -109,7 +109,7 @@ type statusNotifierMock struct {
agentIDCh chan string
}
func (s *statusNotifierMock) New(AgentID string) statusAgent {
func (s *statusNotifierMock) New(AgentID string) StatusAgent {
select {
case s.agentIDCh <- AgentID:
default:
+28
View File
@@ -0,0 +1,28 @@
package notifier
import "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager"
// Временная заглушка
type StatusNotifier struct {
}
type Status struct {
agentID string
online bool
}
func NewStatusNotifier() *StatusNotifier {
return &StatusNotifier{}
}
func (s *StatusNotifier) New(agentID string) connection_manager.StatusAgent {
return &Status{agentID: agentID}
}
func (s *Status) Online() {
s.online = true
}
func (s *Status) Offline() {
s.online = false
}