Skip to content

Commit 461042e

Browse files
committed
Update Taskbroker / Taskworker Client to Support Push Mode
1 parent e6e831c commit 461042e

File tree

4 files changed

+30
-6
lines changed

4 files changed

+30
-6
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ dependencies = [
101101
"statsd>=3.3.0",
102102
"structlog>=22.1.0",
103103
"symbolic>=12.14.1",
104-
"taskbroker-client>=0.1.7",
104+
"taskbroker-client>=0.1.8",
105105
"tiktoken>=0.8.0",
106106
"tokenizers>=0.22.0",
107107
"tldextract>=5.1.2",

src/sentry/runner/commands/run.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,20 @@ def taskworker_scheduler(redis_cluster: str, **options: Any) -> None:
135135

136136

137137
@run.command()
138+
@click.option(
139+
"--push-mode", help="Whether to run in PUSH or PULL mode.", default=False, is_flag=True
140+
)
138141
@click.option(
139142
"--rpc-host",
140-
help="The hostname and port for the taskworker-rpc. When using num-brokers the hostname will be appended with `-{i}` to connect to individual brokers.",
143+
help="The hostname and port for the taskbroker gRPC server. When using num-brokers the hostname will be appended with `-{i}` to connect to individual brokers.",
141144
default="127.0.0.1:50051",
142145
)
146+
@click.option(
147+
"--worker-rpc-port",
148+
help="Port for the taskworker gRPC server to listen on when it is running in push mode.",
149+
default=50052,
150+
type=int,
151+
)
143152
@click.option(
144153
"--num-brokers", help="Number of brokers available to connect to", default=None, type=int
145154
)
@@ -198,6 +207,8 @@ def taskworker(**options: Any) -> None:
198207

199208

200209
def run_taskworker(
210+
push_mode: bool,
211+
worker_rpc_port: int,
201212
rpc_host: str,
202213
num_brokers: int | None,
203214
rpc_host_list: str | None,
@@ -233,7 +244,8 @@ def run_taskworker(
233244
processing_pool_name=processing_pool_name,
234245
health_check_file_path=health_check_file_path,
235246
health_check_sec_per_touch=health_check_sec_per_touch,
236-
**options,
247+
grpc_port=worker_rpc_port,
248+
push_mode=push_mode**options,
237249
)
238250
exitcode = worker.start()
239251
raise SystemExit(exitcode)

src/sentry/taskworker/adapters.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,18 @@ def incr(
5959
sample_rate = settings.SENTRY_METRICS_SAMPLE_RATE
6060
sentry_metrics.incr(name, amount=int(value), tags=tags, sample_rate=sample_rate)
6161

62+
def gauge(
63+
self,
64+
key: str,
65+
value: float,
66+
instance: str | None = None,
67+
tags: Tags | None = None,
68+
sample_rate: float = 1,
69+
unit: str | None = None,
70+
stacklevel: int = 0,
71+
) -> None:
72+
return sentry_metrics.gauge(key, value, tags=tags)
73+
6274
def distribution(
6375
self,
6476
name: str,

uv.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)