Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ Environment variable | Default | Description

```python
from asyncio import AbstractEventLoop
from typing import Tuple, Optional
from libservice import start, Asset, CheckBase


Expand All @@ -40,8 +39,8 @@ class MyCheck(CheckBase):
key = 'my_check'

@classmethod
async def run(cls, ts: float, asset: Asset) -> Tuple[
Optional[dict], Optional[dict]]:
async def run(cls, ts: float, asset: Asset) -> tuple[
dict | None, dict | None]:
# Return with the state and optionally an error dict which can be
# created using CheckException(my_error_message).to_dict().
# Alternatively, you can rase a CheckException. The return error is
Expand Down
5 changes: 2 additions & 3 deletions libservice/asset.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import base64
from asyncio import Lock
from typing import Optional
from cryptography.fernet import Fernet
from .ticonn import ticonn

Expand All @@ -13,8 +12,8 @@ class Asset:
asset_id: int
check_id: int
config: dict
key: Optional[bytes]
lock: Optional[Lock]
key: bytes | None
lock: Lock | None

def __init__(self, container_id: int, asset_id: int, check_id: int,
config: dict):
Expand Down
11 changes: 6 additions & 5 deletions libservice/check.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import abc
from .asset import Asset
from typing import Tuple, List, Optional


class CheckBase(abc.ABC):
key: str # Check key (must not be changed)
use_unchanged: bool = False

def __init_subclass__(cls, **kwargs):
if not hasattr(cls, 'key'):
Expand All @@ -15,13 +15,14 @@ def __init_subclass__(cls, **kwargs):

@classmethod
@abc.abstractmethod
async def run(cls, ts: float, asset: Asset) -> Tuple[
Optional[dict], Optional[dict]]:
async def run(cls, ts: float, asset: Asset) -> tuple[
dict | None, dict | None]:
...


class CheckBaseMulti(abc.ABC):
key: str # Check key (must not be changed)
use_unchanged: bool = False

def __init_subclass__(cls, **kwargs):
if not hasattr(cls, 'key'):
Expand All @@ -32,6 +33,6 @@ def __init_subclass__(cls, **kwargs):

@classmethod
@abc.abstractmethod
async def run(cls, ts: float, assets: List[Asset]) -> List[
Tuple[Optional[dict], Optional[dict]]]:
async def run(cls, ts: float, assets: list[Asset]) -> list[
tuple[dict | None, dict | None]]:
...
12 changes: 6 additions & 6 deletions libservice/hub/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
import random
from typing import Optional, Awaitable, Tuple
from typing import Awaitable
from .net.package import Package
from .protocol import ApiProtocol
from ..loop import loop
Expand All @@ -26,7 +26,7 @@ def close(self):
def is_connected(self) -> bool:
return self._protocol is not None and self._protocol.is_connected()

def connect_pool(self, pool: list) -> Optional[asyncio.Future]:
def connect_pool(self, pool: list) -> asyncio.Future | None:
assert self.is_connected() is False
assert self._reconnecting is False
assert len(pool), 'pool must contain at least one node'
Expand All @@ -44,7 +44,7 @@ def connect(self, host: str, port: int = 8700,
self._pool_idx = 0
return self._connect(timeout=timeout)

def reconnect(self) -> Optional[asyncio.Future]:
def reconnect(self) -> asyncio.Future | None:
if self._reconnecting:
return None
self._reconnecting = True
Expand Down Expand Up @@ -128,7 +128,7 @@ async def _write(self, pkg):
assert self._protocol is not None
return await self._protocol.request(pkg, timeout=10)

def send_check_data(self, path: Tuple[int, int],
def send_check_data(self, path: tuple[int, int],
check_data: dict) -> Awaitable:
pkg = Package.make(
ApiProtocol.PROTO_REQ_DATA,
Expand All @@ -138,8 +138,8 @@ def send_check_data(self, path: Tuple[int, int],
return self._request(pkg)

def get_alerts_count(self, container_ids: list,
asset_ids: Optional[list] = None,
user_id: Optional[int] = None) -> Awaitable:
asset_ids: list[int] | None = None,
user_id: int | None = None) -> Awaitable:
pkg = Package.make(
ApiProtocol.PROTO_REQ_ALERTS_COUNT,
data=[container_ids, asset_ids, user_id]
Expand Down
6 changes: 3 additions & 3 deletions libservice/hub/net/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import msgpack
import struct
from typing import Any, Optional
from typing import Any


class Package(object):
Expand All @@ -11,7 +11,7 @@ class Package(object):

st_package = struct.Struct('<QIHBB')

def __init__(self, barray: Optional[bytearray] = None):
def __init__(self, barray: bytearray | None = None):
if barray is None:
return

Expand All @@ -20,7 +20,7 @@ def __init__(self, barray: Optional[bytearray] = None):
if self.tp != checkbit ^ 255:
raise ValueError('invalid checkbit')
self.total = self.__class__.st_package.size + self.length
self.body: Optional[bytearray] = None
self.body: bytearray | None = None
self.data = None

@classmethod
Expand Down
17 changes: 8 additions & 9 deletions libservice/hub/net/protocol.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
from typing import Union, Optional, Tuple, Dict
from .package import Package


Expand All @@ -11,11 +10,11 @@ class Protocol(asyncio.Protocol):
def __init__(self):
super().__init__()
self._buffered_data = bytearray()
self._package: Optional[Package] = None
self._requests: Dict[int, Tuple[asyncio.Future,
Optional[asyncio.Task]]] = dict()
self._package: Package | None = None
self._requests: dict[int, tuple[asyncio.Future,
asyncio.Task | None]] = dict()
self._pid = 0
self.transport: Optional[asyncio.Transport] = None
self.transport: asyncio.Transport | None = None

def connection_made(self, transport: asyncio.BaseTransport):
'''
Expand All @@ -24,7 +23,7 @@ def connection_made(self, transport: asyncio.BaseTransport):
assert isinstance(transport, asyncio.Transport)
self.transport = transport

def connection_lost(self, exc: Optional[Exception]):
def connection_lost(self, exc: Exception | None):
'''
override asyncio.Protocol
'''
Expand All @@ -36,7 +35,7 @@ def is_connected(self) -> bool:
return self.transport is not None

def request(self, pkg: Package,
timeout: Union[None, float, int] = None
timeout: float | int | None = None
) -> asyncio.Future:
assert self.transport is not None
self._pid += 1
Expand Down Expand Up @@ -82,7 +81,7 @@ def data_received(self, data: bytes):
def on_package_received(self, pkg: Package):
raise NotImplementedError

async def _timer(self, pid: int, timeout: Union[float, int]):
async def _timer(self, pid: int, timeout: float | int):
await asyncio.sleep(timeout)
try:
future, task = self._requests.pop(pid)
Expand All @@ -93,7 +92,7 @@ async def _timer(self, pid: int, timeout: Union[float, int]):
future.set_exception(TimeoutError(
'request timed out on package id {}'.format(pid)))

def _get_future(self, pkg: Package) -> Optional[asyncio.Future]:
def _get_future(self, pkg: Package) -> asyncio.Future | None:
future, task = self._requests.pop(pkg.pid, (None, None))
if future is None:
logging.error(
Expand Down
4 changes: 2 additions & 2 deletions libservice/hub/protocol.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Optional, Callable
from typing import Callable
from .net.package import Package
from .net.protocol import Protocol

Expand All @@ -19,7 +19,7 @@ def __init__(self, connection_lost: Callable):
super().__init__()
self.set_connection_lost(connection_lost)

def connection_lost(self, exc: Optional[Exception]):
def connection_lost(self, exc: Exception | None):
super().connection_lost(exc)
self._connection_lost()

Expand Down
49 changes: 36 additions & 13 deletions libservice/serviceroom.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
from collections import defaultdict
from thingsdb.room import Room
from thingsdb.room import event
from typing import List, Dict, Tuple, Callable, Union, Optional, Type
from typing import Callable
from .hub import hub
from .exceptions import CheckException, NoCountException
from .asset import Asset
from .check import CheckBase, CheckBaseMulti
from .utils import order


THINGSDB_SCOPE = os.getenv('THINGSDB_SCOPE', '//data')
Expand All @@ -26,20 +27,21 @@ class ServiceRoom(Room):

def init(self,
collector_key: str,
checks: Tuple[Union[Type[CheckBase], Type[CheckBaseMulti]], ...],
checks: tuple[type[CheckBase] | type[CheckBaseMulti], ...],
on_log_level: Callable[[str], None],
no_count: bool = False,
max_timeout: float = 300.0):
self.collector_key = collector_key
self._checks = {check.key: check for check in checks}
self._last = int(time.time())-1
self._scheduled: Dict[Tuple[int, int], Dict[int, tuple]] = \
self._scheduled: dict[tuple[int, int], dict[int, tuple]] = \
defaultdict(dict)
self._on_log_level = on_log_level
self._no_count = no_count
self._max_timeout = max_timeout
self._prev_checks: dict[tuple[int, int], dict] = {}

def get_container_id(self, asset_id: int) -> Optional[int]:
def get_container_id(self, asset_id: int) -> int | None:
"""Returns a container Id for a given asset Id if the asset is
scheduled; This can be used to check if an asset is scheduled as
otherwise the return value or this function is None;"""
Expand Down Expand Up @@ -105,8 +107,21 @@ async def _on_container(self, container_id: int, container: dict):
for cid in children:
await self._load(cid)

async def _send_to_hub(self, asset: Asset, result: Optional[dict],
error: Optional[dict], ts: float, no_count: bool):
def _unchanged(self, path: tuple, result: dict | None) -> bool:
if result is None:
self._prev_checks.pop(path, None)
return False
if self._prev_checks.get(path) == result:
return True

order(result)

self._prev_checks[path] = result
return False

async def _send_to_hub(self, asset: Asset, result: dict | None,
error: dict | None, ts: float, no_count: bool,
use_unchanged: bool):
if error:
logging.error(f'Error: {error}; {asset}')

Expand All @@ -120,6 +135,12 @@ async def _send_to_hub(self, asset: Asset, result: Optional[dict],
'no_count': no_count,
}
}
if use_unchanged and self._unchanged(path, result):
logging.debug(f'using unchanged; {asset}')
check_data['framework']['unchanged'] = True
else:
check_data['result'] = result

try:
if DRY_RUN:
output = json.dumps(check_data, indent=2)
Expand All @@ -134,8 +155,8 @@ async def _send_to_hub(self, asset: Asset, result: Optional[dict],
else:
logging.debug(f'Successfully send data to hub; {asset}')

async def _run_multi(self, check: Type[CheckBaseMulti],
assets: List[Asset]):
async def _run_multi(self, check: type[CheckBaseMulti],
assets: list[Asset]):
ts = time.time()
try:
results = await asyncio.wait_for(
Expand All @@ -153,10 +174,11 @@ async def _run_multi(self, check: Type[CheckBaseMulti],
results = [(None, error)] * len(assets)

for asset, (result, error) in zip(assets, results):
await self._send_to_hub(asset, result, error, ts, self._no_count)
await self._send_to_hub(asset, result, error, ts, self._no_count,
check.use_unchanged)
await asyncio.sleep(HUB_REQ_SLEEP)

async def _run(self, check: Type[CheckBase], asset: Asset):
async def _run(self, check: type[CheckBase], asset: Asset):
ts = time.time()
no_count = self._no_count
timeout = min(0.8 * asset.get_interval(), self._max_timeout)
Expand All @@ -174,7 +196,8 @@ async def _run(self, check: Type[CheckBase], asset: Asset):
msg = str(e) or type(e).__name__
result, error = None, CheckException(msg).to_dict()

await self._send_to_hub(asset, result, error, ts, no_count)
await self._send_to_hub(asset, result, error, ts, no_count,
check.use_unchanged)

async def run_loop(self):
while True:
Expand Down Expand Up @@ -205,7 +228,7 @@ def on_set_log_level(self, log_level: str):

@event('upsert-asset')
def on_upsert_asset(self, container_id: int,
service_data: Tuple[int, tuple]):
service_data: tuple[int, tuple]):
logging.debug('on upsert asset')
asset_id, services = service_data
key = (container_id, asset_id)
Expand All @@ -224,7 +247,7 @@ def on_upsert_asset(self, container_id: int,
self._scheduled[key][check_id] = (check_key, config)

@event('unset-assets')
def on_unset_assets(self, container_id: int, asset_ids: Tuple[int, ...]):
def on_unset_assets(self, container_id: int, asset_ids: tuple[int, ...]):
logging.debug('on unset assets')
for asset_id in asset_ids:
key = (container_id, asset_id)
Expand Down
8 changes: 4 additions & 4 deletions libservice/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import signal
import os
from asyncio import AbstractEventLoop
from typing import Tuple, Union, Optional, Callable, Type
from typing import Callable
from .check import CheckBase, CheckBaseMulti
from .serviceroom import service_room
from .hub import hub
Expand Down Expand Up @@ -65,9 +65,9 @@ def _stop(signame, *args):


def start(collector_key: str, version: str,
checks: Tuple[Union[Type[CheckBase], Type[CheckBaseMulti]], ...],
start_func: Optional[Callable[[AbstractEventLoop], None]] = None,
close_func: Optional[Callable[[AbstractEventLoop], None]] = None,
checks: tuple[type[CheckBase] | type[CheckBaseMulti], ...],
start_func: Callable[[AbstractEventLoop], None] | None = None,
close_func: Callable[[AbstractEventLoop], None] | None = None,
no_count: bool = False, max_timeout: float = 300.0):
if THINGSDB_TOKEN is None:
raise Exception('Missing `THINGSDB_TOKEN` environment variable')
Expand Down
8 changes: 8 additions & 0 deletions libservice/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
def _item_name(item: dict) -> str:
return item['name']


def order(result: dict):
"""Order result items by item name."""
for items in result.values():
items.sort(key=_item_name)
2 changes: 1 addition & 1 deletion version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.0.0'
__version__ = '1.0.1'