Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 63 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ jobs:
- name: Install dependencies
run: |
if [ "${{ matrix.python-version }}" == "3.13" ]; then
# Sync everything EXCEPT the 'numpy' extra
uv sync --extra rabbitmq --extra redis --dev --extra mosquitto
# Sync everything EXCEPT the 'numpy' extra that currently has problems on github
uv sync --extra rabbitmq --extra redis --dev --extra mosquitto --extra nats
else
# Install everything for older versions
uv sync --all-extras
Expand All @@ -51,7 +51,6 @@ jobs:
uses: codecov/codecov-action@v5
if: matrix.python-version == '3.12'
with:
file: ./coverage.xml
flags: unittests
name: codecov-umbrella

Expand Down Expand Up @@ -236,3 +235,64 @@ jobs:
run: uv run python -m pytest tests/test_integration.py -k mosquitto -vvv -s
env:
MQTT_HOST: 127.0.0.1

integration_test_nats:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- name: Run NATS server
run: |
docker run -d -p 4222:4222 --name nats nats:2-alpine -js

- name: Log NATS container
run: |
sleep 3
echo "=== Container logs ==="
docker logs nats
echo "=== Config file ==="
docker exec nats cat /etc/nats/nats-server.conf
echo "=== Processes ==="
docker exec nats ps aux
echo "=== Network ==="
docker exec nats netstat -tlnp || docker exec nats ss -tlnp

- name: Install uv
uses: astral-sh/setup-uv@v7
with:
enable-cache: true
cache-dependency-glob: "uv.lock"

- name: Set up Python 3.12
run: uv python install 3.12

- name: Create virtual environment
run: uv venv --python 3.12

- name: Install dependencies
run: uv sync --all-extras

- name: Smoke test NATS
run: |
# Wait for the port to be open
timeout 30s sh -c 'until nc -z localhost 4222; do sleep 1; done'

uv run python -c "
import nats
import sys
import asyncio

async def smoke_test():
try:
nats.connect()
print('NATS connected Successfully')
sys.exit(0)
except Exception as e:
print(f'Failed to connect: {e}')
sys.exit(1)

asyncio.run(smoke_test())
"
- name: Run tests
run: uv run python -m pytest tests/test_integration.py -k nats -vvv -s
env:
NATS_HOST: localhost
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ t:
PYTHONASYNCIODEBUG=1 PYTHONBREAKPOINT=ipdb.set_trace uv run pytest ${t} -s -vvvv --durations=0

integration-test:
uv run pytest tests/ -m "integration" ${t}
uv run pytest tests/ -m "integration" -k rabbitmq ${t}
uv run pytest tests/ -m "integration" -k redis ${t}
uv run pytest tests/ -m "integration" -k python ${t}
uv run pytest tests/ -m "integration" -k mosquitto ${t}
uv run pytest tests/ -m "integration" -k nats ${t} # run ./nats-server -js -sd nats_storage

integration-test-py310:
source .venv310/bin/activate
Expand Down Expand Up @@ -97,8 +101,8 @@ test-py313:
# Releasing
.PHONY: docs clean build-package publish-test publish-pypi copy-md
copy-md:
cp ./README.md docs/source/intro.md
cp ./QUICK_START.md docs/source/quick_start.md
cp ./RECIPES.md docs/source/recipes.md

docs: copy-md
uv run sphinx-build -b html docs/source docs/build/html
Expand Down
25 changes: 14 additions & 11 deletions QUICK_START.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ You can also add it manually to pyproject.toml dependencies:

```toml
dependencies = [
"protobunny[rabbitmq, numpy]>=0.1.0",
"protobunny[rabbitmq, numpy]>=0.1.2a2",
# your other dependencies ...
]
```
Expand Down Expand Up @@ -179,12 +179,15 @@ def worker1(task: mml.main.tasks.TaskMessage) -> None:

def worker2(task: mml.main.tasks.TaskMessage) -> None:
print("2- Working on:", task)

pb.subscribe(mml.main.tasks.TaskMessage, worker1)
import mymessagelib as mml
pb.subscribe(mml.main.tasks.TasqkMessage, worker1)
pb.subscribe(mml.main.tasks.TaskMessage, worker2)

