Merge branch 'main' into feat/stream-hub

This commit is contained in:
2026-05-05 20:34:32 +03:00
13 changed files with 207 additions and 126 deletions
+12 -20
View File
@@ -1,13 +1,11 @@
package app
import (
"context"
"database/sql"
"fmt"
standartlog "log"
"net"
"github.com/jackc/pgx/v5/pgxpool"
hubdir "github.com/lorsanstand/HomeOps-Hub/hub/internal"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/migrator"
grpcserv "github.com/lorsanstand/HomeOps-Hub/hub/internal/rpc"
@@ -15,6 +13,7 @@ import (
"github.com/lorsanstand/HomeOps-Hub/hub/internal/store"
"github.com/lorsanstand/HomeOps-Hub/shared/config"
"github.com/lorsanstand/HomeOps-Hub/shared/log"
_ "github.com/mattn/go-sqlite3"
"github.com/rs/zerolog"
)
@@ -36,14 +35,19 @@ func NewApp() *App {
}
func (a *App) Run() {
ctx := context.Background()
a.log.Info().Str("host", a.cfg.DBHost).Int("port", a.cfg.DBPort).Msg("connecting to database")
migratePGConn, err := sql.Open("pgx", a.cfg.GetURLPostgres())
a.log.Info().Msg("connecting to database")
DBConn, err := sql.Open("sqlite", "database.db")
if err != nil {
a.log.Error().Err(err).Msg("failed to connect to the database for migrations")
a.log.Error().Err(err).Msg("failed to connect to the database")
return
}
defer func() {
if err := DBConn.Close(); err != nil {
a.log.Warn().Err(err).Msg("failed to close migrate postgres connection")
}
}()
mgrt, err := migrator.NewMigrator(hubdir.MigrationsFS, "migrations")
if err != nil {
a.log.Error().Err(err).Msg("failed to create migrator")
@@ -51,25 +55,13 @@ func (a *App) Run() {
}
a.log.Info().Msg("applying database migrations")
if err = mgrt.ApplyMigrations(migratePGConn); err != nil {
if err = mgrt.ApplyMigrations(DBConn); err != nil {
a.log.Error().Err(err).Msg("migrations failed to apply")
return
}
a.log.Info().Msg("migrations applied successfully")
if err := migratePGConn.Close(); err != nil {
a.log.Warn().Err(err).Msg("failed to close migrate postgres connection")
}
a.log.Info().Msg("creating database connection pool")
pool, err := pgxpool.New(ctx, a.cfg.GetURLPostgres())
if err != nil {
a.log.Error().Err(err).Msg("failed to create database connection pool")
return
}
defer pool.Close()
a.log.Info().Msg("database connection pool created")
hubStore := store.NewHubStore(pool)
hubStore := store.NewHubStore(DBConn)
hubService := hub_service.NewHubService(hubStore, a.log)
a.log.Info().Msg("starting hub service")
@@ -1,14 +1,14 @@
CREATE TABLE agents (
id SERIAL PRIMARY KEY,
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_id VARCHAR(32) UNIQUE NOT NULL,
agent_name VARCHAR(255),
architecture VARCHAR(10) NOT NULL,
system VARCHAR(10) NOT NULL,
hostname VARCHAR(100) NOT NULL,
version VARCHAR(10) NOT NULL,
capabilities JSON,
registered_at timestamp without time zone DEFAULT now() NOT NULL
capabilities TEXT,
registered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
CREATE UNIQUE INDEX idx_agent_id ON agents (id);
CREATE UNIQUE INDEX idx_agent_id_id On agents (agent_id)
CREATE UNIQUE INDEX idx_agent_id_id ON agents (agent_id);
+6 -11
View File
@@ -6,10 +6,11 @@ import (
"errors"
"fmt"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/golang-migrate/migrate/v4/database/sqlite"
_ "github.com/golang-migrate/migrate/v4/database/sqlite"
_ "github.com/golang-migrate/migrate/v4/database/sqlite3"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database/postgres"
"github.com/golang-migrate/migrate/v4/source"
"github.com/golang-migrate/migrate/v4/source/iofs"
)
@@ -27,24 +28,18 @@ func NewMigrator(sqlFiles embed.FS, dirname string) (*Migrator, error) {
}
func (m *Migrator) ApplyMigrations(db *sql.DB) (err error) {
driver, err := postgres.WithInstance(db, &postgres.Config{})
driver, err := sqlite.WithInstance(db, &sqlite.Config{})
if err != nil {
return fmt.Errorf("unable to create db instance: %w", err)
}
migrator, err := migrate.NewWithInstance("migration_embeded_sql_files", m.srcDriver, "psql_db", driver)
migrator, err := migrate.NewWithInstance("migration_embeded_sql_files", m.srcDriver, "sqlite", driver)
if err != nil {
return fmt.Errorf("unable to create migration: %w", err)
}
defer func() {
closeErr, _ := migrator.Close()
if err == nil {
err = closeErr
}
}()
if err = migrator.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) {
migrator.Close()
return fmt.Errorf("unable to apply migrations: %w", err)
}
+12 -3
View File
@@ -9,6 +9,8 @@ import (
)
func toDBAgent(agent domainHub.CreateAgentModel) gen2.CreateAgentParams {
jsonCaps := toJSONCapabilities(agent.Capabilities)
capsStr := string(jsonCaps)
return gen2.CreateAgentParams{
AgentID: agent.AgentID,
AgentName: &agent.AgentName,
@@ -16,11 +18,13 @@ func toDBAgent(agent domainHub.CreateAgentModel) gen2.CreateAgentParams {
System: agent.System,
Hostname: agent.Hostname,
Version: agent.Version,
Capabilities: toJSONCapabilities(agent.Capabilities),
Capabilities: &capsStr,
}
}
func toUpdateDBAgent(agent domainHub.CreateAgentModel) gen2.UpdateAgentByIDParams {
jsonCaps := toJSONCapabilities(agent.Capabilities)
capsStr := string(jsonCaps)
return gen2.UpdateAgentByIDParams{
AgentID: agent.AgentID,
AgentName: &agent.AgentName,
@@ -28,7 +32,7 @@ func toUpdateDBAgent(agent domainHub.CreateAgentModel) gen2.UpdateAgentByIDParam
System: agent.System,
Hostname: agent.Hostname,
Version: agent.Version,
Capabilities: toJSONCapabilities(agent.Capabilities),
Capabilities: &capsStr,
}
}
@@ -47,6 +51,11 @@ func toAgentModel(dbAgent gen2.Agent) domainHub.AgentModel {
dbAgentName = *dbAgent.AgentName
}
var capsBytes []byte
if dbAgent.Capabilities != nil {
capsBytes = []byte(*dbAgent.Capabilities)
}
return domainHub.AgentModel{
ID: int(dbAgent.ID),
AgentID: dbAgent.AgentID,
@@ -54,7 +63,7 @@ func toAgentModel(dbAgent gen2.Agent) domainHub.AgentModel {
Architecture: dbAgent.Architecture,
System: dbAgent.System,
Hostname: dbAgent.Hostname,
Capabilities: toDomainCapabilities(dbAgent.Capabilities),
Capabilities: toDomainCapabilities(capsBytes),
}
}
+20 -14
View File
@@ -1,7 +1,7 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// source: cmd.sql
// source: agent.sql
package gen
@@ -11,7 +11,7 @@ import (
const createAgent = `-- name: CreateAgent :exec
INSERT INTO agents (agent_id, agent_name, architecture, system, hostname, version, capabilities)
VALUES ($1, $2, $3, $4, $5, $6, $7)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
`
type CreateAgentParams struct {
@@ -21,11 +21,11 @@ type CreateAgentParams struct {
System string
Hostname string
Version string
Capabilities []byte
Capabilities *string
}
func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) error {
_, err := q.db.Exec(ctx, createAgent,
_, err := q.db.ExecContext(ctx, createAgent,
arg.AgentID,
arg.AgentName,
arg.Architecture,
@@ -39,11 +39,11 @@ func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) error
const getAgentByAgentID = `-- name: GetAgentByAgentID :one
SELECT id, agent_id, agent_name, architecture, system, hostname, version, capabilities, registered_at from agents
WHERE agent_id=$1
WHERE agent_id = ?1
`
func (q *Queries) GetAgentByAgentID(ctx context.Context, agentID string) (Agent, error) {
row := q.db.QueryRow(ctx, getAgentByAgentID, agentID)
row := q.db.QueryRowContext(ctx, getAgentByAgentID, agentID)
var i Agent
err := row.Scan(
&i.ID,
@@ -61,11 +61,11 @@ func (q *Queries) GetAgentByAgentID(ctx context.Context, agentID string) (Agent,
const getAgentByID = `-- name: GetAgentByID :one
SELECT id, agent_id, agent_name, architecture, system, hostname, version, capabilities, registered_at from agents
WHERE id=$1
WHERE id = ?1
`
func (q *Queries) GetAgentByID(ctx context.Context, id int32) (Agent, error) {
row := q.db.QueryRow(ctx, getAgentByID, id)
func (q *Queries) GetAgentByID(ctx context.Context, id int64) (Agent, error) {
row := q.db.QueryRowContext(ctx, getAgentByID, id)
var i Agent
err := row.Scan(
&i.ID,
@@ -83,8 +83,14 @@ func (q *Queries) GetAgentByID(ctx context.Context, id int32) (Agent, error) {
const updateAgentByID = `-- name: UpdateAgentByID :exec
UPDATE agents
SET agent_id=$1, agent_name=$2, architecture=$3, system=$4, hostname=$5, version=$6, capabilities=$7
WHERE id=$8
SET agent_id = ?1,
agent_name = ?2,
architecture = ?3,
system = ?4,
hostname = ?5,
version = ?6,
capabilities = ?7
WHERE id = ?8
`
type UpdateAgentByIDParams struct {
@@ -94,12 +100,12 @@ type UpdateAgentByIDParams struct {
System string
Hostname string
Version string
Capabilities []byte
ID int32
Capabilities *string
ID int64
}
func (q *Queries) UpdateAgentByID(ctx context.Context, arg UpdateAgentByIDParams) error {
_, err := q.db.Exec(ctx, updateAgentByID,
_, err := q.db.ExecContext(ctx, updateAgentByID,
arg.AgentID,
arg.AgentName,
arg.Architecture,
+6 -7
View File
@@ -6,15 +6,14 @@ package gen
import (
"context"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"database/sql"
)
type DBTX interface {
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
Query(context.Context, string, ...interface{}) (pgx.Rows, error)
QueryRow(context.Context, string, ...interface{}) pgx.Row
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
PrepareContext(context.Context, string) (*sql.Stmt, error)
QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}
func New(db DBTX) *Queries {
@@ -25,7 +24,7 @@ type Queries struct {
db DBTX
}
func (q *Queries) WithTx(tx pgx.Tx) *Queries {
func (q *Queries) WithTx(tx *sql.Tx) *Queries {
return &Queries{
db: tx,
}
+4 -4
View File
@@ -5,17 +5,17 @@
package gen
import (
"github.com/jackc/pgx/v5/pgtype"
"time"
)
type Agent struct {
ID int32
ID int64
AgentID string
AgentName *string
Architecture string
System string
Hostname string
Version string
Capabilities []byte
RegisteredAt pgtype.Timestamp
Capabilities *string
RegisteredAt time.Time
}
+11 -5
View File
@@ -1,16 +1,22 @@
-- name: CreateAgent :exec
INSERT INTO agents (agent_id, agent_name, architecture, system, hostname, version, capabilities)
VALUES ($1, $2, $3, $4, $5, $6, $7);
VALUES (:agent_id, :agent_name, :architecture, :system, :hostname, :version, :capabilities);
-- name: GetAgentByID :one
SELECT * from agents
WHERE id=$1;
WHERE id = :id;
-- name: GetAgentByAgentID :one
SELECT * from agents
WHERE agent_id=$1;
WHERE agent_id = :agent_id;
-- name: UpdateAgentByID :exec
UPDATE agents
SET agent_id=$1, agent_name=$2, architecture=$3, system=$4, hostname=$5, version=$6, capabilities=$7
WHERE id=$8;
SET agent_id = :agent_id,
agent_name = :agent_name,
architecture = :architecture,
system = :system,
hostname = :hostname,
version = :version,
capabilities = :capabilities
WHERE id = :id;
+3 -3
View File
@@ -2,8 +2,8 @@ package store
import (
"context"
"database/sql"
"github.com/jackc/pgx/v5/pgxpool"
domainHub "github.com/lorsanstand/HomeOps-Hub/hub/internal/domain"
"github.com/lorsanstand/HomeOps-Hub/hub/internal/store/sqlc/gen"
)
@@ -12,7 +12,7 @@ type HubStore struct {
queries *gen.Queries
}
func NewHubStore(db *pgxpool.Pool) *HubStore {
func NewHubStore(db *sql.DB) *HubStore {
queries := gen.New(db)
return &HubStore{queries}
}
@@ -31,6 +31,6 @@ func (h *HubStore) GetAgentByAgentID(ctx context.Context, AgentID string) (domai
func (h *HubStore) UpdateAgentByID(ctx context.Context, ID int, updateAgent domainHub.CreateAgentModel) error {
data := toUpdateDBAgent(updateAgent)
data.ID = int32(ID)
data.ID = int64(ID)
return h.queries.UpdateAgentByID(ctx, data)
}