Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,7 @@ jobs:
pip install -r requirements.txt
- name: Run migrations
run: alembic upgrade head
- name: Start server
run: uvicorn app.fastapi_app:app --host 0.0.0.0 --port 8000 &
- name: Run tests
run: pytest -q
21 changes: 2 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,25 +108,8 @@ while True:
```

## Architecture

```
+-----------+ +----------+
| Producer | | Worker |
| (enqueue) | (lease) |
+----+---+ +----+----+
| |
v v
+----------------------------------+
| FastAPI Service |
| /jobs /queues/*/lease /dashboard
+-----------------+----------------+
|
v
+----------------------------------+
| PostgreSQL Database |
| jobs table users table indexes
+----------------------------------+
```
Refer to the below image:
![Architecture Diagram](system-design/Architecture_diagram.png)

## Features

Expand Down
4 changes: 4 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ def _set_test_env() -> None:
"""
os.environ.setdefault("OPENQUEUE_TOKEN_HMAC_SECRET", "openqueue-test-secret")
os.environ.setdefault("OPENQUEUE_ENV", "test")
os.environ.setdefault(
"DATABASE_URL",
"postgresql://openqueue:openqueue_password@localhost:5432/openqueue",
)


# pytest-asyncio guidance (no custom event_loop fixture)
Expand Down
225 changes: 225 additions & 0 deletions tests/test_api_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
from __future__ import annotations

import os
import hashlib
import hmac
import asyncio
import uuid
from typing import Iterator

import asyncpg
import httpx
import pytest


def _unique_queue(prefix: str) -> str:
return f"{prefix}-{uuid.uuid4().hex[:10]}"


def _sha256_hex(value: str) -> str:
return hashlib.sha256(value.encode("utf-8")).hexdigest()


def _hmac_sha256_hex(secret: str, value: str) -> str:
return hmac.new(
key=secret.encode("utf-8"),
msg=value.encode("utf-8"),
digestmod=hashlib.sha256,
).hexdigest()


@pytest.fixture(scope="session")
def api_base_url() -> str:
return os.getenv("OPENQUEUE_API_URL", "http://localhost:8000").rstrip("/")


@pytest.fixture(scope="session")
def api_token() -> str:
return os.getenv(
"OPENQUEUE_API_TOKEN",
"oq_test_integration_token",
)


@pytest.fixture(scope="session")
def database_url() -> str:
return os.getenv(
"DATABASE_URL",
"postgresql://openqueue:openqueue_password@localhost:5432/openqueue",
)


@pytest.fixture(scope="session")
def client(api_base_url: str) -> Iterator[httpx.Client]:
client = httpx.Client(base_url=api_base_url, timeout=10.0)
try:
health = client.get("/health")
if health.status_code != 200:
pytest.skip(f"API not healthy: status={health.status_code}")
except httpx.HTTPError as exc:
pytest.skip(f"API not reachable at {api_base_url}: {exc}")
yield client
client.close()


@pytest.fixture
def auth_headers(api_token: str, database_url: str) -> dict[str, str]:
async def _ensure_user() -> None:
conn = await asyncpg.connect(database_url)
try:
await conn.execute(
"""
INSERT INTO users (email, api_token_hash, is_active)
VALUES ($1, $2, TRUE)
ON CONFLICT (api_token_hash) DO NOTHING
""",
"integration-sha@openqueue.local",
_sha256_hex(api_token),
)

# Optional compatibility for environments where API uses HMAC hashing.
test_secret = os.getenv("OPENQUEUE_TOKEN_HMAC_SECRET", "openqueue-test-secret")
await conn.execute(
"""
INSERT INTO users (email, api_token_hash, is_active)
VALUES ($1, $2, TRUE)
ON CONFLICT (api_token_hash) DO NOTHING
""",
"integration-hmac@openqueue.local",
_hmac_sha256_hex(test_secret, api_token),
)
finally:
await conn.close()

asyncio.run(_ensure_user())
return {"Authorization": f"Bearer {api_token}"}


def test_health_and_ready(client: httpx.Client) -> None:
health = client.get("/health")
assert health.status_code == 200
assert health.json()["status"] == "ok"

ready = client.get("/ready")
assert ready.status_code == 200
assert ready.json()["status"] == "ready"


def test_job_lifecycle_enqueue_lease_heartbeat_ack_detail(
client: httpx.Client,
auth_headers: dict[str, str],
) -> None:
queue_name = _unique_queue("it-lifecycle")

create = client.post(
"/jobs",
headers=auth_headers,
json={"queue_name": queue_name, "payload": {"task": "lifecycle"}},
)
assert create.status_code == 201
job_id = create.json()["job_id"]

lease = client.post(
f"/queues/{queue_name}/lease",
headers=auth_headers,
json={"worker_id": "integration-worker", "lease_seconds": 30},
)
assert lease.status_code == 200
leased = lease.json()
assert leased is not None
assert leased["job"]["id"] == job_id
lease_token = leased["lease_token"]

heartbeat = client.post(
f"/jobs/{job_id}/heartbeat",
headers=auth_headers,
json={"lease_token": lease_token, "lease_seconds": 30},
)
assert heartbeat.status_code == 200

ack = client.post(
f"/jobs/{job_id}/ack",
headers=auth_headers,
json={"lease_token": lease_token, "result": {"ok": True}},
)
assert ack.status_code == 200

detail = client.get(f"/jobs/{job_id}/detail", headers=auth_headers)
assert detail.status_code == 200
job = detail.json()
assert job["status"] == "completed"
assert job["result"] == {"ok": True}


def test_list_jobs_filters_and_pagination(
client: httpx.Client,
auth_headers: dict[str, str],
) -> None:
queue_name = _unique_queue("it-list")

for idx in range(3):
resp = client.post(
"/jobs",
headers=auth_headers,
json={"queue_name": queue_name, "payload": {"idx": idx}},
)
assert resp.status_code == 201

page_1 = client.get(
"/jobs",
headers=auth_headers,
params={"queue_name": queue_name, "status": "pending", "limit": 2, "offset": 0},
)
assert page_1.status_code == 200
body_1 = page_1.json()
assert body_1["total"] >= 3
assert body_1["limit"] == 2
assert body_1["offset"] == 0
assert len(body_1["items"]) == 2
assert all(item["status"] == "pending" for item in body_1["items"])

page_2 = client.get(
"/jobs",
headers=auth_headers,
params={"queue_name": queue_name, "status": "pending", "limit": 2, "offset": 2},
)
assert page_2.status_code == 200
body_2 = page_2.json()
assert body_2["offset"] == 2
assert len(body_2["items"]) >= 1


def test_cancel_pending_job(client: httpx.Client, auth_headers: dict[str, str]) -> None:
queue_name = _unique_queue("it-cancel")
create = client.post(
"/jobs",
headers=auth_headers,
json={"queue_name": queue_name, "payload": {"task": "cancel-me"}},
)
assert create.status_code == 201
job_id = create.json()["job_id"]

cancel = client.post(f"/jobs/{job_id}/cancel", headers=auth_headers)
assert cancel.status_code == 200
assert cancel.json()["status"] == "cancelled"

status_res = client.get(f"/jobs/{job_id}", headers=auth_headers)
assert status_res.status_code == 200
assert status_res.json()["status"] == "cancelled"


def test_invalid_job_id_returns_422(
client: httpx.Client,
auth_headers: dict[str, str],
) -> None:
bad_job_id = "not-a-valid-uuid"

status_res = client.get(f"/jobs/{bad_job_id}", headers=auth_headers)
assert status_res.status_code == 422

ack_res = client.post(
f"/jobs/{bad_job_id}/ack",
headers=auth_headers,
json={"lease_token": str(uuid.uuid4()), "result": {"ok": True}},
)
assert ack_res.status_code == 422