Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 34 additions & 8 deletions powermeter/tq_em.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,26 @@ class TQEnergyManager(Powermeter):
"""Powermeter using the TQ Energy Manager JSON API."""

# OBIS codes
_TOTAL_KEY = "1-0:1.4.0*255" # Σ active power
_TOTAL_TO_GRID = 0
_TOTAL_FROM_GRID = 1
_TOTAL_KEYS = (
"1-0:1.4.0*255", # Σ active power (from grid)
"1-0:2.4.0*255", # Σ active power (to grid)
)

_TOTAL_TO_GRID_L1 = 0
_TOTAL_FROM_GRID_L1 = 1
_TOTAL_TO_GRID_L2 = 2
_TOTAL_FROM_GRID_L2 = 3
_TOTAL_TO_GRID_L3 = 4
_TOTAL_FROM_GRID_L3 = 5
_PHASE_KEYS = (
"1-0:21.4.0*255", # L1
"1-0:41.4.0*255", # L2
"1-0:61.4.0*255", # L3
"1-0:21.4.0*255", # L1 active power (from grid)
"1-0:22.4.0*255", # L1 active power (to grid)
"1-0:41.4.0*255", # L2 active power (from grid)
"1-0:42.4.0*255", # L2 active power (to grid)
"1-0:61.4.0*255", # L3 active power (from grid)
"1-0:62.4.0*255", # L3 active power (to grid)
)

_MAX_IDLE = 60 * 30 # 30 min
Expand All @@ -36,10 +51,21 @@ def get_powermeter_watts(self) -> List[float]:
self._login()
data = self._read_live_json()

if all(k in data for k in self._PHASE_KEYS):
return [float(data[k]) for k in self._PHASE_KEYS]
if self._TOTAL_KEY in data:
return [float(data[self._TOTAL_KEY])]
if any(k in data for k in self._PHASE_KEYS):
return [
float(data.get(self._PHASE_KEYS[self._TOTAL_TO_GRID_L1], 0))
- float(data.get(self._PHASE_KEYS[self._TOTAL_FROM_GRID_L1], 0)),
float(data.get(self._PHASE_KEYS[self._TOTAL_TO_GRID_L2], 0))
- float(data.get(self._PHASE_KEYS[self._TOTAL_FROM_GRID_L2], 0)),
float(data.get(self._PHASE_KEYS[self._TOTAL_TO_GRID_L3], 0))
- float(data.get(self._PHASE_KEYS[self._TOTAL_FROM_GRID_L3], 0)),
]

if any(k in data for k in self._TOTAL_KEYS):
return [
float(data.get(self._TOTAL_KEYS[self._TOTAL_TO_GRID], 0))
- float(data.get(self._TOTAL_KEYS[self._TOTAL_FROM_GRID], 0))
]

raise RuntimeError("Required OBIS values missing in payload")

Expand Down
69 changes: 63 additions & 6 deletions powermeter/tq_em_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ def test_three_phase(self, mock_get, mock_post):
# login GET
mock_get.side_effect = [
MagicMock(
status_code=200, json=lambda: {"serial": "123", "authentication": False}
status_code=200,
json=lambda: {"serial": "123", "authentication": False},
),
MagicMock(
status_code=200,
json=lambda: {
"1-0:21.4.0*255": 1,
"1-0:22.4.0*255": 0,
"1-0:41.4.0*255": 2,
"1-0:42.4.0*255": 0,
"1-0:61.4.0*255": 3,
"1-0:62.4.0*255": 0,
},
),
]
Expand All @@ -34,9 +38,13 @@ def test_three_phase(self, mock_get, mock_post):
def test_total_only(self, mock_get, mock_post):
mock_get.side_effect = [
MagicMock(
status_code=200, json=lambda: {"serial": "321", "authentication": False}
status_code=200,
json=lambda: {"serial": "321", "authentication": False},
),
MagicMock(
status_code=200,
json=lambda: {"1-0:1.4.0*255": 9, "1-0:2.4.0*255": 0},
),
MagicMock(status_code=200, json=lambda: {"1-0:1.4.0*255": 9}),
]
mock_post.return_value = MagicMock(
status_code=200, json=lambda: {"authentication": True}
Expand All @@ -50,13 +58,18 @@ def test_total_only(self, mock_get, mock_post):
def test_relogin_on_expired_session(self, mock_get, mock_post):
mock_get.side_effect = [
MagicMock(
status_code=200, json=lambda: {"serial": "123", "authentication": False}
status_code=200,
json=lambda: {"serial": "123", "authentication": False},
),
MagicMock(status_code=200, json=lambda: {"status": 901}),
MagicMock(
status_code=200, json=lambda: {"serial": "123", "authentication": False}
status_code=200,
json=lambda: {"serial": "123", "authentication": False},
),
MagicMock(
status_code=200,
json=lambda: {"1-0:1.4.0*255": 5, "1-0:2.4.0*255": 0},
),
MagicMock(status_code=200, json=lambda: {"1-0:1.4.0*255": 5}),
]
mock_post.side_effect = [
MagicMock(status_code=200, json=lambda: {"authentication": True}),
Expand All @@ -66,6 +79,50 @@ def test_relogin_on_expired_session(self, mock_get, mock_post):
meter = TQEnergyManager("192.168.0.11")
self.assertEqual(meter.get_powermeter_watts(), [5.0])

@patch("requests.Session.post")
@patch("requests.Session.get")
def test_missing_export(self, mock_get, mock_post):
mock_get.side_effect = [
MagicMock(
status_code=200,
json=lambda: {"serial": "111", "authentication": False},
),
MagicMock(
status_code=200,
json=lambda: {"1-0:1.4.0*255": 4},
),
]
mock_post.return_value = MagicMock(
status_code=200, json=lambda: {"authentication": True}
)

meter = TQEnergyManager("192.168.0.15")
self.assertEqual(meter.get_powermeter_watts(), [4.0])

@patch("requests.Session.post")
@patch("requests.Session.get")
def test_three_phase_missing_export(self, mock_get, mock_post):
mock_get.side_effect = [
MagicMock(
status_code=200,
json=lambda: {"serial": "777", "authentication": False},
),
MagicMock(
status_code=200,
json=lambda: {
"1-0:21.4.0*255": 1,
"1-0:41.4.0*255": 2,
"1-0:61.4.0*255": 3,
},
),
]
mock_post.return_value = MagicMock(
status_code=200, json=lambda: {"authentication": True}
)

meter = TQEnergyManager("192.168.0.16")
self.assertEqual(meter.get_powermeter_watts(), [1.0, 2.0, 3.0])


if __name__ == "__main__":
unittest.main()