Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ docs = [
lint = [
"mypy>=1.19.1",
"ruff>=0.15.8",
"types-grpcio>=1.0.0.20251009"
"types-grpcio>=1.0.0.20251009",
"types-aiobotocore[sqs]>=3.4.0"
]
pre-commit = [
"pre-commit>=4.5.1"
Expand Down Expand Up @@ -64,6 +65,7 @@ pubsub = [
]
pydantic = ["pydantic>=2.0.0,<3.0.0"]
redis = ["redis<8.0.0,>=7.0.0"]
sqs = ["aiobotocore>=2.10.0,<3.0.0"]

[project.urls]
documentation = "https://repid.aleksul.space"
Expand Down
5 changes: 5 additions & 0 deletions repid/connections/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,8 @@
from repid.connections.nats import NatsServer

__all__ += ["NatsServer"]

if is_installed("aiobotocore"):
from repid.connections.sqs import SqsServer

__all__ += ["SqsServer"]
3 changes: 3 additions & 0 deletions repid/connections/sqs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from repid.connections.sqs.message_broker import SqsServer

__all__ = ["SqsServer"]
207 changes: 207 additions & 0 deletions repid/connections/sqs/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
from __future__ import annotations

import base64
import binascii
import logging
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any

import botocore.exceptions

from repid.connections.abc import MessageAction, ReceivedMessageT

if TYPE_CHECKING:
from repid.connections.sqs.message_broker import SqsServer

logger = logging.getLogger("repid.connections.sqs")


class SqsReceivedMessage(ReceivedMessageT):
def __init__(
self,
server: SqsServer,
channel: str,
queue_url: str,
msg: Mapping[str, Any],
) -> None:
self._server = server
self._channel = channel
self._queue_url = queue_url
self._msg = msg
self._action: MessageAction | None = None

self._message_id = msg.get("MessageId")
self._receipt_handle = msg.get("ReceiptHandle")

self._headers: dict[str, str] = {}
self._content_type: str | None = None

attributes = msg.get("MessageAttributes", {})
for key, value in attributes.items():
str_value = value.get("StringValue")
if key == "content-type":
if str_value is not None:
self._content_type = str_value
else:
if str_value is not None:
self._headers[key] = str_value

Comment on lines +39 to +48
Copy link

Copilot AI Apr 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SqsReceivedMessage assumes every MessageAttribute has a StringValue and stores it directly into _headers / _content_type. If StringValue is missing (or None), this produces dict[str, str] entries with None values and can later break DLQ/reply publishing when reusing headers. Consider normalizing to str (e.g., defaulting to "" or skipping missing values) and/or handling BinaryValue attributes explicitly.

Copilot uses AI. Check for mistakes.
body = msg.get("Body", "")
try:
self._payload = base64.b64decode(body, validate=True)
except (binascii.Error, ValueError):
self._payload = str(body).encode("utf-8", errors="replace")
Comment on lines +49 to +53
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

base64.b64decode(body) is called without validate=True, which can silently accept/ignore non-base64 characters and yield corrupted payloads when _repid_encoding is set. Consider decoding with validation (or otherwise detecting invalid base64) so corrupted message bodies fall back to the raw bytes path instead of producing unexpected payloads.

Copilot uses AI. Check for mistakes.

@property
def payload(self) -> bytes:
return self._payload

@property
def headers(self) -> dict[str, str] | None:
return self._headers

@property
def content_type(self) -> str | None:
return self._content_type

@property
def channel(self) -> str:
return self._channel

@property
def action(self) -> MessageAction | None:
return self._action

@property
def is_acted_on(self) -> bool:
return self._action is not None

@property
def message_id(self) -> str | None:
return self._message_id

async def ack(self) -> None:
if self._action is not None:
return

if self._server._client is not None and self._receipt_handle:
try:
await self._server._client.delete_message(
QueueUrl=self._queue_url,
ReceiptHandle=self._receipt_handle,
)
except botocore.exceptions.ClientError as e:
error_response = getattr(e, "response", {})
err = error_response.get("Error", {}) if isinstance(error_response, dict) else {}
if isinstance(err, dict) and err.get("Code") != "ReceiptHandleIsInvalid":
raise

self._action = MessageAction.acked

async def nack(self) -> None:
if self._action is not None:
return

if self._server._client is not None and self._receipt_handle:
dlq_strategy = self._server._dlq_queue_strategy
if dlq_strategy:
dlq_channel = dlq_strategy(self._channel)
dlq_queue_url = await self._server._get_queue_url(dlq_channel)

message_attributes: dict[str, Any] = {}
for k, v in self._headers.items():
message_attributes[k] = {"DataType": "String", "StringValue": v}
if self._content_type:
message_attributes["content-type"] = {
"DataType": "String",
"StringValue": self._content_type,
}

await self._server._client.send_message(
QueueUrl=dlq_queue_url,
MessageBody=self._msg.get("Body", ""),
MessageAttributes=message_attributes,
)
Comment on lines +105 to +124
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When routing a nacked message to the DLQ, the code forwards the original base64-encoded Body but does not include the _repid_encoding=base64 message attribute. As a result, DLQ consumers will treat the body as plain text and SqsReceivedMessage.payload will become the base64 string bytes rather than the original payload. Preserve _repid_encoding (and ideally forward existing message attributes consistently) when publishing to the DLQ so the payload is decoded correctly.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal is to have repid message scheduling framework-agnostic, so ideally we would avoid _repid_encoding attribute.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed _repid_encoding entirely (commit d2b1e13). publish() and reply() always base64-encode the body with no attribute flag. SqsReceivedMessage always attempts base64.b64decode(body, validate=True) and falls back to UTF-8 for non-base64 bodies. This also fixes the DLQ case — nack() forwards the original body (already base64-encoded) and the existing user attributes without any framework-specific attribute.


try:
await self._server._client.delete_message(
QueueUrl=self._queue_url,
ReceiptHandle=self._receipt_handle,
)
except botocore.exceptions.ClientError as e:
error_response = getattr(e, "response", {})
err = error_response.get("Error", {}) if isinstance(error_response, dict) else {}
if isinstance(err, dict) and err.get("Code") != "ReceiptHandleIsInvalid":
raise

self._action = MessageAction.nacked

async def reject(self) -> None:
if self._action is not None:
return

if self._server._client is not None and self._receipt_handle:
try:
await self._server._client.change_message_visibility(
QueueUrl=self._queue_url,
ReceiptHandle=self._receipt_handle,
VisibilityTimeout=0,
)
except botocore.exceptions.ClientError as e:
error_response = getattr(e, "response", {})
err = error_response.get("Error", {}) if isinstance(error_response, dict) else {}
if isinstance(err, dict) and err.get("Code") != "ReceiptHandleIsInvalid":
raise

self._action = MessageAction.rejected

async def reply(
self,
*,
payload: bytes,
headers: dict[str, str] | None = None,
content_type: str | None = None,
channel: str | None = None,
server_specific_parameters: dict[str, Any] | None = None,
) -> None:
if self._action is not None:
return

if self._server._client is not None and self._receipt_handle:
reply_channel = channel if channel is not None else self._channel
reply_queue_url = await self._server._get_queue_url(reply_channel)

body_str = base64.b64encode(payload).decode("ascii")

message_attributes: dict[str, Any] = {}
if headers:
for k, v in headers.items():
message_attributes[k] = {"DataType": "String", "StringValue": v}
if content_type:
message_attributes["content-type"] = {
"DataType": "String",
"StringValue": content_type,
}

kwargs: dict[str, Any] = {
"QueueUrl": reply_queue_url,
"MessageBody": body_str,
"MessageAttributes": message_attributes,
Comment on lines +170 to +189
Copy link

Copilot AI Apr 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reply() decodes payload with decode('utf-8'), which will raise UnicodeDecodeError for non-UTF8 payloads. This has the same bytes-handling problem as SqsServer.publish() and should use a safe encoding strategy for arbitrary bytes (e.g., base64 + attribute flag) or a non-throwing decode mode.

Copilot uses AI. Check for mistakes.
}
if server_specific_parameters:
kwargs.update(server_specific_parameters)

await self._server._client.send_message(**kwargs)

try:
await self._server._client.delete_message(
QueueUrl=self._queue_url,
ReceiptHandle=self._receipt_handle,
)
except botocore.exceptions.ClientError as e:
error_response = getattr(e, "response", {})
err = error_response.get("Error", {}) if isinstance(error_response, dict) else {}
if isinstance(err, dict) and err.get("Code") != "ReceiptHandleIsInvalid":
raise

self._action = MessageAction.replied
Loading