Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions asab/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

import logging

from ._teskalabs_kazoo import prefer_teskalabs_kazoo

prefer_teskalabs_kazoo()

from .abc.module import Module
from .abc.service import Service
from .abc.singleton import Singleton
Expand Down
9 changes: 7 additions & 2 deletions asab/library/providers/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,24 @@ async def _set_ready(self, ready=True):

async def subscribe(self, path: str, target: typing.Union[str, tuple, None] = None):
"""
Take a path and subscribe to changes in this directory.
Take a path and subscribe to changes in this directory or item.
When change occurs, publish PubSub signal "Library.change!"

Note:
Keep in mind every provider requires specific implementation.
These might vary in lag between change if the library and its propagation.

Mind following when implementing this method:
- user can subscribe only on existing directory. -> Check whether subscribed path is a directory,
- user can subscribe only on an existing directory or item -> Check the subscribed path
specifically in the provider
- subscribing to nonexisting directory or file should lead to silent error

Args:
path: Absolute path to subscribe (starting with "/")
"""
raise NotImplementedError("{}.subscribe()".format(self.__class__.__name__))

@staticmethod
def _is_item_subscription(path: str) -> bool:
name = path.rstrip("/").rsplit("/", 1)[-1]
return "." in name and not name.endswith((".io", ".d"))
Comment on lines +69 to +71
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

_is_item_subscription() can misclassify dotted directories as items

Line 70 removes trailing / before classification, so paths like /.tenants/ or /foo.bar/ are treated as items. That can route directory subscriptions into wrong handling.

Suggested fix
 `@staticmethod`
 def _is_item_subscription(path: str) -> bool:
-	name = path.rstrip("/").rsplit("/", 1)[-1]
-	return "." in name and not name.endswith((".io", ".d"))
+	# Explicit trailing slash means directory subscription.
+	if path.endswith("/"):
+		return False
+	name = path.rstrip("/").rsplit("/", 1)[-1]
+	# Hidden/internal dotted directories should not be treated as items.
+	if name.startswith(".") and not name.endswith((".io", ".d")):
+		return False
+	return "." in name and not name.endswith((".io", ".d"))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _is_item_subscription(path: str) -> bool:
name = path.rstrip("/").rsplit("/", 1)[-1]
return "." in name and not name.endswith((".io", ".d"))
def _is_item_subscription(path: str) -> bool:
# Explicit trailing slash means directory subscription.
if path.endswith("/"):
return False
name = path.rstrip("/").rsplit("/", 1)[-1]
# Hidden/internal dotted directories should not be treated as items.
if name.startswith(".") and not name.endswith((".io", ".d")):
return False
return "." in name and not name.endswith((".io", ".d"))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@asab/library/providers/abc.py` around lines 69 - 71, _is_item_subscription
currently strips trailing slashes then classifies the final path segment,
causing directory paths like "/.tenants/" or "/foo.bar/" to be misclassified as
items; update _is_item_subscription to first check for a trailing "/" and
immediately return False for directory paths, then extract the final segment
with rsplit("/", 1)[-1] (without rstrip) and apply the existing dot-and-suffix
checks (the "." presence and not ending with (".io", ".d")) so only real item
names are classified as items.

275 changes: 237 additions & 38 deletions asab/library/providers/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ def __init__(self, library, path, layer, *, set_ready=True):
self.DisabledFilePath = os.path.join(self.BasePath, '.disabled.yaml')

self.AggrEvents = []
self.WDs = {} # wd -> (subscribed_path, child_path)
self.WDs = {} # wd -> watched directory path
self.WDSubscriptions = {} # wd -> list of subscriptions attached to the watched directory
self.WatchedPaths = {} # watched directory path -> wd

def _current_tenant_id(self):
try:
Expand Down Expand Up @@ -382,74 +384,267 @@ def _list(self, path: str):

async def subscribe(self, path, target: typing.Union[str, tuple, None] = None):
"""
Subscribe to filesystem changes under `path`.

Note:
`target` is accepted for API compatibility; current filesystem implementation
watches the resolved path without target-specific scoping.
Translate a logical library subscription into one or more filesystem watches.

Directories are expanded recursively into watched directory boundaries,
while items are implemented by watching their parent directory and later
filtering low-level events by child filename.

Examples:
``subscribe("/Schemas/")``
Recursively watches ``<BasePath>/Schemas`` and its existing
subdirectories. Any later change under that subtree publishes
``/Schemas/``.

``subscribe("/Correlations/Microsoft/Windows/Account Created.yaml")``
Watches the parent directory
``<BasePath>/Correlations/Microsoft/Windows`` and publishes the
item path only when the low-level event name is
``"Account Created.yaml"``.
"""
if not os.path.isdir(self.BasePath + path):
return
if self.FD is None:
L.warning("Cannot subscribe to changes in the filesystem layer of the library: '{}'".format(self.BasePath))
return
self._subscribe_recursive(path, path)

for actual_path in self._iter_subscription_paths(path, target):
if os.path.isdir(actual_path):
self._subscribe_recursive(
subscription=self._build_directory_subscription(path, actual_path),
watched_path=actual_path,
)
elif os.path.isfile(actual_path):
self._subscribe_item(
subscription=self._build_item_subscription(path, actual_path),
)

def _subscribe_recursive(self, subscribed_path, path_to_be_listed):
binary = (self.BasePath + path_to_be_listed).encode()
wd = inotify_add_watch(self.FD, binary, IN_ALL_EVENTS)
if wd == -1:
L.error("Error in inotify_add_watch")

def _iter_subscription_paths(self, path, target: typing.Union[str, tuple, None]):
"""
Resolve a logical subscription path into concrete filesystem paths.

The returned paths are scope-specific filesystem locations under the
global, tenant, or personal library trees.

Examples:
``("/Schemas/", "global")`` -> ``["<BasePath>/Schemas"]``
``("/Schemas/", ("tenant", "acme"))`` ->
``["<BasePath>/.tenants/acme/Schemas"]``
"""
if target in {None, "global"}:
return [self.build_path(path, tenant_specific=False)]

if target == "tenant":
return [self.build_path(path, tenant_specific=True, tenant=tenant) for tenant in self._get_tenants()]
Comment on lines +436 to +437
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

target="tenant" subscription is a one-time snapshot

Line 410 subscribes only tenants present at subscription time. New tenants created later under /.tenants won’t be watched, so change events can be missed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@asab/library/providers/filesystem.py` around lines 409 - 410, The current
target=="tenant" branch only subscribes to tenants returned by _get_tenants() at
call time, missing tenants created later; change it to also subscribe to the
tenants directory itself and dynamically add per-tenant subscriptions on new
entries. Specifically, update the logic around the target=="tenant" branch
(referencing build_path and _get_tenants) to: 1) subscribe to the parent tenants
path (e.g. build_path("/.tenants", tenant_specific=False)) so you receive
create/delete events for tenant entries, and 2) on receiving a new-tenant create
event, call build_path(new_tenant_path, tenant_specific=True, tenant=new_tenant)
and add that tenant to the active subscription set (extract this into a helper
like _add_tenant_subscription if helpful). Ensure removal on tenant deletion and
avoid duplicating subscriptions for already-known tenants.


