diff --git a/.gitignore b/.gitignore index 65cbde7..0adaa22 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .env -config.yaml \ No newline at end of file +config.yaml +.idea \ No newline at end of file diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..2cfb250 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,47 @@ +version: "2" + +run: + # Сколько времени давать линтеру на проверку (для больших проектов лучше ставить 5m) + timeout: 3m + # Проверять ли тесты + tests: true + +linters: + # Отключаем все по умолчанию, чтобы включить только нужные нам + disable-all: true + enable: + # --- ОБЯЗАТЕЛЬНЫЕ (База) --- + - errcheck # Проверяет, не забыл ли ты обработать ошибку (if err != nil) + - govet # Официальный инструмент Go, ищет подозрительные конструкции + - ineffassign # Находит переменные, которым присвоили значение, но не использовали + - staticcheck # Огромный набор проверок на логические ошибки + - unused # Ищет неиспользуемый код (функции, переменные, типы) + + # --- ОБУЧАЮЩИЕ (Помогут новичку) --- + - revive # Замена старому golint: следит за именованием и комментариями + - errname # Проверяет, что ошибки названы по стандарту (например, ErrSentinel) + - goconst # Находит строки, которые часто повторяются (подскажет сделать константу) + - makezero # Следит, чтобы ты не делал лишних аллокаций в slice + + settings: + revive: + # Настраиваем правила + rules: + # Отключаем обязательные комментарии для экспортируемых сущностей + - name: exported + disabled: true + # Отключаем требование комментариев к пакетам + - name: package-comments + disabled: true + # Оставляем полезное: проверка именования (var_name -> varName) + - name: var-naming + severity: warning + +issues: + # Не ограничивай количество ошибок, пока учишься + max-issues-per-linter: 0 + max-same-issues: 0 + + # Исключаем некоторые папки (например, сгенерированный код) + exclude-dirs: + - vendor \ No newline at end of file diff --git a/.idea/golinter.xml b/.idea/golinter.xml new file mode 100644 index 0000000..1ccf3ec --- /dev/null +++ b/.idea/golinter.xml @@ -0,0 +1,7 @@ + + + + + \ No newline at end of file diff --git a/.idea/workspace.sync-conflict-20260504-202326-XNSB2YU.xml b/.idea/workspace.sync-conflict-20260504-202326-XNSB2YU.xml new file mode 100644 index 0000000..ef52743 --- /dev/null +++ b/.idea/workspace.sync-conflict-20260504-202326-XNSB2YU.xml @@ -0,0 +1,83 @@ + + + + + + + + + + + + + + + + { + "associatedIndex": 6 +} + + + + + + + + + + + + + + + + + + + 1775479991162 + + + + + + \ No newline at end of file diff --git a/agent/internal/app/app.go b/agent/internal/app/app.go index 0f0a1a7..d7b5f8e 100644 --- a/agent/internal/app/app.go +++ b/agent/internal/app/app.go @@ -55,7 +55,11 @@ func (a *App) Run() { a.log.Info().Msg("connection to the hub successful") conn := rpc.NewConnectAgent(GRPCConn) - defer conn.Close() + defer func() { + if err := conn.Close(); err != nil { + a.log.Warn().Err(err).Msg("failed to close rpc connection") + } + }() var DockerService collector.Docker diff --git a/agent/internal/service/agent_service/agent.go b/agent/internal/service/agent_service/agent.go index 7609223..6eb18b0 100644 --- a/agent/internal/service/agent_service/agent.go +++ b/agent/internal/service/agent_service/agent.go @@ -50,7 +50,7 @@ func (a *AgentService) RegisterAgentConn(ctx context.Context) error { AgentID := a.settings.GetAgentID() AgentName := a.cfg.AppName AgentData := domain.RegisterAgentRequest{ - AgentId: AgentID, + AgentID: AgentID, AgentName: AgentName, Host: info, Capabilities: caps, diff --git a/agent/internal/service/agent_service/agent_test.go b/agent/internal/service/agent_service/agent_test.go index 5c3bb2c..a229c03 100644 --- a/agent/internal/service/agent_service/agent_test.go +++ b/agent/internal/service/agent_service/agent_test.go @@ -2,8 +2,12 @@ package agent_service import ( "context" + "errors" + "testing" + "github.com/lorsanstand/HomeOps-Hub/agent/internal/utils/config_yaml" "github.com/lorsanstand/HomeOps-Hub/shared/domain" + "github.com/rs/zerolog" ) type CollectorMock struct { @@ -18,8 +22,88 @@ func (c *CollectorMock) GatherInfoSystem() (domain.HostInfo, []domain.Capability type ConnectionMock struct { regAgentErr error regResp domain.RegisterAgentResponse + regData domain.RegisterAgentRequest } func (c *ConnectionMock) RegisterAgent(ctx context.Context, RegisterData domain.RegisterAgentRequest) (domain.RegisterAgentResponse, error) { + c.regData = RegisterData return c.regResp, c.regAgentErr } + +type SettingsMock struct { + insertErr error + agentID string + countUse int +} + +func (s *SettingsMock) InsertAgentID(agentID string) error { + s.agentID = agentID + s.countUse++ + return s.insertErr +} + +func (s *SettingsMock) GetAgentID() string { + return s.agentID +} + +func TestAgentService_RegisterAgentConn(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + wantErr error + insertAgentIDUse int + settings SettingsMock + collector CollectorMock + conn ConnectionMock + cfg config_yaml.AgentConfig + }{ + { + name: "success", + wantErr: nil, + insertAgentIDUse: 1, + settings: SettingsMock{agentID: "", insertErr: nil}, + collector: CollectorMock{ + host: domain.HostInfo{System: "Linux", Hostname: "test", Arch: "x64"}, + caps: []domain.Capability{ + {Available: true, Version: "0", Name: "testCaps", Reason: ""}, + }, + }, + conn: ConnectionMock{regAgentErr: nil, regResp: domain.RegisterAgentResponse{AgentID: "123", Heartbeat: 4}}, + cfg: config_yaml.AgentConfig{AppName: "test"}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctx := context.Background() + + svc := NewAgentService(&tt.collector, + &tt.conn, + &tt.settings, + &tt.cfg, + zerolog.New(nil), + ) + + err := svc.RegisterAgentConn(ctx) + if !errors.Is(err, tt.wantErr) { + t.Fatalf("expected error %v, got: %v", tt.wantErr, err) + } + + if tt.insertAgentIDUse != tt.settings.countUse { + t.Errorf("expected count insert agent id %v, got: %v", tt.insertAgentIDUse, tt.settings.countUse) + } + + if tt.settings.agentID != tt.conn.regResp.AgentID { + t.Errorf("expected insert agent id %v, got: %v", tt.conn.regResp.AgentID, tt.settings.agentID) + } + + if tt.cfg.AppName != tt.conn.regData.AgentName { + t.Fatalf("expected agent name %v, got: %v", tt.cfg.AppName, tt.conn.regData.AgentName) + } + + }) + } +} diff --git a/agent/internal/service/docker_service/docker_test.go b/agent/internal/service/docker_service/docker_test.go index 25db239..c7b4170 100644 --- a/agent/internal/service/docker_service/docker_test.go +++ b/agent/internal/service/docker_service/docker_test.go @@ -10,7 +10,7 @@ import ( "github.com/rs/zerolog" ) -var testError error = errors.New("test") +var errTest error = errors.New("test") type DockerMock struct { pingErr error @@ -46,11 +46,11 @@ func TestCheckDockerDaemon(t *testing.T) { { name: "docker error", mock: DockerMock{ - pingErr: testError, + pingErr: errTest, containers: nil, containerErr: nil, }, - wantErr: testError, + wantErr: errTest, }, } @@ -98,10 +98,10 @@ func TestContainersList(t *testing.T) { mock: DockerMock{ pingErr: nil, containers: nil, - containerErr: testError, + containerErr: errTest, }, wantLen: 0, - wantErr: testError, + wantErr: errTest, }, { name: "docker empty container", diff --git a/agent/internal/utils/settings/settings.go b/agent/internal/utils/settings/settings.go index b7ca57c..35d7418 100644 --- a/agent/internal/utils/settings/settings.go +++ b/agent/internal/utils/settings/settings.go @@ -13,7 +13,7 @@ type Settings struct { path string } -func ReadSettings(path string) (*Settings, error) { +func ReadSettings(path string) (sett *Settings, err error) { if path == "" { homeDir, err := os.UserHomeDir() if err != nil { @@ -22,7 +22,7 @@ func ReadSettings(path string) (*Settings, error) { path = filepath.Join(homeDir, ".config", "homeops") } - err := os.Mkdir(path, 0755) + err = os.Mkdir(path, 0755) if err != nil { if !errors.Is(err, os.ErrExist) { return nil, err @@ -39,7 +39,12 @@ func ReadSettings(path string) (*Settings, error) { return nil, err } } else { - defer file.Close() + defer func() { + closeErr := file.Close() + if err == nil { + err = closeErr + } + }() err = json.NewDecoder(file).Decode(&settings) if err != nil && !errors.Is(err, io.EOF) { return nil, err @@ -51,12 +56,17 @@ func ReadSettings(path string) (*Settings, error) { return &settings, nil } -func (s *Settings) InsertAgentID(agentID string) error { +func (s *Settings) InsertAgentID(agentID string) (err error) { file, err := os.OpenFile(s.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { return err } - defer file.Close() + defer func() { + closeErr := file.Close() + if err == nil { + err = closeErr + } + }() sett := Settings{AgentID: agentID} diff --git a/api/gen/homeops/hub.pb.go b/api/gen/homeops/hub.pb.go index e8aad80..03f9fcc 100644 --- a/api/gen/homeops/hub.pb.go +++ b/api/gen/homeops/hub.pb.go @@ -2,7 +2,7 @@ // versions: // protoc-gen-go v1.36.11 // protoc v7.34.1 -// source: homeops/cmd.proto +// source: homeops/hub.proto package homeops @@ -322,11 +322,441 @@ func (x *RegisterAgentResponse) GetAgentId() string { return "" } +type ServerCommandRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Args map[string]string `protobuf:"bytes,3,rep,name=args,proto3" json:"args,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + TimeoutSeconds int64 `protobuf:"varint,4,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ServerCommandRequest) Reset() { + *x = ServerCommandRequest{} + mi := &file_homeops_hub_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ServerCommandRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerCommandRequest) ProtoMessage() {} + +func (x *ServerCommandRequest) ProtoReflect() protoreflect.Message { + mi := &file_homeops_hub_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerCommandRequest.ProtoReflect.Descriptor instead. +func (*ServerCommandRequest) Descriptor() ([]byte, []int) { + return file_homeops_hub_proto_rawDescGZIP(), []int{5} +} + +func (x *ServerCommandRequest) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + +func (x *ServerCommandRequest) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *ServerCommandRequest) GetArgs() map[string]string { + if x != nil { + return x.Args + } + return nil +} + +func (x *ServerCommandRequest) GetTimeoutSeconds() int64 { + if x != nil { + return x.TimeoutSeconds + } + return 0 +} + +type AgentEvent struct { + state protoimpl.MessageState `protogen:"open.v1"` + AgentId string `protobuf:"bytes,1,opt,name=agent_id,json=agentId,proto3" json:"agent_id,omitempty"` + // Types that are valid to be assigned to Event: + // + // *AgentEvent_Heartbeat + // *AgentEvent_CommandResponse + // *AgentEvent_Alert + Event isAgentEvent_Event `protobuf_oneof:"event"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AgentEvent) Reset() { + *x = AgentEvent{} + mi := &file_homeops_hub_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AgentEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AgentEvent) ProtoMessage() {} + +func (x *AgentEvent) ProtoReflect() protoreflect.Message { + mi := &file_homeops_hub_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AgentEvent.ProtoReflect.Descriptor instead. +func (*AgentEvent) Descriptor() ([]byte, []int) { + return file_homeops_hub_proto_rawDescGZIP(), []int{6} +} + +func (x *AgentEvent) GetAgentId() string { + if x != nil { + return x.AgentId + } + return "" +} + +func (x *AgentEvent) GetEvent() isAgentEvent_Event { + if x != nil { + return x.Event + } + return nil +} + +func (x *AgentEvent) GetHeartbeat() *Heartbeat { + if x != nil { + if x, ok := x.Event.(*AgentEvent_Heartbeat); ok { + return x.Heartbeat + } + } + return nil +} + +func (x *AgentEvent) GetCommandResponse() *CommandResponse { + if x != nil { + if x, ok := x.Event.(*AgentEvent_CommandResponse); ok { + return x.CommandResponse + } + } + return nil +} + +func (x *AgentEvent) GetAlert() *Alert { + if x != nil { + if x, ok := x.Event.(*AgentEvent_Alert); ok { + return x.Alert + } + } + return nil +} + +type isAgentEvent_Event interface { + isAgentEvent_Event() +} + +type AgentEvent_Heartbeat struct { + Heartbeat *Heartbeat `protobuf:"bytes,2,opt,name=heartbeat,proto3,oneof"` +} + +type AgentEvent_CommandResponse struct { + CommandResponse *CommandResponse `protobuf:"bytes,3,opt,name=command_response,json=commandResponse,proto3,oneof"` +} + +type AgentEvent_Alert struct { + Alert *Alert `protobuf:"bytes,4,opt,name=alert,proto3,oneof"` +} + +func (*AgentEvent_Heartbeat) isAgentEvent_Event() {} + +func (*AgentEvent_CommandResponse) isAgentEvent_Event() {} + +func (*AgentEvent_Alert) isAgentEvent_Event() {} + +type Heartbeat struct { + state protoimpl.MessageState `protogen:"open.v1"` + Timestamp int64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Metrics *SystemMetrics `protobuf:"bytes,2,opt,name=metrics,proto3" json:"metrics,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Heartbeat) Reset() { + *x = Heartbeat{} + mi := &file_homeops_hub_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Heartbeat) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Heartbeat) ProtoMessage() {} + +func (x *Heartbeat) ProtoReflect() protoreflect.Message { + mi := &file_homeops_hub_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Heartbeat.ProtoReflect.Descriptor instead. +func (*Heartbeat) Descriptor() ([]byte, []int) { + return file_homeops_hub_proto_rawDescGZIP(), []int{7} +} + +func (x *Heartbeat) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +func (x *Heartbeat) GetMetrics() *SystemMetrics { + if x != nil { + return x.Metrics + } + return nil +} + +type SystemMetrics struct { + state protoimpl.MessageState `protogen:"open.v1"` + CpuUsage float32 `protobuf:"fixed32,1,opt,name=cpu_usage,json=cpuUsage,proto3" json:"cpu_usage,omitempty"` + MemoryUsage float32 `protobuf:"fixed32,2,opt,name=memory_usage,json=memoryUsage,proto3" json:"memory_usage,omitempty"` + DiskUsage float32 `protobuf:"fixed32,3,opt,name=disk_usage,json=diskUsage,proto3" json:"disk_usage,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *SystemMetrics) Reset() { + *x = SystemMetrics{} + mi := &file_homeops_hub_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *SystemMetrics) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SystemMetrics) ProtoMessage() {} + +func (x *SystemMetrics) ProtoReflect() protoreflect.Message { + mi := &file_homeops_hub_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SystemMetrics.ProtoReflect.Descriptor instead. +func (*SystemMetrics) Descriptor() ([]byte, []int) { + return file_homeops_hub_proto_rawDescGZIP(), []int{8} +} + +func (x *SystemMetrics) GetCpuUsage() float32 { + if x != nil { + return x.CpuUsage + } + return 0 +} + +func (x *SystemMetrics) GetMemoryUsage() float32 { + if x != nil { + return x.MemoryUsage + } + return 0 +} + +func (x *SystemMetrics) GetDiskUsage() float32 { + if x != nil { + return x.DiskUsage + } + return 0 +} + +type CommandResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + Success bool `protobuf:"varint,2,opt,name=success,proto3" json:"success,omitempty"` + Output string `protobuf:"bytes,3,opt,name=output,proto3" json:"output,omitempty"` + Error string `protobuf:"bytes,4,opt,name=error,proto3" json:"error,omitempty"` + ExecTimeMs int64 `protobuf:"varint,5,opt,name=exec_time_ms,json=execTimeMs,proto3" json:"exec_time_ms,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CommandResponse) Reset() { + *x = CommandResponse{} + mi := &file_homeops_hub_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CommandResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CommandResponse) ProtoMessage() {} + +func (x *CommandResponse) ProtoReflect() protoreflect.Message { + mi := &file_homeops_hub_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CommandResponse.ProtoReflect.Descriptor instead. +func (*CommandResponse) Descriptor() ([]byte, []int) { + return file_homeops_hub_proto_rawDescGZIP(), []int{9} +} + +func (x *CommandResponse) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + +func (x *CommandResponse) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +func (x *CommandResponse) GetOutput() string { + if x != nil { + return x.Output + } + return "" +} + +func (x *CommandResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *CommandResponse) GetExecTimeMs() int64 { + if x != nil { + return x.ExecTimeMs + } + return 0 +} + +type Alert struct { + state protoimpl.MessageState `protogen:"open.v1"` + Timestamp int64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Level string `protobuf:"bytes,2,opt,name=level,proto3" json:"level,omitempty"` + Title string `protobuf:"bytes,3,opt,name=title,proto3" json:"title,omitempty"` + Description string `protobuf:"bytes,4,opt,name=description,proto3" json:"description,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Alert) Reset() { + *x = Alert{} + mi := &file_homeops_hub_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Alert) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Alert) ProtoMessage() {} + +func (x *Alert) ProtoReflect() protoreflect.Message { + mi := &file_homeops_hub_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Alert.ProtoReflect.Descriptor instead. +func (*Alert) Descriptor() ([]byte, []int) { + return file_homeops_hub_proto_rawDescGZIP(), []int{10} +} + +func (x *Alert) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +func (x *Alert) GetLevel() string { + if x != nil { + return x.Level + } + return "" +} + +func (x *Alert) GetTitle() string { + if x != nil { + return x.Title + } + return "" +} + +func (x *Alert) GetDescription() string { + if x != nil { + return x.Description + } + return "" +} + var File_homeops_hub_proto protoreflect.FileDescriptor const file_homeops_hub_proto_rawDesc = "" + "\n" + - "\x11homeops/cmd.proto\x1a\x1bgoogle/protobuf/empty.proto\"\"\n" + + "\x11homeops/hub.proto\x1a\x1bgoogle/protobuf/empty.proto\"\"\n" + "\fPongResponse\x12\x12\n" + "\x04pong\x18\x01 \x01(\tR\x04pong\"\xb6\x01\n" + "\x14RegisterAgentRequest\x12\x19\n" + @@ -350,10 +780,49 @@ const file_homeops_hub_proto_rawDesc = "" + "\x06reason\x18\x04 \x01(\tR\x06reason\"n\n" + "\x15RegisterAgentResponse\x12:\n" + "\x19heartbeat_interval_second\x18\x01 \x01(\x03R\x17heartbeatIntervalSecond\x12\x19\n" + - "\bagent_id\x18\x02 \x01(\tR\aagentId2x\n" + + "\bagent_id\x18\x02 \x01(\tR\aagentId\"\xe0\x01\n" + + "\x14ServerCommandRequest\x12\x1d\n" + + "\n" + + "request_id\x18\x01 \x01(\tR\trequestId\x12\x12\n" + + "\x04name\x18\x02 \x01(\tR\x04name\x123\n" + + "\x04args\x18\x03 \x03(\v2\x1f.ServerCommandRequest.ArgsEntryR\x04args\x12'\n" + + "\x0ftimeout_seconds\x18\x04 \x01(\x03R\x0etimeoutSeconds\x1a7\n" + + "\tArgsEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xbb\x01\n" + + "\n" + + "AgentEvent\x12\x19\n" + + "\bagent_id\x18\x01 \x01(\tR\aagentId\x12*\n" + + "\theartbeat\x18\x02 \x01(\v2\n" + + ".HeartbeatH\x00R\theartbeat\x12=\n" + + "\x10command_response\x18\x03 \x01(\v2\x10.CommandResponseH\x00R\x0fcommandResponse\x12\x1e\n" + + "\x05alert\x18\x04 \x01(\v2\x06.AlertH\x00R\x05alertB\a\n" + + "\x05event\"S\n" + + "\tHeartbeat\x12\x1c\n" + + "\ttimestamp\x18\x01 \x01(\x03R\ttimestamp\x12(\n" + + "\ametrics\x18\x02 \x01(\v2\x0e.SystemMetricsR\ametrics\"n\n" + + "\rSystemMetrics\x12\x1b\n" + + "\tcpu_usage\x18\x01 \x01(\x02R\bcpuUsage\x12!\n" + + "\fmemory_usage\x18\x02 \x01(\x02R\vmemoryUsage\x12\x1d\n" + + "\n" + + "disk_usage\x18\x03 \x01(\x02R\tdiskUsage\"\x9a\x01\n" + + "\x0fCommandResponse\x12\x1d\n" + + "\n" + + "request_id\x18\x01 \x01(\tR\trequestId\x12\x18\n" + + "\asuccess\x18\x02 \x01(\bR\asuccess\x12\x16\n" + + "\x06output\x18\x03 \x01(\tR\x06output\x12\x14\n" + + "\x05error\x18\x04 \x01(\tR\x05error\x12 \n" + + "\fexec_time_ms\x18\x05 \x01(\x03R\n" + + "execTimeMs\"s\n" + + "\x05Alert\x12\x1c\n" + + "\ttimestamp\x18\x01 \x01(\x03R\ttimestamp\x12\x14\n" + + "\x05level\x18\x02 \x01(\tR\x05level\x12\x14\n" + + "\x05title\x18\x03 \x01(\tR\x05title\x12 \n" + + "\vdescription\x18\x04 \x01(\tR\vdescription2\xb6\x01\n" + "\x03Hub\x12/\n" + "\x04Ping\x12\x16.google.protobuf.Empty\x1a\r.PongResponse\"\x00\x12@\n" + - "\rRegisterAgent\x12\x15.RegisterAgentRequest\x1a\x16.RegisterAgentResponse\"\x00B HostInfo - 3, // 1: RegisterAgentRequest.capability:type_name -> Capability - 5, // 2: Hub.Ping:input_type -> google.protobuf.Empty - 1, // 3: Hub.RegisterAgent:input_type -> RegisterAgentRequest - 0, // 4: Hub.Ping:output_type -> PongResponse - 4, // 5: Hub.RegisterAgent:output_type -> RegisterAgentResponse - 4, // [4:6] is the sub-list for method output_type - 2, // [2:4] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 2, // 0: RegisterAgentRequest.host:type_name -> HostInfo + 3, // 1: RegisterAgentRequest.capability:type_name -> Capability + 11, // 2: ServerCommandRequest.args:type_name -> ServerCommandRequest.ArgsEntry + 7, // 3: AgentEvent.heartbeat:type_name -> Heartbeat + 9, // 4: AgentEvent.command_response:type_name -> CommandResponse + 10, // 5: AgentEvent.alert:type_name -> Alert + 8, // 6: Heartbeat.metrics:type_name -> SystemMetrics + 12, // 7: Hub.Ping:input_type -> google.protobuf.Empty + 1, // 8: Hub.RegisterAgent:input_type -> RegisterAgentRequest + 6, // 9: Hub.StreamConnection:input_type -> AgentEvent + 0, // 10: Hub.Ping:output_type -> PongResponse + 4, // 11: Hub.RegisterAgent:output_type -> RegisterAgentResponse + 5, // 12: Hub.StreamConnection:output_type -> ServerCommandRequest + 10, // [10:13] is the sub-list for method output_type + 7, // [7:10] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_homeops_hub_proto_init() } @@ -395,13 +878,18 @@ func file_homeops_hub_proto_init() { if File_homeops_hub_proto != nil { return } + file_homeops_hub_proto_msgTypes[6].OneofWrappers = []any{ + (*AgentEvent_Heartbeat)(nil), + (*AgentEvent_CommandResponse)(nil), + (*AgentEvent_Alert)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_homeops_hub_proto_rawDesc), len(file_homeops_hub_proto_rawDesc)), NumEnums: 0, - NumMessages: 5, + NumMessages: 12, NumExtensions: 0, NumServices: 1, }, diff --git a/api/gen/homeops/hub_grpc.pb.go b/api/gen/homeops/hub_grpc.pb.go index c5a605c..d689399 100644 --- a/api/gen/homeops/hub_grpc.pb.go +++ b/api/gen/homeops/hub_grpc.pb.go @@ -2,7 +2,7 @@ // versions: // - protoc-gen-go-grpc v1.6.1 // - protoc v7.34.1 -// source: homeops/cmd.proto +// source: homeops/hub.proto package homeops @@ -20,8 +20,9 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - Hub_Ping_FullMethodName = "/Hub/Ping" - Hub_RegisterAgent_FullMethodName = "/Hub/RegisterAgent" + Hub_Ping_FullMethodName = "/Hub/Ping" + Hub_RegisterAgent_FullMethodName = "/Hub/RegisterAgent" + Hub_StreamConnection_FullMethodName = "/Hub/StreamConnection" ) // HubClient is the client API for Hub service. @@ -30,6 +31,7 @@ const ( type HubClient interface { Ping(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PongResponse, error) RegisterAgent(ctx context.Context, in *RegisterAgentRequest, opts ...grpc.CallOption) (*RegisterAgentResponse, error) + StreamConnection(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AgentEvent, ServerCommandRequest], error) } type hubClient struct { @@ -60,12 +62,26 @@ func (c *hubClient) RegisterAgent(ctx context.Context, in *RegisterAgentRequest, return out, nil } +func (c *hubClient) StreamConnection(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AgentEvent, ServerCommandRequest], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &Hub_ServiceDesc.Streams[0], Hub_StreamConnection_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[AgentEvent, ServerCommandRequest]{ClientStream: stream} + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Hub_StreamConnectionClient = grpc.BidiStreamingClient[AgentEvent, ServerCommandRequest] + // HubServer is the server API for Hub service. // All implementations must embed UnimplementedHubServer // for forward compatibility. type HubServer interface { Ping(context.Context, *emptypb.Empty) (*PongResponse, error) RegisterAgent(context.Context, *RegisterAgentRequest) (*RegisterAgentResponse, error) + StreamConnection(grpc.BidiStreamingServer[AgentEvent, ServerCommandRequest]) error mustEmbedUnimplementedHubServer() } @@ -82,6 +98,9 @@ func (UnimplementedHubServer) Ping(context.Context, *emptypb.Empty) (*PongRespon func (UnimplementedHubServer) RegisterAgent(context.Context, *RegisterAgentRequest) (*RegisterAgentResponse, error) { return nil, status.Error(codes.Unimplemented, "method RegisterAgent not implemented") } +func (UnimplementedHubServer) StreamConnection(grpc.BidiStreamingServer[AgentEvent, ServerCommandRequest]) error { + return status.Error(codes.Unimplemented, "method StreamConnection not implemented") +} func (UnimplementedHubServer) mustEmbedUnimplementedHubServer() {} func (UnimplementedHubServer) testEmbeddedByValue() {} @@ -139,6 +158,13 @@ func _Hub_RegisterAgent_Handler(srv interface{}, ctx context.Context, dec func(i return interceptor(ctx, in, info, handler) } +func _Hub_StreamConnection_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(HubServer).StreamConnection(&grpc.GenericServerStream[AgentEvent, ServerCommandRequest]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Hub_StreamConnectionServer = grpc.BidiStreamingServer[AgentEvent, ServerCommandRequest] + // Hub_ServiceDesc is the grpc.ServiceDesc for Hub service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -155,6 +181,13 @@ var Hub_ServiceDesc = grpc.ServiceDesc{ Handler: _Hub_RegisterAgent_Handler, }, }, - Streams: []grpc.StreamDesc{}, - Metadata: "homeops/cmd.proto", + Streams: []grpc.StreamDesc{ + { + StreamName: "StreamConnection", + Handler: _Hub_StreamConnection_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "homeops/hub.proto", } diff --git a/api/proto/homeops/hub.proto b/api/proto/homeops/hub.proto index e7f32d9..301b992 100644 --- a/api/proto/homeops/hub.proto +++ b/api/proto/homeops/hub.proto @@ -7,6 +7,7 @@ option go_package = "github.com/lorsanstand/HomeOps-Hub/api/gen/homeops;homeops" service Hub { rpc Ping (google.protobuf.Empty) returns (PongResponse) {} rpc RegisterAgent (RegisterAgentRequest) returns (RegisterAgentResponse) {} + rpc StreamConnection (stream AgentEvent) returns (stream ServerCommandRequest) {} } message PongResponse { @@ -37,4 +38,47 @@ message Capability { message RegisterAgentResponse { int64 heartbeat_interval_second = 1; string agent_id = 2; +} + +message ServerCommandRequest { + string request_id = 1; + string name = 2; + map args = 3; + int64 timeout_seconds = 4; +} + +message AgentEvent { + string agent_id = 1; + + oneof event { + Heartbeat heartbeat = 2; + CommandResponse command_response = 3; + Alert alert = 4; + } +} + +message Heartbeat { + int64 timestamp = 1; + SystemMetrics metrics = 2; +} + +message SystemMetrics { + float cpu_usage = 1; + float memory_usage = 2; + float disk_usage = 3; +} + +message CommandResponse { + string request_id = 1; + bool success = 2; + string output = 3; + string error = 4; + int64 exec_time_ms = 5; +} + +message Alert { + int64 timestamp = 1; + string level = 2; + string title = 3; + string description = 4; } \ No newline at end of file diff --git a/hub/internal/app/app.go b/hub/internal/app/app.go index 9b825c3..c1db0e3 100644 --- a/hub/internal/app/app.go +++ b/hub/internal/app/app.go @@ -43,7 +43,6 @@ func (a *App) Run() { a.log.Error().Err(err).Msg("failed to connect to the database for migrations") return } - defer migratePGConn.Close() mgrt, err := migrator.NewMigrator(hubdir.MigrationsFS, "migrations") if err != nil { @@ -57,7 +56,9 @@ func (a *App) Run() { return } a.log.Info().Msg("migrations applied successfully") - migratePGConn.Close() + if err := migratePGConn.Close(); err != nil { + a.log.Warn().Err(err).Msg("failed to close migrate postgres connection") + } a.log.Info().Msg("creating database connection pool") pool, err := pgxpool.New(ctx, a.cfg.GetURLPostgres()) diff --git a/hub/internal/migrator/migrator.go b/hub/internal/migrator/migrator.go index 9f1effe..159bd8e 100644 --- a/hub/internal/migrator/migrator.go +++ b/hub/internal/migrator/migrator.go @@ -26,7 +26,7 @@ func NewMigrator(sqlFiles embed.FS, dirname string) (*Migrator, error) { return &Migrator{srcDriver: d}, nil } -func (m *Migrator) ApplyMigrations(db *sql.DB) error { +func (m *Migrator) ApplyMigrations(db *sql.DB) (err error) { driver, err := postgres.WithInstance(db, &postgres.Config{}) if err != nil { return fmt.Errorf("unable to create db instance: %w", err) @@ -37,7 +37,12 @@ func (m *Migrator) ApplyMigrations(db *sql.DB) error { return fmt.Errorf("unable to create migration: %w", err) } - defer migrator.Close() + defer func() { + closeErr, _ := migrator.Close() + if err == nil { + err = closeErr + } + }() if err = migrator.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) { return fmt.Errorf("unable to apply migrations: %w", err) diff --git a/hub/internal/service/hub_service/hub.go b/hub/internal/service/hub_service/hub.go index d9b7cae..00d7181 100644 --- a/hub/internal/service/hub_service/hub.go +++ b/hub/internal/service/hub_service/hub.go @@ -30,16 +30,16 @@ func NewHubService(store Store, logger zerolog.Logger) *HubService { } func (h *HubService) RegisterAgent(ctx context.Context, data domain.RegisterAgentRequest) (domain.RegisterAgentResponse, error) { - h.log.Debug().Str("agentID", data.AgentId).Str("agentName", data.AgentName).Msg("started registering agent") - agent, err := h.store.GetAgentByAgentID(ctx, data.AgentId) + h.log.Debug().Str("agentID", data.AgentID).Str("agentName", data.AgentName).Msg("started registering agent") + agent, err := h.store.GetAgentByAgentID(ctx, data.AgentID) if err != nil && !errors.Is(err, sql.ErrNoRows) { return domain.RegisterAgentResponse{}, fmt.Errorf("failed select agent to db: %w", err) } - if data.AgentId != "" && !errors.Is(err, sql.ErrNoRows) { + if data.AgentID != "" && !errors.Is(err, sql.ErrNoRows) { h.log.Debug().Str("agentID", agent.AgentID).Str("agentName", data.AgentName).Msg("agent exists, updating") - data.AgentId = agent.AgentID + data.AgentID = agent.AgentID agentStore := toCreateAgentModel(data) @@ -55,7 +55,7 @@ func (h *HubService) RegisterAgent(ctx context.Context, data domain.RegisterAgen return domain.RegisterAgentResponse{}, fmt.Errorf("generate agent ID: %w", err) } - data.AgentId = AgentID + data.AgentID = AgentID agentStore := toCreateAgentModel(data) diff --git a/hub/internal/service/hub_service/mapper.go b/hub/internal/service/hub_service/mapper.go index e28c843..9013dab 100644 --- a/hub/internal/service/hub_service/mapper.go +++ b/hub/internal/service/hub_service/mapper.go @@ -7,7 +7,7 @@ import ( func toCreateAgentModel(agent domain.RegisterAgentRequest) domainHub.CreateAgentModel { return domainHub.CreateAgentModel{ - AgentID: agent.AgentId, + AgentID: agent.AgentID, AgentName: agent.AgentName, Architecture: agent.Host.Arch, System: agent.Host.System, diff --git a/hub/internal/store/mapper.go b/hub/internal/store/mapper.go index 90e2290..b843735 100644 --- a/hub/internal/store/mapper.go +++ b/hub/internal/store/mapper.go @@ -16,7 +16,7 @@ func toDBAgent(agent domainHub.CreateAgentModel) gen2.CreateAgentParams { System: agent.System, Hostname: agent.Hostname, Version: agent.Version, - Capabilities: toJsonCapabilities(agent.Capabilities), + Capabilities: toJSONCapabilities(agent.Capabilities), } } @@ -28,11 +28,11 @@ func toUpdateDBAgent(agent domainHub.CreateAgentModel) gen2.UpdateAgentByIDParam System: agent.System, Hostname: agent.Hostname, Version: agent.Version, - Capabilities: toJsonCapabilities(agent.Capabilities), + Capabilities: toJSONCapabilities(agent.Capabilities), } } -func toJsonCapabilities(caps []domain.Capability) []byte { +func toJSONCapabilities(caps []domain.Capability) []byte { data, err := json.Marshal(caps) if err != nil { // Note: Error is silently handled - consider logging in production diff --git a/shared/domain/agent.go b/shared/domain/agent.go index 8a5f062..2a24ae5 100644 --- a/shared/domain/agent.go +++ b/shared/domain/agent.go @@ -1,7 +1,7 @@ package domain type RegisterAgentRequest struct { - AgentId string + AgentID string AgentName string AgentVersion string Host HostInfo diff --git a/shared/domain/mapper.go b/shared/domain/mapper.go index 2812e8e..587d361 100644 --- a/shared/domain/mapper.go +++ b/shared/domain/mapper.go @@ -10,7 +10,7 @@ func ToDomainAgentRequest(request *pb.RegisterAgentRequest) RegisterAgentRequest } return RegisterAgentRequest{ - AgentId: request.AgentId, + AgentID: request.AgentId, AgentName: request.AgentName, Host: HostInfo{ System: request.Host.System, @@ -53,7 +53,7 @@ func ToDomainCapabilities(capability []*pb.Capability) []Capability { func ToGRPCAgentRequest(request RegisterAgentRequest) pb.RegisterAgentRequest { return pb.RegisterAgentRequest{ - AgentId: request.AgentId, + AgentId: request.AgentID, AgentName: request.AgentName, Host: &pb.HostInfo{ Hostname: request.Host.Hostname,