From a6ce51ef892e1c741527b719bf54622fcffc310e Mon Sep 17 00:00:00 2001 From: Dan Fuller Date: Wed, 15 Apr 2026 15:53:47 -0700 Subject: [PATCH] fix(repos): Have repo sync batch up the work We've had a few task timeouts with the sync, caused by extremely large batches. Switching this over to limit batch sizes to 100, and to just fire parallel tasks instead. This PR just creates the tasks and calls them directly rather than scheduling them. This is so that we can be sure the tasks are deployed before we start firing them off. There will be a follow-up PR to actually schedule them. --- .../source_code_management/sync_repos.py | 311 ++++++++++++------ .../source_code_management/test_sync_repos.py | 39 ++- 2 files changed, 233 insertions(+), 117 deletions(-) diff --git a/src/sentry/integrations/source_code_management/sync_repos.py b/src/sentry/integrations/source_code_management/sync_repos.py index 69c4deea9523dc..cc042442cd0a92 100644 --- a/src/sentry/integrations/source_code_management/sync_repos.py +++ b/src/sentry/integrations/source_code_management/sync_repos.py @@ -20,6 +20,7 @@ from sentry.features.exceptions import FeatureNotRegistered from sentry.integrations.models.organization_integration import OrganizationIntegration from sentry.integrations.services.integration import integration_service +from sentry.integrations.services.integration.model import RpcIntegration from sentry.integrations.services.repository.service import repository_service from sentry.integrations.source_code_management.metrics import ( SCMIntegrationInteractionEvent, @@ -28,6 +29,7 @@ from sentry.integrations.source_code_management.repo_audit import log_repo_change from sentry.integrations.source_code_management.repository import RepositoryIntegration from sentry.organizations.services.organization import organization_service +from sentry.organizations.services.organization.model import RpcOrganization from sentry.plugins.providers.integration_repository import
get_integration_repository_provider from sentry.shared_integrations.exceptions import ApiError from sentry.silo.base import SiloMode @@ -35,6 +37,7 @@ from sentry.taskworker.namespaces import integrations_control_tasks from sentry.utils import metrics from sentry.utils.cursored_scheduler import CursoredScheduler +from sentry.utils.iterators import chunked logger = logging.getLogger(__name__) @@ -51,6 +54,8 @@ "vsts", ] +SYNC_BATCH_SIZE = 100 + def _has_feature(flag: str, org: object) -> bool: """Check a feature flag, returning False if the flag isn't registered.""" @@ -73,43 +78,12 @@ def sync_repos_for_org(organization_integration_id: int) -> None: Sync repositories for a single OrganizationIntegration. Fetches all repos from the SCM provider, diffs against Sentry's - Repository table, and creates/disables/re-enables repos as needed. + Repository table, then dispatches batched apply tasks. """ - try: - oi = OrganizationIntegration.objects.get( - id=organization_integration_id, - status=ObjectStatus.ACTIVE, - ) - except OrganizationIntegration.DoesNotExist: - logger.info( - "sync_repos_for_org.missing_org_integration", - extra={"organization_integration_id": organization_integration_id}, - ) + ctx = _get_sync_context(organization_integration_id) + if ctx is None: return - - integration = integration_service.get_integration( - integration_id=oi.integration_id, status=ObjectStatus.ACTIVE - ) - if integration is None: - logger.info( - "sync_repos_for_org.missing_integration", - extra={"integration_id": oi.integration_id}, - ) - return - - organization_id = oi.organization_id - org_context = organization_service.get_organization_by_id( - id=organization_id, include_projects=False, include_teams=False - ) - if org_context is None: - logger.info( - "sync_repos_for_org.missing_organization", - extra={"organization_id": organization_id}, - ) - return - - rpc_org = org_context.organization - provider_key = integration.provider + integration, rpc_org, provider_key = ctx if 
not _has_feature(f"organizations:{provider_key}-repo-auto-sync", rpc_org): return @@ -120,10 +94,10 @@ def sync_repos_for_org(organization_integration_id: int) -> None: with SCMIntegrationInteractionEvent( interaction_type=SCMIntegrationInteractionType.SYNC_REPOS, integration_id=integration.id, - organization_id=organization_id, + organization_id=rpc_org.id, provider_key=provider_key, ).capture(): - installation = integration.get_installation(organization_id=organization_id) + installation = integration.get_installation(organization_id=rpc_org.id) assert isinstance(installation, RepositoryIntegration) try: @@ -134,7 +108,7 @@ def sync_repos_for_org(organization_integration_id: int) -> None: "sync_repos_for_org.rate_limited", extra={ "integration_id": integration.id, - "organization_id": organization_id, + "organization_id": rpc_org.id, }, ) raise @@ -142,7 +116,7 @@ def sync_repos_for_org(organization_integration_id: int) -> None: provider_external_ids = {repo["external_id"] for repo in provider_repos} all_repos = repository_service.get_repositories( - organization_id=organization_id, + organization_id=rpc_org.id, integration_id=integration.id, providers=[provider], ) @@ -193,7 +167,7 @@ def sync_repos_for_org(organization_integration_id: int) -> None: extra={ "provider": provider_key, "integration_id": integration.id, - "organization_id": organization_id, + "organization_id": rpc_org.id, "dry_run": dry_run, "provider_total": len(provider_external_ids), "sentry_active": len(sentry_active_ids), @@ -210,76 +184,203 @@ def sync_repos_for_org(organization_integration_id: int) -> None: if dry_run: return - repo_by_external_id = {r.external_id: r for r in active_repos + disabled_repos} + # Build repo configs for new repos + new_repo_configs = [ + { + **repo, + "identifier": str(repo["identifier"]), + "integration_id": integration.id, + "installation": integration.id, + } + for repo in provider_repos + if repo["external_id"] in new_ids + ] + removed_id_list = 
list(removed_ids) + restored_id_list = list(restored_ids) + + # TODO: Switch to apply_async once the tasks are deployed to all workers + for config_batch in chunked(new_repo_configs, SYNC_BATCH_SIZE): + create_repos_batch( + organization_integration_id=organization_integration_id, + repo_configs=config_batch, + ) - if new_ids: - integration_repo_provider = get_integration_repository_provider(integration) - repo_configs = [ - { - **repo, - "identifier": str(repo["identifier"]), - "integration_id": integration.id, - "installation": integration.id, - } - for repo in provider_repos - if repo["external_id"] in new_ids - ] - if repo_configs: - created_repos, reactivated_repos, _ = integration_repo_provider.create_repositories( - configs=repo_configs, organization=rpc_org + if _has_feature("organizations:scm-repo-auto-sync-removal", rpc_org): + for removed_batch in chunked(removed_id_list, SYNC_BATCH_SIZE): + disable_repos_batch( + organization_integration_id=organization_integration_id, + external_ids=removed_batch, ) - for repo in created_repos: - log_repo_change( - event_name="REPO_ADDED", - organization_id=organization_id, - repo=repo, - source="automatic SCM syncing", - provider=provider_key, - ) - - for repo in reactivated_repos: - log_repo_change( - event_name="REPO_ENABLED", - organization_id=organization_id, - repo=repo, - source="automatic SCM syncing", - provider=provider_key, - ) - - if removed_ids and _has_feature("organizations:scm-repo-auto-sync-removal", rpc_org): - repository_service.disable_repositories_by_external_ids( - organization_id=organization_id, - integration_id=integration.id, - provider=provider, - external_ids=list(removed_ids), + for restored_batch in chunked(restored_id_list, SYNC_BATCH_SIZE): + restore_repos_batch( + organization_integration_id=organization_integration_id, + external_ids=restored_batch, + ) + + +def _get_sync_context( + organization_integration_id: int, +) -> tuple[RpcIntegration, RpcOrganization, str] | None: + """Shared 
lookup for batch tasks. Returns (integration, rpc_org, provider_key) or None.""" + try: + oi = OrganizationIntegration.objects.get( + id=organization_integration_id, + status=ObjectStatus.ACTIVE, + ) + except OrganizationIntegration.DoesNotExist: + logger.info( + "sync_repos.missing_org_integration", + extra={"organization_integration_id": organization_integration_id}, + ) + return None + + integration = integration_service.get_integration( + integration_id=oi.integration_id, status=ObjectStatus.ACTIVE + ) + if integration is None: + logger.info( + "sync_repos.missing_integration", + extra={"integration_id": oi.integration_id}, + ) + return None + + org_context = organization_service.get_organization_by_id( + id=oi.organization_id, include_projects=False, include_teams=False + ) + if org_context is None: + logger.info( + "sync_repos.missing_organization", + extra={"organization_id": oi.organization_id}, + ) + return None + + return integration, org_context.organization, integration.provider + + +@instrumented_task( + name="sentry.integrations.source_code_management.sync_repos.create_repos_batch", + namespace=integrations_control_tasks, + retry=Retry(times=3, delay=120), + processing_deadline_duration=120, + silo_mode=SiloMode.CONTROL, +) +@retry() +def create_repos_batch( + organization_integration_id: int, + repo_configs: list[dict[str, object]], +) -> None: + ctx = _get_sync_context(organization_integration_id) + if ctx is None: + return + integration, rpc_org, provider_key = ctx + + integration_repo_provider = get_integration_repository_provider(integration) + created_repos, reactivated_repos, _ = integration_repo_provider.create_repositories( + configs=repo_configs, organization=rpc_org + ) + + for repo in created_repos: + log_repo_change( + event_name="REPO_ADDED", + organization_id=rpc_org.id, + repo=repo, + source="automatic SCM syncing", + provider=provider_key, + ) + + for repo in reactivated_repos: + log_repo_change( + event_name="REPO_ENABLED", + 
organization_id=rpc_org.id, + repo=repo, + source="automatic SCM syncing", + provider=provider_key, + ) + + +@instrumented_task( + name="sentry.integrations.source_code_management.sync_repos.disable_repos_batch", + namespace=integrations_control_tasks, + retry=Retry(times=3, delay=120), + processing_deadline_duration=120, + silo_mode=SiloMode.CONTROL, +) +@retry() +def disable_repos_batch( + organization_integration_id: int, + external_ids: list[str], +) -> None: + ctx = _get_sync_context(organization_integration_id) + if ctx is None: + return + integration, rpc_org, provider_key = ctx + provider = f"integrations:{provider_key}" + + if not _has_feature("organizations:scm-repo-auto-sync-removal", rpc_org): + return + + repository_service.disable_repositories_by_external_ids( + organization_id=rpc_org.id, + integration_id=integration.id, + provider=provider, + external_ids=external_ids, + ) + + all_repos = repository_service.get_repositories( + organization_id=rpc_org.id, + integration_id=integration.id, + providers=[provider], + ) + repo_by_external_id = {r.external_id: r for r in all_repos} + + for eid in external_ids: + removed_repo = repo_by_external_id.get(eid) + if removed_repo: + log_repo_change( + event_name="REPO_DISABLED", + organization_id=rpc_org.id, + repo=removed_repo, + source="automatic SCM syncing", + provider=provider_key, ) - for eid in removed_ids: - removed_repo = repo_by_external_id.get(eid) - if removed_repo: - log_repo_change( - event_name="REPO_DISABLED", - organization_id=organization_id, - repo=removed_repo, - source="automatic SCM syncing", - provider=provider_key, - ) - - if restored_ids: - for repo in disabled_repos: - if repo.external_id in restored_ids: - repo.status = ObjectStatus.ACTIVE - repository_service.update_repository( - organization_id=organization_id, update=repo - ) - log_repo_change( - event_name="REPO_ENABLED", - organization_id=organization_id, - repo=repo, - source="automatic SCM syncing", - provider=provider_key, - ) + 
+@instrumented_task( + name="sentry.integrations.source_code_management.sync_repos.restore_repos_batch", + namespace=integrations_control_tasks, + retry=Retry(times=3, delay=120), + processing_deadline_duration=120, + silo_mode=SiloMode.CONTROL, +) +@retry() +def restore_repos_batch( + organization_integration_id: int, + external_ids: list[str], +) -> None: + ctx = _get_sync_context(organization_integration_id) + if ctx is None: + return + integration, rpc_org, provider_key = ctx + provider = f"integrations:{provider_key}" + + all_repos = repository_service.get_repositories( + organization_id=rpc_org.id, + integration_id=integration.id, + providers=[provider], + ) + restore_set = set(external_ids) + for repo in all_repos: + if repo.external_id in restore_set: + repo.status = ObjectStatus.ACTIVE + repository_service.update_repository(organization_id=rpc_org.id, update=repo) + log_repo_change( + event_name="REPO_ENABLED", + organization_id=rpc_org.id, + repo=repo, + source="automatic SCM syncing", + provider=provider_key, + ) @instrumented_task( diff --git a/tests/sentry/integrations/source_code_management/test_sync_repos.py b/tests/sentry/integrations/source_code_management/test_sync_repos.py index ceddafb981a40f..6092fb54dc6212 100644 --- a/tests/sentry/integrations/source_code_management/test_sync_repos.py +++ b/tests/sentry/integrations/source_code_management/test_sync_repos.py @@ -52,7 +52,8 @@ def test_creates_new_repos(self, _: MagicMock) -> None: with self.feature( ["organizations:github-repo-auto-sync", "organizations:github-repo-auto-sync-apply"] ): - sync_repos_for_org(self.oi.id) + with self.tasks(): + sync_repos_for_org(self.oi.id) with assume_test_silo_mode(SiloMode.CELL): repos = Repository.objects.filter(organization_id=self.organization.id).order_by("name") @@ -69,8 +70,12 @@ def test_creates_new_repos(self, _: MagicMock) -> None: ) assert entries.count() == 2 + @patch( + "sentry.tasks.seer.cleanup.make_bulk_remove_repositories_request", + 
return_value=MagicMock(status=200), + ) @responses.activate - def test_disables_removed_repos(self, _: MagicMock) -> None: + def test_disables_removed_repos(self, _: MagicMock, __: MagicMock) -> None: with assume_test_silo_mode(SiloMode.CELL): repo = Repository.objects.create( organization_id=self.organization.id, @@ -91,7 +96,8 @@ def test_disables_removed_repos(self, _: MagicMock) -> None: "organizations:scm-repo-auto-sync-removal", ] ): - sync_repos_for_org(self.oi.id) + with self.tasks(): + sync_repos_for_org(self.oi.id) with assume_test_silo_mode(SiloMode.CELL): repo.refresh_from_db() @@ -130,7 +136,8 @@ def test_re_enables_restored_repos(self, _: MagicMock) -> None: with self.feature( ["organizations:github-repo-auto-sync", "organizations:github-repo-auto-sync-apply"] ): - sync_repos_for_org(self.oi.id) + with self.tasks(): + sync_repos_for_org(self.oi.id) with assume_test_silo_mode(SiloMode.CELL): repo.refresh_from_db() @@ -159,7 +166,8 @@ def test_no_changes_needed(self, _: MagicMock) -> None: with self.feature( ["organizations:github-repo-auto-sync", "organizations:github-repo-auto-sync-apply"] ): - sync_repos_for_org(self.oi.id) + with self.tasks(): + sync_repos_for_org(self.oi.id) with assume_test_silo_mode(SiloMode.CELL): repos = Repository.objects.filter(organization_id=self.organization.id) @@ -178,7 +186,8 @@ def test_inactive_integration(self, _: MagicMock) -> None: with self.feature( ["organizations:github-repo-auto-sync", "organizations:github-repo-auto-sync-apply"] ): - sync_repos_for_org(self.oi.id) + with self.tasks(): + sync_repos_for_org(self.oi.id) with assume_test_silo_mode(SiloMode.CELL): assert Repository.objects.count() == 0 @@ -195,7 +204,8 @@ def test_dry_run_without_apply_flag(self, _: MagicMock) -> None: # Only the sync flag, not the apply flag with self.feature("organizations:github-repo-auto-sync"): - sync_repos_for_org(self.oi.id) + with self.tasks(): + sync_repos_for_org(self.oi.id) # No repos should be created with 
assume_test_silo_mode(SiloMode.CELL): @@ -221,7 +231,8 @@ def test_rate_limited_raises_for_retry(self, _: MagicMock) -> None: ) with self.feature("organizations:github-repo-auto-sync"), pytest.raises(RetryTaskError): - sync_repos_for_org(self.oi.id) + with self.tasks(): + sync_repos_for_org(self.oi.id) @control_silo_test @@ -263,7 +274,8 @@ def test_creates_new_repos_for_ghe(self, mock_get_repos: MagicMock) -> None: "organizations:github_enterprise-repo-auto-sync-apply", ] ): - sync_repos_for_org(oi.id) + with self.tasks(): + sync_repos_for_org(oi.id) with assume_test_silo_mode(SiloMode.CELL): repos = Repository.objects.filter(organization_id=self.organization.id).order_by("name") @@ -337,7 +349,8 @@ def test_creates_new_repos_for_gitlab(self) -> None: with self.feature( ["organizations:gitlab-repo-auto-sync", "organizations:gitlab-repo-auto-sync-apply"] ): - sync_repos_for_org(oi.id) + with self.tasks(): + sync_repos_for_org(oi.id) with assume_test_silo_mode(SiloMode.CELL): repos = Repository.objects.filter(organization_id=self.organization.id).order_by("name") @@ -396,7 +409,8 @@ def test_creates_new_repos_for_bitbucket(self) -> None: "organizations:bitbucket-repo-auto-sync-apply", ] ): - sync_repos_for_org(oi.id) + with self.tasks(): + sync_repos_for_org(oi.id) with assume_test_silo_mode(SiloMode.CELL): repos = Repository.objects.filter(organization_id=self.organization.id).order_by("name") @@ -460,7 +474,8 @@ def test_creates_new_repos_for_vsts(self, mock_get_client: MagicMock) -> None: with self.feature( ["organizations:vsts-repo-auto-sync", "organizations:vsts-repo-auto-sync-apply"] ): - sync_repos_for_org(oi.id) + with self.tasks(): + sync_repos_for_org(oi.id) with assume_test_silo_mode(SiloMode.CELL): repos = Repository.objects.filter(organization_id=self.organization.id).order_by("name")