Skip to content

Commit 7e7fd5e

Browse files
committed
Add queuespec for worker starting
1 parent 98026ea commit 7e7fd5e

File tree

7 files changed

+232
-6
lines changed

7 files changed

+232
-6
lines changed

qio/__main__.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
from contextlib import suppress
22
from time import sleep as time_sleep
3+
from typing import Annotated
34

45
from pika import ConnectionParameters
6+
from typer import Argument
57
from typer import Typer
68

79
from . import routine
@@ -10,6 +12,7 @@
1012
from .pika.broker import PikaBroker
1113
from .pika.transport import PikaTransport
1214
from .qio import Qio
15+
from .queuespec import QueueSpec
1316
from .sleep import sleep
1417
from .worker import Worker
1518

@@ -92,14 +95,29 @@ def monitor(raw: bool = False):
9295

9396

9497
@app.command()
95-
def worker():
98+
def worker(
99+
queuespec: Annotated[
100+
QueueSpec,
101+
Argument(
102+
parser=QueueSpec.parse,
103+
help="Queue configuration in format 'queue=concurrency'. "
104+
"Examples: 'production=10', 'api,background=5'",
105+
metavar="QUEUE[,QUEUE2,...]=CONCURRENCY",
106+
),
107+
],
108+
):
109+
"""Start a worker process for the specified queue and concurrency.
110+
111+
The worker will process invocations from the specified queue,
112+
as many at a time as specified by the concurrency.
113+
"""
96114
connection_params = ConnectionParameters()
97115
qio = Qio(
98116
broker=PikaBroker(connection_params),
99117
transport=PikaTransport(connection_params),
100118
default_queue="qio",
101119
)
102-
Worker(qio, queue="qio", concurrency=3)()
120+
Worker(qio, queuespec)()
103121

104122

105123
@app.command()

qio/queuespec.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# QueueSpec
2+
3+
The `QueueSpec` class represents a parsed queue configuration for workers.
4+
5+
## Usage
6+
7+
```python
8+
from qio.queuespec import QueueSpec
9+
10+
# Single queue
11+
spec = QueueSpec.parse("production=10")
12+
print(spec.queues) # ['production']
13+
print(spec.concurrency) # 10
14+
15+
# Multiple queues
16+
spec = QueueSpec.parse("high,medium,low=5")
17+
print(spec.queues) # ['high', 'medium', 'low']
18+
print(spec.concurrency) # 5
19+
```
20+
21+
## Format
22+
23+
Queue specifications use the format: `queue[,queue2,...]=concurrency`
24+
25+
## Examples
26+
27+
| Input | Queues | Concurrency |
28+
|---------------------|-----------------------------|-------------|
29+
| `production=10` | `['production']` | `10` |
30+
| `api,background=5` | `['api', 'background']` | `5` |
31+
| `high,medium,low=2` | `['high', 'medium', 'low']` | `2` |

qio/queuespec.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
from dataclasses import dataclass
2+
from typing import Self
3+
4+
5+
@dataclass
6+
class QueueSpec:
7+
"""The parsed specification for how a worker will process tasks.
8+
9+
The queues are all the queues that are assigned to this amount of
10+
concurrency. The exact implementation is broker-specific, but invocations
11+
will often be roughly taken in round-robin order between the queues.
12+
"""
13+
14+
queues: list[str]
15+
concurrency: int
16+
17+
@classmethod
18+
def parse(cls, value: str) -> Self:
19+
"""Parse a queue spec string.
20+
21+
Examples:
22+
23+
| queuespec | queues | concurrency |
24+
+--------------------+----------------------+------------------+
25+
| queue2=5 | ['queue2'] | 5 |
26+
| queue1,queue2=10 | ['queue1', 'queue2'] | 10 |
27+
"""
28+
29+
if not value or value.strip() == "":
30+
raise ValueError("Queue spec cannot be empty")
31+
32+
if "=" not in value:
33+
raise ValueError(
34+
f"Invalid queue spec. Be sure to include both the queue and capacity. "
35+
f"got: '{value}'"
36+
)
37+
38+
# Left split instead of right split to make = invalid for queue names
39+
raw_queues, raw_concurrency = value.split("=", 1)
40+
41+
try:
42+
concurrency = int(raw_concurrency)
43+
except ValueError:
44+
raise ValueError(
45+
f"Concurrency must be a positive integer, "
46+
f"got: '{raw_concurrency}' in '{value}'"
47+
) from None
48+
if concurrency <= 0:
49+
raise ValueError(
50+
f"Concurrency must be a positive integer, "
51+
f"got: '{raw_concurrency}' in '{value}'"
52+
)
53+
54+
queues = [q.strip() for q in raw_queues.split(",") if q.strip()]
55+
56+
if not queues:
57+
raise ValueError(f"No valid queue names found in '{value}'")
58+
59+
return cls(queues=queues, concurrency=concurrency)