if isinstance(target, tuple) and len(target) == 2 and target[0] == "tenant":
return [self.build_path(path, tenant_specific=True, tenant=target[1])]
Comment on lines +439 to +440
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate tenant ID before path construction

Line 413 passes target[1] directly into tenant path construction. Values with path separators or traversal tokens can escape the intended .tenants/<tenant> scope.

Suggested fix
+	def _validate_scope_component(self, value: str, name: str) -> str:
+		if not isinstance(value, str) or value in {"", ".", ".."}:
+			raise ValueError("Invalid {} identifier".format(name))
+		if os.sep in value or (os.altsep and os.altsep in value):
+			raise ValueError("Invalid {} identifier".format(name))
+		return value
@@
 		if isinstance(target, tuple) and len(target) == 2 and target[0] == "tenant":
-			return [self.build_path(path, tenant_specific=True, tenant=target[1])]
+			tenant_id = self._validate_scope_component(target[1], "tenant")
+			return [self.build_path(path, tenant_specific=True, tenant=tenant_id)]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@asab/library/providers/filesystem.py` around lines 412 - 413, The code passes
target[1] directly into self.build_path as tenant; validate and normalize that
tenant ID before calling build_path: ensure it's non-empty and does not contain
path separators ('/', '\\'), traversal tokens ('..'), or other unsafe characters
(e.g. require a whitelist like /^[A-Za-z0-9._-]+$/), and either raise a
ValueError on invalid input or sanitize/encode the value and then call
self.build_path(path, tenant_specific=True,
tenant=<validated_or_sanitized_value>); refer to the tuple check for target and
the self.build_path(..., tenant=target[1]) call to locate where to add this
validation.


if target == "personal":
tenant_id = self._current_tenant_id()
cred_id = self._current_credentials_id()
personal_path = self._personal_path(path, tenant_id, cred_id)
return [personal_path] if personal_path is not None else []

Comment on lines +442 to +447
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

target="personal" currently subscribes only one context scope

Line 416–419 resolves only current tenant+credentials, but API semantics describe "personal" as all personal scopes. This misses updates outside current context.

Suggested direction
- def _iter_subscription_paths(self, path, target: typing.Union[str, tuple, None]):
+ async def _iter_subscription_paths(self, path, target: typing.Union[str, tuple, None]):
@@
-		if target == "personal":
-			tenant_id = self._current_tenant_id()
-			cred_id = self._current_credentials_id()
-			personal_path = self._personal_path(path, tenant_id, cred_id)
-			return [personal_path] if personal_path is not None else []
+		if target == "personal":
+			paths = []
+			for tenant_id, cred_id in await self._get_personal_scopes():
+				personal_path = self._personal_path(path, tenant_id, cred_id)
+				if personal_path is not None:
+					paths.append(personal_path)
+			return paths

And in subscribe():

-		for actual_path in self._iter_subscription_paths(path, target):
+		for actual_path in await self._iter_subscription_paths(path, target):
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@asab/library/providers/filesystem.py` around lines 415 - 420, The current
branch for target == "personal" only resolves the single current
tenant+credentials via _current_tenant_id() and _current_credentials_id() and
returns one _personal_path, but "personal" should subscribe all personal scopes;
update subscribe() so that when target == "personal" it enumerates all tenant
IDs and credential IDs (e.g., via existing helpers or by adding helpers like
_all_tenant_ids()/_all_credentials_ids()), call _personal_path(path, tenant_id,
cred_id) for each pair, collect non-None results into the returned list, and
remove the single-current-context shortcut so all personal scopes are included.

