Skip to content

Commit 5b293a7

Browse files
committed
✨ Kafka server
1 parent 37ca4da commit 5b293a7

File tree

9 files changed

+2366
-1402
lines changed

9 files changed

+2366
-1402
lines changed

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ requires-python = ">=3.10"
5656

5757
[project.optional-dependencies]
5858
amqp = []
59+
kafka = ["aiokafka>=0.12.0"]
5960
pubsub = [
6061
"google-auth>=2.43.0",
6162
"grpcio>=1.76.0"
@@ -119,7 +120,7 @@ module = "tests.*"
119120

120121
[[tool.mypy.overrides]]
121122
ignore_missing_imports = true
122-
module = ["pytest_docker_tools"]
123+
module = ["pytest_docker_tools", "aiokafka.*"]
123124

124125
[tool.pytest.ini_options]
125126
asyncio_mode = "auto"

repid/connections/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@
2929

3030
__all__ += ["RedisServer"]
3131

32+
# Kafka support is optional: only export KafkaServer when the
# aiokafka extra is actually installed (mirrors the guards above).
if is_installed("aiokafka"):
    from repid.connections.kafka import KafkaServer

    __all__ += ["KafkaServer"]
36+
3237
# check via try-import because pubsub is a sub-package of gcloud.aio
3338
imported_pubsub = False
3439
try:
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""Public entry point for the Kafka connection backend.

Re-exports :class:`KafkaServer` so callers can write
``from repid.connections.kafka import KafkaServer``.
"""

from __future__ import annotations

from repid.connections.kafka.message_broker import KafkaServer

__all__ = ["KafkaServer"]

repid/connections/kafka/message.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from typing import TYPE_CHECKING, Any
5+
6+
from repid.connections.abc import MessageAction, ReceivedMessageT
7+
8+
if TYPE_CHECKING:
9+
from repid.connections.kafka.message_broker import KafkaServer
10+
11+
logger = logging.getLogger("repid.connections.kafka")
12+
13+
14+
class KafkaReceivedMessage(ReceivedMessageT):
    """A single record consumed from a Kafka topic.

    Kafka has no per-message broker acknowledgement, so each repid action
    maps onto offset commits and re-publishing:

    * ``ack``    -- mark the record complete (via the completion callback).
    * ``nack``   -- copy the record to a dead-letter topic (when the server
      has a DLQ strategy and a live producer), then mark complete.
    * ``reject`` -- re-publish the record to its original topic so it gets
      redelivered, then mark complete.
    * ``reply``  -- publish a response message, then mark complete.

    All actions are one-shot: after the first one runs, subsequent calls
    return immediately without side effects.
    """

    def __init__(
        self,
        server: KafkaServer,
        record: Any,  # assumes an aiokafka ConsumerRecord — TODO confirm
        mark_complete_callback: Any,
    ) -> None:
        self._server = server
        self._record = record
        # Awaited with the record once the message reaches a final state;
        # presumably commits the consumer offset — verify against subscriber.
        self._mark_complete_callback = mark_complete_callback

        self._action: MessageAction | None = None
        self._payload = record.value
        self._channel = record.topic
        # Kafka records carry no message id; the partition offset stands in.
        self._message_id = str(record.offset)

        self._headers: dict[str, str] = {}
        self._content_type: str | None = None

        # Kafka headers arrive as (str, bytes) pairs; "content-type" is
        # split out into its own attribute, all others are decoded as-is.
        if record.headers:
            for key, value in record.headers:
                if key == "content-type":
                    self._content_type = value.decode()
                else:
                    self._headers[key] = value.decode()

    @staticmethod
    def _encode_headers(
        headers: dict[str, str] | None,
        content_type: str | None,
    ) -> list[tuple[str, bytes]]:
        """Build an aiokafka-compatible header list (str key, bytes value).

        ``content-type`` is appended last, mirroring how it was split out
        of the incoming record in ``__init__``.
        """
        encoded = [(k, v.encode()) for k, v in (headers or {}).items()]
        if content_type:
            encoded.append(("content-type", content_type.encode()))
        return encoded

    @property
    def payload(self) -> bytes:
        """Record value; an empty byte string when Kafka delivered ``None``."""
        return self._payload if self._payload is not None else b""

    @property
    def headers(self) -> dict[str, str] | None:
        """Decoded record headers, excluding ``content-type``."""
        return self._headers

    @property
    def content_type(self) -> str | None:
        """Value of the ``content-type`` header, if present."""
        return self._content_type

    @property
    def channel(self) -> str:
        """Topic this record was consumed from."""
        return str(self._channel)

    @property
    def action(self) -> MessageAction | None:
        """Final action taken on this message, or ``None`` while pending."""
        return self._action

    @property
    def is_acted_on(self) -> bool:
        """Whether one of ack/nack/reject/reply has already run."""
        return self._action is not None

    @property
    def message_id(self) -> str:
        """Record offset, stringified (Kafka has no native message id)."""
        return self._message_id

    async def ack(self) -> None:
        """Mark the record complete. No-op if already acted on."""
        if self._action is not None:
            return
        self._action = MessageAction.acked
        await self._mark_complete_callback(self._record)

    async def nack(self) -> None:
        """Forward the record to the DLQ (if configured), then mark complete."""
        if self._action is not None:
            return
        self._action = MessageAction.nacked

        dlq_topic_strategy = self._server._dlq_topic_strategy
        if dlq_topic_strategy is not None and self._server._producer is not None:
            dlq_topic = dlq_topic_strategy(self._channel)
            headers = self._encode_headers(self._headers, self._content_type)
            # Preserve provenance so DLQ consumers can trace the failure.
            headers.append(("x-original-topic", self._channel.encode()))
            headers.append(("x-original-offset", str(self._record.offset).encode()))

            await self._server._producer.send_and_wait(
                topic=dlq_topic,
                value=self._payload,
                headers=headers,
            )

        await self._mark_complete_callback(self._record)

    async def reject(self) -> None:
        """Re-publish the record to its original topic, then mark complete."""
        if self._action is not None:
            return
        self._action = MessageAction.rejected

        if self._server._producer is not None:
            await self._server._producer.send_and_wait(
                topic=self._channel,
                value=self._payload,
                headers=self._encode_headers(self._headers, self._content_type),
            )

        await self._mark_complete_callback(self._record)

    async def reply(
        self,
        *,
        payload: bytes,
        headers: dict[str, str] | None = None,
        content_type: str | None = None,
        channel: str | None = None,
        server_specific_parameters: dict[str, Any] | None = None,  # noqa: ARG002
    ) -> None:
        """Publish *payload* as a reply, then mark the record complete.

        The reply goes to *channel* when given, otherwise back to the
        original topic.
        """
        if self._action is not None:
            return
        self._action = MessageAction.replied

        if self._server._producer is not None:
            await self._server._producer.send_and_wait(
                topic=channel or self._channel,
                value=payload,
                headers=self._encode_headers(headers, content_type),
            )

        await self._mark_complete_callback(self._record)
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from collections.abc import AsyncIterator, Callable, Coroutine, Mapping, Sequence
5+
from contextlib import asynccontextmanager
6+
from typing import TYPE_CHECKING, Any
7+
8+
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
9+
10+
from repid.connections.abc import CapabilitiesT, SentMessageT, ServerT, SubscriberT
11+
from repid.connections.kafka.subscriber import KafkaSubscriber
12+
13+
if TYPE_CHECKING:
14+
from repid.asyncapi.models.common import ServerBindingsObject
15+
from repid.asyncapi.models.servers import ServerVariable
16+
from repid.connections.abc import ReceivedMessageT
17+
from repid.data import ExternalDocs, Tag
18+
19+
logger = logging.getLogger("repid.connections.kafka")
20+
21+
22+
class KafkaServer(ServerT):
    """repid server backed by Apache Kafka via ``aiokafka``.

    Publishing uses a single lazily-started ``AIOKafkaProducer``; each
    :meth:`subscribe` call creates its own ``AIOKafkaConsumer``.  Nacked
    messages are forwarded to the dead-letter topic named by
    *dlq_topic_strategy* (default: ``repid_<channel>_dlq``); pass ``None``
    to disable dead-lettering.
    """

    def __init__(
        self,
        dsn: str,
        *,
        dlq_topic_strategy: Callable[[str], str] | None = lambda channel: f"repid_{channel}_dlq",
        group_id: str = "repid-group",
        title: str | None = None,
        summary: str | None = None,
        description: str | None = None,
        variables: Mapping[str, ServerVariable] | None = None,
        security: Sequence[Any] | None = None,
        tags: Sequence[Tag] | None = None,
        external_docs: ExternalDocs | None = None,
        bindings: ServerBindingsObject | None = None,
    ) -> None:
        """Create a Kafka-backed server.

        Args:
            dsn: Kafka bootstrap servers string (``host:port[,host:port...]``).
            dlq_topic_strategy: Maps an original topic name to its
                dead-letter topic name; ``None`` disables dead-lettering.
            group_id: Consumer group id used for every subscription
                (previously hard-coded to ``"repid-group"``).
            title, summary, description, variables, security, tags,
                external_docs, bindings: AsyncAPI documentation metadata,
                exposed verbatim via the matching properties.
        """
        self.dsn = dsn
        self._producer: AIOKafkaProducer | None = None  # type: ignore[no-any-unimported]
        self._dlq_topic_strategy = dlq_topic_strategy
        self._group_id = group_id

        # AsyncAPI metadata
        self._title = title
        self._summary = summary
        self._description = description
        self._variables = variables
        self._security = security
        self._tags = tags
        self._external_docs = external_docs
        self._bindings = bindings

    @property
    def host(self) -> str:
        """Server host as exposed in AsyncAPI docs (the raw DSN)."""
        return self.dsn

    @property
    def protocol(self) -> str:
        return "kafka"

    @property
    def pathname(self) -> str | None:
        return None

    @property
    def title(self) -> str | None:
        return self._title

    @property
    def summary(self) -> str | None:
        return self._summary

    @property
    def description(self) -> str | None:
        return self._description

    @property
    def protocol_version(self) -> str | None:
        return None

    @property
    def variables(self) -> Mapping[str, ServerVariable] | None:
        return self._variables

    @property
    def security(self) -> Sequence[Any] | None:
        return self._security

    @property
    def tags(self) -> Sequence[Tag] | None:
        return self._tags

    @property
    def external_docs(self) -> ExternalDocs | None:
        return self._external_docs

    @property
    def bindings(self) -> ServerBindingsObject | None:
        return self._bindings

    @property
    def capabilities(self) -> CapabilitiesT:
        """Feature flags repid uses to decide how to drive this server."""
        return {
            "supports_acknowledgments": True,
            "supports_persistence": True,
            "supports_reply": True,
            "supports_lightweight_pause": False,
        }

    @property
    def is_connected(self) -> bool:
        """True once :meth:`connect` has started the shared producer."""
        return self._producer is not None

    async def connect(self) -> None:
        """Start the shared producer; safe to call when already connected."""
        if self._producer is None:
            self._producer = AIOKafkaProducer(bootstrap_servers=self.dsn)
            await self._producer.start()
        logger.info("server.connect", extra={"host": self.dsn})

    async def disconnect(self) -> None:
        """Stop and drop the shared producer; idempotent."""
        if self._producer is not None:
            await self._producer.stop()
            self._producer = None
        logger.info("server.disconnect")

    @asynccontextmanager
    async def connection(self) -> AsyncIterator[KafkaServer]:
        """Async context manager wrapping connect()/disconnect()."""
        await self.connect()
        try:
            yield self
        finally:
            await self.disconnect()

    async def publish(
        self,
        *,
        channel: str,
        message: SentMessageT,
        server_specific_parameters: dict[str, Any] | None = None,  # noqa: ARG002
    ) -> None:
        """Publish *message* to topic *channel* and wait for broker ack.

        Raises:
            RuntimeError: if :meth:`connect` has not been called.
        """
        if self._producer is None:
            raise RuntimeError("Kafka producer is not connected.")

        logger.debug("channel.publish", extra={"channel": channel})

        # Kafka headers are (str, bytes) pairs; content-type rides along
        # as a conventional header so consumers can reconstruct it.
        headers = [(k, v.encode()) for k, v in (message.headers or {}).items()]
        if message.content_type:
            headers.append(("content-type", message.content_type.encode()))

        await self._producer.send_and_wait(
            topic=channel,
            value=message.payload,
            headers=headers,
        )

    async def subscribe(
        self,
        *,
        channels_to_callbacks: dict[str, Callable[[ReceivedMessageT], Coroutine[None, None, None]]],
        concurrency_limit: int | None = None,
    ) -> SubscriberT:
        """Start consuming the given topics, dispatching to the callbacks.

        Auto-commit is disabled so offsets are only committed after a
        message is explicitly acted on (ack/nack/reject/reply).
        """
        logger.debug("channel.subscribe", extra={"channels": list(channels_to_callbacks.keys())})

        consumer = AIOKafkaConsumer(
            *channels_to_callbacks.keys(),
            bootstrap_servers=self.dsn,
            group_id=self._group_id,
            enable_auto_commit=False,
            auto_offset_reset="earliest",
        )
        await consumer.start()

        return KafkaSubscriber(
            server=self,
            consumer=consumer,
            channels_to_callbacks=channels_to_callbacks,
            concurrency_limit=concurrency_limit,
        )

0 commit comments

Comments
 (0)