From 2e3b74f91ce772432bfa89efcb02b81867a1e501 Mon Sep 17 00:00:00 2001 From: lorsan Date: Fri, 24 Apr 2026 12:02:29 +0300 Subject: [PATCH 01/24] feat: create new grpc streamconnection --- api/gen/homeops/hub.pb.go | 520 ++++++++++++++++++++++++++++++++- api/gen/homeops/hub_grpc.pb.go | 39 ++- api/proto/homeops/hub.proto | 44 +++ 3 files changed, 584 insertions(+), 19 deletions(-) diff --git a/api/gen/homeops/hub.pb.go b/api/gen/homeops/hub.pb.go index 284f28c..03f9fcc 100644 --- a/api/gen/homeops/hub.pb.go +++ b/api/gen/homeops/hub.pb.go @@ -322,6 +322,436 @@ func (x *RegisterAgentResponse) GetAgentId() string { return "" } +type ServerCommandRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Args map[string]string `protobuf:"bytes,3,rep,name=args,proto3" json:"args,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + TimeoutSeconds int64 `protobuf:"varint,4,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ServerCommandRequest) Reset() { + *x = ServerCommandRequest{} + mi := &file_homeops_hub_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ServerCommandRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerCommandRequest) ProtoMessage() {} + +func (x *ServerCommandRequest) ProtoReflect() protoreflect.Message { + mi := &file_homeops_hub_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerCommandRequest.ProtoReflect.Descriptor instead. +func (*ServerCommandRequest) Descriptor() ([]byte, []int) { + return file_homeops_hub_proto_rawDescGZIP(), []int{5} +} + +func (x *ServerCommandRequest) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + +func (x *ServerCommandRequest) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *ServerCommandRequest) GetArgs() map[string]string { + if x != nil { + return x.Args + } + return nil +} + +func (x *ServerCommandRequest) GetTimeoutSeconds() int64 { + if x != nil { + return x.TimeoutSeconds + } + return 0 +} + +type AgentEvent struct { + state protoimpl.MessageState `protogen:"open.v1"` + AgentId string `protobuf:"bytes,1,opt,name=agent_id,json=agentId,proto3" json:"agent_id,omitempty"` + // Types that are valid to be assigned to Event: + // + // *AgentEvent_Heartbeat + // *AgentEvent_CommandResponse + // *AgentEvent_Alert + Event isAgentEvent_Event `protobuf_oneof:"event"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AgentEvent) Reset() { + *x = AgentEvent{} + mi := &file_homeops_hub_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AgentEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AgentEvent) ProtoMessage() {} + +func (x *AgentEvent) ProtoReflect() protoreflect.Message { + mi := &file_homeops_hub_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AgentEvent.ProtoReflect.Descriptor instead. +func (*AgentEvent) Descriptor() ([]byte, []int) { + return file_homeops_hub_proto_rawDescGZIP(), []int{6} +} + +func (x *AgentEvent) GetAgentId() string { + if x != nil { + return x.AgentId + } + return "" +} + +func (x *AgentEvent) GetEvent() isAgentEvent_Event { + if x != nil { + return x.Event + } + return nil +} + +func (x *AgentEvent) GetHeartbeat() *Heartbeat { + if x != nil { + if x, ok := x.Event.(*AgentEvent_Heartbeat); ok { + return x.Heartbeat + } + } + return nil +} + +func (x *AgentEvent) GetCommandResponse() *CommandResponse { + if x != nil { + if x, ok := x.Event.(*AgentEvent_CommandResponse); ok { + return x.CommandResponse + } + } + return nil +} + +func (x *AgentEvent) GetAlert() *Alert { + if x != nil { + if x, ok := x.Event.(*AgentEvent_Alert); ok { + return x.Alert + } + } + return nil +} + +type isAgentEvent_Event interface { + isAgentEvent_Event() +} + +type AgentEvent_Heartbeat struct { + Heartbeat *Heartbeat `protobuf:"bytes,2,opt,name=heartbeat,proto3,oneof"` +} + +type AgentEvent_CommandResponse struct { + CommandResponse *CommandResponse `protobuf:"bytes,3,opt,name=command_response,json=commandResponse,proto3,oneof"` +} + +type AgentEvent_Alert struct { + Alert *Alert `protobuf:"bytes,4,opt,name=alert,proto3,oneof"` +} + +func (*AgentEvent_Heartbeat) isAgentEvent_Event() {} + +func (*AgentEvent_CommandResponse) isAgentEvent_Event() {} + +func (*AgentEvent_Alert) isAgentEvent_Event() {} + +type Heartbeat struct { + state protoimpl.MessageState `protogen:"open.v1"` + Timestamp int64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Metrics *SystemMetrics `protobuf:"bytes,2,opt,name=metrics,proto3" json:"metrics,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Heartbeat) Reset() { + *x = Heartbeat{} + mi := &file_homeops_hub_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Heartbeat) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Heartbeat) ProtoMessage() {} + +func (x *Heartbeat) ProtoReflect() protoreflect.Message { + mi := &file_homeops_hub_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Heartbeat.ProtoReflect.Descriptor instead. +func (*Heartbeat) Descriptor() ([]byte, []int) { + return file_homeops_hub_proto_rawDescGZIP(), []int{7} +} + +func (x *Heartbeat) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +func (x *Heartbeat) GetMetrics() *SystemMetrics { + if x != nil { + return x.Metrics + } + return nil +} + +type SystemMetrics struct { + state protoimpl.MessageState `protogen:"open.v1"` + CpuUsage float32 `protobuf:"fixed32,1,opt,name=cpu_usage,json=cpuUsage,proto3" json:"cpu_usage,omitempty"` + MemoryUsage float32 `protobuf:"fixed32,2,opt,name=memory_usage,json=memoryUsage,proto3" json:"memory_usage,omitempty"` + DiskUsage float32 `protobuf:"fixed32,3,opt,name=disk_usage,json=diskUsage,proto3" json:"disk_usage,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *SystemMetrics) Reset() { + *x = SystemMetrics{} + mi := &file_homeops_hub_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *SystemMetrics) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SystemMetrics) ProtoMessage() {} + +func (x *SystemMetrics) ProtoReflect() protoreflect.Message { + mi := &file_homeops_hub_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SystemMetrics.ProtoReflect.Descriptor instead. +func (*SystemMetrics) Descriptor() ([]byte, []int) { + return file_homeops_hub_proto_rawDescGZIP(), []int{8} +} + +func (x *SystemMetrics) GetCpuUsage() float32 { + if x != nil { + return x.CpuUsage + } + return 0 +} + +func (x *SystemMetrics) GetMemoryUsage() float32 { + if x != nil { + return x.MemoryUsage + } + return 0 +} + +func (x *SystemMetrics) GetDiskUsage() float32 { + if x != nil { + return x.DiskUsage + } + return 0 +} + +type CommandResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + Success bool `protobuf:"varint,2,opt,name=success,proto3" json:"success,omitempty"` + Output string `protobuf:"bytes,3,opt,name=output,proto3" json:"output,omitempty"` + Error string `protobuf:"bytes,4,opt,name=error,proto3" json:"error,omitempty"` + ExecTimeMs int64 `protobuf:"varint,5,opt,name=exec_time_ms,json=execTimeMs,proto3" json:"exec_time_ms,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CommandResponse) Reset() { + *x = CommandResponse{} + mi := &file_homeops_hub_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CommandResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CommandResponse) ProtoMessage() {} + +func (x *CommandResponse) ProtoReflect() protoreflect.Message { + mi := &file_homeops_hub_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CommandResponse.ProtoReflect.Descriptor instead. +func (*CommandResponse) Descriptor() ([]byte, []int) { + return file_homeops_hub_proto_rawDescGZIP(), []int{9} +} + +func (x *CommandResponse) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + +func (x *CommandResponse) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +func (x *CommandResponse) GetOutput() string { + if x != nil { + return x.Output + } + return "" +} + +func (x *CommandResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *CommandResponse) GetExecTimeMs() int64 { + if x != nil { + return x.ExecTimeMs + } + return 0 +} + +type Alert struct { + state protoimpl.MessageState `protogen:"open.v1"` + Timestamp int64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Level string `protobuf:"bytes,2,opt,name=level,proto3" json:"level,omitempty"` + Title string `protobuf:"bytes,3,opt,name=title,proto3" json:"title,omitempty"` + Description string `protobuf:"bytes,4,opt,name=description,proto3" json:"description,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Alert) Reset() { + *x = Alert{} + mi := &file_homeops_hub_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Alert) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Alert) ProtoMessage() {} + +func (x *Alert) ProtoReflect() protoreflect.Message { + mi := &file_homeops_hub_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Alert.ProtoReflect.Descriptor instead. +func (*Alert) Descriptor() ([]byte, []int) { + return file_homeops_hub_proto_rawDescGZIP(), []int{10} +} + +func (x *Alert) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +func (x *Alert) GetLevel() string { + if x != nil { + return x.Level + } + return "" +} + +func (x *Alert) GetTitle() string { + if x != nil { + return x.Title + } + return "" +} + +func (x *Alert) GetDescription() string { + if x != nil { + return x.Description + } + return "" +} + var File_homeops_hub_proto protoreflect.FileDescriptor const file_homeops_hub_proto_rawDesc = "" + @@ -350,10 +780,49 @@ const file_homeops_hub_proto_rawDesc = "" + "\x06reason\x18\x04 \x01(\tR\x06reason\"n\n" + "\x15RegisterAgentResponse\x12:\n" + "\x19heartbeat_interval_second\x18\x01 \x01(\x03R\x17heartbeatIntervalSecond\x12\x19\n" + - "\bagent_id\x18\x02 \x01(\tR\aagentId2x\n" + + "\bagent_id\x18\x02 \x01(\tR\aagentId\"\xe0\x01\n" + + "\x14ServerCommandRequest\x12\x1d\n" + + "\n" + + "request_id\x18\x01 \x01(\tR\trequestId\x12\x12\n" + + "\x04name\x18\x02 \x01(\tR\x04name\x123\n" + + "\x04args\x18\x03 \x03(\v2\x1f.ServerCommandRequest.ArgsEntryR\x04args\x12'\n" + + "\x0ftimeout_seconds\x18\x04 \x01(\x03R\x0etimeoutSeconds\x1a7\n" + + "\tArgsEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xbb\x01\n" + + "\n" + + "AgentEvent\x12\x19\n" + + "\bagent_id\x18\x01 \x01(\tR\aagentId\x12*\n" + + "\theartbeat\x18\x02 \x01(\v2\n" + + ".HeartbeatH\x00R\theartbeat\x12=\n" + + "\x10command_response\x18\x03 \x01(\v2\x10.CommandResponseH\x00R\x0fcommandResponse\x12\x1e\n" + + "\x05alert\x18\x04 \x01(\v2\x06.AlertH\x00R\x05alertB\a\n" + + "\x05event\"S\n" + + "\tHeartbeat\x12\x1c\n" + + "\ttimestamp\x18\x01 \x01(\x03R\ttimestamp\x12(\n" + + "\ametrics\x18\x02 \x01(\v2\x0e.SystemMetricsR\ametrics\"n\n" + + "\rSystemMetrics\x12\x1b\n" + + "\tcpu_usage\x18\x01 \x01(\x02R\bcpuUsage\x12!\n" + + "\fmemory_usage\x18\x02 \x01(\x02R\vmemoryUsage\x12\x1d\n" + + "\n" + + "disk_usage\x18\x03 \x01(\x02R\tdiskUsage\"\x9a\x01\n" + + "\x0fCommandResponse\x12\x1d\n" + + "\n" + + "request_id\x18\x01 \x01(\tR\trequestId\x12\x18\n" + + "\asuccess\x18\x02 \x01(\bR\asuccess\x12\x16\n" + + "\x06output\x18\x03 \x01(\tR\x06output\x12\x14\n" + + "\x05error\x18\x04 \x01(\tR\x05error\x12 \n" + + "\fexec_time_ms\x18\x05 \x01(\x03R\n" + + "execTimeMs\"s\n" + + "\x05Alert\x12\x1c\n" + + "\ttimestamp\x18\x01 \x01(\x03R\ttimestamp\x12\x14\n" + + "\x05level\x18\x02 \x01(\tR\x05level\x12\x14\n" + + "\x05title\x18\x03 \x01(\tR\x05title\x12 \n" + + "\vdescription\x18\x04 \x01(\tR\vdescription2\xb6\x01\n" + "\x03Hub\x12/\n" + "\x04Ping\x12\x16.google.protobuf.Empty\x1a\r.PongResponse\"\x00\x12@\n" + - "\rRegisterAgent\x12\x15.RegisterAgentRequest\x1a\x16.RegisterAgentResponse\"\x00B HostInfo - 3, // 1: RegisterAgentRequest.capability:type_name -> Capability - 5, // 2: Hub.Ping:input_type -> google.protobuf.Empty - 1, // 3: Hub.RegisterAgent:input_type -> RegisterAgentRequest - 0, // 4: Hub.Ping:output_type -> PongResponse - 4, // 5: Hub.RegisterAgent:output_type -> RegisterAgentResponse - 4, // [4:6] is the sub-list for method output_type - 2, // [2:4] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 2, // 0: RegisterAgentRequest.host:type_name -> HostInfo + 3, // 1: RegisterAgentRequest.capability:type_name -> Capability + 11, // 2: ServerCommandRequest.args:type_name -> ServerCommandRequest.ArgsEntry + 7, // 3: AgentEvent.heartbeat:type_name -> Heartbeat + 9, // 4: AgentEvent.command_response:type_name -> CommandResponse + 10, // 5: AgentEvent.alert:type_name -> Alert + 8, // 6: Heartbeat.metrics:type_name -> SystemMetrics + 12, // 7: Hub.Ping:input_type -> google.protobuf.Empty + 1, // 8: Hub.RegisterAgent:input_type -> RegisterAgentRequest + 6, // 9: Hub.StreamConnection:input_type -> AgentEvent + 0, // 10: Hub.Ping:output_type -> PongResponse + 4, // 11: Hub.RegisterAgent:output_type -> RegisterAgentResponse + 5, // 12: Hub.StreamConnection:output_type -> ServerCommandRequest + 10, // [10:13] is the sub-list for method output_type + 7, // [7:10] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_homeops_hub_proto_init() } @@ -395,13 +878,18 @@ func file_homeops_hub_proto_init() { if File_homeops_hub_proto != nil { return } + file_homeops_hub_proto_msgTypes[6].OneofWrappers = []any{ + (*AgentEvent_Heartbeat)(nil), + (*AgentEvent_CommandResponse)(nil), + (*AgentEvent_Alert)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_homeops_hub_proto_rawDesc), len(file_homeops_hub_proto_rawDesc)), NumEnums: 0, - NumMessages: 5, + NumMessages: 12, NumExtensions: 0, NumServices: 1, }, diff --git a/api/gen/homeops/hub_grpc.pb.go b/api/gen/homeops/hub_grpc.pb.go index e032d6c..d689399 100644 --- a/api/gen/homeops/hub_grpc.pb.go +++ b/api/gen/homeops/hub_grpc.pb.go @@ -20,8 +20,9 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - Hub_Ping_FullMethodName = "/Hub/Ping" - Hub_RegisterAgent_FullMethodName = "/Hub/RegisterAgent" + Hub_Ping_FullMethodName = "/Hub/Ping" + Hub_RegisterAgent_FullMethodName = "/Hub/RegisterAgent" + Hub_StreamConnection_FullMethodName = "/Hub/StreamConnection" ) // HubClient is the client API for Hub service. @@ -30,6 +31,7 @@ const ( type HubClient interface { Ping(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PongResponse, error) RegisterAgent(ctx context.Context, in *RegisterAgentRequest, opts ...grpc.CallOption) (*RegisterAgentResponse, error) + StreamConnection(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AgentEvent, ServerCommandRequest], error) } type hubClient struct { @@ -60,12 +62,26 @@ func (c *hubClient) RegisterAgent(ctx context.Context, in *RegisterAgentRequest, return out, nil } +func (c *hubClient) StreamConnection(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AgentEvent, ServerCommandRequest], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &Hub_ServiceDesc.Streams[0], Hub_StreamConnection_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[AgentEvent, ServerCommandRequest]{ClientStream: stream} + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Hub_StreamConnectionClient = grpc.BidiStreamingClient[AgentEvent, ServerCommandRequest] + // HubServer is the server API for Hub service. // All implementations must embed UnimplementedHubServer // for forward compatibility. type HubServer interface { Ping(context.Context, *emptypb.Empty) (*PongResponse, error) RegisterAgent(context.Context, *RegisterAgentRequest) (*RegisterAgentResponse, error) + StreamConnection(grpc.BidiStreamingServer[AgentEvent, ServerCommandRequest]) error mustEmbedUnimplementedHubServer() } @@ -82,6 +98,9 @@ func (UnimplementedHubServer) Ping(context.Context, *emptypb.Empty) (*PongRespon func (UnimplementedHubServer) RegisterAgent(context.Context, *RegisterAgentRequest) (*RegisterAgentResponse, error) { return nil, status.Error(codes.Unimplemented, "method RegisterAgent not implemented") } +func (UnimplementedHubServer) StreamConnection(grpc.BidiStreamingServer[AgentEvent, ServerCommandRequest]) error { + return status.Error(codes.Unimplemented, "method StreamConnection not implemented") +} func (UnimplementedHubServer) mustEmbedUnimplementedHubServer() {} func (UnimplementedHubServer) testEmbeddedByValue() {} @@ -139,6 +158,13 @@ func _Hub_RegisterAgent_Handler(srv interface{}, ctx context.Context, dec func(i return interceptor(ctx, in, info, handler) } +func _Hub_StreamConnection_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(HubServer).StreamConnection(&grpc.GenericServerStream[AgentEvent, ServerCommandRequest]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Hub_StreamConnectionServer = grpc.BidiStreamingServer[AgentEvent, ServerCommandRequest] + // Hub_ServiceDesc is the grpc.ServiceDesc for Hub service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -155,6 +181,13 @@ var Hub_ServiceDesc = grpc.ServiceDesc{ Handler: _Hub_RegisterAgent_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "StreamConnection", + Handler: _Hub_StreamConnection_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, Metadata: "homeops/hub.proto", } diff --git a/api/proto/homeops/hub.proto b/api/proto/homeops/hub.proto index e7f32d9..301b992 100644 --- a/api/proto/homeops/hub.proto +++ b/api/proto/homeops/hub.proto @@ -7,6 +7,7 @@ option go_package = "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops;homeops" service Hub { rpc Ping (google.protobuf.Empty) returns (PongResponse) {} rpc RegisterAgent (RegisterAgentRequest) returns (RegisterAgentResponse) {} + rpc StreamConnection (stream AgentEvent) returns (stream ServerCommandRequest) {} } message PongResponse { @@ -37,4 +38,47 @@ message Capability { message RegisterAgentResponse { int64 heartbeat_interval_second = 1; string agent_id = 2; +} + +message ServerCommandRequest { + string request_id = 1; + string name = 2; + map args = 3; + int64 timeout_seconds = 4; +} + +message AgentEvent { + string agent_id = 1; + + oneof event { + Heartbeat heartbeat = 2; + CommandResponse command_response = 3; + Alert alert = 4; + } +} + +message Heartbeat { + int64 timestamp = 1; + SystemMetrics metrics = 2; +} + +message SystemMetrics { + float cpu_usage = 1; + float memory_usage = 2; + float disk_usage = 3; +} + +message CommandResponse { + string request_id = 1; + bool success = 2; + string output = 3; + string error = 4; + int64 exec_time_ms = 5; +} + +message Alert { + int64 timestamp = 1; + string level = 2; + string title = 3; + string description = 4; } \ No newline at end of file From c116a7089a84a6d9518219088b869ad2230168b5 Mon Sep 17 00:00:00 2001 From: lorsan Date: Tue, 5 May 2026 18:26:18 +0300 Subject: [PATCH 02/24] feat: new type structure for stream --- .../domain/{structure.go => agent.go} | 0 hub/internal/domain/stream.go | 34 +++++++++++++++++++ 2 files changed, 34 insertions(+) rename hub/internal/domain/{structure.go => agent.go} (100%) create mode 100644 hub/internal/domain/stream.go diff --git a/hub/internal/domain/structure.go b/hub/internal/domain/agent.go similarity index 100% rename from hub/internal/domain/structure.go rename to hub/internal/domain/agent.go diff --git a/hub/internal/domain/stream.go b/hub/internal/domain/stream.go new file mode 100644 index 0000000..a0d4499 --- /dev/null +++ b/hub/internal/domain/stream.go @@ -0,0 +1,34 @@ +package domain + +type AgentRequest struct { + RequestID string + Name string + Args map[string]string + TimeOut int +} + +type AgentResponse struct { + RequestID string + Success bool + Output string + Error string + ExecTimeMS int +} + +type AgentAlert struct { + Timestamp int + Level string + Title string + Description string +} + +type SystemMetrics struct { + CpuUsage float64 + MemoryUsage float64 + DiskUsage float64 +} + +type Heartbeat struct { + Timestamp string + Metrics SystemMetrics +} From f2d0924bca83360a74f6278c19b959fd0db40844 Mon Sep 17 00:00:00 2001 From: lorsan Date: Wed, 6 May 2026 21:38:58 +0300 Subject: [PATCH 03/24] feat: new table heartbeats --- .../20260506182346_create_heartbeat_table.down.sql | 3 +++ .../20260506182346_create_heartbeat_table.up.sql | 11 +++++++++++ 2 files changed, 14 insertions(+) create mode 100644 hub/internal/migrations/20260506182346_create_heartbeat_table.down.sql create mode 100644 hub/internal/migrations/20260506182346_create_heartbeat_table.up.sql diff --git a/hub/internal/migrations/20260506182346_create_heartbeat_table.down.sql b/hub/internal/migrations/20260506182346_create_heartbeat_table.down.sql new file mode 100644 index 0000000..d8cc83d --- /dev/null +++ b/hub/internal/migrations/20260506182346_create_heartbeat_table.down.sql @@ -0,0 +1,3 @@ +DROP INDEX idx_heartbeat_agent_id; + +DROP TABLE if exists heartbeats; \ No newline at end of file diff --git a/hub/internal/migrations/20260506182346_create_heartbeat_table.up.sql b/hub/internal/migrations/20260506182346_create_heartbeat_table.up.sql new file mode 100644 index 0000000..e848921 --- /dev/null +++ b/hub/internal/migrations/20260506182346_create_heartbeat_table.up.sql @@ -0,0 +1,11 @@ +CREATE TABLE heartbeats ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent_id VARCHAR(32) UNIQUE NOT NULL, + cpu_usage FLOAT, + memory_usage FLOAT, + disk_usage FLOAT, + heartbeat_timestamp TIMESTAMP + FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE +); + +CREATE UNIQUE INDEX idx_heartbeat_agent_id ON heartbeats (agent_id); \ No newline at end of file From 6277523b234ee779e579c6e9e42accceddeda822 Mon Sep 17 00:00:00 2001 From: lorsan Date: Sat, 9 May 2026 22:31:42 +0300 Subject: [PATCH 04/24] refactor: create heartbeats func in HubStore --- hub/internal/domain/stream.go | 14 +++- ...260506182346_create_heartbeat_table.up.sql | 8 +- hub/internal/store/mapper.go | 24 ++++++ hub/internal/store/sqlc/gen/agent.sql.go | 2 +- hub/internal/store/sqlc/gen/db.go | 2 +- hub/internal/store/sqlc/gen/heartbeat.sql.go | 75 +++++++++++++++++++ hub/internal/store/sqlc/gen/models.go | 11 ++- hub/internal/store/sqlc/queries/heartbeat.sql | 7 ++ hub/internal/store/store.go | 26 ++++++- 9 files changed, 158 insertions(+), 11 deletions(-) create mode 100644 hub/internal/store/sqlc/gen/heartbeat.sql.go create mode 100644 hub/internal/store/sqlc/queries/heartbeat.sql diff --git a/hub/internal/domain/stream.go b/hub/internal/domain/stream.go index a0d4499..7165398 100644 --- a/hub/internal/domain/stream.go +++ b/hub/internal/domain/stream.go @@ -1,5 +1,7 @@ package domain +import "time" + type AgentRequest struct { RequestID string Name string @@ -28,7 +30,15 @@ type SystemMetrics struct { DiskUsage float64 } -type Heartbeat struct { - Timestamp string +type HeartbeatModel struct { + ID int + AgentID string + Timestamp time.Time + Metrics SystemMetrics +} + +type CreateHeartbeatModel struct { + AgentID string + Timestamp time.Time Metrics SystemMetrics } diff --git a/hub/internal/migrations/20260506182346_create_heartbeat_table.up.sql b/hub/internal/migrations/20260506182346_create_heartbeat_table.up.sql index e848921..dd25c58 100644 --- a/hub/internal/migrations/20260506182346_create_heartbeat_table.up.sql +++ b/hub/internal/migrations/20260506182346_create_heartbeat_table.up.sql @@ -1,10 +1,10 @@ CREATE TABLE heartbeats ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent_id VARCHAR(32) UNIQUE NOT NULL, - cpu_usage FLOAT, - memory_usage FLOAT, - disk_usage FLOAT, - heartbeat_timestamp TIMESTAMP + cpu_usage FLOAT NOT NULL , + memory_usage FLOAT NOT NULL , + disk_usage FLOAT NOT NULL , + heartbeat_timestamp TIMESTAMP NOT NULL, FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE ); diff --git a/hub/internal/store/mapper.go b/hub/internal/store/mapper.go index 6fc40b7..e78c9e4 100644 --- a/hub/internal/store/mapper.go +++ b/hub/internal/store/mapper.go @@ -76,3 +76,27 @@ func toDomainCapabilities(caps []byte) []domain.Capability { } return capabilities } + +func toDBHeartbeat(heartbeat domainHub.CreateHeartbeatModel) gen2.InsertHeartbeatParams { + + return gen2.InsertHeartbeatParams{ + AgentID: heartbeat.AgentID, + HeartbeatTimestamp: heartbeat.Timestamp, + CpuUsage: heartbeat.Metrics.CpuUsage, + DiskUsage: heartbeat.Metrics.DiskUsage, + MemoryUsage: heartbeat.Metrics.MemoryUsage, + } +} + +func toHeartBeatModel(heartbeat gen2.Heartbeat) domainHub.HeartbeatModel { + return domainHub.HeartbeatModel{ + Timestamp: heartbeat.HeartbeatTimestamp, + AgentID: heartbeat.AgentID, + ID: int(heartbeat.ID), + Metrics: domainHub.SystemMetrics{ + CpuUsage: heartbeat.CpuUsage, + DiskUsage: heartbeat.DiskUsage, + MemoryUsage: heartbeat.MemoryUsage, + }, + } +} diff --git a/hub/internal/store/sqlc/gen/agent.sql.go b/hub/internal/store/sqlc/gen/agent.sql.go index 704434b..636304f 100644 --- a/hub/internal/store/sqlc/gen/agent.sql.go +++ b/hub/internal/store/sqlc/gen/agent.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.31.1 // source: agent.sql package gen diff --git a/hub/internal/store/sqlc/gen/db.go b/hub/internal/store/sqlc/gen/db.go index d577e39..b6fcf6b 100644 --- a/hub/internal/store/sqlc/gen/db.go +++ b/hub/internal/store/sqlc/gen/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.31.1 package gen diff --git a/hub/internal/store/sqlc/gen/heartbeat.sql.go b/hub/internal/store/sqlc/gen/heartbeat.sql.go new file mode 100644 index 0000000..b98736f --- /dev/null +++ b/hub/internal/store/sqlc/gen/heartbeat.sql.go @@ -0,0 +1,75 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.31.1 +// source: heartbeat.sql + +package gen + +import ( + "context" + "time" +) + +const insertHeartbeat = `-- name: InsertHeartbeat :exec +INSERT INTO heartbeats (agent_id, cpu_usage, memory_usage, disk_usage, heartbeat_timestamp) +VALUES (?1, ?2, ?3, ?4, ?5) +` + +type InsertHeartbeatParams struct { + AgentID string + CpuUsage float64 + MemoryUsage float64 + DiskUsage float64 + HeartbeatTimestamp time.Time +} + +func (q *Queries) InsertHeartbeat(ctx context.Context, arg InsertHeartbeatParams) error { + _, err := q.db.ExecContext(ctx, insertHeartbeat, + arg.AgentID, + arg.CpuUsage, + arg.MemoryUsage, + arg.DiskUsage, + arg.HeartbeatTimestamp, + ) + return err +} + +const selectHeartbeatsAfter = `-- name: SelectHeartbeatsAfter :many +SELECT id, agent_id, cpu_usage, memory_usage, disk_usage, heartbeat_timestamp FROM heartbeats +WHERE agent_id = ?1 AND heartbeat_timestamp > ?2 +` + +type SelectHeartbeatsAfterParams struct { + AgentID string + Timestamp time.Time +} + +func (q *Queries) SelectHeartbeatsAfter(ctx context.Context, arg SelectHeartbeatsAfterParams) ([]Heartbeat, error) { + rows, err := q.db.QueryContext(ctx, selectHeartbeatsAfter, arg.AgentID, arg.Timestamp) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Heartbeat + for rows.Next() { + var i Heartbeat + if err := rows.Scan( + &i.ID, + &i.AgentID, + &i.CpuUsage, + &i.MemoryUsage, + &i.DiskUsage, + &i.HeartbeatTimestamp, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/hub/internal/store/sqlc/gen/models.go b/hub/internal/store/sqlc/gen/models.go index b40fd8b..19c5aef 100644 --- a/hub/internal/store/sqlc/gen/models.go +++ b/hub/internal/store/sqlc/gen/models.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.31.1 package gen @@ -19,3 +19,12 @@ type Agent struct { Capabilities *string RegisteredAt time.Time } + +type Heartbeat struct { + ID int64 + AgentID string + CpuUsage float64 + MemoryUsage float64 + DiskUsage float64 + HeartbeatTimestamp time.Time +} diff --git a/hub/internal/store/sqlc/queries/heartbeat.sql b/hub/internal/store/sqlc/queries/heartbeat.sql new file mode 100644 index 0000000..936bae2 --- /dev/null +++ b/hub/internal/store/sqlc/queries/heartbeat.sql @@ -0,0 +1,7 @@ +-- name: InsertHeartbeat :exec +INSERT INTO heartbeats (agent_id, cpu_usage, memory_usage, disk_usage, heartbeat_timestamp) +VALUES (:agent_id, :cpu_usage, :memory_usage, :disk_usage, :heartbeat_timestamp); + +-- name: SelectHeartbeatsAfter :many +SELECT * FROM heartbeats +WHERE agent_id = :agent_id AND heartbeat_timestamp > :timestamp; \ No newline at end of file diff --git a/hub/internal/store/store.go b/hub/internal/store/store.go index 3cfa60b..9d9fd94 100644 --- a/hub/internal/store/store.go +++ b/hub/internal/store/store.go @@ -3,6 +3,7 @@ package store import ( "context" "database/sql" + "time" domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" "github.com/lorsanstand/HomeOps-Hub/hub/internal/store/sqlc/gen" @@ -21,8 +22,8 @@ func (h *HubStore) NewAgent(ctx context.Context, agent domainHub.CreateAgentMode return h.queries.CreateAgent(ctx, toDBAgent(agent)) } -func (h *HubStore) GetAgentByAgentID(ctx context.Context, AgentID string) (domainHub.AgentModel, error) { - data, err := h.queries.GetAgentByAgentID(ctx, AgentID) +func (h *HubStore) GetAgentByAgentID(ctx context.Context, agentID string) (domainHub.AgentModel, error) { + data, err := h.queries.GetAgentByAgentID(ctx, agentID) if err != nil { return domainHub.AgentModel{}, err } @@ -34,3 +35,24 @@ func (h *HubStore) UpdateAgentByID(ctx context.Context, ID int, updateAgent doma data.ID = int64(ID) return h.queries.UpdateAgentByID(ctx, data) } + +func (h *HubStore) CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error { + data := toDBHeartbeat(heartbeat) + return h.queries.InsertHeartbeat(ctx, data) +} + +func (h *HubStore) GetHeartbeatsByIDAfter(ctx context.Context, agentID string, timestamp time.Time) ([]domainHub.HeartbeatModel, error) { + data := gen.SelectHeartbeatsAfterParams{AgentID: agentID, Timestamp: timestamp} + heartbeats, err := h.queries.SelectHeartbeatsAfter(ctx, data) + if err != nil { + return []domainHub.HeartbeatModel{}, err + } + + heartbeatsModel := make([]domainHub.HeartbeatModel, len(heartbeats)) + + for i, heartbeat := range heartbeats { + heartbeatsModel[i] = toHeartBeatModel(heartbeat) + } + + return heartbeatsModel, nil +} From 54ed8fd0d2056a736eb4991b4c1524974c8c3ea6 Mon Sep 17 00:00:00 2001 From: lorsan Date: Mon, 11 May 2026 09:07:32 +0300 Subject: [PATCH 05/24] feat: create heartbeat processing system --- .../service/connection_manager/agent.go | 98 +++++++++++++++++++ .../service/connection_manager/interface.go | 19 ++++ .../service/connection_manager/mapper.go | 22 +++++ 3 files changed, 139 insertions(+) create mode 100644 hub/internal/service/connection_manager/agent.go create mode 100644 hub/internal/service/connection_manager/interface.go create mode 100644 hub/internal/service/connection_manager/mapper.go diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go new file mode 100644 index 0000000..ab7fac6 --- /dev/null +++ b/hub/internal/service/connection_manager/agent.go @@ -0,0 +1,98 @@ +package connection_manager + +import ( + "context" + "fmt" + "io" + "time" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" + "github.com/rs/zerolog" +) + +type statusAgent interface { + Offline() + Online() +} + +// использовать sync.Pool что бы переиспользвоать этот обьект +type AgentConnection struct { + stream streamConn + store heartbeatStore + log zerolog.Logger + status statusAgent + AgentID string + // Не безопасно, если нужно будет добавлять новый канал и брать какой то все сломается. Исправить + responseChan map[string]chan domainHub.AgentResponse +} + +func newAgentConnection(agentID string, stream streamConn, store heartbeatStore, status statusAgent, logger zerolog.Logger) *AgentConnection { + response := make(map[string]chan domainHub.AgentResponse) + return &AgentConnection{stream: stream, responseChan: response, store: store, log: logger, AgentID: agentID, status: status} +} + +func (a *AgentConnection) Listen() error { + ctx := a.stream.Context() + defer a.status.Offline() + + heartbeatsChan := make(chan domainHub.CreateHeartbeatModel, 5) + go a.listenHeartbeat(ctx, heartbeatsChan) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + agentEvent, err := a.stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return fmt.Errorf("stream error: %w", err) + } + + switch x := agentEvent.Event.(type) { + case *pb.AgentEvent_Heartbeat: + heartbeat := toCreateHeartbeatModel(a.AgentID, x) + + a.log.Debug(). + Str("agentID", heartbeat.AgentID). + Float64("cpu usage", heartbeat.Metrics.CpuUsage). + Float64("disk usage", heartbeat.Metrics.DiskUsage). + Float64("memory usage", heartbeat.Metrics.MemoryUsage).Msg("") + + heartbeatsChan <- heartbeat + } + } + } +} + +func (a *AgentConnection) listenHeartbeat(ctx context.Context, heartbeats <-chan domainHub.CreateHeartbeatModel) { + lastHeartbeat := 0 + timer := time.NewTicker(5 * time.Second) + defer timer.Stop() + + for { + select { + case <-timer.C: + if lastHeartbeat < 30 { + lastHeartbeat += 5 + a.status.Offline() + continue + } + a.log.Warn().Str("agentID", a.AgentID).Msg("agent did not send heartbeat") + a.stream.Close() + return + case heartbeat := <-heartbeats: + a.status.Online() + lastHeartbeat = 0 + err := a.store.CreateHeartbeat(ctx, heartbeat) + if err != nil { + a.log.Error().Err(err).Str("agentID", heartbeat.AgentID).Msg("failed to write heartbeat") + } + case <-ctx.Done(): + return + } + } +} diff --git a/hub/internal/service/connection_manager/interface.go b/hub/internal/service/connection_manager/interface.go new file mode 100644 index 0000000..c21a83f --- /dev/null +++ b/hub/internal/service/connection_manager/interface.go @@ -0,0 +1,19 @@ +package connection_manager + +import ( + "context" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" +) + +type streamConn interface { + Send(request *pb.ServerCommandRequest, err error) + Recv() (*pb.AgentEvent, error) + Context() context.Context + Close() error +} + +type heartbeatStore interface { + CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error +} diff --git a/hub/internal/service/connection_manager/mapper.go b/hub/internal/service/connection_manager/mapper.go new file mode 100644 index 0000000..2abd303 --- /dev/null +++ b/hub/internal/service/connection_manager/mapper.go @@ -0,0 +1,22 @@ +package connection_manager + +import ( + "time" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" +) + +func toCreateHeartbeatModel(agentID string, heartbeat *pb.AgentEvent_Heartbeat) domainHub.CreateHeartbeatModel { + timestamp := time.Unix(heartbeat.Heartbeat.Timestamp, 0) + + return domainHub.CreateHeartbeatModel{ + AgentID: agentID, + Timestamp: timestamp, + Metrics: domainHub.SystemMetrics{ + MemoryUsage: float64(heartbeat.Heartbeat.Metrics.MemoryUsage), + CpuUsage: float64(heartbeat.Heartbeat.Metrics.CpuUsage), + DiskUsage: float64(heartbeat.Heartbeat.Metrics.DiskUsage), + }, + } +} From d27eff2c57e1fd077430190c1b0574e61778a131 Mon Sep 17 00:00:00 2001 From: lorsan Date: Mon, 11 May 2026 10:21:29 +0300 Subject: [PATCH 06/24] refactor: switch response chan map for store --- .../service/connection_manager/agent.go | 22 ++++++------ .../service/connection_manager/store/agent.go | 35 +++++++++++++++++++ 2 files changed, 46 insertions(+), 11 deletions(-) create mode 100644 hub/internal/service/connection_manager/store/agent.go diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index ab7fac6..624681b 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -8,6 +8,7 @@ import ( pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" + "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager/store" "github.com/rs/zerolog" ) @@ -18,18 +19,17 @@ type statusAgent interface { // использовать sync.Pool что бы переиспользвоать этот обьект type AgentConnection struct { - stream streamConn - store heartbeatStore - log zerolog.Logger - status statusAgent - AgentID string - // Не безопасно, если нужно будет добавлять новый канал и брать какой то все сломается. Исправить - responseChan map[string]chan domainHub.AgentResponse + stream streamConn + heartbeat heartbeatStore + log zerolog.Logger + status statusAgent + AgentID string + response *store.ResponseStore } -func newAgentConnection(agentID string, stream streamConn, store heartbeatStore, status statusAgent, logger zerolog.Logger) *AgentConnection { - response := make(map[string]chan domainHub.AgentResponse) - return &AgentConnection{stream: stream, responseChan: response, store: store, log: logger, AgentID: agentID, status: status} +func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, logger zerolog.Logger) *AgentConnection { + response := store.NewResponseStore() + return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status} } func (a *AgentConnection) Listen() error { @@ -87,7 +87,7 @@ func (a *AgentConnection) listenHeartbeat(ctx context.Context, heartbeats <-chan case heartbeat := <-heartbeats: a.status.Online() lastHeartbeat = 0 - err := a.store.CreateHeartbeat(ctx, heartbeat) + err := a.heartbeat.CreateHeartbeat(ctx, heartbeat) if err != nil { a.log.Error().Err(err).Str("agentID", heartbeat.AgentID).Msg("failed to write heartbeat") } diff --git a/hub/internal/service/connection_manager/store/agent.go b/hub/internal/service/connection_manager/store/agent.go new file mode 100644 index 0000000..730d96c --- /dev/null +++ b/hub/internal/service/connection_manager/store/agent.go @@ -0,0 +1,35 @@ +package store + +import ( + "sync" + + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" +) + +type ResponseStore struct { + store map[string]chan domainHub.AgentResponse + mutex sync.RWMutex +} + +func NewResponseStore() *ResponseStore { + data := make(map[string]chan domainHub.AgentResponse) + return &ResponseStore{store: data} +} + +func (r *ResponseStore) Write(responseID string, channel chan domainHub.AgentResponse) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.store[responseID] = channel +} + +func (r *ResponseStore) Read(responseID string) chan domainHub.AgentResponse { + r.mutex.RLock() + defer r.mutex.RUnlock() + return r.store[responseID] +} + +func (r *ResponseStore) Delete(responseID string) { + r.mutex.Lock() + defer r.mutex.Unlock() + delete(r.store, responseID) +} From 6046b8ae9d530b7946a982b7c48bb80f061bc246 Mon Sep 17 00:00:00 2001 From: lorsan Date: Mon, 11 May 2026 17:40:02 +0300 Subject: [PATCH 07/24] feat: create execute command --- .../service/connection_manager/agent.go | 83 ++++++++++++++----- .../service/connection_manager/domain.go | 6 ++ .../service/connection_manager/interface.go | 2 +- .../service/connection_manager/mapper.go | 19 +++++ .../service/connection_manager/store/agent.go | 5 +- 5 files changed, 93 insertions(+), 22 deletions(-) create mode 100644 hub/internal/service/connection_manager/domain.go diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index 624681b..c205849 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -6,6 +6,7 @@ import ( "io" "time" + "github.com/google/uuid" pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager/store" @@ -25,50 +26,61 @@ type AgentConnection struct { status statusAgent AgentID string response *store.ResponseStore + ctx context.Context + cancel context.CancelFunc } func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, logger zerolog.Logger) *AgentConnection { response := store.NewResponseStore() - return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status} + logger = logger.With().Str("agentID", agentID).Logger() + ctx, cancel := context.WithCancel(stream.Context()) + return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status, ctx: ctx, cancel: cancel} } func (a *AgentConnection) Listen() error { - ctx := a.stream.Context() defer a.status.Offline() heartbeatsChan := make(chan domainHub.CreateHeartbeatModel, 5) - go a.listenHeartbeat(ctx, heartbeatsChan) + go a.listenHeartbeat(heartbeatsChan) + defer close(heartbeatsChan) for { select { - case <-ctx.Done(): - return ctx.Err() + case <-a.ctx.Done(): + err := a.stream.Close() + if err != nil { + a.log.Warn().Err(err).Msg("failed stream close") + } + return a.ctx.Err() default: agentEvent, err := a.stream.Recv() if err == io.EOF { + a.cancel() return nil } if err != nil { - return fmt.Errorf("stream error: %w", err) + a.cancel() + return fmt.Errorf("stream: %w", err) } switch x := agentEvent.Event.(type) { case *pb.AgentEvent_Heartbeat: heartbeat := toCreateHeartbeatModel(a.AgentID, x) - - a.log.Debug(). - Str("agentID", heartbeat.AgentID). - Float64("cpu usage", heartbeat.Metrics.CpuUsage). - Float64("disk usage", heartbeat.Metrics.DiskUsage). - Float64("memory usage", heartbeat.Metrics.MemoryUsage).Msg("") - heartbeatsChan <- heartbeat + case *pb.AgentEvent_CommandResponse: + response := toAgentResponse(x) + ch, ok := a.response.Read(response.RequestID) + if !ok { + a.log.Warn().Str("requestID", response.RequestID).Msg("not found channel for send response") + continue + } + ch <- response } } } } -func (a *AgentConnection) listenHeartbeat(ctx context.Context, heartbeats <-chan domainHub.CreateHeartbeatModel) { +func (a *AgentConnection) listenHeartbeat(heartbeats <-chan domainHub.CreateHeartbeatModel) { lastHeartbeat := 0 timer := time.NewTicker(5 * time.Second) defer timer.Stop() @@ -81,18 +93,51 @@ func (a *AgentConnection) listenHeartbeat(ctx context.Context, heartbeats <-chan a.status.Offline() continue } - a.log.Warn().Str("agentID", a.AgentID).Msg("agent did not send heartbeat") - a.stream.Close() + + a.log.Warn().Msg("agent not send heartbeat") + a.cancel() return case heartbeat := <-heartbeats: + a.log.Debug(). + Float64("cpu usage", heartbeat.Metrics.CpuUsage). + Float64("disk usage", heartbeat.Metrics.DiskUsage). + Float64("memory usage", heartbeat.Metrics.MemoryUsage).Msg("") + a.status.Online() lastHeartbeat = 0 - err := a.heartbeat.CreateHeartbeat(ctx, heartbeat) + + err := a.heartbeat.CreateHeartbeat(a.ctx, heartbeat) if err != nil { - a.log.Error().Err(err).Str("agentID", heartbeat.AgentID).Msg("failed to write heartbeat") + a.log.Error().Err(err).Msg("failed to write heartbeat") } - case <-ctx.Done(): + case <-a.ctx.Done(): return } } } + +func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRequest) (domainHub.AgentResponse, error) { + requestID := uuid.New().String() + ch := make(chan domainHub.AgentResponse, 1) + defer close(ch) + + a.response.Write(requestID, ch) + defer a.response.Delete(requestID) + + err := a.stream.Send(new(toGRPCCommandRequest(request))) + if err != nil { + return domainHub.AgentResponse{}, fmt.Errorf("execute command: %w", err) + } + + a.log.Info().Str("requestID", requestID).Str("command", request.Name).Msg("send command") + + select { + case <-a.ctx.Done(): + return domainHub.AgentResponse{}, fmt.Errorf("connection close") + case <-ctx.Done(): + return domainHub.AgentResponse{}, fmt.Errorf("request timeout") + case response := <-ch: + a.log.Info().Str("requestID", response.RequestID).Msg("received response") + return response, nil + } +} diff --git a/hub/internal/service/connection_manager/domain.go b/hub/internal/service/connection_manager/domain.go new file mode 100644 index 0000000..bdab261 --- /dev/null +++ b/hub/internal/service/connection_manager/domain.go @@ -0,0 +1,6 @@ +package connection_manager + +type agentStatus struct { + AgentID string + Online bool +} diff --git a/hub/internal/service/connection_manager/interface.go b/hub/internal/service/connection_manager/interface.go index c21a83f..2ce54c8 100644 --- a/hub/internal/service/connection_manager/interface.go +++ b/hub/internal/service/connection_manager/interface.go @@ -8,7 +8,7 @@ import ( ) type streamConn interface { - Send(request *pb.ServerCommandRequest, err error) + Send(request *pb.ServerCommandRequest) error Recv() (*pb.AgentEvent, error) Context() context.Context Close() error diff --git a/hub/internal/service/connection_manager/mapper.go b/hub/internal/service/connection_manager/mapper.go index 2abd303..12783e0 100644 --- a/hub/internal/service/connection_manager/mapper.go +++ b/hub/internal/service/connection_manager/mapper.go @@ -20,3 +20,22 @@ func toCreateHeartbeatModel(agentID string, heartbeat *pb.AgentEvent_Heartbeat) }, } } + +func toGRPCCommandRequest(request domainHub.AgentRequest) pb.ServerCommandRequest { + return pb.ServerCommandRequest{ + RequestId: request.RequestID, + Name: request.Name, + TimeoutSeconds: int64(request.TimeOut), + Args: request.Args, + } +} + +func toAgentResponse(response *pb.AgentEvent_CommandResponse) domainHub.AgentResponse { + return domainHub.AgentResponse{ + RequestID: response.CommandResponse.RequestId, + Success: response.CommandResponse.Success, + Error: response.CommandResponse.Error, + Output: response.CommandResponse.Output, + ExecTimeMS: int(response.CommandResponse.ExecTimeMs), + } +} diff --git a/hub/internal/service/connection_manager/store/agent.go b/hub/internal/service/connection_manager/store/agent.go index 730d96c..a7ddd32 100644 --- a/hub/internal/service/connection_manager/store/agent.go +++ b/hub/internal/service/connection_manager/store/agent.go @@ -22,10 +22,11 @@ func (r *ResponseStore) Write(responseID string, channel chan domainHub.AgentRes r.store[responseID] = channel } -func (r *ResponseStore) Read(responseID string) chan domainHub.AgentResponse { +func (r *ResponseStore) Read(responseID string) (chan domainHub.AgentResponse, bool) { r.mutex.RLock() defer r.mutex.RUnlock() - return r.store[responseID] + ch, ok := r.store[responseID] + return ch, ok } func (r *ResponseStore) Delete(responseID string) { From b7893603ea8e97468db7a5d48cdfed880364ff6e Mon Sep 17 00:00:00 2001 From: lorsan Date: Thu, 14 May 2026 16:27:59 +0300 Subject: [PATCH 08/24] fix: delete id in response --- hub/internal/domain/stream.go | 8 +++----- hub/internal/service/connection_manager/agent.go | 10 +++++----- hub/internal/service/connection_manager/mapper.go | 5 ++--- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/hub/internal/domain/stream.go b/hub/internal/domain/stream.go index 7165398..8870ae4 100644 --- a/hub/internal/domain/stream.go +++ b/hub/internal/domain/stream.go @@ -3,14 +3,12 @@ package domain import "time" type AgentRequest struct { - RequestID string - Name string - Args map[string]string - TimeOut int + Name string + Args map[string]string + TimeOut int } type AgentResponse struct { - RequestID string Success bool Output string Error string diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index c205849..47cfcdf 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -68,12 +68,12 @@ func (a *AgentConnection) Listen() error { heartbeat := toCreateHeartbeatModel(a.AgentID, x) heartbeatsChan <- heartbeat case *pb.AgentEvent_CommandResponse: - response := toAgentResponse(x) - ch, ok := a.response.Read(response.RequestID) + ch, ok := a.response.Read(x.CommandResponse.RequestId) if !ok { - a.log.Warn().Str("requestID", response.RequestID).Msg("not found channel for send response") + a.log.Warn().Str("requestID", x.CommandResponse.RequestId).Msg("not found channel for send response") continue } + response := toAgentResponse(x) ch <- response } } @@ -124,7 +124,7 @@ func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRe a.response.Write(requestID, ch) defer a.response.Delete(requestID) - err := a.stream.Send(new(toGRPCCommandRequest(request))) + err := a.stream.Send(new(toGRPCCommandRequest(requestID, request))) if err != nil { return domainHub.AgentResponse{}, fmt.Errorf("execute command: %w", err) } @@ -137,7 +137,7 @@ func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRe case <-ctx.Done(): return domainHub.AgentResponse{}, fmt.Errorf("request timeout") case response := <-ch: - a.log.Info().Str("requestID", response.RequestID).Msg("received response") + a.log.Info().Str("requestID", requestID).Msg("received response") return response, nil } } diff --git a/hub/internal/service/connection_manager/mapper.go b/hub/internal/service/connection_manager/mapper.go index 12783e0..68fa0ac 100644 --- a/hub/internal/service/connection_manager/mapper.go +++ b/hub/internal/service/connection_manager/mapper.go @@ -21,9 +21,9 @@ func toCreateHeartbeatModel(agentID string, heartbeat *pb.AgentEvent_Heartbeat) } } -func toGRPCCommandRequest(request domainHub.AgentRequest) pb.ServerCommandRequest { +func toGRPCCommandRequest(requestID string, request domainHub.AgentRequest) pb.ServerCommandRequest { return pb.ServerCommandRequest{ - RequestId: request.RequestID, + RequestId: requestID, Name: request.Name, TimeoutSeconds: int64(request.TimeOut), Args: request.Args, @@ -32,7 +32,6 @@ func toGRPCCommandRequest(request domainHub.AgentRequest) pb.ServerCommandReques func toAgentResponse(response *pb.AgentEvent_CommandResponse) domainHub.AgentResponse { return domainHub.AgentResponse{ - RequestID: response.CommandResponse.RequestId, Success: response.CommandResponse.Success, Error: response.CommandResponse.Error, Output: response.CommandResponse.Output, From 8306623645bcdfdcf00d1102115af52a745c2fd2 Mon Sep 17 00:00:00 2001 From: lorsan Date: Thu, 14 May 2026 19:52:19 +0300 Subject: [PATCH 09/24] fix: stream close in agent connection --- .../service/connection_manager/agent.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index 47cfcdf..c932fce 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -44,22 +44,23 @@ func (a *AgentConnection) Listen() error { go a.listenHeartbeat(heartbeatsChan) defer close(heartbeatsChan) + defer func() { + err := a.Close() + if err != nil { + a.log.Warn().Err(err).Msg("failed stream close") + } + }() + for { select { case <-a.ctx.Done(): - err := a.stream.Close() - if err != nil { - a.log.Warn().Err(err).Msg("failed stream close") - } return a.ctx.Err() default: agentEvent, err := a.stream.Recv() if err == io.EOF { - a.cancel() return nil } if err != nil { - a.cancel() return fmt.Errorf("stream: %w", err) } @@ -141,3 +142,8 @@ func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRe return response, nil } } + +func (a *AgentConnection) Close() error { + a.cancel() + return a.stream.Close() +} From 942b8f4ddc8eccc3ae95d243a5125aaa3058c96c Mon Sep 17 00:00:00 2001 From: lorsan Date: Thu, 14 May 2026 21:51:51 +0300 Subject: [PATCH 10/24] feat: add test for agent connection --- .../service/connection_manager/agent_test.go | 176 ++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 hub/internal/service/connection_manager/agent_test.go diff --git a/hub/internal/service/connection_manager/agent_test.go b/hub/internal/service/connection_manager/agent_test.go new file mode 100644 index 0000000..799572b --- /dev/null +++ b/hub/internal/service/connection_manager/agent_test.go @@ -0,0 +1,176 @@ +package connection_manager + +import ( + "context" + "io" + "testing" + "time" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" + "github.com/rs/zerolog" + "gotest.tools/v3/assert" +) + +type streamMock struct { + recvCh chan *pb.AgentEvent + sendCh chan *pb.ServerCommandRequest + closeCh chan struct{} + ctx context.Context +} + +func (f *streamMock) Context() context.Context { + return f.ctx +} + +func (f *streamMock) Send(request *pb.ServerCommandRequest) error { + f.sendCh <- request + return nil +} + +func (f *streamMock) Recv() (*pb.AgentEvent, error) { + select { + case msg, ok := <-f.recvCh: + if !ok { + return nil, io.EOF + } + return msg, nil + case <-f.ctx.Done(): + return nil, f.ctx.Err() + } +} + +func (f *streamMock) Close() error { + select { + case f.closeCh <- struct{}{}: + default: + } + return nil +} + +type heartBeatMock struct { + countUse int + doneCh chan struct{} +} + +func (h *heartBeatMock) CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error { + h.countUse += 1 + select { + case h.doneCh <- struct{}{}: + default: + } + return nil +} + +type statusMock struct { + online bool + doneCh chan struct{} +} + +func (s *statusMock) Offline() { + s.online = false +} + +func (s *statusMock) Online() { + s.online = true + select { + case s.doneCh <- struct{}{}: + default: + } +} + +func TestAgentConnection_Heartbeat(t *testing.T) { + // Создаем вся поля для Agent Connection + // Нужно как то вынести в отдельную функцию + sendStream := make(chan *pb.ServerCommandRequest) + recvStream := make(chan *pb.AgentEvent) + ctx, cancel := context.WithCancel(context.Background()) + + stream := streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx, closeCh: make(chan struct{}, 1)} + heartbeat := heartBeatMock{doneCh: make(chan struct{}, 1)} + status := statusMock{doneCh: make(chan struct{}, 1)} + + agent := newAgentConnection("123", &stream, &heartbeat, &status, zerolog.New(nil)) + go agent.Listen() + + recvStream <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{ + Heartbeat: &pb.Heartbeat{ + Timestamp: time.Now().Unix(), + Metrics: &pb.SystemMetrics{CpuUsage: 0.5, MemoryUsage: 0.3, DiskUsage: 0.7}, + }}} + + select { + case <-heartbeat.doneCh: + case <-time.After(500 * time.Millisecond): + t.Fatal("timeout waiting for heartbeat") + } + + select { + case <-status.doneCh: + case <-time.After(500 * time.Millisecond): + t.Fatal("timeout waiting for status online") + } + + assert.Equal(t, heartbeat.countUse, 1) + assert.Equal(t, status.online, true) + + cancel() + + select { + case <-stream.closeCh: + case <-time.After(500 * time.Millisecond): + t.Fatal("timeout waiting for close") + } + + assert.Equal(t, status.online, false) +} + +func TestAgentConnection_Execute(t *testing.T) { + sendStream := make(chan *pb.ServerCommandRequest) + recvStream := make(chan *pb.AgentEvent) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + stream := streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx} + heartbeat := heartBeatMock{doneCh: make(chan struct{}, 1)} + status := statusMock{doneCh: make(chan struct{}, 1)} + + agent := newAgentConnection("123", &stream, &heartbeat, &status, zerolog.New(nil)) + go agent.Listen() + + // Данные для проверки + requestID := make(chan domainHub.AgentResponse) + output := "test output" + name := "test name" + + go func() { + response, _ := agent.Execute(ctx, domainHub.AgentRequest{ + Name: name, + Args: nil, + TimeOut: 0, + }) + + requestID <- response + }() + + request := <-sendStream + assert.Equal(t, name, request.Name) + + recvStream <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_CommandResponse{ + CommandResponse: &pb.CommandResponse{ + RequestId: request.RequestId, + Success: true, + Output: output, + }}} + + select { + case response := <-requestID: + assert.Equal(t, output, response.Output) + + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for response") + } +} + +//Написать тест когда heartbeat не приходит и все закрывается +//Написать тест при закрытии соединения Execute завершается From 86ff71f33c95ec11479b49bb4d0e8282fbda33da Mon Sep 17 00:00:00 2001 From: lorsan Date: Fri, 15 May 2026 19:51:21 +0300 Subject: [PATCH 11/24] refactor: errors moved to errors.go --- .../service/connection_manager/agent.go | 31 ++++++++++--------- .../service/connection_manager/errors.go | 5 +++ 2 files changed, 21 insertions(+), 15 deletions(-) create mode 100644 hub/internal/service/connection_manager/errors.go diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index c932fce..74d93e5 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -20,21 +20,22 @@ type statusAgent interface { // использовать sync.Pool что бы переиспользвоать этот обьект type AgentConnection struct { - stream streamConn - heartbeat heartbeatStore - log zerolog.Logger - status statusAgent - AgentID string - response *store.ResponseStore - ctx context.Context - cancel context.CancelFunc + stream streamConn + heartbeat heartbeatStore + log zerolog.Logger + status statusAgent + AgentID string + response *store.ResponseStore + ctx context.Context + cancel context.CancelFunc + heartbeatTimeoutMS int } -func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, logger zerolog.Logger) *AgentConnection { +func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, heartbeatTimeoutMS int, logger zerolog.Logger) *AgentConnection { response := store.NewResponseStore() logger = logger.With().Str("agentID", agentID).Logger() ctx, cancel := context.WithCancel(stream.Context()) - return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status, ctx: ctx, cancel: cancel} + return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status, ctx: ctx, cancel: cancel, heartbeatTimeoutMS: heartbeatTimeoutMS} } func (a *AgentConnection) Listen() error { @@ -83,14 +84,14 @@ func (a *AgentConnection) Listen() error { func (a *AgentConnection) listenHeartbeat(heartbeats <-chan domainHub.CreateHeartbeatModel) { lastHeartbeat := 0 - timer := time.NewTicker(5 * time.Second) + timer := time.NewTicker(time.Duration(a.heartbeatTimeoutMS) * time.Millisecond) defer timer.Stop() for { select { case <-timer.C: - if lastHeartbeat < 30 { - lastHeartbeat += 5 + if lastHeartbeat < 4 { + lastHeartbeat += 1 a.status.Offline() continue } @@ -134,9 +135,9 @@ func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRe select { case <-a.ctx.Done(): - return domainHub.AgentResponse{}, fmt.Errorf("connection close") + return domainHub.AgentResponse{}, ConnectionCloseErr case <-ctx.Done(): - return domainHub.AgentResponse{}, fmt.Errorf("request timeout") + return domainHub.AgentResponse{}, ctx.Err() case response := <-ch: a.log.Info().Str("requestID", requestID).Msg("received response") return response, nil diff --git a/hub/internal/service/connection_manager/errors.go b/hub/internal/service/connection_manager/errors.go new file mode 100644 index 0000000..56d2a88 --- /dev/null +++ b/hub/internal/service/connection_manager/errors.go @@ -0,0 +1,5 @@ +package connection_manager + +import "errors" + +var ConnectionCloseErr error = errors.New("connection close") From a206b0335dba6feac4b1584de6330a727e97295f Mon Sep 17 00:00:00 2001 From: lorsan Date: Fri, 15 May 2026 20:53:48 +0300 Subject: [PATCH 12/24] fix: with heartbeat timeout Listen did not complete --- hub/internal/service/connection_manager/agent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index 74d93e5..34a008b 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -97,7 +97,7 @@ func (a *AgentConnection) listenHeartbeat(heartbeats <-chan domainHub.CreateHear } a.log.Warn().Msg("agent not send heartbeat") - a.cancel() + a.Close() return case heartbeat := <-heartbeats: a.log.Debug(). From 0972a7b29051d07e83a8f1098e578a4828dc8be0 Mon Sep 17 00:00:00 2001 From: lorsan Date: Fri, 15 May 2026 20:54:54 +0300 Subject: [PATCH 13/24] feat: new agent connections test Heartbeat timeout --- .../service/connection_manager/agent_test.go | 51 +++++++++++++++++-- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/hub/internal/service/connection_manager/agent_test.go b/hub/internal/service/connection_manager/agent_test.go index 799572b..dca76c7 100644 --- a/hub/internal/service/connection_manager/agent_test.go +++ b/hub/internal/service/connection_manager/agent_test.go @@ -3,6 +3,7 @@ package connection_manager import ( "context" "io" + "sync" "testing" "time" @@ -43,6 +44,7 @@ func (f *streamMock) Recv() (*pb.AgentEvent, error) { func (f *streamMock) Close() error { select { case f.closeCh <- struct{}{}: + close(f.recvCh) default: } return nil @@ -82,7 +84,7 @@ func (s *statusMock) Online() { func TestAgentConnection_Heartbeat(t *testing.T) { // Создаем вся поля для Agent Connection // Нужно как то вынести в отдельную функцию - sendStream := make(chan *pb.ServerCommandRequest) + sendStream := make(chan *pb.ServerCommandRequest, 1) recvStream := make(chan *pb.AgentEvent) ctx, cancel := context.WithCancel(context.Background()) @@ -90,7 +92,7 @@ func TestAgentConnection_Heartbeat(t *testing.T) { heartbeat := heartBeatMock{doneCh: make(chan struct{}, 1)} status := statusMock{doneCh: make(chan struct{}, 1)} - agent := newAgentConnection("123", &stream, &heartbeat, &status, zerolog.New(nil)) + agent := newAgentConnection("123", &stream, &heartbeat, &status, 5000, zerolog.New(nil)) go agent.Listen() recvStream <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{ @@ -126,7 +128,7 @@ func TestAgentConnection_Heartbeat(t *testing.T) { } func TestAgentConnection_Execute(t *testing.T) { - sendStream := make(chan *pb.ServerCommandRequest) + sendStream := make(chan *pb.ServerCommandRequest, 1) recvStream := make(chan *pb.AgentEvent) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -135,7 +137,7 @@ func TestAgentConnection_Execute(t *testing.T) { heartbeat := heartBeatMock{doneCh: make(chan struct{}, 1)} status := statusMock{doneCh: make(chan struct{}, 1)} - agent := newAgentConnection("123", &stream, &heartbeat, &status, zerolog.New(nil)) + agent := newAgentConnection("123", &stream, &heartbeat, &status, 5000, zerolog.New(nil)) go agent.Listen() // Данные для проверки @@ -172,5 +174,44 @@ func TestAgentConnection_Execute(t *testing.T) { } } -//Написать тест когда heartbeat не приходит и все закрывается +// Написать тест когда heartbeat не приходит и все закрывается +func TestAgentConnection_HeartbeatTimeout(t *testing.T) { + sendStream := make(chan *pb.ServerCommandRequest, 1) + recvStream := make(chan *pb.AgentEvent) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + stream := streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx, closeCh: make(chan struct{}, 1)} + heartbeat := heartBeatMock{doneCh: make(chan struct{}, 1)} + status := statusMock{doneCh: make(chan struct{}, 1)} + var wg sync.WaitGroup + + agent := newAgentConnection("123", &stream, &heartbeat, &status, 200, zerolog.New(nil)) + + wg.Add(2) + go func() { + err := agent.Listen() + assert.NilError(t, err) + wg.Done() + }() + + go func() { + _, err := agent.Execute(ctx, domainHub.AgentRequest{ + Name: "test", + Args: nil, + TimeOut: 0, + }) + assert.ErrorIs(t, err, ConnectionCloseErr) + wg.Done() + }() + + wg.Wait() + + select { + case <-stream.closeCh: + case <-time.After(500 * time.Millisecond): + t.Fatal("timeout waiting for close") + } +} + //Написать тест при закрытии соединения Execute завершается From f1042ac9ea44aa57e0069a33a8992c2927840fe9 Mon Sep 17 00:00:00 2001 From: lorsan Date: Sat, 16 May 2026 14:49:44 +0300 Subject: [PATCH 14/24] feat: new agent connections test and refactoring tests --- .../service/connection_manager/agent_test.go | 350 ++++++++++++++---- 1 file changed, 283 insertions(+), 67 deletions(-) diff --git a/hub/internal/service/connection_manager/agent_test.go b/hub/internal/service/connection_manager/agent_test.go index dca76c7..5e68497 100644 --- a/hub/internal/service/connection_manager/agent_test.go +++ b/hub/internal/service/connection_manager/agent_test.go @@ -2,6 +2,7 @@ package connection_manager import ( "context" + "errors" "io" "sync" "testing" @@ -18,6 +19,10 @@ type streamMock struct { sendCh chan *pb.ServerCommandRequest closeCh chan struct{} ctx context.Context + mu sync.Mutex + sendErr error + recvErr error + closeOnce sync.Once } func (f *streamMock) Context() context.Context { @@ -25,11 +30,23 @@ func (f *streamMock) Context() context.Context { } func (f *streamMock) Send(request *pb.ServerCommandRequest) error { + f.mu.Lock() + err := f.sendErr + f.mu.Unlock() + if err != nil { + return err + } f.sendCh <- request return nil } func (f *streamMock) Recv() (*pb.AgentEvent, error) { + f.mu.Lock() + recvErr := f.recvErr + f.mu.Unlock() + if recvErr != nil { + return nil, recvErr + } select { case msg, ok := <-f.recvCh: if !ok { @@ -44,101 +61,153 @@ func (f *streamMock) Recv() (*pb.AgentEvent, error) { func (f *streamMock) Close() error { select { case f.closeCh <- struct{}{}: - close(f.recvCh) default: } + f.closeOnce.Do(func() { + close(f.recvCh) + }) return nil } type heartBeatMock struct { + mu sync.Mutex countUse int doneCh chan struct{} + err error } func (h *heartBeatMock) CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error { + h.mu.Lock() h.countUse += 1 + err := h.err + h.mu.Unlock() select { case h.doneCh <- struct{}{}: default: } - return nil + return err } type statusMock struct { + mu sync.Mutex online bool doneCh chan struct{} } func (s *statusMock) Offline() { + s.mu.Lock() s.online = false + s.mu.Unlock() } func (s *statusMock) Online() { + s.mu.Lock() s.online = true + s.mu.Unlock() select { case s.doneCh <- struct{}{}: default: } } -func TestAgentConnection_Heartbeat(t *testing.T) { - // Создаем вся поля для Agent Connection - // Нужно как то вынести в отдельную функцию - sendStream := make(chan *pb.ServerCommandRequest, 1) - recvStream := make(chan *pb.AgentEvent) +func (s *statusMock) IsOnline() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.online +} + +type agentTestHarness struct { + ctx context.Context + cancel context.CancelFunc + stream *streamMock + heartbeat *heartBeatMock + status *statusMock + agent *AgentConnection + recvCh chan *pb.AgentEvent + sendCh chan *pb.ServerCommandRequest +} + +func newAgentTestHarness(t *testing.T, heartbeatTimeoutMS int) *agentTestHarness { + t.Helper() + sendStream := make(chan *pb.ServerCommandRequest, 4) + recvStream := make(chan *pb.AgentEvent, 4) ctx, cancel := context.WithCancel(context.Background()) - stream := streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx, closeCh: make(chan struct{}, 1)} - heartbeat := heartBeatMock{doneCh: make(chan struct{}, 1)} - status := statusMock{doneCh: make(chan struct{}, 1)} + stream := &streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx, closeCh: make(chan struct{}, 1)} + heartbeat := &heartBeatMock{doneCh: make(chan struct{}, 2)} + status := &statusMock{doneCh: make(chan struct{}, 2)} - agent := newAgentConnection("123", &stream, &heartbeat, &status, 5000, zerolog.New(nil)) - go agent.Listen() + agent := newAgentConnection("123", stream, heartbeat, status, heartbeatTimeoutMS, zerolog.New(nil)) - recvStream <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{ + t.Cleanup(func() { + cancel() + }) + + return &agentTestHarness{ + ctx: ctx, cancel: cancel, stream: stream, heartbeat: heartbeat, status: status, + agent: agent, recvCh: recvStream, sendCh: sendStream, + } +} + +func waitFor(t *testing.T, ch <-chan struct{}, timeout time.Duration, message string) { + t.Helper() + select { + case <-ch: + case <-time.After(timeout): + t.Fatal(message) + } +} + +func waitForClose(t *testing.T, closeCh <-chan struct{}, timeout time.Duration) { + t.Helper() + select { + case <-closeCh: + case <-time.After(timeout): + t.Fatal("timeout waiting for close") + } +} + +func commandResponseEvent(requestID, output string) *pb.AgentEvent { + return &pb.AgentEvent{ + AgentId: "agent-1", + Event: &pb.AgentEvent_CommandResponse{ + CommandResponse: &pb.CommandResponse{ + RequestId: requestID, + Success: true, + Output: output, + }, + }, + } +} + +func TestAgentConnection_Heartbeat(t *testing.T) { + h := newAgentTestHarness(t, 5000) + go h.agent.Listen() + + h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{ Heartbeat: &pb.Heartbeat{ Timestamp: time.Now().Unix(), Metrics: &pb.SystemMetrics{CpuUsage: 0.5, MemoryUsage: 0.3, DiskUsage: 0.7}, }}} - select { - case <-heartbeat.doneCh: - case <-time.After(500 * time.Millisecond): - t.Fatal("timeout waiting for heartbeat") - } + waitFor(t, h.heartbeat.doneCh, 500*time.Millisecond, "timeout waiting for heartbeat") + waitFor(t, h.status.doneCh, 500*time.Millisecond, "timeout waiting for status online") - select { - case <-status.doneCh: - case <-time.After(500 * time.Millisecond): - t.Fatal("timeout waiting for status online") - } + h.heartbeat.mu.Lock() + count := h.heartbeat.countUse + h.heartbeat.mu.Unlock() - assert.Equal(t, heartbeat.countUse, 1) - assert.Equal(t, status.online, true) + assert.Equal(t, count, 1) + assert.Equal(t, h.status.IsOnline(), true) - cancel() - - select { - case <-stream.closeCh: - case <-time.After(500 * time.Millisecond): - t.Fatal("timeout waiting for close") - } - - assert.Equal(t, status.online, false) + h.cancel() + waitForClose(t, h.stream.closeCh, 500*time.Millisecond) + assert.Equal(t, h.status.IsOnline(), false) } func TestAgentConnection_Execute(t *testing.T) { - sendStream := make(chan *pb.ServerCommandRequest, 1) - recvStream := make(chan *pb.AgentEvent) - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - stream := streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx} - heartbeat := heartBeatMock{doneCh: make(chan struct{}, 1)} - status := statusMock{doneCh: make(chan struct{}, 1)} - - agent := newAgentConnection("123", &stream, &heartbeat, &status, 5000, zerolog.New(nil)) - go agent.Listen() + h := newAgentTestHarness(t, 5000) + go h.agent.Listen() // Данные для проверки requestID := make(chan domainHub.AgentResponse) @@ -146,7 +215,7 @@ func TestAgentConnection_Execute(t *testing.T) { name := "test name" go func() { - response, _ := agent.Execute(ctx, domainHub.AgentRequest{ + response, _ := h.agent.Execute(h.ctx, domainHub.AgentRequest{ Name: name, Args: nil, TimeOut: 0, @@ -155,10 +224,10 @@ func TestAgentConnection_Execute(t *testing.T) { requestID <- response }() - request := <-sendStream + request := <-h.sendCh assert.Equal(t, name, request.Name) - recvStream <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_CommandResponse{ + h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_CommandResponse{ CommandResponse: &pb.CommandResponse{ RequestId: request.RequestId, Success: true, @@ -174,29 +243,19 @@ func TestAgentConnection_Execute(t *testing.T) { } } -// Написать тест когда heartbeat не приходит и все закрывается func TestAgentConnection_HeartbeatTimeout(t *testing.T) { - sendStream := make(chan *pb.ServerCommandRequest, 1) - recvStream := make(chan *pb.AgentEvent) - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - stream := streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx, closeCh: make(chan struct{}, 1)} - heartbeat := heartBeatMock{doneCh: make(chan struct{}, 1)} - status := statusMock{doneCh: make(chan struct{}, 1)} + h := newAgentTestHarness(t, 200) var wg sync.WaitGroup - agent := newAgentConnection("123", &stream, &heartbeat, &status, 200, zerolog.New(nil)) - wg.Add(2) go func() { - err := agent.Listen() + err := h.agent.Listen() assert.NilError(t, err) wg.Done() }() go func() { - _, err := agent.Execute(ctx, domainHub.AgentRequest{ + _, err := h.agent.Execute(h.ctx, domainHub.AgentRequest{ Name: "test", Args: nil, TimeOut: 0, @@ -207,11 +266,168 @@ func TestAgentConnection_HeartbeatTimeout(t *testing.T) { wg.Wait() - select { - case <-stream.closeCh: - case <-time.After(500 * time.Millisecond): - t.Fatal("timeout waiting for close") - } + waitForClose(t, h.stream.closeCh, 500*time.Millisecond) } -//Написать тест при закрытии соединения Execute завершается +func TestAgentConnection_ConnectionClose(t *testing.T) { + h := newAgentTestHarness(t, 5000) + var wg sync.WaitGroup + + wg.Add(2) + go func() { + err := h.agent.Listen() + assert.ErrorIs(t, err, context.Canceled) + wg.Done() + }() + + go func() { + _, err := h.agent.Execute(context.Background(), domainHub.AgentRequest{ + Name: "test", + Args: nil, + TimeOut: 0, + }) + assert.ErrorIs(t, err, ConnectionCloseErr) + wg.Done() + }() + + h.cancel() + + wg.Wait() + + waitForClose(t, h.stream.closeCh, 500*time.Millisecond) +} + +func TestAgentConnection_ExecuteClose(t *testing.T) { + h := newAgentTestHarness(t, 5000) + ctxExecute, cancelExecute := context.WithCancel(context.Background()) + t.Cleanup(cancelExecute) + + executeCh := make(chan struct{}) + go h.agent.Listen() + + go func() { + _, err := h.agent.Execute(ctxExecute, domainHub.AgentRequest{ + Name: "test", + Args: nil, + TimeOut: 0, + }) + + assert.ErrorIs(t, err, context.Canceled) + executeCh <- struct{}{} + }() + + cancelExecute() + waitFor(t, executeCh, 500*time.Millisecond, "timeout waiting for execute close") +} + +func TestAgentConnection_ListenEOF(t *testing.T) { + h := newAgentTestHarness(t, 5000) + h.stream.Close() + + err := h.agent.Listen() + assert.NilError(t, err) + waitForClose(t, h.stream.closeCh, 500*time.Millisecond) +} + +func TestAgentConnection_ListenRecvError(t *testing.T) { + h := newAgentTestHarness(t, 5000) + + recvErr := errors.New("recv failure") + h.stream.mu.Lock() + h.stream.recvErr = recvErr + h.stream.mu.Unlock() + + err := h.agent.Listen() + assert.ErrorIs(t, err, recvErr) +} + +func TestAgentConnection_ExecuteSendError(t *testing.T) { + h := newAgentTestHarness(t, 5000) + h.stream.mu.Lock() + h.stream.sendErr = errors.New("send failure") + h.stream.mu.Unlock() + + _, err := h.agent.Execute(h.ctx, domainHub.AgentRequest{Name: "test"}) + assert.ErrorContains(t, err, "execute command") +} + +func TestAgentConnection_ExecuteContextCanceled(t *testing.T) { + h := newAgentTestHarness(t, 5000) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err := h.agent.Execute(ctx, domainHub.AgentRequest{Name: "test"}) + assert.ErrorIs(t, err, context.Canceled) +} + +func TestAgentConnection_ExecuteConnectionCanceled(t *testing.T) { + h := newAgentTestHarness(t, 5000) + h.cancel() + + _, err := h.agent.Execute(context.Background(), domainHub.AgentRequest{Name: "test"}) + assert.ErrorIs(t, err, ConnectionCloseErr) +} + +func TestAgentConnection_UnknownResponseID(t *testing.T) { + h := newAgentTestHarness(t, 5000) + go h.agent.Listen() + + h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_CommandResponse{ + CommandResponse: &pb.CommandResponse{ + RequestId: "unknown", + Success: true, + Output: "ok", + }}} + + h.cancel() + waitForClose(t, h.stream.closeCh, 500*time.Millisecond) +} + +func TestAgentConnection_HeartbeatErrorDoesNotStop(t *testing.T) { + h := newAgentTestHarness(t, 5000) + h.heartbeat.mu.Lock() + h.heartbeat.err = errors.New("db error") + h.heartbeat.mu.Unlock() + + go h.agent.Listen() + h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{ + Heartbeat: &pb.Heartbeat{ + Timestamp: time.Now().Unix(), + Metrics: &pb.SystemMetrics{CpuUsage: 0.2, MemoryUsage: 0.1, DiskUsage: 0.3}, + }}} + + waitFor(t, h.heartbeat.doneCh, 500*time.Millisecond, "timeout waiting for heartbeat") + h.cancel() +} + +func TestAgentConnection_ConcurrentExecute(t *testing.T) { + h := newAgentTestHarness(t, 5000) + go h.agent.Listen() + + responses := make(chan domainHub.AgentResponse, 2) + + go func() { + resp, _ := h.agent.Execute(h.ctx, domainHub.AgentRequest{Name: "cmd-1"}) + responses <- resp + }() + go func() { + resp, _ := h.agent.Execute(h.ctx, domainHub.AgentRequest{Name: "cmd-2"}) + responses <- resp + }() + + first := <-h.sendCh + second := <-h.sendCh + + // ответы приходят в обратном порядке + h.recvCh <- commandResponseEvent(second.RequestId, "second") + h.recvCh <- commandResponseEvent(first.RequestId, "first") + + resp1 := <-responses + resp2 := <-responses + + assert.Assert(t, resp1.Output == "first" || resp1.Output == "second") + assert.Assert(t, resp2.Output == "first" || resp2.Output == "second") + assert.Assert(t, resp1.Output != resp2.Output) + + h.cancel() +} From 5b297d5c1f49f36141c8031ad6de51a31dd4c597 Mon Sep 17 00:00:00 2001 From: lorsan Date: Sat, 16 May 2026 18:33:36 +0300 Subject: [PATCH 15/24] feat: create agent connection store --- .../connection_manager/store/manager.go | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 hub/internal/service/connection_manager/store/manager.go diff --git a/hub/internal/service/connection_manager/store/manager.go b/hub/internal/service/connection_manager/store/manager.go new file mode 100644 index 0000000..b690f36 --- /dev/null +++ b/hub/internal/service/connection_manager/store/manager.go @@ -0,0 +1,42 @@ +package store + +import ( + "sync" + + "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager" +) + +type AgentConnStore struct { + mutex sync.RWMutex + store map[string]*connection_manager.AgentConnection +} + +func NewAgentConnStore() *AgentConnStore { + return &AgentConnStore{store: make(map[string]*connection_manager.AgentConnection)} +} + +func (a *AgentConnStore) Get(agentID string) *connection_manager.AgentConnection { + a.mutex.RLock() + defer a.mutex.RUnlock() + return a.store[agentID] +} + +func (a *AgentConnStore) Add(agentConn *connection_manager.AgentConnection) { + a.mutex.Lock() + defer a.mutex.Unlock() + a.store[agentConn.AgentID] = agentConn +} + +func (a *AgentConnStore) Delete(agentID string) { + a.mutex.Lock() + defer a.mutex.Unlock() + delete(a.store, agentID) +} + +func (a *AgentConnStore) Pop(agentID string) *connection_manager.AgentConnection { + a.mutex.Lock() + defer a.mutex.Unlock() + agent := a.store[agentID] + delete(a.store, agentID) + return agent +} From bef8409b8c38baa09aaefd8e50e4bb3ce6630d59 Mon Sep 17 00:00:00 2001 From: lorsan Date: Sun, 17 May 2026 12:37:51 +0300 Subject: [PATCH 16/24] feat: add connection manager --- go.mod | 5 +- .../service/connection_manager/agent.go | 13 +-- .../service/connection_manager/agent_test.go | 20 ++--- .../service/connection_manager/domain.go | 6 -- .../service/connection_manager/errors.go | 4 +- .../service/connection_manager/interface.go | 9 ++ .../service/connection_manager/manager.go | 71 ++++++++++++++++ .../service/connection_manager/store.go | 82 +++++++++++++++++++ .../service/connection_manager/store/agent.go | 36 -------- .../connection_manager/store/manager.go | 42 ---------- 10 files changed, 181 insertions(+), 107 deletions(-) delete mode 100644 hub/internal/service/connection_manager/domain.go create mode 100644 hub/internal/service/connection_manager/manager.go create mode 100644 hub/internal/service/connection_manager/store.go delete mode 100644 hub/internal/service/connection_manager/store/agent.go delete mode 100644 hub/internal/service/connection_manager/store/manager.go diff --git a/go.mod b/go.mod index 292ad1a..bda85b7 100644 --- a/go.mod +++ b/go.mod @@ -5,12 +5,14 @@ go 1.26.1 require ( github.com/docker/docker v28.5.2+incompatible github.com/golang-migrate/migrate/v4 v4.19.1 + github.com/google/uuid v1.6.0 github.com/ilyakaznacheev/cleanenv v1.5.0 github.com/mattn/go-sqlite3 v1.14.44 github.com/rs/zerolog v1.35.1 google.golang.org/grpc v1.81.0 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 + gotest.tools/v3 v3.5.2 ) require ( @@ -26,7 +28,7 @@ require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/google/uuid v1.6.0 // indirect + github.com/google/go-cmp v0.7.0 // indirect github.com/joho/godotenv v1.5.1 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/lib/pq v1.12.3 // indirect @@ -54,7 +56,6 @@ require ( golang.org/x/time v0.15.0 // indirect golang.org/x/tools v0.43.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260504160031-60b97b32f348 // indirect - gotest.tools/v3 v3.5.2 // indirect lukechampine.com/uint128 v1.2.0 // indirect modernc.org/cc/v3 v3.36.3 // indirect modernc.org/ccgo/v3 v3.16.9 // indirect diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index 34a008b..ca736a4 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -9,30 +9,23 @@ import ( "github.com/google/uuid" pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" - "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager/store" "github.com/rs/zerolog" ) -type statusAgent interface { - Offline() - Online() -} - -// использовать sync.Pool что бы переиспользвоать этот обьект type AgentConnection struct { stream streamConn heartbeat heartbeatStore log zerolog.Logger status statusAgent AgentID string - response *store.ResponseStore + response *ResponseStore ctx context.Context cancel context.CancelFunc heartbeatTimeoutMS int } func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, heartbeatTimeoutMS int, logger zerolog.Logger) *AgentConnection { - response := store.NewResponseStore() + response := NewResponseStore() logger = logger.With().Str("agentID", agentID).Logger() ctx, cancel := context.WithCancel(stream.Context()) return &AgentConnection{stream: stream, response: response, heartbeat: heartbeat, log: logger, AgentID: agentID, status: status, ctx: ctx, cancel: cancel, heartbeatTimeoutMS: heartbeatTimeoutMS} @@ -135,7 +128,7 @@ func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRe select { case <-a.ctx.Done(): - return domainHub.AgentResponse{}, ConnectionCloseErr + return domainHub.AgentResponse{}, ErrConnectionClose case <-ctx.Done(): return domainHub.AgentResponse{}, ctx.Err() case response := <-ch: diff --git a/hub/internal/service/connection_manager/agent_test.go b/hub/internal/service/connection_manager/agent_test.go index 5e68497..2f3bc3c 100644 --- a/hub/internal/service/connection_manager/agent_test.go +++ b/hub/internal/service/connection_manager/agent_test.go @@ -15,13 +15,13 @@ import ( ) type streamMock struct { - recvCh chan *pb.AgentEvent - sendCh chan *pb.ServerCommandRequest - closeCh chan struct{} - ctx context.Context - mu sync.Mutex - sendErr error - recvErr error + recvCh chan *pb.AgentEvent + sendCh chan *pb.ServerCommandRequest + closeCh chan struct{} + ctx context.Context + mu sync.Mutex + sendErr error + recvErr error closeOnce sync.Once } @@ -260,7 +260,7 @@ func TestAgentConnection_HeartbeatTimeout(t *testing.T) { Args: nil, TimeOut: 0, }) - assert.ErrorIs(t, err, ConnectionCloseErr) + assert.ErrorIs(t, err, ErrConnectionClose) wg.Done() }() @@ -286,7 +286,7 @@ func TestAgentConnection_ConnectionClose(t *testing.T) { Args: nil, TimeOut: 0, }) - assert.ErrorIs(t, err, ConnectionCloseErr) + assert.ErrorIs(t, err, ErrConnectionClose) wg.Done() }() @@ -365,7 +365,7 @@ func TestAgentConnection_ExecuteConnectionCanceled(t *testing.T) { h.cancel() _, err := h.agent.Execute(context.Background(), domainHub.AgentRequest{Name: "test"}) - assert.ErrorIs(t, err, ConnectionCloseErr) + assert.ErrorIs(t, err, ErrConnectionClose) } func TestAgentConnection_UnknownResponseID(t *testing.T) { diff --git a/hub/internal/service/connection_manager/domain.go b/hub/internal/service/connection_manager/domain.go deleted file mode 100644 index bdab261..0000000 --- a/hub/internal/service/connection_manager/domain.go +++ /dev/null @@ -1,6 +0,0 @@ -package connection_manager - -type agentStatus struct { - AgentID string - Online bool -} diff --git a/hub/internal/service/connection_manager/errors.go b/hub/internal/service/connection_manager/errors.go index 56d2a88..0d8ef9a 100644 --- a/hub/internal/service/connection_manager/errors.go +++ b/hub/internal/service/connection_manager/errors.go @@ -2,4 +2,6 @@ package connection_manager import "errors" -var ConnectionCloseErr error = errors.New("connection close") +var ErrConnectionClose = errors.New("connection close") + +var ErrNotFoundConn = errors.New("agent connection not found") diff --git a/hub/internal/service/connection_manager/interface.go b/hub/internal/service/connection_manager/interface.go index 2ce54c8..19cc737 100644 --- a/hub/internal/service/connection_manager/interface.go +++ b/hub/internal/service/connection_manager/interface.go @@ -17,3 +17,12 @@ type streamConn interface { type heartbeatStore interface { CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error } + +type statusAgent interface { + Offline() + Online() +} + +type statusNotifier interface { + New(AgentID string) statusAgent +} diff --git a/hub/internal/service/connection_manager/manager.go b/hub/internal/service/connection_manager/manager.go new file mode 100644 index 0000000..baacfd6 --- /dev/null +++ b/hub/internal/service/connection_manager/manager.go @@ -0,0 +1,71 @@ +package connection_manager + +import ( + "context" + "fmt" + + "github.com/rs/zerolog" + "google.golang.org/grpc/metadata" +) + +const heartbeatTimeoutMS = 6000 + +type ConnectionManager struct { + heartbeat heartbeatStore + log zerolog.Logger + status statusNotifier + agentConnStore *AgentConnStore +} + +func NewConnectionManager(heartbeat heartbeatStore, status statusNotifier, logger zerolog.Logger) *ConnectionManager { + return &ConnectionManager{heartbeat: heartbeat, log: logger, status: status, agentConnStore: NewAgentConnStore()} +} + +func (c *ConnectionManager) NewConnection(stream streamConn) { + AgentID, err := agentIDFromMetadata(stream.Context()) + if err != nil { + c.log.Error().Err(err).Msg("missing agent id in metadata") + return + } + c.log.Info().Str("agentID", AgentID).Msg("connection accepted") + + status := c.status.New(AgentID) + + agent := newAgentConnection(AgentID, stream, c.heartbeat, status, heartbeatTimeoutMS, c.log) + c.agentConnStore.Add(AgentID, agent) + go func() { + c.log.Debug().Str("agentID", AgentID).Msg("start listening") + err := agent.Listen() + if err != nil { + c.log.Error().Err(err).Msg("listening agent stopped") + } + c.agentConnStore.Delete(AgentID) + }() +} + +func (c *ConnectionManager) GetConnection(AgentID string) (*AgentConnection, error) { + agent := c.agentConnStore.Get(AgentID) + if agent == nil { + return nil, ErrNotFoundConn + } + + return agent, nil +} + +func (c *ConnectionManager) GetAllAgentID() []string { + return c.agentConnStore.GetAllAgentID() +} + +func agentIDFromMetadata(ctx context.Context) (string, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return "", fmt.Errorf("metadata not found") + } + + values := md.Get("agent-id") + if len(values) == 0 || values[0] == "" { + return "", fmt.Errorf("agent-id not found") + } + + return values[0], nil +} diff --git a/hub/internal/service/connection_manager/store.go b/hub/internal/service/connection_manager/store.go new file mode 100644 index 0000000..3e5ffa8 --- /dev/null +++ b/hub/internal/service/connection_manager/store.go @@ -0,0 +1,82 @@ +package connection_manager + +import ( + "sync" + + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" +) + +type AgentConnStore struct { + mutex sync.RWMutex + store map[string]*AgentConnection +} + +func NewAgentConnStore() *AgentConnStore { + return &AgentConnStore{store: make(map[string]*AgentConnection)} +} + +func (a *AgentConnStore) Get(agentID string) *AgentConnection { + a.mutex.RLock() + defer a.mutex.RUnlock() + return a.store[agentID] +} + +func (a *AgentConnStore) Add(agentID string, agentConn *AgentConnection) { + a.mutex.Lock() + defer a.mutex.Unlock() + a.store[agentID] = agentConn +} + +func (a *AgentConnStore) Delete(agentID string) { + a.mutex.Lock() + defer a.mutex.Unlock() + delete(a.store, agentID) +} + +func (a *AgentConnStore) Pop(agentID string) *AgentConnection { + a.mutex.Lock() + defer a.mutex.Unlock() + agent := a.store[agentID] + delete(a.store, agentID) + return agent +} + +func (a *AgentConnStore) GetAllAgentID() []string { + a.mutex.RLock() + defer a.mutex.RUnlock() + + var IDs []string + for ID := range a.store { + IDs = append(IDs, ID) + } + return IDs +} + +type ResponseStore struct { + store map[string]chan domainHub.AgentResponse + mutex sync.RWMutex +} + +func NewResponseStore() *ResponseStore { + data := make(map[string]chan domainHub.AgentResponse) + return &ResponseStore{store: data} +} + +func (r *ResponseStore) Write(responseID string, channel chan domainHub.AgentResponse) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.store[responseID] = channel +} + +func (r *ResponseStore) Read(responseID string) (chan domainHub.AgentResponse, bool) { + r.mutex.RLock() + defer r.mutex.RUnlock() + ch, ok := r.store[responseID] + return ch, ok +} + +func (r *ResponseStore) Delete(responseID string) { + r.mutex.Lock() + defer r.mutex.Unlock() + delete(r.store, responseID) +} diff --git a/hub/internal/service/connection_manager/store/agent.go b/hub/internal/service/connection_manager/store/agent.go deleted file mode 100644 index a7ddd32..0000000 --- a/hub/internal/service/connection_manager/store/agent.go +++ /dev/null @@ -1,36 +0,0 @@ -package store - -import ( - "sync" - - domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" -) - -type ResponseStore struct { - store map[string]chan domainHub.AgentResponse - mutex sync.RWMutex -} - -func NewResponseStore() *ResponseStore { - data := make(map[string]chan domainHub.AgentResponse) - return &ResponseStore{store: data} -} - -func (r *ResponseStore) Write(responseID string, channel chan domainHub.AgentResponse) { - r.mutex.Lock() - defer r.mutex.Unlock() - r.store[responseID] = channel -} - -func (r *ResponseStore) Read(responseID string) (chan domainHub.AgentResponse, bool) { - r.mutex.RLock() - defer r.mutex.RUnlock() - ch, ok := r.store[responseID] - return ch, ok -} - -func (r *ResponseStore) Delete(responseID string) { - r.mutex.Lock() - defer r.mutex.Unlock() - delete(r.store, responseID) -} diff --git a/hub/internal/service/connection_manager/store/manager.go b/hub/internal/service/connection_manager/store/manager.go deleted file mode 100644 index b690f36..0000000 --- a/hub/internal/service/connection_manager/store/manager.go +++ /dev/null @@ -1,42 +0,0 @@ -package store - -import ( - "sync" - - "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager" -) - -type AgentConnStore struct { - mutex sync.RWMutex - store map[string]*connection_manager.AgentConnection -} - -func NewAgentConnStore() *AgentConnStore { - return &AgentConnStore{store: make(map[string]*connection_manager.AgentConnection)} -} - -func (a *AgentConnStore) Get(agentID string) *connection_manager.AgentConnection { - a.mutex.RLock() - defer a.mutex.RUnlock() - return a.store[agentID] -} - -func (a *AgentConnStore) Add(agentConn *connection_manager.AgentConnection) { - a.mutex.Lock() - defer a.mutex.Unlock() - a.store[agentConn.AgentID] = agentConn -} - -func (a *AgentConnStore) Delete(agentID string) { - a.mutex.Lock() - defer a.mutex.Unlock() - delete(a.store, agentID) -} - -func (a *AgentConnStore) Pop(agentID string) *connection_manager.AgentConnection { - a.mutex.Lock() - defer a.mutex.Unlock() - agent := a.store[agentID] - delete(a.store, agentID) - return agent -} From 8705f8708d1dc8c92e1daa42daafeccbcb13aedd Mon Sep 17 00:00:00 2001 From: lorsan Date: Mon, 18 May 2026 15:58:00 +0300 Subject: [PATCH 17/24] refactor: edit readme --- README.md | 3 +-- hub/internal/service/connection_manager/manager_test.go | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) create mode 100644 hub/internal/service/connection_manager/manager_test.go diff --git a/README.md b/README.md index b31d324..9731669 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ Проект состоит из трех основных компонентов: -1. **Hub (The Brain):** Центральный сервис на Go/Python. Хранит конфигурации, управляет состоянием, оркестрирует Runbooks и управляет Telegram-ботом. База данных — Postgres. +1. **Hub (The Brain):** Центральный сервис на Go/Python. Хранит конфигурации, управляет состоянием, оркестрирует Runbooks и управляет Telegram-ботом. База данных — SQLite. 2. **Agent (The Executor):** Легковесный бинарный файл, работающий непосредственно на целевом хосте. Связь с Хабом через **gRPC Stream**. 3. **Telegram Interface:** Основной UI для оперативного управления с использованием Inline-кнопок для быстрых действий (Ack, Mute, Restart). @@ -66,7 +66,6 @@ ## 🔒 Безопасность * **No Inbound Ports:** Агент не слушает порты. Соединение всегда инициируется изнутри вашей сети. -* **mTLS / Token Auth:** Весь трафик между Агентом и Хабом зашифрован. * **Hardened Commands:** Невозможно выполнить `rm -rf /` — доступны только те команды, что описаны в коде агента. *Created with ❤️ for Stasik.* \ No newline at end of file diff --git a/hub/internal/service/connection_manager/manager_test.go b/hub/internal/service/connection_manager/manager_test.go new file mode 100644 index 0000000..dc1ee8e --- /dev/null +++ b/hub/internal/service/connection_manager/manager_test.go @@ -0,0 +1 @@ +package connection_manager From ef67ccd7b2488d237029f79699e92d3100e067aa Mon Sep 17 00:00:00 2001 From: lorsan Date: Mon, 18 May 2026 16:01:07 +0300 Subject: [PATCH 18/24] fix: agent_id in table --- .../migrations/20260506182346_create_heartbeat_table.up.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hub/internal/migrations/20260506182346_create_heartbeat_table.up.sql b/hub/internal/migrations/20260506182346_create_heartbeat_table.up.sql index dd25c58..e329974 100644 --- a/hub/internal/migrations/20260506182346_create_heartbeat_table.up.sql +++ b/hub/internal/migrations/20260506182346_create_heartbeat_table.up.sql @@ -1,6 +1,6 @@ CREATE TABLE heartbeats ( id INTEGER PRIMARY KEY AUTOINCREMENT, - agent_id VARCHAR(32) UNIQUE NOT NULL, + agent_id VARCHAR(32) NOT NULL, cpu_usage FLOAT NOT NULL , memory_usage FLOAT NOT NULL , disk_usage FLOAT NOT NULL , From f0c3a678265d03560ab0eb46c87f0ab129ec69b2 Mon Sep 17 00:00:00 2001 From: lorsan Date: Mon, 18 May 2026 16:06:29 +0300 Subject: [PATCH 19/24] refactor: comma-ok in listenHeartbeat --- hub/internal/service/connection_manager/agent.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index ca736a4..1ab1ce5 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -90,9 +90,12 @@ func (a *AgentConnection) listenHeartbeat(heartbeats <-chan domainHub.CreateHear } a.log.Warn().Msg("agent not send heartbeat") - a.Close() + _ = a.Close() return - case heartbeat := <-heartbeats: + case heartbeat, ok := <-heartbeats: + if !ok { + return + } a.log.Debug(). Float64("cpu usage", heartbeat.Metrics.CpuUsage). Float64("disk usage", heartbeat.Metrics.DiskUsage). From 4f4d213acac3fc9f47ed5d08ac7bc819594e24b5 Mon Sep 17 00:00:00 2001 From: lorsan Date: Wed, 20 May 2026 21:55:43 +0300 Subject: [PATCH 20/24] feat: add test connection manager --- .../service/connection_manager/agent_test.go | 103 --------------- .../service/connection_manager/manager.go | 6 +- .../connection_manager/manager_test.go | 109 +++++++++++++++ .../service/connection_manager/mock_test.go | 125 ++++++++++++++++++ 4 files changed, 238 insertions(+), 105 deletions(-) create mode 100644 hub/internal/service/connection_manager/mock_test.go diff --git a/hub/internal/service/connection_manager/agent_test.go b/hub/internal/service/connection_manager/agent_test.go index 2f3bc3c..e1cad49 100644 --- a/hub/internal/service/connection_manager/agent_test.go +++ b/hub/internal/service/connection_manager/agent_test.go @@ -3,7 +3,6 @@ package connection_manager import ( "context" "errors" - "io" "sync" "testing" "time" @@ -14,108 +13,6 @@ import ( "gotest.tools/v3/assert" ) -type streamMock struct { - recvCh chan *pb.AgentEvent - sendCh chan *pb.ServerCommandRequest - closeCh chan struct{} - ctx context.Context - mu sync.Mutex - sendErr error - recvErr error - closeOnce sync.Once -} - -func (f *streamMock) Context() context.Context { - return f.ctx -} - -func (f *streamMock) Send(request *pb.ServerCommandRequest) error { - f.mu.Lock() - err := f.sendErr - f.mu.Unlock() - if err != nil { - return err - } - f.sendCh <- request - return nil -} - -func (f *streamMock) Recv() (*pb.AgentEvent, error) { - f.mu.Lock() - recvErr := f.recvErr - f.mu.Unlock() - if recvErr != nil { - return nil, recvErr - } - select { - case msg, ok := <-f.recvCh: - if !ok { - return nil, io.EOF - } - return msg, nil - case <-f.ctx.Done(): - return nil, f.ctx.Err() - } -} - -func (f *streamMock) Close() error { - select { - case f.closeCh <- struct{}{}: - default: - } - f.closeOnce.Do(func() { - close(f.recvCh) - }) - return nil -} - -type heartBeatMock struct { - mu sync.Mutex - countUse int - doneCh chan struct{} - err error -} - -func (h *heartBeatMock) CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error { - h.mu.Lock() - h.countUse += 1 - err := h.err - h.mu.Unlock() - select { - case h.doneCh <- struct{}{}: - default: - } - return err -} - -type statusMock struct { - mu sync.Mutex - online bool - doneCh chan struct{} -} - -func (s *statusMock) Offline() { - s.mu.Lock() - s.online = false - s.mu.Unlock() -} - -func (s *statusMock) Online() { - s.mu.Lock() - s.online = true - s.mu.Unlock() - select { - case s.doneCh <- struct{}{}: - default: - } -} - -func (s *statusMock) IsOnline() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.online -} - type agentTestHarness struct { ctx context.Context cancel context.CancelFunc diff --git a/hub/internal/service/connection_manager/manager.go b/hub/internal/service/connection_manager/manager.go index baacfd6..acd4159 100644 --- a/hub/internal/service/connection_manager/manager.go +++ b/hub/internal/service/connection_manager/manager.go @@ -21,11 +21,11 @@ func NewConnectionManager(heartbeat heartbeatStore, status statusNotifier, logge return &ConnectionManager{heartbeat: heartbeat, log: logger, status: status, agentConnStore: NewAgentConnStore()} } -func (c *ConnectionManager) NewConnection(stream streamConn) { +func (c *ConnectionManager) NewConnection(stream streamConn) error { AgentID, err := agentIDFromMetadata(stream.Context()) if err != nil { c.log.Error().Err(err).Msg("missing agent id in metadata") - return + return fmt.Errorf("get agent id: %w", err) } c.log.Info().Str("agentID", AgentID).Msg("connection accepted") @@ -41,6 +41,8 @@ func (c *ConnectionManager) NewConnection(stream streamConn) { } c.agentConnStore.Delete(AgentID) }() + + return nil } func (c *ConnectionManager) GetConnection(AgentID string) (*AgentConnection, error) { diff --git a/hub/internal/service/connection_manager/manager_test.go b/hub/internal/service/connection_manager/manager_test.go index dc1ee8e..3bd443d 100644 --- a/hub/internal/service/connection_manager/manager_test.go +++ b/hub/internal/service/connection_manager/manager_test.go @@ -1 +1,110 @@ package connection_manager + +import ( + "context" + "testing" + "time" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/metadata" + "gotest.tools/v3/assert" +) + +type connectionManagerTestHarness struct { + heartbeat *heartBeatMock + status *statusNotifierMock + manager *ConnectionManager +} + +func newConnectionManagerTestHarness(t *testing.T) *connectionManagerTestHarness { + t.Helper() + + heartbeat := &heartBeatMock{doneCh: make(chan struct{}, 2)} + status := &statusNotifierMock{agentIDCh: make(chan string, 1)} + + manager := NewConnectionManager(heartbeat, status, zerolog.New(nil)) + + return &connectionManagerTestHarness{manager: manager, status: status, heartbeat: heartbeat} +} + +func newMetadataAgentID(t *testing.T, agentID string) metadata.MD { + t.Helper() + + return metadata.New(map[string]string{"agent-id": agentID}) +} + +func TestNewConnectionManager_NewConnection(t *testing.T) { + h := newConnectionManagerTestHarness(t) + agentID := "123" + + ctx := metadata.NewIncomingContext(context.Background(), newMetadataAgentID(t, agentID)) + + stream := streamMock{ctx: ctx, + recvCh: make(chan *pb.AgentEvent, 1), + sendCh: make(chan *pb.ServerCommandRequest, 1), + closeCh: make(chan struct{}, 1), + } + + err := h.manager.NewConnection(&stream) + assert.NilError(t, err) + + select { + case ID := <-h.status.agentIDCh: + require.Equal(t, agentID, ID) + case <-time.After(200 * time.Millisecond): + t.Fatalf("get agent id for notifier") + } + + agentIDs := h.manager.GetAllAgentID() + assert.Equal(t, agentID, agentIDs[0]) + + agent, err := h.manager.GetConnection(agentID) + assert.NilError(t, err) + require.NotNil(t, agent) + +} + +func TestNewConnectionManager_NewConnectionNotAgentID(t *testing.T) { + h := newConnectionManagerTestHarness(t) + + stream := streamMock{ctx: context.Background(), + recvCh: make(chan *pb.AgentEvent, 1), + sendCh: make(chan *pb.ServerCommandRequest, 1), + closeCh: make(chan struct{}, 1), + } + + err := h.manager.NewConnection(&stream) + assert.ErrorContains(t, err, "get agent id") +} + +func TestNewConnectionManager_AgentNotFound(t *testing.T) { + h := newConnectionManagerTestHarness(t) + _, err := h.manager.GetConnection("123") + assert.ErrorIs(t, ErrNotFoundConn, err) + + agentIDs := h.manager.GetAllAgentID() + assert.Equal(t, len(agentIDs), 0) +} + +func Test_agentIDFromMetadata(t *testing.T) { + agentID := "123" + + ctx := metadata.NewIncomingContext(context.Background(), newMetadataAgentID(t, agentID)) + id, err := agentIDFromMetadata(ctx) + assert.NilError(t, err) + assert.Equal(t, id, agentID) +} + +func Test_agentIDFromMetadata_MetadataNotFound(t *testing.T) { + ctx := context.Background() + _, err := agentIDFromMetadata(ctx) + assert.ErrorContains(t, err, "metadata not found") +} + +func Test_agentIDFromMetadata_AgentIDNotFound(t *testing.T) { + ctx := metadata.NewIncomingContext(context.Background(), newMetadataAgentID(t, "")) + _, err := agentIDFromMetadata(ctx) + assert.ErrorContains(t, err, "agent-id not found") +} diff --git a/hub/internal/service/connection_manager/mock_test.go b/hub/internal/service/connection_manager/mock_test.go new file mode 100644 index 0000000..881eb5b --- /dev/null +++ b/hub/internal/service/connection_manager/mock_test.go @@ -0,0 +1,125 @@ +package connection_manager + +import ( + "context" + "io" + "sync" + + pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" +) + +type streamMock struct { + recvCh chan *pb.AgentEvent + sendCh chan *pb.ServerCommandRequest + closeCh chan struct{} + ctx context.Context + mu sync.Mutex + sendErr error + recvErr error + closeOnce sync.Once +} + +func (f *streamMock) Context() context.Context { + return f.ctx +} + +func (f *streamMock) Send(request *pb.ServerCommandRequest) error { + f.mu.Lock() + err := f.sendErr + f.mu.Unlock() + if err != nil { + return err + } + f.sendCh <- request + return nil +} + +func (f *streamMock) Recv() (*pb.AgentEvent, error) { + f.mu.Lock() + recvErr := f.recvErr + f.mu.Unlock() + if recvErr != nil { + return nil, recvErr + } + select { + case msg, ok := <-f.recvCh: + if !ok { + return nil, io.EOF + } + return msg, nil + case <-f.ctx.Done(): + return nil, f.ctx.Err() + } +} + +func (f *streamMock) Close() error { + select { + case f.closeCh <- struct{}{}: + default: + } + f.closeOnce.Do(func() { + close(f.recvCh) + }) + return nil +} + +type heartBeatMock struct { + mu sync.Mutex + countUse int + doneCh chan struct{} + err error +} + +func (h *heartBeatMock) CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error { + h.mu.Lock() + h.countUse += 1 + err := h.err + h.mu.Unlock() + select { + case h.doneCh <- struct{}{}: + default: + } + return err +} + +type statusMock struct { + mu sync.Mutex + online bool + doneCh chan struct{} +} + +func (s *statusMock) Offline() { + s.mu.Lock() + s.online = false + s.mu.Unlock() +} + +func (s *statusMock) Online() { + s.mu.Lock() + s.online = true + s.mu.Unlock() + select { + case s.doneCh <- struct{}{}: + default: + } +} + +func (s *statusMock) IsOnline() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.online +} + +type statusNotifierMock struct { + agentIDCh chan string +} + +func (s *statusNotifierMock) New(AgentID string) statusAgent { + select { + case s.agentIDCh <- AgentID: + default: + + } + return &statusMock{doneCh: make(chan struct{}, 2)} +} From 6077e4c37da1577d046620eae166d538c1a3f16d Mon Sep 17 00:00:00 2001 From: lorsan Date: Wed, 20 May 2026 21:56:53 +0300 Subject: [PATCH 21/24] refactor: edit docker compose dev and update go mod --- .../compose/dev/hub.docker-compose.yaml | 24 ------------------- go.mod | 3 +++ 2 files changed, 3 insertions(+), 24 deletions(-) diff --git a/deployments/compose/dev/hub.docker-compose.yaml b/deployments/compose/dev/hub.docker-compose.yaml index 709e12d..e69de29 100644 --- a/deployments/compose/dev/hub.docker-compose.yaml +++ b/deployments/compose/dev/hub.docker-compose.yaml @@ -1,24 +0,0 @@ -services: - postgres-db: - image: postgres:latest - volumes: - - pgdata:/var/lib/postgresql - environment: - POSTGRES_DB: "${DB_NAME}" - POSTGRES_USER: "${DB_USER}" - POSTGRES_PASSWORD: "${DB_PASS}" - networks: - - homeops-dev - ports: - - "${DB_PORT}:5432" - healthcheck: - test: ["CMD-SHELL", "pg_isready -U ${DB_USER}"] - interval: 5s - retries: 5 - restart: always - -volumes: - pgdata: - -networks: - homeops-dev: \ No newline at end of file diff --git a/go.mod b/go.mod index bda85b7..041d5c0 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/ilyakaznacheev/cleanenv v1.5.0 github.com/mattn/go-sqlite3 v1.14.44 github.com/rs/zerolog v1.35.1 + github.com/stretchr/testify v1.11.1 google.golang.org/grpc v1.81.0 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 @@ -22,6 +23,7 @@ require ( github.com/containerd/errdefs v1.0.0 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect github.com/containerd/log v0.1.0 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/distribution/reference v0.6.0 // indirect github.com/docker/go-connections v0.7.0 // indirect github.com/docker/go-units v0.5.0 // indirect @@ -41,6 +43,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 // indirect From b157d7c32d2403a698efdf0ba373c14cc25d3903 Mon Sep 17 00:00:00 2001 From: lorsan Date: Thu, 21 May 2026 22:17:16 +0300 Subject: [PATCH 22/24] =?UTF-8?q?refactor:=20=D1=81hanged=20the=20connecti?= =?UTF-8?q?on=20manager=20to=20implement=20the=20stream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/connection_manager/agent.go | 49 +++++++----- .../service/connection_manager/agent_test.go | 74 +++++++++---------- .../service/connection_manager/interface.go | 1 - .../service/connection_manager/manager.go | 14 ++-- .../connection_manager/manager_test.go | 26 ++++--- .../service/connection_manager/mock_test.go | 8 +- 6 files changed, 86 insertions(+), 86 deletions(-) diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index 1ab1ce5..fea989c 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -32,36 +32,31 @@ func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatSt } func (a *AgentConnection) Listen() error { - defer a.status.Offline() + heartbeatsCh := make(chan domainHub.CreateHeartbeatModel, 5) + streamRecvCh := make(chan *pb.AgentEvent, 5) - heartbeatsChan := make(chan domainHub.CreateHeartbeatModel, 5) - go a.listenHeartbeat(heartbeatsChan) - defer close(heartbeatsChan) + go a.listenHeartbeat(heartbeatsCh) + go a.listenStream(streamRecvCh) defer func() { - err := a.Close() - if err != nil { - a.log.Warn().Err(err).Msg("failed stream close") - } + a.status.Offline() + close(heartbeatsCh) + a.Close() }() for { select { case <-a.ctx.Done(): return a.ctx.Err() - default: - agentEvent, err := a.stream.Recv() - if err == io.EOF { + case msg, ok := <-streamRecvCh: + if !ok { return nil } - if err != nil { - return fmt.Errorf("stream: %w", err) - } - switch x := agentEvent.Event.(type) { + switch x := msg.Event.(type) { case *pb.AgentEvent_Heartbeat: heartbeat := toCreateHeartbeatModel(a.AgentID, x) - heartbeatsChan <- heartbeat + heartbeatsCh <- heartbeat case *pb.AgentEvent_CommandResponse: ch, ok := a.response.Read(x.CommandResponse.RequestId) if !ok { @@ -71,10 +66,27 @@ func (a *AgentConnection) Listen() error { response := toAgentResponse(x) ch <- response } + default: } } } +func (a *AgentConnection) listenStream(ch chan *pb.AgentEvent) { + defer close(ch) + for { + agentEvent, err := a.stream.Recv() + if err == io.EOF { + return + } + if err != nil { + a.log.Warn().Err(err).Msg("close stream") + return + } + + ch <- agentEvent + } +} + func (a *AgentConnection) listenHeartbeat(heartbeats <-chan domainHub.CreateHeartbeatModel) { lastHeartbeat := 0 timer := time.NewTicker(time.Duration(a.heartbeatTimeoutMS) * time.Millisecond) @@ -90,7 +102,7 @@ func (a *AgentConnection) listenHeartbeat(heartbeats <-chan domainHub.CreateHear } a.log.Warn().Msg("agent not send heartbeat") - _ = a.Close() + a.Close() return case heartbeat, ok := <-heartbeats: if !ok { @@ -140,7 +152,6 @@ func (a *AgentConnection) Execute(ctx context.Context, request domainHub.AgentRe } } -func (a *AgentConnection) Close() error { +func (a *AgentConnection) Close() { a.cancel() - return a.stream.Close() } diff --git a/hub/internal/service/connection_manager/agent_test.go b/hub/internal/service/connection_manager/agent_test.go index e1cad49..484ab85 100644 --- a/hub/internal/service/connection_manager/agent_test.go +++ b/hub/internal/service/connection_manager/agent_test.go @@ -30,7 +30,7 @@ func newAgentTestHarness(t *testing.T, heartbeatTimeoutMS int) *agentTestHarness recvStream := make(chan *pb.AgentEvent, 4) ctx, cancel := context.WithCancel(context.Background()) - stream := &streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx, closeCh: make(chan struct{}, 1)} + stream := &streamMock{recvCh: recvStream, sendCh: sendStream, ctx: ctx} heartbeat := &heartBeatMock{doneCh: make(chan struct{}, 2)} status := &statusMock{doneCh: make(chan struct{}, 2)} @@ -55,15 +55,6 @@ func waitFor(t *testing.T, ch <-chan struct{}, timeout time.Duration, message st } } -func waitForClose(t *testing.T, closeCh <-chan struct{}, timeout time.Duration) { - t.Helper() - select { - case <-closeCh: - case <-time.After(timeout): - t.Fatal("timeout waiting for close") - } -} - func commandResponseEvent(requestID, output string) *pb.AgentEvent { return &pb.AgentEvent{ AgentId: "agent-1", @@ -79,7 +70,11 @@ func commandResponseEvent(requestID, output string) *pb.AgentEvent { func TestAgentConnection_Heartbeat(t *testing.T) { h := newAgentTestHarness(t, 5000) - go h.agent.Listen() + done := make(chan struct{}) + go func() { + _ = h.agent.Listen() + close(done) + }() h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_Heartbeat{ Heartbeat: &pb.Heartbeat{ @@ -98,7 +93,7 @@ func TestAgentConnection_Heartbeat(t *testing.T) { assert.Equal(t, h.status.IsOnline(), true) h.cancel() - waitForClose(t, h.stream.closeCh, 500*time.Millisecond) + waitFor(t, done, 500*time.Millisecond, "timeout waiting for listen stop") assert.Equal(t, h.status.IsOnline(), false) } @@ -142,13 +137,11 @@ func TestAgentConnection_Execute(t *testing.T) { func TestAgentConnection_HeartbeatTimeout(t *testing.T) { h := newAgentTestHarness(t, 200) - var wg sync.WaitGroup + listenDone := make(chan error, 1) + execDone := make(chan error, 1) - wg.Add(2) go func() { - err := h.agent.Listen() - assert.NilError(t, err) - wg.Done() + listenDone <- h.agent.Listen() }() go func() { @@ -157,13 +150,25 @@ func TestAgentConnection_HeartbeatTimeout(t *testing.T) { Args: nil, TimeOut: 0, }) - assert.ErrorIs(t, err, ErrConnectionClose) - wg.Done() + execDone <- err }() - wg.Wait() - - waitForClose(t, h.stream.closeCh, 500*time.Millisecond) + timeout := time.After(2 * time.Second) + gotListen := false + gotExec := false + for !(gotListen && gotExec) { + select { + case err := <-listenDone: + assert.ErrorIs(t, err, context.Canceled) + gotListen = true + case err := <-execDone: + assert.ErrorIs(t, err, ErrConnectionClose) + gotExec = true + case <-timeout: + h.cancel() + t.Fatal("timeout waiting for heartbeat timeout") + } + } } func TestAgentConnection_ConnectionClose(t *testing.T) { @@ -190,8 +195,6 @@ func TestAgentConnection_ConnectionClose(t *testing.T) { h.cancel() wg.Wait() - - waitForClose(t, h.stream.closeCh, 500*time.Millisecond) } func TestAgentConnection_ExecuteClose(t *testing.T) { @@ -219,23 +222,10 @@ func TestAgentConnection_ExecuteClose(t *testing.T) { func TestAgentConnection_ListenEOF(t *testing.T) { h := newAgentTestHarness(t, 5000) - h.stream.Close() + h.stream.CloseRecv() err := h.agent.Listen() assert.NilError(t, err) - waitForClose(t, h.stream.closeCh, 500*time.Millisecond) -} - -func TestAgentConnection_ListenRecvError(t *testing.T) { - h := newAgentTestHarness(t, 5000) - - recvErr := errors.New("recv failure") - h.stream.mu.Lock() - h.stream.recvErr = recvErr - h.stream.mu.Unlock() - - err := h.agent.Listen() - assert.ErrorIs(t, err, recvErr) } func TestAgentConnection_ExecuteSendError(t *testing.T) { @@ -267,7 +257,11 @@ func TestAgentConnection_ExecuteConnectionCanceled(t *testing.T) { func TestAgentConnection_UnknownResponseID(t *testing.T) { h := newAgentTestHarness(t, 5000) - go h.agent.Listen() + done := make(chan struct{}) + go func() { + _ = h.agent.Listen() + close(done) + }() h.recvCh <- &pb.AgentEvent{AgentId: "agent-1", Event: &pb.AgentEvent_CommandResponse{ CommandResponse: &pb.CommandResponse{ @@ -277,7 +271,7 @@ func TestAgentConnection_UnknownResponseID(t *testing.T) { }}} h.cancel() - waitForClose(t, h.stream.closeCh, 500*time.Millisecond) + waitFor(t, done, 500*time.Millisecond, "timeout waiting for listen stop") } func TestAgentConnection_HeartbeatErrorDoesNotStop(t *testing.T) { diff --git a/hub/internal/service/connection_manager/interface.go b/hub/internal/service/connection_manager/interface.go index 19cc737..70c2a95 100644 --- a/hub/internal/service/connection_manager/interface.go +++ b/hub/internal/service/connection_manager/interface.go @@ -11,7 +11,6 @@ type streamConn interface { Send(request *pb.ServerCommandRequest) error Recv() (*pb.AgentEvent, error) Context() context.Context - Close() error } type heartbeatStore interface { diff --git a/hub/internal/service/connection_manager/manager.go b/hub/internal/service/connection_manager/manager.go index acd4159..09d78eb 100644 --- a/hub/internal/service/connection_manager/manager.go +++ b/hub/internal/service/connection_manager/manager.go @@ -27,22 +27,18 @@ func (c *ConnectionManager) NewConnection(stream streamConn) error { c.log.Error().Err(err).Msg("missing agent id in metadata") return fmt.Errorf("get agent id: %w", err) } + c.log.Info().Str("agentID", AgentID).Msg("connection accepted") status := c.status.New(AgentID) agent := newAgentConnection(AgentID, stream, c.heartbeat, status, heartbeatTimeoutMS, c.log) c.agentConnStore.Add(AgentID, agent) - go func() { - c.log.Debug().Str("agentID", AgentID).Msg("start listening") - err := agent.Listen() - if err != nil { - c.log.Error().Err(err).Msg("listening agent stopped") - } - c.agentConnStore.Delete(AgentID) - }() + defer c.agentConnStore.Delete(AgentID) - return nil + c.log.Debug().Str("agentID", AgentID).Msg("start listening") + + return agent.Listen() } func (c *ConnectionManager) GetConnection(AgentID string) (*AgentConnection, error) { diff --git a/hub/internal/service/connection_manager/manager_test.go b/hub/internal/service/connection_manager/manager_test.go index 3bd443d..61a16c6 100644 --- a/hub/internal/service/connection_manager/manager_test.go +++ b/hub/internal/service/connection_manager/manager_test.go @@ -42,13 +42,13 @@ func TestNewConnectionManager_NewConnection(t *testing.T) { ctx := metadata.NewIncomingContext(context.Background(), newMetadataAgentID(t, agentID)) stream := streamMock{ctx: ctx, - recvCh: make(chan *pb.AgentEvent, 1), - sendCh: make(chan *pb.ServerCommandRequest, 1), - closeCh: make(chan struct{}, 1), + recvCh: make(chan *pb.AgentEvent, 1), + sendCh: make(chan *pb.ServerCommandRequest, 1), } - err := h.manager.NewConnection(&stream) - assert.NilError(t, err) + go func() { + _ = h.manager.NewConnection(&stream) + }() select { case ID := <-h.status.agentIDCh: @@ -70,13 +70,19 @@ func TestNewConnectionManager_NewConnectionNotAgentID(t *testing.T) { h := newConnectionManagerTestHarness(t) stream := streamMock{ctx: context.Background(), - recvCh: make(chan *pb.AgentEvent, 1), - sendCh: make(chan *pb.ServerCommandRequest, 1), - closeCh: make(chan struct{}, 1), + recvCh: make(chan *pb.AgentEvent, 1), + sendCh: make(chan *pb.ServerCommandRequest, 1), } - err := h.manager.NewConnection(&stream) - assert.ErrorContains(t, err, "get agent id") + wait := make(chan struct{}) + + go func() { + err := h.manager.NewConnection(&stream) + assert.ErrorContains(t, err, "get agent id") + wait <- struct{}{} + }() + + waitFor(t, wait, 5000, "timeout new connection") } func TestNewConnectionManager_AgentNotFound(t *testing.T) { diff --git a/hub/internal/service/connection_manager/mock_test.go b/hub/internal/service/connection_manager/mock_test.go index 881eb5b..5ae528b 100644 --- a/hub/internal/service/connection_manager/mock_test.go +++ b/hub/internal/service/connection_manager/mock_test.go @@ -12,7 +12,6 @@ import ( type streamMock struct { recvCh chan *pb.AgentEvent sendCh chan *pb.ServerCommandRequest - closeCh chan struct{} ctx context.Context mu sync.Mutex sendErr error @@ -53,15 +52,10 @@ func (f *streamMock) Recv() (*pb.AgentEvent, error) { } } -func (f *streamMock) Close() error { - select { - case f.closeCh <- struct{}{}: - default: - } +func (f *streamMock) CloseRecv() { f.closeOnce.Do(func() { close(f.recvCh) }) - return nil } type heartBeatMock struct { From c41cbc3c2f42492eb3d610696f8003f4ee4adc43 Mon Sep 17 00:00:00 2001 From: lorsan Date: Sat, 23 May 2026 14:03:43 +0300 Subject: [PATCH 23/24] feat: add connect stream to connection manager --- hub/internal/app/app.go | 10 +++++-- hub/internal/rpc/server.go | 30 ++++++++++++++----- .../service/connection_manager/agent.go | 6 ++-- .../service/connection_manager/interface.go | 6 ++-- .../service/connection_manager/manager.go | 2 +- .../service/connection_manager/mock_test.go | 2 +- hub/internal/utils/notifier/status.go | 28 +++++++++++++++++ 7 files changed, 65 insertions(+), 19 deletions(-) create mode 100644 hub/internal/utils/notifier/status.go diff --git a/hub/internal/app/app.go b/hub/internal/app/app.go index 0c90402..5d5072c 100644 --- a/hub/internal/app/app.go +++ b/hub/internal/app/app.go @@ -9,8 +9,10 @@ import ( hubdir "github.com/lorsanstand/HomeOps-Hub/hub/internal" "github.com/lorsanstand/HomeOps-Hub/hub/internal/migrator" grpcserv "github.com/lorsanstand/HomeOps-Hub/hub/internal/rpc" + "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager" "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/hub_service" "github.com/lorsanstand/HomeOps-Hub/hub/internal/store" + "github.com/lorsanstand/HomeOps-Hub/hub/internal/utils/notifier" "github.com/lorsanstand/HomeOps-Hub/shared/config" "github.com/lorsanstand/HomeOps-Hub/shared/log" _ "github.com/mattn/go-sqlite3" @@ -63,19 +65,21 @@ func (a *App) Run() { hubStore := store.NewHubStore(DBConn) hubService := hub_service.NewHubService(hubStore, a.log) + statusNotifier := notifier.NewStatusNotifier() + connManger := connection_manager.NewConnectionManager(hubStore, statusNotifier, a.log) a.log.Info().Msg("starting hub service") - err = a.hubServe(hubService) + err = a.hubServe(hubService, connManger) if err != nil { a.log.Error().Err(err).Msg("hub service failed to start") return } } -func (a *App) hubServe(hubService *hub_service.HubService) error { +func (a *App) hubServe(hubService *hub_service.HubService, manager *connection_manager.ConnectionManager) error { address := fmt.Sprintf("0.0.0.0:%v", a.cfg.Port) - server := grpcserv.NewHubHandler(hubService, a.log) + server := grpcserv.NewHubHandler(hubService, manager, a.log) lis, err := net.Listen("tcp", address) if err != nil { diff --git a/hub/internal/rpc/server.go b/hub/internal/rpc/server.go index d9de289..242d39f 100644 --- a/hub/internal/rpc/server.go +++ b/hub/internal/rpc/server.go @@ -2,8 +2,10 @@ package rpc import ( "context" + "fmt" pb "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops" + "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager" "github.com/lorsanstand/HomeOps-Hub/shared/domain" "github.com/rs/zerolog" "google.golang.org/grpc" @@ -14,15 +16,20 @@ type HubService interface { RegisterAgent(ctx context.Context, data domain.RegisterAgentRequest) (domain.RegisterAgentResponse, error) } -type HubHandler struct { - pb.UnimplementedHubServer - log zerolog.Logger - GrpcServer *grpc.Server - hub HubService +type ConnectionManager interface { + NewConnection(stream connection_manager.StreamConn) error } -func NewHubHandler(HubServ HubService, logger zerolog.Logger) *HubHandler { - hub := &HubHandler{log: logger, hub: HubServ} +type HubHandler struct { + pb.UnimplementedHubServer + log zerolog.Logger + GrpcServer *grpc.Server + hub HubService + streamManager ConnectionManager +} + +func NewHubHandler(HubServ HubService, manager ConnectionManager, logger zerolog.Logger) *HubHandler { + hub := &HubHandler{log: logger, hub: HubServ, streamManager: manager} grpcServer := grpc.NewServer() pb.RegisterHubServer(grpcServer, hub) @@ -42,9 +49,16 @@ func (h *HubHandler) RegisterAgent(ctx context.Context, request *pb.RegisterAgen data := domain.ToDomainAgentRequest(request) resp, err := h.hub.RegisterAgent(ctx, data) if err != nil { - h.log.Error().Err(err).Str("agentID", request.AgentId).Msg("register agent request failed") return domain.ToGRPCAgentResponse(resp), err } h.log.Info().Str("agentID", resp.AgentID).Msg("register agent request completed") return domain.ToGRPCAgentResponse(resp), nil } + +func (h *HubHandler) StreamConnection(stream grpc.BidiStreamingServer[pb.AgentEvent, pb.ServerCommandRequest]) error { + err := h.streamManager.NewConnection(stream) + if err != nil { + return fmt.Errorf("accept connection: %w", err) + } + return nil +} diff --git a/hub/internal/service/connection_manager/agent.go b/hub/internal/service/connection_manager/agent.go index fea989c..a391b98 100644 --- a/hub/internal/service/connection_manager/agent.go +++ b/hub/internal/service/connection_manager/agent.go @@ -13,10 +13,10 @@ import ( ) type AgentConnection struct { - stream streamConn + stream StreamConn heartbeat heartbeatStore log zerolog.Logger - status statusAgent + status StatusAgent AgentID string response *ResponseStore ctx context.Context @@ -24,7 +24,7 @@ type AgentConnection struct { heartbeatTimeoutMS int } -func newAgentConnection(agentID string, stream streamConn, heartbeat heartbeatStore, status statusAgent, heartbeatTimeoutMS int, logger zerolog.Logger) *AgentConnection { +func newAgentConnection(agentID string, stream StreamConn, heartbeat heartbeatStore, status StatusAgent, heartbeatTimeoutMS int, logger zerolog.Logger) *AgentConnection { response := NewResponseStore() logger = logger.With().Str("agentID", agentID).Logger() ctx, cancel := context.WithCancel(stream.Context()) diff --git a/hub/internal/service/connection_manager/interface.go b/hub/internal/service/connection_manager/interface.go index 70c2a95..a6844b9 100644 --- a/hub/internal/service/connection_manager/interface.go +++ b/hub/internal/service/connection_manager/interface.go @@ -7,7 +7,7 @@ import ( domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain" ) -type streamConn interface { +type StreamConn interface { Send(request *pb.ServerCommandRequest) error Recv() (*pb.AgentEvent, error) Context() context.Context @@ -17,11 +17,11 @@ type heartbeatStore interface { CreateHeartbeat(ctx context.Context, heartbeat domainHub.CreateHeartbeatModel) error } -type statusAgent interface { +type StatusAgent interface { Offline() Online() } type statusNotifier interface { - New(AgentID string) statusAgent + New(agentID string) StatusAgent } diff --git a/hub/internal/service/connection_manager/manager.go b/hub/internal/service/connection_manager/manager.go index 09d78eb..13acc81 100644 --- a/hub/internal/service/connection_manager/manager.go +++ b/hub/internal/service/connection_manager/manager.go @@ -21,7 +21,7 @@ func NewConnectionManager(heartbeat heartbeatStore, status statusNotifier, logge return &ConnectionManager{heartbeat: heartbeat, log: logger, status: status, agentConnStore: NewAgentConnStore()} } -func (c *ConnectionManager) NewConnection(stream streamConn) error { +func (c *ConnectionManager) NewConnection(stream StreamConn) error { AgentID, err := agentIDFromMetadata(stream.Context()) if err != nil { c.log.Error().Err(err).Msg("missing agent id in metadata") diff --git a/hub/internal/service/connection_manager/mock_test.go b/hub/internal/service/connection_manager/mock_test.go index 5ae528b..25237ad 100644 --- a/hub/internal/service/connection_manager/mock_test.go +++ b/hub/internal/service/connection_manager/mock_test.go @@ -109,7 +109,7 @@ type statusNotifierMock struct { agentIDCh chan string } -func (s *statusNotifierMock) New(AgentID string) statusAgent { +func (s *statusNotifierMock) New(AgentID string) StatusAgent { select { case s.agentIDCh <- AgentID: default: diff --git a/hub/internal/utils/notifier/status.go b/hub/internal/utils/notifier/status.go new file mode 100644 index 0000000..08f2156 --- /dev/null +++ b/hub/internal/utils/notifier/status.go @@ -0,0 +1,28 @@ +package notifier + +import "github.com/lorsanstand/HomeOps-Hub/hub/internal/service/connection_manager" + +// Временная заглушка +type StatusNotifier struct { +} + +type Status struct { + agentID string + online bool +} + +func NewStatusNotifier() *StatusNotifier { + return &StatusNotifier{} +} + +func (s *StatusNotifier) New(agentID string) connection_manager.StatusAgent { + return &Status{agentID: agentID} +} + +func (s *Status) Online() { + s.online = true +} + +func (s *Status) Offline() { + s.online = false +} From 2ca5e7fa8c636b4fe689af205babe904935f0af7 Mon Sep 17 00:00:00 2001 From: lorsan Date: Sat, 23 May 2026 14:13:49 +0300 Subject: [PATCH 24/24] refactor: separate app run --- hub/internal/app/app.go | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/hub/internal/app/app.go b/hub/internal/app/app.go index 5d5072c..4f7eb4d 100644 --- a/hub/internal/app/app.go +++ b/hub/internal/app/app.go @@ -5,6 +5,7 @@ import ( "fmt" standartlog "log" "net" + "os" hubdir "github.com/lorsanstand/HomeOps-Hub/hub/internal" "github.com/lorsanstand/HomeOps-Hub/hub/internal/migrator" @@ -41,7 +42,7 @@ func (a *App) Run() { DBConn, err := sql.Open("sqlite", "database.db") if err != nil { a.log.Error().Err(err).Msg("failed to connect to the database") - return + os.Exit(1) } defer func() { @@ -50,16 +51,10 @@ func (a *App) Run() { } }() - mgrt, err := migrator.NewMigrator(hubdir.MigrationsFS, "migrations") - if err != nil { - a.log.Error().Err(err).Msg("failed to create migrator") - return - } - a.log.Info().Msg("applying database migrations") - if err = mgrt.ApplyMigrations(DBConn); err != nil { - a.log.Error().Err(err).Msg("migrations failed to apply") - return + if err := applyMigrations(DBConn); err != nil { + a.log.Error().Err(err).Msg("") + os.Exit(1) } a.log.Info().Msg("migrations applied successfully") @@ -69,6 +64,7 @@ func (a *App) Run() { connManger := connection_manager.NewConnectionManager(hubStore, statusNotifier, a.log) a.log.Info().Msg("starting hub service") + err = a.hubServe(hubService, connManger) if err != nil { a.log.Error().Err(err).Msg("hub service failed to start") @@ -97,3 +93,14 @@ func (a *App) hubServe(hubService *hub_service.HubService, manager *connection_m return nil } + +func applyMigrations(db *sql.DB) error { + mgrt, err := migrator.NewMigrator(hubdir.MigrationsFS, "migrations") + if err != nil { + return fmt.Errorf("failed to create migrator: %w", err) + } + if err = mgrt.ApplyMigrations(db); err != nil { + return fmt.Errorf("migrations failed to apply: %w", err) + } + return nil +}