pb.publish(mml.main.tasks.TaskMessage(content="test1"))
pb.publish(mml.main.tasks.TaskMessage(content="test2"))
pb.publish(mml.main.tasks.TaskMessage(content="test3"))
from protobunny.models import ProtoBunnyMessage
print(isinstance(mml.main.tasks.TaskMessage(), ProtoBunnyMessage))
```

You can also introspect/manage an underlying shared queue:
Expand Down Expand Up @@ -293,10 +296,14 @@ if conn.is_connected():
conn.close()
```

If you set the `generated-package-root` folder option, you might need to add the path to your `sys.path`.
If you set the `generated-package-root` folder option, you might need to add that path to your `sys.path`.
You can do it conveniently by calling `config_lib` on top of your module, before importing the library:

```python
pb.config_lib()
import protobunny as pb
pb.config_lib()
# now you can import the library from the generated package root
import mymessagelib as mml
```

## Complete example
Expand All @@ -310,7 +317,7 @@ version = "0.1.0"
description = "Project to test protobunny"
requires-python = ">=3.10"
dependencies = [
"protobunny[rabbitmq,redis,numpy,mosquitto] >=0.1.2a1",
"protobunny[rabbitmq,redis,numpy,mosquitto]>=0.1.2a1",
]

[tool.protobunny]
Expand Down Expand Up @@ -374,7 +381,6 @@ import asyncio
import logging
import sys


import protobunny as pb
from protobunny import asyncio as pb_asyncio

Expand All @@ -386,12 +392,11 @@ pb.config_lib()

import mymessagelib as ml


logging.basicConfig(
level=logging.INFO, format="[%(asctime)s %(levelname)s] %(name)s - %(message)s"
)
log = logging.getLogger(__name__)
conf = pb.default_configuration
conf = pb.config


class TestLibAsync:
Expand All @@ -413,15 +418,13 @@ class TestLibAsync:
async def on_message_mymessage(self, message: ml.main.MyMessage) -> None:
log.info("Got main message: %s", message)


def run_forever(self):
asyncio.run(self.main())

def log_callback(self, incoming, body) -> None:
log.info(f"LOG {incoming.routing_key}: {body}")

async def main(self):

await pb_asyncio.subscribe_logger(self.log_callback)
await pb_asyncio.subscribe(ml.main.tasks.TaskMessage, self.worker1)
await pb_asyncio.subscribe(ml.main.tasks.TaskMessage, self.worker2)
Expand Down
100 changes: 62 additions & 38 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,30 +1,48 @@
# Protobunny

```{warning}
Note: The project is in early development.
```
> [!WARNING]
> The project is in early development.


