Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions asab/storage/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,12 @@ def generate_id(cls):
return bson.objectid.ObjectId()


async def execute(self, custom_data: typing.Optional[dict] = None, event_type: typing.Optional[str] = None):
async def execute(
self,
custom_data: typing.Optional[dict] = None,
event_type: typing.Optional[str] = None,
op_options: typing.Optional[str] = None
):
Comment on lines +273 to +278
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

🧩 Analysis chain

Fix type of op_options (dict-like, not str) and align API across storages.

The parameter is used as a mapping of operation options, but is annotated as Optional[str]. This is misleading and can cause runtime errors downstream. Also, adding a new parameter only to MongoDBUpsertor breaks cross-storage symmetry.

Apply:

-async def execute(
+async def execute(
 	self,
 	custom_data: typing.Optional[dict] = None,
 	event_type: typing.Optional[str] = None,
-	op_options: typing.Optional[str] = None
+	op_options: typing.Optional[typing.Mapping[str, typing.Any]] = None
 ):

Follow-ups:

  • Consider updating UpsertorABC and other storage implementations (inmemory, elasticsearch) to accept the same parameter (even if unused) to keep a consistent public API. I can prepare that change if desired.

Run to locate all execute() signatures and potential call sites passing op_options:


🏁 Script executed:

#!/bin/bash
# List execute() method signatures in upsertors
rg -nP --type=python -C2 '(class\s+\w*Upsertor\b|async\s+def\s+execute\s*\()' asab/

# Find call sites that pass op_options
rg -nP --type=python -C2 'execute\s*\([^)]*op_options\s*='

Length of output: 61


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Searching for Upsertor classes and execute() definitions ==="
rg -nP -C2 '(class\s+\w*Upsertor\b|async\s+def\s+execute\s*\()' asab/ || true

echo
echo "=== Searching for 'op_options' usages in execute call sites ==="
rg -nP -C2 'execute\s*\([^)]*op_options\s*=' || true

echo
echo "=== Showing asab/storage/mongodb.py around the execute() definition ==="
sed -n '1,220p' asab/storage/mongodb.py || true
echo "-----"
sed -n '220,400p' asab/storage/mongodb.py || true

Length of output: 12368


Fix op_options type and unify Upsertor API

op_options is used as a mapping (dict(op_options or {})) but is annotated as Optional[str] only in MongoDBUpsertor — change its type to a mapping and add the same optional param to the UpsertorABC and all Upsertor implementations to keep a consistent public API.

  • asab/storage/mongodb.py: change execute signature to use a mapping (example diff below).
  • asab/storage/upsertor.py (line ~121): add op_options: typing.Optional[typing.Mapping[str, typing.Any]] = None to the abstract execute signature.
  • asab/storage/inmemory.py (line ~15) and asab/storage/elasticsearch.py (line ~625): add the same op_options parameter to their execute signatures (can be ignored if unused).
  • Confirmed: no call sites found that pass op_options, so updating signatures is safe but implementations must be updated.

Apply (for mongodb):

-async def execute(
+async def execute(
 	self,
 	custom_data: typing.Optional[dict] = None,
 	event_type: typing.Optional[str] = None,
-	op_options: typing.Optional[str] = None
+	op_options: typing.Optional[typing.Mapping[str, typing.Any]] = None
 ):
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def execute(
self,
custom_data: typing.Optional[dict] = None,
event_type: typing.Optional[str] = None,
op_options: typing.Optional[str] = None
):
async def execute(
self,
custom_data: typing.Optional[dict] = None,
event_type: typing.Optional[str] = None,
op_options: typing.Optional[typing.Mapping[str, typing.Any]] = None
):
🤖 Prompt for AI Agents
In asab/storage/mongodb.py around lines 273-278 change the execute signature so
op_options is typed as typing.Optional[typing.Mapping[str, typing.Any]] (it is
currently Optional[str]) because the code treats it as a mapping via
dict(op_options or {}); update the implementation to continue using
dict(op_options or {}) as before. Also update the abstract Upsertor.execute
signature in asab/storage/upsertor.py (around line ~121) to include op_options:
typing.Optional[typing.Mapping[str, typing.Any]] = None, and add the same
optional op_options parameter to the implementations in asab/storage/inmemory.py
(around line ~15) and asab/storage/elasticsearch.py (around line ~625) so the
public API is consistent; make sure to import typing.Mapping and typing.Any
where needed and keep behavior unchanged since no callers pass op_options.

session = _tx_session.get() # Can be None

id_name = self.get_id_name()
Expand Down Expand Up @@ -304,14 +309,31 @@ async def execute(self, custom_data: typing.Optional[dict] = None, event_type: t

if len(addobj) > 0:
coll = self.Storage.Database[self.Collection]
opts = dict(op_options or {}) # shallow copy so we can pop safely

# Must be applied on collection
wc = opts.pop("writeConcern", None)
if wc is not None:
wc = wc if isinstance(wc, pymongo.WriteConcern) else pymongo.WriteConcern(**wc)
coll = coll.with_options(write_concern=wc)

# Set usual defaults
defaults = {
"upsert": True if (self.Version == 0) or (self.Version is None) else False,
"return_document": pymongo.collection.ReturnDocument.AFTER,
"session": session
}

# Let custom opts override the defaults
final_opts = {**defaults, **(opts or {})}

Comment on lines +312 to +329
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Harden op_options handling and support write_concern alias.

  • dict(op_options) will throw if op_options is a non-mapping (e.g., string). Guard the type.
  • Consider accepting both "writeConcern" and "write_concern" keys.

Apply:

-			opts = dict(op_options or {})  # shallow copy so we can pop safely
+			if op_options is None:
+				opts = {}
+			elif isinstance(op_options, dict):
+				opts = dict(op_options)  # shallow copy
+			else:
+				raise TypeError("op_options must be a dict-like object of find_one_and_update options.")
 
 			# Must be applied on collection
-			wc = opts.pop("writeConcern", None)
+			wc = opts.pop("writeConcern", None) or opts.pop("write_concern", None)
 			if wc is not None:
 				wc = wc if isinstance(wc, pymongo.WriteConcern) else pymongo.WriteConcern(**wc)
 				coll = coll.with_options(write_concern=wc)
 
 			# Set usual defaults
 			defaults = {
 				"upsert": True if (self.Version == 0) or (self.Version is None) else False,
 				"return_document": pymongo.collection.ReturnDocument.AFTER,
 				"session": session
 			}
 
 			# Let custom opts override the defaults
-			final_opts = {**defaults, **(opts or {})}
+			final_opts = {**defaults, **opts}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
opts = dict(op_options or {}) # shallow copy so we can pop safely
# Must be applied on collection
wc = opts.pop("writeConcern", None)
if wc is not None:
wc = wc if isinstance(wc, pymongo.WriteConcern) else pymongo.WriteConcern(**wc)
coll = coll.with_options(write_concern=wc)
# Set usual defaults
defaults = {
"upsert": True if (self.Version == 0) or (self.Version is None) else False,
"return_document": pymongo.collection.ReturnDocument.AFTER,
"session": session
}
# Let custom opts override the defaults
final_opts = {**defaults, **(opts or {})}
if op_options is None:
opts = {}
elif isinstance(op_options, dict):
opts = dict(op_options) # shallow copy
else:
raise TypeError("op_options must be a dict-like object of find_one_and_update options.")
# Must be applied on collection
wc = opts.pop("writeConcern", None) or opts.pop("write_concern", None)
if wc is not None:
wc = wc if isinstance(wc, pymongo.WriteConcern) else pymongo.WriteConcern(**wc)
coll = coll.with_options(write_concern=wc)
# Set usual defaults
defaults = {
"upsert": True if (self.Version == 0) or (self.Version is None) else False,
"return_document": pymongo.collection.ReturnDocument.AFTER,
"session": session
}
# Let custom opts override the defaults
final_opts = {**defaults, **opts}
🤖 Prompt for AI Agents
In asab/storage/mongodb.py around lines 312 to 329, the code currently does a
blind dict(op_options) which will raise if op_options is not a mapping and only
checks for "writeConcern"; update it to first guard the type by treating
op_options as a Mapping (use collections.abc.Mapping) and fall back to an empty
dict if it isn’t, then make a shallow copy of that mapping; support both
"writeConcern" and "write_concern" by normalizing/aliasing (pop one or the
other) before creating a pymongo.WriteConcern and applying
coll.with_options(write_concern=wc); keep the existing defaults merge logic and
ensure session handling remains unchanged.

try:
ret = await coll.find_one_and_update(
filtr,
update=addobj,
upsert=True if (self.Version == 0) or (self.Version is None) else False,
return_document=pymongo.collection.ReturnDocument.AFTER,
session=session
**final_opts
)

except pymongo.errors.DuplicateKeyError as e:
if hasattr(e, "details"):
raise DuplicateError("Duplicate key error: {}".format(e), self.ObjId, key_value=e.details.get("keyValue"))
Expand Down