Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 55 additions & 29 deletions asab/library/providers/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def __init__(self, library, path, layer):

self.Subscriptions: typing.Iterable[str] = set()
self.NodeDigests: typing.Dict[str, bytes] = {}
self.SubscriptionActualPaths: typing.Dict[typing.Tuple[typing.Union[str, tuple, None], str], typing.List[str]] = {}


async def finalize(self, app):
Expand Down Expand Up @@ -233,10 +234,14 @@ async def _on_zk_lost(self, event_name, zkcontainer):


async def _get_version_counter(self, event_name=None):
if self.Zookeeper is None:
if self.Zookeeper is None or not self.IsReady:
return

try:
version = await self.Zookeeper.get_data(self.VersionNodePath)
except kazoo.exceptions.ConnectionClosedError:
return

version = await self.Zookeeper.get_data(self.VersionNodePath)
self._check_version_counter(version)


Expand Down Expand Up @@ -288,6 +293,16 @@ def _personal_node_path(self, path: str, tenant_id: typing.Optional[str], cred_i
return None
return "{}/.personal/{}/{}{}".format(self.BasePath, tenant_id, cred_id, path).rstrip("/")

def _subscription_personal_path(self, path: str, target: typing.Union[str, tuple, None]) -> typing.Optional[str]:
tenant_id = self._current_tenant_id()
if isinstance(target, tuple) and len(target) == 2 and target[0] == "personal":
cred_id = target[1]
else:
cred_id = self._current_credentials_id()
if not tenant_id or not cred_id:
return None
return "/.personal/{}/{}{}".format(tenant_id, cred_id, path)

async def read(self, path: str) -> typing.Optional[typing.IO]:
if self.Zookeeper is None:
L.warning("Zookeeper Client has not been established (yet). Cannot read {}".format(path))
Expand Down Expand Up @@ -459,34 +474,34 @@ def build_path(self, path, tenant_specific=False):

async def subscribe(self, path, target: typing.Union[str, tuple, None] = None):
self.Subscriptions.add((target, path))
if not hasattr(self, "SubscriptionActualPaths"):
self.SubscriptionActualPaths = {}

if target in {None, "global"}:
self.SubscriptionActualPaths[(target, path)] = [path]
self.NodeDigests[path] = await self._get_directory_hash(path)

elif target == "tenant":
actual_paths = []
for tenant in await self._get_tenants():
actual_path = "/.tenants/{}{}".format(tenant, path)
actual_paths.append(actual_path)
self.NodeDigests[actual_path] = await self._get_directory_hash(actual_path)
self.SubscriptionActualPaths[(target, path)] = actual_paths

elif isinstance(target, tuple) and len(target) == 2 and target[0] == "tenant":
_, tenant = target
actual_path = "/.tenants/{}{}".format(tenant, path)
self.SubscriptionActualPaths[(target, path)] = [actual_path]
self.NodeDigests[actual_path] = await self._get_directory_hash(actual_path)

elif target == "personal":
# current tenant + current credentials
try:
tenant_id = Tenant.get()
except LookupError:
tenant_id = None
try:
authz = Authz.get()
cred_id = getattr(authz, "CredentialsId", None)
except LookupError:
cred_id = None
if tenant_id and cred_id:
actual_path = "/.personal/{}/{}{}".format(tenant_id, cred_id, path)
elif target == "personal" or (isinstance(target, tuple) and len(target) == 2 and target[0] == "personal"):
actual_path = self._subscription_personal_path(path, target)
if actual_path is not None:
self.SubscriptionActualPaths[(target, path)] = [actual_path]
self.NodeDigests[actual_path] = await self._get_directory_hash(actual_path)
else:
self.SubscriptionActualPaths[(target, path)] = []
else:
Comment on lines +498 to 505
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Personal subscriptions can get stuck permanently when context is missing at subscribe time.

If tenant/credentials are unavailable, subscribe() stores [] for (target, path). Later, _on_library_changed() treats that as a valid cached value and never recomputes, so updates may never be observed.

💡 Proposed fix
@@
 		elif target == "personal" or (isinstance(target, tuple) and len(target) == 2 and target[0] == "personal"):
 			actual_path = self._subscription_personal_path(path, target)
 			if actual_path is not None:
 				self.SubscriptionActualPaths[(target, path)] = [actual_path]
 				self.NodeDigests[actual_path] = await self._get_directory_hash(actual_path)
 			else:
-				self.SubscriptionActualPaths[(target, path)] = []
+				# Keep unresolved; allow late resolution during change checks.
+				self.SubscriptionActualPaths.pop((target, path), None)
@@
 			elif target == "personal" or (isinstance(target, tuple) and len(target) == 2 and target[0] == "personal"):
 				actual_paths = subscription_actual_paths.get((target, path))
-				if actual_paths is None:
+				if not actual_paths:
 					actual_path = self._subscription_personal_path(path, target)
 					actual_paths = [actual_path] if actual_path is not None else []
+					if actual_paths:
+						self.SubscriptionActualPaths[(target, path)] = actual_paths
 				for actual_path in actual_paths:

Also applies to: 575-580

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@asab/library/providers/zookeeper.py` around lines 498 - 505, The current
subscribe() branch for personal targets caches an empty list when
self._subscription_personal_path(path, target) returns None, causing
_on_library_changed() to treat it as resolved and never retry; instead, change
the logic in the subscribe path (the branch that calls
self._subscription_personal_path and currently assigns [] to
self.SubscriptionActualPaths[(target, path)]) to NOT cache an empty list —
either delete the key or store a sentinel (e.g., None or a dedicated unresolved
marker) when actual_path is None, and update _on_library_changed() to treat that
sentinel/absent key as unresolved (triggering recomputation of
self._subscription_personal_path and re-evaluation of NodeDigests) so personal
subscriptions are retried when context becomes available.

raise ValueError("Unexpected target: {!r}".format(target))

Expand All @@ -512,6 +527,10 @@ def recursive_traversal(path, digest):
return digest.digest()

async def _on_library_changed(self, event_name=None):
if self.Zookeeper is None or not self.IsReady:
return
subscription_actual_paths = getattr(self, "SubscriptionActualPaths", {})

for (target, path) in list(self.Subscriptions):

async def do_check_path(actual_path):
Expand All @@ -526,40 +545,47 @@ async def do_check_path(actual_path):
if target in {None, "global"}:
try:
await do_check_path(actual_path=path)
except kazoo.exceptions.ConnectionClosedError:
return
except Exception as e:
L.exception("Failed to process library changes: '{}'".format(e), struct_data={"path": path})

elif target == "tenant":
for tenant in await self._get_tenants():
try:
tenants = await self._get_tenants()
except kazoo.exceptions.ConnectionClosedError:
return
for tenant in tenants:
try:
await do_check_path(actual_path="/.tenants/{}{}".format(tenant, path))
except kazoo.exceptions.ConnectionClosedError:
return
except Exception as e:
L.exception("Failed to process library changes: '{}'".format(e), struct_data={"path": path, "tenant": tenant})

elif isinstance(target, tuple) and len(target) == 2 and target[0] == "tenant":
tenant = target[1]
try:
await do_check_path(actual_path="/.tenants/{}{}".format(tenant, path))
except kazoo.exceptions.ConnectionClosedError:
return
except Exception as e:
L.exception("Failed to process library changes: '{}'".format(e), struct_data={"path": path, "tenant": tenant})

elif target == "personal":
try:
tenant_id = Tenant.get()
except LookupError:
tenant_id = None
try:
authz = Authz.get()
cred_id = getattr(authz, "CredentialsId", None)
except LookupError:
cred_id = None
if tenant_id and cred_id:
elif target == "personal" or (isinstance(target, tuple) and len(target) == 2 and target[0] == "personal"):
actual_paths = subscription_actual_paths.get((target, path))
if actual_paths is None:
actual_path = self._subscription_personal_path(path, target)
actual_paths = [actual_path] if actual_path is not None else []
for actual_path in actual_paths:
try:
await do_check_path(actual_path="/.personal/{}/{}{}".format(tenant_id, cred_id, path))
await do_check_path(actual_path=actual_path)
except kazoo.exceptions.ConnectionClosedError:
return
except Exception as e:
L.exception(
"Failed to process library changes: '{}'".format(e),
struct_data={"path": path, "tenant": tenant_id, "credentials_id": cred_id},
struct_data={"path": path},
)
else:
raise ValueError("Unexpected target: {!r}".format((target, path)))
Expand Down
89 changes: 55 additions & 34 deletions asab/library/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ async def finalize(self, app):
await lib.finalize(self.App)

async def _on_tick60(self, message_type):
if len(self.Libraries) == 0 or not self.Libraries[0].IsReady:
return

await self._read_disabled()
await self._read_favorites()

Expand Down Expand Up @@ -434,18 +437,21 @@ async def _read_favorites(self):
- self.Favorites: { '/path/file.ext': ['tenant', '*', ...] }
- self.FavoritePaths: [ ('/path/folder/', ['tenant', '*', ...]), ... ]
Expected YAML shape:
/path:
tenants:
- system
/path:
tenants:
- system
"""
if len(self.Libraries) == 0:
return

fav_data = None
fav_file = None

try:
fav_file = await self.Libraries[0].read('/.favorites.yaml')
except Exception as e:
L.warning("Failed to read '/.favorites.yaml': {}.".format(e))
self.Favorites = {}
self.FavoritePaths = []
if getattr(self.Libraries[0], "IsReady", False):
L.warning("Failed to read '/.favorites.yaml': {}.".format(e))
return

if fav_file is None:
Expand Down Expand Up @@ -515,41 +521,56 @@ async def _read_favorites(self):
self.FavoritePaths = folders

async def _read_disabled(self, publish_changes=False):
if len(self.Libraries) == 0:
return

old_disabled = self.Disabled.copy()
old_disabled_paths = list(self.DisabledPaths)
# Read the file
disabled_file = await self.Libraries[0].read('/.disabled.yaml')

if disabled_file is None:
self.Disabled = {}
self.DisabledPaths = []
else:
try:
disabled_data = yaml.load(disabled_file, Loader=yaml.CSafeLoader)
except Exception:
L.exception("Failed to parse '/.disabled.yaml'")
self.Disabled = {}
self.DisabledPaths = []
return
disabled_file = None
try:
# Read the file
disabled_file = await self.Libraries[0].read('/.disabled.yaml')
except Exception as e:
if getattr(self.Libraries[0], "IsReady", False):
L.warning("Failed to read '/.disabled.yaml': {}.".format(e))
return

if disabled_data is None:
try:
if disabled_file is None:
self.Disabled = {}
self.DisabledPaths = []
return

if isinstance(disabled_data, set):
# Backward compatibility (August 2023)
self.Disabled = {key: '*' for key in disabled_data}
self.DisabledPaths = []
else:
self.Disabled = {}
self.DisabledPaths = []
for k, v in disabled_data.items():
if k.endswith('/'):
self.DisabledPaths.append((k, v))
else:
self.Disabled[k] = v
try:
disabled_data = yaml.load(disabled_file, Loader=yaml.CSafeLoader)
except Exception:
L.exception("Failed to parse '/.disabled.yaml'")
self.Disabled = {}
self.DisabledPaths = []
return

if disabled_data is None:
self.Disabled = {}
self.DisabledPaths = []
return

if isinstance(disabled_data, set):
# Backward compatibility (August 2023)
self.Disabled = {key: '*' for key in disabled_data}
self.DisabledPaths = []
else:
self.Disabled = {}
self.DisabledPaths = []
for k, v in disabled_data.items():
if k.endswith('/'):
self.DisabledPaths.append((k, v))
else:
self.Disabled[k] = v
finally:
if hasattr(disabled_file, "close"):
try:
disabled_file.close()
except Exception:
pass

self.DisabledPaths.sort(key=lambda x: len(x[0]))

Expand Down
Loading