-
Notifications
You must be signed in to change notification settings - Fork 7
Introduce cache provider #704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
6578a93
11ccb21
2cf5951
ea17922
a90c667
95ede91
60635d5
5fafca1
d473f70
f02030a
3f2af1c
302872b
bc368ce
5554df3
1e1eade
66f5e53
2e7d311
cd662e8
86d5726
85fa0a8
903a039
bb5f1d9
5012fbc
fd63bd0
6054391
1623c02
64daa54
3617df9
40269eb
f3e0a90
97b0143
4d8ea2f
3ea77bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| import os | ||
| import logging | ||
| import hashlib | ||
|
|
||
| from asab.config import Config | ||
| from asab.library.providers.filesystem import FileSystemLibraryProvider | ||
|
|
||
| L = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class CacheLibraryProvider(FileSystemLibraryProvider): | ||
| """ | ||
| A read-only cache wrapper that points at | ||
| [library:cache].dir/@global/<layer_hash>. | ||
|
|
||
| Any call to read()/list()/find()/subscribe() will serve from cache if present. | ||
| When cache is missing, provider methods return fallback-safe empty values. | ||
| """ | ||
|
|
||
| def __init__(self, library, uri, layer): | ||
| # 1) Compute the exact same layer_hash your LibraryCacheService wrote | ||
| master_hash = hashlib.sha256(uri.encode("utf-8")).hexdigest() | ||
|
|
||
| # 2) Locate the symlink under @global | ||
| cache_root = Config.get("library:cache", "dir", fallback=None) | ||
| if not cache_root: | ||
| raise RuntimeError("Missing [library:cache].dir configuration") | ||
| global_link = os.path.join(cache_root, "@global", master_hash) | ||
|
|
||
| # 3) Remember for _cache_live() | ||
| self.layer_hash = master_hash | ||
| self.cache_dir = global_link | ||
|
|
||
| # 4) Sanity-check cache_root exists | ||
| if not os.path.isdir(cache_root): | ||
| L.critical("Cache root '{}' not found, exiting.".format(cache_root)) | ||
| raise SystemExit(1) | ||
|
|
||
| # 5) Warn if no snapshot directory is present yet | ||
| if not self._cache_live(): | ||
| L.warning( | ||
| "No cache snapshot for URI '{}' at '{}'.".format(uri, self.cache_dir) | ||
| ) | ||
|
|
||
| # 6) Delegate to FileSystemLibraryProvider | ||
| cache_uri = "file://{}".format(self.cache_dir.rstrip("/")) | ||
| super().__init__(library, cache_uri, layer, set_ready=False) | ||
| library.App.TaskService.schedule(self._set_ready(self._cache_live())) | ||
| library.App.PubSub.subscribe("library.cache.ready!", self._on_cache_ready) | ||
| library.App.PubSub.subscribe("library.cache.not_ready!", self._on_cache_not_ready) | ||
|
|
||
|
|
||
| async def _on_cache_ready(self, *args): | ||
| live = self._cache_live() | ||
| if live and not self.IsReady: | ||
| await self._set_ready(True) | ||
|
|
||
| async def _on_cache_not_ready(self, *args): | ||
| live = self._cache_live() | ||
| if (not live) and self.IsReady: | ||
| await self._set_ready(False) | ||
|
|
||
| def _cache_live(self): | ||
| if os.path.islink(self.cache_dir): | ||
| return os.path.isdir(os.path.realpath(self.cache_dir)) | ||
| return os.path.isdir(self.cache_dir) | ||
|
|
||
| async def read(self, path): | ||
| if not self._cache_live(): | ||
| return None | ||
|
|
||
| itemio = await super().read(path) | ||
| L.warning( | ||
| "Cache read %s", | ||
| "hit" if itemio is not None else "miss", | ||
| struct_data={ | ||
| "path": path, | ||
| "base": self.cache_dir, | ||
| }, | ||
| ) | ||
| return itemio | ||
|
|
||
| async def list(self, path): | ||
| if not self._cache_live(): | ||
| return [] | ||
| return await super().list(path) | ||
|
|
||
| async def find(self, path): | ||
| if not self._cache_live(): | ||
| return [] | ||
| return await super().find(path) | ||
|
|
||
| async def subscribe(self, path, target=None): | ||
| if not self._cache_live(): | ||
| return None | ||
| return await super().subscribe(path, target) | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -120,37 +120,56 @@ async def _on_tick60(self, message_type): | |
| await self._read_disabled() | ||
| await self._read_favorites() | ||
|
|
||
|
|
||
| def _create_library(self, path, layer): | ||
| library_provider = None | ||
|
|
||
| if path.startswith('libsreg+'): | ||
| from .providers.libsreg import LibsRegLibraryProvider | ||
| realp = LibsRegLibraryProvider(self, path, layer) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't like having two layers here:
We agreed with Mithun to start with a simple check in the init time: There can be scenarios that break that:
Let's start with something, make sure it survives an upgrade procedure and only after then solve these scenarios/conditions?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. During the upgrade,
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess the right approach is to use init only for initial state, then keep liveness checks and readiness updates at runtime, so we can fall back to libsreg when cache is unavailable and automatically use cache again when it becomes live. |
||
| if 'library:cache' in Config: | ||
| from .providers.cache import CacheLibraryProvider | ||
| cachep = CacheLibraryProvider(self, path, layer) | ||
| self.Libraries.append(cachep) | ||
| else: | ||
| self.Libraries.append(realp) | ||
| return | ||
|
|
||
| elif path.startswith('git+'): | ||
| from .providers.git import GitLibraryProvider | ||
| realp = GitLibraryProvider(self, path, layer) | ||
| if 'library:cache' in Config: | ||
| from .providers.cache import CacheLibraryProvider | ||
| cachep = CacheLibraryProvider(self, path, layer) | ||
| self.Libraries.append(cachep) | ||
| else: | ||
| self.Libraries.append(realp) | ||
| return | ||
|
|
||
| # ZooKeeper (no cache support) | ||
| if path.startswith('zk://') or path.startswith('zookeeper://'): | ||
| from .providers.zookeeper import ZooKeeperLibraryProvider | ||
| library_provider = ZooKeeperLibraryProvider(self, path, layer) | ||
| provider = ZooKeeperLibraryProvider(self, path, layer) | ||
| self.Libraries.append(provider) | ||
|
|
||
| # Filesystem (no cache support) | ||
| elif path.startswith('./') or path.startswith('/') or path.startswith('file://'): | ||
| from .providers.filesystem import FileSystemLibraryProvider | ||
| library_provider = FileSystemLibraryProvider(self, path, layer) | ||
| provider = FileSystemLibraryProvider(self, path, layer) | ||
| self.Libraries.append(provider) | ||
|
|
||
| # Azure Storage | ||
| elif path.startswith('azure+https://'): | ||
| from .providers.azurestorage import AzureStorageLibraryProvider | ||
| library_provider = AzureStorageLibraryProvider(self, path, layer) | ||
|
|
||
| elif path.startswith('git+'): | ||
| from .providers.git import GitLibraryProvider | ||
| library_provider = GitLibraryProvider(self, path, layer) | ||
| provider = AzureStorageLibraryProvider(self, path, layer) | ||
| self.Libraries.append(provider) | ||
|
|
||
| elif path.startswith('libsreg+'): | ||
| from .providers.libsreg import LibsRegLibraryProvider | ||
| library_provider = LibsRegLibraryProvider(self, path, layer) | ||
|
|
||
| elif path == '' or path.startswith("#") or path.startswith(";"): | ||
| # This is empty or commented line | ||
| # comments or blanks | ||
| elif not path or path[0] in ('#', ';'): | ||
| return | ||
|
|
||
| else: | ||
| L.error("Incorrect/unknown provider for '{}'".format(path)) | ||
| raise SystemExit("Exit due to a critical configuration error.") | ||
|
|
||
| self.Libraries.append(library_provider) | ||
| L.error("Incorrect provider for '{}'".format(path)) | ||
| raise SystemExit(1) | ||
|
|
||
| def is_ready(self) -> bool: | ||
| """ | ||
|
|
@@ -372,6 +391,7 @@ async def list(self, path: str = "/", recursive: bool = False, timeout: int = No | |
|
|
||
| return items | ||
|
|
||
|
|
||
| async def _list(self, path, providers): | ||
| """ | ||
| Lists items from all providers, merging items with the same name, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.