diff --git a/src/sentry/integrations/source_code_management/sync_repos.py b/src/sentry/integrations/source_code_management/sync_repos.py index 69c4deea9523dc..cc042442cd0a92 100644 --- a/src/sentry/integrations/source_code_management/sync_repos.py +++ b/src/sentry/integrations/source_code_management/sync_repos.py @@ -20,6 +20,7 @@ from sentry.features.exceptions import FeatureNotRegistered from sentry.integrations.models.organization_integration import OrganizationIntegration from sentry.integrations.services.integration import integration_service +from sentry.integrations.services.integration.model import RpcIntegration from sentry.integrations.services.repository.service import repository_service from sentry.integrations.source_code_management.metrics import ( SCMIntegrationInteractionEvent, @@ -28,6 +29,7 @@ from sentry.integrations.source_code_management.repo_audit import log_repo_change from sentry.integrations.source_code_management.repository import RepositoryIntegration from sentry.organizations.services.organization import organization_service +from sentry.organizations.services.organization.model import RpcOrganization from sentry.plugins.providers.integration_repository import get_integration_repository_provider from sentry.shared_integrations.exceptions import ApiError from sentry.silo.base import SiloMode @@ -35,6 +37,7 @@ from sentry.taskworker.namespaces import integrations_control_tasks from sentry.utils import metrics from sentry.utils.cursored_scheduler import CursoredScheduler +from sentry.utils.iterators import chunked logger = logging.getLogger(__name__) @@ -51,6 +54,8 @@ "vsts", ] +SYNC_BATCH_SIZE = 100 + def _has_feature(flag: str, org: object) -> bool: """Check a feature flag, returning False if the flag isn't registered.""" @@ -73,43 +78,12 @@ def sync_repos_for_org(organization_integration_id: int) -> None: Sync repositories for a single OrganizationIntegration. Fetches all repos from the SCM provider, diffs against Sentry's - Repository table, and creates/disables/re-enables repos as needed. + Repository table, then dispatches batched apply tasks. """ - try: - oi = OrganizationIntegration.objects.get( - id=organization_integration_id, - status=ObjectStatus.ACTIVE, - ) - except OrganizationIntegration.DoesNotExist: - logger.info( - "sync_repos_for_org.missing_org_integration", - extra={"organization_integration_id": organization_integration_id}, - ) + ctx = _get_sync_context(organization_integration_id) + if ctx is None: return - - integration = integration_service.get_integration( - integration_id=oi.integration_id, status=ObjectStatus.ACTIVE - ) - if integration is None: - logger.info( - "sync_repos_for_org.missing_integration", - extra={"integration_id": oi.integration_id}, - ) - return - - organization_id = oi.organization_id - org_context = organization_service.get_organization_by_id( - id=organization_id, include_projects=False, include_teams=False - ) - if org_context is None: - logger.info( - "sync_repos_for_org.missing_organization", - extra={"organization_id": organization_id}, - ) - return - - rpc_org = org_context.organization - provider_key = integration.provider + integration, rpc_org, provider_key = ctx if not _has_feature(f"organizations:{provider_key}-repo-auto-sync", rpc_org): return @@ -120,10 +94,10 @@ def sync_repos_for_org(organization_integration_id: int) -> None: with SCMIntegrationInteractionEvent( interaction_type=SCMIntegrationInteractionType.SYNC_REPOS, integration_id=integration.id, - organization_id=organization_id, + organization_id=rpc_org.id, provider_key=provider_key, ).capture(): - installation = integration.get_installation(organization_id=organization_id) + installation = integration.get_installation(organization_id=rpc_org.id) assert isinstance(installation, RepositoryIntegration) try: @@ -134,7 +108,7 @@ def sync_repos_for_org(organization_integration_id: int) -> None: "sync_repos_for_org.rate_limited", extra={ "integration_id": integration.id, - "organization_id": organization_id, + "organization_id": rpc_org.id, }, ) raise @@ -142,7 +116,7 @@ def sync_repos_for_org(organization_integration_id: int) -> None: provider_external_ids = {repo["external_id"] for repo in provider_repos} all_repos = repository_service.get_repositories( - organization_id=organization_id, + organization_id=rpc_org.id, integration_id=integration.id, providers=[provider], ) @@ -193,7 +167,7 @@ def sync_repos_for_org(organization_integration_id: int) -> None: extra={ "provider": provider_key, "integration_id": integration.id, - "organization_id": organization_id, + "organization_id": rpc_org.id, "dry_run": dry_run, "provider_total": len(provider_external_ids), "sentry_active": len(sentry_active_ids), @@ -210,76 +184,203 @@ def sync_repos_for_org(organization_integration_id: int) -> None: if dry_run: return - repo_by_external_id = {r.external_id: r for r in active_repos + disabled_repos} + # Build repo configs for new repos + new_repo_configs = [ + { + **repo, + "identifier": str(repo["identifier"]), + "integration_id": integration.id, + "installation": integration.id, + } + for repo in provider_repos + if repo["external_id"] in new_ids + ] + removed_id_list = list(removed_ids) + restored_id_list = list(restored_ids) + + # TODO: Switch to apply_async once the tasks are deployed to all workers + for config_batch in chunked(new_repo_configs, SYNC_BATCH_SIZE): + create_repos_batch( + organization_integration_id=organization_integration_id, + repo_configs=config_batch, + ) - if new_ids: - integration_repo_provider = get_integration_repository_provider(integration) - repo_configs = [ - { - **repo, - "identifier": str(repo["identifier"]), - "integration_id": integration.id, - "installation": integration.id, - } - for repo in provider_repos - if repo["external_id"] in new_ids - ] - if repo_configs: - created_repos, reactivated_repos, _ = integration_repo_provider.create_repositories( - configs=repo_configs, organization=rpc_org + if _has_feature("organizations:scm-repo-auto-sync-removal", rpc_org): + for removed_batch in chunked(removed_id_list, SYNC_BATCH_SIZE): + disable_repos_batch( + organization_integration_id=organization_integration_id, + external_ids=removed_batch, ) - for repo in created_repos: - log_repo_change( - event_name="REPO_ADDED", - organization_id=organization_id, - repo=repo, - source="automatic SCM syncing", - provider=provider_key, - ) - - for repo in reactivated_repos: - log_repo_change( - event_name="REPO_ENABLED", - organization_id=organization_id, - repo=repo, - source="automatic SCM syncing", - provider=provider_key, - ) - - if removed_ids and _has_feature("organizations:scm-repo-auto-sync-removal", rpc_org): - repository_service.disable_repositories_by_external_ids( - organization_id=organization_id, - integration_id=integration.id, - provider=provider, - external_ids=list(removed_ids), + for restored_batch in chunked(restored_id_list, SYNC_BATCH_SIZE): + restore_repos_batch( + organization_integration_id=organization_integration_id, + external_ids=restored_batch, + ) + + +def _get_sync_context( + organization_integration_id: int, +) -> tuple[RpcIntegration, RpcOrganization, str] | None: + """Shared lookup for batch tasks. Returns (integration, rpc_org, provider_key) or None.""" + try: + oi = OrganizationIntegration.objects.get( + id=organization_integration_id, + status=ObjectStatus.ACTIVE, + ) + except OrganizationIntegration.DoesNotExist: + logger.info( + "sync_repos.missing_org_integration", + extra={"organization_integration_id": organization_integration_id}, + ) + return None + + integration = integration_service.get_integration( + integration_id=oi.integration_id, status=ObjectStatus.ACTIVE + ) + if integration is None: + logger.info( + "sync_repos.missing_integration", + extra={"integration_id": oi.integration_id}, + ) + return None + + org_context = organization_service.get_organization_by_id( + id=oi.organization_id, include_projects=False, include_teams=False + ) + if org_context is None: + logger.info( + "sync_repos.missing_organization", + extra={"organization_id": oi.organization_id}, + ) + return None + + return integration, org_context.organization, integration.provider + + +@instrumented_task( + name="sentry.integrations.source_code_management.sync_repos.create_repos_batch", + namespace=integrations_control_tasks, + retry=Retry(times=3, delay=120), + processing_deadline_duration=120, + silo_mode=SiloMode.CONTROL, +) +@retry() +def create_repos_batch( + organization_integration_id: int, + repo_configs: list[dict[str, object]], +) -> None: + ctx = _get_sync_context(organization_integration_id) + if ctx is None: + return + integration, rpc_org, provider_key = ctx + + integration_repo_provider = get_integration_repository_provider(integration) + created_repos, reactivated_repos, _ = integration_repo_provider.create_repositories( + configs=repo_configs, organization=rpc_org + ) + + for repo in created_repos: + log_repo_change( + event_name="REPO_ADDED", + organization_id=rpc_org.id, + repo=repo, + source="automatic SCM syncing", + provider=provider_key, + ) + + for repo in reactivated_repos: + log_repo_change( + event_name="REPO_ENABLED", + organization_id=rpc_org.id, + repo=repo, + source="automatic SCM syncing", + provider=provider_key, + ) + + +@instrumented_task( + name="sentry.integrations.source_code_management.sync_repos.disable_repos_batch", + namespace=integrations_control_tasks, + retry=Retry(times=3, delay=120), + processing_deadline_duration=120, + silo_mode=SiloMode.CONTROL, +) +@retry() +def disable_repos_batch( + organization_integration_id: int, + external_ids: list[str], +) -> None: + ctx = _get_sync_context(organization_integration_id) + if ctx is None: + return + integration, rpc_org, provider_key = ctx + provider = f"integrations:{provider_key}" + + if not _has_feature("organizations:scm-repo-auto-sync-removal", rpc_org): + return + + repository_service.disable_repositories_by_external_ids( + organization_id=rpc_org.id, + integration_id=integration.id, + provider=provider, + external_ids=external_ids, + ) + + all_repos = repository_service.get_repositories( + organization_id=rpc_org.id, + integration_id=integration.id, + providers=[provider], + ) + repo_by_external_id = {r.external_id: r for r in all_repos} + + for eid in external_ids: + removed_repo = repo_by_external_id.get(eid) + if removed_repo: + log_repo_change( + event_name="REPO_DISABLED", + organization_id=rpc_org.id, + repo=removed_repo, + source="automatic SCM syncing", + provider=provider_key, ) - for eid in removed_ids: - removed_repo = repo_by_external_id.get(eid) - if removed_repo: - log_repo_change( - event_name="REPO_DISABLED", - organization_id=organization_id, - repo=removed_repo, - source="automatic SCM syncing", - provider=provider_key, - ) - - if restored_ids: - for repo in disabled_repos: - if repo.external_id in restored_ids: - repo.status = ObjectStatus.ACTIVE - repository_service.update_repository( - organization_id=organization_id, update=repo - ) - log_repo_change( - event_name="REPO_ENABLED", - organization_id=organization_id, - repo=repo, - source="automatic SCM syncing", - provider=provider_key, - ) + +@instrumented_task( + name="sentry.integrations.source_code_management.sync_repos.restore_repos_batch", + namespace=integrations_control_tasks, + retry=Retry(times=3, delay=120), + processing_deadline_duration=120, + silo_mode=SiloMode.CONTROL, +) +@retry() +def restore_repos_batch( + organization_integration_id: int, + external_ids: list[str], +) -> None: + ctx = _get_sync_context(organization_integration_id) + if ctx is None: + return + integration, rpc_org, provider_key = ctx + provider = f"integrations:{provider_key}" + + all_repos = repository_service.get_repositories( + organization_id=rpc_org.id, + integration_id=integration.id, + providers=[provider], + ) + restore_set = set(external_ids) + for repo in all_repos: + if repo.external_id in restore_set: + repo.status = ObjectStatus.ACTIVE + repository_service.update_repository(organization_id=rpc_org.id, update=repo) + log_repo_change( + event_name="REPO_ENABLED", + organization_id=rpc_org.id, + repo=repo, + source="automatic SCM syncing", + provider=provider_key, + ) @instrumented_task( diff --git a/tests/sentry/integrations/source_code_management/test_sync_repos.py b/tests/sentry/integrations/source_code_management/test_sync_repos.py index ceddafb981a40f..6092fb54dc6212 100644 --- a/tests/sentry/integrations/source_code_management/test_sync_repos.py +++ b/tests/sentry/integrations/source_code_management/test_sync_repos.py @@ -52,7 +52,8 @@ def test_creates_new_repos(self, _: MagicMock) -> None: with self.feature( ["organizations:github-repo-auto-sync", "organizations:github-repo-auto-sync-apply"] ): - sync_repos_for_org(self.oi.id) + with self.tasks(): + sync_repos_for_org(self.oi.id) with assume_test_silo_mode(SiloMode.CELL): repos = Repository.objects.filter(organization_id=self.organization.id).order_by("name") @@ -69,8 +70,12 @@ def test_creates_new_repos(self, _: MagicMock) -> None: ) assert entries.count() == 2 + @patch( + "sentry.tasks.seer.cleanup.make_bulk_remove_repositories_request", + return_value=MagicMock(status=200), + ) @responses.activate - def test_disables_removed_repos(self, _: MagicMock) -> None: + def test_disables_removed_repos(self, _: MagicMock, __: MagicMock) -> None: with assume_test_silo_mode(SiloMode.CELL): repo = Repository.objects.create( organization_id=self.organization.id, @@ -91,7 +96,8 @@ def test_disables_removed_repos(self, _: MagicMock) -> None: "organizations:scm-repo-auto-sync-removal", ] ): - sync_repos_for_org(self.oi.id) + with self.tasks(): + sync_repos_for_org(self.oi.id) with assume_test_silo_mode(SiloMode.CELL): repo.refresh_from_db() @@ -130,7 +136,8 @@ def test_re_enables_restored_repos(self, _: MagicMock) -> None: with self.feature( ["organizations:github-repo-auto-sync", "organizations:github-repo-auto-sync-apply"] ): - sync_repos_for_org(self.oi.id) + with self.tasks(): + sync_repos_for_org(self.oi.id) with assume_test_silo_mode(SiloMode.CELL): repo.refresh_from_db() @@ -159,7 +166,8 @@ def test_no_changes_needed(self, _: MagicMock) -> None: with self.feature( ["organizations:github-repo-auto-sync", "organizations:github-repo-auto-sync-apply"] ): - sync_repos_for_org(self.oi.id) + with self.tasks(): + sync_repos_for_org(self.oi.id) with assume_test_silo_mode(SiloMode.CELL): repos = Repository.objects.filter(organization_id=self.organization.id) @@ -178,7 +186,8 @@ def test_inactive_integration(self, _: MagicMock) -> None: with self.feature( ["organizations:github-repo-auto-sync", "organizations:github-repo-auto-sync-apply"] ): - sync_repos_for_org(self.oi.id) + with self.tasks(): + sync_repos_for_org(self.oi.id) with assume_test_silo_mode(SiloMode.CELL): assert Repository.objects.count() == 0 @@ -195,7 +204,8 @@ def test_dry_run_without_apply_flag(self, _: MagicMock) -> None: # Only the sync flag, not the apply flag with self.feature("organizations:github-repo-auto-sync"): - sync_repos_for_org(self.oi.id) + with self.tasks(): + sync_repos_for_org(self.oi.id) # No repos should be created with assume_test_silo_mode(SiloMode.CELL): @@ -221,7 +231,8 @@ def test_rate_limited_raises_for_retry(self, _: MagicMock) -> None: ) with self.feature("organizations:github-repo-auto-sync"), pytest.raises(RetryTaskError): - sync_repos_for_org(self.oi.id) + with self.tasks(): + sync_repos_for_org(self.oi.id) @control_silo_test @@ -263,7 +274,8 @@ def test_creates_new_repos_for_ghe(self, mock_get_repos: MagicMock) -> None: "organizations:github_enterprise-repo-auto-sync-apply", ] ): - sync_repos_for_org(oi.id) + with self.tasks(): + sync_repos_for_org(oi.id) with assume_test_silo_mode(SiloMode.CELL): repos = Repository.objects.filter(organization_id=self.organization.id).order_by("name") @@ -337,7 +349,8 @@ def test_creates_new_repos_for_gitlab(self) -> None: with self.feature( ["organizations:gitlab-repo-auto-sync", "organizations:gitlab-repo-auto-sync-apply"] ): - sync_repos_for_org(oi.id) + with self.tasks(): + sync_repos_for_org(oi.id) with assume_test_silo_mode(SiloMode.CELL): repos = Repository.objects.filter(organization_id=self.organization.id).order_by("name") @@ -396,7 +409,8 @@ def test_creates_new_repos_for_bitbucket(self) -> None: "organizations:bitbucket-repo-auto-sync-apply", ] ): - sync_repos_for_org(oi.id) + with self.tasks(): + sync_repos_for_org(oi.id) with assume_test_silo_mode(SiloMode.CELL): repos = Repository.objects.filter(organization_id=self.organization.id).order_by("name") @@ -460,7 +474,8 @@ def test_creates_new_repos_for_vsts(self, mock_get_client: MagicMock) -> None: with self.feature( ["organizations:vsts-repo-auto-sync", "organizations:vsts-repo-auto-sync-apply"] ): - sync_repos_for_org(oi.id) + with self.tasks(): + sync_repos_for_org(oi.id) with assume_test_silo_mode(SiloMode.CELL): repos = Repository.objects.filter(organization_id=self.organization.id).order_by("name")