From 1552b2590e81ed9fb9fab624b2a38cda7c498edf Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Sat, 7 Feb 2026 23:34:02 +0900 Subject: [PATCH] refactor: extract repo service and CI methods from run_impl_within_lock Extract the top half of NotifyTask.run_impl_within_lock() into focused methods, reducing the method from 322 lines to ~190 lines. - _fetch_commit(): fetches and validates commit from DB - _get_repository_service_for_notify(): gets repo service with error handling - _handle_no_configured_apps(): handles rate-limited/suspended GitHub apps - _fetch_ci_results_or_error(): fetches CI results with error handling - _handle_ci_wait_retry(): retry logic when CI hasn't finished Co-authored-by: Cursor --- apps/worker/tasks/notify.py | 356 ++++++++++++++++++++---------------- 1 file changed, 203 insertions(+), 153 deletions(-) diff --git a/apps/worker/tasks/notify.py b/apps/worker/tasks/notify.py index 660d5caf5..a4ec72da4 100644 --- a/apps/worker/tasks/notify.py +++ b/apps/worker/tasks/notify.py @@ -220,22 +220,17 @@ def run_impl_within_lock( milestone = Milestones.NOTIFICATIONS_SENT log.info("Starting notifications", extra={"commit": commitid, "repoid": repoid}) - commits_query = db_session.query(Commit).filter( - Commit.repoid == repoid, Commit.commitid == commitid - ) - commit: Commit = commits_query.first() - assert commit, "Commit not found in database." - + commit = self._fetch_commit(db_session, repoid, commitid) any_failures, all_tests_passed = get_test_status(commit.repoid, commit.commitid) - # This functionality is disabled for now because it's too noisy for customers + # TA error messaging is disabled for now (too noisy for customers) ta_error_msg = None if any_failures and not all_tests_passed: self._call_upload_breadcrumb_task( commit_sha=commit.commitid, repo_id=commit.repoid, - milestone=Milestones.NOTIFICATIONS_SENT, + milestone=milestone, error=Errors.SKIPPED_NOTIFICATIONS, ) return { @@ -244,158 +239,28 @@ def run_impl_within_lock( "reason": "test_failures", } - try: - installation_name_to_use = get_installation_name_for_owner_for_task( - self.name, commit.repository.author - ) - repository_service = get_repo_provider_service_for_specific_commit( - commit, installation_name_to_use - ) - except RepositoryWithoutValidBotError: - save_commit_error( - commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value - ) - - log.warning( - "Unable to start notifications because repo doesn't have a valid bot", - extra={"repoid": repoid, "commit": commitid}, - ) - self.log_checkpoint(UploadFlow.NOTIF_NO_VALID_INTEGRATION) - self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - error=Errors.REPO_MISSING_VALID_BOT, - ) - return {"notified": False, "notifications": None, "reason": "no_valid_bot"} - except NoConfiguredAppsAvailable as exp: - if exp.rate_limited_count > 0: - # There's at least 1 app that we can use to communicate with GitHub, - # but this app happens to be rate limited now. We try again later. - # Min wait time of 1 minute - retry_delay_seconds = max(60, get_seconds_to_next_hour()) - log.warning( - "Unable to start notifications. Retrying again later.", - extra={ - "repoid": repoid, - "commit": commitid, - "apps_available": exp.apps_count, - "apps_rate_limited": exp.rate_limited_count, - "apps_suspended": exp.suspended_count, - "countdown_seconds": retry_delay_seconds, - }, - ) - self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - error=Errors.INTERNAL_APP_RATE_LIMITED, - ) - return self._attempt_retry( - max_retries=10, - countdown=retry_delay_seconds, - current_yaml=current_yaml, - commit=commit, - **kwargs, - ) - # Maybe we have apps that are suspended. We can't communicate with github. - log.warning( - "We can't find an app to communicate with GitHub. Not notifying.", - extra={ - "repoid": repoid, - "commit": commitid, - "apps_available": exp.apps_count, - "apps_suspended": exp.suspended_count, - }, - ) - self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION) - return { - "notified": False, - "notifications": None, - "reason": "no_valid_github_app_found", - } + result = self._get_repository_service_for_notify( + commit, repoid, commitid, milestone, current_yaml=current_yaml, **kwargs + ) + if isinstance(result, dict): + return result + repository_service, installation_name_to_use = result if current_yaml is None: current_yaml = async_to_sync(get_current_yaml)(commit, repository_service) else: current_yaml = UserYaml.from_dict(current_yaml) - try: - ci_results = self.fetch_and_update_whether_ci_passed( - repository_service, commit, current_yaml - ) - except TorngitClientError as ex: - log.info( - "Unable to fetch CI results due to a client problem. Not notifying user", - extra={ - "repoid": commit.repoid, - "commit": commit.commitid, - "code": ex.code, - }, - ) - self.log_checkpoint(UploadFlow.NOTIF_GIT_CLIENT_ERROR) - self._call_upload_breadcrumb_task( - commit_sha=commit.commitid, - repo_id=commit.repoid, - milestone=milestone, - error=Errors.GIT_CLIENT_ERROR, - ) - return { - "notified": False, - "notifications": None, - "reason": "not_able_fetch_ci_result", - } - except TorngitServerFailureError: - log.info( - "Unable to fetch CI results due to server issues. Not notifying user", - extra={"repoid": commit.repoid, "commit": commit.commitid}, - ) - self.log_checkpoint(UploadFlow.NOTIF_GIT_SERVICE_ERROR) - self._call_upload_breadcrumb_task( - commit_sha=commit.commitid, - repo_id=commit.repoid, - milestone=milestone, - error=Errors.GIT_CLIENT_ERROR, - ) - return { - "notified": False, - "notifications": None, - "reason": "server_issues_ci_result", - } + ci_result = self._fetch_ci_results_or_error( + repository_service, commit, current_yaml, milestone + ) + if isinstance(ci_result, dict): + return ci_result + ci_results = ci_result + if self.should_wait_longer(current_yaml, commit, ci_results): - log.info( - "Not sending notifications yet because we are waiting for CI to finish", - extra={"repoid": commit.repoid, "commit": commit.commitid}, - ) - ghapp_default_installations = list( - filter( - lambda obj: obj.name == installation_name_to_use - and obj.is_configured(), - commit.repository.author.github_app_installations or [], - ) - ) - rely_on_webhook_ghapp = ghapp_default_installations != [] and any( - obj.is_repo_covered_by_integration(commit.repository) - for obj in ghapp_default_installations - ) - rely_on_webhook_legacy = commit.repository.using_integration - if ( - rely_on_webhook_ghapp - or rely_on_webhook_legacy - or commit.repository.hookid - ): - # rely on the webhook, but still retry in case we miss the webhook - max_retries = 5 - countdown = (60 * 3) * 2**self.request.retries - else: - max_retries = 10 - countdown = 15 * 2**self.request.retries - return self._attempt_retry( - max_retries=max_retries, - countdown=countdown, - current_yaml=current_yaml, - commit=commit, - **kwargs, + return self._handle_ci_wait_retry( + commit, current_yaml, installation_name_to_use, **kwargs ) report_service = ReportService( @@ -531,6 +396,191 @@ def run_impl_within_lock( ) return {"notified": False, "notifications": None} + def _fetch_commit(self, db_session: Session, repoid: int, commitid: str) -> Commit: + """Fetch and validate the commit from the database.""" + commit = ( + db_session.query(Commit) + .filter(Commit.repoid == repoid, Commit.commitid == commitid) + .first() + ) + assert commit, "Commit not found in database." + return commit + + def _get_repository_service_for_notify( + self, commit, repoid, commitid, milestone, current_yaml=None, **kwargs + ): + """Get the repository service for notification, handling bot/app errors. + + Returns (repository_service, installation_name) on success, or a dict + result for early return on error. + """ + try: + installation_name_to_use = get_installation_name_for_owner_for_task( + self.name, commit.repository.author + ) + repository_service = get_repo_provider_service_for_specific_commit( + commit, installation_name_to_use + ) + return (repository_service, installation_name_to_use) + except RepositoryWithoutValidBotError: + save_commit_error( + commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value + ) + log.warning( + "Unable to start notifications because repo doesn't have a valid bot", + extra={"repoid": repoid, "commit": commitid}, + ) + self.log_checkpoint(UploadFlow.NOTIF_NO_VALID_INTEGRATION) + self._call_upload_breadcrumb_task( + commit_sha=commitid, + repo_id=repoid, + milestone=milestone, + error=Errors.REPO_MISSING_VALID_BOT, + ) + return {"notified": False, "notifications": None, "reason": "no_valid_bot"} + except NoConfiguredAppsAvailable as exp: + return self._handle_no_configured_apps( + commit, exp, repoid, commitid, milestone, current_yaml, **kwargs + ) + + def _handle_no_configured_apps( + self, commit, exp, repoid, commitid, milestone, current_yaml, **kwargs + ): + """Handle the case where no configured GitHub apps are available. + + Either retries (if apps are rate-limited) or returns an error result. + """ + if exp.rate_limited_count > 0: + # There's at least 1 app that we can use to communicate with GitHub, + # but this app happens to be rate limited now. We try again later. + # Min wait time of 1 minute + retry_delay_seconds = max(60, get_seconds_to_next_hour()) + log.warning( + "Unable to start notifications. Retrying again later.", + extra={ + "repoid": repoid, + "commit": commitid, + "apps_available": exp.apps_count, + "apps_rate_limited": exp.rate_limited_count, + "apps_suspended": exp.suspended_count, + "countdown_seconds": retry_delay_seconds, + }, + ) + self._call_upload_breadcrumb_task( + commit_sha=commitid, + repo_id=repoid, + milestone=milestone, + error=Errors.INTERNAL_APP_RATE_LIMITED, + ) + return self._attempt_retry( + max_retries=10, + countdown=retry_delay_seconds, + current_yaml=current_yaml, + commit=commit, + **kwargs, + ) + # Maybe we have apps that are suspended. We can't communicate with github. + log.warning( + "We can't find an app to communicate with GitHub. Not notifying.", + extra={ + "repoid": repoid, + "commit": commitid, + "apps_available": exp.apps_count, + "apps_suspended": exp.suspended_count, + }, + ) + self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION) + return { + "notified": False, + "notifications": None, + "reason": "no_valid_github_app_found", + } + + def _fetch_ci_results_or_error( + self, repository_service, commit, current_yaml, milestone + ): + """Fetch CI results, handling provider errors. + + Returns ci_results on success, or a dict result for early return on error. + """ + try: + return self.fetch_and_update_whether_ci_passed( + repository_service, commit, current_yaml + ) + except TorngitClientError as ex: + log.info( + "Unable to fetch CI results due to a client problem. Not notifying user", + extra={ + "repoid": commit.repoid, + "commit": commit.commitid, + "code": ex.code, + }, + ) + self.log_checkpoint(UploadFlow.NOTIF_GIT_CLIENT_ERROR) + self._call_upload_breadcrumb_task( + commit_sha=commit.commitid, + repo_id=commit.repoid, + milestone=milestone, + error=Errors.GIT_CLIENT_ERROR, + ) + return { + "notified": False, + "notifications": None, + "reason": "not_able_fetch_ci_result", + } + except TorngitServerFailureError: + log.info( + "Unable to fetch CI results due to server issues. Not notifying user", + extra={"repoid": commit.repoid, "commit": commit.commitid}, + ) + self.log_checkpoint(UploadFlow.NOTIF_GIT_SERVICE_ERROR) + self._call_upload_breadcrumb_task( + commit_sha=commit.commitid, + repo_id=commit.repoid, + milestone=milestone, + error=Errors.GIT_CLIENT_ERROR, + ) + return { + "notified": False, + "notifications": None, + "reason": "server_issues_ci_result", + } + + def _handle_ci_wait_retry( + self, commit, current_yaml, installation_name_to_use, **kwargs + ): + """Handle retrying when CI hasn't finished yet.""" + log.info( + "Not sending notifications yet because we are waiting for CI to finish", + extra={"repoid": commit.repoid, "commit": commit.commitid}, + ) + ghapp_default_installations = list( + filter( + lambda obj: obj.name == installation_name_to_use + and obj.is_configured(), + commit.repository.author.github_app_installations or [], + ) + ) + rely_on_webhook_ghapp = ghapp_default_installations != [] and any( + obj.is_repo_covered_by_integration(commit.repository) + for obj in ghapp_default_installations + ) + rely_on_webhook_legacy = commit.repository.using_integration + if rely_on_webhook_ghapp or rely_on_webhook_legacy or commit.repository.hookid: + # rely on the webhook, but still retry in case we miss the webhook + max_retries = 5 + countdown = (60 * 3) * 2**self.request.retries + else: + max_retries = 10 + countdown = 15 * 2**self.request.retries + return self._attempt_retry( + max_retries=max_retries, + countdown=countdown, + current_yaml=current_yaml, + commit=commit, + **kwargs, + ) + def is_using_codecov_commenter( self, repository_service: TorngitBaseAdapter ) -> bool: