Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ services:
build:
context: .
dockerfile: Dockerfile
container_name: python_mailing_api
container_name: rest_mailing
volumes:
- .:/app
command: bash -lc "uvicorn rest.api:app --host 0.0.0.0 --port 6245"
environment:
- PYTHONUNBUFFERED=1
- CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
- CELERY_RESULT_BACKEND=rpc://
ports:
- "6245:6245"
depends_on:
Expand Down
6 changes: 4 additions & 2 deletions mailing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ def send_email_task(
) -> Any:
"""Celery task that sends an email via the selected provider.

Delegates to mailing.mailing.send_email.
Delegates to mailing.mailing.send_email and ensures the return value is JSON-serializable
for Celery backends (e.g., RPC), converting provider-specific response objects into
simple dictionaries.
"""

return send_email(
send_email(
provider=provider,
subject=subject,
message=message,
Expand Down
30 changes: 30 additions & 0 deletions mailing/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import pytest
from mailing.tasks import send_email_task


class TestSendEmailTask:
def test_task_calls_mailing_send_email_with_all_arguments(self, mocker):
# Arrange
mocked = mocker.patch("mailing.tasks.send_email", return_value={"ok": True})

kwargs = {
"provider": "sendgrid",
"subject": "Hello",
"message": "World",
"recipient_list": ["a@example.com", "b@example.com"],
"from_email": "from@example.com",
"html_content": "<b>Hi</b>",
"api_key": "key-123",
}

# Act
result = send_email_task(**kwargs)

# Assert
# The Celery task currently returns None (delegates to synchronous helper)
assert result is None
mocked.assert_called_once_with(**kwargs)

def test_task_has_expected_name(self):
# Celery registers the @shared_task with a name
assert send_email_task.name == "mailing.send_email"
3 changes: 2 additions & 1 deletion requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ sendgrid==6.12.5
resend==2.16.0
celery==5.5.3
fastapi==0.119.0
httpx==0.28.1
httpx==0.28.1
uvicorn==0.37.0
4 changes: 1 addition & 3 deletions requirements/prod.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
-r base.txt

uvicorn[standard]==0.30.6
-r base.txt
81 changes: 80 additions & 1 deletion rest/api.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,87 @@
from fastapi import FastAPI
from typing import Any, Optional

import logging
from fastapi import FastAPI, status, HTTPException
from pydantic import BaseModel, Field

from mailing import services
from mailing.mailing import send_email as send_email_sync
from mailing.tasks import celery_app
from rest.schemas import SendEmailRequest, SendEmailResponse

logger = logging.getLogger(__name__)

app = FastAPI(title="Python Mailing REST API")


@app.get("/health")
def health():
return {"status": "ok"}


@app.get("/health/queue")
def health_queue():
"""Check connectivity to the message broker (RabbitMQ)."""
try:
# Try establishing a broker connection; minimal retries for quick check
with celery_app.connection_or_acquire() as conn: # type: ignore[attr-defined]
conn.ensure_connection(max_retries=1)
return {"broker": "ok"}
except Exception as exc:
logger.warning("Broker health check failed: %s", exc)
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail={"error": "broker_unavailable", "message": str(exc)},
)


@app.post("/emails", status_code=status.HTTP_202_ACCEPTED, response_model=SendEmailResponse)
def send_email(payload: SendEmailRequest):
"""
Enqueue an email to be sent asynchronously using mailing.services.send_email.

Returns a 202 Accepted with the Celery task id so clients can track progress.
Falls back to synchronous send when the queue is unavailable so email can still be sent.
"""
try:
async_result = services.send_email(
provider=payload.provider,
subject=payload.subject,
message=payload.message,
recipient_list=payload.recipient_list,
from_email=payload.from_email,
html_content=payload.html_content,
api_key=payload.api_key,
)
task_id = getattr(async_result, "id", None) or getattr(async_result, "task_id", None)
return SendEmailResponse(task_id=str(task_id), status="queued")
except Exception as exc: # Broker/result backend might be unavailable
logger.error("Queue unavailable, falling back to synchronous send: %s", exc)
# Fallback: try sending synchronously so the user request can still succeed
try:
provider_response = send_email_sync(
provider=payload.provider,
subject=payload.subject,
message=payload.message,
recipient_list=payload.recipient_list,
from_email=payload.from_email,
html_content=payload.html_content,
api_key=payload.api_key,
)
# Use a sentinel task_id to indicate synchronous path
return SendEmailResponse(
task_id="sync",
status="sent",
extra={"provider_response": str(provider_response)},
)
except Exception as sync_exc:
logger.exception("Synchronous send failed after queue error: %s", sync_exc)
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail={
"error": "send_failed",
"message": "Queue unavailable and synchronous send failed.",
"queue_error": str(exc),
"sync_error": str(sync_exc),
},
)
1 change: 1 addition & 0 deletions rest/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
EMAIL_STATUS_QUEUED = "queued"
20 changes: 20 additions & 0 deletions rest/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from typing import Any

from pydantic import Field, BaseModel
from rest import constants


class SendEmailRequest(BaseModel):
provider: str = Field(..., description="Email provider identifier")
subject: str
message: str
recipient_list: list[str]
from_email: str | None = None
html_content: str | None = None
api_key: str | None = None


