Skip to content

Commit 39402c3

Browse files
committed
✨ Kafka server
1 parent 37ca4da commit 39402c3

File tree

11 files changed

+2180
-1058
lines changed

11 files changed

+2180
-1058
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,6 @@ repos:
4545
name: mypy
4646
language: python
4747
pass_filenames: false
48-
entry: uv run mypy .
48+
entry: uv run --no-active mypy .
4949
types: [python]
5050
require_serial: true

pyproject.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ requires-python = ">=3.10"
5656

5757
[project.optional-dependencies]
5858
amqp = []
59+
kafka = ["aiokafka>=0.13.0,<0.14.0"]
5960
pubsub = [
6061
"google-auth>=2.43.0",
6162
"grpcio>=1.76.0"
@@ -80,7 +81,7 @@ exclude_lines = [
8081
fail_under = 100
8182

8283
[tool.coverage.run]
83-
omit = ["repid/connections/rabbitmq/protocols.py"]
84+
omit = ["repid/connections/rabbitmq/protocols.py", "repid/connections/kafka/protocols.py"]
8485
source = ["repid"]
8586

8687
[tool.hatch.build.targets.wheel]
@@ -119,7 +120,7 @@ module = "tests.*"
119120

120121
[[tool.mypy.overrides]]
121122
ignore_missing_imports = true
122-
module = ["pytest_docker_tools"]
123+
module = ["pytest_docker_tools", "aiokafka.*"]
123124

124125
[tool.pytest.ini_options]
125126
asyncio_mode = "auto"

repid/connections/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@
2929

3030
__all__ += ["RedisServer"]
3131

32+
if is_installed("aiokafka"):
33+
from repid.connections.kafka import KafkaServer
34+
35+
__all__ += ["KafkaServer"]
36+
3237
# check via try-import because pubsub is a sub-package of gcloud.aio
3338
imported_pubsub = False
3439
try:
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from repid.connections.kafka.message_broker import KafkaServer as KafkaServer

repid/connections/kafka/message.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from collections.abc import Callable, Coroutine
5+
from typing import TYPE_CHECKING, Any
6+
7+
from repid.connections.abc import MessageAction, ReceivedMessageT
8+
9+
if TYPE_CHECKING:
10+
from repid.connections.kafka.message_broker import KafkaServer
11+
from repid.connections.kafka.protocols import ConsumerRecordProtocol
12+
13+
logger = logging.getLogger("repid.connections.kafka")
14+
15+
16+
class KafkaReceivedMessage(ReceivedMessageT):
17+
def __init__(
18+
self,
19+
server: KafkaServer,
20+
record: ConsumerRecordProtocol,
21+
mark_complete_callback: Callable[[ConsumerRecordProtocol], Coroutine[Any, Any, None]],
22+
) -> None:
23+
self._server = server
24+
self._record = record
25+
self._mark_complete_callback = mark_complete_callback
26+
27+
self._action: MessageAction | None = None
28+
self._payload = record.value
29+
self._channel = record.topic
30+
self._message_id = str(record.offset)
31+
32+
self._headers: dict[str, str] = {}
33+
self._content_type: str | None = None
34+
35+
if record.headers:
36+
for k, v in record.headers:
37+
if k == "content-type":
38+
self._content_type = v.decode()
39+
else:
40+
self._headers[k] = v.decode()
41+
42+
@property
43+
def payload(self) -> bytes:
44+
return self._payload if self._payload is not None else b""
45+
46+
@property
47+
def headers(self) -> dict[str, str] | None:
48+
return self._headers
49+
50+
@property
51+
def content_type(self) -> str | None:
52+
return self._content_type
53+
54+
@property
55+
def channel(self) -> str:
56+
return str(self._channel)
57+
58+
@property
59+
def action(self) -> MessageAction | None:
60+
return self._action
61+
62+
@property
63+
def is_acted_on(self) -> bool:
64+
return self._action is not None
65+
66+
@property
67+
def message_id(self) -> str:
68+
return self._message_id
69+
70+
async def ack(self) -> None:
71+
if self._action is not None:
72+
return
73+
self._action = MessageAction.acked
74+
await self._mark_complete_callback(self._record)
75+
76+
async def nack(self) -> None:
77+
if self._action is not None:
78+
return
79+
self._action = MessageAction.nacked
80+
81+
if self._server._producer is None: # pragma: no cover
82+
# it should be impossible to get here since subscribing requires a connection,
83+
# but we'll check just in case
84+
raise ConnectionError("Kafka producer is not connected.")
85+
86+
dlq_topic_strategy = self._server._dlq_topic_strategy
87+
if dlq_topic_strategy is not None and self._server._producer is not None:
88+
dlq_topic = dlq_topic_strategy(self._channel)
89+
headers = []
90+
if self._headers:
91+
for k, v in self._headers.items():
92+
headers.append((k, v.encode()))
93+
if self._content_type:
94+
headers.append(("content-type", self._content_type.encode()))
95+
96+
headers.append(("x-original-topic", self._channel.encode()))
97+
headers.append(("x-original-offset", str(self._record.offset).encode()))
98+
99+
await self._server._producer.send_and_wait(
100+
topic=dlq_topic,
101+
value=self._payload,
102+
headers=headers,
103+
)
104+
105+
await self._mark_complete_callback(self._record)
106+
107+
async def reject(self) -> None:
108+
if self._action is not None:
109+
return
110+
self._action = MessageAction.rejected
111+
112+
if self._server._producer is None: # pragma: no cover
113+
# it should be impossible to get here since subscribing requires a connection,
114+
# but we'll check just in case
115+
raise ConnectionError("Kafka producer is not connected.")
116+
117+
reject_topic_strategy = self._server._reject_topic_strategy
118+
if reject_topic_strategy is not None:
119+
reject_topic = reject_topic_strategy(self._channel)
120+
headers = []
121+
if self._headers:
122+
for k, v in self._headers.items():
123+
headers.append((k, v.encode()))
124+
if self._content_type:
125+
headers.append(("content-type", self._content_type.encode()))
126+
127+
await self._server._producer.send_and_wait(
128+
topic=reject_topic,
129+
value=self._payload,
130+
headers=headers,
131+
)
132+
133+
await self._mark_complete_callback(self._record)
134+
135+
async def reply(
136+
self,
137+
*,
138+
payload: bytes,
139+
headers: dict[str, str] | None = None,
140+
content_type: str | None = None,
141+
channel: str | None = None,
142+
server_specific_parameters: dict[str, Any] | None = None, # noqa: ARG002
143+
) -> None:
144+
if self._action is not None:
145+
return
146+
self._action = MessageAction.replied
147+
148+
if self._server._producer is None: # pragma: no cover
149+
# it should be impossible to get here since subscribing requires a connection,
150+
# but we'll check just in case
151+
raise ConnectionError("Kafka producer is not connected.")
152+
153+
reply_headers = []
154+
if headers:
155+
for k, v in headers.items():
156+
reply_headers.append((k, v.encode()))
157+
if content_type:
158+
reply_headers.append(("content-type", content_type.encode()))
159+
160+
reply_channel = channel or self._channel
161+
162+
await self._server._producer.send_and_wait(
163+
topic=reply_channel,
164+
value=payload,
165+
headers=reply_headers,
166+
)
167+
168+
await self._mark_complete_callback(self._record)

0 commit comments

Comments
 (0)