diff --git a/README.md b/README.md index bab4a4c..9033dcf 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,6 @@ Environment variable | Default | Description ```python from asyncio import AbstractEventLoop -from typing import Tuple, Optional from libservice import start, Asset, CheckBase @@ -40,8 +39,8 @@ class MyCheck(CheckBase): key = 'my_check' @classmethod - async def run(cls, ts: float, asset: Asset) -> Tuple[ - Optional[dict], Optional[dict]]: + async def run(cls, ts: float, asset: Asset) -> tuple[ + dict | None, dict | None]: # Return with the state and optionally an error dict which can be # created using CheckException(my_error_message).to_dict(). # Alternatively, you can rase a CheckException. The return error is diff --git a/libservice/asset.py b/libservice/asset.py index b28df7c..676a50e 100644 --- a/libservice/asset.py +++ b/libservice/asset.py @@ -1,6 +1,5 @@ import base64 from asyncio import Lock -from typing import Optional from cryptography.fernet import Fernet from .ticonn import ticonn @@ -13,8 +12,8 @@ class Asset: asset_id: int check_id: int config: dict - key: Optional[bytes] - lock: Optional[Lock] + key: bytes | None + lock: Lock | None def __init__(self, container_id: int, asset_id: int, check_id: int, config: dict): diff --git a/libservice/check.py b/libservice/check.py index 835fe3d..339035a 100644 --- a/libservice/check.py +++ b/libservice/check.py @@ -1,10 +1,10 @@ import abc from .asset import Asset -from typing import Tuple, List, Optional class CheckBase(abc.ABC): key: str # Check key (must not be changed) + use_unchanged: bool = False def __init_subclass__(cls, **kwargs): if not hasattr(cls, 'key'): @@ -15,13 +15,14 @@ def __init_subclass__(cls, **kwargs): @classmethod @abc.abstractmethod - async def run(cls, ts: float, asset: Asset) -> Tuple[ - Optional[dict], Optional[dict]]: + async def run(cls, ts: float, asset: Asset) -> tuple[ + dict | None, dict | None]: ... class CheckBaseMulti(abc.ABC): key: str # Check key (must not be changed) + use_unchanged: bool = False def __init_subclass__(cls, **kwargs): if not hasattr(cls, 'key'): @@ -32,6 +33,6 @@ def __init_subclass__(cls, **kwargs): @classmethod @abc.abstractmethod - async def run(cls, ts: float, assets: List[Asset]) -> List[ - Tuple[Optional[dict], Optional[dict]]]: + async def run(cls, ts: float, assets: list[Asset]) -> list[ + tuple[dict | None, dict | None]]: ... diff --git a/libservice/hub/client.py b/libservice/hub/client.py index 0e965b6..3c972e4 100644 --- a/libservice/hub/client.py +++ b/libservice/hub/client.py @@ -1,7 +1,7 @@ import asyncio import logging import random -from typing import Optional, Awaitable, Tuple +from typing import Awaitable from .net.package import Package from .protocol import ApiProtocol from ..loop import loop @@ -26,7 +26,7 @@ def close(self): def is_connected(self) -> bool: return self._protocol is not None and self._protocol.is_connected() - def connect_pool(self, pool: list) -> Optional[asyncio.Future]: + def connect_pool(self, pool: list) -> asyncio.Future | None: assert self.is_connected() is False assert self._reconnecting is False assert len(pool), 'pool must contain at least one node' @@ -44,7 +44,7 @@ def connect(self, host: str, port: int = 8700, self._pool_idx = 0 return self._connect(timeout=timeout) - def reconnect(self) -> Optional[asyncio.Future]: + def reconnect(self) -> asyncio.Future | None: if self._reconnecting: return None self._reconnecting = True @@ -128,7 +128,7 @@ async def _write(self, pkg): assert self._protocol is not None return await self._protocol.request(pkg, timeout=10) - def send_check_data(self, path: Tuple[int, int], + def send_check_data(self, path: tuple[int, int], check_data: dict) -> Awaitable: pkg = Package.make( ApiProtocol.PROTO_REQ_DATA, @@ -138,8 +138,8 @@ def send_check_data(self, path: Tuple[int, int], return self._request(pkg) def get_alerts_count(self, container_ids: list, - asset_ids: Optional[list] = None, - user_id: Optional[int] = None) -> Awaitable: + asset_ids: list[int] | None = None, + user_id: int | None = None) -> Awaitable: pkg = Package.make( ApiProtocol.PROTO_REQ_ALERTS_COUNT, data=[container_ids, asset_ids, user_id] diff --git a/libservice/hub/net/package.py b/libservice/hub/net/package.py index 26958ef..bd15060 100644 --- a/libservice/hub/net/package.py +++ b/libservice/hub/net/package.py @@ -2,7 +2,7 @@ import logging import msgpack import struct -from typing import Any, Optional +from typing import Any class Package(object): @@ -11,7 +11,7 @@ class Package(object): st_package = struct.Struct(' bool: return self.transport is not None def request(self, pkg: Package, - timeout: Union[None, float, int] = None + timeout: float | int | None = None ) -> asyncio.Future: assert self.transport is not None self._pid += 1 @@ -82,7 +81,7 @@ def data_received(self, data: bytes): def on_package_received(self, pkg: Package): raise NotImplementedError - async def _timer(self, pid: int, timeout: Union[float, int]): + async def _timer(self, pid: int, timeout: float | int): await asyncio.sleep(timeout) try: future, task = self._requests.pop(pid) @@ -93,7 +92,7 @@ async def _timer(self, pid: int, timeout: Union[float, int]): future.set_exception(TimeoutError( 'request timed out on package id {}'.format(pid))) - def _get_future(self, pkg: Package) -> Optional[asyncio.Future]: + def _get_future(self, pkg: Package) -> asyncio.Future | None: future, task = self._requests.pop(pkg.pid, (None, None)) if future is None: logging.error( diff --git a/libservice/hub/protocol.py b/libservice/hub/protocol.py index 4b42c18..e7a4504 100644 --- a/libservice/hub/protocol.py +++ b/libservice/hub/protocol.py @@ -1,5 +1,5 @@ import logging -from typing import Optional, Callable +from typing import Callable from .net.package import Package from .net.protocol import Protocol @@ -19,7 +19,7 @@ def __init__(self, connection_lost: Callable): super().__init__() self.set_connection_lost(connection_lost) - def connection_lost(self, exc: Optional[Exception]): + def connection_lost(self, exc: Exception | None): super().connection_lost(exc) self._connection_lost() diff --git a/libservice/serviceroom.py b/libservice/serviceroom.py index 15a30c5..8eec688 100644 --- a/libservice/serviceroom.py +++ b/libservice/serviceroom.py @@ -8,11 +8,12 @@ from collections import defaultdict from thingsdb.room import Room from thingsdb.room import event -from typing import List, Dict, Tuple, Callable, Union, Optional, Type +from typing import Callable from .hub import hub from .exceptions import CheckException, NoCountException from .asset import Asset from .check import CheckBase, CheckBaseMulti +from .utils import order THINGSDB_SCOPE = os.getenv('THINGSDB_SCOPE', '//data') @@ -26,20 +27,21 @@ class ServiceRoom(Room): def init(self, collector_key: str, - checks: Tuple[Union[Type[CheckBase], Type[CheckBaseMulti]], ...], + checks: tuple[type[CheckBase] | type[CheckBaseMulti], ...], on_log_level: Callable[[str], None], no_count: bool = False, max_timeout: float = 300.0): self.collector_key = collector_key self._checks = {check.key: check for check in checks} self._last = int(time.time())-1 - self._scheduled: Dict[Tuple[int, int], Dict[int, tuple]] = \ + self._scheduled: dict[tuple[int, int], dict[int, tuple]] = \ defaultdict(dict) self._on_log_level = on_log_level self._no_count = no_count self._max_timeout = max_timeout + self._prev_checks: dict[tuple[int, int], dict] = {} - def get_container_id(self, asset_id: int) -> Optional[int]: + def get_container_id(self, asset_id: int) -> int | None: """Returns a container Id for a given asset Id if the asset is scheduled; This can be used to check if an asset is scheduled as otherwise the return value or this function is None;""" @@ -105,8 +107,21 @@ async def _on_container(self, container_id: int, container: dict): for cid in children: await self._load(cid) - async def _send_to_hub(self, asset: Asset, result: Optional[dict], - error: Optional[dict], ts: float, no_count: bool): + def _unchanged(self, path: tuple, result: dict | None) -> bool: + if result is None: + self._prev_checks.pop(path, None) + return False + if self._prev_checks.get(path) == result: + return True + + order(result) + + self._prev_checks[path] = result + return False + + async def _send_to_hub(self, asset: Asset, result: dict | None, + error: dict | None, ts: float, no_count: bool, + use_unchanged: bool): if error: logging.error(f'Error: {error}; {asset}') @@ -120,6 +135,12 @@ async def _send_to_hub(self, asset: Asset, result: Optional[dict], 'no_count': no_count, } } + if use_unchanged and self._unchanged(path, result): + logging.debug(f'using unchanged; {asset}') + check_data['framework']['unchanged'] = True + else: + check_data['result'] = result + try: if DRY_RUN: output = json.dumps(check_data, indent=2) @@ -134,8 +155,8 @@ async def _send_to_hub(self, asset: Asset, result: Optional[dict], else: logging.debug(f'Successfully send data to hub; {asset}') - async def _run_multi(self, check: Type[CheckBaseMulti], - assets: List[Asset]): + async def _run_multi(self, check: type[CheckBaseMulti], + assets: list[Asset]): ts = time.time() try: results = await asyncio.wait_for( @@ -153,10 +174,11 @@ async def _run_multi(self, check: Type[CheckBaseMulti], results = [(None, error)] * len(assets) for asset, (result, error) in zip(assets, results): - await self._send_to_hub(asset, result, error, ts, self._no_count) + await self._send_to_hub(asset, result, error, ts, self._no_count, + check.use_unchanged) await asyncio.sleep(HUB_REQ_SLEEP) - async def _run(self, check: Type[CheckBase], asset: Asset): + async def _run(self, check: type[CheckBase], asset: Asset): ts = time.time() no_count = self._no_count timeout = min(0.8 * asset.get_interval(), self._max_timeout) @@ -174,7 +196,8 @@ async def _run(self, check: Type[CheckBase], asset: Asset): msg = str(e) or type(e).__name__ result, error = None, CheckException(msg).to_dict() - await self._send_to_hub(asset, result, error, ts, no_count) + await self._send_to_hub(asset, result, error, ts, no_count, + check.use_unchanged) async def run_loop(self): while True: @@ -205,7 +228,7 @@ def on_set_log_level(self, log_level: str): @event('upsert-asset') def on_upsert_asset(self, container_id: int, - service_data: Tuple[int, tuple]): + service_data: tuple[int, tuple]): logging.debug('on upsert asset') asset_id, services = service_data key = (container_id, asset_id) @@ -224,7 +247,7 @@ def on_upsert_asset(self, container_id: int, self._scheduled[key][check_id] = (check_key, config) @event('unset-assets') - def on_unset_assets(self, container_id: int, asset_ids: Tuple[int, ...]): + def on_unset_assets(self, container_id: int, asset_ids: tuple[int, ...]): logging.debug('on unset assets') for asset_id in asset_ids: key = (container_id, asset_id) diff --git a/libservice/start.py b/libservice/start.py index a142d97..0a87bec 100644 --- a/libservice/start.py +++ b/libservice/start.py @@ -3,7 +3,7 @@ import signal import os from asyncio import AbstractEventLoop -from typing import Tuple, Union, Optional, Callable, Type +from typing import Callable from .check import CheckBase, CheckBaseMulti from .serviceroom import service_room from .hub import hub @@ -65,9 +65,9 @@ def _stop(signame, *args): def start(collector_key: str, version: str, - checks: Tuple[Union[Type[CheckBase], Type[CheckBaseMulti]], ...], - start_func: Optional[Callable[[AbstractEventLoop], None]] = None, - close_func: Optional[Callable[[AbstractEventLoop], None]] = None, + checks: tuple[type[CheckBase] | type[CheckBaseMulti], ...], + start_func: Callable[[AbstractEventLoop], None] | None = None, + close_func: Callable[[AbstractEventLoop], None] | None = None, no_count: bool = False, max_timeout: float = 300.0): if THINGSDB_TOKEN is None: raise Exception('Missing `THINGSDB_TOKEN` environment variable') diff --git a/libservice/utils.py b/libservice/utils.py new file mode 100644 index 0000000..aa6b41c --- /dev/null +++ b/libservice/utils.py @@ -0,0 +1,8 @@ +def _item_name(item: dict) -> str: + return item['name'] + + +def order(result: dict): + """Order result items by item name.""" + for items in result.values(): + items.sort(key=_item_name) diff --git a/version.py b/version.py index 1f356cc..cd7ca49 100644 --- a/version.py +++ b/version.py @@ -1 +1 @@ -__version__ = '1.0.0' +__version__ = '1.0.1'