Walkthrough
MongoDBUpsertor.execute now accepts an optional op_options parameter, builds and merges operation/collection options (including writeConcern), and forwards them to find_one_and_update via kwargs. Error handling for DuplicateKeyError was expanded to raise DuplicateError even when details are missing.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant C as Caller
participant U as MongoDBUpsertor.execute
participant Col as pymongo.Collection
participant DB as MongoDB Server
C->>U: execute(custom_data, event_type, op_options)
U->>U: Extract opts, session, writeConcern
alt writeConcern provided
U->>Col: with_options(write_concern)
Col-->>U: collection'
else no writeConcern
U->>Col: use original collection
Col-->>U: collection
end
U->>U: Build final_opts (upsert, return_document, session, ...)
U->>Col: find_one_and_update(filter, update, **final_opts)
Col->>DB: Command
DB-->>Col: Result or DuplicateKeyError
alt Success
Col-->>U: Updated/Upserted document
U-->>C: Return result
else DuplicateKeyError with details
U->>C: raise DuplicateError(keyValue, ObjId)
else DuplicateKeyError without details
U->>C: raise DuplicateError(str(e), ObjId)
end
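The option handling in the diagram can be sketched without a database connection. This is a minimal illustration, not the code from the PR: `build_final_opts` is a hypothetical helper name, and the string `"after"` stands in for `pymongo.collection.ReturnDocument.AFTER` so the block runs without pymongo installed.

```python
from typing import Any, Mapping, Optional, Tuple


def build_final_opts(
    op_options: Optional[Mapping[str, Any]],
    version: Optional[int],
    session: Any = None,
) -> Tuple[Any, dict]:
    """Split off writeConcern and merge the remaining options over defaults."""
    opts = dict(op_options or {})  # shallow copy, the caller's mapping is untouched

    # writeConcern is a collection-level setting, so it is returned separately
    write_concern = opts.pop("writeConcern", None)

    defaults = {
        "upsert": version in (0, None),  # upsert only when no version exists yet
        "return_document": "after",      # placeholder for ReturnDocument.AFTER
        "session": session,
    }
    # Caller-supplied options override the defaults
    return write_concern, {**defaults, **opts}
```

In the real method, the first return value would be applied through `coll.with_options(write_concern=...)` and the second unpacked into `find_one_and_update(filter, update, **final_opts)`.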
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~25 minutes
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
asab/storage/mongodb.py (3)
338-341: Robust DuplicateKeyError handling (details may be None).
hasattr(e, "details") is True even when details is None; calling .get() then raises. Extract safely and handle non-dict.
Apply:
- except pymongo.errors.DuplicateKeyError as e:
-     if hasattr(e, "details"):
-         raise DuplicateError("Duplicate key error: {}".format(e), self.ObjId, key_value=e.details.get("keyValue"))
-     else:
-         raise DuplicateError("Duplicate key error: {}".format(e), self.ObjId)
+ except pymongo.errors.DuplicateKeyError as e:
+     details = getattr(e, "details", None)
+     key_value = details.get("keyValue") if isinstance(details, dict) else None
+     raise DuplicateError(f"Duplicate key error: {e}", self.ObjId, key_value=key_value)
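A small sketch of why the hasattr check is not enough. `FakeDuplicateKeyError` is a hypothetical stand-in used only so the example runs without pymongo; it assumes, as the comment above states, that the exception always exposes a `details` attribute which may be None.

```python
class FakeDuplicateKeyError(Exception):
    """Hypothetical stand-in for pymongo.errors.DuplicateKeyError."""

    def __init__(self, message, details=None):
        super().__init__(message)
        self.details = details  # attribute always exists, but may be None


def extract_key_value(error):
    """Return details['keyValue'] if details is a dict, otherwise None."""
    details = getattr(error, "details", None)
    return details.get("keyValue") if isinstance(details, dict) else None
```

With `details=None`, `hasattr(error, "details")` is still True, so the original branch would call `.get()` on None; the helper returns None instead.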
349-353: Use the same session for both operations.
find_one_and_update uses session from final_opts, but update_one uses the context var session, which may differ or be None. Use the same session.
Apply:
  await coll.update_one(
      {id_name: ret[id_name]},
      {'$set': {'_c': ret['_m']}},
-     session=session
+     session=final_opts.get("session")
  )
202-202: Await bulk_write in async method.
This returns a coroutine instead of the result. Must await.
Apply:
- return coll.bulk_write(bulk)
+ return await coll.bulk_write(bulk)
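The difference can be reproduced with plain asyncio. In this sketch `bulk_write` is a hypothetical coroutine standing in for the driver call; the real Motor method may hand back a different kind of awaitable, but the caller is left with something unresolved either way.

```python
import asyncio
import inspect


async def bulk_write(ops):
    """Hypothetical stand-in for an awaitable driver call."""
    await asyncio.sleep(0)
    return len(ops)


async def without_await(ops):
    return bulk_write(ops)  # hands back an un-awaited coroutine object


async def with_await(ops):
    return await bulk_write(ops)  # hands back the actual result


async def demo():
    pending = await without_await([1, 2, 3])
    is_coroutine = inspect.iscoroutine(pending)
    pending.close()  # avoid a "never awaited" warning
    result = await with_await([1, 2, 3])
    return is_coroutine, result
```

A caller that awaits `without_await` once gets an awaitable back and would have to await a second time to see the result.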
🧹 Nitpick comments (1)
asab/storage/mongodb.py (1)
170-170: Deprecated count(): use count_documents or estimated_document_count.
Motor/pymongo deprecate collection.count(). Prefer:
- exact: await coll.count_documents({})
- fast estimate: await coll.estimated_document_count()
Example:
- count = await coll.count()
+ count = await coll.estimated_document_count()
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
asab/storage/mongodb.py(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
asab/storage/mongodb.py (3)
asab/storage/elasticsearch.py (1)
execute(625-630)
asab/storage/inmemory.py (1)
execute(15-77)
asab/storage/upsertor.py (1)
execute(121-133)
async def execute(
    self,
    custom_data: typing.Optional[dict] = None,
    event_type: typing.Optional[str] = None,
    op_options: typing.Optional[str] = None
):
🛠️ Refactor suggestion
🧩 Analysis chain
Fix type of op_options (dict-like, not str) and align API across storages.
The parameter is used as a mapping of operation options, but is annotated as Optional[str]. This is misleading and can cause runtime errors downstream. Also, adding a new parameter only to MongoDBUpsertor breaks cross-storage symmetry.
Apply:
  async def execute(
      self,
      custom_data: typing.Optional[dict] = None,
      event_type: typing.Optional[str] = None,
-     op_options: typing.Optional[str] = None
+     op_options: typing.Optional[typing.Mapping[str, typing.Any]] = None
  ):
Follow-ups:
- Consider updating UpsertorABC and other storage implementations (inmemory, elasticsearch) to accept the same parameter (even if unused) to keep a consistent public API. I can prepare that change if desired.
Run to locate all execute() signatures and potential call sites passing op_options:
🏁 Script executed:
#!/bin/bash
# List execute() method signatures in upsertors
rg -nP --type=python -C2 '(class\s+\w*Upsertor\b|async\s+def\s+execute\s*\()' asab/
# Find call sites that pass op_options
rg -nP --type=python -C2 'execute\s*\([^)]*op_options\s*='
Length of output: 61
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Searching for Upsertor classes and execute() definitions ==="
rg -nP -C2 '(class\s+\w*Upsertor\b|async\s+def\s+execute\s*\()' asab/ || true
echo
echo "=== Searching for 'op_options' usages in execute call sites ==="
rg -nP -C2 'execute\s*\([^)]*op_options\s*=' || true
echo
echo "=== Showing asab/storage/mongodb.py around the execute() definition ==="
sed -n '1,220p' asab/storage/mongodb.py || true
echo "-----"
sed -n '220,400p' asab/storage/mongodb.py || true
Length of output: 12368
Fix op_options type and unify Upsertor API
op_options is used as a mapping (dict(op_options or {})) but is annotated as Optional[str] only in MongoDBUpsertor — change its type to a mapping and add the same optional param to the UpsertorABC and all Upsertor implementations to keep a consistent public API.
- asab/storage/mongodb.py: change execute signature to use a mapping (example diff below).
- asab/storage/upsertor.py (line ~121): add op_options: typing.Optional[typing.Mapping[str, typing.Any]] = None to the abstract execute signature.
- asab/storage/inmemory.py (line ~15) and asab/storage/elasticsearch.py (line ~625): add the same op_options parameter to their execute signatures (can be ignored if unused).
- Confirmed: no call sites found that pass op_options, so updating signatures is safe but implementations must be updated.
Apply (for mongodb):
  async def execute(
      self,
      custom_data: typing.Optional[dict] = None,
      event_type: typing.Optional[str] = None,
-     op_options: typing.Optional[str] = None
+     op_options: typing.Optional[typing.Mapping[str, typing.Any]] = None
  ):
🤖 Prompt for AI Agents
In asab/storage/mongodb.py around lines 273-278 change the execute signature so
op_options is typed as typing.Optional[typing.Mapping[str, typing.Any]] (it is
currently Optional[str]) because the code treats it as a mapping via
dict(op_options or {}); update the implementation to continue using
dict(op_options or {}) as before. Also update the abstract Upsertor.execute
signature in asab/storage/upsertor.py (around line ~121) to include op_options:
typing.Optional[typing.Mapping[str, typing.Any]] = None, and add the same
optional op_options parameter to the implementations in asab/storage/inmemory.py
(around line ~15) and asab/storage/elasticsearch.py (around line ~625) so the
public API is consistent; make sure to import typing.Mapping and typing.Any
where needed and keep behavior unchanged since no callers pass op_options.
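One way the unified signature could look across storages, as a rough sketch. `UpsertorSketch` and `InMemorySketch` are hypothetical names and do not reproduce the real ASAB classes; the point is only that a backend without per-operation options can accept the parameter and ignore it.

```python
import abc
import asyncio
from typing import Any, Mapping, Optional


class UpsertorSketch(abc.ABC):
    """Hypothetical stand-in for the abstract upsertor; not the real ASAB class."""

    @abc.abstractmethod
    async def execute(
        self,
        custom_data: Optional[dict] = None,
        event_type: Optional[str] = None,
        op_options: Optional[Mapping[str, Any]] = None,
    ):
        ...


class InMemorySketch(UpsertorSketch):
    """Hypothetical backend that has no use for operation options."""

    async def execute(self, custom_data=None, event_type=None, op_options=None):
        # Accept op_options for API symmetry and ignore it
        return {"custom_data": custom_data, "event_type": event_type}
```

Callers can then pass `op_options` to any storage backend without checking which one is configured.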
opts = dict(op_options or {})  # shallow copy so we can pop safely

# Must be applied on collection
wc = opts.pop("writeConcern", None)
if wc is not None:
    wc = wc if isinstance(wc, pymongo.WriteConcern) else pymongo.WriteConcern(**wc)
    coll = coll.with_options(write_concern=wc)

# Set usual defaults
defaults = {
    "upsert": True if (self.Version == 0) or (self.Version is None) else False,
    "return_document": pymongo.collection.ReturnDocument.AFTER,
    "session": session
}

# Let custom opts override the defaults
final_opts = {**defaults, **(opts or {})}
Harden op_options handling and support write_concern alias.
- dict(op_options) will throw if op_options is a non-mapping (e.g., string). Guard the type.
- Consider accepting both "writeConcern" and "write_concern" keys.
Apply:
- opts = dict(op_options or {})  # shallow copy so we can pop safely
+ if op_options is None:
+     opts = {}
+ elif isinstance(op_options, dict):
+     opts = dict(op_options)  # shallow copy
+ else:
+     raise TypeError("op_options must be a dict-like object of find_one_and_update options.")
  # Must be applied on collection
- wc = opts.pop("writeConcern", None)
+ wc = opts.pop("writeConcern", None) or opts.pop("write_concern", None)
  if wc is not None:
      wc = wc if isinstance(wc, pymongo.WriteConcern) else pymongo.WriteConcern(**wc)
      coll = coll.with_options(write_concern=wc)
  # Set usual defaults
  defaults = {
      "upsert": True if (self.Version == 0) or (self.Version is None) else False,
      "return_document": pymongo.collection.ReturnDocument.AFTER,
      "session": session
  }
  # Let custom opts override the defaults
- final_opts = {**defaults, **(opts or {})}
+ final_opts = {**defaults, **opts}
🤖 Prompt for AI Agents
In asab/storage/mongodb.py around lines 312 to 329, the code currently does a
blind dict(op_options) which will raise if op_options is not a mapping and only
checks for "writeConcern"; update it to first guard the type by treating
op_options as a Mapping (use collections.abc.Mapping) and fall back to an empty
dict if it isn’t, then make a shallow copy of that mapping; support both
"writeConcern" and "write_concern" by normalizing/aliasing (pop one or the
other) before creating a pymongo.WriteConcern and applying
coll.with_options(write_concern=wc); keep the existing defaults merge logic and
ensure session handling remains unchanged.
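A possible shape for the type guard and alias handling, as a sketch under stated assumptions. `normalize_op_options` is a hypothetical helper, and it follows the TypeError variant from the diff rather than the empty-dict fallback described in the prompt. It also pops both spellings unconditionally: chaining two pops with `or` skips the second pop whenever the first value is truthy, which would leave a stray "write_concern" key in the options passed on as keyword arguments.

```python
from collections.abc import Mapping


def normalize_op_options(op_options):
    """Return (write_concern, remaining_opts) from a caller-supplied mapping."""
    if op_options is None:
        opts = {}
    elif isinstance(op_options, Mapping):
        opts = dict(op_options)  # shallow copy so popping is safe
    else:
        raise TypeError("op_options must be a mapping of operation options")

    # Pop both spellings unconditionally so neither leaks into the kwargs
    camel = opts.pop("writeConcern", None)
    snake = opts.pop("write_concern", None)
    write_concern = camel if camel is not None else snake
    return write_concern, opts
```

When both keys are present, this sketch prefers "writeConcern"; that precedence is an arbitrary choice made here, not something the review specifies.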
Do you still want to merge this one? @language-shprt
I don't need it immediately (I removed upsertor as we discussed), but it might be nice to have this flexibility, so yes.
This is needed so that alerts can write to the database more flexibly.
Summary by CodeRabbit