Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions pyasic/miners/backends/btminer.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,10 @@ async def upgrade_firmware(
str(DataOptions.FAN_PSU): DataFunction(
"_get_psu_fans", [RPCAPICommand("rpc_get_device_info", "get.device.info")]
),
str(DataOptions.ERRORS): DataFunction(
"_get_errors",
[RPCAPICommand("rpc_get_device_info", "get.device.info")],
),
str(DataOptions.HASHBOARDS): DataFunction(
"_get_hashboards",
[
Expand Down Expand Up @@ -1133,6 +1137,40 @@ async def _get_psu_fans(self, rpc_get_device_info: dict | None = None) -> list[F
rpm = rpc_get_device_info.get("msg", {}).get("power", {}).get("fanspeed")
return [Fan(speed=rpm)] if rpm is not None else []

async def _get_errors(
self, rpc_get_device_info: dict | None = None
) -> list[MinerErrorData]:
if rpc_get_device_info is None:
try:
rpc_get_device_info = await self.rpc.get_device_info()
except APIError:
return []

if rpc_get_device_info is None:
return []

raw_errors = rpc_get_device_info.get("msg", {}).get("error-code", [])
if not isinstance(raw_errors, list):
return []

parsed_codes: list[int] = []
for item in raw_errors:
if isinstance(item, dict):
for key in item.keys():
if str(key).lower() == "reason":
continue
try:
parsed_codes.append(int(key))
except (TypeError, ValueError):
continue
else:
try:
parsed_codes.append(int(item))
except (TypeError, ValueError):
continue

return [WhatsminerError(error_code=code) for code in sorted(set(parsed_codes))]

async def _get_serial_number(
self, rpc_get_device_info: dict | None = None
) -> str | None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import unittest
from unittest.mock import AsyncMock

from pyasic.errors import APIError
from pyasic.miners.backends.btminer import BTMinerV3


class TestBTMinerV3ErrorMapping(unittest.IsolatedAsyncioTestCase):
def setUp(self):
self.miner = BTMinerV3(ip="10.1.10.24")

async def test_get_errors_maps_device_info_codes(self):
rpc_get_device_info = {
"msg": {
"error-code": [
"2010",
541,
{"541": "Slot 1 error reading chip id.", "reason": "hashboard"},
{"5032": "Slot 2 voltage abnormal"},
"not-a-code",
None,
]
}
}

errors = await self.miner._get_errors(rpc_get_device_info=rpc_get_device_info)
codes = [error.error_code for error in errors]

self.assertEqual(codes, [541, 2010, 5032])

async def test_get_errors_ignores_non_list_error_code_payload(self):
rpc_get_device_info = {"msg": {"error-code": "btminer process is down err"}}

errors = await self.miner._get_errors(rpc_get_device_info=rpc_get_device_info)

self.assertEqual(errors, [])

async def test_get_errors_returns_empty_on_rpc_api_error(self):
self.miner.rpc = AsyncMock()
self.miner.rpc.get_device_info = AsyncMock(side_effect=APIError("rpc failed"))

errors = await self.miner._get_errors()

self.assertEqual(errors, [])


if __name__ == "__main__":
unittest.main()