diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index 47cfcdf..c932fce 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -44,22 +44,23 @@ func (a *AgentConnection) Listen() error { go a.listenHeartbeat(heartbeatsChan) defer close(heartbeatsChan) + defer func() { + err := a.Close() + if err != nil { + a.log.Warn().Err(err).Msg("failed stream close") + } + }() + for { select { case <-a.ctx.Done(): - err := a.stream.Close() - if err != nil { - a.log.Warn().Err(err).Msg("failed stream close") - } return a.ctx.Err() default: agentEvent, err := a.stream.Recv() if err == io.EOF { - a.cancel() return nil } if err != nil { - a.cancel() return fmt.Errorf("stream: %w", err) } @@ -141,3 +142,8 @@ func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRe return response, nil } } + +func (a *AgentConnection) Close() error { + a.cancel() + return a.stream.Close() +}