Protobunny is the open-source evolution of [AM-Flow](https://am-flow.com)'s internal messaging library.
While the original was purpose-built for RabbitMQ, this version has been completely re-engineered to provide a unified,
type-safe interface for any message broker, including Redis and MQTT.
type-safe interface for several message brokers, including Redis, NATS, and MQTT.

It simplifies messaging for asynchronous tasks by providing:
It simplifies messaging for asynchronous message handling by providing:

* A clean “message-first” API
* Python class generation from Protobuf messages using betterproto
* Connections facilities for backends
* A clean “message-first” API by using your protobuf definitions
* Message publishing/subscribing with typed topics
* Support also “task-like queues (shared/competing consumers) vs. broadcast subscriptions
* Supports "task-like" queues (shared/competing consumers) vs. broadcast subscriptions
* Generate and consume `Result` messages (success/failure + optional return payload)
* Transparent messages serialization/deserialization
* Support async and sync contexts
* Transparently serialize "JSON-like" payload fields (numpy-friendly)
* Transparently serialize/deserialize custom "JSON-like" payload fields (numpy-friendly)
* Support async and sync contexts

Supported backends in the current version are:

- RabbitMQ
- Redis
- NATS
- Mosquitto
- Python "backend" with Queue/asyncio.Queue for local in-processing testing


> [!NOTE]
> Protobunny handles backend-specific logic internally to provide a consistent experience and a lean interface.
> Direct access to the internal NATS or Redis clients is intentionally restricted.
> If your project depends on specialized backend parameters not covered by our API, you may find the abstraction too restrictive.

## Requirements

- Python >= 3.10, < 3.14
- Backend message broker (e.g. RabbitMQ)
## Minimal requirements

- Python >= 3.10 <=3.13
- Core Dependencies: betterproto 2.0.0b7, grpcio-tools>=1.62.0
- Backend Drivers (Optional based on your usage):
- NATS: nats-py (Requires NATS Server v2.10+ for full JetStream support).
- Redis: redis (Requires Redis Server v6.2+ for Stream support).
- RabbitMQ: aio-pika
- Mosquitto: aiomqtt


## Project scope

Expand All @@ -39,40 +57,46 @@ Protobunny is designed for teams who use messaging to coordinate work between mi
- Optional validation of required fields
- Builtin logging service

---
## Why Protobunny?

## Usage
While there are many messaging libraries for Python, Protobunny is built specifically for teams that treat **Protobuf as the single source of truth**.

* **Type-Safe by Design**: Built natively for `protobuf/betterproto`.
* **Semantic Routing**: Zero-config infrastructure. Protobunny uses your Protobuf package structure to decide if a message should be broadcast (Pub/Sub) or queued (Producer/Consumer).
* **Backend Agnostic**: You can choose between RabbitMQ, Redis, NATS, and Mosquitto. Python for local testing.
* **Sync & Async**: Support for both `asyncio` and traditional synchronous workloads.
* **Battle-Tested**: Derived from internal libraries used in production systems at AM-Flow.
---

See the [Quick example on GitHub](https://github.com/am-flow/protobunny/blob/main/QUICK_START.md) for installation and quick start guide.
### Feature Comparison with some existing libraries

Full docs are available at [https://am-flow.github.io/protobunny/](https://am-flow.github.io/protobunny/).
| Feature | **Protobunny** | **FastStream** | **Celery** |
|:-----------------------|:-------------------------|:-------------------|:------------------------|
| **Multi-Backend** | ✅ Yes | ✅ Yes | ⚠️ (Tasks only) |
| **Typed Protobufs** | ✅ Native (Betterproto) | ⚠️ Manual/Pydantic | ❌ No |
| **Sync + Async** | ✅ Yes | ✅ Yes | ❌ Sync focus |
| **Pattern Routing** | ✅ Auto (`tasks` pkg) | ❌ Manual Config | ✅ Fixed |
| **Framework Agnostic** | ✅ Yes | ✅ Yes | ❌ Heavyweight |

---

## Development
## Usage

### Run tests
```bash
make test
```
See the [Quick example on GitHub](https://github.com/am-flow/protobunny/blob/main/QUICK_START.md) or on the [docs site](https://am-flow.github.io/protobunny/quickstart.html).

### Integration tests (RabbitMQ required)
Documentation home page: [https://am-flow.github.io/protobunny/](https://am-flow.github.io/protobunny/).

Integration tests expect RabbitMQ to be running (for example via Docker Compose in this repo):
```bash
docker compose up -d
make integration-test
```
---

### Future work

- Support grcp
- Support for RabbitMQ certificates (through `pika`)
- More backends:
- NATS
- Kafka
- Cloud providers (AWS SQS/SNS)
### Roadmap

- [x] **Core Support**: Redis, RabbitMQ, Mosquitto.
- [x] **Semantic Patterns**: Automatic `tasks` package routing.
- [x] **Arbistrary dictionary parsing**: Transparently parse JSON-like fields as dictionaries/lists by using protobunny JsonContent type.
- [x] **Result workflow**: Subscribe to results topics and receive protobunny `Result` messages produced by your callbacks.
- [x] **Cloud-Native**: NATS (Core & JetStream) integration.
- [ ] **Cloud Providers**: AWS (SQS/SNS) and GCP Pub/Sub.
- [ ] **More backends**: Kafka support.
- [ ] **gRPC** Direct Call support

---

Expand Down
Loading