Skip to content

Commit ffe02a8

Browse files
committed
Require a default queue
1 parent fe9810d commit ffe02a8

File tree

7 files changed

+26
-12
lines changed

7 files changed

+26
-12
lines changed

qio/__main__.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def regular(instance: int, iterations: int):
2323
return f"Instance {instance} completed"
2424

2525

26-
@routine(name="raises", queue="qio")
26+
@routine(name="raises")
2727
def raises():
2828
raise ValueError("This is a test exception")
2929

@@ -60,7 +60,9 @@ async def irregular():
6060
def submit():
6161
connection_params = ConnectionParameters()
6262
qio = Qio(
63-
broker=PikaBroker(connection_params), transport=PikaTransport(connection_params)
63+
broker=PikaBroker(connection_params),
64+
transport=PikaTransport(connection_params),
65+
default_queue="qio",
6466
)
6567
try:
6668
qio.submit(irregular())
@@ -75,6 +77,7 @@ def monitor(raw: bool = False):
7577
qio = Qio(
7678
broker=PikaBroker(connection_params),
7779
transport=PikaTransport(connection_params),
80+
default_queue="qio",
7881
)
7982
events = qio.subscribe({object})
8083
try:
@@ -92,7 +95,9 @@ def monitor(raw: bool = False):
9295
def worker():
9396
connection_params = ConnectionParameters()
9497
qio = Qio(
95-
broker=PikaBroker(connection_params), transport=PikaTransport(connection_params)
98+
broker=PikaBroker(connection_params),
99+
transport=PikaTransport(connection_params),
100+
default_queue="qio",
96101
)
97102
Worker(qio, queue="qio", concurrency=3)()
98103

@@ -101,7 +106,9 @@ def worker():
101106
def purge():
102107
connection_params = ConnectionParameters()
103108
qio = Qio(
104-
broker=PikaBroker(connection_params), transport=PikaTransport(connection_params)
109+
broker=PikaBroker(connection_params),
110+
transport=PikaTransport(connection_params),
111+
default_queue="qio",
105112
)
106113
try:
107114
qio.purge(queue="qio")

qio/monitor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def __init__(self):
3232
self.__qio = Qio(
3333
broker=PikaBroker(connection_params),
3434
transport=PikaTransport(connection_params),
35+
default_queue="qio",
3536
)
3637
self.__thread = Thread(target=self.__listen)
3738
self.__events = self.__qio.subscribe(

qio/qio.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,12 @@
3131

3232

3333
class Qio:
34-
def __init__(self, *, broker: Broker, transport: Transport):
34+
def __init__(
35+
self, *, broker: Broker, transport: Transport, default_queue: str = "default"
36+
):
3537
self.__bus = Bus(transport)
3638
self.__broker = broker
39+
self.__default_queue = default_queue
3740
self.__invocations = dict[Invocation, Message]()
3841

3942
def run[R](self, invocation: Invocation[R], /) -> R:
@@ -103,7 +106,8 @@ def submit(self, invocation: Invocation, /):
103106
kwargs=invocation.kwargs,
104107
)
105108
)
106-
self.__broker.enqueue(serialize(invocation), queue=routine.queue)
109+
queue = routine.queue if routine.queue is not None else self.__default_queue
110+
self.__broker.enqueue(serialize(invocation), queue=queue)
107111

108112
def consume(self, *, queue: str, prefetch: int) -> Generator[Invocation]:
109113
for message in self.__broker.consume(queue=queue, prefetch=prefetch):

qio/qio_test.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ def test_qio_with_custom_broker_and_transport():
77
"""Qio custom broker and transport implementations."""
88
broker = StubBroker()
99
transport = StubTransport()
10-
qio = Qio(broker=broker, transport=transport)
10+
qio = Qio(broker=broker, transport=transport, default_queue="qio")
1111

1212
try:
1313
# Test purge (uses broker)
@@ -26,11 +26,11 @@ def test_different_qio_instances_are_independent():
2626
# Create two Qio instances with different stub implementations
2727
broker1 = StubBroker()
2828
transport1 = StubTransport()
29-
qio1 = Qio(broker=broker1, transport=transport1)
29+
qio1 = Qio(broker=broker1, transport=transport1, default_queue="qio")
3030

3131
broker2 = StubBroker()
3232
transport2 = StubTransport()
33-
qio2 = Qio(broker=broker2, transport=transport2)
33+
qio2 = Qio(broker=broker2, transport=transport2, default_queue="qio")
3434

3535
try:
3636
# Both should work independently

qio/registry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
ROUTINE_REGISTRY: dict[str, Routine] = {}
66

77

8-
def routine(*, name: str, queue: str):
8+
def routine(*, name: str, queue: str | None = None):
99
"""Decorate a function to make it a routine."""
1010

1111
def create_routine[**A, R](fn: Callable[A, R]) -> Routine[A, R]:

qio/routine.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55

66
class Routine[**A, R]:
7-
def __init__(self, fn: Callable[A, R], *, name: str, queue: str):
7+
def __init__(self, fn: Callable[A, R], *, name: str, queue: str | None = None):
88
self.fn = fn
99
self.name = name
1010
self.queue = queue

qio_test.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ def test_integration():
1616
# Prefers a clean environment and queue
1717
connection_params = ConnectionParameters()
1818
qio = Qio(
19-
broker=PikaBroker(connection_params), transport=PikaTransport(connection_params)
19+
broker=PikaBroker(connection_params),
20+
transport=PikaTransport(connection_params),
21+
default_queue="qio",
2022
)
2123

2224
try:

0 commit comments

Comments
 (0)