diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index 624681b..c205849 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -6,6 +6,7 @@ import ( "io" "time" + "github.com/google/uuid" pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager/store" @@ -25,50 +26,61 @@ type AgentConnection struct { status statusAgent AgentID string response *store.ResponseStore + ctx context.Context + cancel context.CancelFunc } func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, logger zerolog.Logger) *AgentConnection { response := store.NewResponseStore() - return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status} + logger = logger.With().Str("agentID", agentID).Logger() + ctx, cancel := context.WithCancel(stream.Context()) + return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status, ctx: ctx, cancel: cancel} } func (a *AgentConnection) Listen() error { - ctx := a.stream.Context() defer a.status.Offline() heartbeatsChan := make(chan domainHub.CreateHeartbeatModel, 5) - go a.listenHeartbeat(ctx, heartbeatsChan) + go a.listenHeartbeat(heartbeatsChan) + defer close(heartbeatsChan) for { select { - case <-ctx.Done(): - return ctx.Err() + case <-a.ctx.Done(): + err := a.stream.Close() + if err != nil { + a.log.Warn().Err(err).Msg("failed stream close") + } + return a.ctx.Err() default: agentEvent, err := a.stream.Recv() if err == io.EOF { + a.cancel() return nil } if err != nil { - return fmt.Errorf("stream error: %w", err) + a.cancel() + return fmt.Errorf("stream: %w", err) } switch x := agentEvent.Event.(type) { case *pb.AgentEvent_Heartbeat: heartbeat := toCreateHeartbeatModel(a.AgentID, x) - - a.log.Debug(). - Str("agentID", heartbeat.AgentID). - Float64("cpu usage", heartbeat.Metrics.CpuUsage). - Float64("disk usage", heartbeat.Metrics.DiskUsage). - Float64("memory usage", heartbeat.Metrics.MemoryUsage).Msg("") - heartbeatsChan <- heartbeat + case *pb.AgentEvent_CommandResponse: + response := toAgentResponse(x) + ch, ok := a.response.Read(response.RequestID) + if !ok { + a.log.Warn().Str("requestID", response.RequestID).Msg("not found channel for send response") + continue + } + ch <- response } } } } -func (a *AgentConnection) listenHeartbeat(ctx context.Context, heartbeats <-chan domainHub.CreateHeartbeatModel) { +func (a *AgentConnection) listenHeartbeat(heartbeats <-chan domainHub.CreateHeartbeatModel) { lastHeartbeat := 0 timer := time.NewTicker(5 * time.Second) defer timer.Stop() @@ -81,18 +93,51 @@ func (a *AgentConnection) listenHeartbeat(ctx context.Context, heartbeats <-chan a.status.Offline() continue } - a.log.Warn().Str("agentID", a.AgentID).Msg("agent did not send heartbeat") - a.stream.Close() + + a.log.Warn().Msg("agent not send heartbeat") + a.cancel() return case heartbeat := <-heartbeats: + a.log.Debug(). + Float64("cpu usage", heartbeat.Metrics.CpuUsage). + Float64("disk usage", heartbeat.Metrics.DiskUsage). + Float64("memory usage", heartbeat.Metrics.MemoryUsage).Msg("") + a.status.Online() lastHeartbeat = 0 - err := a.heartbeat.CreateHeartbeat(ctx, heartbeat) + + err := a.heartbeat.CreateHeartbeat(a.ctx, heartbeat) if err != nil { - a.log.Error().Err(err).Str("agentID", heartbeat.AgentID).Msg("failed to write heartbeat") + a.log.Error().Err(err).Msg("failed to write heartbeat") } - case <-ctx.Done(): + case <-a.ctx.Done(): return } } } + +func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRequest) (domainHub.AgentResponse, error) { + requestID := uuid.New().String() + ch := make(chan domainHub.AgentResponse, 1) + defer close(ch) + + a.response.Write(requestID, ch) + defer a.response.Delete(requestID) + + err := a.stream.Send(new(toGRPCCommandRequest(request))) + if err != nil { + return domainHub.AgentResponse{}, fmt.Errorf("execute command: %w", err) + } + + a.log.Info().Str("requestID", requestID).Str("command", request.Name).Msg("send command") + + select { + case <-a.ctx.Done(): + return domainHub.AgentResponse{}, fmt.Errorf("connection close") + case <-ctx.Done(): + return domainHub.AgentResponse{}, fmt.Errorf("request timeout") + case response := <-ch: + a.log.Info().Str("requestID", response.RequestID).Msg("received response") + return response, nil + } +} diff --git a/hub/internal/service/connection_manager/domain.go b/hub/internal/service/connection_manager/domain.go new file mode 100644 index 0000000..bdab261 --- /dev/null +++ b/hub/internal/service/connection_manager/domain.go @@ -0,0 +1,6 @@ +package connection_manager + +type agentStatus struct { + AgentID string + Online bool +} diff --git a/hub/internal/service/connection_manager/interface.go b/hub/internal/service/connection_manager/interface.go index c21a83f..2ce54c8 100644 --- a/hub/internal/service/connection_manager/interface.go +++ b/hub/internal/service/connection_manager/interface.go @@ -8,7 +8,7 @@ import ( ) type streamConn interface { - Send(request *pb.ServerCommandRequest, err error) + Send(request *pb.ServerCommandRequest) error Recv() (*pb.AgentEvent, error) Context() context.Context Close() error diff --git a/hub/internal/service/connection_manager/mapper.go b/hub/internal/service/connection_manager/mapper.go index 2abd303..12783e0 100644 --- a/hub/internal/service/connection_manager/mapper.go +++ b/hub/internal/service/connection_manager/mapper.go @@ -20,3 +20,22 @@ func toCreateHeartbeatModel(agentID string, heartbeat *pb.AgentEvent_Heartbeat) }, } } + +func toGRPCCommandRequest(request domainHub.AgentRequest) pb.ServerCommandRequest { + return pb.ServerCommandRequest{ + RequestId: request.RequestID, + Name: request.Name, + TimeoutSeconds: int64(request.TimeOut), + Args: request.Args, + } +} + +func toAgentResponse(response *pb.AgentEvent_CommandResponse) domainHub.AgentResponse { + return domainHub.AgentResponse{ + RequestID: response.CommandResponse.RequestId, + Success: response.CommandResponse.Success, + Error: response.CommandResponse.Error, + Output: response.CommandResponse.Output, + ExecTimeMS: int(response.CommandResponse.ExecTimeMs), + } +} diff --git a/hub/internal/service/connection_manager/store/agent.go b/hub/internal/service/connection_manager/store/agent.go index 730d96c..a7ddd32 100644 --- a/hub/internal/service/connection_manager/store/agent.go +++ b/hub/internal/service/connection_manager/store/agent.go @@ -22,10 +22,11 @@ func (r *ResponseStore) Write(responseID string, channel chan domainHub.AgentRes r.store[responseID] = channel } -func (r *ResponseStore) Read(responseID string) chan domainHub.AgentResponse { +func (r *ResponseStore) Read(responseID string) (chan domainHub.AgentResponse, bool) { r.mutex.RLock() defer r.mutex.RUnlock() - return r.store[responseID] + ch, ok := r.store[responseID] + return ch, ok } func (r *ResponseStore) Delete(responseID string) {