if isinstance(target, tuple) and len(target) == 2 and target[0] == "personal":
tenant_id = self._current_tenant_id()
personal_path = self._personal_path(path, tenant_id, target[1])
return [personal_path] if personal_path is not None else []

raise ValueError("Unexpected target: {!r}".format(target))


def _get_tenants(self) -> typing.List[str]:
root = os.path.join(self.BasePath, ".tenants")
if not os.path.isdir(root):
return []

tenants = []
for tenant in os.listdir(root):
tenant_path = os.path.join(root, tenant)
if tenant.startswith(".") or not os.path.isdir(tenant_path):
continue
tenants.append(tenant)
return tenants


def _build_directory_subscription(self, publish_path, actual_path):
"""Create a descriptor for a directory-style logical subscription."""
return {
"kind": "dir",
"publish_path": publish_path,
"item_name": None,
"key": ("dir", publish_path, actual_path),
}


def _build_item_subscription(self, publish_path, actual_path):
"""Create a descriptor for an item-style logical subscription."""
return {
"kind": "item",
"publish_path": publish_path,
"item_name": os.path.basename(actual_path),
"key": ("item", publish_path, actual_path),
}


def _subscribe_item(self, subscription):
"""
Attach an item subscription to its parent directory watch.

Inotify delivers child-name events relative to watched directories, so
exact item semantics are recovered later by comparing the event name with
the descriptor's ``item_name``.

Example:
For ``/Correlations/Microsoft/Windows/Account Created.yaml`` the
provider watches ``.../Windows`` and later checks whether the decoded
inotify child name equals ``"Account Created.yaml"``.
"""
parent_path = os.path.dirname(subscription["key"][2])
wd = self._ensure_watch(parent_path)
if wd is None:
return
self.WDs[wd] = (subscribed_path, path_to_be_listed)
self._attach_subscription(wd, subscription)


def _subscribe_recursive(self, subscription, watched_path):
"""
Ensure a directory subscription covers ``watched_path`` and all descendants.

Filesystem recursion is not native to inotify, so the provider expands the
current directory tree in user space by registering one watch per existing
subdirectory.

Example:
A subscription for ``/Schemas/`` results in watches for
``.../Schemas``, ``.../Schemas/nested``, ``.../Schemas/nested/deeper``,
and so on for the current subtree snapshot.
"""
wd = self._ensure_watch(watched_path)
if wd is None:
return
self._attach_subscription(wd, subscription)

try:
items = self._list(path_to_be_listed)
except KeyError:
entries = list(os.scandir(watched_path))
except FileNotFoundError:
# subscribing to non-existing directory is silent
return

for item in items:
if item.type == "dir":
self._subscribe_recursive(subscribed_path, item.name)
for entry in entries:
if entry.name.startswith(".") or not entry.is_dir(follow_symlinks=False):
continue
self._subscribe_recursive(subscription, entry.path)


def _ensure_watch(self, watched_path):
"""Create or reuse the inotify watch for one concrete directory path."""
wd = self.WatchedPaths.get(watched_path)
if wd is not None:
return wd

binary = watched_path.encode()
wd = inotify_add_watch(self.FD, binary, IN_ALL_EVENTS)
if wd == -1:
L.error("Error in inotify_add_watch")
return None

self.WatchedPaths[watched_path] = wd
self.WDs[wd] = watched_path
self.WDSubscriptions.setdefault(wd, [])
return wd