qio/queuespec_test.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import pytest
2+
3+
from .queuespec import QueueSpec
4+
5+
6+
def test_single_queue_parsing():
7+
spec = QueueSpec.parse("production=10")
8+
assert spec.queues == ["production"]
9+
assert spec.concurrency == 10
10+
11+
12+
def test_multi_queue_parsing():
13+
spec = QueueSpec.parse("high,medium,low=5")
14+
assert spec.queues == ["high", "medium", "low"]
15+
assert spec.concurrency == 5
16+
17+
18+
def test_whitespace_handling():
19+
spec = QueueSpec.parse(" api , background = 8 ")
20+
assert spec.queues == ["api", "background"]
21+
assert spec.concurrency == 8
22+
23+
24+
def test_empty_spec_error():
25+
with pytest.raises(ValueError, match="Queue spec cannot be empty"):
26+
QueueSpec.parse("")
27+
28+
29+
def test_missing_equals_error():
30+
with pytest.raises(ValueError, match="Invalid queue spec"):
31+
QueueSpec.parse("production")
32+
33+
34+
def test_empty_concurrency_error():
35+
with pytest.raises(ValueError, match="Concurrency must be a positive integer"):
36+
QueueSpec.parse("queue=")
37+
38+
39+
def test_invalid_concurrency_error():
40+
with pytest.raises(ValueError, match="Concurrency must be a positive integer"):
41+
QueueSpec.parse("queue=abc")
42+
43+
44+
def test_zero_concurrency_error():
45+
with pytest.raises(ValueError, match="Concurrency must be a positive integer"):
46+
QueueSpec.parse("queue=0")
47+
48+
49+
def test_negative_concurrency_error():
50+
with pytest.raises(ValueError, match="Concurrency must be a positive integer"):
51+
QueueSpec.parse("queue=-5")
52+
53+
54+
def test_empty_queue_names_error():
55+
with pytest.raises(ValueError, match="No valid queue names found"):
56+
QueueSpec.parse("=10")
57+
58+
59+
def test_comma_only_error():
60+
with pytest.raises(ValueError, match="Invalid queue spec"):
61+
QueueSpec.parse(",")
62+
63+
64+
def test_empty_queue_names_in_list():
65+
spec = QueueSpec.parse("queue1,,queue2=5")
66+
assert spec.queues == ["queue1", "queue2"]
67+
assert spec.concurrency == 5

qio/worker.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Worker
2+
3+
The `Worker` class processes invocations from message queues with specified concurrency.
4+
5+
## Usage
6+
7+
```python
8+
from qio.qio import Qio
9+
from qio.queuespec import QueueSpec
10+
from qio.worker import Worker
11+
12+
# Create queue specification
13+
queuespec = QueueSpec.parse("production=5")
14+
15+
# Create worker
16+
qio = Qio(broker=broker, transport=transport)
17+
worker = Worker(qio, queuespec)
18+
19+
# Run worker (blocks until stopped)
20+
worker()
21+
```
22+
23+
## Concurrency
24+
25+
The worker spawns multiple runner threads based on the concurrency setting:
26+
- Each thread processes invocations independently
27+
- Prefetch limit matches concurrency to prevent overloading
28+
- Threads coordinate through internal task queues
29+
30+
## Lifecycle
31+
32+
1. **Initialization**: Validates single queue, creates consumer and threads
33+
2. **Running**: Starts all threads and monitors for completion
34+
3. **Shutdown**: Stops gracefully on `KeyboardInterrupt` or thread failure
35+
36+
## CLI Integration
37+
38+
```bash
39+
python -m qio worker production=10
40+
```
41+
42+
The CLI uses `QueueSpec.parse()` to convert string arguments into `QueueSpec` objects.

qio/worker.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,29 @@
1313
from .invocation import Invocation
1414
from .invocation import LocalInvocationSuspended
1515
from .qio import Qio
16+
from .queuespec import QueueSpec
1617
from .thread import Thread
1718

1819

1920
class Worker:
20-
def __init__(self, qio: Qio, *, queue: str, concurrency: int):
21+
def __init__(self, qio: Qio, queuespec: QueueSpec):
2122
self.__qio = qio
23+
24+
if not queuespec.queues:
25+
raise ValueError("No queues specified in queuespec")
26+
if len(queuespec.queues) != 1:
27+
raise ValueError("Only one queue is supported")
28+
2229
self.__tasks = Queue[Invocation | SendContinuation | ThrowContinuation]()
23-
self.__consumer = self.__qio.consume(queue=queue, prefetch=concurrency)
30+
self.__consumer = self.__qio.consume(
31+
queue=queuespec.queues[0], prefetch=queuespec.concurrency
32+
)
2433
self.__continuer_events = self.__qio.subscribe({LocalInvocationSuspended})
2534

2635
# Start threads event queues are created
2736
self.__runner_threads = [
2837
Thread(target=self.__runner, name=f"qio-runner-{i + 1}")
29-
for i in range(concurrency)
38+
for i in range(queuespec.concurrency)
3039
]
3140
self.__continuer_thread = Thread(target=self.__continuer, name="qio-continuer")
3241
self.__receiver_thread = Thread(target=self.__receiver, name="qio-receiver")

qio_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def test_integration():
2929

3030
# 1. Start worker process in the background
3131
worker = subprocess.Popen(
32-
[sys.executable, "-m", "qio", "worker"],
32+
[sys.executable, "-m", "qio", "worker", "qio=3"],
3333
stdout=subprocess.PIPE,
3434
stderr=subprocess.PIPE,
3535
)

0 commit comments

Comments
 (0)