Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions cmd/telemetry/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
alertregistry "github.com/Space-DF/telemetry-service/internal/alerts/registry"
amqp "github.com/Space-DF/telemetry-service/internal/amqp/multi-tenant"
"github.com/Space-DF/telemetry-service/internal/api"
"github.com/Space-DF/telemetry-service/internal/events/registry"
"github.com/Space-DF/telemetry-service/internal/health"
"github.com/Space-DF/telemetry-service/internal/services"
"github.com/Space-DF/telemetry-service/internal/timescaledb"
Expand Down Expand Up @@ -88,8 +89,18 @@ func cmdServe(ctx *cli.Context, logger *zap.Logger) error {
}
}()

// Initialize rule registry
ruleRegistry := registry.NewRuleRegistry(tsClient, logger)

// Load default rules from YAML
if appConfig.Server.EventRulesDir != "" {
if err := ruleRegistry.LoadDefaultRulesFromDir(appConfig.Server.EventRulesDir); err != nil {
logger.Warn("Failed to load default event rules", zap.Error(err))
}
}

// Initialize location processor
processor := services.NewLocationProcessor(tsClient, logger)
processor := services.NewLocationProcessor(tsClient, ruleRegistry, logger)

// Initialize multi-tenant AMQP consumer with schema initializer
consumer := amqp.NewMultiTenantConsumer(appConfig.AMQP, appConfig.OrgEvents, processor, tsClient, logger)
Expand Down Expand Up @@ -135,12 +146,17 @@ func cmdServe(ctx *cli.Context, logger *zap.Logger) error {
}
}()

// Setup reload signal for alert processors
// Setup reload signal for alert processors and event rules
reloadChan := make(chan os.Signal, 1)
signal.Notify(reloadChan, syscall.SIGHUP)
go func() {
for range reloadChan {
loadAlertProcessors(logger, appConfig.Server.AlertsProcessorsCfg)
if appConfig.Server.EventRulesDir != "" {
if err := ruleRegistry.ReloadDefaultRules(appConfig.Server.EventRulesDir); err != nil {
logger.Warn("Failed to reload default event rules", zap.Error(err))
}
}
}
}()

Expand Down
23 changes: 12 additions & 11 deletions configs/config.yaml
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
# Server Configuration
server:
log_level: "info" # debug, info, warn, error
api_port: 8080 # API server port (includes health endpoints)
log_level: "info" # debug, info, warn, error
api_port: 8080 # API server port (includes health endpoints)
alerts_processors_path: "configs/alerts_processors.yaml"
event_rules_dir: "configs/event_rules" # Directory containing device model event rules

# AMQP/RabbitMQ Configuration
amqp:
broker_url: "" # Set via environment variable AMQP_BROKER_URL
broker_url: "" # Set via environment variable AMQP_BROKER_URL
consumer_tag: "telemetry-service"
prefetch_count: 100
allowed_vhosts: [] # Empty means process all vhosts
allowed_vhosts: [] # Empty means process all vhosts
reconnect_delay: "5s"

# Organization Events Configuration
org_events:
exchange: "org.events" # Topic exchange for org events
queue: "telemetry.org.events.queue" # Telemetry's queue for org events
routing_key: "org.#" # Listen to all org events
exchange: "org.events" # Topic exchange for org events
queue: "telemetry.org.events.queue" # Telemetry's queue for org events
routing_key: "org.#" # Listen to all org events
consumer_tag: "telemetry-org-events"

# TimescaleDB Configuration
Expand All @@ -26,10 +27,10 @@ db:
password: "postgres"
host: "localhost"
port: 5437
batch_size: 1000 # Number of locations to batch before writing
flush_interval: "1s" # Maximum time between batch writes
max_connections: 25 # Maximum number of open database connections
max_idle_conns: 5 # Maximum number of idle connections in the pool
batch_size: 1000 # Number of locations to batch before writing
flush_interval: "1s" # Maximum time between batch writes
max_connections: 25 # Maximum number of open database connections
max_idle_conns: 5 # Maximum number of idle connections in the pool

psql:
dsn: "postgres://postgres:postgres@localhost:5437/spacedf_telemetry?sslmode=disable"
66 changes: 66 additions & 0 deletions configs/event_rules/rakwireless/rak4630.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# RAK4630 Event Rules
brand: "rakwireless"
model: "rak4630"
model_id: "" # Will be resolved from device service
display_name: "RAK4630 Event Rules"

