diff --git a/docker-compose.yml b/docker-compose.yml
index d0e5b37..6519a2f 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -5,12 +5,14 @@ services:
build:
context: .
dockerfile: Dockerfile
- container_name: python_mailing_api
+ container_name: rest_mailing
volumes:
- .:/app
command: bash -lc "uvicorn rest.api:app --host 0.0.0.0 --port 6245"
environment:
- PYTHONUNBUFFERED=1
+ - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
+ - CELERY_RESULT_BACKEND=rpc://
ports:
- "6245:6245"
depends_on:
diff --git a/mailing/tasks.py b/mailing/tasks.py
index 59608f4..113329c 100644
--- a/mailing/tasks.py
+++ b/mailing/tasks.py
@@ -40,10 +40,12 @@ def send_email_task(
) -> Any:
"""Celery task that sends an email via the selected provider.
- Delegates to mailing.mailing.send_email.
+ Delegates to mailing.mailing.send_email and ensures the return value is JSON-serializable
+ for Celery backends (e.g., RPC), converting provider-specific response objects into
+ simple dictionaries.
"""
- return send_email(
+ send_email(
provider=provider,
subject=subject,
message=message,
diff --git a/mailing/tests/test_tasks.py b/mailing/tests/test_tasks.py
new file mode 100644
index 0000000..1f66fcf
--- /dev/null
+++ b/mailing/tests/test_tasks.py
@@ -0,0 +1,30 @@
+import pytest
+from mailing.tasks import send_email_task
+
+
+class TestSendEmailTask:
+ def test_task_calls_mailing_send_email_with_all_arguments(self, mocker):
+ # Arrange
+ mocked = mocker.patch("mailing.tasks.send_email", return_value={"ok": True})
+
+ kwargs = {
+ "provider": "sendgrid",
+ "subject": "Hello",
+ "message": "World",
+ "recipient_list": ["a@example.com", "b@example.com"],
+ "from_email": "from@example.com",
+ "html_content": "Hi",
+ "api_key": "key-123",
+ }
+
+ # Act
+ result = send_email_task(**kwargs)
+
+ # Assert
+ # The Celery task currently returns None (delegates to synchronous helper)
+ assert result is None
+ mocked.assert_called_once_with(**kwargs)
+
+ def test_task_has_expected_name(self):
+ # Celery registers the @shared_task with a name
+ assert send_email_task.name == "mailing.send_email"
diff --git a/requirements/base.txt b/requirements/base.txt
index 09af423..14f3608 100644
--- a/requirements/base.txt
+++ b/requirements/base.txt
@@ -5,4 +5,5 @@ sendgrid==6.12.5
resend==2.16.0
celery==5.5.3
fastapi==0.119.0
-httpx==0.28.1
\ No newline at end of file
+httpx==0.28.1
+uvicorn==0.37.0
\ No newline at end of file
diff --git a/requirements/prod.txt b/requirements/prod.txt
index 706c480..9c9dec9 100644
--- a/requirements/prod.txt
+++ b/requirements/prod.txt
@@ -1,3 +1 @@
--r base.txt
-
-uvicorn[standard]==0.30.6
\ No newline at end of file
+-r base.txt
\ No newline at end of file
diff --git a/rest/api.py b/rest/api.py
index af8aee3..21d4bed 100644
--- a/rest/api.py
+++ b/rest/api.py
@@ -1,4 +1,15 @@
-from fastapi import FastAPI
+from typing import Any, Optional
+
+import logging
+from fastapi import FastAPI, status, HTTPException
+from pydantic import BaseModel, Field
+
+from mailing import services
+from mailing.mailing import send_email as send_email_sync
+from mailing.tasks import celery_app
+from rest.schemas import SendEmailRequest, SendEmailResponse
+
+logger = logging.getLogger(__name__)
app = FastAPI(title="Python Mailing REST API")
@@ -6,3 +17,71 @@
@app.get("/health")
def health():
return {"status": "ok"}
+
+
+@app.get("/health/queue")
+def health_queue():
+ """Check connectivity to the message broker (RabbitMQ)."""
+ try:
+ # Try establishing a broker connection; minimal retries for quick check
+ with celery_app.connection_or_acquire() as conn: # type: ignore[attr-defined]
+ conn.ensure_connection(max_retries=1)
+ return {"broker": "ok"}
+ except Exception as exc:
+ logger.warning("Broker health check failed: %s", exc)
+ raise HTTPException(
+ status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+ detail={"error": "broker_unavailable", "message": str(exc)},
+ )
+
+
+@app.post("/emails", status_code=status.HTTP_202_ACCEPTED, response_model=SendEmailResponse)
+def send_email(payload: SendEmailRequest):
+ """
+ Enqueue an email to be sent asynchronously using mailing.services.send_email.
+
+ Returns a 202 Accepted with the Celery task id so clients can track progress.
+ Falls back to synchronous send when the queue is unavailable so email can still be sent.
+ """
+ try:
+ async_result = services.send_email(
+ provider=payload.provider,
+ subject=payload.subject,
+ message=payload.message,
+ recipient_list=payload.recipient_list,
+ from_email=payload.from_email,
+ html_content=payload.html_content,
+ api_key=payload.api_key,
+ )
+ task_id = getattr(async_result, "id", None) or getattr(async_result, "task_id", None)
+ return SendEmailResponse(task_id=str(task_id), status="queued")
+ except Exception as exc: # Broker/result backend might be unavailable
+ logger.error("Queue unavailable, falling back to synchronous send: %s", exc)
+ # Fallback: try sending synchronously so the user request can still succeed
+ try:
+ provider_response = send_email_sync(
+ provider=payload.provider,
+ subject=payload.subject,
+ message=payload.message,
+ recipient_list=payload.recipient_list,
+ from_email=payload.from_email,
+ html_content=payload.html_content,
+ api_key=payload.api_key,
+ )
+ # Use a sentinel task_id to indicate synchronous path
+ return SendEmailResponse(
+ task_id="sync",
+ status="sent",
+ extra={"provider_response": str(provider_response)},
+ )
+ except Exception as sync_exc:
+ logger.exception("Synchronous send failed after queue error: %s", sync_exc)
+ raise HTTPException(
+ status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+ detail={
+ "error": "send_failed",
+ "message": "Queue unavailable and synchronous send failed.",
+ "queue_error": str(exc),
+ "sync_error": str(sync_exc),
+ },
+ )
diff --git a/rest/constants.py b/rest/constants.py
new file mode 100644
index 0000000..41d9245
--- /dev/null
+++ b/rest/constants.py
@@ -0,0 +1 @@
+EMAIL_STATUS_QUEUED = "queued"
diff --git a/rest/schemas.py b/rest/schemas.py
new file mode 100644
index 0000000..07b0c50
--- /dev/null
+++ b/rest/schemas.py
@@ -0,0 +1,20 @@
+from typing import Any
+
+from pydantic import Field, BaseModel
+from rest import constants
+
+
+class SendEmailRequest(BaseModel):
+ provider: str = Field(..., description="Email provider identifier")
+ subject: str
+ message: str
+ recipient_list: list[str]
+ from_email: str | None = None
+ html_content: str | None = None
+ api_key: str | None = None
+
+
+class SendEmailResponse(BaseModel):
+ task_id: str
+ status: str = constants.EMAIL_STATUS_QUEUED
+ extra: dict[str, Any] | None = None
diff --git a/rest/tests/conftest.py b/rest/tests/conftest.py
new file mode 100644
index 0000000..5e8aac4
--- /dev/null
+++ b/rest/tests/conftest.py
@@ -0,0 +1,42 @@
+import pytest
+
+
+class DummyAsyncResult:
+ def __init__(self, task_id: str):
+ self.id = task_id
+
+
+@pytest.fixture
+def email_payload_minimal() -> dict:
+ return {
+ "provider": "sendgrid",
+ "subject": "Hello",
+ "message": "World",
+ "recipient_list": ["user@example.com"],
+ "from_email": None,
+ "html_content": None,
+ "api_key": None,
+ }
+
+
+@pytest.fixture
+def email_payload_full() -> dict:
+ return {
+ "provider": "resend",
+ "subject": "Subj",
+ "message": "Msg",
+ "recipient_list": ["a@example.com", "b@example.com"],
+ "from_email": "from@example.com",
+ "html_content": "hi",
+ "api_key": "key-123",
+ }
+
+
+@pytest.fixture
+def dummy_async_result_abc123() -> DummyAsyncResult:
+ return DummyAsyncResult("abc123")
+
+
+@pytest.fixture
+def dummy_async_result_id999() -> DummyAsyncResult:
+ return DummyAsyncResult("id-999")
diff --git a/rest/tests/test_api_extras.py b/rest/tests/test_api_extras.py
new file mode 100644
index 0000000..6e312c6
--- /dev/null
+++ b/rest/tests/test_api_extras.py
@@ -0,0 +1,73 @@
+from fastapi import status
+from fastapi.testclient import TestClient
+import pytest
+
+from rest.api import app
+
+client = TestClient(app)
+
+
+class DummyConn:
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc, tb):
+ return False
+
+ def ensure_connection(self, max_retries=1):
+ return True
+
+
+class DummyCeleryApp:
+ def connection_or_acquire(self):
+ return DummyConn()
+
+
+class TestHealthQueue:
+ def test_health_queue_ok(self, monkeypatch):
+ # Patch the celery_app on the module to a dummy that successfully connects
+ import rest.api as api_module
+
+ monkeypatch.setattr(api_module, "celery_app", DummyCeleryApp())
+
+ response = client.get("/health/queue")
+ assert response.status_code == status.HTTP_200_OK
+ assert response.json() == {"broker": "ok"}
+
+ def test_health_queue_unavailable(self, monkeypatch):
+ import rest.api as api_module
+
+ class FailingCM:
+ def __enter__(self):
+ raise RuntimeError("cannot connect")
+
+ def __exit__(self, exc_type, exc, tb):
+ return False
+
+ class FailingApp:
+ def connection_or_acquire(self):
+ return FailingCM()
+
+ monkeypatch.setattr(api_module, "celery_app", FailingApp())
+
+ response = client.get("/health/queue")
+ assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
+ data = response.json()
+ assert data["detail"]["error"] == "broker_unavailable"
+ assert "cannot connect" in data["detail"]["message"]
+
+
+class TestSendEmailFallback:
+ def test_fallback_to_sync_when_queue_unavailable(self, mocker, email_payload_minimal):
+ # services.send_email raises (e.g., broker down)
+ mocker.patch("mailing.services.send_email", side_effect=RuntimeError("broker down"))
+ # synchronous path returns some provider response object
+ mocker.patch("rest.api.send_email_sync", return_value={"provider": "ok"})
+
+ response = client.post("/emails", json=email_payload_minimal)
+ assert response.status_code == status.HTTP_202_ACCEPTED
+ data = response.json()
+ assert data["task_id"] == "sync"
+ assert data["status"] == "sent"
+ assert isinstance(data.get("extra"), dict)
+ assert "provider_response" in data["extra"]
diff --git a/rest/tests/test_send_email.py b/rest/tests/test_send_email.py
new file mode 100644
index 0000000..46324fb
--- /dev/null
+++ b/rest/tests/test_send_email.py
@@ -0,0 +1,54 @@
+from fastapi import status
+from fastapi.testclient import TestClient
+
+from rest.api import app
+
+client = TestClient(app)
+
+
+class TestSendEmailEndpoint:
+ def test_send_email_returns_202_and_task_id(
+ self, mocker, email_payload_minimal, dummy_async_result_abc123
+ ):
+ # Arrange
+ mocker.patch("mailing.services.send_email", return_value=dummy_async_result_abc123)
+
+ # Act
+ response = client.post("/emails", json=email_payload_minimal)
+
+ # Assert
+ assert response.status_code == status.HTTP_202_ACCEPTED
+ data = response.json()
+ assert data == {"task_id": "abc123", "status": "queued", "extra": None}
+
+ def test_send_email_calls_service_with_same_attrs(
+ self, mocker, email_payload_full, dummy_async_result_id999
+ ):
+ mocked = mocker.patch("mailing.services.send_email", return_value=dummy_async_result_id999)
+
+ response = client.post("/emails", json=email_payload_full)
+ assert response.status_code == status.HTTP_202_ACCEPTED
+
+ mocked.assert_called_once_with(
+ provider=email_payload_full["provider"],
+ subject=email_payload_full["subject"],
+ message=email_payload_full["message"],
+ recipient_list=email_payload_full["recipient_list"],
+ from_email=email_payload_full["from_email"],
+ html_content=email_payload_full["html_content"],
+ api_key=email_payload_full["api_key"],
+ )
+
+ def test_fallback_to_sync_when_queue_unavailable(self, mocker, email_payload_minimal):
+ # services.send_email raises (e.g., broker down)
+ mocker.patch("mailing.services.send_email", side_effect=RuntimeError("broker down"))
+ # synchronous path returns some provider response object
+ mocker.patch("rest.api.send_email_sync", return_value={"provider": "ok"})
+
+ response = client.post("/emails", json=email_payload_minimal)
+ assert response.status_code == status.HTTP_202_ACCEPTED
+ data = response.json()
+ assert data["task_id"] == "sync"
+ assert data["status"] == "sent"
+ assert isinstance(data.get("extra"), dict)
+ assert "provider_response" in data["extra"]