Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Variable | Default | Description
`LOG_COLORIZED` | `0` | Log using colors (`0`=disabled, `1`=enabled).
`LOG_FTM` | `%y%m%d %H:%M:%S` | Log format prefix.
`OUTPUT_TYPE` | `JSON` | Set the output type to `JSON` or `PPRINT` (Only for a dry run).
`UNCHANGED_EOL` | `14400` | Unchanged End-Of-Life in X seconds. Prevents sending equal check data, a value of `0` disables `unchanged` _(defaults to 4 hours)_.

## Usage

Expand Down
3 changes: 1 addition & 2 deletions libprobe/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
username: charlie
password: "my other secret"
"""
from typing import Optional
import logging


Expand Down Expand Up @@ -50,7 +49,7 @@ def decrypt(layer, fernet):
decrypt(v, fernet)


def get_config(conf: dict, probe_name: str, asset_id: int, use: Optional[str]):
def get_config(conf: dict, probe_name: str, asset_id: int, use: str | None):
# use might both be None or an empty string, depending if the `_use` option
# is available for the probe; both must be ignored
probe = conf.get(use or probe_name)
Expand Down
3 changes: 1 addition & 2 deletions libprobe/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from .severity import Severity
from typing import Optional


class IgnoreResultException(Exception):
Expand Down Expand Up @@ -62,7 +61,7 @@ def __init__(
self,
msg: str,
result: dict,
severity: Optional[Severity] = None):
severity: Severity | None = None):
assert isinstance(result, dict)
self.is_exception = severity is not None
super().__init__(
Expand Down
4 changes: 2 additions & 2 deletions libprobe/net/package.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations
import msgpack
import struct
from typing import Optional, Any
from typing import Any


class Package(object):
Expand All @@ -10,7 +10,7 @@ class Package(object):

st_package = struct.Struct('<QIHBB')

def __init__(self, barray: Optional[bytearray] = None):
def __init__(self, barray: bytearray | None = None):
if barray is None:
return

Expand Down
17 changes: 8 additions & 9 deletions libprobe/net/protocol.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
from typing import Union, Optional, Dict, Tuple
from .package import Package


Expand All @@ -14,19 +13,19 @@ class Protocol(asyncio.Protocol):
def __init__(self):
super().__init__()
self._buffered_data = bytearray()
self._package: Optional[Package] = None
self._requests: Dict[int, Tuple[asyncio.Future,
Optional[asyncio.Task]]] = dict()
self._package: Package | None = None
self._requests: dict[int, tuple[asyncio.Future,
asyncio.Task | None]] = dict()
self._pid = 0
self.transport: Optional[asyncio.Transport] = None
self.transport: asyncio.Transport | None = None

def connection_made(self, transport: asyncio.Transport): # type: ignore
'''
override asyncio.Protocol
'''
self.transport = transport

def connection_lost(self, exc: Optional[Exception]):
def connection_lost(self, exc: Exception | None):
'''
override asyncio.Protocol
'''
Expand All @@ -40,7 +39,7 @@ def is_connected(self) -> bool:
def request(
self,
pkg: Package,
timeout: Union[None, float, int] = None
timeout: float | int | None = None
) -> asyncio.Future:
self._pid += 1
self._pid %= 0x10000
Expand Down Expand Up @@ -86,7 +85,7 @@ def data_received(self, data: bytes):
def on_package_received(self, pkg: Package):
raise NotImplementedError

async def _timer(self, pid: int, timeout: Union[float, int]):
async def _timer(self, pid: int, timeout: float | int):
await asyncio.sleep(timeout)
try:
future, task = self._requests.pop(pid)
Expand All @@ -97,7 +96,7 @@ async def _timer(self, pid: int, timeout: Union[float, int]):
future.set_exception(TimeoutError(
f'request timed out on package id: {pid}'))

def _get_future(self, pkg: Package) -> Optional[asyncio.Future]:
def _get_future(self, pkg: Package) -> asyncio.Future | None:
future, task = self._requests.pop(pkg.pid, (None, None))
if future is None:
logging.error(
Expand Down
61 changes: 42 additions & 19 deletions libprobe/probe.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from cryptography.fernet import Fernet
from pathlib import Path
from setproctitle import setproctitle
from typing import Optional, Dict, Tuple, Callable, Awaitable, Mapping
from typing import Callable, Awaitable, Mapping
from .exceptions import (
CheckException,
IgnoreResultException,
Expand Down Expand Up @@ -117,26 +117,28 @@ def __init__(
logger.setup_logger()
start_msg = 'starting' if dry_run is None else 'dry-run'
logging.warning(f'{start_msg} probe collector: {name} v{version}')
self.loop: Optional[asyncio.AbstractEventLoop] = None
self.loop: asyncio.AbstractEventLoop | None = None
self.name: str = name
self.version: str = version
self._checks_funs: Mapping[
str,
Callable[[Asset, dict, dict], Awaitable[dict]]] = checks
self._config_path: Path = Path(config_path)
self._connecting: bool = False
self._protocol: Optional[AgentcoreProtocol] = None
self._protocol: AgentcoreProtocol | None = None
self._retry_next: int = 0
self._retry_step: int = 1
self._local_config: Optional[dict] = None
self._local_config_mtime: Optional[float] = None
self._checks_config: Dict[
Tuple[int, int],
Tuple[Tuple[str, str], dict]] = {}
self._checks: Dict[Tuple[int, int], asyncio.Future] = {}
self._dry_run: Optional[Tuple[Asset, dict]] = \
self._local_config: dict | None = None
self._local_config_mtime: float | None = None
self._checks_config: dict[
tuple[int, int],
tuple[tuple[str, str], dict]] = {}
self._checks: dict[tuple[int, int], asyncio.Future] = {}
self._dry_run: tuple[Asset, dict] | None = \
None if dry_run is None else self._load_dry_run_assst(dry_run)
self._on_close: Callable[[], Awaitable[None]] | None = None
self._prev_checks: dict[tuple, tuple[float, dict]] = {}
self._unchanged_eol = float(os.getenv('UNCHANGED_EOL', '14400'))

if not os.path.exists(config_path):
try:
Expand All @@ -155,7 +157,7 @@ def __init__(
logging.exception(f"configuration file invalid: {config_path}")
exit(1)

def _load_dry_run_assst(self, dry_run: dict) -> Tuple[Asset, dict]:
def _load_dry_run_assst(self, dry_run: dict) -> tuple[Asset, dict]:
asset = dry_run.get('asset')

if not isinstance(asset, dict):
Expand Down Expand Up @@ -229,7 +231,7 @@ async def _start(self):
for _ in range(step):
await asyncio.sleep(1)

def start(self, loop: Optional[asyncio.AbstractEventLoop] = None):
def start(self, loop: asyncio.AbstractEventLoop | None = None):
"""Start a Infrasonar probe

