-
Notifications
You must be signed in to change notification settings - Fork 91
Extract shared OnePasswordClient into fides.common.onepassword #7698
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| """1Password client utilities for retrieving secrets and items.""" | ||
|
|
||
| from fides.common.onepassword.client import OnePasswordClient | ||
|
|
||
| __all__ = ["OnePasswordClient"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,199 @@ | ||
| """ | ||
| Instance-based 1Password client for retrieving secrets and items. | ||
|
|
||
| Provides lazy client initialization and vault item lookup by title. | ||
| Each instance is parameterized by its own service account token and vault ID, | ||
| so multiple consumers (e.g. SaaS test secrets, seed profile resolution) | ||
| can coexist with different credentials. | ||
| """ | ||
|
|
||
| import asyncio | ||
| import json | ||
| from typing import Any, Dict, List, Optional | ||
|
|
||
| from loguru import logger | ||
| from onepassword.client import Client # type: ignore[import-untyped] | ||
|
|
||
|
|
||
| class OnePasswordClient: | ||
| """1Password SDK client with lazy initialization and title-based lookup.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| service_account_token: str, | ||
| vault_id: str, | ||
| integration_name: str = "Fides", | ||
| integration_version: str = "v1.0.0", | ||
| ): | ||
| self._token = service_account_token | ||
| self._vault_id = vault_id | ||
| self._integration_name = integration_name | ||
| self._integration_version = integration_version | ||
| self._client: Optional[Client] = None | ||
| self._title_to_id: Dict[str, Dict[str, str]] = {} | ||
| self._mappings_initialized = False | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Internal helpers | ||
| # ------------------------------------------------------------------ | ||
|
|
||
| async def _get_client(self) -> Client: | ||
| """Lazily authenticate and return the 1Password SDK client.""" | ||
| if self._client is None: | ||
| self._client = await Client.authenticate( | ||
| auth=self._token, | ||
| integration_name=self._integration_name, | ||
| integration_version=self._integration_version, | ||
| ) | ||
| return self._client | ||
|
Comment on lines
+40
to
+48
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Critical: Race condition under concurrent async usage Both if self._client is None:
self._client = await Client.authenticate(...) # yields to event loop hereIf two coroutines call Fix with an def __init__(self, ...):
...
self._client_lock = asyncio.Lock()
self._mappings_lock = asyncio.Lock()
async def _get_client(self) -> Client:
if self._client is None:
async with self._client_lock:
if self._client is None: # double-checked locking
self._client = await Client.authenticate(...)
return self._clientThe same pattern applies to |
||
|
|
||
| async def _ensure_mappings(self) -> None: | ||
| """Build the title→ID index for items in the configured vault.""" | ||
| if self._mappings_initialized: | ||
| return | ||
|
|
||
| client = await self._get_client() | ||
| items = await client.items.list(vault_id=self._vault_id) | ||
|
|
||
| for item in items: | ||
| logger.debug(f"1PW: indexed item '{item.title}'") | ||
| self._title_to_id[item.title] = { | ||
| "item_id": item.id, | ||
| "category": item.category, | ||
| } | ||
|
Comment on lines
+58
to
+63
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggestion: Duplicate vault item titles silently drop one If the vault contains two items with the same title, the second silently overwrites the first in Consider adding a warning: if item.title in self._title_to_id:
logger.warning(f"1PW: duplicate item title '{item.title}' in vault {self._vault_id}; overwriting")
self._title_to_id[item.title] = {"item_id": item.id, "category": item.category} |
||
|
|
||
| self._mappings_initialized = True | ||
| logger.info( | ||
| f"1PW: initialized mappings for {len(self._title_to_id)} items " | ||
| f"in vault {self._vault_id}" | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def _item_to_dict(item: Any) -> Dict[str, Any]: | ||
| """Convert a 1Password SDK item object to a plain dict.""" | ||
| item_dict: Dict[str, Any] = { | ||
| "id": item.id, | ||
| "title": item.title, | ||
| "category": item.category, | ||
| "fields": [], | ||
| } | ||
|
|
||
| if hasattr(item, "fields") and item.fields: | ||
| for field in item.fields: | ||
| item_dict["fields"].append( | ||
| { | ||
| "id": getattr(field, "id", ""), | ||
| "title": getattr(field, "title", ""), | ||
| "type": getattr(field, "field_type", ""), | ||
| "value": getattr(field, "value", ""), | ||
| "purpose": getattr(field, "purpose", ""), | ||
| } | ||
| ) | ||
|
|
||
| return item_dict | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Public API — async | ||
| # ------------------------------------------------------------------ | ||
|
|
||
| async def get_item(self, title: str) -> Optional[Dict[str, Any]]: | ||
| """ | ||
| Retrieve a 1Password item by its title. | ||
|
|
||
| Returns a dict with keys: id, title, category, fields (list of | ||
| dicts with id, title, type, value, purpose). Returns None if | ||
| the item is not found. | ||
| """ | ||
| await self._ensure_mappings() | ||
|
|
||
| if title not in self._title_to_id: | ||
| logger.warning(f"1PW: item '{title}' not found in vault {self._vault_id}") | ||
| return None | ||
|
|
||
| info = self._title_to_id[title] | ||
| client = await self._get_client() | ||
| item = await client.items.get( | ||
| item_id=info["item_id"], | ||
| vault_id=self._vault_id, | ||
| ) | ||
|
|
||
| item_dict = self._item_to_dict(item) | ||
| logger.info( | ||
| f"1PW: retrieved item '{title}' with {len(item_dict['fields'])} fields" | ||
| ) | ||
| return item_dict | ||
|
|
||
| async def get_secrets(self, title: str) -> Dict[str, str]: | ||
| """ | ||
| Get field name → value pairs from a 1Password item. | ||
|
|
||
| Filters out fields with empty titles or values. | ||
| """ | ||
| item = await self.get_item(title) | ||
| if not item: | ||
| return {} | ||
|
|
||
| return { | ||
| field["title"]: field["value"] | ||
| for field in item.get("fields", []) | ||
| if field.get("title") and field.get("value") | ||
| } | ||
|
|
||
| async def get_item_notes(self, title: str) -> Optional[str]: | ||
| """ | ||
| Get the notes (notesPlain) content from a 1Password item. | ||
|
|
||
| Returns the string content of the NOTES field, or None if the | ||
| item is not found or has no notes. | ||
| """ | ||
| item = await self.get_item(title) | ||
| if not item: | ||
| return None | ||
|
|
||
| for field in item.get("fields", []): | ||
|
Comment on lines
+150
to
+153
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice to have: Dual-condition lookup deserves a comment The # Check "purpose" first (canonical); fall back to the well-known field ID
# used by 1Password for plain-text notes items.
if field.get("purpose") == "NOTES" or field.get("id") == "notesPlain": |
||
| if field.get("purpose") == "NOTES" or field.get("id") == "notesPlain": | ||
| return field.get("value") | ||
|
|
||
| return None | ||
|
|
||
| async def get_item_notes_json(self, title: str) -> Optional[Dict[str, Any]]: | ||
| """ | ||
| Get the notes content from a 1Password item, parsed as JSON. | ||
|
|
||
| Returns the parsed dict, or None if the item is not found, | ||
| has no notes, or the notes are not valid JSON. | ||
| """ | ||
| notes = await self.get_item_notes(title) | ||
| if notes is None: | ||
| return None | ||
|
|
||
| try: | ||
| return json.loads(notes) | ||
| except json.JSONDecodeError: | ||
| logger.error(f"1PW: notes for item '{title}' are not valid JSON") | ||
| return None | ||
|
|
||
| async def list_items(self) -> List[str]: | ||
| """List all item titles in the configured vault.""" | ||
| await self._ensure_mappings() | ||
| return list(self._title_to_id.keys()) | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Public API — sync wrappers | ||
| # ------------------------------------------------------------------ | ||
|
|
||
| def get_secrets_sync(self, title: str) -> Dict[str, str]: | ||
| """Synchronous wrapper around :meth:`get_secrets`.""" | ||
| loop = asyncio.new_event_loop() | ||
| try: | ||
| return loop.run_until_complete(self.get_secrets(title)) | ||
| finally: | ||
|
Comment on lines
+184
to
+190
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggestion: Reusing SDK client across event loops Each call to Whether this works depends on the SDK's internals. If it uses Two options:
|
||
| loop.close() | ||
|
|
||
| def get_item_notes_json_sync(self, title: str) -> Optional[Dict[str, Any]]: | ||
| """Synchronous wrapper around :meth:`get_item_notes_json`.""" | ||
| loop = asyncio.new_event_loop() | ||
| try: | ||
| return loop.run_until_complete(self.get_item_notes_json(title)) | ||
| finally: | ||
| loop.close() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice to have: No unit tests for the new class
OnePasswordClientis a new production module infides.commonbut there are no corresponding tests. The 1Password SDKClientis straightforward to mock — a few tests coveringget_item(found / not found),get_secrets(filters empty fields),get_item_notes(purpose vs. id fallback), andget_item_notes_json(valid JSON / invalid JSON) would give good coverage without needing a real vault.