class SendEmailResponse(BaseModel):
task_id: str
status: str = constants.EMAIL_STATUS_QUEUED
extra: dict[str, Any] | None = None
42 changes: 42 additions & 0 deletions rest/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import pytest


class DummyAsyncResult:
def __init__(self, task_id: str):
self.id = task_id


@pytest.fixture
def email_payload_minimal() -> dict:
return {
"provider": "sendgrid",
"subject": "Hello",
"message": "World",
"recipient_list": ["user@example.com"],
"from_email": None,
"html_content": None,
"api_key": None,
}


@pytest.fixture
def email_payload_full() -> dict:
return {
"provider": "resend",
"subject": "Subj",
"message": "Msg",
"recipient_list": ["a@example.com", "b@example.com"],
"from_email": "from@example.com",
"html_content": "<b>hi</b>",
"api_key": "key-123",
}


@pytest.fixture
def dummy_async_result_abc123() -> DummyAsyncResult:
return DummyAsyncResult("abc123")


@pytest.fixture
def dummy_async_result_id999() -> DummyAsyncResult:
return DummyAsyncResult("id-999")
73 changes: 73 additions & 0 deletions rest/tests/test_api_extras.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from fastapi import status
from fastapi.testclient import TestClient
import pytest

from rest.api import app

client = TestClient(app)


class DummyConn:
def __enter__(self):
return self

def __exit__(self, exc_type, exc, tb):
return False

def ensure_connection(self, max_retries=1):
return True


class DummyCeleryApp:
def connection_or_acquire(self):
return DummyConn()


class TestHealthQueue:
def test_health_queue_ok(self, monkeypatch):
# Patch the celery_app on the module to a dummy that successfully connects
import rest.api as api_module

monkeypatch.setattr(api_module, "celery_app", DummyCeleryApp())

response = client.get("/health/queue")
assert response.status_code == status.HTTP_200_OK
assert response.json() == {"broker": "ok"}

def test_health_queue_unavailable(self, monkeypatch):
import rest.api as api_module

class FailingCM:
def __enter__(self):
raise RuntimeError("cannot connect")

def __exit__(self, exc_type, exc, tb):
return False

class FailingApp:
def connection_or_acquire(self):
return FailingCM()

monkeypatch.setattr(api_module, "celery_app", FailingApp())

response = client.get("/health/queue")
assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
data = response.json()
assert data["detail"]["error"] == "broker_unavailable"
assert "cannot connect" in data["detail"]["message"]


class TestSendEmailFallback:
def test_fallback_to_sync_when_queue_unavailable(self, mocker, email_payload_minimal):
# services.send_email raises (e.g., broker down)
mocker.patch("mailing.services.send_email", side_effect=RuntimeError("broker down"))
# synchronous path returns some provider response object
mocker.patch("rest.api.send_email_sync", return_value={"provider": "ok"})

response = client.post("/emails", json=email_payload_minimal)
assert response.status_code == status.HTTP_202_ACCEPTED
data = response.json()
assert data["task_id"] == "sync"
assert data["status"] == "sent"
assert isinstance(data.get("extra"), dict)
assert "provider_response" in data["extra"]
54 changes: 54 additions & 0 deletions rest/tests/test_send_email.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from fastapi import status
from fastapi.testclient import TestClient

from rest.api import app

client = TestClient(app)


class TestSendEmailEndpoint:
def test_send_email_returns_202_and_task_id(
self, mocker, email_payload_minimal, dummy_async_result_abc123
):
# Arrange
mocker.patch("mailing.services.send_email", return_value=dummy_async_result_abc123)

# Act
response = client.post("/emails", json=email_payload_minimal)

# Assert
assert response.status_code == status.HTTP_202_ACCEPTED
data = response.json()
assert data == {"task_id": "abc123", "status": "queued", "extra": None}

def test_send_email_calls_service_with_same_attrs(
self, mocker, email_payload_full, dummy_async_result_id999
):
mocked = mocker.patch("mailing.services.send_email", return_value=dummy_async_result_id999)

response = client.post("/emails", json=email_payload_full)
assert response.status_code == status.HTTP_202_ACCEPTED

mocked.assert_called_once_with(
provider=email_payload_full["provider"],
subject=email_payload_full["subject"],
message=email_payload_full["message"],
recipient_list=email_payload_full["recipient_list"],
from_email=email_payload_full["from_email"],
html_content=email_payload_full["html_content"],
api_key=email_payload_full["api_key"],
)

def test_fallback_to_sync_when_queue_unavailable(self, mocker, email_payload_minimal):
# services.send_email raises (e.g., broker down)
mocker.patch("mailing.services.send_email", side_effect=RuntimeError("broker down"))
# synchronous path returns some provider response object
mocker.patch("rest.api.send_email_sync", return_value={"provider": "ok"})

response = client.post("/emails", json=email_payload_minimal)
assert response.status_code == status.HTTP_202_ACCEPTED
data = response.json()
assert data["task_id"] == "sync"
assert data["status"] == "sent"
assert isinstance(data.get("extra"), dict)
assert "provider_response" in data["extra"]