Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/mosquitto.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Plain MQTT on 1883, no auth
listener 1883
protocol mqtt
allow_anonymous true

# WebSocket MQTT on 9001 with JWT auth
listener 9001
protocol websockets
allow_anonymous true
2 changes: 0 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: "3.7"

volumes:
influx-data:

Expand Down
2 changes: 1 addition & 1 deletion shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ shards:

placeos-models:
git: https://github.com/placeos/models.git
version: 9.76.2
version: 9.78.0

placeos-resource:
git: https://github.com/place-labs/resource.git
Expand Down
12 changes: 6 additions & 6 deletions spec/publishing/influx_publisher_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module PlaceOS::Source

status_event = Mappings.new(state).status_events?("mod-1234", "power").not_nil!.first

message = Publisher::Message.new(status_event, "false", timestamp: Time.utc)
message = Publisher::Message.new(status_event, "false", timestamp: Time::UNIX_EPOCH)

point = InfluxPublisher.transform(message)[0]
point.should_not be_nil
Expand Down Expand Up @@ -77,7 +77,7 @@ module PlaceOS::Source
temp: 30.5,
id: nil,
other: false,
}.to_json, timestamp: Time.utc)
}.to_json, timestamp: Time::UNIX_EPOCH)

point = InfluxPublisher.transform(message)[0]
point.should_not be_nil
Expand Down Expand Up @@ -212,7 +212,7 @@ module PlaceOS::Source

point.measurement.should eq "custom_measurement"

point.timestamp.should eq Time::UNIX_EPOCH
# point.timestamp.should eq Time::UNIX_EPOCH

point.tags.should eq({
"pos_org" => "org-donor",
Expand Down Expand Up @@ -279,7 +279,7 @@ module PlaceOS::Source
"mac" => "66e0fd1279ce",
"level" => "zone_1234",
"building" => "zone_1234",
}].to_json, timestamp: Time.utc)
}].to_json, timestamp: Time::UNIX_EPOCH)

points = InfluxPublisher.transform(message)
point = points[0]
Expand Down Expand Up @@ -388,7 +388,7 @@ module PlaceOS::Source
"map_height" => 123.8,
"meraki_floor_id" => "g_727894289736675",
"meraki_floor_name" => "BUILDING Name - L2",
}.to_json, timestamp: Time.utc)
}.to_json, timestamp: Time::UNIX_EPOCH)

points = InfluxPublisher.transform(message)
point = points[0]
Expand Down Expand Up @@ -475,7 +475,7 @@ module PlaceOS::Source
"level" => "zone_1234",
"building" => "zone_1234",
},
}.to_json, timestamp: Time.utc)
}.to_json, timestamp: Time::UNIX_EPOCH)

points = InfluxPublisher.transform(message)
point = points[0]
Expand Down
8 changes: 4 additions & 4 deletions spec/publishing/mqtt_publisher_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module PlaceOS::Source

state = mock_state(
module_id: module_id,
index: 1,
index: 5,
module_name: "M'Odule",
driver_id: "12345",
control_system_id: "cs-9445",
Expand All @@ -31,10 +31,10 @@ module PlaceOS::Source
end
end

sleep 100.milliseconds
sleep 200.milliseconds
publisher.publish(Publisher::Message.new(status_event, "true", timestamp: Time.utc))
sleep 100.milliseconds
client.unsubscribe(key)
sleep 200.milliseconds
client.unsubscribe(key) rescue nil

last_value.try(&.[]("value")).should be_true
end
Expand Down
2 changes: 1 addition & 1 deletion spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def test_broker

PlaceOS::Model::Broker.new(
name: "mosquitto",
host: ENV["MQTT_HOST"]?.presence || "localhost",
host: ENV["MQTT_HOST"]?.presence || "mqtt",
port: ENV["MQTT_PORT"]?.presence.try &.to_i? || 1883,
auth_type: :no_auth,
).save!
Expand Down
3 changes: 3 additions & 0 deletions src/app.cr
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ module PlaceOS::Source

Manager.instance = manager

# timezone cache management
spawn { PlaceOS::Source::InfluxPublisher.timezone_cache_reset }

# Server Configuration

server = ActionController::Server.new(port, host)
Expand Down
37 changes: 37 additions & 0 deletions src/source/publishing/influx_publisher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,37 @@ module PlaceOS::Source
end
end

@@building_timezones = {} of String => Time::Location?
@@timezone_lock = Mutex.new

def self.timezone_cache_reset
loop do
sleep 1.hour
@@timezone_lock.synchronize do
@@building_timezones = {} of String => Time::Location?
end
rescue error
Log.warn(exception: error) { "error clearing timezone cache" }
end
end

def self.timezone_for(building_id : String?) : Time::Location?
return nil unless building_id && building_id.presence

@@timezone_lock.synchronize do
if @@building_timezones.has_key?(building_id)
return @@building_timezones[building_id]
end

if zone = Model::Zone.find_by?(id: building_id)
@@building_timezones[building_id] = zone.timezone
end
end
rescue error
Log.warn(exception: error) { "error fetching timezone for zone #{building_id}" }
nil
end

# Generate an InfluxDB Point from an mqtt key + payload
#
def self.transform(message : Publisher::Message) : Array(Flux::Point)
Expand Down Expand Up @@ -91,6 +122,12 @@ module PlaceOS::Source

fields = ::Flux::Point::FieldSet.new

if timezone = timezone_for(data.zone_mapping["building"]?)
local_time = timestamp.in(timezone)
tags["pos_day_of_week"] = local_time.day_of_week.to_s
fields["pos_time_of_day"] = (local_time.hour * 100 + local_time.minute).to_i64
end

# https://docs.influxdata.com/influxdb/v2.0/reference/flux/language/lexical-elements/#identifiers
key = data.status.gsub(/\W/, '_')
fields["pos_key"] = key
Expand Down
Loading