From 120de2239459df0863fcb5cc20c2eddc907788e7 Mon Sep 17 00:00:00 2001 From: Mithun Shivashankar Date: Thu, 9 Apr 2026 15:47:53 +0200 Subject: [PATCH 1/3] handle transient reconnects without clearing state --- asab/library/providers/zookeeper.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/asab/library/providers/zookeeper.py b/asab/library/providers/zookeeper.py index 1ffca7264..595db4862 100644 --- a/asab/library/providers/zookeeper.py +++ b/asab/library/providers/zookeeper.py @@ -233,10 +233,14 @@ async def _on_zk_lost(self, event_name, zkcontainer): async def _get_version_counter(self, event_name=None): - if self.Zookeeper is None: + if self.Zookeeper is None or not self.IsReady: + return + + try: + version = await self.Zookeeper.get_data(self.VersionNodePath) + except kazoo.exceptions.ConnectionClosedError: return - version = await self.Zookeeper.get_data(self.VersionNodePath) self._check_version_counter(version) @@ -512,6 +516,9 @@ def recursive_traversal(path, digest): return digest.digest() async def _on_library_changed(self, event_name=None): + if self.Zookeeper is None or not self.IsReady: + return + for (target, path) in list(self.Subscriptions): async def do_check_path(actual_path): @@ -526,13 +533,21 @@ async def do_check_path(actual_path): if target in {None, "global"}: try: await do_check_path(actual_path=path) + except kazoo.exceptions.ConnectionClosedError: + return except Exception as e: L.exception("Failed to process library changes: '{}'".format(e), struct_data={"path": path}) elif target == "tenant": - for tenant in await self._get_tenants(): + try: + tenants = await self._get_tenants() + except kazoo.exceptions.ConnectionClosedError: + return + for tenant in tenants: try: await do_check_path(actual_path="/.tenants/{}{}".format(tenant, path)) + except kazoo.exceptions.ConnectionClosedError: + return except Exception as e: L.exception("Failed to process library changes: '{}'".format(e), struct_data={"path": path, "tenant": tenant}) @@ -540,6 +555,8 @@ async def do_check_path(actual_path): tenant = target[1] try: await do_check_path(actual_path="/.tenants/{}{}".format(tenant, path)) + except kazoo.exceptions.ConnectionClosedError: + return except Exception as e: L.exception("Failed to process library changes: '{}'".format(e), struct_data={"path": path, "tenant": tenant}) @@ -556,6 +573,8 @@ async def do_check_path(actual_path): if tenant_id and cred_id: try: await do_check_path(actual_path="/.personal/{}/{}{}".format(tenant_id, cred_id, path)) + except kazoo.exceptions.ConnectionClosedError: + return except Exception as e: L.exception( "Failed to process library changes: '{}'".format(e), From 02322f2a3318ca7d6ff5d424ade3d320be162871 Mon Sep 17 00:00:00 2001 From: Mithun Shivashankar Date: Thu, 9 Apr 2026 15:48:26 +0200 Subject: [PATCH 2/3] silence reconnect-time --- asab/library/service.py | 89 +++++++++++++++++++++++++---------------- 1 file changed, 55 insertions(+), 34 deletions(-) diff --git a/asab/library/service.py b/asab/library/service.py index b718cb8c6..f7e94e11c 100644 --- a/asab/library/service.py +++ b/asab/library/service.py @@ -117,6 +117,9 @@ async def finalize(self, app): await lib.finalize(self.App) async def _on_tick60(self, message_type): + if len(self.Libraries) == 0 or not self.Libraries[0].IsReady: + return + await self._read_disabled() await self._read_favorites() @@ -434,18 +437,21 @@ async def _read_favorites(self): - self.Favorites: { '/path/file.ext': ['tenant', '*', ...] } - self.FavoritePaths: [ ('/path/folder/', ['tenant', '*', ...]), ... ] Expected YAML shape: - /path: - tenants: - - system + /path: + tenants: + - system """ + if len(self.Libraries) == 0: + return + fav_data = None + fav_file = None try: fav_file = await self.Libraries[0].read('/.favorites.yaml') except Exception as e: - L.warning("Failed to read '/.favorites.yaml': {}.".format(e)) - self.Favorites = {} - self.FavoritePaths = [] + if getattr(self.Libraries[0], "IsReady", False): + L.warning("Failed to read '/.favorites.yaml': {}.".format(e)) return if fav_file is None: @@ -515,41 +521,56 @@ async def _read_favorites(self): self.FavoritePaths = folders async def _read_disabled(self, publish_changes=False): + if len(self.Libraries) == 0: + return old_disabled = self.Disabled.copy() old_disabled_paths = list(self.DisabledPaths) - # Read the file - disabled_file = await self.Libraries[0].read('/.disabled.yaml') - - if disabled_file is None: - self.Disabled = {} - self.DisabledPaths = [] - else: - try: - disabled_data = yaml.load(disabled_file, Loader=yaml.CSafeLoader) - except Exception: - L.exception("Failed to parse '/.disabled.yaml'") - self.Disabled = {} - self.DisabledPaths = [] - return + disabled_file = None + try: + # Read the file + disabled_file = await self.Libraries[0].read('/.disabled.yaml') + except Exception as e: + if getattr(self.Libraries[0], "IsReady", False): + L.warning("Failed to read '/.disabled.yaml': {}.".format(e)) + return - if disabled_data is None: + try: + if disabled_file is None: self.Disabled = {} self.DisabledPaths = [] - return - - if isinstance(disabled_data, set): - # Backward compatibility (August 2023) - self.Disabled = {key: '*' for key in disabled_data} - self.DisabledPaths = [] else: - self.Disabled = {} - self.DisabledPaths = [] - for k, v in disabled_data.items(): - if k.endswith('/'): - self.DisabledPaths.append((k, v)) - else: - self.Disabled[k] = v + try: + disabled_data = yaml.load(disabled_file, Loader=yaml.CSafeLoader) + except Exception: + L.exception("Failed to parse '/.disabled.yaml'") + self.Disabled = {} + self.DisabledPaths = [] + return + + if disabled_data is None: + self.Disabled = {} + self.DisabledPaths = [] + return + + if isinstance(disabled_data, set): + # Backward compatibility (August 2023) + self.Disabled = {key: '*' for key in disabled_data} + self.DisabledPaths = [] + else: + self.Disabled = {} + self.DisabledPaths = [] + for k, v in disabled_data.items(): + if k.endswith('/'): + self.DisabledPaths.append((k, v)) + else: + self.Disabled[k] = v + finally: + if hasattr(disabled_file, "close"): + try: + disabled_file.close() + except Exception: + pass self.DisabledPaths.sort(key=lambda x: len(x[0])) From 8c6d797731e9ba336bc66d7c8ac2818a10515ebe Mon Sep 17 00:00:00 2001 From: Mithun Shivashankar Date: Fri, 10 Apr 2026 07:29:26 +0200 Subject: [PATCH 3/3] Fix explicit personal ZooKeeper subscription targets --- asab/library/providers/zookeeper.py | 59 ++++++++++++++++------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/asab/library/providers/zookeeper.py b/asab/library/providers/zookeeper.py index 595db4862..5ffc2da16 100644 --- a/asab/library/providers/zookeeper.py +++ b/asab/library/providers/zookeeper.py @@ -185,6 +185,7 @@ def __init__(self, library, path, layer): self.Subscriptions: typing.Iterable[str] = set() self.NodeDigests: typing.Dict[str, bytes] = {} + self.SubscriptionActualPaths: typing.Dict[typing.Tuple[typing.Union[str, tuple, None], str], typing.List[str]] = {} async def finalize(self, app): @@ -292,6 +293,16 @@ def _personal_node_path(self, path: str, tenant_id: typing.Optional[str], cred_i return None return "{}/.personal/{}/{}{}".format(self.BasePath, tenant_id, cred_id, path).rstrip("/") + def _subscription_personal_path(self, path: str, target: typing.Union[str, tuple, None]) -> typing.Optional[str]: + tenant_id = self._current_tenant_id() + if isinstance(target, tuple) and len(target) == 2 and target[0] == "personal": + cred_id = target[1] + else: + cred_id = self._current_credentials_id() + if not tenant_id or not cred_id: + return None + return "/.personal/{}/{}{}".format(tenant_id, cred_id, path) + async def read(self, path: str) -> typing.Optional[typing.IO]: if self.Zookeeper is None: L.warning("Zookeeper Client has not been established (yet). Cannot read {}".format(path)) @@ -463,34 +474,34 @@ def build_path(self, path, tenant_specific=False): async def subscribe(self, path, target: typing.Union[str, tuple, None] = None): self.Subscriptions.add((target, path)) + if not hasattr(self, "SubscriptionActualPaths"): + self.SubscriptionActualPaths = {} if target in {None, "global"}: + self.SubscriptionActualPaths[(target, path)] = [path] self.NodeDigests[path] = await self._get_directory_hash(path) elif target == "tenant": + actual_paths = [] for tenant in await self._get_tenants(): actual_path = "/.tenants/{}{}".format(tenant, path) + actual_paths.append(actual_path) self.NodeDigests[actual_path] = await self._get_directory_hash(actual_path) + self.SubscriptionActualPaths[(target, path)] = actual_paths elif isinstance(target, tuple) and len(target) == 2 and target[0] == "tenant": _, tenant = target actual_path = "/.tenants/{}{}".format(tenant, path) + self.SubscriptionActualPaths[(target, path)] = [actual_path] self.NodeDigests[actual_path] = await self._get_directory_hash(actual_path) - elif target == "personal": - # current tenant + current credentials - try: - tenant_id = Tenant.get() - except LookupError: - tenant_id = None - try: - authz = Authz.get() - cred_id = getattr(authz, "CredentialsId", None) - except LookupError: - cred_id = None - if tenant_id and cred_id: - actual_path = "/.personal/{}/{}{}".format(tenant_id, cred_id, path) + elif target == "personal" or (isinstance(target, tuple) and len(target) == 2 and target[0] == "personal"): + actual_path = self._subscription_personal_path(path, target) + if actual_path is not None: + self.SubscriptionActualPaths[(target, path)] = [actual_path] self.NodeDigests[actual_path] = await self._get_directory_hash(actual_path) + else: + self.SubscriptionActualPaths[(target, path)] = [] else: raise ValueError("Unexpected target: {!r}".format(target)) @@ -518,6 +529,7 @@ def recursive_traversal(path, digest): async def _on_library_changed(self, event_name=None): if self.Zookeeper is None or not self.IsReady: return + subscription_actual_paths = getattr(self, "SubscriptionActualPaths", {}) for (target, path) in list(self.Subscriptions): @@ -560,25 +572,20 @@ async def do_check_path(actual_path): except Exception as e: L.exception("Failed to process library changes: '{}'".format(e), struct_data={"path": path, "tenant": tenant}) - elif target == "personal": - try: - tenant_id = Tenant.get() - except LookupError: - tenant_id = None - try: - authz = Authz.get() - cred_id = getattr(authz, "CredentialsId", None) - except LookupError: - cred_id = None - if tenant_id and cred_id: + elif target == "personal" or (isinstance(target, tuple) and len(target) == 2 and target[0] == "personal"): + actual_paths = subscription_actual_paths.get((target, path)) + if actual_paths is None: + actual_path = self._subscription_personal_path(path, target) + actual_paths = [actual_path] if actual_path is not None else [] + for actual_path in actual_paths: try: - await do_check_path(actual_path="/.personal/{}/{}{}".format(tenant_id, cred_id, path)) + await do_check_path(actual_path=actual_path) except kazoo.exceptions.ConnectionClosedError: return except Exception as e: L.exception( "Failed to process library changes: '{}'".format(e), - struct_data={"path": path, "tenant": tenant_id, "credentials_id": cred_id}, + struct_data={"path": path}, ) else: raise ValueError("Unexpected target: {!r}".format((target, path)))