diff --git a/asab/library/providers/cache.py b/asab/library/providers/cache.py new file mode 100644 index 000000000..b0603236f --- /dev/null +++ b/asab/library/providers/cache.py @@ -0,0 +1,96 @@ +import os +import logging +import hashlib + +from asab.config import Config +from asab.library.providers.filesystem import FileSystemLibraryProvider + +L = logging.getLogger(__name__) + + +class CacheLibraryProvider(FileSystemLibraryProvider): + """ + A read-only cache wrapper that points at + [library:cache].dir/@global/. + + Any call to read()/list()/find()/subscribe() will serve from cache if present. + When cache is missing, provider methods return fallback-safe empty values. + """ + + def __init__(self, library, uri, layer): + # 1) Compute the exact same layer_hash your LibraryCacheService wrote + master_hash = hashlib.sha256(uri.encode("utf-8")).hexdigest() + + # 2) Locate the symlink under @global + cache_root = Config.get("library:cache", "dir", fallback=None) + if not cache_root: + raise RuntimeError("Missing [library:cache].dir configuration") + global_link = os.path.join(cache_root, "@global", master_hash) + + # 3) Remember for _cache_live() + self.layer_hash = master_hash + self.cache_dir = global_link + + # 4) Sanity-check cache_root exists + if not os.path.isdir(cache_root): + L.critical("Cache root '{}' not found, exiting.".format(cache_root)) + raise SystemExit(1) + + # 5) Warn if no snapshot directory is present yet + if not self._cache_live(): + L.warning( + "No cache snapshot for URI '{}' at '{}'.".format(uri, self.cache_dir) + ) + + # 6) Delegate to FileSystemLibraryProvider + cache_uri = "file://{}".format(self.cache_dir.rstrip("/")) + super().__init__(library, cache_uri, layer, set_ready=False) + library.App.TaskService.schedule(self._set_ready(self._cache_live())) + library.App.PubSub.subscribe("library.cache.ready!", self._on_cache_ready) + library.App.PubSub.subscribe("library.cache.not_ready!", self._on_cache_not_ready) + + + async def _on_cache_ready(self, *args): + live = self._cache_live() + if live and not self.IsReady: + await self._set_ready(True) + + async def _on_cache_not_ready(self, *args): + live = self._cache_live() + if (not live) and self.IsReady: + await self._set_ready(False) + + def _cache_live(self): + if os.path.islink(self.cache_dir): + return os.path.isdir(os.path.realpath(self.cache_dir)) + return os.path.isdir(self.cache_dir) + + async def read(self, path): + if not self._cache_live(): + return None + + itemio = await super().read(path) + L.warning( + "Cache read %s", + "hit" if itemio is not None else "miss", + struct_data={ + "path": path, + "base": self.cache_dir, + }, + ) + return itemio + + async def list(self, path): + if not self._cache_live(): + return [] + return await super().list(path) + + async def find(self, path): + if not self._cache_live(): + return [] + return await super().find(path) + + async def subscribe(self, path, target=None): + if not self._cache_live(): + return None + return await super().subscribe(path, target) diff --git a/asab/library/providers/filesystem.py b/asab/library/providers/filesystem.py index b2d97e10b..86237bd34 100644 --- a/asab/library/providers/filesystem.py +++ b/asab/library/providers/filesystem.py @@ -1,5 +1,6 @@ import io import os +import ctypes import os.path import stat import glob @@ -68,8 +69,11 @@ def __init__(self, library, path, layer, *, set_ready=True): if inotify_init is not None: init = inotify_init() if init == -1: + err = ctypes.get_errno() L.warning( - "Subscribing to library changes in filesystem provider is not available. Inotify was not initialized.") + "Subscribing to library changes in filesystem provider is not available. Inotify was not initialized.", + struct_data={'errno': err}, + ) self.FD = None else: self.FD = init diff --git a/asab/library/service.py b/asab/library/service.py index b718cb8c6..f305495c4 100644 --- a/asab/library/service.py +++ b/asab/library/service.py @@ -120,37 +120,56 @@ async def _on_tick60(self, message_type): await self._read_disabled() await self._read_favorites() + def _create_library(self, path, layer): - library_provider = None + + if path.startswith('libsreg+'): + from .providers.libsreg import LibsRegLibraryProvider + realp = LibsRegLibraryProvider(self, path, layer) + if 'library:cache' in Config: + from .providers.cache import CacheLibraryProvider + cachep = CacheLibraryProvider(self, path, layer) + self.Libraries.append(cachep) + else: + self.Libraries.append(realp) + return + + elif path.startswith('git+'): + from .providers.git import GitLibraryProvider + realp = GitLibraryProvider(self, path, layer) + if 'library:cache' in Config: + from .providers.cache import CacheLibraryProvider + cachep = CacheLibraryProvider(self, path, layer) + self.Libraries.append(cachep) + else: + self.Libraries.append(realp) + return + + # ZooKeeper (no cache support) if path.startswith('zk://') or path.startswith('zookeeper://'): from .providers.zookeeper import ZooKeeperLibraryProvider - library_provider = ZooKeeperLibraryProvider(self, path, layer) + provider = ZooKeeperLibraryProvider(self, path, layer) + self.Libraries.append(provider) + # Filesystem (no cache support) elif path.startswith('./') or path.startswith('/') or path.startswith('file://'): from .providers.filesystem import FileSystemLibraryProvider - library_provider = FileSystemLibraryProvider(self, path, layer) + provider = FileSystemLibraryProvider(self, path, layer) + self.Libraries.append(provider) + # Azure Storage elif path.startswith('azure+https://'): from .providers.azurestorage import AzureStorageLibraryProvider - library_provider = AzureStorageLibraryProvider(self, path, layer) - - elif path.startswith('git+'): - from .providers.git import GitLibraryProvider - library_provider = GitLibraryProvider(self, path, layer) + provider = AzureStorageLibraryProvider(self, path, layer) + self.Libraries.append(provider) - elif path.startswith('libsreg+'): - from .providers.libsreg import LibsRegLibraryProvider - library_provider = LibsRegLibraryProvider(self, path, layer) - - elif path == '' or path.startswith("#") or path.startswith(";"): - # This is empty or commented line + # comments or blanks + elif not path or path[0] in ('#', ';'): return else: - L.error("Incorrect/unknown provider for '{}'".format(path)) - raise SystemExit("Exit due to a critical configuration error.") - - self.Libraries.append(library_provider) + L.error("Incorrect provider for '{}'".format(path)) + raise SystemExit(1) def is_ready(self) -> bool: """ @@ -372,6 +391,7 @@ async def list(self, path: str = "/", recursive: bool = False, timeout: int = No return items + async def _list(self, path, providers): """ Lists items from all providers, merging items with the same name,