Args:
Expand Down Expand Up @@ -376,26 +378,47 @@ async def _connect(self):
finally:
self._connecting = False

def _unchanged(self, path: tuple, result: dict | None) -> bool:
if not self._unchanged_eol:
return False
if result is None:
self._prev_checks.pop(path, None)
return False

eol, prev = self._prev_checks.get(path, (0.0, None))
now = time.time()
if eol > now and prev == result:
return True

self._prev_checks[path] = now + self._unchanged_eol, result
return False

def send(
self,
path: tuple,
result: Optional[dict],
error: Optional[dict],
result: dict | None,
error: dict | None,
ts: float,
no_count: bool = False):
asset_id, _ = path
framework = {
'duration': time.time() - ts,
'timestamp': int(ts),
}
if no_count:
framework['no_count'] = True

check_data = {
'result': result,
'error': error,
'framework': framework
}

if no_count:
framework['no_count'] = True

if self._unchanged(path, result):
logging.debug('using previous result (unchanged)')
framework['unchanged'] = True
else:
check_data['result'] = result

pkg = Package.make(
AgentcoreProtocol.PROTO_FAF_DUMP,
partid=asset_id,
Expand Down Expand Up @@ -488,7 +511,7 @@ def _read_local_config(self):
self._local_config_mtime = mtime
self._local_config = config

def _asset_config(self, asset_id: int, use: Optional[str]) -> dict:
def _asset_config(self, asset_id: int, use: str | None) -> dict:
try:
self._read_local_config()
except Exception:
Expand Down
2 changes: 1 addition & 1 deletion libprobe/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.0.2'
__version__ = '1.0.3'