feat: create execute command

This commit is contained in:
2026-05-11 17:40:02 +03:00
parent d27eff2c57
commit 6046b8ae9d
5 changed files with 93 additions and 22 deletions
@@ -6,6 +6,7 @@ import (
"io" "io"
"time" "time"
"github.com/google/uuid"
pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops"
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager/store" "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager/store"
@@ -25,50 +26,61 @@ type AgentConnection struct {
status statusAgent status statusAgent
AgentID string AgentID string
response *store.ResponseStore response *store.ResponseStore
ctx context.Context
cancel context.CancelFunc
} }
func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, logger zerolog.Logger) *AgentConnection { func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, logger zerolog.Logger) *AgentConnection {
response := store.NewResponseStore() response := store.NewResponseStore()
return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status} logger = logger.With().Str("agentID", agentID).Logger()
ctx, cancel := context.WithCancel(stream.Context())
return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status, ctx: ctx, cancel: cancel}
} }
func (a *AgentConnection) Listen() error { func (a *AgentConnection) Listen() error {
ctx := a.stream.Context()
defer a.status.Offline() defer a.status.Offline()
heartbeatsChan := make(chan domainHub.CreateHeartbeatModel, 5) heartbeatsChan := make(chan domainHub.CreateHeartbeatModel, 5)
go a.listenHeartbeat(ctx, heartbeatsChan) go a.listenHeartbeat(heartbeatsChan)
defer close(heartbeatsChan)
for { for {
select { select {
case <-ctx.Done(): case <-a.ctx.Done():
return ctx.Err() err := a.stream.Close()
if err != nil {
a.log.Warn().Err(err).Msg("failed stream close")
}
return a.ctx.Err()
default: default:
agentEvent, err := a.stream.Recv() agentEvent, err := a.stream.Recv()
if err == io.EOF { if err == io.EOF {
a.cancel()
return nil return nil
} }
if err != nil { if err != nil {
return fmt.Errorf("stream error: %w", err) a.cancel()
return fmt.Errorf("stream: %w", err)
} }
switch x := agentEvent.Event.(type) { switch x := agentEvent.Event.(type) {
case *pb.AgentEvent_Heartbeat: case *pb.AgentEvent_Heartbeat:
heartbeat := toCreateHeartbeatModel(a.AgentID, x) heartbeat := toCreateHeartbeatModel(a.AgentID, x)
a.log.Debug().
Str("agentID", heartbeat.AgentID).
Float64("cpu usage", heartbeat.Metrics.CpuUsage).
Float64("disk usage", heartbeat.Metrics.DiskUsage).
Float64("memory usage", heartbeat.Metrics.MemoryUsage).Msg("")
heartbeatsChan <- heartbeat heartbeatsChan <- heartbeat
case *pb.AgentEvent_CommandResponse:
response := toAgentResponse(x)
ch, ok := a.response.Read(response.RequestID)
if !ok {
a.log.Warn().Str("requestID", response.RequestID).Msg("not found channel for send response")
continue
}
ch <- response
} }
} }
} }
} }
func (a *AgentConnection) listenHeartbeat(ctx context.Context, heartbeats <-chan domainHub.CreateHeartbeatModel) { func (a *AgentConnection) listenHeartbeat(heartbeats <-chan domainHub.CreateHeartbeatModel) {
lastHeartbeat := 0 lastHeartbeat := 0
timer := time.NewTicker(5 * time.Second) timer := time.NewTicker(5 * time.Second)
defer timer.Stop() defer timer.Stop()
@@ -81,18 +93,51 @@ func (a *AgentConnection) listenHeartbeat(ctx context.Context, heartbeats <-chan
a.status.Offline() a.status.Offline()
continue continue
} }
a.log.Warn().Str("agentID", a.AgentID).Msg("agent did not send heartbeat")
a.stream.Close() a.log.Warn().Msg("agent not send heartbeat")
a.cancel()
return return
case heartbeat := <-heartbeats: case heartbeat := <-heartbeats:
a.log.Debug().
Float64("cpu usage", heartbeat.Metrics.CpuUsage).
Float64("disk usage", heartbeat.Metrics.DiskUsage).
Float64("memory usage", heartbeat.Metrics.MemoryUsage).Msg("")
a.status.Online() a.status.Online()
lastHeartbeat = 0 lastHeartbeat = 0
err := a.heartbeat.CreateHeartbeat(ctx, heartbeat)
err := a.heartbeat.CreateHeartbeat(a.ctx, heartbeat)
if err != nil { if err != nil {
a.log.Error().Err(err).Str("agentID", heartbeat.AgentID).Msg("failed to write heartbeat") a.log.Error().Err(err).Msg("failed to write heartbeat")
} }
case <-ctx.Done(): case <-a.ctx.Done():
return return
} }
} }
} }
func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRequest) (domainHub.AgentResponse, error) {
requestID := uuid.New().String()
ch := make(chan domainHub.AgentResponse, 1)
defer close(ch)
a.response.Write(requestID, ch)
defer a.response.Delete(requestID)
err := a.stream.Send(new(toGRPCCommandRequest(request)))
if err != nil {
return domainHub.AgentResponse{}, fmt.Errorf("execute command: %w", err)
}
a.log.Info().Str("requestID", requestID).Str("command", request.Name).Msg("send command")
select {
case <-a.ctx.Done():
return domainHub.AgentResponse{}, fmt.Errorf("connection close")
case <-ctx.Done():
return domainHub.AgentResponse{}, fmt.Errorf("request timeout")
case response := <-ch:
a.log.Info().Str("requestID", response.RequestID).Msg("received response")
return response, nil
}
}
@@ -0,0 +1,6 @@
package connection_manager
type agentStatus struct {
AgentID string
Online bool
}
@@ -8,7 +8,7 @@ import (
) )
type streamConn interface { type streamConn interface {
Send(request *pb.ServerCommandRequest, err error) Send(request *pb.ServerCommandRequest) error
Recv() (*pb.AgentEvent, error) Recv() (*pb.AgentEvent, error)
Context() context.Context Context() context.Context
Close() error Close() error
@@ -20,3 +20,22 @@ func toCreateHeartbeatModel(agentID string, heartbeat *pb.AgentEvent_Heartbeat)
}, },
} }
} }
func toGRPCCommandRequest(request domainHub.AgentRequest) pb.ServerCommandRequest {
return pb.ServerCommandRequest{
RequestId: request.RequestID,
Name: request.Name,
TimeoutSeconds: int64(request.TimeOut),
Args: request.Args,
}
}
func toAgentResponse(response *pb.AgentEvent_CommandResponse) domainHub.AgentResponse {
return domainHub.AgentResponse{
RequestID: response.CommandResponse.RequestId,
Success: response.CommandResponse.Success,
Error: response.CommandResponse.Error,
Output: response.CommandResponse.Output,
ExecTimeMS: int(response.CommandResponse.ExecTimeMs),
}
}
@@ -22,10 +22,11 @@ func (r *ResponseStore) Write(responseID string, channel chan domainHub.AgentRes
r.store[responseID] = channel r.store[responseID] = channel
} }
func (r *ResponseStore) Read(responseID string) chan domainHub.AgentResponse { func (r *ResponseStore) Read(responseID string) (chan domainHub.AgentResponse, bool) {
r.mutex.RLock() r.mutex.RLock()
defer r.mutex.RUnlock() defer r.mutex.RUnlock()
return r.store[responseID] ch, ok := r.store[responseID]
return ch, ok
} }
func (r *ResponseStore) Delete(responseID string) { func (r *ResponseStore) Delete(responseID string) {