Commit 0c5d0be

feat(ci): xdist per-worker isolation infrastructure + sanity check workflow
- New `xdist.py` module: per-worker Redis DB, Kafka topics, Snuba URL helpers
- `sentry.py`: wire xdist Redis DB + Snuba URL into settings before `initialize_app`
- `sentry.py`: deterministic region name (seeded RNG) + per-worker snowflake IDs
- `kafka.py`: per-worker topic names and consumer group IDs
- `relay.py`: per-worker container names, port offsets, Redis DB, Kafka topic template vars
- `template/config.yml`: parameterized Kafka topic names
- `skips.py`: `_requires_snuba` reads the per-worker port from the `SNUBA` env var
- `test_snowflake.py`: explicit region override so expected values are deterministic
- `backend-xdist.yml`: copy of `backend.yml` with xdist flags and per-worker Snuba bootstrap

All isolation changes are no-ops without xdist env vars.
1 parent fe1417f commit 0c5d0be

File tree

11 files changed: +829 −42 lines

.github/workflows/backend-xdist.yml

Lines changed: 652 additions & 0 deletions

docs/memory.md

Lines changed: 8 additions & 2 deletions
```diff
@@ -190,7 +190,7 @@ New file: `.github/workflows/classify-services.yml`
 
 `workflow_dispatch` only. Runs the classifier across 22 shards, merges per-shard reports into one artifact.
 
-### Phase 5: Collection Optimization (G1)
+### Phase 5: Collection Optimization (G1) + H1 Overlapped Startup Support
 
 **`pytest_ignore_collect` hook** in `sentry.py`:
 
@@ -206,9 +206,15 @@ Guards:
 
 Blocks until `/tmp/services-ready` sentinel file exists (created by the background service-startup script). Needed because G1 makes collection finish ~50s faster, potentially before services are ready.
 
+**`_requires_snuba` polling** in `skips.py`: add `_wait_for_service()` polling controlled by the `SNUBA_WAIT_TIMEOUT` env var. With H1 overlapped startup, Snuba may not be up when pytest starts; the polling waits instead of failing immediately.
+
 **Two-layer filtering note**: both G1 (`pytest_ignore_collect`) and `pytest_collection_modifyitems` filter by `SELECTED_TESTS_FILE`. G1 prevents import; `modifyitems` deselects after import. G1 is the performance win; `modifyitems` handles class/test-granularity filtering and shard assignment.
 
-### Phase 6: Tiered Workflow
+### Phase 6: Performance Optimizations
+
+**Relay container lifecycle**: broaden `relay_server_setup` and `_relay_container` from function/module scope to session scope. `live_server` is already session-scoped, so this is safe: one Docker container per worker session instead of per test. Only ~6 Relay test classes exist, saving ~50-60s. Add a session-scoped `_relay_container` fixture and make `relay_server` a thin wrapper calling `_ensure_relay_in_db()` + `adjust_settings_for_relay_tests()`.
+
+### Phase 7: Tiered Workflow
 
 **`backend-xdist-split-poc.yml`**: The full CI workflow.
 
```

docs/notes.md

Lines changed: 37 additions & 0 deletions
# Tiered xdist v2 — Design Notes

## Why Relay needs per-worker Docker containers

Each xdist worker needs its own Relay container because Relay's config is baked in at startup:

- Kafka topics: each worker writes to its own topics (`ingest-events-gw0` vs `ingest-events-gw1`)
- Redis DB: each worker uses its own DB number
- Snuba instance: each worker routes to its own Snuba on a different port

A single Relay container can hold only one config, so sharing across workers is not possible.

Within a single worker, a container cannot naively be shared across test classes either: `TransactionTestCase` flushes the DB between tests, deleting the Relay model row that Sentry uses to authenticate Relay (requests get 401s without it). The `_ensure_relay_in_db()` call before each test re-inserts the row, so the container itself can persist across tests in the same class.

Currently there is one container per test (function-scoped). This could be optimized to **one container per worker session**, since `live_server` (pytest-django) is session-scoped and only ~6 Relay test classes exist. That optimization is kept separate from the xdist correctness changes (per-worker naming/ports) to keep concerns clean. The function-scoped `relay_server` fixture would become a thin wrapper calling `_ensure_relay_in_db()` + `adjust_settings_for_relay_tests()`.
## Why Snuba URL must be set before `initialize_app()`

`sentry.utils.snuba` creates a module-level connection-pool singleton (`_snuba_pool`) from `settings.SENTRY_SNUBA` at import time. `initialize_app()` transitively triggers that import through the Django app-loading chain (100+ modules reference `sentry.utils.snuba`), so `settings.SENTRY_SNUBA` must be overridden before `initialize_app()` is called in `pytest_configure`.

We verified that `sentry.utils.snuba` is NOT imported during plugin loading (before `pytest_configure`), so overriding the setting in `pytest_configure` is early enough; no module-level env-var hack is needed.
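The import-time singleton hazard can be shown in miniature. The sketch below uses a hypothetical stand-in module, not Sentry's actual code; only the setting name `SENTRY_SNUBA` is taken from the text above:

```python
import sys
import types

# Stand-in for Django settings; SENTRY_SNUBA mirrors the real setting name.
settings = types.SimpleNamespace(SENTRY_SNUBA="http://127.0.0.1:1218")

def import_fake_snuba():
    # Mimics `import sentry.utils.snuba`: the connection pool captures the
    # setting exactly once, at import time.
    if "fake_snuba" not in sys.modules:
        mod = types.ModuleType("fake_snuba")
        mod.POOL_URL = settings.SENTRY_SNUBA  # module-level singleton
        sys.modules["fake_snuba"] = mod
    return sys.modules["fake_snuba"]

# Override BEFORE the first import: the pool picks up the per-worker URL.
settings.SENTRY_SNUBA = "http://127.0.0.1:1232"
snuba = import_fake_snuba()
assert snuba.POOL_URL == "http://127.0.0.1:1232"

# Overriding AFTER import has no effect on the already-built singleton.
settings.SENTRY_SNUBA = "http://127.0.0.1:1218"
assert import_fake_snuba().POOL_URL == "http://127.0.0.1:1232"
```

This is why the override lives in `pytest_configure` rather than in a fixture: it has to win the race against the first import.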
## Why lazy imports inside fixtures are OK but inside class methods are not

Pytest fixtures are lazily invoked: the import runs only when a test actually requests the fixture. This is standard pytest practice for optional or heavy dependencies. Moving imports from module level into a fixture function body means a single import per fixture, which is clean and sustainable.

Scattering imports inside every method of a class (like the `Browser` class in `selenium.py`) is unsustainable: anyone adding a new method must remember to add the import. The better approach for selenium is conditional plugin loading via an env var.
## pytest-rerunfailures crash recovery under xdist

The experiment branch disabled `pytest_rerunfailures.HAS_PYTEST_HANDLECRASHITEM`, claiming the socket-based crash-recovery protocol deadlocks during heavy xdist startup due to connection timeouts. However, reading the actual source (v15.0): the server thread (`ServerStatusDB`) calls `self.sock.accept()` in an infinite loop with no timeout, and the socket is set to `setblocking(1)` with no timeout on `recv(1)`. There is no connection window that workers can miss, so the deadlock explanation from the experiment docs doesn't match the code. Skip this change and revisit it only if we actually hit freezes when enabling xdist.
## Why hash-based sharding beats algorithmic LPT

With 17+ shards and ~32K tests, the law of large numbers gives hash-based sharding (`sha256(nodeid) % N`) good-enough balance (~90-130s spread). LPT algorithms failed because:

- Test count is a poor proxy for duration (files with a few slow integration tests get treated as "light")
- Flat-duration LPT optimizes `sum(worker_loads)`, but actual wall clock is `max(worker_loads)`; it ignores intra-shard parallelism
- Indivisible mega-scopes (large test classes) create unavoidable hotspots under scope-preserving algorithms
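A minimal sketch of the hash-based scheme (the function name is illustrative, not the actual code):

```python
import hashlib

def shard_for(nodeid: str, num_shards: int) -> int:
    """Map a pytest nodeid to a shard index in [0, num_shards)."""
    # sha256 is stable across processes and Python versions (unlike the
    # built-in hash() under PYTHONHASHSEED randomization), so every
    # process computes the same assignment with no coordination.
    digest = hashlib.sha256(nodeid.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_shards
```

Because the mapping is deterministic, each shard can independently select its own tests from the full collection by checking `shard_for(nodeid, N) == shard_index`.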

docs/tiered-xdist-changes.md

Lines changed: 45 additions & 0 deletions
**What:** Moved `sentry.testutils.pytest.selenium` out of the static `pytest_plugins` list. It's now appended conditionally, only when `SENTRY_SKIP_SELENIUM_PLUGIN != "1"`.

**Why:** selenium is a 23MB package imported at module level; we should avoid loading it when not running acceptance tests. We currently pass `--ignore tests/acceptance`, but that only prevents test collection, not plugin loading.
## 2. xdist Per-Worker Isolation Infrastructure

**Problem:** When pytest-xdist spawns multiple workers (`-n 3`) inside a single shard, all workers share the same Redis, Kafka, Snuba/ClickHouse, and Relay. Without isolation, workers corrupt each other: `flushdb()` wipes another worker's cache, Kafka events cross-pollinate between consumers, `reset_snuba` truncates another worker's data, and identical snowflake IDs cause `IntegrityError` on unique constraints.

**Approach:** Give each worker its own Redis DB number, Kafka topic names, Snuba instance, Relay container, and snowflake ID range. All of it is gated on xdist env vars: **no-ops without them**.
### 2a. xdist helpers + per-worker Redis and Snuba

**New file:** `src/sentry/testutils/pytest/xdist.py` resolves the worker ID once at module level and provides `get_redis_db()`, `get_kafka_topic()`, and `get_snuba_url()`.

**Modified:** `src/sentry/testutils/pytest/sentry.py`: Redis cluster settings call `xdist.get_redis_db()` instead of the hardcoded `TEST_REDIS_DB`. The Snuba URL is overridden via `settings.SENTRY_SNUBA = xdist.get_snuba_url()` in `pytest_configure`, before `initialize_app()`. It must happen first because `sentry.utils.snuba` creates a module-level connection-pool singleton (`_snuba_pool`) from `settings.SENTRY_SNUBA` at import time, and `initialize_app` transitively triggers that import.

**Modified:** `src/sentry/testutils/skips.py`: `_requires_snuba` reads the port from the `SNUBA` env var instead of hardcoding 1218 (per-worker Snuba uses 1230+N).
### 2b. Deterministic region name + per-worker snowflake IDs

**Modified:** `src/sentry/testutils/pytest/sentry.py` (`_configure_test_env_regions`)

The region-name RNG is seeded with `PYTEST_XDIST_TESTRUNUID`, so all workers generate the same name (xdist requires identical test collection). Each worker gets `region_snowflake_id = worker_num + 1`, so concurrent Project/Organization/Team creation produces unique snowflake IDs instead of colliding.
### 2c. Per-worker Kafka topic isolation

**Modified:** `src/sentry/testutils/pytest/kafka.py`: topic names and the consumer group ID use `xdist.get_kafka_topic()`.

**Modified:** `src/sentry/testutils/pytest/template/config.yml`: hardcoded `ingest-events`/`outcomes` replaced with `${KAFKA_TOPIC_EVENTS}`/`${KAFKA_TOPIC_OUTCOMES}` template variables.

**Modified:** `src/sentry/testutils/pytest/relay.py`: passes the per-worker topic names as template variables when rendering the Relay config.
### 2d. Per-worker Relay container isolation

**Modified:** `src/sentry/testutils/pytest/relay.py`

- Per-worker container names (`sentry_test_relay_server_gw0`) and port offsets (`33331 + worker_num * 100`) to avoid Docker name and port collisions.
- Per-worker Redis DB via `xdist.get_redis_db()`.
### 2e. xdist CI workflow

**New file:** `.github/workflows/backend-xdist.yml` is a copy of `backend.yml` with minimal changes: it triggers on the `mchen/tiered-xdist-v2` branch, adds `PYTHONHASHSEED=0`, `XDIST_PER_WORKER_SNUBA=1`, and `SENTRY_SKIP_SELENIUM_PLUGIN=1`, adds a per-worker Snuba bootstrap step, and runs pytest with `-n 3 --dist=loadfile` instead of `make test-python-ci`.
### 2f. Snowflake test fix

**Modified:** `tests/sentry/utils/test_snowflake.py`

Two tests hardcode expected snowflake values assuming `region_snowflake_id=0`. Under xdist, workers use `worker_num + 1` (from 2b). Fix: wrap the tests in `override_regions` with an explicit `Region("test-region", 0, ...)` so the expected values are deterministic.

src/sentry/testutils/pytest/kafka.py

Lines changed: 6 additions & 9 deletions
```diff
@@ -6,6 +6,8 @@
 from confluent_kafka import Consumer, Producer
 from confluent_kafka.admin import AdminClient
 
+from sentry.testutils.pytest import xdist
+
 _log = logging.getLogger(__name__)
 
 MAX_SECONDS_WAITING_FOR_EVENT = 16
@@ -71,10 +73,8 @@ def scope_consumers():
 
     """
     all_consumers: MutableMapping[str, Consumer | None] = {
-        # Relay is configured to use this topic for all ingest messages. See
-        # `templates/config.yml`.
-        "ingest-events": None,
-        "outcomes": None,
+        xdist.get_kafka_topic("ingest-events"): None,
+        xdist.get_kafka_topic("outcomes"): None,
     }
 
     yield all_consumers
@@ -106,10 +106,8 @@ def ingest_consumer(settings):
     from sentry.consumers import get_stream_processor
     from sentry.utils.batching_kafka_consumer import create_topics
 
-    # Relay is configured to use this topic for all ingest messages. See
-    # `template/config.yml`.
     cluster_name = "default"
-    topic_event_name = "ingest-events"
+    topic_event_name = xdist.get_kafka_topic("ingest-events")
 
     if scope_consumers[topic_event_name] is not None:
         # reuse whatever was already created (will ignore the settings)
@@ -120,8 +118,7 @@ def ingest_consumer(settings):
         admin.delete_topic(topic_event_name)
     create_topics(cluster_name, [topic_event_name])
 
-    # simulate the event ingestion task
-    group_id = "test-consumer"
+    group_id = xdist.get_kafka_topic("test-consumer")
 
     consumer = get_stream_processor(
         "ingest-attachments",
```

src/sentry/testutils/pytest/relay.py

Lines changed: 8 additions & 3 deletions
```diff
@@ -13,7 +13,7 @@
 import requests
 
 from sentry.runner.commands.devservices import get_docker_client
-from sentry.testutils.pytest.sentry import TEST_REDIS_DB
+from sentry.testutils.pytest import xdist
 
 _log = logging.getLogger(__name__)
 
@@ -23,6 +23,8 @@
 
 
 def _relay_server_container_name() -> str:
+    if xdist._worker_id:
+        return f"sentry_test_relay_server_{xdist._worker_id}"
     return "sentry_test_relay_server"
 
 
@@ -66,9 +68,10 @@ def relay_server_setup(live_server, tmpdir_factory):
     template_path = _get_template_dir()
     sources = ["config.yml", "credentials.json"]
 
-    relay_port = ephemeral_port_reserve.reserve(ip="127.0.0.1", port=33331)
+    worker_num = xdist._worker_num or 0
+    relay_port = ephemeral_port_reserve.reserve(ip="127.0.0.1", port=33331 + worker_num * 100)
 
-    redis_db = TEST_REDIS_DB
+    redis_db = xdist.get_redis_db()
 
     from sentry.relay import projectconfig_cache
     from sentry.relay.projectconfig_cache.redis import RedisProjectConfigCache
@@ -84,6 +87,8 @@ def relay_server_setup(live_server, tmpdir_factory):
         "KAFKA_HOST": "kafka",
         "REDIS_HOST": "redis",
         "REDIS_DB": redis_db,
+        "KAFKA_TOPIC_EVENTS": xdist.get_kafka_topic("ingest-events"),
+        "KAFKA_TOPIC_OUTCOMES": xdist.get_kafka_topic("outcomes"),
     }
 
     for source in sources:
```

src/sentry/testutils/pytest/sentry.py

Lines changed: 18 additions & 6 deletions
```diff
@@ -17,6 +17,7 @@
 import sentry_sdk
 from django.conf import settings
 
+from sentry.testutils.pytest import xdist
 from sentry.runner.importer import install_plugin_apps
 from sentry.silo.base import SiloMode
 from sentry.testutils.region import TestEnvRegionDirectory
@@ -32,9 +33,6 @@
     os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir, os.pardir, "tests")
 )
 
-TEST_REDIS_DB = 9
-
-
 def _use_monolith_dbs() -> bool:
     return os.environ.get("SENTRY_USE_MONOLITH_DBS", "0") == "1"
 
@@ -69,10 +67,21 @@ def _configure_test_env_regions() -> None:
     # Assign a random name on every test run, as a reminder that test setup and
     # assertions should not depend on this value. If you need to test behavior that
     # depends on region attributes, use `override_regions` in your test case.
-    region_name = "testregion" + "".join(random.choices(string.digits, k=6))
+    # Under xdist, seed deterministically so all workers generate the same name
+    # (divergent names break xdist's requirement for identical test collection).
+    xdist_uid = os.environ.get("PYTEST_XDIST_TESTRUNUID")
+    r = random.Random(xdist_uid) if xdist_uid else random
+    region_name = "testregion" + "".join(r.choices(string.digits, k=6))
+
+    # Under xdist, each worker gets a unique snowflake_id (1, 2, 3, ...) so
+    # concurrent model creation doesn't produce colliding IDs.
+    region_snowflake_id = xdist._worker_num + 1 if xdist._worker_num is not None else 0
 
     default_region = Region(
-        region_name, 0, settings.SENTRY_OPTIONS["system.url-prefix"], RegionCategory.MULTI_TENANT
+        region_name,
+        region_snowflake_id,
+        settings.SENTRY_OPTIONS["system.url-prefix"],
+        RegionCategory.MULTI_TENANT,
     )
 
     settings.SENTRY_REGION = region_name
@@ -196,14 +205,17 @@ def pytest_configure(config: pytest.Config) -> None:
     settings.SENTRY_RATELIMITER = "sentry.ratelimits.redis.RedisRateLimiter"
     settings.SENTRY_RATELIMITER_OPTIONS = {}
 
+    if snuba_url := xdist.get_snuba_url():
+        settings.SENTRY_SNUBA = snuba_url
+
     settings.SENTRY_ISSUE_PLATFORM_FUTURES_MAX_LIMIT = 1
 
     if not hasattr(settings, "SENTRY_OPTIONS"):
         settings.SENTRY_OPTIONS = {}
 
     settings.SENTRY_OPTIONS.update(
         {
-            "redis.clusters": {"default": {"hosts": {0: {"db": TEST_REDIS_DB}}}},
+            "redis.clusters": {"default": {"hosts": {0: {"db": xdist.get_redis_db()}}}},
             "mail.backend": "django.core.mail.backends.locmem.EmailBackend",
             "system.url-prefix": "http://testserver",
             "system.base-hostname": "testserver",
```

src/sentry/testutils/pytest/template/config.yml

Lines changed: 4 additions & 4 deletions
```diff
@@ -14,10 +14,10 @@ processing:
   kafka_config:
     - {name: 'bootstrap.servers', value: '${KAFKA_HOST}:9093'}
   topics:
-    events: ingest-events
-    attachments: ingest-events
-    transactions: ingest-events
-    outcomes: outcomes
+    events: ${KAFKA_TOPIC_EVENTS}
+    attachments: ${KAFKA_TOPIC_EVENTS}
+    transactions: ${KAFKA_TOPIC_EVENTS}
+    outcomes: ${KAFKA_TOPIC_OUTCOMES}
   redis: redis://${REDIS_HOST}:6379/${REDIS_DB}
 aggregator:
   bucket_interval: 1 # Use shortest possible interval to speed up tests
```
src/sentry/testutils/pytest/xdist.py

Lines changed: 27 additions & 0 deletions

```python
from __future__ import annotations

import os

_TEST_REDIS_DB = 9
_SNUBA_BASE_PORT = 1230

_worker_id: str | None = os.environ.get("PYTEST_XDIST_WORKER")
_worker_num: int | None = int(_worker_id.replace("gw", "")) if _worker_id else None


def get_redis_db() -> int:
    if _worker_num is not None:
        return _TEST_REDIS_DB + _worker_num
    return _TEST_REDIS_DB


def get_kafka_topic(base_name: str) -> str:
    if _worker_id:
        return f"{base_name}-{_worker_id}"
    return base_name


def get_snuba_url() -> str | None:
    if _worker_num is not None and os.environ.get("XDIST_PER_WORKER_SNUBA"):
        return f"http://127.0.0.1:{_SNUBA_BASE_PORT + _worker_num}"
    return None
```
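A worked example of the helpers above for worker `gw2` (values follow directly from the module's constants, reproduced locally rather than imported):

```python
# Reproduces the module's logic for PYTEST_XDIST_WORKER=gw2
# with XDIST_PER_WORKER_SNUBA=1 set.
_TEST_REDIS_DB = 9
_SNUBA_BASE_PORT = 1230
worker_id = "gw2"
worker_num = int(worker_id.replace("gw", ""))

redis_db = _TEST_REDIS_DB + worker_num                            # get_redis_db() -> 11
topic = f"ingest-events-{worker_id}"                              # get_kafka_topic(...) -> "ingest-events-gw2"
snuba_url = f"http://127.0.0.1:{_SNUBA_BASE_PORT + worker_num}"   # get_snuba_url() -> port 1232
```

Without the xdist env vars, the helpers return the classic shared values (DB 9, unsuffixed topics, `None` for Snuba), which is what makes the whole change a no-op outside xdist runs.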

src/sentry/testutils/skips.py

Lines changed: 4 additions & 1 deletion
```diff
@@ -1,6 +1,8 @@
 from __future__ import annotations
 
+import os
 import socket
+from urllib.parse import urlparse
 
 import pytest
 
@@ -22,7 +24,8 @@ def _requires_service_message(name: str) -> str:
 @pytest.fixture(scope="session")
 def _requires_snuba() -> None:
     # TODO: ability to ask devservices what port a service is on
-    if not _service_available("127.0.0.1", 1218):
+    port = urlparse(os.environ.get("SNUBA", "")).port or 1218
+    if not _service_available("127.0.0.1", port):
         pytest.fail(_requires_service_message("snuba"))
 
 
```
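The resulting port selection can be sketched as a standalone helper (the `snuba_port` function is illustrative; the real code inlines the expression):

```python
from urllib.parse import urlparse

def snuba_port(env: dict[str, str]) -> int:
    # Per-worker CI exports e.g. SNUBA=http://127.0.0.1:1232; without the
    # env var, urlparse("") has no port and we fall back to the classic
    # devservices port 1218.
    return urlparse(env.get("SNUBA", "")).port or 1218
```

This keeps the fixture a no-op outside xdist runs, matching the commit's "no-ops without xdist env vars" guarantee.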
