Skip to content

Commit a6ce51e

Browse files
committed
fix(repos): Have repo sync batch up the work
We've had a few task timeouts with the sync, caused by extremely large batches. Switching this over to limit batch sizes to 100, and to fire parallel tasks instead. This PR just creates the tasks and calls them directly rather than scheduling them, so that we can be sure the tasks are deployed before we start firing them off. There will be a follow-up PR to actually schedule them.
1 parent 54243c1 commit a6ce51e

File tree

2 files changed

+233
-117
lines changed

2 files changed

+233
-117
lines changed

src/sentry/integrations/source_code_management/sync_repos.py

Lines changed: 206 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from sentry.features.exceptions import FeatureNotRegistered
2121
from sentry.integrations.models.organization_integration import OrganizationIntegration
2222
from sentry.integrations.services.integration import integration_service
23+
from sentry.integrations.services.integration.model import RpcIntegration
2324
from sentry.integrations.services.repository.service import repository_service
2425
from sentry.integrations.source_code_management.metrics import (
2526
SCMIntegrationInteractionEvent,
@@ -28,13 +29,15 @@
2829
from sentry.integrations.source_code_management.repo_audit import log_repo_change
2930
from sentry.integrations.source_code_management.repository import RepositoryIntegration
3031
from sentry.organizations.services.organization import organization_service
32+
from sentry.organizations.services.organization.model import RpcOrganization
3133
from sentry.plugins.providers.integration_repository import get_integration_repository_provider
3234
from sentry.shared_integrations.exceptions import ApiError
3335
from sentry.silo.base import SiloMode
3436
from sentry.tasks.base import instrumented_task, retry
3537
from sentry.taskworker.namespaces import integrations_control_tasks
3638
from sentry.utils import metrics
3739
from sentry.utils.cursored_scheduler import CursoredScheduler
40+
from sentry.utils.iterators import chunked
3841

3942
logger = logging.getLogger(__name__)
4043

@@ -51,6 +54,8 @@
5154
"vsts",
5255
]
5356

57+
SYNC_BATCH_SIZE = 100
58+
5459

