feat: add connection manager

This commit is contained in:
2026-05-17 12:37:51 +03:00
parent 5b297d5c1f
commit bef8409b8c
10 changed files with 181 additions and 107 deletions
@@ -9,30 +9,23 @@ import (
"github.com/google/uuid"
pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops"
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager/store"
"github.com/rs/zerolog"
)
type statusAgent interface {
Offline()
Online()
}
// использовать sync.Pool что бы переиспользвоать этот обьект
type AgentConnection struct {
stream streamConn
heartbeat heartbeatStore
log zerolog.Logger
status statusAgent
AgentID string
response *store.ResponseStore
response *ResponseStore
ctx context.Context
cancel context.CancelFunc
heartbeatTimeoutMS int
}
func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, heartbeatTimeoutMS int, logger zerolog.Logger) *AgentConnection {
response := store.NewResponseStore()
response := NewResponseStore()
logger = logger.With().Str("agentID", agentID).Logger()
ctx, cancel := context.WithCancel(stream.Context())
return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status, ctx: ctx, cancel: cancel, heartbeatTimeoutMS: heartbeatTimeoutMS}
@@ -135,7 +128,7 @@ func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRe
select {
case <-a.ctx.Done():
return domainHub.AgentResponse{}, ConnectionCloseErr
return domainHub.AgentResponse{}, ErrConnectionClose
case <-ctx.Done():
return domainHub.AgentResponse{}, ctx.Err()
case response := <-ch:
@@ -15,13 +15,13 @@ import (
)
type streamMock struct {
recvCh chan *pb.AgentEvent
sendCh chan *pb.ServerCommandRequest
closeCh chan struct{}
ctx context.Context
mu sync.Mutex
sendErr error
recvErr error
recvCh chan *pb.AgentEvent
sendCh chan *pb.ServerCommandRequest
closeCh chan struct{}
ctx context.Context
mu sync.Mutex
sendErr error
recvErr error
closeOnce sync.Once
}
@@ -260,7 +260,7 @@ func TestAgentConnection_HeartbeatTimeout(t *testing.T) {
Args: nil,
TimeOut: 0,
})
assert.ErrorIs(t, err, ConnectionCloseErr)
assert.ErrorIs(t, err, ErrConnectionClose)
wg.Done()
}()
@@ -286,7 +286,7 @@ func TestAgentConnection_ConnectionClose(t *testing.T) {
Args: nil,
TimeOut: 0,
})
assert.ErrorIs(t, err, ConnectionCloseErr)
assert.ErrorIs(t, err, ErrConnectionClose)
wg.Done()
}()
@@ -365,7 +365,7 @@ func TestAgentConnection_ExecuteConnectionCanceled(t *testing.T) {
h.cancel()
_, err := h.agent.Execute(context.Background(), domainHub.AgentRequest{Name: "test"})
assert.ErrorIs(t, err, ConnectionCloseErr)
assert.ErrorIs(t, err, ErrConnectionClose)
}
func TestAgentConnection_UnknownResponseID(t *testing.T) {
@@ -1,6 +0,0 @@
package connection_manager
type agentStatus struct {
AgentID string
Online bool
}
@@ -2,4 +2,6 @@ package connection_manager
import "errors"
var ConnectionCloseErr error = errors.New("connection close")
var ErrConnectionClose = errors.New("connection close")
var ErrNotFoundConn = errors.New("agent connection not found")
@@ -17,3 +17,12 @@ type streamConn interface {
type heartbeatStore interface {
CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error
}
type statusAgent interface {
Offline()
Online()
}
type statusNotifier interface {
New(AgentID string) statusAgent
}
@@ -0,0 +1,71 @@
package connection_manager
import (
"context"
"fmt"
"github.com/rs/zerolog"
"google.golang.org/grpc/metadata"
)
const heartbeatTimeoutMS = 6000
type ConnectionManager struct {
heartbeat heartbeatStore
log zerolog.Logger
status statusNotifier
agentConnStore *AgentConnStore
}
func NewConnectionManager(heartbeat heartbeatStore, status statusNotifier, logger zerolog.Logger) *ConnectionManager {
return &ConnectionManager{heartbeat: heartbeat, log: logger, status: status, agentConnStore: NewAgentConnStore()}
}
func (c *ConnectionManager) NewConnection(stream streamConn) {
AgentID, err := agentIDFromMetadata(stream.Context())
if err != nil {
c.log.Error().Err(err).Msg("missing agent id in metadata")
return
}
c.log.Info().Str("agentID", AgentID).Msg("connection accepted")
status := c.status.New(AgentID)
agent := newAgentConnection(AgentID, stream, c.heartbeat, status, heartbeatTimeoutMS, c.log)
c.agentConnStore.Add(AgentID, agent)
go func() {
c.log.Debug().Str("agentID", AgentID).Msg("start listening")
err := agent.Listen()
if err != nil {
c.log.Error().Err(err).Msg("listening agent stopped")
}
c.agentConnStore.Delete(AgentID)
}()
}
func (c *ConnectionManager) GetConnection(AgentID string) (*AgentConnection, error) {
agent := c.agentConnStore.Get(AgentID)
if agent == nil {
return nil, ErrNotFoundConn
}
return agent, nil
}
func (c *ConnectionManager) GetAllAgentID() []string {
return c.agentConnStore.GetAllAgentID()
}
func agentIDFromMetadata(ctx context.Context) (string, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", fmt.Errorf("metadata not found")
}
values := md.Get("agent-id")
if len(values) == 0 || values[0] == "" {
return "", fmt.Errorf("agent-id not found")
}
return values[0], nil
}
@@ -0,0 +1,82 @@
package connection_manager
import (
"sync"
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
)
type AgentConnStore struct {
mutex sync.RWMutex
store map[string]*AgentConnection
}
func NewAgentConnStore() *AgentConnStore {
return &AgentConnStore{store: make(map[string]*AgentConnection)}
}
func (a *AgentConnStore) Get(agentID string) *AgentConnection {
a.mutex.RLock()
defer a.mutex.RUnlock()
return a.store[agentID]
}
func (a *AgentConnStore) Add(agentID string, agentConn *AgentConnection) {
a.mutex.Lock()
defer a.mutex.Unlock()
a.store[agentID] = agentConn
}
func (a *AgentConnStore) Delete(agentID string) {
a.mutex.Lock()
defer a.mutex.Unlock()
delete(a.store, agentID)
}
func (a *AgentConnStore) Pop(agentID string) *AgentConnection {
a.mutex.Lock()
defer a.mutex.Unlock()
agent := a.store[agentID]
delete(a.store, agentID)
return agent
}
func (a *AgentConnStore) GetAllAgentID() []string {
a.mutex.RLock()
defer a.mutex.RUnlock()
var IDs []string
for ID := range a.store {
IDs = append(IDs, ID)
}
return IDs
}
type ResponseStore struct {
store map[string]chan domainHub.AgentResponse
mutex sync.RWMutex
}
func NewResponseStore() *ResponseStore {
data := make(map[string]chan domainHub.AgentResponse)
return &ResponseStore{store: data}
}
func (r *ResponseStore) Write(responseID string, channel chan domainHub.AgentResponse) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.store[responseID] = channel
}
func (r *ResponseStore) Read(responseID string) (chan domainHub.AgentResponse, bool) {
r.mutex.RLock()
defer r.mutex.RUnlock()
ch, ok := r.store[responseID]
return ch, ok
}
func (r *ResponseStore) Delete(responseID string) {
r.mutex.Lock()
defer r.mutex.Unlock()
delete(r.store, responseID)
}
@@ -1,36 +0,0 @@
package store
import (
"sync"
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
)
type ResponseStore struct {
store map[string]chan domainHub.AgentResponse
mutex sync.RWMutex
}
func NewResponseStore() *ResponseStore {
data := make(map[string]chan domainHub.AgentResponse)
return &ResponseStore{store: data}
}
func (r *ResponseStore) Write(responseID string, channel chan domainHub.AgentResponse) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.store[responseID] = channel
}
func (r *ResponseStore) Read(responseID string) (chan domainHub.AgentResponse, bool) {
r.mutex.RLock()
defer r.mutex.RUnlock()
ch, ok := r.store[responseID]
return ch, ok
}
func (r *ResponseStore) Delete(responseID string) {
r.mutex.Lock()
defer r.mutex.Unlock()
delete(r.store, responseID)
}
@@ -1,42 +0,0 @@
package store
import (
"sync"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager"
)
type AgentConnStore struct {
mutex sync.RWMutex
store map[string]*connection_manager.AgentConnection
}
func NewAgentConnStore() *AgentConnStore {
return &AgentConnStore{store: make(map[string]*connection_manager.AgentConnection)}
}
func (a *AgentConnStore) Get(agentID string) *connection_manager.AgentConnection {
a.mutex.RLock()
defer a.mutex.RUnlock()
return a.store[agentID]
}
func (a *AgentConnStore) Add(agentConn *connection_manager.AgentConnection) {
a.mutex.Lock()
defer a.mutex.Unlock()
a.store[agentConn.AgentID] = agentConn
}
func (a *AgentConnStore) Delete(agentID string) {
a.mutex.Lock()
defer a.mutex.Unlock()
delete(a.store, agentID)
}
func (a *AgentConnStore) Pop(agentID string) *connection_manager.AgentConnection {
a.mutex.Lock()
defer a.mutex.Unlock()
agent := a.store[agentID]
delete(a.store, agentID)
return agent
}