rules:
# Battery Low Warning
- rule_key: "battery"
operator: "lt"
operand: "3.3"
event_type: "device_event"
event_level: "system"
description: "Battery voltage is low (< 3.3V)"
allow_new_event: true
is_active: true

# Battery Critical Warning
- rule_key: "battery"
operator: "lt"
operand: "3.0"
event_type: "device_event"
event_level: "system"
description: "Battery voltage is critically low (< 3.0V)"
allow_new_event: true
is_active: true

# High Temperature Warning
- rule_key: "temperature"
operator: "gt"
operand: "50"
event_type: "device_event"
event_level: "system"
description: "Temperature is high (> 50°C)"
allow_new_event: true
is_active: true

# Low Temperature Warning
- rule_key: "temperature"
operator: "lt"
oper`nd: "-10"
event_type: "device_event"
event_level: "system"
description: "Temperature is very low (< -10°C)"
allow_new_event: true
is_active: true

# High Humidity Warning
- rule_key: "humidity"
operator: "gt"
operand: "90"
event_type: "device_event"
event_level: "system"
description: "Humidity is very high (> 90%)"
allow_new_event: true
is_active: true

# Low Humidity Warning
- rule_key: "humidity"
operator: "lt"
operand: "10"
event_type: "device_event"
event_level: "system"
description: "Humidity is very low (< 10%)"
allow_new_event: true
is_active: true
16 changes: 0 additions & 16 deletions internal/alerts/registry/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,3 @@ func LoadFromConfig(path string) (map[string]Processor, error) {

return result, nil
}

// RegisterFromConfig merges processors from YAML into the global registry, overriding existing categories.
func RegisterFromConfig(path string) error {
processors, err := LoadFromConfig(path)
if err != nil {
return err
}

globalRegistry.mu.Lock()
defer globalRegistry.mu.Unlock()

for category, processor := range processors {
globalRegistry.processors[category] = processor
}
return nil
}
217 changes: 217 additions & 0 deletions internal/api/events/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package events

import (
"net/http"
"strconv"
"strings"

"github.com/Space-DF/telemetry-service/internal/api/common"
"github.com/Space-DF/telemetry-service/internal/models"
"github.com/Space-DF/telemetry-service/internal/timescaledb"
"github.com/labstack/echo/v4"
"go.uber.org/zap"
)

// getEventsByDevice returns all events for a specific device
func getEventsByDevice(logger *zap.Logger, tsClient *timescaledb.Client) echo.HandlerFunc {
return func(c echo.Context) error {
orgToUse := common.ResolveOrgFromRequest(c)
if orgToUse == "" {
return c.JSON(http.StatusBadRequest, map[string]string{
"error": "Could not determine organization from hostname or X-Organization header",
})
}

deviceID := strings.TrimSpace(c.Param("device_id"))
if deviceID == "" {
return c.JSON(http.StatusBadRequest, map[string]string{
"error": "device_id is required",
})
}

limit := 100
if limitStr := c.QueryParam("limit"); limitStr != "" {
if l, err := strconv.Atoi(limitStr); err == nil && l > 0 {
limit = l
}
}

// Parse start_time and end_time query parameters
var startTime, endTime *int64
if startTimeStr := c.QueryParam("start_time"); startTimeStr != "" {
if ms, err := strconv.ParseInt(startTimeStr, 10, 64); err == nil {
startTime = &ms
}
}
if endTimeStr := c.QueryParam("end_time"); endTimeStr != "" {
if ms, err := strconv.ParseInt(endTimeStr, 10, 64); err == nil {
endTime = &ms
}
}

ctx := timescaledb.ContextWithOrg(c.Request().Context(), orgToUse)
events, err := tsClient.GetEventsByDevice(ctx, orgToUse, deviceID, limit, startTime, endTime)
if err != nil {
logger.Error("failed to get events by device",
zap.String("device_id", deviceID),
zap.Error(err))
return c.JSON(http.StatusInternalServerError, map[string]string{
"error": "failed to get events",
})
}

return c.JSON(http.StatusOK, map[string]interface{}{
"device_id": deviceID,
"events": events,
"count": len(events),
})
}
}

// getEventRules returns all event rules
func getEventRules(logger *zap.Logger, tsClient *timescaledb.Client) echo.HandlerFunc {
return func(c echo.Context) error {
orgToUse := common.ResolveOrgFromRequest(c)
if orgToUse == "" {
return c.JSON(http.StatusBadRequest, map[string]string{
"error": "Could not determine organization from hostname or X-Organization header",
})
}

page := 1
if pageStr := c.QueryParam("page"); pageStr != "" {
if p, err := strconv.Atoi(pageStr); err == nil && p > 0 {
page = p
}
}

pageSize := 20
if sizeStr := c.QueryParam("page_size"); sizeStr != "" {
if s, err := strconv.Atoi(sizeStr); err == nil && s > 0 && s <= 100 {
pageSize = s
}
}

deviceID := c.QueryParam("device_id")

ctx := timescaledb.ContextWithOrg(c.Request().Context(), orgToUse)
rules, total, err := tsClient.GetEventRules(ctx, deviceID, page, pageSize)
if err != nil {
logger.Error("failed to get event rules",
zap.Error(err))
return c.JSON(http.StatusInternalServerError, map[string]string{
"error": "failed to get event rules",
})
}

return c.JSON(http.StatusOK, models.EventRulesListResponse{
Rules: rules,
TotalCount: total,
Page: page,
PageSize: pageSize,
})
}
}

// createEventRule creates a new event rule
func createEventRule(logger *zap.Logger, tsClient *timescaledb.Client) echo.HandlerFunc {
return func(c echo.Context) error {
orgToUse := common.ResolveOrgFromRequest(c)
if orgToUse == "" {
return c.JSON(http.StatusBadRequest, map[string]string{
"error": "Could not determine organization from hostname or X-Organization header",
})
}

var req models.EventRuleRequest
if err := c.Bind(&req); err != nil {
return c.JSON(http.StatusBadRequest, map[string]string{
"error": "invalid request body",
})
}

ctx := timescaledb.ContextWithOrg(c.Request().Context(), orgToUse)
rule, err := tsClient.CreateEventRule(ctx, &req)
if err != nil {
logger.Error("failed to create event rule",
zap.Error(err))
return c.JSON(http.StatusInternalServerError, map[string]string{
"error": "failed to create event rule",
})
}

return c.JSON(http.StatusCreated, rule)
}
}

// updateEventRule updates an existing event rule
func updateEventRule(logger *zap.Logger, tsClient *timescaledb.Client) echo.HandlerFunc {
return func(c echo.Context) error {
orgToUse := common.ResolveOrgFromRequest(c)
if orgToUse == "" {
return c.JSON(http.StatusBadRequest, map[string]string{
"error": "Could not determine organization from hostname or X-Organization header",
})
}

ruleID := strings.TrimSpace(c.Param("rule_id"))
if ruleID == "" {
return c.JSON(http.StatusBadRequest, map[string]string{
"error": "rule_id is required",
})
}

var req models.EventRuleRequest
if err := c.Bind(&req); err != nil {
return c.JSON(http.StatusBadRequest, map[string]string{
"error": "invalid request body",
})
}

ctx := timescaledb.ContextWithOrg(c.Request().Context(), orgToUse)
rule, err := tsClient.UpdateEventRule(ctx, ruleID, &req)
if err != nil {
logger.Error("failed to update event rule",
zap.String("rule_id", ruleID),
zap.Error(err))
return c.JSON(http.StatusInternalServerError, map[string]string{
"error": "failed to update event rule",
})
}

return c.JSON(http.StatusOK, rule)
}
}

// deleteEventRule deletes an event rule
func deleteEventRule(logger *zap.Logger, tsClient *timescaledb.Client) echo.HandlerFunc {
return func(c echo.Context) error {
orgToUse := common.ResolveOrgFromRequest(c)
if orgToUse == "" {
return c.JSON(http.StatusBadRequest, map[string]string{
"error": "Could not determine organization from hostname or X-Organization header",
})
}

ruleID := strings.TrimSpace(c.Param("rule_id"))
if ruleID == "" {
return c.JSON(http.StatusBadRequest, map[string]string{
"error": "rule_id is required",
})
}

ctx := timescaledb.ContextWithOrg(c.Request().Context(), orgToUse)
if err := tsClient.DeleteEventRule(ctx, ruleID); err != nil {
logger.Error("failed to delete event rule",
zap.String("rule_id", ruleID),
zap.Error(err))
return c.JSON(http.StatusInternalServerError, map[string]string{
"error": "failed to delete event rule",
})
}

return c.JSON(http.StatusOK, map[string]string{
"message": "event rule deleted successfully",
})
}
}
Loading