diff --git a/README.md b/README.md index 8e65d21..4dcac65 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ Variable | Default | Description `LOG_COLORIZED` | `0` | Log using colors (`0`=disabled, `1`=enabled). `LOG_FTM` | `%y%m%d %H:%M:%S` | Log format prefix. `OUTPUT_TYPE` | `JSON` | Set the output type to `JSON` or `PPRINT` (Only for a dry run). +`UNCHANGED_EOL` | `14400` | Unchanged End-Of-Life in X seconds. Prevents sending equal check data, a value of `0` disables `unchanged` _(defaults to 4 hours)_. ## Usage diff --git a/libprobe/config.py b/libprobe/config.py index 6d12948..b3dfdb7 100644 --- a/libprobe/config.py +++ b/libprobe/config.py @@ -17,7 +17,6 @@ username: charlie password: "my other secret" """ -from typing import Optional import logging @@ -50,7 +49,7 @@ def decrypt(layer, fernet): decrypt(v, fernet) -def get_config(conf: dict, probe_name: str, asset_id: int, use: Optional[str]): +def get_config(conf: dict, probe_name: str, asset_id: int, use: str | None): # use might both be None or an empty string, depending if the `_use` option # is available for the probe; both must be ignored probe = conf.get(use or probe_name) diff --git a/libprobe/exceptions.py b/libprobe/exceptions.py index 572e51d..5dbc621 100644 --- a/libprobe/exceptions.py +++ b/libprobe/exceptions.py @@ -1,5 +1,4 @@ from .severity import Severity -from typing import Optional class IgnoreResultException(Exception): @@ -62,7 +61,7 @@ def __init__( self, msg: str, result: dict, - severity: Optional[Severity] = None): + severity: Severity | None = None): assert isinstance(result, dict) self.is_exception = severity is not None super().__init__( diff --git a/libprobe/net/package.py b/libprobe/net/package.py index 47d8e83..03d5151 100644 --- a/libprobe/net/package.py +++ b/libprobe/net/package.py @@ -1,7 +1,7 @@ from __future__ import annotations import msgpack import struct -from typing import Optional, Any +from typing import Any class Package(object): @@ -10,7 +10,7 @@ class Package(object): st_package = struct.Struct(' bool: def request( self, pkg: Package, - timeout: Union[None, float, int] = None + timeout: float | int | None = None ) -> asyncio.Future: self._pid += 1 self._pid %= 0x10000 @@ -86,7 +85,7 @@ def data_received(self, data: bytes): def on_package_received(self, pkg: Package): raise NotImplementedError - async def _timer(self, pid: int, timeout: Union[float, int]): + async def _timer(self, pid: int, timeout: float | int): await asyncio.sleep(timeout) try: future, task = self._requests.pop(pid) @@ -97,7 +96,7 @@ async def _timer(self, pid: int, timeout: Union[float, int]): future.set_exception(TimeoutError( f'request timed out on package id: {pid}')) - def _get_future(self, pkg: Package) -> Optional[asyncio.Future]: + def _get_future(self, pkg: Package) -> asyncio.Future | None: future, task = self._requests.pop(pkg.pid, (None, None)) if future is None: logging.error( diff --git a/libprobe/probe.py b/libprobe/probe.py index b40cb6b..feb1c7d 100644 --- a/libprobe/probe.py +++ b/libprobe/probe.py @@ -11,7 +11,7 @@ from cryptography.fernet import Fernet from pathlib import Path from setproctitle import setproctitle -from typing import Optional, Dict, Tuple, Callable, Awaitable, Mapping +from typing import Callable, Awaitable, Mapping from .exceptions import ( CheckException, IgnoreResultException, @@ -117,7 +117,7 @@ def __init__( logger.setup_logger() start_msg = 'starting' if dry_run is None else 'dry-run' logging.warning(f'{start_msg} probe collector: {name} v{version}') - self.loop: Optional[asyncio.AbstractEventLoop] = None + self.loop: asyncio.AbstractEventLoop | None = None self.name: str = name self.version: str = version self._checks_funs: Mapping[ @@ -125,18 +125,20 @@ def __init__( Callable[[Asset, dict, dict], Awaitable[dict]]] = checks self._config_path: Path = Path(config_path) self._connecting: bool = False - self._protocol: Optional[AgentcoreProtocol] = None + self._protocol: AgentcoreProtocol | None = None self._retry_next: int = 0 self._retry_step: int = 1 - self._local_config: Optional[dict] = None - self._local_config_mtime: Optional[float] = None - self._checks_config: Dict[ - Tuple[int, int], - Tuple[Tuple[str, str], dict]] = {} - self._checks: Dict[Tuple[int, int], asyncio.Future] = {} - self._dry_run: Optional[Tuple[Asset, dict]] = \ + self._local_config: dict | None = None + self._local_config_mtime: float | None = None + self._checks_config: dict[ + tuple[int, int], + tuple[tuple[str, str], dict]] = {} + self._checks: dict[tuple[int, int], asyncio.Future] = {} + self._dry_run: tuple[Asset, dict] | None = \ None if dry_run is None else self._load_dry_run_assst(dry_run) self._on_close: Callable[[], Awaitable[None]] | None = None + self._prev_checks: dict[tuple, tuple[float, dict]] = {} + self._unchanged_eol = float(os.getenv('UNCHANGED_EOL', '14400')) if not os.path.exists(config_path): try: @@ -155,7 +157,7 @@ def __init__( logging.exception(f"configuration file invalid: {config_path}") exit(1) - def _load_dry_run_assst(self, dry_run: dict) -> Tuple[Asset, dict]: + def _load_dry_run_assst(self, dry_run: dict) -> tuple[Asset, dict]: asset = dry_run.get('asset') if not isinstance(asset, dict): @@ -229,7 +231,7 @@ async def _start(self): for _ in range(step): await asyncio.sleep(1) - def start(self, loop: Optional[asyncio.AbstractEventLoop] = None): + def start(self, loop: asyncio.AbstractEventLoop | None = None): """Start a Infrasonar probe Args: @@ -376,11 +378,26 @@ async def _connect(self): finally: self._connecting = False + def _unchanged(self, path: tuple, result: dict | None) -> bool: + if not self._unchanged_eol: + return False + if result is None: + self._prev_checks.pop(path, None) + return False + + eol, prev = self._prev_checks.get(path, (0.0, None)) + now = time.time() + if eol > now and prev == result: + return True + + self._prev_checks[path] = now + self._unchanged_eol, result + return False + def send( self, path: tuple, - result: Optional[dict], - error: Optional[dict], + result: dict | None, + error: dict | None, ts: float, no_count: bool = False): asset_id, _ = path @@ -388,14 +405,20 @@ def send( 'duration': time.time() - ts, 'timestamp': int(ts), } - if no_count: - framework['no_count'] = True - check_data = { - 'result': result, 'error': error, 'framework': framework } + + if no_count: + framework['no_count'] = True + + if self._unchanged(path, result): + logging.debug('using previous result (unchanged)') + framework['unchanged'] = True + else: + check_data['result'] = result + pkg = Package.make( AgentcoreProtocol.PROTO_FAF_DUMP, partid=asset_id, @@ -488,7 +511,7 @@ def _read_local_config(self): self._local_config_mtime = mtime self._local_config = config - def _asset_config(self, asset_id: int, use: Optional[str]) -> dict: + def _asset_config(self, asset_id: int, use: str | None) -> dict: try: self._read_local_config() except Exception: diff --git a/libprobe/version.py b/libprobe/version.py index a6221b3..3f6fab6 100644 --- a/libprobe/version.py +++ b/libprobe/version.py @@ -1 +1 @@ -__version__ = '1.0.2' +__version__ = '1.0.3'