Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ repos:
name: mypy
language: python
pass_filenames: false
entry: uv run mypy .
entry: uv run --no-active mypy .
types: [python]
require_serial: true
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ requires-python = ">=3.10"

[project.optional-dependencies]
amqp = []
kafka = ["aiokafka>=0.13.0,<0.14.0"]
pubsub = [
"google-auth>=2.43.0",
"grpcio>=1.76.0"
Expand All @@ -80,7 +81,7 @@ exclude_lines = [
fail_under = 100

[tool.coverage.run]
omit = ["repid/connections/rabbitmq/protocols.py"]
omit = ["repid/connections/rabbitmq/protocols.py", "repid/connections/kafka/protocols.py"]
source = ["repid"]

[tool.hatch.build.targets.wheel]
Expand Down Expand Up @@ -119,7 +120,7 @@ module = "tests.*"

[[tool.mypy.overrides]]
ignore_missing_imports = true
module = ["pytest_docker_tools"]
module = ["pytest_docker_tools", "aiokafka.*"]

[tool.pytest.ini_options]
asyncio_mode = "auto"
Expand Down
5 changes: 5 additions & 0 deletions repid/connections/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@

__all__ += ["RedisServer"]

if is_installed("aiokafka"):
from repid.connections.kafka import KafkaServer

__all__ += ["KafkaServer"]

# check via try-import because pubsub is a sub-package of gcloud.aio
imported_pubsub = False
try:
Expand Down
1 change: 1 addition & 0 deletions repid/connections/kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from repid.connections.kafka.message_broker import KafkaServer as KafkaServer
185 changes: 185 additions & 0 deletions repid/connections/kafka/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
from __future__ import annotations

import logging
from collections.abc import Callable, Coroutine
from typing import TYPE_CHECKING, Any

from repid.connections.abc import MessageAction, ReceivedMessageT

if TYPE_CHECKING:
from repid.connections.kafka.message_broker import KafkaServer
from repid.connections.kafka.protocols import ConsumerRecordProtocol

logger = logging.getLogger("repid.connections.kafka")


class KafkaReceivedMessage(ReceivedMessageT):
def __init__(
self,
server: KafkaServer,
record: ConsumerRecordProtocol,
mark_complete_callback: Callable[[ConsumerRecordProtocol], Coroutine[Any, Any, None]],
) -> None:
self._server = server
self._record = record
self._mark_complete_callback = mark_complete_callback

self._action: MessageAction | None = None
self._payload = record.value
self._channel = record.topic
self._message_id = f"{record.topic}:{record.partition}:{record.offset}"

self._headers: dict[str, str] = {}
self._content_type: str | None = None

if record.headers:
for k, v in record.headers:
if k == "content-type":
self._content_type = v.decode(errors="replace") if v is not None else ""
else:
self._headers[k] = v.decode(errors="replace") if v is not None else ""

@property
def payload(self) -> bytes:
return self._payload if self._payload is not None else b""

@property
def headers(self) -> dict[str, str] | None:
return self._headers

@property
def content_type(self) -> str | None:
return self._content_type

@property
def channel(self) -> str:
return str(self._channel)

@property
def action(self) -> MessageAction | None:
return self._action

@property
def is_acted_on(self) -> bool:
return self._action is not None

@property
def message_id(self) -> str:
return self._message_id

async def ack(self) -> None:
if self._action is not None:
return

self._action = MessageAction.acked
try:
await self._mark_complete_callback(self._record)
except Exception: # pragma: no cover
self._action = None
raise

async def nack(self) -> None:
if self._action is not None:
return

if self._server._producer is None: # pragma: no cover
# it should be impossible to get here since subscribing requires a connection,
# but we'll check just in case
raise ConnectionError("Kafka producer is not connected.")

try:
dlq_topic_strategy = self._server._dlq_topic_strategy
if dlq_topic_strategy is not None and self._server._producer is not None:
dlq_topic = dlq_topic_strategy(self._channel)
headers = []
if self._headers:
for k, v in self._headers.items():
headers.append((k, v.encode()))
if self._content_type:
headers.append(("content-type", self._content_type.encode()))

headers.append(("x-original-topic", self._channel.encode()))
headers.append(("x-original-offset", str(self._record.offset).encode()))

await self._server._producer.send_and_wait(
topic=dlq_topic,
value=self._payload,
headers=headers,
)

self._action = MessageAction.nacked
await self._mark_complete_callback(self._record)
except Exception: # pragma: no cover
self._action = None
raise

async def reject(self) -> None:
if self._action is not None:
return

if self._server._producer is None: # pragma: no cover
# it should be impossible to get here since subscribing requires a connection,
# but we'll check just in case
raise ConnectionError("Kafka producer is not connected.")

try:
reject_topic_strategy = self._server._reject_topic_strategy
if reject_topic_strategy is not None:
reject_topic = reject_topic_strategy(self._channel)
headers = []
if self._headers:
for k, v in self._headers.items():
headers.append((k, v.encode()))
if self._content_type:
headers.append(("content-type", self._content_type.encode()))

await self._server._producer.send_and_wait(
topic=reject_topic,
value=self._payload,
headers=headers,
)

self._action = MessageAction.rejected
await self._mark_complete_callback(self._record)
except Exception: # pragma: no cover
self._action = None
raise

async def reply(
self,
*,
payload: bytes,
headers: dict[str, str] | None = None,
content_type: str | None = None,
channel: str | None = None,
server_specific_parameters: dict[str, Any] | None = None, # noqa: ARG002
) -> None:
if self._action is not None:
return

if self._server._producer is None: # pragma: no cover
# it should be impossible to get here since subscribing requires a connection,
# but we'll check just in case
raise ConnectionError("Kafka producer is not connected.")

try:
reply_headers = []
if headers:
for k, v in headers.items():
reply_headers.append((k, v.encode()))
if content_type:
reply_headers.append(("content-type", content_type.encode()))

reply_channel = channel or self._channel

await self._server._producer.send_and_wait(
topic=reply_channel,
value=payload,
headers=reply_headers,
)

self._action = MessageAction.replied
await self._mark_complete_callback(self._record)
except Exception: # pragma: no cover
self._action = None
raise
Loading
Loading