Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 23 additions & 20 deletions app/services/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"os"
"time"

"github.com/labstack/gommon/log"

"hostlink/app/services/agentstate"
"hostlink/config/appconf"
"hostlink/domain/credential"
Expand All @@ -27,14 +29,14 @@ type Pusher interface {
}

type metricspusher struct {
apiserver apiserver.MetricsOperations
agentstate agentstate.Operations
metricscollector pgmetrics.Collector
syscollector sysmetrics.Collector
netcollector networkmetrics.Collector
storagecollector storagemetrics.Collector
crypto crypto.Service
privateKeyPath string
apiserver apiserver.MetricsOperations
agentstate agentstate.Operations
metricscollector pgmetrics.Collector
syscollector sysmetrics.Collector
netcollector networkmetrics.Collector
storagecollector storagemetrics.Collector
crypto crypto.Service
privateKeyPath string
}

func NewWithConf() (*metricspusher, error) {
Expand Down Expand Up @@ -130,11 +132,10 @@ func (mp *metricspusher) Push(cred credential.Credential) error {

ctx := context.Background()
var metricSets []domainmetrics.MetricSet
var collectionErrors []error

sysMetrics, err := mp.syscollector.Collect(ctx)
if err != nil {
collectionErrors = append(collectionErrors, fmt.Errorf("system metrics: %w", err))
log.Warnf("system metrics collection failed: %v", err)
} else {
metricSets = append(metricSets, domainmetrics.MetricSet{
Type: domainmetrics.MetricTypeSystem,
Expand All @@ -144,7 +145,7 @@ func (mp *metricspusher) Push(cred credential.Credential) error {

netMetrics, err := mp.netcollector.Collect(ctx)
if err != nil {
collectionErrors = append(collectionErrors, fmt.Errorf("network metrics: %w", err))
log.Warnf("network metrics collection failed: %v", err)
} else {
metricSets = append(metricSets, domainmetrics.MetricSet{
Type: domainmetrics.MetricTypeNetwork,
Expand All @@ -154,17 +155,19 @@ func (mp *metricspusher) Push(cred credential.Credential) error {

dbMetrics, err := mp.metricscollector.Collect(cred)
if err != nil {
collectionErrors = append(collectionErrors, fmt.Errorf("database metrics: %w", err))
log.Warnf("database metrics collection failed: %v", err)
dbMetrics = domainmetrics.PostgreSQLDatabaseMetrics{Up: false}
} else {
metricSets = append(metricSets, domainmetrics.MetricSet{
Type: domainmetrics.MetricTypePostgreSQLDatabase,
Metrics: dbMetrics,
})
dbMetrics.Up = true
}
metricSets = append(metricSets, domainmetrics.MetricSet{
Type: domainmetrics.MetricTypePostgreSQLDatabase,
Metrics: dbMetrics,
})

storageMetrics, err := mp.storagecollector.Collect(ctx)
if err != nil {
collectionErrors = append(collectionErrors, fmt.Errorf("storage metrics: %w", err))
log.Warnf("storage metrics collection failed: %v", err)
} else {
for _, sm := range storageMetrics {
metricSets = append(metricSets, domainmetrics.MetricSet{
Expand All @@ -180,9 +183,9 @@ func (mp *metricspusher) Push(cred credential.Credential) error {
}
}

if len(metricSets) == 0 {
return fmt.Errorf("all metrics collection failed: %v", collectionErrors)
}
// If only the postgresql.database metric set exists (with up=false) and
// all other collectors failed, we still push so the server knows the agent
// is alive and PostgreSQL status is reported.

hostname, _ := os.Hostname()

Expand Down
105 changes: 89 additions & 16 deletions app/services/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func TestPush_SystemMetricsFailure_StillPushesDbMetrics(t *testing.T) {
setupNetCollectorMocks(mocks.netcollector)
setupStorageCollectorMocks(mocks.storagecollector)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{ConnectionsTotal: 5}, nil)
Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true, ConnectionsTotal: 5}, nil)
mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool {
hasNetwork := false
hasDb := false
Expand All @@ -469,7 +469,8 @@ func TestPush_SystemMetricsFailure_StillPushesDbMetrics(t *testing.T) {
hasNetwork = true
}
if ms.Type == domainmetrics.MetricTypePostgreSQLDatabase {
hasDb = true
dbMetrics := ms.Metrics.(domainmetrics.PostgreSQLDatabaseMetrics)
hasDb = dbMetrics.Up
}
if ms.Type == domainmetrics.MetricTypeStorage {
hasStorage = true
Expand All @@ -486,7 +487,7 @@ func TestPush_SystemMetricsFailure_StillPushesDbMetrics(t *testing.T) {
}

// Push Tests - Partial Collection (system succeeds, db fails)
func TestPush_DatabaseMetricsFailure_StillPushesSystemMetrics(t *testing.T) {
func TestPush_DatabaseMetricsFailure_StillPushesSystemMetricsAndDbWithUpFalse(t *testing.T) {
mp, mocks := setupTestMetricsPusher()
testCred := credential.Credential{Host: "localhost", DataDirectory: "/var/lib/postgresql"}
collectErr := errors.New("connection refused")
Expand All @@ -501,6 +502,7 @@ func TestPush_DatabaseMetricsFailure_StillPushesSystemMetrics(t *testing.T) {
hasSys := false
hasNetwork := false
hasStorage := false
hasDbWithUpFalse := false
for _, ms := range p.MetricSets {
if ms.Type == domainmetrics.MetricTypeSystem {
hasSys = true
Expand All @@ -511,8 +513,12 @@ func TestPush_DatabaseMetricsFailure_StillPushesSystemMetrics(t *testing.T) {
if ms.Type == domainmetrics.MetricTypeStorage {
hasStorage = true
}
if ms.Type == domainmetrics.MetricTypePostgreSQLDatabase {
dbMetrics := ms.Metrics.(domainmetrics.PostgreSQLDatabaseMetrics)
hasDbWithUpFalse = !dbMetrics.Up
}
}
return hasSys && hasNetwork && hasStorage
return hasSys && hasNetwork && hasStorage && hasDbWithUpFalse
})).Return(nil)

err := mp.Push(testCred)
Expand All @@ -522,8 +528,8 @@ func TestPush_DatabaseMetricsFailure_StillPushesSystemMetrics(t *testing.T) {
mocks.apiserver.AssertExpectations(t)
}

// Push Tests - All Collections Fail
func TestPush_AllCollectionsFail(t *testing.T) {
// Push Tests - All Collections Fail — still pushes postgresql.database with up=false
func TestPush_AllCollectionsFail_StillPushesDbWithUpFalse(t *testing.T) {
mp, mocks := setupTestMetricsPusher()
testCred := credential.Credential{Host: "localhost", DataDirectory: "/var/lib/postgresql"}

Expand All @@ -536,12 +542,22 @@ func TestPush_AllCollectionsFail(t *testing.T) {
Return(nil, errors.New("storage failed"))
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{}, errors.New("connection refused"))
mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool {
if len(p.MetricSets) != 1 {
return false
}
ms := p.MetricSets[0]
if ms.Type != domainmetrics.MetricTypePostgreSQLDatabase {
return false
}
dbMetrics := ms.Metrics.(domainmetrics.PostgreSQLDatabaseMetrics)
return !dbMetrics.Up
})).Return(nil)

err := mp.Push(testCred)

assert.Error(t, err)
assert.Contains(t, err.Error(), "all metrics collection failed")
mocks.apiserver.AssertNotCalled(t, "PushMetrics")
assert.NoError(t, err)
mocks.apiserver.AssertExpectations(t)
}

func TestPush_APIServerPushFailure(t *testing.T) {
Expand All @@ -554,7 +570,7 @@ func TestPush_APIServerPushFailure(t *testing.T) {
setupNetCollectorMocks(mocks.netcollector)
setupStorageCollectorMocks(mocks.storagecollector)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{ConnectionsTotal: 5}, nil)
Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true, ConnectionsTotal: 5}, nil)
mocks.apiserver.On("PushMetrics", mock.Anything, mock.Anything).
Return(pushErr)

Expand All @@ -579,6 +595,7 @@ func TestPush_Success_ValidatesPayloadSchema(t *testing.T) {
setupStorageCollectorMocks(mocks.storagecollector)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{
Up: true,
ConnectionsTotal: 10,
MaxConnections: 100,
CacheHitRatio: 95.5,
Expand Down Expand Up @@ -653,6 +670,9 @@ func TestPush_Success_ValidatesPayloadSchema(t *testing.T) {
}

// Check database metrics values
if !dbMetrics.Up {
return false
}
if dbMetrics.ConnectionsTotal != 10 {
return false
}
Expand Down Expand Up @@ -695,7 +715,7 @@ func TestPush_ContextPropagation(t *testing.T) {
setupNetCollectorMocks(mocks.netcollector)
setupStorageCollectorMocks(mocks.storagecollector)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{}, nil)
Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil)
mocks.apiserver.On("PushMetrics", mock.MatchedBy(func(ctx context.Context) bool {
return ctx != nil
}), mock.Anything).
Expand Down Expand Up @@ -742,7 +762,7 @@ func TestPush_CredentialPassedCorrectly(t *testing.T) {
c.Port == testCred.Port &&
c.Username == testCred.Username &&
c.Dialect == testCred.Dialect
})).Return(domainmetrics.PostgreSQLDatabaseMetrics{}, nil)
})).Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil)
mocks.apiserver.On("PushMetrics", mock.Anything, mock.Anything).
Return(nil)

Expand Down Expand Up @@ -793,6 +813,59 @@ func setupStorageCollectorMocks(collector *MockStorageCollector) {
}, nil)
}

// Verifies database down sends up=false with zero metrics
func TestPush_DatabaseDown_SendsUpFalseWithZeroMetrics(t *testing.T) {
mp, mocks := setupTestMetricsPusher()
testCred := credential.Credential{Host: "localhost", DataDirectory: "/var/lib/postgresql"}

mocks.agentstate.On("GetAgentID").Return("agent-123")
setupSysCollectorMocks(mocks.syscollector)
setupNetCollectorMocks(mocks.netcollector)
setupStorageCollectorMocks(mocks.storagecollector)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{}, errors.New("connection refused"))
mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool {
for _, ms := range p.MetricSets {
if ms.Type == domainmetrics.MetricTypePostgreSQLDatabase {
dbMetrics := ms.Metrics.(domainmetrics.PostgreSQLDatabaseMetrics)
// up must be false
if dbMetrics.Up {
return false
}
// all other fields must be zero
if dbMetrics.ConnectionsTotal != 0 {
return false
}
if dbMetrics.MaxConnections != 0 {
return false
}
if dbMetrics.CacheHitRatio != 0 {
return false
}
if dbMetrics.TransactionsPerSecond != 0 {
return false
}
if dbMetrics.CommittedTxPerSecond != 0 {
return false
}
if dbMetrics.BlocksReadPerSecond != 0 {
return false
}
if dbMetrics.ReplicationLagSeconds != 0 {
return false
}
return true
}
}
return false // postgresql.database metric set must be present
})).Return(nil)

err := mp.Push(testCred)

assert.NoError(t, err)
mocks.apiserver.AssertExpectations(t)
}

// Verifies storage metrics are included in the payload
func TestPush_IncludesStorageMetrics(t *testing.T) {
mp, mocks := setupTestMetricsPusher()
Expand All @@ -803,7 +876,7 @@ func TestPush_IncludesStorageMetrics(t *testing.T) {
setupNetCollectorMocks(mocks.netcollector)
setupStorageCollectorMocks(mocks.storagecollector)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{}, nil)
Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil)
mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool {
hasStorage := false
for _, ms := range p.MetricSets {
Expand Down Expand Up @@ -832,7 +905,7 @@ func TestPush_StorageMetricsFailure_StillPushesOtherMetrics(t *testing.T) {
mocks.storagecollector.On("Collect", mock.Anything).
Return(nil, errors.New("storage collection failed"))
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{}, nil)
Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil)
mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool {
hasSys := false
hasNet := false
Expand Down Expand Up @@ -876,7 +949,7 @@ func TestPush_StorageMetricsMultipleMounts(t *testing.T) {
},
}, nil)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{}, nil)
Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil)
mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool {
storageCount := 0
for _, ms := range p.MetricSets {
Expand Down Expand Up @@ -913,7 +986,7 @@ func TestPush_StorageMetricsWithAttributes(t *testing.T) {
},
}, nil)
mocks.collector.On("Collect", testCred).
Return(domainmetrics.PostgreSQLDatabaseMetrics{}, nil)
Return(domainmetrics.PostgreSQLDatabaseMetrics{Up: true}, nil)
mocks.apiserver.On("PushMetrics", mock.Anything, mock.MatchedBy(func(p domainmetrics.MetricPayload) bool {
for _, ms := range p.MetricSets {
if ms.Type == domainmetrics.MetricTypeStorage {
Expand Down
1 change: 1 addition & 0 deletions domain/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type NetworkMetrics struct {
}

type PostgreSQLDatabaseMetrics struct {
Up bool `json:"up"`
ConnectionsTotal int `json:"connections_total"`
MaxConnections int `json:"max_connections"`
CacheHitRatio float64 `json:"cache_hit_ratio"`
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module hostlink

go 1.24.4
go 1.26.0

require (
github.com/glebarez/sqlite v1.11.0
Expand All @@ -14,10 +14,12 @@ require (
github.com/mattn/go-shellwords v1.0.12
github.com/oklog/ulid/v2 v2.1.1
github.com/shirou/gopsutil/v4 v4.25.11
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.11.1
github.com/testcontainers/testcontainers-go v0.39.0
github.com/testcontainers/testcontainers-go/modules/postgres v0.39.0
github.com/urfave/cli/v3 v3.4.1
golang.org/x/sys v0.39.0
gorm.io/gorm v1.31.0
)

Expand Down Expand Up @@ -71,7 +73,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tklauser/go-sysconf v0.3.16 // indirect
github.com/tklauser/numcpus v0.11.0 // indirect
Expand All @@ -87,7 +88,6 @@ require (
golang.org/x/crypto v0.46.0 // indirect
golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/sys v0.39.0 // indirect
golang.org/x/text v0.32.0 // indirect
golang.org/x/time v0.11.0 // indirect
google.golang.org/grpc v1.75.1 // indirect
Expand Down