diff --git a/cmd/uuid_counters.go b/cmd/uuid_counters.go index 038631e..0ae397c 100644 --- a/cmd/uuid_counters.go +++ b/cmd/uuid_counters.go @@ -128,12 +128,23 @@ func UUIDCountersPipeline(config *AppConfig) { continue } + parsed := make(map[string]interface{}) + if err := json.Unmarshal(event.Value, &parsed); err != nil { + log.Warn("Cannot parse JSON message: " + err.Error()) + continue + } + + sensorUUID, _ := parsed["sensor_uuid"].(string) + sensorName, _ := parsed["sensor_name"].(string) + pipeline.Produce(event.Value, map[string]interface{}{ // NOTE batch_group and uuid should be the same to ensure that // messages from the same organization are grouped together when // they are counted. "uuid": org, "batch_group": org, + "sensor_uuid": sensorUUID, + "sensor_name": sensorName, }, nil) default: diff --git a/counter/counter.go b/counter/counter.go index 8f00830..3c0be82 100644 --- a/counter/counter.go +++ b/counter/counter.go @@ -31,6 +31,8 @@ type Monitor struct { Unit string `json:"unit"` Value uint64 `json:"value"` UUID string `json:"organization_uuid"` + SensorUUID string `json:"sensor_uuid,omitempty"` + SensorName string `json:"sensor_name,omitempty"` Timestamp int64 `json:"timestamp"` } @@ -76,10 +78,16 @@ func (c *Counter) OnMessage(m *utils.Message, done utils.Done) { Timestamp: time.Now().Unix(), } - if uuid, ok := m.Opts.Get("uuid"); ok { - if uuid, ok := uuid.(string); ok { - countData.UUID = uuid - } + if v, ok := m.Opts.Get("uuid"); ok { + if s, ok := v.(string); ok { countData.UUID = s } + } + + // sensor_uuid / sensor_name (must be added to m.Opts in uuid_counters.go) + if v, ok := m.Opts.Get("sensor_uuid"); ok { + if s, ok := v.(string); ok && s != "" { countData.SensorUUID = s } + } + if v, ok := m.Opts.Get("sensor_name"); ok { + if s, ok := v.(string); ok && s != "" { countData.SensorName = s } } countMessage, _ := json.Marshal(countData)