Skip to content

Commit c4e39cb

Browse files
committed
✨ Kafka server
1 parent 37ca4da commit c4e39cb

11 files changed

Lines changed: 2231 additions & 1058 deletions

File tree

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,6 @@ repos:
4545
name: mypy
4646
language: python
4747
pass_filenames: false
48-
entry: uv run mypy .
48+
entry: uv run --no-active mypy .
4949
types: [python]
5050
require_serial: true

pyproject.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ requires-python = ">=3.10"
5656

5757
[project.optional-dependencies]
5858
amqp = []
59+
kafka = ["aiokafka>=0.13.0,<0.14.0"]
5960
pubsub = [
6061
"google-auth>=2.43.0",
6162
"grpcio>=1.76.0"
@@ -80,7 +81,7 @@ exclude_lines = [
8081
fail_under = 100
8182

8283
[tool.coverage.run]
83-
omit = ["repid/connections/rabbitmq/protocols.py"]
84+
omit = ["repid/connections/rabbitmq/protocols.py", "repid/connections/kafka/protocols.py"]
8485
source = ["repid"]
8586

8687
[tool.hatch.build.targets.wheel]
@@ -119,7 +120,7 @@ module = "tests.*"
119120

120121
[[tool.mypy.overrides]]
121122
ignore_missing_imports = true
122-
module = ["pytest_docker_tools"]
123+
module = ["pytest_docker_tools", "aiokafka.*"]
123124

124125
[tool.pytest.ini_options]
125126
asyncio_mode = "auto"

repid/connections/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@
2929

3030
__all__ += ["RedisServer"]
3131

32+
# Expose the Kafka backend only when the optional aiokafka dependency is present.
if is_installed("aiokafka"):
    from repid.connections.kafka import KafkaServer

    __all__.append("KafkaServer")
36+
3237
# check via try-import because pubsub is a sub-package of gcloud.aio
3338
imported_pubsub = False
3439
try:
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from repid.connections.kafka.message_broker import KafkaServer as KafkaServer

repid/connections/kafka/message.py

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from collections.abc import Callable, Coroutine
5+
from typing import TYPE_CHECKING, Any
6+
7+
from repid.connections.abc import MessageAction, ReceivedMessageT
8+
9+
if TYPE_CHECKING:
10+
from repid.connections.kafka.message_broker import KafkaServer
11+
from repid.connections.kafka.protocols import ConsumerRecordProtocol
12+
13+
logger = logging.getLogger("repid.connections.kafka")
14+
15+
16+
class KafkaReceivedMessage(ReceivedMessageT):
    """A single record consumed from Kafka, wrapped in repid's received-message contract.

    Acknowledgement semantics are mapped onto Kafka as follows:

    - ``ack``    -> mark the record complete (commit via the provided callback).
    - ``nack``   -> optionally forward the record to a dead-letter topic (if the
      server has a DLQ topic strategy), then mark it complete.
    - ``reject`` -> optionally forward the record to a reject topic (if the server
      has a reject topic strategy), then mark it complete.
    - ``reply``  -> produce a response record, then mark the original complete.

    Each message can be acted on at most once; subsequent calls are no-ops.
    """

    def __init__(
        self,
        server: KafkaServer,
        record: ConsumerRecordProtocol,
        mark_complete_callback: Callable[[ConsumerRecordProtocol], Coroutine[Any, Any, None]],
    ) -> None:
        """Wrap *record*, keeping a handle on *server* for producing follow-up messages.

        Args:
            server: The owning ``KafkaServer``; its producer and topic strategies
                are used by ``nack``/``reject``/``reply``.
            record: The consumed Kafka record (topic/partition/offset/value/headers).
            mark_complete_callback: Awaited to commit the record once it has been
                acted on.
        """
        self._server = server
        self._record = record
        self._mark_complete_callback = mark_complete_callback

        # None until one of ack/nack/reject/reply succeeds; used as a one-shot guard.
        self._action: MessageAction | None = None
        self._payload = record.value
        self._channel = record.topic
        # topic:partition:offset uniquely identifies a record within a cluster.
        self._message_id = f"{record.topic}:{record.partition}:{record.offset}"

        self._headers: dict[str, str] = {}
        self._content_type: str | None = None

        # Kafka header values are bytes (or None); decode them to strings,
        # splitting out "content-type" into its own attribute.
        if record.headers:
            for k, v in record.headers:
                if k == "content-type":
                    self._content_type = v.decode(errors="replace") if v is not None else ""
                else:
                    self._headers[k] = v.decode(errors="replace") if v is not None else ""

    @staticmethod
    def _encode_headers(
        headers: dict[str, str] | None,
        content_type: str | None,
    ) -> list[tuple[str, bytes]]:
        """Encode string headers (plus an optional content-type) into Kafka wire format.

        Shared by ``nack``/``reject``/``reply`` so the encoding logic lives in one place.
        """
        encoded: list[tuple[str, bytes]] = []
        if headers:
            for k, v in headers.items():
                encoded.append((k, v.encode()))
        if content_type:
            encoded.append(("content-type", content_type.encode()))
        return encoded

    @property
    def payload(self) -> bytes:
        """The record's value; an empty byte string if the record had no value."""
        return self._payload if self._payload is not None else b""

    @property
    def headers(self) -> dict[str, str] | None:
        """Decoded record headers, excluding ``content-type``."""
        return self._headers

    @property
    def content_type(self) -> str | None:
        """The decoded ``content-type`` header, if the record carried one."""
        return self._content_type

    @property
    def channel(self) -> str:
        """The topic the record was consumed from."""
        return str(self._channel)

    @property
    def action(self) -> MessageAction | None:
        """The action taken on this message, or ``None`` if none yet."""
        return self._action

    @property
    def is_acted_on(self) -> bool:
        """Whether ack/nack/reject/reply has already succeeded for this message."""
        return self._action is not None

    @property
    def message_id(self) -> str:
        """Stable identifier in the form ``topic:partition:offset``."""
        return self._message_id

    async def ack(self) -> None:
        """Mark the record complete. No-op if the message was already acted on."""
        if self._action is not None:
            return

        self._action = MessageAction.acked
        try:
            await self._mark_complete_callback(self._record)
        except Exception:  # pragma: no cover
            # Roll back the one-shot guard so the action can be retried.
            self._action = None
            raise

    async def nack(self) -> None:
        """Negatively acknowledge the record.

        If the server has a DLQ topic strategy, the record is first republished to
        the dead-letter topic with provenance headers; either way the record is then
        marked complete. No-op if the message was already acted on.

        Raises:
            ConnectionError: If the server's producer is not connected.
        """
        if self._action is not None:
            return

        if self._server._producer is None:  # pragma: no cover
            # it should be impossible to get here since subscribing requires a connection,
            # but we'll check just in case
            raise ConnectionError("Kafka producer is not connected.")

        try:
            dlq_topic_strategy = self._server._dlq_topic_strategy
            # The producer is guaranteed non-None here by the guard above,
            # so only the strategy needs checking.
            if dlq_topic_strategy is not None:
                headers = self._encode_headers(self._headers, self._content_type)

                # Provenance headers let a DLQ consumer trace the record back.
                headers.append(("x-original-topic", self._channel.encode()))
                headers.append(("x-original-offset", str(self._record.offset).encode()))

                await self._server._producer.send_and_wait(
                    topic=dlq_topic_strategy(self._channel),
                    value=self._payload,
                    headers=headers,
                )

            self._action = MessageAction.nacked
            await self._mark_complete_callback(self._record)
        except Exception:  # pragma: no cover
            # Roll back the one-shot guard so the action can be retried.
            self._action = None
            raise

    async def reject(self) -> None:
        """Reject the record.

        If the server has a reject topic strategy, the record is first republished
        to the reject topic; either way the record is then marked complete. No-op
        if the message was already acted on.

        Raises:
            ConnectionError: If the server's producer is not connected.
        """
        if self._action is not None:
            return

        if self._server._producer is None:  # pragma: no cover
            # it should be impossible to get here since subscribing requires a connection,
            # but we'll check just in case
            raise ConnectionError("Kafka producer is not connected.")

        try:
            reject_topic_strategy = self._server._reject_topic_strategy
            if reject_topic_strategy is not None:
                await self._server._producer.send_and_wait(
                    topic=reject_topic_strategy(self._channel),
                    value=self._payload,
                    headers=self._encode_headers(self._headers, self._content_type),
                )

            self._action = MessageAction.rejected
            await self._mark_complete_callback(self._record)
        except Exception:  # pragma: no cover
            # Roll back the one-shot guard so the action can be retried.
            self._action = None
            raise

    async def reply(
        self,
        *,
        payload: bytes,
        headers: dict[str, str] | None = None,
        content_type: str | None = None,
        channel: str | None = None,
        server_specific_parameters: dict[str, Any] | None = None,  # noqa: ARG002
    ) -> None:
        """Produce a reply record, then mark the original record complete.

        Args:
            payload: The reply's value.
            headers: Optional string headers for the reply.
            content_type: Optional content type, sent as a ``content-type`` header.
            channel: Topic to publish to; defaults to the original record's topic.
            server_specific_parameters: Ignored by the Kafka backend.

        No-op if the message was already acted on.

        Raises:
            ConnectionError: If the server's producer is not connected.
        """
        if self._action is not None:
            return

        if self._server._producer is None:  # pragma: no cover
            # it should be impossible to get here since subscribing requires a connection,
            # but we'll check just in case
            raise ConnectionError("Kafka producer is not connected.")

        try:
            await self._server._producer.send_and_wait(
                topic=channel or self._channel,
                value=payload,
                headers=self._encode_headers(headers, content_type),
            )

            self._action = MessageAction.replied
            await self._mark_complete_callback(self._record)
        except Exception:  # pragma: no cover
            # Roll back the one-shot guard so the action can be retried.
            self._action = None
            raise

0 commit comments

Comments
 (0)