5560
def _has_feature(flag: str, org: object) -> bool:
5661
"""Check a feature flag, returning False if the flag isn't registered."""
@@ -73,43 +78,12 @@ def sync_repos_for_org(organization_integration_id: int) -> None:
7378
Sync repositories for a single OrganizationIntegration.
7479
7580
Fetches all repos from the SCM provider, diffs against Sentry's
76-
Repository table, and creates/disables/re-enables repos as needed.
81+
Repository table, then dispatches batched apply tasks.
7782
"""
78-
try:
79-
oi = OrganizationIntegration.objects.get(
80-
id=organization_integration_id,
81-
status=ObjectStatus.ACTIVE,
82-
)
83-
except OrganizationIntegration.DoesNotExist:
84-
logger.info(
85-
"sync_repos_for_org.missing_org_integration",
86-
extra={"organization_integration_id": organization_integration_id},
87-
)
83+
ctx = _get_sync_context(organization_integration_id)
84+
if ctx is None:
8885
return
89-
90-
integration = integration_service.get_integration(
91-
integration_id=oi.integration_id, status=ObjectStatus.ACTIVE
92-
)
93-
if integration is None:
94-
logger.info(
95-
"sync_repos_for_org.missing_integration",
96-
extra={"integration_id": oi.integration_id},
97-
)
98-
return
99-
100-
organization_id = oi.organization_id
101-
org_context = organization_service.get_organization_by_id(
102-
id=organization_id, include_projects=False, include_teams=False
103-
)
104-
if org_context is None:
105-
logger.info(
106-
"sync_repos_for_org.missing_organization",
107-
extra={"organization_id": organization_id},
108-
)
109-
return
110-
111-
rpc_org = org_context.organization
112-
provider_key = integration.provider
86+
integration, rpc_org, provider_key = ctx
11387

11488
if not _has_feature(f"organizations:{provider_key}-repo-auto-sync", rpc_org):
11589
return
@@ -120,10 +94,10 @@ def sync_repos_for_org(organization_integration_id: int) -> None:
12094
with SCMIntegrationInteractionEvent(
12195
interaction_type=SCMIntegrationInteractionType.SYNC_REPOS,
12296
integration_id=integration.id,
123-
organization_id=organization_id,
97+
organization_id=rpc_org.id,
12498
provider_key=provider_key,
12599
).capture():
126-
installation = integration.get_installation(organization_id=organization_id)
100+
installation = integration.get_installation(organization_id=rpc_org.id)
127101
assert isinstance(installation, RepositoryIntegration)
128102

129103
try:
@@ -134,15 +108,15 @@ def sync_repos_for_org(organization_integration_id: int) -> None:
134108
"sync_repos_for_org.rate_limited",
135109
extra={
136110
"integration_id": integration.id,
137-
"organization_id": organization_id,
111+
"organization_id": rpc_org.id,
138112
},
139113
)
140114
raise
141115

142116
provider_external_ids = {repo["external_id"] for repo in provider_repos}
143117

144118
all_repos = repository_service.get_repositories(
145-
organization_id=organization_id,
119+
organization_id=rpc_org.id,
146120
integration_id=integration.id,
147121
providers=[provider],
148122
)
@@ -193,7 +167,7 @@ def sync_repos_for_org(organization_integration_id: int) -> None:
193167
extra={
194168
"provider": provider_key,
195169
"integration_id": integration.id,
196-
"organization_id": organization_id,
170+
"organization_id": rpc_org.id,
197171
"dry_run": dry_run,
198172
"provider_total": len(provider_external_ids),
199173
"sentry_active": len(sentry_active_ids),
@@ -210,76 +184,203 @@ def sync_repos_for_org(organization_integration_id: int) -> None:
210184
if dry_run:
211185
return
212186

213-
repo_by_external_id = {r.external_id: r for r in active_repos + disabled_repos}
187+
# Build repo configs for new repos
188+
new_repo_configs = [
189+
{
190+
**repo,
191+
"identifier": str(repo["identifier"]),
192+
"integration_id": integration.id,
193+
"installation": integration.id,
194+
}
195+
for repo in provider_repos
196+
if repo["external_id"] in new_ids
197+
]
198+
removed_id_list = list(removed_ids)
199+
restored_id_list = list(restored_ids)
200+
201+
# TODO: Switch to apply_async once the tasks are deployed to all workers
202+
for config_batch in chunked(new_repo_configs, SYNC_BATCH_SIZE):
203+
create_repos_batch(
204+
organization_integration_id=organization_integration_id,
205+
repo_configs=config_batch,
206+
)
214207

215-
if new_ids:
216-
integration_repo_provider = get_integration_repository_provider(integration)
217-
repo_configs = [
218-
{
219-
**repo,
220-
"identifier": str(repo["identifier"]),
221-
"integration_id": integration.id,
222-
"installation": integration.id,
223-
}
224-
for repo in provider_repos
225-
if repo["external_id"] in new_ids
226-
]
227-
if repo_configs:
228-
created_repos, reactivated_repos, _ = integration_repo_provider.create_repositories(
229-
configs=repo_configs, organization=rpc_org
208+
if _has_feature("organizations:scm-repo-auto-sync-removal", rpc_org):
209+
for removed_batch in chunked(removed_id_list, SYNC_BATCH_SIZE):
210+
disable_repos_batch(
211+
organization_integration_id=organization_integration_id,
212+
external_ids=removed_batch,
230213
)
231214

232-
for repo in created_repos:
233-
log_repo_change(
234-
event_name="REPO_ADDED",
235-
organization_id=organization_id,
236-
repo=repo,
237-
source="automatic SCM syncing",
238-
provider=provider_key,
239-
)
240-
241-
for repo in reactivated_repos:
242-
log_repo_change(
243-
event_name="REPO_ENABLED",
244-
organization_id=organization_id,
245-
repo=repo,
246-
source="automatic SCM syncing",
247-
provider=provider_key,
248-
)
249-
250-
if removed_ids and _has_feature("organizations:scm-repo-auto-sync-removal", rpc_org):
251-
repository_service.disable_repositories_by_external_ids(
252-
organization_id=organization_id,
253-
integration_id=integration.id,
254-
provider=provider,
255-
external_ids=list(removed_ids),
215+
for restored_batch in chunked(restored_id_list, SYNC_BATCH_SIZE):
216+
restore_repos_batch(
217+
organization_integration_id=organization_integration_id,
218+
external_ids=restored_batch,
219+
)
220+
221+
222+
def _get_sync_context(
    organization_integration_id: int,
) -> tuple[RpcIntegration, RpcOrganization, str] | None:
    """Shared lookup for batch tasks. Returns (integration, rpc_org, provider_key) or None.

    Resolves the active OrganizationIntegration, its active Integration, and the
    owning organization via RPC. Each missing piece is logged at info level and
    short-circuits the sync by returning None.
    """
    try:
        org_integration = OrganizationIntegration.objects.get(
            id=organization_integration_id,
            status=ObjectStatus.ACTIVE,
        )
    except OrganizationIntegration.DoesNotExist:
        logger.info(
            "sync_repos.missing_org_integration",
            extra={"organization_integration_id": organization_integration_id},
        )
        return None

    integration = integration_service.get_integration(
        integration_id=org_integration.integration_id, status=ObjectStatus.ACTIVE
    )
    if integration is None:
        logger.info(
            "sync_repos.missing_integration",
            extra={"integration_id": org_integration.integration_id},
        )
        return None

    org_context = organization_service.get_organization_by_id(
        id=org_integration.organization_id, include_projects=False, include_teams=False
    )
    if org_context is None:
        logger.info(
            "sync_repos.missing_organization",
            extra={"organization_id": org_integration.organization_id},
        )
        return None

    return integration, org_context.organization, integration.provider
259+
260+
261+
@instrumented_task(
    name="sentry.integrations.source_code_management.sync_repos.create_repos_batch",
    namespace=integrations_control_tasks,
    retry=Retry(times=3, delay=120),
    processing_deadline_duration=120,
    silo_mode=SiloMode.CONTROL,
)
@retry()
def create_repos_batch(
    organization_integration_id: int,
    repo_configs: list[dict[str, object]],
) -> None:
    """Create (or reactivate) one batch of repositories for an org integration.

    Delegates to the integration's repository provider, then emits an audit
    entry for every repo that was created or re-enabled.
    """
    ctx = _get_sync_context(organization_integration_id)
    if ctx is None:
        return
    integration, rpc_org, provider_key = ctx

    provider_impl = get_integration_repository_provider(integration)
    created_repos, reactivated_repos, _ = provider_impl.create_repositories(
        configs=repo_configs, organization=rpc_org
    )

    # Audit-log in the same order as the provider reported: creations first,
    # then reactivations.
    for event_name, repos in (
        ("REPO_ADDED", created_repos),
        ("REPO_ENABLED", reactivated_repos),
    ):
        for repo in repos:
            log_repo_change(
                event_name=event_name,
                organization_id=rpc_org.id,
                repo=repo,
                source="automatic SCM syncing",
                provider=provider_key,
            )
300+
301+
302+
@instrumented_task(
    name="sentry.integrations.source_code_management.sync_repos.disable_repos_batch",
    namespace=integrations_control_tasks,
    retry=Retry(times=3, delay=120),
    processing_deadline_duration=120,
    silo_mode=SiloMode.CONTROL,
)
@retry()
def disable_repos_batch(
    organization_integration_id: int,
    external_ids: list[str],
) -> None:
    """Disable one batch of repositories that no longer exist at the provider.

    Gated on the removal feature flag (re-checked here in case it flipped
    between scheduling and execution). Emits an audit entry per disabled repo.
    """
    ctx = _get_sync_context(organization_integration_id)
    if ctx is None:
        return
    integration, rpc_org, provider_key = ctx
    provider = f"integrations:{provider_key}"

    if not _has_feature("organizations:scm-repo-auto-sync-removal", rpc_org):
        return

    repository_service.disable_repositories_by_external_ids(
        organization_id=rpc_org.id,
        integration_id=integration.id,
        provider=provider,
        external_ids=external_ids,
    )

    # Re-fetch so the audit log records the repos we just disabled.
    repos = repository_service.get_repositories(
        organization_id=rpc_org.id,
        integration_id=integration.id,
        providers=[provider],
    )
    by_external_id = {repo.external_id: repo for repo in repos}

    for external_id in external_ids:
        disabled_repo = by_external_id.get(external_id)
        if not disabled_repo:
            continue
        log_repo_change(
            event_name="REPO_DISABLED",
            organization_id=rpc_org.id,
            repo=disabled_repo,
            source="automatic SCM syncing",
            provider=provider_key,
        )
257347

258-
for eid in removed_ids:
259-
removed_repo = repo_by_external_id.get(eid)
260-
if removed_repo:
261-
log_repo_change(
262-
event_name="REPO_DISABLED",
263-
organization_id=organization_id,
264-
repo=removed_repo,
265-
source="automatic SCM syncing",
266-
provider=provider_key,
267-
)
268-
269-
if restored_ids:
270-
for repo in disabled_repos:
271-
if repo.external_id in restored_ids:
272-
repo.status = ObjectStatus.ACTIVE
273-
repository_service.update_repository(
274-
organization_id=organization_id, update=repo
275-
)
276-
log_repo_change(
277-
event_name="REPO_ENABLED",
278-
organization_id=organization_id,
279-
repo=repo,
280-
source="automatic SCM syncing",
281-
provider=provider_key,
282-
)
348+
349+
@instrumented_task(
    name="sentry.integrations.source_code_management.sync_repos.restore_repos_batch",
    namespace=integrations_control_tasks,
    retry=Retry(times=3, delay=120),
    processing_deadline_duration=120,
    silo_mode=SiloMode.CONTROL,
)
@retry()
def restore_repos_batch(
    organization_integration_id: int,
    external_ids: list[str],
) -> None:
    """Re-enable one batch of repositories that reappeared at the provider.

    Marks each matching repo ACTIVE via the repository RPC service and emits
    an audit entry for every restored repo.
    """
    ctx = _get_sync_context(organization_integration_id)
    if ctx is None:
        return
    integration, rpc_org, provider_key = ctx
    provider = f"integrations:{provider_key}"

    repos = repository_service.get_repositories(
        organization_id=rpc_org.id,
        integration_id=integration.id,
        providers=[provider],
    )
    to_restore = set(external_ids)
    for repo in repos:
        if repo.external_id not in to_restore:
            continue
        repo.status = ObjectStatus.ACTIVE
        repository_service.update_repository(organization_id=rpc_org.id, update=repo)
        log_repo_change(
            event_name="REPO_ENABLED",
            organization_id=rpc_org.id,
            repo=repo,
            source="automatic SCM syncing",
            provider=provider_key,
        )
283384

284385

285386
@instrumented_task(

0 commit comments

Comments
 (0)