These examples are for sync mode.
For async mode, import protobunny with from protobunny import asyncio as pb.
For a full example, using FastAPI, Redis, and protobunny, see this repo.
Add protobunny to your pyproject.toml dependencies (add the backend you need as extra dependency):
uv add "protobunny[rabbitmq,numpy]"
# or
poetry add protobunny

You can also add it manually to pyproject.toml dependencies:
dependencies = [
"protobunny[rabbitmq, numpy]>=0.1.2a2",
# your other dependencies ...
]

Configure the library in pyproject.toml:
[tool.protobunny]
messages-directory = "messages"
messages-prefix = "acme"
generated-package-name = "mymessagelib.codegen"
mode = "async" # or "sync"
backend = "rabbitmq" # available backends are ['rabbitmq', 'redis', 'mosquitto', 'nats', 'python']

uv lock --prerelease=allow # or poetry lock
uv sync # or poetry sync/install

Protobunny connects to the broker (e.g. RabbitMQ) by reading environment variables (RABBITMQ_URL).
# export these variables
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASS=guest
RABBITMQ_VHOST=/test

For other backends, replace the RABBITMQ_ prefix with the uppercase backend name (e.g. REDIS_HOST).
If you are using the python backend, you don't need to set any environment variables.
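The prefix convention can be pictured with a small sketch. The helper below is hypothetical and not part of protobunny; it assumes every backend uses the same HOST, PORT, USER, PASS and VHOST suffixes shown for RabbitMQ, which may not hold for all of them.

```python
import os
from typing import Mapping, Optional


def read_backend_settings(backend: str, env: Optional[Mapping[str, str]] = None) -> dict:
    """Collect <BACKEND>_HOST, _PORT, _USER, _PASS and _VHOST from the environment.

    Hypothetical helper for illustration only; protobunny does its own lookup.
    """
    env = os.environ if env is None else env
    prefix = backend.upper() + "_"
    settings = {}
    for suffix in ("HOST", "PORT", "USER", "PASS", "VHOST"):
        value = env.get(prefix + suffix)
        if value is not None:
            settings[suffix.lower()] = value
    return settings


# A fake environment, so the example does not depend on your shell.
example_env = {
    "RABBITMQ_HOST": "localhost",
    "RABBITMQ_PORT": "5672",
    "REDIS_HOST": "cache.local",
}
print(read_backend_settings("rabbitmq", example_env))
print(read_backend_settings("redis", example_env))
print(read_backend_settings("python", example_env))  # nothing to read
```

Passing the environment as a mapping keeps the sketch easy to try without exporting anything.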
Available backends are:
- RabbitMQ
- Redis
- Mosquitto
- NATS
- Python for local testing (in-process)
mkdir messages
mkdir messages/acme
mkdir messages/acme/tests
# etc.

A message that uses JSON-like fields can look like this:
/*test.proto*/
syntax = "proto3";
import "protobunny/commons.proto";
package acme.tests;
message TestMessage {
string content = 10;
int64 number = 20;
commons.JsonContent data = 25;
/* Field with JSON-like content */
optional string detail=30;
/* Optional field */
}

The library comes with a protoc wrapper that generates Python code from your protobuf messages
and executes a postcompilation step to manipulate the generated code.
protobunny generate

In mymessagelib/codegen you should see the generated message classes, mirroring the package declaration in your protobuf files.
If you need to generate the classes in another package (e.g. for tests), you can pass the --python_betterproto_out option:
protobunny generate -I messages --python_betterproto_out=tests tests/**/*.proto tests/*.proto

The following examples are for sync mode and can run from the python shell.
To use the async mode, import protobunny with from protobunny import asyncio as pb.
import protobunny as pb
import mymessagelib as mml
def on_message(message: mml.tests.TestMessage) -> None:
    print("Got:", message)

pb.subscribe(mml.tests.TestMessage, on_message)
# Prints
# 'Got: TestMessage(content="hello", number=1, data={"test": "test"}, detail=None)'
# when a message is received

The following code can run in another process or thread and publishes a message to the topic acme.tests.TestMessage.
import protobunny as pb
import mymessagelib as mml
msg = mml.tests.TestMessage(content="hello", number=1, data={"test": "test"})
pb.publish(msg)

All messages that are under a protobuf tasks package are treated as shared queues.
/*
This .proto file contains protobuf message definitions for testing tasks
*/
syntax = "proto3";
import "protobunny/commons.proto";
// Define the tasks package
package tests.tasks;
message TaskMessage {
string content = 10;
repeated float weights = 30 [packed = true];
repeated int64 bbox = 40 [packed = true];
optional commons.JsonContent options=50;
}

If a message is treated as a "task queue" message by the library conventions,
subscribe will use a shared queue (multiple workers consuming messages from one queue).
The load is distributed among workers (competing consumers).
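The competing-consumers idea can be pictured without a broker. The following is a minimal in-process sketch using only the standard library; it is not how protobunny or any backend implements shared queues, and the worker names are made up for the illustration.

```python
import queue
import threading

tasks = queue.Queue()            # stands in for the shared queue
handled = []                     # (worker name, task) pairs
handled_lock = threading.Lock()


def worker(name: str) -> None:
    """Pull tasks from the shared queue until a None sentinel arrives."""
    while True:
        task = tasks.get()
        if task is None:         # sentinel: stop this worker
            break
        with handled_lock:
            handled.append((name, task))


workers = [threading.Thread(target=worker, args=(f"worker{i}",)) for i in (1, 2)]
for thread in workers:
    thread.start()

for content in ("test1", "test2", "test3"):
    tasks.put(content)
for _ in workers:
    tasks.put(None)              # one sentinel per worker
for thread in workers:
    thread.join()

# Each task was handled exactly once, by whichever worker was free.
print(sorted(task for _, task in handled))
```

Which worker picks up which task is not deterministic; the point is only that a task is never delivered to both.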
import protobunny as pb
import mymessagelib as mml
def worker1(task: mml.main.tasks.TaskMessage) -> None:
    print("1- Working on:", task)

def worker2(task: mml.main.tasks.TaskMessage) -> None:
    print("2- Working on:", task)

pb.subscribe(mml.main.tasks.TaskMessage, worker1)
pb.subscribe(mml.main.tasks.TaskMessage, worker2)
pb.publish(mml.main.tasks.TaskMessage(content="test1"))
pb.publish(mml.main.tasks.TaskMessage(content="test2"))
pb.publish(mml.main.tasks.TaskMessage(content="test3"))
from protobunny.models import ProtoBunnyMessage
print(isinstance(mml.main.tasks.TaskMessage(), ProtoBunnyMessage))

You can also introspect/manage an underlying shared queue:
import protobunny as pb
import mymessagelib as mml
queue = pb.get_queue(mml.main.tasks.TaskMessage)
# Only shared queues can be purged and counted
count = queue.get_message_count()
print("Queued:", count)
queue.purge()

import protobunny as pb
import mymessagelib as mml
source = mml.tests.TestMessage(content="hello", number=1)
# create a result message from the source message
result = source.make_result(return_value={"ok": True})
# publish the result
pb.publish_result(result)

import protobunny as pb
import mymessagelib as mml
def on_result(res: pb.results.Result) -> None:
    print("Result for:", res.source)
    print("Return code:", res.return_code)
    print("Return value:", res.return_value)
    print("Error:", res.error)

pb.subscribe_results(mml.tests.TestMessage, on_result)

Protobuf supports maps and lists as message fields. Maps can't have arbitrary structures: the values of a map must be of the same type.
Protobunny adds a layer over protobuf to carry arbitrary structured payloads (dicts/lists), by supporting transparent conversion so you can work with normal Python structures:
- Serialize: dictionaries/lists are encoded into the message field
- Deserialize: those fields come back as Python structures
This is particularly useful for metrics, metadata, and structured return values in results.
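The round trip can be sketched with the standard library alone. The JsonField class below is a hypothetical stand-in, not protobunny's commons.JsonContent; it assumes the payload is stored as UTF-8 JSON text, which is one plausible encoding rather than a statement about the library's wire format.

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass
class JsonField:
    """Hypothetical stand-in for a message field that carries JSON-like data."""

    raw: bytes = b""

    @classmethod
    def from_python(cls, value: Any) -> "JsonField":
        # Serialize: dicts/lists become bytes that fit in a single field.
        return cls(raw=json.dumps(value).encode("utf-8"))

    def to_python(self) -> Any:
        # Deserialize: the bytes come back as normal Python structures.
        return json.loads(self.raw.decode("utf-8")) if self.raw else None


# Values of different types in one dict, which a protobuf map cannot hold.
payload = {"test": "Test", "number_list": [1, 2, 3], "nested": {"ok": True}}
field = JsonField.from_python(payload)
print(field.raw)
print(field.to_python() == payload)
```

Anything that json.dumps rejects (sets, custom objects, and so on) would need its own conversion before it could travel this way.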
Example:
The TaskMessage above has an options field that can carry an arbitrary JSON-like payload.
import mymessagelib as mml
msg = mml.tests.tasks.TaskMessage(content="test1", options={"test":"Test", "number_list": [1,2,3]})
serialized = bytes(msg)
print(serialized)
deserialized = mml.tests.tasks.TaskMessage.parse(serialized)
print(deserialized)
assert deserialized.options == {"test":"Test", "number_list": [1,2,3]}

Protobunny includes a convenience subscription for logging message traffic by subscribing to a broad wildcard topic and printing JSON payloads:
import protobunny as pb
def log_callback(_incoming_message, body: str) -> None:
    print(body)

pb.subscribe_logger(log_callback)

You can start a logger worker with:
protobunny log

If you need explicit connection lifecycle control, you can access the shared connection object:
import protobunny as pb
conn = pb.connect()
if conn.is_connected():
    conn.close()

If you set the generated-package-root folder option, you might need to add that path to your sys.path.
You can do it conveniently by calling config_lib at the top of your module, before importing the library:
import protobunny as pb
pb.config_lib()
# now you can import the library from the generated package root
import mymessagelib as mml

[project]
name = "test-project"
version = "0.1.0"
description = "Project to test protobunny"
requires-python = ">=3.10"
dependencies = [
"protobunny[rabbitmq,redis,numpy,mosquitto]>=0.1.2a1",
]
[tool.protobunny]
messages-directory = "messages"
messages-prefix = "acme"
generated-package-name = "mymessagelib"
generated-package-root = "codegen"
backend = "rabbitmq" # configure here the backend (choose between rabbitmq, redis, mosquitto, nats, python)
mode = "async"

/* messages/my_message.proto */
syntax = "proto3";
package main;
message MyMessage {
string content = 10;
int64 number = 20;
optional string detail=30;
}
/* messages/tasks.proto */
syntax = "proto3";
import "protobunny/commons.proto";
// Define the tasks package
package main.tasks;
message TaskMessage {
string content = 10;
repeated float weights = 30 [packed = true];
repeated int64 bbox = 40 [packed = true];
optional commons.JsonContent options=50;
}
protobunny generate

You should find the generated classes under codegen/mymessagelib.
import asyncio
import logging
import time
import protobunny as pb
from protobunny import asyncio as pb_asyncio
# # sys.path.append(pb.default_configuration.generated_package_root)
# sys.path.append("./codegen")
pb.config_lib() # this is needed when the python classes for your lib are generated in a subfolder
import mymessagelib as ml
logging.basicConfig(
    level=logging.INFO, format="[%(asctime)s %(levelname)s] %(name)s - %(message)s"
)
log = logging.getLogger(__name__)
conf = pb.config
class TestLibAsync:
    async def on_message(self, message: ml.tests.TestMessage) -> None:
        log.info("Got: %s", message)
        result = message.make_result()
        await pb_asyncio.publish_result(result)

    async def on_message_results(self, result: pb.results.Result) -> None:
        log.info("Got result: %s", result)
        log.info("Source: %s", result.source)

    async def worker1(self, task: ml.main.tasks.TaskMessage) -> None:
        log.info("1- Working on: %s", task)
        await asyncio.sleep(0.1)

    async def worker2(self, task: ml.main.tasks.TaskMessage) -> None:
        log.info("2- Working on: %s", task)
        await asyncio.sleep(0.1)

    async def on_message_mymessage(self, message: ml.main.MyMessage) -> None:
        log.info("Got main message: %s", message)

    def run_forever(self):
        asyncio.run(self.main())

    def log_callback(self, incoming, body) -> None:
        log.info(f"LOG {incoming.routing_key}: {body}")

    async def main(self):
        await pb_asyncio.subscribe_logger(self.log_callback)
        await pb_asyncio.subscribe(ml.main.tasks.TaskMessage, self.worker1)
        await pb_asyncio.subscribe(ml.main.tasks.TaskMessage, self.worker2)
        await pb_asyncio.subscribe(ml.tests.TestMessage, self.on_message)
        await pb_asyncio.subscribe_results(ml.tests.TestMessage, self.on_message_results)
        await pb_asyncio.subscribe(ml.main.MyMessage, self.on_message_mymessage)
        await pb_asyncio.publish(ml.main.MyMessage(content="test"))
        await pb_asyncio.publish(ml.tests.TestMessage(number=1, content="test", data={"test": 123}))
        await pb_asyncio.publish(ml.main.tasks.TaskMessage(content="test1"))
        await pb_asyncio.publish(ml.main.tasks.TaskMessage(content="test2"))
        await pb_asyncio.publish(ml.main.tasks.TaskMessage(content="test3"))
        log.info("TEST LIB started. Press Ctrl+C to exit.")

class TestLib:
    def on_message(self, message: ml.tests.TestMessage) -> None:
        log.info("Got: %s", message)
        result = message.make_result()
        pb.publish_result(result)

    def on_message_results(self, result: pb.results.Result) -> None:
        log.info("Got result: %s", result)
        log.info("Source: %s", result.source)

    def worker1(self, task: ml.main.tasks.TaskMessage) -> None:
        log.info("1- Working on: %s", task)
        time.sleep(0.1)

    def worker2(self, task: ml.main.tasks.TaskMessage) -> None:
        log.info("2- Working on: %s", task)
        time.sleep(0.1)

    def log_callback(self, incoming, body) -> None:
        log.info(f"LOG {incoming.routing_key}: {body}")

    def main(self):
        pb.subscribe_logger(self.log_callback)
        pb.subscribe(ml.main.tasks.TaskMessage, self.worker1)
        pb.subscribe(ml.main.tasks.TaskMessage, self.worker2)
        pb.subscribe(ml.tests.TestMessage, self.on_message)
        pb.subscribe_results(ml.tests.TestMessage, self.on_message_results)
        pb.subscribe(ml.main.MyMessage, lambda x: log.info(x))
        pb.publish(ml.main.MyMessage(content="test"))
        pb.publish(ml.tests.TestMessage(number=1, content="test", data={"test": 123}))
        pb.publish(ml.main.tasks.TaskMessage(content="test1"))
        pb.publish(ml.main.tasks.TaskMessage(content="test2"))
        pb.publish(ml.main.tasks.TaskMessage(content="test3"))

if __name__ == "__main__":
    config = pb.config
    if config.use_async:
        log.info("Using async")
        testlib = TestLibAsync()
        pb_asyncio.run_forever(testlib.main)
    else:
        log.info("Using sync")
        testlib = TestLib()
        testlib.main()
        pb.run_forever()

python test_lib.py

Output:
[2025-12-30 01:05:23,702 INFO] __main__ - Using async
[2025-12-30 01:05:23,702 INFO] protobunny - Started. Press Ctrl+C to exit.
[2025-12-30 01:05:23,742 INFO] protobunny.asyncio.backends.rabbitmq.connection - Establishing RabbitMQ connection to 127.0.0.1:5672/%2F
[2025-12-30 01:05:23,768 INFO] protobunny.asyncio.backends.rabbitmq.connection - Successfully connected to RabbitMQ
[2025-12-30 01:05:23,772 INFO] protobunny.asyncio.backends.rabbitmq.connection - Subscribing to topic 'acme.#' (queue=amq_37755497ba5d47ca95956d0bef5f6ae9, shared=False)
[2025-12-30 01:05:23,775 INFO] protobunny.asyncio.backends.rabbitmq.connection - Subscribing to topic 'acme.main.tasks.TaskMessage' (queue=acme.main.tasks.TaskMessage, shared=True)
[2025-12-30 01:05:23,776 INFO] protobunny.asyncio.backends.rabbitmq.connection - Subscribing to topic 'acme.main.tasks.TaskMessage' (queue=acme.main.tasks.TaskMessage, shared=True)
[2025-12-30 01:05:23,780 INFO] protobunny.asyncio.backends.rabbitmq.connection - Subscribing to topic 'acme.tests.TestMessage' (queue=amq_1cf4275aecd643d6ad711c7bbf6de31d, shared=False)
[2025-12-30 01:05:23,784 INFO] protobunny.asyncio.backends.rabbitmq.connection - Subscribing to topic 'acme.tests.TestMessage.result' (queue=amq_a014606e624348a894965c36e2c7fd26, shared=False)
[2025-12-30 01:05:23,787 INFO] protobunny.asyncio.backends.rabbitmq.connection - Subscribing to topic 'acme.main.MyMessage' (queue=amq_0791c366494348b0be55ff095fc3e71c, shared=False)
[2025-12-30 01:05:23,789 INFO] __main__ - Got main message: MyMessage(content='test', detail=None)
[2025-12-30 01:05:23,789 INFO] __main__ - LOG acme.main.MyMessage: {"content": "test", "number": 0, "detail": null}
[2025-12-30 01:05:23,791 INFO] __main__ - Got: TestMessage(content='test', number=1, data={'test': 123}, detail=None)
[2025-12-30 01:05:23,791 INFO] protobunny.asyncio.backends - Publishing result to: acme.tests.TestMessage.result
[2025-12-30 01:05:23,792 INFO] __main__ - LOG acme.tests.TestMessage: {"content": "test", "number": 1, "data": {"test": 123}, "detail": null}
[2025-12-30 01:05:23,793 INFO] __main__ - LOG acme.main.tasks.TaskMessage: {"content": "test1", "weights": [], "bbox": [], "options": null}
[2025-12-30 01:05:23,793 INFO] __main__ - 1- Working on: TaskMessage(content='test1', options=None)
[2025-12-30 01:05:23,793 INFO] __main__ - Got result: Result(source_message=Any(type_url='mymessagelib.tests.TestMessage', value=b'R\x04test\xa0\x01\x01\xca\x01\x0f\n\r{"test": 123}'), return_code=ReturnCode.SUCCESS, error='', return_value=None)
[2025-12-30 01:05:23,794 INFO] __main__ - Source: TestMessage(content='test', number=1, data={'test': 123}, detail=None)
[2025-12-30 01:05:23,795 INFO] __main__ - LOG acme.tests.TestMessage.result: SUCCESS - {"content": "test", "number": 1, "data": {"test": 123}, "detail": null}
[2025-12-30 01:05:23,795 INFO] __main__ - 2- Working on: TaskMessage(content='test2', options=None)
[2025-12-30 01:05:23,795 INFO] __main__ - LOG acme.main.tasks.TaskMessage: {"content": "test2", "weights": [], "bbox": [], "options": null}
[2025-12-30 01:05:23,796 INFO] __main__ - 1- Working on: TaskMessage(content='test3', options=None)
[2025-12-30 01:05:23,796 INFO] __main__ - LOG acme.main.tasks.TaskMessage: {"content": "test3", "weights": [], "bbox": [], "options": null}
[2025-12-30 01:05:23,796 INFO] __main__ - TEST LIB started. Press Ctrl+C to exit.
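
In the output above, the logger subscription is bound to the topic 'acme.#', which is why it sees every message, including results. The sketch below shows how AMQP-style topic patterns are commonly matched on RabbitMQ topic exchanges: words are separated by dots, `*` stands for exactly one word and `#` for zero or more words. The function topic_matches is a hypothetical helper written for this illustration; it is not part of protobunny, and other backends may use different wildcard rules.

```python
from functools import lru_cache


def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if routing_key matches an AMQP-style topic pattern."""
    p = pattern.split(".")
    k = routing_key.split(".")

    @lru_cache(maxsize=None)
    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: match only if the key is exhausted too.
            return j == len(k)
        if p[i] == "#":
            # '#' absorbs zero words, or one word while staying active.
            return match(i + 1, j) or (j < len(k) and match(i, j + 1))
        if j < len(k) and p[i] in ("*", k[j]):
            # '*' or a literal word consumes exactly one word of the key.
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


for key in (
    "acme.main.MyMessage",
    "acme.tests.TestMessage.result",
    "other.main.MyMessage",
):
    print(key, topic_matches("acme.#", key))
```

A narrower pattern such as 'acme.*.TestMessage' would match 'acme.tests.TestMessage' but not the deeper 'acme.main.tasks.TaskMessage'.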