diff --git a/README.md b/README.md index 8e65d21..de7ac3b 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ import logging from libprobe import logger from libprobe.asset import Asset from libprobe.probe import Probe +from libprobe.check import Check from libprobe.severity import Severity from libprobe.exceptions import ( CheckException, @@ -42,71 +43,84 @@ from libprobe.exceptions import ( __version__ = "0.1.0" -async def my_first_check(asset: Asset, asset_config: dict, check_config: dict): - """My first check. - Arguments: - asset: Asset contains an id, name and check which should be used - for logging; - asset_config: local configuration for this asset, for example credentials; - check_config: configuration for this check; contains for example the - interval at which the check is running and an address of - the asset to probe; - """ - if "ignore_this_check_iteration": - # nothing will be send to InfraSonar for this check iteration; - raise IgnoreResultException() - - if "no_longer_try_this_check": - # nothing will be send to InfraSonar for this check iteration and the - # check will not start again until the probe restarts or configuration - # has been changed; - raise IgnoreCheckException() - - if "something_has_happened": - # send a check error to InfraSonar because something has happened which - # prevents us from building a check result; The default severity for a - # CheckException is MEDIUM but this can be overwritten; - raise CheckException("something went wrong", severity=Severity.LOW) - - if "something_unexpected_has_happened": - # other exceptions will be converted to CheckException, MEDIUM severity - raise Exception("something went wrong") - - # A check result may have multiple types, items, and/or metrics - result = {"myType": [{"name": "my item"}]} - - if "result_is_incomplete": - # optionally, IncompleteResultException can be given another severity; - # the default severity is LOW. - raise IncompleteResultException('missing type x', result) - - if "not_count_as_check_result": - # optionally, NoCountException can be raised in which case the check - # result is not counted by InfraSonar; Thus, the last seen services - # will not "see" this check result. - # A severity can be given if we also want a check error; (similar to - # the IncompleteResultException exception) - raise NoCountException('do not count this check result', result) - - # Use the asset in logging; this will include asset info and the check key - logging.info(f"log something; {asset}") - - # In alpha versions and debug logging enabled, unknown exception will be - # logged when debug logging is enabled. You may use logger.exception() - # yourself if you want exception logging for debug logging only. - try: - 42 / 0 # ZeroDivision error for example - except Exception: - logger.exception() # log the exception only when DEBUG logging - - # Return the check result - return result +class MyFirstCheck(Check): + + key = 'myFirstCheck' + unchanged_eol = 0 # Can be for example 14400, to prevent sending the same + # check result for the next 4 hours (0=disabled) + + @staticmethod + async def run(asset: Asset, local_config: dict, config: dict) -> dict: + """My first check. + Arguments: + asset: + Asset contains an id, name and check which should be used + for logging; + local_config: + local configuration for this asset, for example + credentials; + config: + configuration for this check; contains for example the + interval at which the check is running and an address of + the asset to probe; + """ + if "ignore_this_check_iteration": + # nothing will be send to InfraSonar for this check iteration; + raise IgnoreResultException() + + if "no_longer_try_this_check": + # nothing will be send to InfraSonar for this check iteration and + # the check will not start again until the probe restarts or + # configuration has been changed; + raise IgnoreCheckException() + + if "something_has_happened": + # send a check error to InfraSonar because something has happened + # which prevents us from building a check result; The default + # severity for a CheckException is MEDIUM but this can be + # overwritten; + raise CheckException("something went wrong", severity=Severity.LOW) + + if "something_unexpected_has_happened": + # exceptions will be converted to CheckException, MEDIUM severity + raise Exception("something went wrong") + + # A check result may have multiple types, items, and/or metrics + result = {"myType": [{"name": "my item"}]} + + if "result_is_incomplete": + # optionally, IncompleteResultException with severity; + # the default severity is LOW. + raise IncompleteResultException('missing type x', result) + + if "not_count_as_check_result": + # optionally, NoCountException can be raised in which case the + # check result is not counted by InfraSonar; Thus, the last seen + # services will not "see" this check result. + # A severity can be given if we also want a check error; + # (similar to the IncompleteResultException exception) + raise NoCountException('do not count this check result', result) + + # Use the asset in logging; includes asset info and the check key + logging.info(f"log something; {asset}") + + # In alpha versions and debug logging enabled, unknown exception will + # be logged when debug logging is enabled. + # You may use logger.exception() yourself if you want exception + # logging for debug logging only. + try: + 42 / 0 # ZeroDivision error for example + except Exception: + logger.exception() # log the exception only when DEBUG logging + + # Return the check result + return result if __name__ == "__main__": - checks = { - "myFirstCheck": my_first_check, - } + checks = ( + MyFirstCheck + ) # Initialize the probe with a name, version and checks probe = Probe("myProbe", __version__, checks) diff --git a/libprobe/check.py b/libprobe/check.py new file mode 100644 index 0000000..8c788d5 --- /dev/null +++ b/libprobe/check.py @@ -0,0 +1,23 @@ +import abc +from .asset import Asset + + +class Check(abc.ABC): + key: str # Check key + unchanged_eol: int = 0 # For example: 14400 for 4 hours, 0 for disabled + + def __init_subclass__(cls, **kwargs): + if not hasattr(cls, 'key'): + raise NotImplementedError('key not implemented') + if not isinstance(cls.key, str): + raise NotImplementedError('key must be type str') + return super().__init_subclass__(**kwargs) + + # Method run(..) must return a check result, it receives: + # asset: Asset + # local_config: Local configuration (credentials etc.) + # config: Asset configuration from InfraSonar + @staticmethod + @abc.abstractmethod + async def run(asset: Asset, local_config: dict, config: dict) -> dict: + ... diff --git a/libprobe/config.py b/libprobe/config.py index 6d12948..b3dfdb7 100644 --- a/libprobe/config.py +++ b/libprobe/config.py @@ -17,7 +17,6 @@ username: charlie password: "my other secret" """ -from typing import Optional import logging @@ -50,7 +49,7 @@ def decrypt(layer, fernet): decrypt(v, fernet) -def get_config(conf: dict, probe_name: str, asset_id: int, use: Optional[str]): +def get_config(conf: dict, probe_name: str, asset_id: int, use: str | None): # use might both be None or an empty string, depending if the `_use` option # is available for the probe; both must be ignored probe = conf.get(use or probe_name) diff --git a/libprobe/exceptions.py b/libprobe/exceptions.py index 572e51d..5dbc621 100644 --- a/libprobe/exceptions.py +++ b/libprobe/exceptions.py @@ -1,5 +1,4 @@ from .severity import Severity -from typing import Optional class IgnoreResultException(Exception): @@ -62,7 +61,7 @@ def __init__( self, msg: str, result: dict, - severity: Optional[Severity] = None): + severity: Severity | None = None): assert isinstance(result, dict) self.is_exception = severity is not None super().__init__( diff --git a/libprobe/net/package.py b/libprobe/net/package.py index 47d8e83..03d5151 100644 --- a/libprobe/net/package.py +++ b/libprobe/net/package.py @@ -1,7 +1,7 @@ from __future__ import annotations import msgpack import struct -from typing import Optional, Any +from typing import Any class Package(object): @@ -10,7 +10,7 @@ class Package(object): st_package = struct.Struct(' bool: def request( self, pkg: Package, - timeout: Union[None, float, int] = None + timeout: float | int | None = None ) -> asyncio.Future: self._pid += 1 self._pid %= 0x10000 @@ -86,7 +85,7 @@ def data_received(self, data: bytes): def on_package_received(self, pkg: Package): raise NotImplementedError - async def _timer(self, pid: int, timeout: Union[float, int]): + async def _timer(self, pid: int, timeout: float | int): await asyncio.sleep(timeout) try: future, task = self._requests.pop(pid) @@ -97,7 +96,7 @@ async def _timer(self, pid: int, timeout: Union[float, int]): future.set_exception(TimeoutError( f'request timed out on package id: {pid}')) - def _get_future(self, pkg: Package) -> Optional[asyncio.Future]: + def _get_future(self, pkg: Package) -> asyncio.Future | None: future, task = self._requests.pop(pkg.pid, (None, None)) if future is None: logging.error( diff --git a/libprobe/probe.py b/libprobe/probe.py index b40cb6b..1270e93 100644 --- a/libprobe/probe.py +++ b/libprobe/probe.py @@ -11,7 +11,7 @@ from cryptography.fernet import Fernet from pathlib import Path from setproctitle import setproctitle -from typing import Optional, Dict, Tuple, Callable, Awaitable, Mapping +from typing import Callable, Awaitable, Mapping from .exceptions import ( CheckException, IgnoreResultException, @@ -25,6 +25,8 @@ from .asset import Asset from .config import encrypt, decrypt, get_config from .response import UploadFile, FileType +from .check import Check + HEADER_FILE = """ # WARNING: InfraSonar will make `password` and `secret` values unreadable but @@ -97,7 +99,7 @@ def __init__( self, name: str, version: str, - checks: Mapping[str, Callable[[Asset, dict, dict], Awaitable[dict]]], + checks: tuple[type[Check], ...], config_path: str = INFRASONAR_CONF_FN ): """Initialize a Infrasonar probe. @@ -117,26 +119,26 @@ def __init__( logger.setup_logger() start_msg = 'starting' if dry_run is None else 'dry-run' logging.warning(f'{start_msg} probe collector: {name} v{version}') - self.loop: Optional[asyncio.AbstractEventLoop] = None + self.loop: asyncio.AbstractEventLoop | None = None self.name: str = name self.version: str = version - self._checks_funs: Mapping[ - str, - Callable[[Asset, dict, dict], Awaitable[dict]]] = checks + self._my_checks: Mapping[str, type[Check]] = { + check.key: check for check in checks} self._config_path: Path = Path(config_path) self._connecting: bool = False - self._protocol: Optional[AgentcoreProtocol] = None + self._protocol: AgentcoreProtocol | None = None self._retry_next: int = 0 self._retry_step: int = 1 - self._local_config: Optional[dict] = None - self._local_config_mtime: Optional[float] = None - self._checks_config: Dict[ - Tuple[int, int], - Tuple[Tuple[str, str], dict]] = {} - self._checks: Dict[Tuple[int, int], asyncio.Future] = {} - self._dry_run: Optional[Tuple[Asset, dict]] = \ + self._local_config: dict | None = None + self._local_config_mtime: float | None = None + self._checks_config: dict[ + tuple[int, int], + tuple[tuple[str, str], dict]] = {} + self._checks: dict[tuple[int, int], asyncio.Future] = {} + self._dry_run: tuple[Asset, dict] | None = \ None if dry_run is None else self._load_dry_run_assst(dry_run) self._on_close: Callable[[], Awaitable[None]] | None = None + self._prev_checks: dict[tuple, tuple[float, dict]] = {} if not os.path.exists(config_path): try: @@ -155,7 +157,7 @@ def __init__( logging.exception(f"configuration file invalid: {config_path}") exit(1) - def _load_dry_run_assst(self, dry_run: dict) -> Tuple[Asset, dict]: + def _load_dry_run_assst(self, dry_run: dict) -> tuple[Asset, dict]: asset = dry_run.get('asset') if not isinstance(asset, dict): @@ -182,8 +184,8 @@ def _load_dry_run_assst(self, dry_run: dict) -> Tuple[Asset, dict]: f'Missing or invalid `check` in {DRY_RUN}; example: {EDR}') exit(1) - if check_key not in self._checks_funs: - available = ', '.join(self._checks_funs.keys()) + if check_key not in self._my_checks: + available = ', '.join(self._my_checks.keys()) logging.error( f'Unknown check `{check_key}` in {DRY_RUN}; ' f'Available checks: {available}') @@ -229,7 +231,7 @@ async def _start(self): for _ in range(step): await asyncio.sleep(1) - def start(self, loop: Optional[asyncio.AbstractEventLoop] = None): + def start(self, loop: asyncio.AbstractEventLoop | None = None): """Start a Infrasonar probe Args: @@ -255,8 +257,8 @@ async def _do_dry_run(self): assert self._dry_run is not None asset, config = self._dry_run timeout = MAX_CHECK_TIMEOUT - asset_config = self._asset_config(asset.id, config.get('_use')) - fun = self._checks_funs[asset.check] + local_config = self._get_local_config(asset.id, config.get('_use')) + check = self._my_checks[asset.check] ts = time.time() logging.debug(f'run check (dry-run); {asset}') @@ -265,7 +267,8 @@ async def _do_dry_run(self): try: try: res = await asyncio.wait_for( - fun(asset, asset_config, config), timeout=timeout) + check.run(asset, local_config, config), + timeout=timeout) if not isinstance(res, dict): raise TypeError( 'expecting type `dict` as check result ' @@ -376,11 +379,28 @@ async def _connect(self): finally: self._connecting = False + def _unchanged(self, check: type[Check], path: tuple, + result: dict | None) -> bool: + if not check.unchanged_eol: + return False + if result is None: + self._prev_checks.pop(path, None) + return False + + eol, prev = self._prev_checks.get(path, (0.0, None)) + now = time.time() + if eol > now and prev == result: + return True + + self._prev_checks[path] = now + check.unchanged_eol, result + return False + def send( self, + check: type[Check], path: tuple, - result: Optional[dict], - error: Optional[dict], + result: dict | None, + error: dict | None, ts: float, no_count: bool = False): asset_id, _ = path @@ -388,14 +408,20 @@ def send( 'duration': time.time() - ts, 'timestamp': int(ts), } - if no_count: - framework['no_count'] = True - check_data = { - 'result': result, 'error': error, 'framework': framework } + + if no_count: + framework['no_count'] = True + + if self._unchanged(check, path, result): + logging.debug('using previous result (unchanged)') + framework['unchanged'] = True + else: + check_data['result'] = result + pkg = Package.make( AgentcoreProtocol.PROTO_FAF_DUMP, partid=asset_id, @@ -406,7 +432,7 @@ def send( if len(data) > MAX_PACKAGE_SIZE: e = CheckException(f'data package too large ({len(data)} bytes)') logging.error(f'check error; asset_id `{asset_id}`; {str(e)}') - self.send(path, None, e.to_dict(), ts) + self.send(check, path, None, e.to_dict(), ts) elif self._protocol and self._protocol.transport: self._protocol.transport.write(data) @@ -488,7 +514,7 @@ def _read_local_config(self): self._local_config_mtime = mtime self._local_config = config - def _asset_config(self, asset_id: int, use: Optional[str]) -> dict: + def _get_local_config(self, asset_id: int, use: str | None) -> dict: try: self._read_local_config() except Exception: @@ -514,7 +540,7 @@ def _on_upsert_asset(self, asset: list): new = { tuple(path): (names, config) for path, names, config in checks - if names[CHECK_NAME_IDX] in self._checks_funs} + if names[CHECK_NAME_IDX] in self._my_checks} new_checks_config.update(new) self._set_new_checks_config(new_checks_config) @@ -522,7 +548,7 @@ def _on_set_assets(self, assets: list): new_checks_config = { tuple(path): (names, config) for path, names, config in assets - if names[CHECK_NAME_IDX] in self._checks_funs} + if names[CHECK_NAME_IDX] in self._my_checks} self._set_new_checks_config(new_checks_config) def _set_new_checks_config(self, new_checks_config: dict): @@ -562,7 +588,7 @@ async def _run_check_loop(self, path: tuple): asset_id, check_id = path (asset_name, check_key), config = self._checks_config[path] interval = config.get('_interval') - fun = self._checks_funs[check_key] + check = self._my_checks[check_key] asset = Asset(asset_id, asset_name, check_key) my_task = self._checks[path] @@ -612,14 +638,15 @@ async def _run_check_loop(self, path: tuple): # asset_id and check_key are truly immutable, name is not asset = Asset(asset_id, asset_name, check_key) - asset_config = self._asset_config(asset.id, config.get('_use')) + local_config = self._get_local_config(asset.id, config.get('_use')) logging.debug(f'run check; {asset}') try: try: res = await asyncio.wait_for( - fun(asset, asset_config, config), timeout=timeout) + check.run(asset, local_config, config), + timeout=timeout) if not isinstance(res, dict): raise TypeError( 'expecting type `dict` as check result ' @@ -658,27 +685,28 @@ async def _run_check_loop(self, path: tuple): logging.warning( 'incomplete result; ' f'{asset} error: `{e}` severity: {e.severity}') - self.send(path, e.result, e.to_dict(), ts) + self.send(check, path, e.result, e.to_dict(), ts) except NoCountException as e: if not e.is_exception: logging.debug(f'run check ok ({e}); {asset}') - self.send(path, e.result, None, ts, no_count=True) + self.send(check, path, e.result, None, ts, no_count=True) else: logging.warning( 'incomplete result (no count); ' f'{asset} error: `{e}` severity: {e.severity}') - self.send(path, e.result, e.to_dict(), ts, no_count=True) + self.send(check, path, e.result, e.to_dict(), ts, + no_count=True) except CheckException as e: logging.error( 'check error; ' f'{asset} error: `{e}` severity: {e.severity}') - self.send(path, None, e.to_dict(), ts) + self.send(check, path, None, e.to_dict(), ts) else: logging.debug(f'run check ok; {asset}') - self.send(path, res, None, ts) + self.send(check, path, res, None, ts) @staticmethod def tmp_file(filename: str) -> str: diff --git a/libprobe/protocol.py b/libprobe/protocol.py index f384ef4..946b7e6 100644 --- a/libprobe/protocol.py +++ b/libprobe/protocol.py @@ -12,11 +12,11 @@ class AgentcoreProtocol(Protocol): PROTO_REQ_ANNOUNCE = 0x01 - PROTO_FAF_SET_ASSETS = 0x02 # Overwites all assets + PROTO_FAF_SET_ASSETS = 0x02 # Overwrites all assets PROTO_REQ_INFO = 0x03 - PROTO_FAF_UPSERT_ASSET = 0x04 # Overwite/Add a single asset + PROTO_FAF_UPSERT_ASSET = 0x04 # Overwrite or Add a single asset PROTO_FAF_UNSET_ASSETS = 0x05 # Remove given assets diff --git a/libprobe/version.py b/libprobe/version.py index a6221b3..afced14 100644 --- a/libprobe/version.py +++ b/libprobe/version.py @@ -1 +1 @@ -__version__ = '1.0.2' +__version__ = '2.0.0'