diff --git a/cmd/telemetry/serve.go b/cmd/telemetry/serve.go index d321324..6c4532e 100644 --- a/cmd/telemetry/serve.go +++ b/cmd/telemetry/serve.go @@ -29,6 +29,7 @@ import ( alertregistry "github.com/Space-DF/telemetry-service/internal/alerts/registry" amqp "github.com/Space-DF/telemetry-service/internal/amqp/multi-tenant" "github.com/Space-DF/telemetry-service/internal/api" + "github.com/Space-DF/telemetry-service/internal/events/registry" "github.com/Space-DF/telemetry-service/internal/health" "github.com/Space-DF/telemetry-service/internal/services" "github.com/Space-DF/telemetry-service/internal/timescaledb" @@ -88,8 +89,18 @@ func cmdServe(ctx *cli.Context, logger *zap.Logger) error { } }() + // Initialize rule registry + ruleRegistry := registry.NewRuleRegistry(tsClient, logger) + + // Load default rules from YAML + if appConfig.Server.EventRulesDir != "" { + if err := ruleRegistry.LoadDefaultRulesFromDir(appConfig.Server.EventRulesDir); err != nil { + logger.Warn("Failed to load default event rules", zap.Error(err)) + } + } + // Initialize location processor - processor := services.NewLocationProcessor(tsClient, logger) + processor := services.NewLocationProcessor(tsClient, ruleRegistry, logger) // Initialize multi-tenant AMQP consumer with schema initializer consumer := amqp.NewMultiTenantConsumer(appConfig.AMQP, appConfig.OrgEvents, processor, tsClient, logger) @@ -135,12 +146,17 @@ func cmdServe(ctx *cli.Context, logger *zap.Logger) error { } }() - // Setup reload signal for alert processors + // Setup reload signal for alert processors and event rules reloadChan := make(chan os.Signal, 1) signal.Notify(reloadChan, syscall.SIGHUP) go func() { for range reloadChan { loadAlertProcessors(logger, appConfig.Server.AlertsProcessorsCfg) + if appConfig.Server.EventRulesDir != "" { + if err := ruleRegistry.ReloadDefaultRules(appConfig.Server.EventRulesDir); err != nil { + logger.Warn("Failed to reload default event rules", zap.Error(err)) + } + } } }() diff --git a/configs/config.yaml b/configs/config.yaml index cbb8152..093d69f 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -1,22 +1,23 @@ # Server Configuration server: - log_level: "info" # debug, info, warn, error - api_port: 8080 # API server port (includes health endpoints) + log_level: "info" # debug, info, warn, error + api_port: 8080 # API server port (includes health endpoints) alerts_processors_path: "configs/alerts_processors.yaml" + event_rules_dir: "configs/event_rules" # Directory containing device model event rules # AMQP/RabbitMQ Configuration amqp: - broker_url: "" # Set via environment variable AMQP_BROKER_URL + broker_url: "" # Set via environment variable AMQP_BROKER_URL consumer_tag: "telemetry-service" prefetch_count: 100 - allowed_vhosts: [] # Empty means process all vhosts + allowed_vhosts: [] # Empty means process all vhosts reconnect_delay: "5s" # Organization Events Configuration org_events: - exchange: "org.events" # Topic exchange for org events - queue: "telemetry.org.events.queue" # Telemetry's queue for org events - routing_key: "org.#" # Listen to all org events + exchange: "org.events" # Topic exchange for org events + queue: "telemetry.org.events.queue" # Telemetry's queue for org events + routing_key: "org.#" # Listen to all org events consumer_tag: "telemetry-org-events" # TimescaleDB Configuration @@ -26,10 +27,10 @@ db: password: "postgres" host: "localhost" port: 5437 - batch_size: 1000 # Number of locations to batch before writing - flush_interval: "1s" # Maximum time between batch writes - max_connections: 25 # Maximum number of open database connections - max_idle_conns: 5 # Maximum number of idle connections in the pool + batch_size: 1000 # Number of locations to batch before writing + flush_interval: "1s" # Maximum time between batch writes + max_connections: 25 # Maximum number of open database connections + max_idle_conns: 5 # Maximum number of idle connections in the pool psql: dsn: "postgres://postgres:postgres@localhost:5437/spacedf_telemetry?sslmode=disable" diff --git a/configs/event_rules/rakwireless/rak4630.yaml b/configs/event_rules/rakwireless/rak4630.yaml new file mode 100644 index 0000000..834408c --- /dev/null +++ b/configs/event_rules/rakwireless/rak4630.yaml @@ -0,0 +1,66 @@ +# RAK4630 Event Rules +brand: "rakwireless" +model: "rak4630" +model_id: "" # Will be resolved from device service +display_name: "RAK4630 Event Rules" + +rules: + # Battery Low Warning + - rule_key: "battery" + operator: "lt" + operand: "3.3" + event_type: "device_event" + event_level: "system" + description: "Battery voltage is low (< 3.3V)" + allow_new_event: true + is_active: true + + # Battery Critical Warning + - rule_key: "battery" + operator: "lt" + operand: "3.0" + event_type: "device_event" + event_level: "system" + description: "Battery voltage is critically low (< 3.0V)" + allow_new_event: true + is_active: true + + # High Temperature Warning + - rule_key: "temperature" + operator: "gt" + operand: "50" + event_type: "device_event" + event_level: "system" + description: "Temperature is high (> 50°C)" + allow_new_event: true + is_active: true + + # Low Temperature Warning + - rule_key: "temperature" + operator: "lt" + oper`nd: "-10" + event_type: "device_event" + event_level: "system" + description: "Temperature is very low (< -10°C)" + allow_new_event: true + is_active: true + + # High Humidity Warning + - rule_key: "humidity" + operator: "gt" + operand: "90" + event_type: "device_event" + event_level: "system" + description: "Humidity is very high (> 90%)" + allow_new_event: true + is_active: true + + # Low Humidity Warning + - rule_key: "humidity" + operator: "lt" + operand: "10" + event_type: "device_event" + event_level: "system" + description: "Humidity is very low (< 10%)" + allow_new_event: true + is_active: true diff --git a/internal/alerts/registry/loader.go b/internal/alerts/registry/loader.go index 4d96be0..42b4ca1 100644 --- a/internal/alerts/registry/loader.go +++ b/internal/alerts/registry/loader.go @@ -63,19 +63,3 @@ func LoadFromConfig(path string) (map[string]Processor, error) { return result, nil } - -// RegisterFromConfig merges processors from YAML into the global registry, overriding existing categories. -func RegisterFromConfig(path string) error { - processors, err := LoadFromConfig(path) - if err != nil { - return err - } - - globalRegistry.mu.Lock() - defer globalRegistry.mu.Unlock() - - for category, processor := range processors { - globalRegistry.processors[category] = processor - } - return nil -} diff --git a/internal/api/events/handler.go b/internal/api/events/handler.go new file mode 100644 index 0000000..af8c560 --- /dev/null +++ b/internal/api/events/handler.go @@ -0,0 +1,217 @@ +package events + +import ( + "net/http" + "strconv" + "strings" + + "github.com/Space-DF/telemetry-service/internal/api/common" + "github.com/Space-DF/telemetry-service/internal/models" + "github.com/Space-DF/telemetry-service/internal/timescaledb" + "github.com/labstack/echo/v4" + "go.uber.org/zap" +) + +// getEventsByDevice returns all events for a specific device +func getEventsByDevice(logger *zap.Logger, tsClient *timescaledb.Client) echo.HandlerFunc { + return func(c echo.Context) error { + orgToUse := common.ResolveOrgFromRequest(c) + if orgToUse == "" { + return c.JSON(http.StatusBadRequest, map[string]string{ + "error": "Could not determine organization from hostname or X-Organization header", + }) + } + + deviceID := strings.TrimSpace(c.Param("device_id")) + if deviceID == "" { + return c.JSON(http.StatusBadRequest, map[string]string{ + "error": "device_id is required", + }) + } + + limit := 100 + if limitStr := c.QueryParam("limit"); limitStr != "" { + if l, err := strconv.Atoi(limitStr); err == nil && l > 0 { + limit = l + } + } + + // Parse start_time and end_time query parameters + var startTime, endTime *int64 + if startTimeStr := c.QueryParam("start_time"); startTimeStr != "" { + if ms, err := strconv.ParseInt(startTimeStr, 10, 64); err == nil { + startTime = &ms + } + } + if endTimeStr := c.QueryParam("end_time"); endTimeStr != "" { + if ms, err := strconv.ParseInt(endTimeStr, 10, 64); err == nil { + endTime = &ms + } + } + + ctx := timescaledb.ContextWithOrg(c.Request().Context(), orgToUse) + events, err := tsClient.GetEventsByDevice(ctx, orgToUse, deviceID, limit, startTime, endTime) + if err != nil { + logger.Error("failed to get events by device", + zap.String("device_id", deviceID), + zap.Error(err)) + return c.JSON(http.StatusInternalServerError, map[string]string{ + "error": "failed to get events", + }) + } + + return c.JSON(http.StatusOK, map[string]interface{}{ + "device_id": deviceID, + "events": events, + "count": len(events), + }) + } +} + +// getEventRules returns all event rules +func getEventRules(logger *zap.Logger, tsClient *timescaledb.Client) echo.HandlerFunc { + return func(c echo.Context) error { + orgToUse := common.ResolveOrgFromRequest(c) + if orgToUse == "" { + return c.JSON(http.StatusBadRequest, map[string]string{ + "error": "Could not determine organization from hostname or X-Organization header", + }) + } + + page := 1 + if pageStr := c.QueryParam("page"); pageStr != "" { + if p, err := strconv.Atoi(pageStr); err == nil && p > 0 { + page = p + } + } + + pageSize := 20 + if sizeStr := c.QueryParam("page_size"); sizeStr != "" { + if s, err := strconv.Atoi(sizeStr); err == nil && s > 0 && s <= 100 { + pageSize = s + } + } + + deviceID := c.QueryParam("device_id") + + ctx := timescaledb.ContextWithOrg(c.Request().Context(), orgToUse) + rules, total, err := tsClient.GetEventRules(ctx, deviceID, page, pageSize) + if err != nil { + logger.Error("failed to get event rules", + zap.Error(err)) + return c.JSON(http.StatusInternalServerError, map[string]string{ + "error": "failed to get event rules", + }) + } + + return c.JSON(http.StatusOK, models.EventRulesListResponse{ + Rules: rules, + TotalCount: total, + Page: page, + PageSize: pageSize, + }) + } +} + +// createEventRule creates a new event rule +func createEventRule(logger *zap.Logger, tsClient *timescaledb.Client) echo.HandlerFunc { + return func(c echo.Context) error { + orgToUse := common.ResolveOrgFromRequest(c) + if orgToUse == "" { + return c.JSON(http.StatusBadRequest, map[string]string{ + "error": "Could not determine organization from hostname or X-Organization header", + }) + } + + var req models.EventRuleRequest + if err := c.Bind(&req); err != nil { + return c.JSON(http.StatusBadRequest, map[string]string{ + "error": "invalid request body", + }) + } + + ctx := timescaledb.ContextWithOrg(c.Request().Context(), orgToUse) + rule, err := tsClient.CreateEventRule(ctx, &req) + if err != nil { + logger.Error("failed to create event rule", + zap.Error(err)) + return c.JSON(http.StatusInternalServerError, map[string]string{ + "error": "failed to create event rule", + }) + } + + return c.JSON(http.StatusCreated, rule) + } +} + +// updateEventRule updates an existing event rule +func updateEventRule(logger *zap.Logger, tsClient *timescaledb.Client) echo.HandlerFunc { + return func(c echo.Context) error { + orgToUse := common.ResolveOrgFromRequest(c) + if orgToUse == "" { + return c.JSON(http.StatusBadRequest, map[string]string{ + "error": "Could not determine organization from hostname or X-Organization header", + }) + } + + ruleID := strings.TrimSpace(c.Param("rule_id")) + if ruleID == "" { + return c.JSON(http.StatusBadRequest, map[string]string{ + "error": "rule_id is required", + }) + } + + var req models.EventRuleRequest + if err := c.Bind(&req); err != nil { + return c.JSON(http.StatusBadRequest, map[string]string{ + "error": "invalid request body", + }) + } + + ctx := timescaledb.ContextWithOrg(c.Request().Context(), orgToUse) + rule, err := tsClient.UpdateEventRule(ctx, ruleID, &req) + if err != nil { + logger.Error("failed to update event rule", + zap.String("rule_id", ruleID), + zap.Error(err)) + return c.JSON(http.StatusInternalServerError, map[string]string{ + "error": "failed to update event rule", + }) + } + + return c.JSON(http.StatusOK, rule) + } +} + +// deleteEventRule deletes an event rule +func deleteEventRule(logger *zap.Logger, tsClient *timescaledb.Client) echo.HandlerFunc { + return func(c echo.Context) error { + orgToUse := common.ResolveOrgFromRequest(c) + if orgToUse == "" { + return c.JSON(http.StatusBadRequest, map[string]string{ + "error": "Could not determine organization from hostname or X-Organization header", + }) + } + + ruleID := strings.TrimSpace(c.Param("rule_id")) + if ruleID == "" { + return c.JSON(http.StatusBadRequest, map[string]string{ + "error": "rule_id is required", + }) + } + + ctx := timescaledb.ContextWithOrg(c.Request().Context(), orgToUse) + if err := tsClient.DeleteEventRule(ctx, ruleID); err != nil { + logger.Error("failed to delete event rule", + zap.String("rule_id", ruleID), + zap.Error(err)) + return c.JSON(http.StatusInternalServerError, map[string]string{ + "error": "failed to delete event rule", + }) + } + + return c.JSON(http.StatusOK, map[string]string{ + "message": "event rule deleted successfully", + }) + } +} \ No newline at end of file diff --git a/internal/api/events/router.go b/internal/api/events/router.go new file mode 100644 index 0000000..d33b4d1 --- /dev/null +++ b/internal/api/events/router.go @@ -0,0 +1,22 @@ +package events + +import ( + "github.com/Space-DF/telemetry-service/internal/config" + "github.com/Space-DF/telemetry-service/internal/timescaledb" + "github.com/labstack/echo/v4" + "go.uber.org/zap" +) + +func RegisterRoutes(e *echo.Group, cfg *config.Config, logger *zap.Logger, tsClient *timescaledb.Client) { + // Get events for a specific device + e.GET("/events/device/:device_id", getEventsByDevice(logger, tsClient)) + + // Get event rules + e.GET("/event-rules", getEventRules(logger, tsClient)) + // Create a new event rule + e.POST("/event-rules", createEventRule(logger, tsClient)) + // Update an event rule + e.PUT("/event-rules/:rule_id", updateEventRule(logger, tsClient)) + // Delete an event rule + e.DELETE("/event-rules/:rule_id", deleteEventRule(logger, tsClient)) +} diff --git a/internal/api/router.go b/internal/api/router.go index e4dc72b..f11062c 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -4,6 +4,7 @@ import ( "github.com/Space-DF/telemetry-service/internal/api/alerts" "github.com/Space-DF/telemetry-service/internal/api/data" "github.com/Space-DF/telemetry-service/internal/api/entities" + "github.com/Space-DF/telemetry-service/internal/api/events" "github.com/Space-DF/telemetry-service/internal/api/location" "github.com/Space-DF/telemetry-service/internal/api/widget" "github.com/Space-DF/telemetry-service/internal/config" @@ -19,4 +20,5 @@ func Setup(cfg *config.Config, e *echo.Group, logger *zap.Logger, tsClient *time alerts.RegisterRoutes(group, logger, tsClient) widget.RegisterRoutes(group, logger, tsClient) data.RegisterRoutes(group, logger, tsClient) + events.RegisterRoutes(group, cfg, logger, tsClient) } diff --git a/internal/config/config.go b/internal/config/config.go index 4850eda..d1aa427 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -32,6 +32,7 @@ type Server struct { LogLevel string `mapstructure:"log_level"` APIPort int `mapstructure:"api_port"` AlertsProcessorsCfg string `mapstructure:"alerts_processors_path"` + EventRulesDir string `mapstructure:"event_rules_dir"` } // AMQP contains RabbitMQ configuration diff --git a/internal/config/event_rules.go b/internal/config/event_rules.go new file mode 100644 index 0000000..43af323 --- /dev/null +++ b/internal/config/event_rules.go @@ -0,0 +1,156 @@ +package config + +import ( + "fmt" + "os" + "path/filepath" + "strings" + + "gopkg.in/yaml.v3" +) + +// EventRuleConfig represents a single event rule configuration +type EventRuleConfig struct { + RuleKey string `yaml:"rule_key"` + Operator string `yaml:"operator"` + Operand string `yaml:"operand"` + EventType string `yaml:"event_type"` + EventLevel string `yaml:"event_level"` + Description string `yaml:"description"` + Status string `yaml:"status"` + IsActive bool `yaml:"is_active"` +} + +// DeviceModelRules represents event rules for a specific device model +type DeviceModelRules struct { + DeviceModel string `yaml:"device_model"` + DisplayName string `yaml:"display_name"` + Rules []EventRuleConfig `yaml:"rules"` +} + +// EventRulesConfig represents the aggregated event rules configuration +type EventRulesConfig struct { + DeviceModels []DeviceModelRules `yaml:"device_models"` +} + +// LoadEventRulesConfig loads event rules from a YAML file +func LoadEventRulesConfig(path string) (*EventRulesConfig, error) { + if path == "" { + return nil, fmt.Errorf("event rules config path is empty") + } + + // Validate the path is within allowed directories (security: prevent path traversal) + absPath, err := filepath.Abs(path) + if err != nil { + return nil, fmt.Errorf("invalid path: %w", err) + } + absPath = filepath.Clean(absPath) + + if !isPathAllowed(absPath) { + return nil, fmt.Errorf("path traversal detected: file must be within configs directory: %s", path) + } + + data, err := os.ReadFile(absPath) + if err != nil { + return nil, fmt.Errorf("failed to read event rules config file: %w", err) + } + + var cfg EventRulesConfig + if err := yaml.Unmarshal(data, &cfg); err != nil { + return nil, fmt.Errorf("failed to parse event rules config: %w", err) + } + + return &cfg, nil +} + +// LoadEventRulesFromDir loads all event rule YAML files from a directory +// Each file should contain a single device model's rules +func LoadEventRulesFromDir(dir string) (*EventRulesConfig, error) { + if dir == "" { + return nil, fmt.Errorf("event rules directory is empty") + } + + entries, err := os.ReadDir(dir) + if err != nil { + return nil, fmt.Errorf("failed to read event rules directory: %w", err) + } + + var cfg EventRulesConfig + + for _, entry := range entries { + // Skip directories and non-YAML files + if entry.IsDir() { + continue + } + + // Check for .yaml or .yml extension + name := entry.Name() + ext := filepath.Ext(name) + if ext != ".yaml" && ext != ".yml" { + continue + } + + // Load device model rules from file + path := filepath.Join(dir, name) + dmRules, err := loadDeviceModelRules(path) + if err != nil { + // Log warning but continue loading other files + fmt.Printf("Warning: failed to load %s: %v\n", name, err) + continue + } + + cfg.DeviceModels = append(cfg.DeviceModels, *dmRules) + } + + if len(cfg.DeviceModels) == 0 { + return nil, fmt.Errorf("no valid event rules found in directory: %s", dir) + } + + return &cfg, nil +} + +// loadDeviceModelRules loads a single device model's rules from a YAML file +func loadDeviceModelRules(path string) (*DeviceModelRules, error) { + // Validate the path is within allowed directories (security: prevent path traversal) + absPath, err := filepath.Abs(path) + if err != nil { + return nil, fmt.Errorf("invalid path: %w", err) + } + absPath = filepath.Clean(absPath) + + // Check if file is within an allowed directory (configs/event_rules or configs) + // This prevents directory traversal attacks + if !isPathAllowed(absPath) { + return nil, fmt.Errorf("path traversal detected: file must be within configs directory: %s", path) + } + + data, err := os.ReadFile(absPath) + if err != nil { + return nil, fmt.Errorf("failed to read file: %w", err) + } + + var dm DeviceModelRules + if err := yaml.Unmarshal(data, &dm); err != nil { + return nil, fmt.Errorf("failed to parse device model rules: %w", err) + } + + return &dm, nil +} + +// isPathAllowed checks if a path is within the configs directory +func isPathAllowed(path string) bool { + absPath := filepath.Clean(path) + // Check for common allowed prefixes + allowedPrefixes := []string{ + "configs/event_rules", + "configs" + string(filepath.Separator) + "event_rules", + } + + for _, prefix := range allowedPrefixes { + allowedPath, _ := filepath.Abs(prefix) + if strings.HasPrefix(absPath, allowedPath) { + return true + } + } + return false +} \ No newline at end of file diff --git a/internal/events/evaluator/evaluator.go b/internal/events/evaluator/evaluator.go new file mode 100644 index 0000000..41ea694 --- /dev/null +++ b/internal/events/evaluator/evaluator.go @@ -0,0 +1,175 @@ +package evaluator + +import ( + "fmt" + "strings" + "time" + + "github.com/Space-DF/telemetry-service/internal/events" + "github.com/Space-DF/telemetry-service/internal/events/loader" + "github.com/Space-DF/telemetry-service/internal/models" + "go.uber.org/zap" +) + +// Evaluator handles rule evaluation logic +type Evaluator struct { + logger *zap.Logger +} + +// NewEvaluator creates a new rule evaluator +func NewEvaluator(logger *zap.Logger) *Evaluator { + return &Evaluator{ + logger: logger, + } +} + +// EvaluateRule evaluates a single YAML rule against an entity +func (e *Evaluator) EvaluateRule(rule loader.YAMLRule, deviceID string, entity models.TelemetryEntity) *models.MatchedEvent { + // Skip inactive rules + if !rule.IsActive { + return nil + } + + // Check if rule allows creating new events + if !rule.AllowNewEvent { + return nil + } + + // Get the value from entity attributes based on rule_key + value, exists := e.getEntityValue(entity, rule.RuleKey) + if !exists { + return nil + } + + // Parse the operand as float64 + operand, ok := events.ParseFloat64(rule.Operand) + if !ok { + e.logger.Warn("Failed to parse operand as float64", + zap.String("rule_key", rule.RuleKey), + zap.String("operand", rule.Operand)) + return nil + } + + // Evaluate the condition + matched := events.CompareValues(value, operand, rule.Operator, e.logger) + if !matched { + return nil + } + + matchedEvent := &models.MatchedEvent{ + EntityID: deviceID, + EntityType: entity.EntityType, + RuleKey: rule.RuleKey, + EventType: rule.EventType, + EventLevel: rule.EventLevel, + Description: rule.Description, + Value: value, + Threshold: operand, + Operator: rule.Operator, + Timestamp: time.Now().UnixMilli(), + StateID: entity.StateID, + } + + return matchedEvent +} + +// EvaluateRuleDB evaluates a database rule against an entity +func (e *Evaluator) EvaluateRuleDB(rule models.EventRule, deviceID string, entity models.TelemetryEntity) *models.MatchedEvent { + // Skip inactive rules + if rule.IsActive != nil && !*rule.IsActive { + return nil + } + + // Check if rule allows creating new events + if rule.AllowNewEvent != nil && !*rule.AllowNewEvent { + return nil + } + + // Get rule key from database rule + ruleKey := "" + if rule.RuleKey != nil { + ruleKey = *rule.RuleKey + } + if ruleKey == "" { + return nil + } + + // Get the value from entity based on rule_key + value, exists := e.getEntityValue(entity, ruleKey) + if !exists { + return nil + } + + // Parse the operand as float64 + operand, ok := events.ParseFloat64(rule.Operand) + if !ok { + e.logger.Warn("Failed to parse operand as float64", + zap.String("rule_key", ruleKey), + zap.String("operand", rule.Operand)) + return nil + } + + // Get operator + operator := "" + if rule.Operator != nil { + operator = *rule.Operator + } + if operator == "" { + operator = "eq" // default + } + + // Evaluate the condition + matched := events.CompareValues(value, operand, operator, e.logger) + if !matched { + return nil + } + + // Build description + description := fmt.Sprintf("Rule %s matched: %.2f %s %.2f", ruleKey, value, operator, operand) + + matchedEvent := &models.MatchedEvent{ + EntityID: deviceID, + EntityType: entity.EntityType, + RuleKey: ruleKey, + EventType: "device_event", + EventLevel: "automation", + Description: description, + Value: value, + Threshold: operand, + Operator: operator, + Timestamp: time.Now().UnixMilli(), + EventRuleID: &rule.EventRuleID, + StateID: entity.StateID, + } + + return matchedEvent +} + +// getEntityValue extracts a numeric value from an entity state based on the rule key +func (e *Evaluator) getEntityValue(entity models.TelemetryEntity, ruleKey string) (float64, bool) { + if entity.State == nil { + return 0, false + } + + switch s := entity.State.(type) { + case map[string]interface{}: + if val, ok := s[ruleKey]; ok { + return events.ParseFloat64(val) + } + default: + if val, ok := events.ParseFloat64(s); ok { + if e.isRuleKeyMatched(ruleKey, entity.EntityType) { + return val, true + } + } + } + + return 0, false +} + +// isRuleKeyMatched checks if a rule key matches an entity type +func (e *Evaluator) isRuleKeyMatched(ruleKey, entityType string) bool { + ruleKeyLower := strings.ToLower(ruleKey) + + return ruleKeyLower == strings.ToLower(entityType) +} diff --git a/internal/events/helpers.go b/internal/events/helpers.go new file mode 100644 index 0000000..f7bd1a5 --- /dev/null +++ b/internal/events/helpers.go @@ -0,0 +1,60 @@ +package events + +import ( + "strconv" + "strings" + + "go.uber.org/zap" +) + +// ParseFloat64 converts various types to float64 +func ParseFloat64(value interface{}) (float64, bool) { + switch v := value.(type) { + case float64: + return v, true + case float32: + return float64(v), true + case int: + return float64(v), true + case int64: + return float64(v), true + case int32: + return float64(v), true + case string: + f, err := strconv.ParseFloat(v, 64) + if err != nil { + return 0, false + } + return f, true + default: + return 0, false + } +} + +// CompareValues performs comparison based on operator +func CompareValues(value, threshold float64, operator string, logger *zap.Logger) bool { + switch operator { + case "gt": + return value > threshold + case "lt": + return value < threshold + case "gte": + return value >= threshold + case "lte": + return value <= threshold + case "eq": + return value == threshold + case "ne": + return value != threshold + default: + if logger != nil { + logger.Warn("Unknown operator", zap.String("operator", operator)) + } + return false + } +} + +// Contains checks if the target string contains the substring (case-insensitive) +func Contains(str, substr string) bool { + return strings.Contains(strings.ToLower(str), strings.ToLower(substr)) +} diff --git a/internal/events/loader/system_rules_loader.go b/internal/events/loader/system_rules_loader.go new file mode 100644 index 0000000..ba68e3d --- /dev/null +++ b/internal/events/loader/system_rules_loader.go @@ -0,0 +1,127 @@ +package loader + +import ( + "fmt" + "os" + "path/filepath" + "strings" + + "golang.org/x/text/cases" + "golang.org/x/text/language" + "gopkg.in/yaml.v3" +) + +// YAMLRule represents a single event rule from YAML configuration +type YAMLRule struct { + RuleKey string `yaml:"rule_key"` + EntityIDPattern string `yaml:"entity_id_pattern"` + Operator string `yaml:"operator"` + Operand string `yaml:"operand"` + EventType string `yaml:"event_type"` + EventLevel string `yaml:"event_level"` + Description string `yaml:"description"` + IsActive bool `yaml:"is_active"` + AllowNewEvent bool `yaml:"allow_new_event"` +} + +// DeviceModelRules represents event rules for a specific device model +type DeviceModelRules struct { + Brand string `yaml:"brand"` // e.g., "rakwireless" + Model string `yaml:"model"` // e.g., "rak4630" + ModelID string `yaml:"model_id"` // Resolved from device service + DisplayName string `yaml:"display_name"` + Rules []YAMLRule `yaml:"rules"` +} + +// LoadSystemDefaultRules loads YAML files from brand/model directory structure +// These are system default rules that apply to all devices of a specific brand/model +func LoadSystemDefaultRules(dir string) (map[string]*DeviceModelRules, error) { + result := make(map[string]*DeviceModelRules) + + // Walk through the directory structure + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + // Skip directories and non-YAML files + if info.IsDir() { + return nil + } + + ext := strings.ToLower(filepath.Ext(path)) + if ext != ".yaml" && ext != ".yml" { + return nil + } + + // Load the device model rules from this file + rules, err := loadDeviceModelRules(path, dir) + if err != nil { + // Log warning but continue loading other files + fmt.Printf("Warning: failed to load %s: %v\n", path, err) + return nil + } + + // Index by brand/model + key := fmt.Sprintf("%s/%s", strings.ToLower(rules.Brand), strings.ToLower(rules.Model)) + result[key] = rules + + return nil + }) + + if err != nil { + return nil, fmt.Errorf("failed to walk directory: %w", err) + } + + if len(result) == 0 { + return nil, fmt.Errorf("no valid event rules found in directory: %s", dir) + } + + return result, nil +} + +// loadDeviceModelRules loads rules from a single YAML file +func loadDeviceModelRules(filePath, baseDir string) (*DeviceModelRules, error) { + absBaseDir, err := filepath.Abs(baseDir) + if err != nil { + return nil, fmt.Errorf("failed to resolve base directory: %w", err) + } + + absFilePath, err := filepath.Abs(filePath) + if err != nil { + return nil, fmt.Errorf("failed to resolve file path: %w", err) + } + + // Check if the file path is within the base directory + relPath, err := filepath.Rel(absBaseDir, absFilePath) + if err != nil || strings.HasPrefix(relPath, "..") { + return nil, fmt.Errorf("file path is outside base directory: %s", filePath) + } + + // #nosec G304 -- Path is validated above to ensure it's within baseDir + data, err := os.ReadFile(absFilePath) + if err != nil { + return nil, fmt.Errorf("failed to read file: %w", err) + } + + var rules DeviceModelRules + if err := yaml.Unmarshal(data, &rules); err != nil { + return nil, fmt.Errorf("failed to parse YAML: %w", err) + } + + // Validate required fields + if rules.Brand == "" { + return nil, fmt.Errorf("missing required field: brand") + } + if rules.Model == "" { + return nil, fmt.Errorf("missing required field: model") + } + + // Set defaults + if rules.DisplayName == "" { + caser := cases.Title(language.English) + rules.DisplayName = fmt.Sprintf("%s %s Rules", caser.String(rules.Brand), strings.ToUpper(rules.Model)) + } + + return &rules, nil +} \ No newline at end of file diff --git a/internal/events/registry/cache.go b/internal/events/registry/cache.go new file mode 100644 index 0000000..7bd006f --- /dev/null +++ b/internal/events/registry/cache.go @@ -0,0 +1,245 @@ +package registry + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/Space-DF/telemetry-service/internal/models" + "github.com/Space-DF/telemetry-service/internal/timescaledb" + "go.uber.org/zap" +) + +const ( + // DefaultCacheTTL is the default time-to-live for cached device rules + DefaultCacheTTL = 5 * time.Minute + // DefaultCacheCleanupInterval is how often to clean expired cache entries + DefaultCacheCleanupInterval = 1 * time.Minute +) + +// DeviceRulesCacheEntry represents a cached entry for device automation rules +type DeviceRulesCacheEntry struct { + Rules []models.EventRule // Flat array (for compatibility) + RulesByKey map[string][]models.EventRule // Grouped by rule_key for O(1) lookup + CachedAt time.Time + ExpiresAt time.Time +} + +// DeviceRulesCache manages caching of device automation rules +type DeviceRulesCache struct { + cache map[string]*DeviceRulesCacheEntry // key: deviceID + mu sync.RWMutex + ttl time.Duration + db *timescaledb.Client + logger *zap.Logger + + // Metrics (atomic counters for thread safety) + hits atomic.Int64 + misses atomic.Int64 + + // Cleanup management + ticker *time.Ticker + stopped chan struct{} +} + +// NewDeviceRulesCache creates a new device rules cache +func NewDeviceRulesCache(db *timescaledb.Client, logger *zap.Logger) *DeviceRulesCache { + return &DeviceRulesCache{ + cache: make(map[string]*DeviceRulesCacheEntry), + ttl: DefaultCacheTTL, + ticker: time.NewTicker(DefaultCacheCleanupInterval), + stopped: make(chan struct{}), + db: db, + logger: logger, + } +} + +// Start begins the background cleanup goroutine +func (c *DeviceRulesCache) Start() { + go func() { + for { + select { + case <-c.ticker.C: + c.cleanupExpired() + case <-c.stopped: + c.ticker.Stop() + return + } + } + }() +} + +// Stop stops the cache cleanup goroutine +func (c *DeviceRulesCache) Stop() { + close(c.stopped) +} + +// Get retrieves device automation rules from cache or database +func (c *DeviceRulesCache) Get(ctx context.Context, deviceID string) []models.EventRule { + now := time.Now() + + // Try cache first (read lock) + c.mu.RLock() + entry, found := c.cache[deviceID] + c.mu.RUnlock() + + if found && now.Before(entry.ExpiresAt) { + // Cache hit - valid entry + c.hits.Add(1) + c.logger.Debug("Device rules cache hit", + zap.String("device_id", deviceID), + zap.Int("rule_count", len(entry.Rules))) + return entry.Rules + } + + // Cache miss or expired + c.misses.Add(1) + + if found { + c.logger.Debug("Device rules cache expired", + zap.String("device_id", deviceID), + zap.Time("expired_at", entry.ExpiresAt)) + } + + // Fetch from database + rules, err := c.db.GetActiveRulesForDevice(ctx, deviceID) + if err != nil { + c.logger.Error("Failed to fetch automation rules for device", + zap.String("device_id", deviceID), + zap.Error(err)) + return nil + } + + // Group rules by rule_key for O(1) lookup + rulesByKey := c.groupRulesByKey(rules) + + // Store in cache (write lock) + entry = &DeviceRulesCacheEntry{ + Rules: rules, + RulesByKey: rulesByKey, + CachedAt: now, + ExpiresAt: now.Add(c.ttl), + } + + c.mu.Lock() + c.cache[deviceID] = entry + c.mu.Unlock() + + c.logger.Debug("Cached device automation rules", + zap.String("device_id", deviceID), + zap.Int("rule_count", len(rules)), + zap.Duration("ttl", c.ttl)) + + return rules +} + +// GetGrouped retrieves grouped device automation rules from cache or database +func (c *DeviceRulesCache) GetGrouped(ctx context.Context, deviceID string) map[string][]models.EventRule { + now := time.Now() + + // Try cache first (read lock) + c.mu.RLock() + entry, found := c.cache[deviceID] + c.mu.RUnlock() + + if found && now.Before(entry.ExpiresAt) { + // Cache hit + c.hits.Add(1) + return entry.RulesByKey + } + + // Cache miss - call Get() which will populate the cache + c.Get(ctx, deviceID) + + // Try cache again + c.mu.RLock() + entry, found = c.cache[deviceID] + c.mu.RUnlock() + + if found && now.Before(entry.ExpiresAt) { + return entry.RulesByKey + } + + return nil +} + +// groupRulesByKey groups rules by their rule_key for O(1) lookup +func (c *DeviceRulesCache) groupRulesByKey(rules []models.EventRule) map[string][]models.EventRule { + result := make(map[string][]models.EventRule) + + for _, rule := range rules { + if rule.RuleKey != nil && *rule.RuleKey != "" { + result[*rule.RuleKey] = append(result[*rule.RuleKey], rule) + } + } + + return result +} + +// Invalidate removes cached rules for a specific device +// Call this when automation rules are created, updated, or deleted +func (c *DeviceRulesCache) Invalidate(deviceID string) { + c.mu.Lock() + delete(c.cache, deviceID) + c.mu.Unlock() + + c.logger.Info("Invalidated device automation rules cache", + zap.String("device_id", deviceID)) +} + +// InvalidateAll clears the entire cache +func (c *DeviceRulesCache) InvalidateAll() { + c.mu.Lock() + c.cache = make(map[string]*DeviceRulesCacheEntry) + c.mu.Unlock() + + c.logger.Info("Invalidated all device automation rules cache") +} + +// GetMetrics returns cache performance metrics +func (c *DeviceRulesCache) GetMetrics() map[string]interface{} { + hits := c.hits.Load() + misses := c.misses.Load() + total := hits + misses + + hitRate := float64(0) + if total > 0 { + hitRate = float64(hits) / float64(total) * 100 + } + + c.mu.RLock() + cacheSize := len(c.cache) + c.mu.RUnlock() + + return map[string]interface{}{ + "hits": hits, + "misses": misses, + "total": total, + "hit_rate": hitRate, + "cache_size": cacheSize, + "ttl_seconds": c.ttl.Seconds(), + } +} + +// cleanupExpired removes expired entries from the cache +func (c *DeviceRulesCache) cleanupExpired() { + now := time.Now() + expiredCount := 0 + + c.mu.Lock() + defer c.mu.Unlock() + + for deviceID, entry := range c.cache { + if now.After(entry.ExpiresAt) { + delete(c.cache, deviceID) + expiredCount++ + } + } + + if expiredCount > 0 { + c.logger.Debug("Cleaned expired device rules cache entries", + zap.Int("expired_count", expiredCount), + zap.Int("remaining_cache_size", len(c.cache))) + } +} diff --git a/internal/events/registry/registry.go b/internal/events/registry/registry.go new file mode 100644 index 0000000..d2635e4 --- /dev/null +++ b/internal/events/registry/registry.go @@ -0,0 +1,185 @@ +package registry + +import ( + "context" + "fmt" + "strings" + "sync" + + "github.com/Space-DF/telemetry-service/internal/events/evaluator" + "github.com/Space-DF/telemetry-service/internal/events/loader" + "github.com/Space-DF/telemetry-service/internal/models" + "github.com/Space-DF/telemetry-service/internal/timescaledb" + "go.uber.org/zap" +) + +// RuleRegistry manages event rules from both YAML files and database +type RuleRegistry struct { + // Default rules from YAML (key: "brand/model" e.g., "rakwireless/rak4630") + defaultRules map[string]*loader.DeviceModelRules + groupedDefaultRules map[string]map[string][]loader.YAMLRule // "brand/model" → rule_key → rules + defaultRulesMu sync.RWMutex + + // Cache for device automation rules + cache *DeviceRulesCache + + evaluator *evaluator.Evaluator + db *timescaledb.Client + logger *zap.Logger +} + +// NewRuleRegistry creates a new rule registry +func NewRuleRegistry(db *timescaledb.Client, logger *zap.Logger) *RuleRegistry { + r := &RuleRegistry{ + defaultRules: make(map[string]*loader.DeviceModelRules), + groupedDefaultRules: make(map[string]map[string][]loader.YAMLRule), + cache: NewDeviceRulesCache(db, logger), + evaluator: evaluator.NewEvaluator(logger), + db: db, + logger: logger, + } + + // Start background cache cleanup + r.cache.Start() + + return r +} + +// LoadDefaultRulesFromDir loads system default event rules from YAML files organized by brand/model +func (r *RuleRegistry) LoadDefaultRulesFromDir(dir string) error { + r.defaultRulesMu.Lock() + defer r.defaultRulesMu.Unlock() + + rules, err := loader.LoadSystemDefaultRules(dir) + if err != nil { + return fmt.Errorf("failed to load default rules from directory: %w", err) + } + + r.defaultRules = rules + + // Group default rules by rule_key for O(1) lookup + r.groupedDefaultRules = make(map[string]map[string][]loader.YAMLRule) + for key, dm := range rules { + grouped := make(map[string][]loader.YAMLRule) + for _, rule := range dm.Rules { + grouped[rule.RuleKey] = append(grouped[rule.RuleKey], rule) + } + r.groupedDefaultRules[key] = grouped + } + + // Log loaded rules + for _, dm := range rules { + r.logger.Info("Loaded default event rules", + zap.String("brand", dm.Brand), + zap.String("model", dm.Model), + zap.Int("rule_count", len(dm.Rules))) + } + + r.logger.Info("Default event rules loaded successfully", + zap.Int("device_models", len(rules))) + + return nil +} + +// This function is the core of the rule evaluation process, It checks if there's any custom automation event rules that +// Created by the user for the the specific device. If there are, it evaluates those first. If not, it falls back to the default system rules +func (r *RuleRegistry) Evaluate(ctx context.Context, deviceID, brand, model string, entities []models.TelemetryEntity) []models.MatchedEvent { + var matchedEvents []models.MatchedEvent + matchedRuleKeys := make(map[string]bool) + + if len(entities) == 0 { + return matchedEvents + } + + // Try to get grouped automation rules from cache first + rulesByKey := r.cache.GetGrouped(ctx, deviceID) + + // Evaluate custom automation rules if they exist + if len(rulesByKey) > 0 { + r.logger.Debug("Using custom automation rules for device", + zap.String("device_id", deviceID), + zap.Int("rule_count", len(rulesByKey))) + + // Match entity to rules by rule_key + for _, entity := range entities { + // Check entity_type for state-based entities + if entity.EntityType != "" { + if rules, exists := rulesByKey[entity.EntityType]; exists { + for _, rule := range rules { + if matched := r.evaluator.EvaluateRuleDB(rule, deviceID, entity); matched != nil { + matchedEvents = append(matchedEvents, *matched) + matchedRuleKeys[matched.RuleKey] = true + } + } + } + } + } + } + + // Evaluate default system rules + // Only for rule_keys that didn't match custom automation rules + r.defaultRulesMu.RLock() + key := fmt.Sprintf("%s/%s", strings.ToLower(brand), strings.ToLower(model)) + defaultRulesByKey, exists := r.groupedDefaultRules[key] + r.defaultRulesMu.RUnlock() + + if exists { + for _, entity := range entities { + // Check entity_type for state-based entities + if entity.EntityType != "" { + // Skip if custom automation rule already matched for this rule_key + if !matchedRuleKeys[entity.EntityType] { + if rules, exists := defaultRulesByKey[entity.EntityType]; exists { + for _, rule := range rules { + if matched := r.evaluator.EvaluateRule(rule, deviceID, entity); matched != nil { + matchedEvents = append(matchedEvents, *matched) + } + } + } + } + } + } + } + + return matchedEvents +} + +// GetDefaultRules returns all loaded default rules (for debugging/inspection) +func (r *RuleRegistry) GetDefaultRules() map[string]*loader.DeviceModelRules { + r.defaultRulesMu.RLock() + defer r.defaultRulesMu.RUnlock() + + // Return a copy to avoid race conditions + result := make(map[string]*loader.DeviceModelRules, len(r.defaultRules)) + for k, v := range r.defaultRules { + result[k] = v + } + return result +} + +// ReloadDefaultRules reloads default rules from the configured directory +func (r *RuleRegistry) ReloadDefaultRules(dir string) error { + r.logger.Info("Reloading default event rules", zap.String("dir", dir)) + return r.LoadDefaultRulesFromDir(dir) +} + +// InvalidateDeviceCache removes cached rules for a specific device +// Call this when automation rules are created, updated, or deleted for a device +func (r *RuleRegistry) InvalidateDeviceCache(deviceID string) { + r.cache.Invalidate(deviceID) +} + +// InvalidateAllDeviceCache clears the entire device rules cache +func (r *RuleRegistry) InvalidateAllDeviceCache() { + r.cache.InvalidateAll() +} + +// GetCacheMetrics returns cache performance metrics +func (r *RuleRegistry) GetCacheMetrics() map[string]interface{} { + return r.cache.GetMetrics() +} + +// Stop stops the registry and cleanup goroutines +func (r *RuleRegistry) Stop() { + r.cache.Stop() +} diff --git a/internal/models/events.go b/internal/models/events.go new file mode 100644 index 0000000..26cb977 --- /dev/null +++ b/internal/models/events.go @@ -0,0 +1,261 @@ +package models + +import ( + "encoding/json" + "time" +) + +// EventType represents a type of event (e.g., "state_changed", "service_call") +type EventType struct { + EventTypeID int `json:"event_type_id" db:"event_type_id"` + EventType string `json:"event_type" db:"event_type"` + CreatedAt time.Time `json:"created_at" db:"created_at"` +} + +// EventData represents shared event data (deduplicated by hash) +type EventData struct { + DataID int64 `json:"data_id" db:"data_id"` + Hash int64 `json:"hash" db:"hash"` + SharedData json.RawMessage `json:"shared_data" db:"shared_data"` + CreatedAt time.Time `json:"created_at" db:"created_at"` +} + +// EventRule represents an automation rule for triggering events based on conditions +type EventRule struct { + EventRuleID string `json:"event_rule_id" db:"event_rule_id"` + DeviceID *string `json:"device_id,omitempty" db:"device_id"` + RuleKey *string `json:"rule_key,omitempty" db:"rule_key"` // e.g., 'battery_low', 'temperature_low' + Operator *string `json:"operator,omitempty" db:"operator"` // eq, ne, gt, lt, gte, lte,... + Operand string `json:"operand" db:"operand"` + IsActive *bool `json:"is_active,omitempty" db:"is_active"` + StartTime *time.Time `json:"start_time,omitempty" db:"start_time"` + EndTime *time.Time `json:"end_time,omitempty" db:"end_time"` + AllowNewEvent *bool `json:"allow_new_event,omitempty" db:"allow_new_event"` + CreatedAt time.Time `json:"created_at" db:"created_at"` + UpdatedAt time.Time `json:"updated_at" db:"updated_at"` +} + +// EventRuleRequest represents a request to create or update an event rule +type EventRuleRequest struct { + DeviceID *string `json:"device_id,omitempty" validate:"required,uuid"` + RuleKey *string `json:"rule_key,omitempty" validate:"required"` + Operator *string `json:"operator,omitempty" validate:"omitempty,oneof=eq ne gt lt gte lte contains"` + Operand string `json:"operand" validate:"required"` + IsActive *bool `json:"is_active,omitempty"` + AllowNewEvent *bool `json:"allow_new_event,omitempty"` + StartTime *string `json:"start_time,omitempty" validate:"omitempty,datetime=2006-01-02T15:04:05Z07:00"` + EndTime *string `json:"end_time,omitempty" validate:"omitempty,datetime=2006-01-02T15:04:05Z07:00"` +} + +// EventRuleResponse represents an event rule response +type EventRuleResponse struct { + EventRuleID string `json:"event_rule_id"` + DeviceID *string `json:"device_id,omitempty"` + RuleKey *string `json:"rule_key,omitempty"` + Operator *string `json:"operator,omitempty"` + Operand string `json:"operand"` + IsActive *bool `json:"is_active,omitempty"` + StartTime *time.Time `json:"start_time,omitempty"` + EndTime *time.Time `json:"end_time,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// EventRulesListResponse represents a paginated list of event rules +type EventRulesListResponse struct { + Rules []EventRule `json:"rules"` + TotalCount int `json:"total_count"` + Page int `json:"page"` + PageSize int `json:"page_size"` +} + +// Event represents an event occurrence +type Event struct { + EventID int64 `json:"event_id" db:"event_id"` + EventTypeID int `json:"event_type_id" db:"event_type_id"` + DataID *int64 `json:"data_id,omitempty" db:"data_id"` + EventLevel *string `json:"event_level,omitempty" db:"event_level"` // manufacturer, system, automation + EventRuleID *string `json:"event_rule_id,omitempty" db:"event_rule_id"` + SpaceSlug string `json:"space_slug,omitempty" db:"space_slug"` + EntityID *string `json:"entity_id,omitempty" db:"entity_id"` + StateID *int64 `json:"state_id,omitempty" db:"state_id"` + TriggerID *string `json:"trigger_id,omitempty" db:"trigger_id"` + TimeFiredTs int64 `json:"time_fired_ts" db:"time_fired_ts"` + CreatedAt time.Time `json:"created_at" db:"created_at"` + + // Joined fields + EventType string `json:"event_type,omitempty" db:"event_type"` + SharedData json.RawMessage `json:"shared_data,omitempty" db:"shared_data"` +} + +// StatesMeta represents metadata about entity states +type StatesMeta struct { + MetadataID int `json:"metadata_id" db:"metadata_id"` + EntityID string `json:"entity_id" db:"entity_id"` + CreatedAt time.Time `json:"created_at" db:"created_at"` +} + +// StateAttributes represents shared state attributes (deduplicated by hash) +type StateAttributes struct { + AttributesID int `json:"attributes_id" db:"attributes_id"` + Hash int64 `json:"hash" db:"hash"` + SharedAttrs json.RawMessage `json:"shared_attrs" db:"shared_attrs"` + CreatedAt time.Time `json:"created_at" db:"created_at"` +} + +// State represents an entity state with event linkage +type State struct { + StateID int64 `json:"state_id" db:"state_id"` + MetadataID int `json:"metadata_id" db:"metadata_id"` + State string `json:"state" db:"state"` + AttributesID *int `json:"attributes_id,omitempty" db:"attributes_id"` + EventID *int64 `json:"event_id,omitempty" db:"event_id"` + LastChangedTs int64 `json:"last_changed_ts" db:"last_changed_ts"` + LastUpdatedTs int64 `json:"last_updated_ts" db:"last_updated_ts"` + CreatedAt time.Time `json:"created_at" db:"created_at"` + + // Joined fields + EntityID string `json:"entity_id,omitempty" db:"entity_id"` + SharedAttrs json.RawMessage `json:"shared_attrs,omitempty" db:"shared_attrs"` +} + +// StateChangeRequest represents a request to record a state change +type StateChangeRequest struct { + EntityID string `json:"entity_id"` + SpaceSlug string `json:"space_slug,omitempty"` // optional, will be resolved from headers if not provided + NewState string `json:"new_state"` + OldState string `json:"old_state,omitempty"` + Attributes map[string]interface{} `json:"attributes,omitempty"` + EventType string `json:"event_type,omitempty"` // defaults to "state_changed" + TimeFiredTs *int64 `json:"time_fired_ts,omitempty"` + ContextID []byte `json:"context_id,omitempty"` + TriggerID *string `json:"trigger_id,omitempty"` // for future automations reference +} + +// StateHistoryResponse represents historical state data for an entity +type StateHistoryResponse struct { + EntityID string `json:"entity_id"` + State string `json:"state"` + Attributes map[string]interface{} `json:"attributes,omitempty"` + LastChanged time.Time `json:"last_changed"` + LastUpdated time.Time `json:"last_updated"` + EventID *int64 `json:"event_id,omitempty"` +} + +// EventsListResponse represents paginated events +type EventsListResponse struct { + Events []Event `json:"events"` + TotalCount int `json:"total_count"` + Page int `json:"page"` + PageSize int `json:"page_size"` +} + +// StateDetailResponse combines state with metadata and attributes +type StateDetailResponse struct { + StateID int64 `json:"state_id"` + EntityID string `json:"entity_id"` + State string `json:"state"` + Attributes map[string]interface{} `json:"attributes,omitempty"` + LastChanged time.Time `json:"last_changed"` + LastUpdated time.Time `json:"last_updated"` + TriggeringEvent *EventDetail `json:"triggering_event,omitempty"` +} + +// EventDetail represents a full event with its type and data +type EventDetail struct { + EventID int64 `json:"event_id"` + EventType string `json:"event_type"` + LevelEventID *string `json:"level_event_id,omitempty"` + EventRuleID *string `json:"event_rule_id,omitempty"` + SpaceSlug string `json:"space_slug,omitempty"` + EntityID *string `json:"entity_id,omitempty"` + StateID *int64 `json:"state_id,omitempty"` + TriggerID *string `json:"trigger_id,omitempty"` + TimeFired time.Time `json:"time_fired"` + EventData map[string]interface{} `json:"event_data,omitempty"` + ContextID []byte `json:"context_id,omitempty"` + + // Joined fields + EventRule *EventRule `json:"event_rule,omitempty"` +} + +// TimestampsToTime converts timestamp fields to time.Time +func (s *State) LastChangedTime() time.Time { + return time.UnixMilli(s.LastChangedTs) +} + +// LastUpdatedTime converts the last_updated_ts to time.Time +func (s *State) LastUpdatedTime() time.Time { + return time.UnixMilli(s.LastUpdatedTs) +} + +// TimeFired converts the time_fired_ts to time.Time +func (e *Event) TimeFired() time.Time { + return time.UnixMilli(e.TimeFiredTs) +} + +// ParseAttributes parses the shared_attrs JSON into a map +func (s *State) ParseAttributes() (map[string]interface{}, error) { + if s.SharedAttrs == nil { + return nil, nil + } + var attrs map[string]interface{} + if err := json.Unmarshal(s.SharedAttrs, &attrs); err != nil { + return nil, err + } + return attrs, nil +} + +// ParseEventData parses the shared_data JSON into a map +func (e *Event) ParseEventData() (map[string]interface{}, error) { + if e.SharedData == nil { + return nil, nil + } + var data map[string]interface{} + if err := json.Unmarshal(e.SharedData, &data); err != nil { + return nil, err + } + return data, nil +} + +// SetSharedAttrs sets the shared_attrs from a map +func (s *StateAttributes) SetSharedAttrs(attrs map[string]interface{}) error { + if attrs == nil { + return nil + } + data, err := json.Marshal(attrs) + if err != nil { + return err + } + s.SharedAttrs = data + return nil +} + +// SetSharedData sets the shared_data from a map +func (e *EventData) SetSharedData(data map[string]interface{}) error { + if data == nil { + return nil + } + raw, err := json.Marshal(data) + if err != nil { + return err + } + e.SharedData = raw + return nil +} + +// MatchedEvent represents an event rule that matched evaluation +type MatchedEvent struct { + EntityID string `json:"entity_id"` + EntityType string `json:"entity_type"` + RuleKey string `json:"rule_key"` + EventType string `json:"event_type"` + EventLevel string `json:"event_level"` + Description string `json:"description"` + Value float64 `json:"value"` + Threshold float64 `json:"threshold"` + Operator string `json:"operator"` + Timestamp int64 `json:"timestamp"` // Unix timestamp in milliseconds + EventRuleID *string `json:"event_rule_id,omitempty"` + StateID *string `json:"state_id,omitempty"` +} diff --git a/internal/models/telemetry.go b/internal/models/telemetry.go index 308c8eb..9a4e021 100644 --- a/internal/models/telemetry.go +++ b/internal/models/telemetry.go @@ -35,4 +35,5 @@ type TelemetryEntity struct { UnitOfMeas string `json:"unit_of_measurement,omitempty"` Icon string `json:"icon,omitempty"` Timestamp string `json:"timestamp"` + StateID *string `json:"state_id,omitempty"` } diff --git a/internal/services/processor.go b/internal/services/processor.go index a6ea578..5af3a64 100644 --- a/internal/services/processor.go +++ b/internal/services/processor.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "sync/atomic" + "time" + "github.com/Space-DF/telemetry-service/internal/events/registry" "github.com/Space-DF/telemetry-service/internal/models" timescaledb "github.com/Space-DF/telemetry-service/internal/timescaledb" "go.uber.org/zap" @@ -12,8 +14,9 @@ import ( // LocationProcessor processes device location messages and stores them in Psql type LocationProcessor struct { - tsClient *timescaledb.Client - logger *zap.Logger + tsClient *timescaledb.Client + ruleRegistry *registry.RuleRegistry + logger *zap.Logger // Counters for monitoring processedCount atomic.Int64 @@ -22,10 +25,11 @@ type LocationProcessor struct { } // NewLocationProcessor creates a new location processor -func NewLocationProcessor(tsClient *timescaledb.Client, logger *zap.Logger) *LocationProcessor { +func NewLocationProcessor(tsClient *timescaledb.Client, ruleRegistry *registry.RuleRegistry, logger *zap.Logger) *LocationProcessor { return &LocationProcessor{ - tsClient: tsClient, - logger: logger, + tsClient: tsClient, + ruleRegistry: ruleRegistry, + logger: logger, } } @@ -147,6 +151,36 @@ func (p *LocationProcessor) ProcessTelemetry(ctx context.Context, payload *model return err } + // Evaluate rules and create events for matched rules + if p.ruleRegistry != nil { + matchedEvents := p.ruleRegistry.Evaluate(ctx, + payload.DeviceID, + payload.DeviceInfo.Manufacturer, + payload.DeviceInfo.Model, + payload.Entities) + + for _, event := range matchedEvents { + // Set timestamp to current time if not set + if event.Timestamp == 0 { + event.Timestamp = time.Now().UnixMilli() + } + if err := p.tsClient.CreateEvent(ctx, payload.Organization, &event, payload.SpaceSlug); err != nil { + p.logger.Error("Failed to create event", + zap.Error(err), + zap.String("entity_id", event.EntityID), + zap.String("rule_key", event.RuleKey)) + } else { + p.logger.Info("Event created from rule match", + zap.String("entity_id", event.EntityID), + zap.String("rule_key", event.RuleKey), + zap.String("event_type", event.EventType), + zap.String("event_level", event.EventLevel), + zap.Float64("value", event.Value), + zap.Float64("threshold", event.Threshold)) + } + } + } + return nil } diff --git a/internal/timescaledb/attributes.go b/internal/timescaledb/attributes.go index 432c891..7d09374 100644 --- a/internal/timescaledb/attributes.go +++ b/internal/timescaledb/attributes.go @@ -13,64 +13,6 @@ import ( "go.uber.org/zap" ) -// GetLatestAttributesForDeviceAt returns the shared attributes JSON for the -// given device at or before the provided timestamp. If there are no -// attributes available it returns (nil, nil). -func (c *Client) GetLatestAttributesForDeviceAt(ctx context.Context, deviceID string, at time.Time) (map[string]interface{}, error) { - org := orgFromContext(ctx) - - query := `SELECT a.shared_attrs - FROM entities e - JOIN entity_states s ON s.entity_id = e.id - LEFT JOIN entity_state_attributes a ON s.attributes_id = a.id - WHERE e.device_id::text = $1 AND s.reported_at <= $2 AND a.shared_attrs IS NOT NULL - ORDER BY s.reported_at DESC - LIMIT 1` - - var rawAttrs []byte - if org != "" { - if err := c.WithOrgTx(ctx, org, func(txCtx context.Context, tx bob.Tx) error { - rows, err := tx.QueryContext(txCtx, query, deviceID, at) - if err != nil { - return err - } - defer func() { - _ = rows.Close() - }() - if rows.Next() { - return rows.Scan(&rawAttrs) - } - return nil - }); err != nil { - return nil, fmt.Errorf("failed to query attributes: %w", err) - } - } else { - rows, err := c.DB.QueryContext(ctx, query, deviceID, at) - if err != nil { - return nil, fmt.Errorf("failed to query attributes: %w", err) - } - defer func() { - _ = rows.Close() - }() - if rows.Next() { - if err := rows.Scan(&rawAttrs); err != nil { - return nil, err - } - } - } - - if len(rawAttrs) == 0 { - return nil, nil - } - - var attrs map[string]interface{} - if err := json.Unmarshal(rawAttrs, &attrs); err != nil { - return nil, fmt.Errorf("failed to unmarshal attributes JSON: %w", err) - } - - return attrs, nil -} - type Location struct { Time time.Time DeviceID string diff --git a/internal/timescaledb/events.go b/internal/timescaledb/events.go new file mode 100644 index 0000000..eeb02c3 --- /dev/null +++ b/internal/timescaledb/events.go @@ -0,0 +1,472 @@ +package timescaledb + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "hash/crc32" + "time" + + "github.com/Space-DF/telemetry-service/internal/models" + "github.com/stephenafamo/bob" +) + +// EventType constants +const ( + EventTypeStateChanged = "state_changed" + EventTypeAutomation = "automation_triggered" +) + +// Pagination constants +const ( + DefaultPage = 1 + DefaultPageSize = 20 + MaxPageSize = 100 + DefaultEventLimit = 100 +) + +// GetEventsByDevice retrieves all events for a specific entity. +func (c *Client) GetEventsByDevice(ctx context.Context, org, deviceID string, limit int, startTime, endTime *int64) ([]models.Event, error) { + if org == "" { + return nil, fmt.Errorf("organization is required") + } + if deviceID == "" { + return nil, fmt.Errorf("device_id is required") + } + if limit <= 0 { + limit = DefaultEventLimit + } + + var events []models.Event + + err := c.WithOrgTx(ctx, org, func(txCtx context.Context, tx bob.Tx) error { + // Build base query with device_id filter + whereClause := `ed.shared_data->>'device_id' = $1` + args := []interface{}{deviceID} + + // Add time range filters if provided + argIndex := 2 + if startTime != nil { + whereClause += fmt.Sprintf(" AND e.time_fired_ts >= $%d", argIndex) + args = append(args, *startTime) + argIndex++ + } + if endTime != nil { + whereClause += fmt.Sprintf(" AND e.time_fired_ts <= $%d", argIndex) + args = append(args, *endTime) + argIndex++ + } + args = append(args, limit) + + // Complete the query + query := fmt.Sprintf(` + SELECT e.event_id, e.event_type_id, e.data_id, e.space_slug, + e.trigger_id, e.time_fired_ts, et.event_type, ed.shared_data + FROM events e + JOIN event_types et ON e.event_type_id = et.event_type_id + LEFT JOIN event_data ed ON e.data_id = ed.data_id + WHERE %s + ORDER BY e.time_fired_ts DESC + LIMIT $%d + `, whereClause, argIndex) + + rows, err := tx.QueryContext(txCtx, query, args...) + if err != nil { + return fmt.Errorf("failed to query events by device: %w", err) + } + defer func(){ + _ = rows.Close() + }() + + for rows.Next() { + var e models.Event + var dataID sql.NullInt64 + var slug sql.NullString + var contextID []byte + var triggerID sql.NullString + var sharedData []byte + + if err := rows.Scan(&e.EventID, &e.EventTypeID, &dataID, &slug, &contextID, &triggerID, &e.TimeFiredTs, &e.EventType, &sharedData); err != nil { + return err + } + + if dataID.Valid { + e.DataID = &dataID.Int64 + } + if slug.Valid { + e.SpaceSlug = slug.String + } + if triggerID.Valid { + e.TriggerID = &triggerID.String + } + if len(sharedData) > 0 { + e.SharedData = sharedData + } + + events = append(events, e) + } + + return rows.Err() + }) + + if err != nil { + return nil, err + } + + return events, nil +} + +// populateEventRuleResponse populates an EventRuleResponse from request data and times +func populateEventRuleResponse(result *models.EventRuleResponse, req *models.EventRuleRequest, startTime, endTime *time.Time) { + if req.DeviceID != nil { + result.DeviceID = req.DeviceID + } + if req.RuleKey != nil { + result.RuleKey = req.RuleKey + } + if req.Operator != nil { + result.Operator = req.Operator + } + result.Operand = req.Operand + if req.IsActive != nil { + result.IsActive = req.IsActive + } + result.StartTime = startTime + result.EndTime = endTime +} + +// GetEventRules retrieves event rules with pagination +func (c *Client) GetEventRules(ctx context.Context, deviceID string, page, pageSize int) ([]models.EventRule, int, error) { + if page <= 0 { + page = DefaultPage + } + if pageSize <= 0 || pageSize > MaxPageSize { + pageSize = DefaultPageSize + } + + offset := (page - 1) * pageSize + + var rules []models.EventRule + var total int + + org := orgFromContext(ctx) + if org == "" { + return nil, 0, fmt.Errorf("organization not found in context") + } + + err := c.WithOrgTx(ctx, org, func(txCtx context.Context, tx bob.Tx) error { + // Count total + countQuery := `SELECT COUNT(*) FROM event_rules` + args := []interface{}{} + + whereClause := "" + if deviceID != "" { + whereClause = " WHERE device_id = $1" + args = append(args, deviceID) + } + + countQuery += whereClause + err := tx.QueryRowContext(txCtx, countQuery, args...).Scan(&total) + if err != nil { + return fmt.Errorf("failed to count event rules: %w", err) + } + + // Query rules + query := ` + SELECT er.event_rule_id, er.device_id, er.rule_key, er.operator, er.operand, + er.is_active, er.start_time, er.end_time, er.allow_new_event, er.created_at, er.updated_at + FROM event_rules er + ` + whereClause + ` ORDER BY er.created_at DESC LIMIT $` + fmt.Sprintf("%d", len(args)+1) + ` OFFSET $` + fmt.Sprintf("%d", len(args)+2) + args = append(args, pageSize, offset) + + rows, err := tx.QueryContext(txCtx, query, args...) + if err != nil { + return fmt.Errorf("failed to query event rules: %w", err) + } + defer func() { _ = rows.Close() }() + + for rows.Next() { + var r models.EventRule + if err := rows.Scan( + &r.EventRuleID, &r.DeviceID, &r.RuleKey, &r.Operator, &r.Operand, + &r.IsActive, &r.StartTime, &r.EndTime, &r.AllowNewEvent, &r.CreatedAt, &r.UpdatedAt, + ); err != nil { + return err + } + + rules = append(rules, r) + } + + return rows.Err() + }) + + if err != nil { + return nil, 0, err + } + + return rules, total, nil +} + +// GetActiveRulesForDevice retrieves active automation rules for a specific device +// Returns only device-specific automation rules created by users +// If no automation rules exist, the caller should fall back to default system rules +func (c *Client) GetActiveRulesForDevice(ctx context.Context, deviceID string) ([]models.EventRule, error) { + var rules []models.EventRule + + org := orgFromContext(ctx) + if org == "" { + return nil, fmt.Errorf("organization not found in context") + } + + err := c.WithOrgTx(ctx, org, func(txCtx context.Context, tx bob.Tx) error { + // Query automation rules for this specific device only + // Filter by time range to exclude expired rules + query := ` + SELECT er.event_rule_id, er.device_id, er.rule_key, er.operator, er.operand, + er.is_active, er.start_time, er.end_time, er.allow_new_event, er.created_at, er.updated_at + FROM event_rules er + WHERE er.is_active = true + AND er.device_id = $1 + AND (er.start_time IS NULL OR er.start_time <= NOW()) + AND (er.end_time IS NULL OR er.end_time > NOW()) + ORDER BY er.created_at DESC + ` + + rows, err := tx.QueryContext(txCtx, query, deviceID) + if err != nil { + return fmt.Errorf("failed to query event rules: %w", err) + } + defer func() { _ = rows.Close() }() + + for rows.Next() { + var r models.EventRule + if err := rows.Scan( + &r.EventRuleID, &r.DeviceID, &r.RuleKey, &r.Operator, &r.Operand, + &r.IsActive, &r.StartTime, &r.EndTime, &r.AllowNewEvent, &r.CreatedAt, &r.UpdatedAt, + ); err != nil { + return err + } + + rules = append(rules, r) + } + + return rows.Err() + }) + + if err != nil { + return nil, err + } + + return rules, nil +} + +// CreateEventRule creates a new event rule +func (c *Client) CreateEventRule(ctx context.Context, req *models.EventRuleRequest) (*models.EventRuleResponse, error) { + if req == nil { + return nil, fmt.Errorf("nil request") + } + + var result models.EventRuleResponse + + org := orgFromContext(ctx) + if org == "" { + return nil, fmt.Errorf("organization not found in context") + } + + err := c.WithOrgTx(ctx, org, func(txCtx context.Context, tx bob.Tx) error { + // Parse start and end times + var startTime, endTime *time.Time + if req.StartTime != nil { + t, parseErr := time.Parse(time.RFC3339, *req.StartTime) + if parseErr != nil { + return fmt.Errorf("invalid start_time format: %w", parseErr) + } + startTime = &t + } + if req.EndTime != nil { + t, parseErr := time.Parse(time.RFC3339, *req.EndTime) + if parseErr != nil { + return fmt.Errorf("invalid end_time format: %w", parseErr) + } + endTime = &t + } + + // Insert event rule + err := tx.QueryRowContext(txCtx, ` + INSERT INTO event_rules (device_id, rule_key, operator, operand, is_active, allow_new_event, start_time, end_time) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + RETURNING event_rule_id, created_at, updated_at + `, req.DeviceID, req.RuleKey, req.Operator, req.Operand, + req.IsActive, req.AllowNewEvent, startTime, endTime).Scan( + &result.EventRuleID, &result.CreatedAt, &result.UpdatedAt, + ) + + if err != nil { + return fmt.Errorf("failed to insert event rule: %w", err) + } + populateEventRuleResponse(&result, req, startTime, endTime) + + return nil + }) + + if err != nil { + return nil, err + } + + return &result, nil +} + +// UpdateEventRule updates an existing event rule +func (c *Client) UpdateEventRule(ctx context.Context, ruleID string, req *models.EventRuleRequest) (*models.EventRuleResponse, error) { + if req == nil { + return nil, fmt.Errorf("nil request") + } + if ruleID == "" { + return nil, fmt.Errorf("rule_id is required") + } + + var result models.EventRuleResponse + + org := orgFromContext(ctx) + if org == "" { + return nil, fmt.Errorf("organization not found in context") + } + + err := c.WithOrgTx(ctx, org, func(txCtx context.Context, tx bob.Tx) error { + // Parse start and end times + var startTime, endTime *time.Time + if req.StartTime != nil { + t, parseErr := time.Parse(time.RFC3339, *req.StartTime) + if parseErr != nil { + return fmt.Errorf("invalid start_time format: %w", parseErr) + } + startTime = &t + } + if req.EndTime != nil { + t, parseErr := time.Parse(time.RFC3339, *req.EndTime) + if parseErr != nil { + return fmt.Errorf("invalid end_time format: %w", parseErr) + } + endTime = &t + } + + // Update event rule + err := tx.QueryRowContext(txCtx, ` + UPDATE event_rules + SET device_id = $1, rule_key = $2, operator = $3, operand = $4, + is_active = $5, allow_new_event = $6, start_time = $7, end_time = $8, updated_at = NOW() + WHERE event_rule_id = $9 + RETURNING event_rule_id, created_at, updated_at + `, req.DeviceID, req.RuleKey, req.Operator, req.Operand, + req.IsActive, req.AllowNewEvent, startTime, endTime, ruleID).Scan( + &result.EventRuleID, &result.CreatedAt, &result.UpdatedAt, + ) + + if err != nil { + return fmt.Errorf("failed to update event rule: %w", err) + } + populateEventRuleResponse(&result, req, startTime, endTime) + + return nil + }) + + if err != nil { + return nil, err + } + + return &result, nil +} + +// DeleteEventRule deletes an event rule +func (c *Client) DeleteEventRule(ctx context.Context, ruleID string) error { + if ruleID == "" { + return fmt.Errorf("rule_id is required") + } + + org := orgFromContext(ctx) + if org == "" { + return fmt.Errorf("organization not found in context") + } + + return c.WithOrgTx(ctx, org, func(txCtx context.Context, tx bob.Tx) error { + result, err := tx.ExecContext(txCtx, `DELETE FROM event_rules WHERE event_rule_id = $1`, ruleID) + if err != nil { + return fmt.Errorf("failed to delete event rule: %w", err) + } + + rows, _ := result.RowsAffected() + if rows == 0 { + return fmt.Errorf("event rule not found") + } + + return nil + }) +} + +// CreateEvent creates a new event from a matched event rule +func (c *Client) CreateEvent(ctx context.Context, org string, event *models.MatchedEvent, spaceSlug string) error { + if event == nil { + return fmt.Errorf("nil event") + } + if org == "" { + return fmt.Errorf("organization is required") + } + + return c.WithOrgTx(ctx, org, func(txCtx context.Context, tx bob.Tx) error { + // Get event_type + var eventTypeID int + err := tx.QueryRowContext(txCtx, ` + SELECT event_type_id FROM event_types WHERE event_type = $1 + `, event.EventType).Scan(&eventTypeID) + + if err != nil { + if err == sql.ErrNoRows { + return fmt.Errorf("event type '%s' does not exist", event.EventType) + } + return fmt.Errorf("failed to get event_type: %w", err) + } + + // Create event_data with event details + dataID := sql.NullInt64{Valid: false} + eventData := map[string]interface{}{ + "description": event.Description, + "value": event.Value, + "threshold": event.Threshold, + "operator": event.Operator, + "rule_key": event.RuleKey, + } + + rawData, err := json.Marshal(eventData) + if err != nil { + return fmt.Errorf("failed to marshal event data: %w", err) + } + + hash := int64(crc32.ChecksumIEEE(rawData)) + err = tx.QueryRowContext(txCtx, ` + INSERT INTO event_data (hash, shared_data) + VALUES ($1, $2) + ON CONFLICT (hash) DO UPDATE SET shared_data = EXCLUDED.shared_data + RETURNING data_id + `, hash, rawData).Scan(&dataID) + + if err != nil { + return fmt.Errorf("failed to create event_data: %w", err) + } + + // Create the event + _, err = tx.ExecContext(txCtx, ` + INSERT INTO events ( + event_type_id, data_id, event_level, event_rule_id, + space_slug, entity_id, state_id, time_fired_ts + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + `, eventTypeID, dataID, event.EventLevel, event.EventRuleID, spaceSlug, event.EntityID, event.StateID, event.Timestamp) + + if err != nil { + return fmt.Errorf("failed to create event: %w", err) + } + + return nil + }) +} + diff --git a/internal/timescaledb/telemetry.go b/internal/timescaledb/telemetry.go index 8c85a54..06cdba0 100644 --- a/internal/timescaledb/telemetry.go +++ b/internal/timescaledb/telemetry.go @@ -25,23 +25,22 @@ func (c *Client) SaveTelemetryPayload(ctx context.Context, payload *models.Telem return fmt.Errorf("missing organization in telemetry payload") } - log.Printf("[Telemetry] SaveTelemetryPayload: org=%s, device_id=%s, entities=%d", org, payload.DeviceID, len(payload.Entities)) return c.WithOrgTx(ctx, org, func(txCtx context.Context, tx bob.Tx) error { - for _, ent := range payload.Entities { - if err := c.upsertTelemetryEntity(txCtx, tx, &ent, payload); err != nil { + for i := range payload.Entities { + stateID, err := c.upsertTelemetryEntity(txCtx, tx, &payload.Entities[i], payload) + if err != nil { log.Printf("[Telemetry] ERROR upserting entity: %v", err) return err } - log.Printf("[Telemetry] Entity upserted: org=%s, device_id=%s, entity_id=%s", org, payload.DeviceID, ent.UniqueID) + payload.Entities[i].StateID = &stateID } - log.Printf("[Telemetry] Successfully saved payload: org=%s, device_id=%s", org, payload.DeviceID) return nil }) } -func (c *Client) upsertTelemetryEntity(ctx context.Context, tx bob.Tx, ent *models.TelemetryEntity, payload *models.TelemetryPayload) error { +func (c *Client) upsertTelemetryEntity(ctx context.Context, tx bob.Tx, ent *models.TelemetryEntity, payload *models.TelemetryPayload) (string, error) { if ent == nil { - return fmt.Errorf("nil telemetry entity") + return "", fmt.Errorf("nil telemetry entity") } displayType := ent.DisplayType @@ -65,7 +64,7 @@ func (c *Client) upsertTelemetryEntity(ctx context.Context, tx bob.Tx, ent *mode ent.EntityType, entityTypeKey, ).Scan(&entityTypeID); err != nil { - return fmt.Errorf("upsert entity_type '%s': %w", entityTypeKey, err) + return "", fmt.Errorf("upsert entity_type '%s': %w", entityTypeKey, err) } // Prepare optional device_id. @@ -104,7 +103,7 @@ func (c *Client) upsertTelemetryEntity(ctx context.Context, tx bob.Tx, ent *mode ent.UnitOfMeas, pq.Array(displayType), ).Scan(&entityID); err != nil { - return fmt.Errorf("upsert entity '%s': %w", ent.UniqueID, err) + return "", fmt.Errorf("upsert entity '%s': %w", ent.UniqueID, err) } // Handle attributes: deduplicate by hash to reuse existing row. @@ -112,7 +111,7 @@ func (c *Client) upsertTelemetryEntity(ctx context.Context, tx bob.Tx, ent *mode if len(ent.Attributes) > 0 { rawAttrs, err := json.Marshal(ent.Attributes) if err != nil { - return fmt.Errorf("marshal attributes for '%s': %w", ent.UniqueID, err) + return "", fmt.Errorf("marshal attributes for '%s': %w", ent.UniqueID, err) } hash := int64(crc32.ChecksumIEEE(rawAttrs)) @@ -125,7 +124,7 @@ func (c *Client) upsertTelemetryEntity(ctx context.Context, tx bob.Tx, ent *mode hash, rawAttrs, ).Scan(&attrsID); err != nil { - return fmt.Errorf("upsert attributes for '%s': %w", ent.UniqueID, err) + return "", fmt.Errorf("upsert attributes for '%s': %w", ent.UniqueID, err) } } @@ -184,10 +183,10 @@ func (c *Client) upsertTelemetryEntity(ctx context.Context, tx bob.Tx, ent *mode changedAt, ) if err != nil { - return fmt.Errorf("insert entity_state for '%s': %w", ent.UniqueID, err) + return "", fmt.Errorf("insert entity_state for '%s': %w", ent.UniqueID, err) } - return nil + return stateID.String(), nil } func parseRFC3339(ts string) time.Time { diff --git a/pkgs/db/migrations/20251225000000_create_events_schema.sql b/pkgs/db/migrations/20251225000000_create_events_schema.sql new file mode 100644 index 0000000..7b0cdfc --- /dev/null +++ b/pkgs/db/migrations/20251225000000_create_events_schema.sql @@ -0,0 +1,94 @@ +-- migrate:up +-- Events Schema +CREATE TABLE IF NOT EXISTS event_types ( + event_type_id SERIAL PRIMARY KEY, + event_type TEXT NOT NULL UNIQUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- Insert common event types +INSERT INTO event_types (event_type) VALUES + ('state_changed'), + ('automation_triggered'), + ('device_event'), + ('user_action') +ON CONFLICT (event_type) DO NOTHING; + +-- Event Data: Shared data deduplicated by hash +CREATE TABLE IF NOT EXISTS event_data ( + data_id SERIAL PRIMARY KEY, + hash BIGINT NOT NULL UNIQUE, + shared_data JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_event_data_hash ON event_data (hash); + +-- Event Rules: Rules for triggering events based on conditions +CREATE TABLE IF NOT EXISTS event_rules ( + event_rule_id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + device_id UUID, -- Device-specific automation rules + rule_key TEXT NOT NULL, -- e.g., 'battery_low', 'temperature_low' + operator VARCHAR(16) CHECK (operator IN ('eq', 'ne', 'gt', 'lt', 'gte', 'lte', 'contains')), + operand TEXT NOT NULL, + status VARCHAR(16) CHECK (status IN ('active', 'inactive', 'paused')) DEFAULT 'active', + is_active BOOLEAN DEFAULT true, + start_time TIMESTAMPTZ, + end_time TIMESTAMPTZ, + allow_new_event BOOLEAN DEFAULT true, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- Indexes for event_rules +CREATE INDEX IF NOT EXISTS idx_event_rules_device_id ON event_rules (device_id); +CREATE INDEX IF NOT EXISTS idx_event_rules_status ON event_rules (status); +CREATE INDEX IF NOT EXISTS idx_event_rules_is_active ON event_rules (is_active); +-- Composite index for active device rules query +CREATE INDEX IF NOT EXISTS idx_event_rules_active_device ON event_rules (is_active, device_id, created_at DESC) +WHERE is_active = true; +CREATE INDEX IF NOT EXISTS idx_event_rules_time_range ON event_rules (start_time, end_time) +WHERE is_active = true; + +-- Events: Event occurrences linking to event_type and event_data +-- space_slug is stored here for filtering events within a space +-- entity_id links the event to a specific entity for faster queries +CREATE TABLE IF NOT EXISTS events ( + event_id BIGSERIAL PRIMARY KEY, + event_type_id INTEGER NOT NULL REFERENCES event_types(event_type_id) ON DELETE CASCADE, + data_id INTEGER REFERENCES event_data(data_id) ON DELETE SET NULL, + event_level TEXT CHECK (event_level IN ('manufacturer', 'system', 'automation')), + event_rule_id UUID REFERENCES event_rules(event_rule_id) ON DELETE SET NULL, + space_slug TEXT, + entity_id TEXT, + state_id UUID REFERENCES entity_states(id) ON DELETE SET NULL, + trigger_id UUID, -- for future automation scale + time_fired_ts BIGINT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_events_event_type_id ON events (event_type_id); +CREATE INDEX IF NOT EXISTS idx_events_event_rule_id ON events (event_rule_id); +CREATE INDEX IF NOT EXISTS idx_events_space_slug ON events (space_slug); +CREATE INDEX IF NOT EXISTS idx_events_entity_id ON events (entity_id); +CREATE INDEX IF NOT EXISTS idx_events_state_id ON events (state_id); +CREATE INDEX IF NOT EXISTS idx_events_trigger_id ON events (trigger_id); +CREATE INDEX IF NOT EXISTS idx_events_time_fired_ts ON events (time_fired_ts DESC); +CREATE INDEX IF NOT EXISTS idx_events_data_id ON events (data_id); + +-- Add event_id column to entity_states to create bidirectional linkage +-- This allows states to reference their triggering event +ALTER TABLE entity_states ADD COLUMN IF NOT EXISTS event_id BIGINT REFERENCES events(event_id) ON DELETE SET NULL; + +CREATE INDEX IF NOT EXISTS idx_entity_states_event_id ON entity_states (event_id); + +-- migrate:down + +-- Remove event_id column from entity_states +ALTER TABLE entity_states DROP COLUMN IF EXISTS event_id; + +-- Drop events schema tables (entity_states and entity_state_attributes remain) +DROP TABLE IF EXISTS events CASCADE; +DROP TABLE IF EXISTS event_rules CASCADE; +DROP TABLE IF EXISTS event_data CASCADE; +DROP TABLE IF EXISTS event_types CASCADE;