def _attach_subscription(self, wd, subscription):
"""Attach a logical subscription descriptor to one watched directory."""
subscriptions = self.WDSubscriptions.setdefault(wd, [])
if any(existing["key"] == subscription["key"] for existing in subscriptions):
return
subscriptions.append(subscription)


def _remove_watch(self, wd):
watched_path = self.WDs.pop(wd, None)
self.WDSubscriptions.pop(wd, None)
if watched_path is not None:
self.WatchedPaths.pop(watched_path, None)


def _on_inotify_read(self):
"""
Read and decode one batch of packed ``inotify_event`` records.

A single ``os.read`` call may return many concatenated records, so the
buffer is walked record by record and each decoded event is handed to the
provider-level event interpreter.
"""
data = os.read(self.FD, 64 * 1024)

pos = 0
while pos < len(data):
wd, mask, cookie, namesize = struct.unpack_from(EVENT_FMT, data, pos)
pos += EVENT_SIZE + namesize
name = (data[pos - namesize: pos].split(b'\x00', 1)[0]).decode()
self._handle_inotify_event(wd, mask, name)

if mask & IN_ISDIR == IN_ISDIR and ((mask & IN_CREATE == IN_CREATE) or (mask & IN_MOVED_TO == IN_MOVED_TO)):
subscribed_path, child_path = self.WDs[wd]
self._subscribe_recursive(subscribed_path, "/".join([child_path, name]))
self.AggrTimer.restart(0.2)

if mask & IN_IGNORED == IN_IGNORED:
# cleanup
del self.WDs[wd]
continue

name = (data[pos - namesize: pos].split(b'\x00', 1)[0]).decode()
def _handle_inotify_event(self, wd, mask, name):
"""
Interpret one decoded inotify event against attached subscription descriptors.

full_path = os.path.join(self.BasePath, name)
if os.path.normpath(full_path) == os.path.normpath(self.DisabledFilePath):
self.App.TaskService.schedule(self.Library._read_disabled(publish_changes=True))
Newly created or moved-in directories are expanded into fresh recursive
watches so directory subscriptions keep covering the subtree as it grows.

self.AggrTimer.restart(0.2)
Example:
If a watched ``.../Schemas`` directory reports a new child directory
named ``nested``, the provider adds recursive watches under
``.../Schemas/nested`` before aggregating the logical change
publication.
"""
watched_path = self.WDs.get(wd)
subscriptions = list(self.WDSubscriptions.get(wd, ()))

if mask & IN_ISDIR == IN_ISDIR and ((mask & IN_CREATE == IN_CREATE) or (mask & IN_MOVED_TO == IN_MOVED_TO)):
# Extend recursive coverage when a new directory appears under an
# already subscribed directory subtree.
child_path = os.path.join(watched_path, name) if watched_path is not None else None
if child_path is not None:
for subscription in subscriptions:
if subscription["kind"] != "dir":
continue
self._subscribe_recursive(subscription, child_path)

self.AggrEvents.append((subscriptions, name))

full_path = os.path.join(watched_path, name) if watched_path is not None and len(name) > 0 else watched_path
if full_path is not None and os.path.normpath(full_path) == os.path.normpath(self.DisabledFilePath):
self.App.TaskService.schedule(self.Library._read_disabled(publish_changes=True))

if mask & IN_IGNORED == IN_IGNORED:
self._remove_watch(wd)


async def _on_aggr_timer(self):
"""
Collapse low-level inotify bursts into deduplicated logical publications.

This keeps filesystem notification multiplicity from leaking into the
``Library.change!`` contract.

Example:
Several low-level write-related events for
``Account Created.yaml`` collapse into one logical publication of
``/Correlations/Microsoft/Windows/Account Created.yaml``.
"""
to_advertise = set()
for wd, mask, cookie, name in self.AggrEvents:
# When wathed directory is being removed, more than one inotify events are being produced.
# When IN_IGNORED event occurs, respective wd is removed from self.WDs,
# but some other events (like IN_DELETE_SELF) get to this point, without having its reference in self.WDs.
subscribed_path, _ = self.WDs.get(wd, (None, None))
to_advertise.add(subscribed_path)
for subscriptions, name in self.AggrEvents:
for subscription in subscriptions:
if subscription["kind"] == "dir":
to_advertise.add(subscription["publish_path"])
elif name == subscription["item_name"]:
to_advertise.add(subscription["publish_path"])
self.AggrEvents.clear()

for path in to_advertise:
Expand Down Expand Up @@ -504,3 +699,7 @@ async def finalize(self, app):
if self.FD is not None:
self.App.Loop.remove_reader(self.FD)
os.close(self.FD)
self.AggrEvents.clear()
self.WDs.clear()
self.WDSubscriptions.clear()
self.WatchedPaths.clear()
Loading
Loading