|
16 | 16 | - Integrity verification: Anyone can verify the chain |
17 | 17 |
|
18 | 18 | The chain is the memory of the system made immutable. |
| 19 | +
|
| 20 | +Concurrency: |
| 21 | + seal_and_store() uses optimistic retry — if a concurrent writer claims |
| 22 | + the same sequence number, the UNIQUE constraint rejects the duplicate |
| 23 | + and the method retries with the updated chain head. |
19 | 24 | """ |
20 | 25 |
|
21 | 26 | from __future__ import annotations |
22 | 27 |
|
| 28 | +import logging |
23 | 29 | from dataclasses import dataclass |
24 | 30 | from typing import TYPE_CHECKING |
25 | 31 |
|
| 32 | +from qp_capsule.exceptions import ChainConflictError |
26 | 33 | from qp_capsule.seal import compute_hash |
27 | 34 |
|
28 | 35 | if TYPE_CHECKING: |
29 | 36 | from qp_capsule.capsule import Capsule |
30 | 37 | from qp_capsule.protocol import CapsuleStorageProtocol |
31 | 38 | from qp_capsule.seal import Seal |
32 | 39 |
|
| 40 | +_log = logging.getLogger(__name__) |
| 41 | + |
| 42 | +_MAX_CHAIN_RETRIES = 3 |
| 43 | + |
33 | 44 |
|
34 | 45 | @dataclass |
35 | 46 | class ChainVerificationResult: |
@@ -238,10 +249,57 @@ async def seal_and_store( |
238 | 249 | seal: Seal | None = None, |
239 | 250 | tenant_id: str | None = None, |
240 | 251 | ) -> Capsule: |
241 | | - """Chain, seal, and store a capsule in one call.""" |
| 252 | + """ |
| 253 | + Chain, seal, and store a Capsule in one call. |
| 254 | +
|
| 255 | + Uses optimistic retry to handle concurrent writes: if another writer |
| 256 | + claims the same sequence number, the UNIQUE constraint on the storage |
| 257 | + backend rejects the duplicate and this method retries with the updated |
| 258 | + chain head. Retries up to ``_MAX_CHAIN_RETRIES`` times. |
| 259 | +
|
| 260 | + Raises: |
| 261 | + ChainConflictError: If all retries are exhausted. |
| 262 | + """ |
242 | 263 | from qp_capsule.seal import Seal as SealCls |
243 | 264 |
|
244 | | - capsule = await self.add(capsule, tenant_id=tenant_id) |
245 | 265 | seal_instance = seal or SealCls() |
246 | | - capsule = seal_instance.seal(capsule) |
247 | | - return await self.storage.store(capsule, tenant_id=tenant_id) |
| 266 | + |
| 267 | + for attempt in range(_MAX_CHAIN_RETRIES): |
| 268 | + capsule = await self.add(capsule, tenant_id=tenant_id) |
| 269 | + capsule = seal_instance.seal(capsule) |
| 270 | + |
| 271 | + try: |
| 272 | + return await self.storage.store(capsule, tenant_id=tenant_id) |
| 273 | + except Exception as exc: |
| 274 | + if not _is_integrity_error(exc): |
| 275 | + raise |
| 276 | + |
| 277 | + _log.warning( |
| 278 | + "Chain sequence conflict (attempt %d/%d, seq=%d, tenant=%s), retrying", |
| 279 | + attempt + 1, |
| 280 | + _MAX_CHAIN_RETRIES, |
| 281 | + capsule.sequence, |
| 282 | + tenant_id, |
| 283 | + ) |
| 284 | + capsule.hash = "" |
| 285 | + capsule.signature = "" |
| 286 | + capsule.signature_pq = "" |
| 287 | + capsule.signed_at = None |
| 288 | + capsule.signed_by = "" |
| 289 | + |
| 290 | + raise ChainConflictError( |
| 291 | + f"Failed to add Capsule to chain after {_MAX_CHAIN_RETRIES} retries " |
| 292 | + f"(concurrent writes for tenant={tenant_id!r})" |
| 293 | + ) |
| 294 | + |
| 295 | + |
| 296 | +def _is_integrity_error(exc: BaseException) -> bool: |
| 297 | + """Check if an exception is a database integrity/unique-constraint violation.""" |
| 298 | + from qp_capsule.exceptions import StorageError |
| 299 | + |
| 300 | + name = type(exc).__name__ |
| 301 | + if name in ("IntegrityError", "UniqueViolationError"): |
| 302 | + return True |
| 303 | + if isinstance(exc, StorageError) and exc.__cause__ is not None: |
| 304 | + return _is_integrity_error(exc.__cause__) |
| 305 | + return False |
0 commit comments