Skip to content

Commit ee0fc71

Browse files
committed
✨ SQS server
1 parent 848edde commit ee0fc71

9 files changed

Lines changed: 1998 additions & 2 deletions

File tree

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ docs = [
1414
lint = [
1515
"mypy>=1.19.1",
1616
"ruff>=0.15.8",
17-
"types-grpcio>=1.0.0.20251009"
17+
"types-grpcio>=1.0.0.20251009",
18+
"types-aiobotocore[sqs]>=3.4.0"
1819
]
1920
pre-commit = [
2021
"pre-commit>=4.5.1"
@@ -64,6 +65,7 @@ pubsub = [
6465
]
6566
pydantic = ["pydantic>=2.0.0,<3.0.0"]
6667
redis = ["redis<8.0.0,>=7.0.0"]
68+
sqs = ["aiobotocore>=2.10.0,<3.0.0"]
6769

6870
[project.urls]
6971
documentation = "https://repid.aleksul.space"

repid/connections/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,8 @@
5353
from repid.connections.nats import NatsServer
5454

5555
__all__ += ["NatsServer"]
56+
57+
if is_installed("aiobotocore"):
58+
from repid.connections.sqs import SqsServer
59+
60+
__all__ += ["SqsServer"]

repid/connections/sqs/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from repid.connections.sqs.message_broker import SqsServer
2+
3+
__all__ = ["SqsServer"]

repid/connections/sqs/message.py

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
from __future__ import annotations
2+
3+
import base64
4+
import logging
5+
from collections.abc import Mapping
6+
from typing import TYPE_CHECKING, Any
7+
8+
from repid.connections.abc import MessageAction, ReceivedMessageT
9+
10+
if TYPE_CHECKING:
11+
from repid.connections.sqs.message_broker import SqsServer
12+
13+
logger = logging.getLogger("repid.connections.sqs")
14+
15+
16+
class SqsReceivedMessage(ReceivedMessageT):
17+
def __init__(
18+
self,
19+
server: SqsServer,
20+
channel: str,
21+
queue_url: str,
22+
msg: Mapping[str, Any],
23+
) -> None:
24+
self._server = server
25+
self._channel = channel
26+
self._queue_url = queue_url
27+
self._msg = msg
28+
self._action: MessageAction | None = None
29+
30+
self._message_id = msg.get("MessageId")
31+
self._receipt_handle = msg.get("ReceiptHandle")
32+
33+
self._headers: dict[str, str] = {}
34+
self._content_type: str | None = None
35+
36+
attributes = msg.get("MessageAttributes", {})
37+
for key, value in attributes.items():
38+
if key == "content-type":
39+
self._content_type = value.get("StringValue")
40+
else:
41+
self._headers[key] = value.get("StringValue")
42+
43+
body = str(msg.get("Body", ""))
44+
self._payload = base64.b64decode(body)
45+
46+
@property
47+
def payload(self) -> bytes:
48+
return self._payload
49+
50+
@property
51+
def headers(self) -> dict[str, str] | None:
52+
return self._headers
53+
54+
@property
55+
def content_type(self) -> str | None:
56+
return self._content_type
57+
58+
@property
59+
def channel(self) -> str:
60+
return self._channel
61+
62+
@property
63+
def action(self) -> MessageAction | None:
64+
return self._action
65+
66+
@property
67+
def is_acted_on(self) -> bool:
68+
return self._action is not None
69+
70+
@property
71+
def message_id(self) -> str | None:
72+
return self._message_id
73+
74+
async def ack(self) -> None:
75+
if self._action is not None:
76+
return
77+
self._action = MessageAction.acked
78+
79+
if self._server._client is not None and self._receipt_handle:
80+
await self._server._client.delete_message(
81+
QueueUrl=self._queue_url,
82+
ReceiptHandle=self._receipt_handle,
83+
)
84+
85+
async def nack(self) -> None:
86+
if self._action is not None:
87+
return
88+
self._action = MessageAction.nacked
89+
90+
if self._server._client is not None and self._receipt_handle:
91+
dlq_strategy = self._server._dlq_queue_strategy
92+
if dlq_strategy:
93+
dlq_channel = dlq_strategy(self._channel)
94+
dlq_queue_url = await self._server._get_queue_url(dlq_channel)
95+
96+
message_attributes: dict[str, Any] = {}
97+
for k, v in self._headers.items():
98+
message_attributes[k] = {"DataType": "String", "StringValue": v}
99+
if self._content_type:
100+
message_attributes["content-type"] = {
101+
"DataType": "String",
102+
"StringValue": self._content_type,
103+
}
104+
105+
await self._server._client.send_message(
106+
QueueUrl=dlq_queue_url,
107+
MessageBody=self._msg.get("Body", ""),
108+
MessageAttributes=message_attributes,
109+
)
110+
111+
# SQS needs delete_message after NACK if moved to DLQ,
112+
# or VisibilityTimeout=0 if we don't handle DLQ natively and just want redelivery.
113+
# But the plan specifies "followed by an ack() on the original queue".
114+
await self._server._client.delete_message(
115+
QueueUrl=self._queue_url,
116+
ReceiptHandle=self._receipt_handle,
117+
)
118+
119+
async def reject(self) -> None:
120+
if self._action is not None:
121+
return
122+
self._action = MessageAction.rejected
123+
124+
if self._server._client is not None and self._receipt_handle:
125+
await self._server._client.change_message_visibility(
126+
QueueUrl=self._queue_url,
127+
ReceiptHandle=self._receipt_handle,
128+
VisibilityTimeout=0,
129+
)
130+
131+
async def reply(
132+
self,
133+
*,
134+
payload: bytes,
135+
headers: dict[str, str] | None = None,
136+
content_type: str | None = None,
137+
channel: str | None = None,
138+
server_specific_parameters: dict[str, Any] | None = None,
139+
) -> None:
140+
if self._action is not None:
141+
return
142+
self._action = MessageAction.replied
143+
144+
if self._server._client is not None and self._receipt_handle:
145+
reply_channel = channel if channel is not None else self._channel
146+
reply_queue_url = await self._server._get_queue_url(reply_channel)
147+
148+
message_attributes: dict[str, Any] = {}
149+
if headers:
150+
for k, v in headers.items():
151+
message_attributes[k] = {"DataType": "String", "StringValue": v}
152+
if content_type:
153+
message_attributes["content-type"] = {
154+
"DataType": "String",
155+
"StringValue": content_type,
156+
}
157+
158+
kwargs: dict[str, Any] = {
159+
"QueueUrl": reply_queue_url,
160+
"MessageBody": base64.b64encode(payload).decode("ascii") or " ",
161+
"MessageAttributes": message_attributes,
162+
}
163+
if server_specific_parameters:
164+
kwargs.update(server_specific_parameters)
165+
166+
await self._server._client.send_message(**kwargs)
167+
168+
await self._server._client.delete_message(
169+
QueueUrl=self._queue_url,
170+
ReceiptHandle=self._receipt_handle,
171+
)

0 commit comments

Comments
 (0)