✨ SQS server #233

Open

aleksul wants to merge 3 commits into main from sqs

Conversation

@aleksul (Owner) commented Apr 13, 2026

No description provided.

cloudflare-workers-and-pages bot commented Apr 13, 2026

Deploying repid with Cloudflare Pages

Latest commit: c1aa69f
Status: ✅  Deploy successful!
Preview URL: https://8b07a8d1.repid.pages.dev
Branch Preview URL: https://sqs.repid.pages.dev


@github-actions

Coverage Report

Name Stmts Miss Cover Missing
TOTAL 7753 0 100%

102 files skipped due to complete coverage.

Copilot AI left a comment

Pull request overview

Adds an Amazon SQS transport to the repid.connections layer (backed by aiobotocore) along with ElasticMQ-powered integration coverage, enabling SQS to be used as a first-class Repid server.

Changes:

  • Introduces SqsServer, SqsSubscriber, and SqsReceivedMessage implementations using aiobotocore.
  • Adds an ElasticMQ docker fixture and extensive SQS-specific integration/unit-style tests.
  • Exposes SqsServer via repid.connections when aiobotocore is installed and adds sqs/typing dependencies in pyproject.toml.

Reviewed changes

Copilot reviewed 8 out of 9 changed files in this pull request and generated 9 comments.

File summary:

  • repid/connections/sqs/message_broker.py: Implements SqsServer lifecycle, publish, and subscribe using aiobotocore.
  • repid/connections/sqs/subscriber.py: Implements background consumption, concurrency limiting, and pause/resume/close semantics for SQS.
  • repid/connections/sqs/message.py: Implements ReceivedMessageT actions (ack/nack/reject/reply) for SQS messages.
  • repid/connections/sqs/__init__.py: Exports SqsServer from the SQS subpackage.
  • repid/connections/__init__.py: Conditionally exposes SqsServer when aiobotocore is available.
  • tests/integration/conftest.py: Adds an ElasticMQ container + sqs_connection fixture and includes it in autoconn parametrization.
  • tests/integration/test_sqs_specific.py: Adds a large SQS-specific test suite covering server/message/subscriber behaviors and edge cases.
  • pyproject.toml: Adds sqs extra (aiobotocore) and typing dependency (types-aiobotocore[sqs]).
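The sqs extra described above might look roughly like this in pyproject.toml (a hedged sketch; the exact table layout and version pins in the PR may differ):

```toml
[project.optional-dependencies]
# Optional "sqs" extra pulling in the aiobotocore-based transport.
sqs = ["aiobotocore"]
# Typing stubs used for static analysis.
typing = ["types-aiobotocore[sqs]"]
```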


```python
    channels_to_callbacks=channels_to_callbacks,
    concurrency_limit=concurrency_limit,
)
self._active_subscribers.add(sub)
```
Copilot AI commented Apr 13, 2026

subscribe() adds the subscriber to _active_subscribers, but there is no task done-callback to remove it if the main task exits unexpectedly (error, cancellation outside close(), etc.). Other servers (e.g., NATS) add a done callback to avoid leaking subscribers. Consider adding a done callback on sub.task (or similar) to discard it from _active_subscribers.

Suggested change:

```diff
 self._active_subscribers.add(sub)
+task = getattr(sub, "task", None)
+if task is not None and hasattr(task, "add_done_callback"):
+    task.add_done_callback(lambda _: self._active_subscribers.discard(sub))
```

Copilot uses AI. Check for mistakes.
Comment on lines +37 to +43

```python
attributes = msg.get("MessageAttributes", {})
for key, value in attributes.items():
    if key == "content-type":
        self._content_type = value.get("StringValue")
    else:
        self._headers[key] = value.get("StringValue")
```

Copilot AI commented Apr 13, 2026

SqsReceivedMessage assumes every MessageAttribute has a StringValue and stores it directly into _headers / _content_type. If StringValue is missing (or None), this produces dict[str, str] entries with None values and can later break DLQ/reply publishing when reusing headers. Consider normalizing to str (e.g., defaulting to "" or skipping missing values) and/or handling BinaryValue attributes explicitly.
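The normalization this comment asks for could be sketched like this (a standalone illustration, not repid's actual code; the helper name is hypothetical):

```python
from __future__ import annotations


def normalize_attributes(attributes: dict) -> tuple[str | None, dict[str, str]]:
    """Split raw SQS MessageAttributes into (content_type, headers).

    Entries without a StringValue (e.g. Binary-only attributes) are skipped
    so headers stays a clean dict[str, str] with no None values.
    """
    content_type: str | None = None
    headers: dict[str, str] = {}
    for key, value in attributes.items():
        string_value = value.get("StringValue")
        if string_value is None:
            continue  # missing or binary-only attribute: skip, don't store None
        if key == "content-type":
            content_type = string_value
        else:
            headers[key] = string_value
    return content_type, headers
```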

Comment on lines +25 to +28

```python
self._server = server
self._channels_to_callbacks = channels_to_callbacks
self._concurrency_limit = concurrency_limit
self._semaphore = asyncio.Semaphore(concurrency_limit) if concurrency_limit else None
```
Copilot AI commented Apr 13, 2026

asyncio.Semaphore(concurrency_limit) is created for any truthy concurrency_limit, but negative values will raise at runtime and 0 is treated as “no limit” implicitly. Other subscribers validate concurrency_limit > 0 before creating a semaphore. Consider validating concurrency_limit (None or >0) and raising a clear error for invalid values.

Suggested change:

```diff
+if concurrency_limit is not None and concurrency_limit <= 0:
+    raise ValueError("concurrency_limit must be None or a positive integer.")
 self._server = server
 self._channels_to_callbacks = channels_to_callbacks
 self._concurrency_limit = concurrency_limit
-self._semaphore = asyncio.Semaphore(concurrency_limit) if concurrency_limit else None
+self._semaphore = asyncio.Semaphore(concurrency_limit) if concurrency_limit is not None else None
```

Comment on lines +181 to +182

```python
for task in self._tasks:
    task.cancel()
```
Copilot AI commented Apr 13, 2026

pause() cancels the per-channel consumer tasks in _tasks but never awaits them. This can leave cancelled tasks pending and produce “Task was destroyed but it is pending!” warnings, especially in test/short-lived event loops. Consider awaiting the cancelled _tasks (e.g., await asyncio.gather(*tasks, return_exceptions=True) under suppress(asyncio.CancelledError)) before clearing the list, similar to the Kafka/InMemory subscriber implementations.

Suggested change:

```diff
-for task in self._tasks:
-    task.cancel()
+tasks_to_await = list(self._tasks)
+for task in tasks_to_await:
+    task.cancel()
+if tasks_to_await:
+    with contextlib.suppress(asyncio.CancelledError):
+        await asyncio.gather(*tasks_to_await, return_exceptions=True)
```

Comment on lines +191 to +209

```python
queue_url = await self._get_queue_url(channel)

body_str = message.payload.decode("utf-8")

message_attributes: dict[str, Any] = {}
if message.headers:
    for k, v in message.headers.items():
        message_attributes[k] = {"DataType": "String", "StringValue": v}
if message.content_type:
    message_attributes["content-type"] = {
        "DataType": "String",
        "StringValue": message.content_type,
    }

kwargs: dict[str, Any] = {
    "QueueUrl": queue_url,
    "MessageBody": body_str or " ",
    "MessageAttributes": message_attributes,
}
```
Copilot AI commented Apr 13, 2026

publish() decodes message.payload with decode('utf-8'), which will raise UnicodeDecodeError for arbitrary bytes. Since SentMessageT.payload is defined as bytes, this makes SQS publishing unreliable for non-UTF8 payloads and inconsistent with other brokers (e.g., Redis/Kafka keep bytes). Consider encoding payload to a safe string representation (e.g., base64) and restoring it on receive, or at least decoding with errors='replace'/surrogateescape and documenting the limitation.
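One way to make the body safe for arbitrary bytes, along the lines the comment suggests (a sketch, not repid's implementation; the `_repid_encoding` attribute name follows the fix applied later in this thread):

```python
import base64


def encode_body(payload: bytes) -> tuple[str, dict]:
    """Return (MessageBody, extra MessageAttributes) safe for SQS.

    UTF-8 payloads pass through unchanged; anything else is base64-encoded
    and flagged via a _repid_encoding attribute so the receive side can
    restore the original bytes.
    """
    try:
        return payload.decode("utf-8"), {}
    except UnicodeDecodeError:
        body = base64.b64encode(payload).decode("ascii")
        flag = {"_repid_encoding": {"DataType": "String", "StringValue": "base64"}}
        return body, flag
```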

Comment on lines +162 to +181

```python
if self._server._client is not None and self._receipt_handle:
    reply_channel = channel if channel is not None else self._channel
    reply_queue_url = await self._server._get_queue_url(reply_channel)

    body_str = payload.decode("utf-8")

    message_attributes: dict[str, Any] = {}
    if headers:
        for k, v in headers.items():
            message_attributes[k] = {"DataType": "String", "StringValue": v}
    if content_type:
        message_attributes["content-type"] = {
            "DataType": "String",
            "StringValue": content_type,
        }

    kwargs: dict[str, Any] = {
        "QueueUrl": reply_queue_url,
        "MessageBody": body_str or " ",
        "MessageAttributes": message_attributes,
```
Copilot AI commented Apr 13, 2026

reply() decodes payload with decode('utf-8'), which will raise UnicodeDecodeError for non-UTF8 payloads. This has the same bytes-handling problem as SqsServer.publish() and should use a safe encoding strategy for arbitrary bytes (e.g., base64 + attribute flag) or a non-throwing decode mode.

Comment on lines +188 to +191

```python
await self._server._client.delete_message(
    QueueUrl=self._queue_url,
    ReceiptHandle=self._receipt_handle,
)
```
Copilot AI commented Apr 13, 2026

reply() deletes the original message without the same ClientError handling used in ack() / nack() / reject() (e.g., ignoring ReceiptHandleIsInvalid). This can make reply() raise in cases where other actions are intentionally tolerant, and can leave the message’s _action unset on partial failures. Consider adding consistent error handling around delete_message() (and possibly send_message()) and only setting _action = replied when the operations succeed or are intentionally suppressed.

Suggested change:

```diff
-await self._server._client.delete_message(
-    QueueUrl=self._queue_url,
-    ReceiptHandle=self._receipt_handle,
-)
+try:
+    await self._server._client.delete_message(
+        QueueUrl=self._queue_url,
+        ReceiptHandle=self._receipt_handle,
+    )
+except botocore.exceptions.ClientError as e:
+    error_response = getattr(e, "response", {})
+    err = error_response.get("Error", {}) if isinstance(error_response, dict) else {}
+    if isinstance(err, dict) and err.get("Code") != "ReceiptHandleIsInvalid":
+        raise
```

Comment on lines +130 to +144

```python
async def connect(self) -> None:
    if self._client is None:
        # We use an AsyncExitStack or simply manage the async context ourselves
        # But SQSClient doesn't strictly need a long-lived async context manager if we use create_client
        # Wait, create_client is an async context manager. We should enter it.
        self._client_cm = self._session.create_client(
            "sqs",
            endpoint_url=self.endpoint_url,
            region_name=self.region_name,
            aws_access_key_id=self._aws_access_key_id,
            aws_secret_access_key=self._aws_secret_access_key,
            aws_session_token=self._aws_session_token,
        )
        self._client = await self._client_cm.__aenter__()
    logger.info("server.connect", extra={"host": self.host})
```
Copilot AI commented Apr 13, 2026

connect() contains leftover “thinking out loud” comments and relies on assigning self._client_cm dynamically. It would be clearer/safer to define _client_cm in __init__ (typed as an async context manager) and remove these internal notes, keeping only the actual lifecycle logic (enter the client context manager in connect(), exit it in disconnect()).
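The shape this comment is pointing at could look like the following (an illustrative sketch with a stand-in class, not repid's actual SqsServer):

```python
from contextlib import AbstractAsyncContextManager
from typing import Any, Optional


class SqsLifecycleSketch:
    """Stand-in showing _client_cm declared up front and paired enter/exit."""

    def __init__(self) -> None:
        # Declared (and typed) in __init__ instead of appearing dynamically.
        self._client_cm: Optional[AbstractAsyncContextManager[Any]] = None
        self._client: Any = None

    async def connect(self, session: Any) -> None:
        if self._client is None:
            self._client_cm = session.create_client("sqs")
            self._client = await self._client_cm.__aenter__()

    async def disconnect(self) -> None:
        if self._client is not None and self._client_cm is not None:
            await self._client_cm.__aexit__(None, None, None)
            self._client = None
            self._client_cm = None
```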

Comment on lines +146 to +157

```python
async def disconnect(self) -> None:
    if self._client is not None:
        for subscriber in list(self._active_subscribers):
            try:
                await subscriber.close()
            except Exception:
                logger.exception("subscriber.closing_error")

        await self._client_cm.__aexit__(None, None, None)
        self._client = None
        self._queue_url_cache.clear()
    logger.info("server.disconnect")
```
Copilot AI commented Apr 13, 2026

disconnect() attempts to close active subscribers, but if subscriber.close() raises, the subscriber remains in _active_subscribers because the set is never cleared. This can leave stale references across reconnects. Consider discarding each subscriber in a finally (or clearing the set after the loop), similar to how NatsServer.disconnect() clears _active_subscribers.
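A sketch of the suggested cleanup (an illustrative helper, not repid's code): discard in a finally block so even a failing close() cannot leave a stale entry behind.

```python
import logging

logger = logging.getLogger(__name__)


async def close_all_subscribers(active_subscribers: set) -> None:
    """Close every subscriber, always removing it from the tracking set."""
    for subscriber in list(active_subscribers):
        try:
            await subscriber.close()
        except Exception:
            logger.exception("subscriber.closing_error")
        finally:
            active_subscribers.discard(subscriber)
```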

@aleksul (Owner, Author) commented Apr 13, 2026

@copilot apply changes based on the comments in this thread

Copilot AI and others added 2 commits April 13, 2026 23:28

Copilot AI (Contributor) commented Apr 13, 2026

> @copilot apply changes based on the comments in this thread

Applied all changes from the review thread (commits c15ea3d and c1aa69f):

  • subscriber.py: Added concurrency_limit > 0 validation with a clear ValueError; pause() now awaits cancelled _tasks via asyncio.gather before clearing.
  • message.py: Skips None StringValue entries; excludes _repid_encoding from user headers; decodes body from base64 when _repid_encoding == base64; reply() now uses base64 encoding and has consistent ClientError handling for delete_message() matching ack()/nack()/reject().
  • message_broker.py: _client_cm initialized in __init__; "thinking out loud" comments removed from connect(); subscribe() adds a done callback to auto-remove from _active_subscribers; disconnect() clears _active_subscribers in a finally block; publish() now uses base64 encoding for arbitrary-bytes safety.
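The receive-side decode described in the second bullet could be sketched as follows (the helper name is illustrative; repid's actual code may differ):

```python
import base64


def decode_body(body: str, attributes: dict) -> bytes:
    """Restore the original payload bytes from an SQS message body.

    If the publish side set _repid_encoding == "base64", the body carries
    base64 text; otherwise it is treated as plain UTF-8.
    """
    encoding = attributes.get("_repid_encoding", {}).get("StringValue")
    if encoding == "base64":
        return base64.b64decode(body)
    return body.encode("utf-8")
```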
