Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
356 changes: 203 additions & 153 deletions apps/worker/tasks/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,22 +220,17 @@ def run_impl_within_lock(
milestone = Milestones.NOTIFICATIONS_SENT

log.info("Starting notifications", extra={"commit": commitid, "repoid": repoid})
commits_query = db_session.query(Commit).filter(
Commit.repoid == repoid, Commit.commitid == commitid
)
commit: Commit = commits_query.first()
assert commit, "Commit not found in database."

commit = self._fetch_commit(db_session, repoid, commitid)
any_failures, all_tests_passed = get_test_status(commit.repoid, commit.commitid)

# This functionality is disabled for now because it's too noisy for customers
# TA error messaging is disabled for now (too noisy for customers)
ta_error_msg = None

if any_failures and not all_tests_passed:
self._call_upload_breadcrumb_task(
commit_sha=commit.commitid,
repo_id=commit.repoid,
milestone=Milestones.NOTIFICATIONS_SENT,
milestone=milestone,
error=Errors.SKIPPED_NOTIFICATIONS,
)
return {
Expand All @@ -244,158 +239,28 @@ def run_impl_within_lock(
"reason": "test_failures",
}

try:
installation_name_to_use = get_installation_name_for_owner_for_task(
self.name, commit.repository.author
)
repository_service = get_repo_provider_service_for_specific_commit(
commit, installation_name_to_use
)
except RepositoryWithoutValidBotError:
save_commit_error(
commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value
)

log.warning(
"Unable to start notifications because repo doesn't have a valid bot",
extra={"repoid": repoid, "commit": commitid},
)
self.log_checkpoint(UploadFlow.NOTIF_NO_VALID_INTEGRATION)
self._call_upload_breadcrumb_task(
commit_sha=commitid,
repo_id=repoid,
milestone=milestone,
error=Errors.REPO_MISSING_VALID_BOT,
)
return {"notified": False, "notifications": None, "reason": "no_valid_bot"}
except NoConfiguredAppsAvailable as exp:
if exp.rate_limited_count > 0:
# There's at least 1 app that we can use to communicate with GitHub,
# but this app happens to be rate limited now. We try again later.
# Min wait time of 1 minute
retry_delay_seconds = max(60, get_seconds_to_next_hour())
log.warning(
"Unable to start notifications. Retrying again later.",
extra={
"repoid": repoid,
"commit": commitid,
"apps_available": exp.apps_count,
"apps_rate_limited": exp.rate_limited_count,
"apps_suspended": exp.suspended_count,
"countdown_seconds": retry_delay_seconds,
},
)
self._call_upload_breadcrumb_task(
commit_sha=commitid,
repo_id=repoid,
milestone=milestone,
error=Errors.INTERNAL_APP_RATE_LIMITED,
)
return self._attempt_retry(
max_retries=10,
countdown=retry_delay_seconds,
current_yaml=current_yaml,
commit=commit,
**kwargs,
)
# Maybe we have apps that are suspended. We can't communicate with github.
log.warning(
"We can't find an app to communicate with GitHub. Not notifying.",
extra={
"repoid": repoid,
"commit": commitid,
"apps_available": exp.apps_count,
"apps_suspended": exp.suspended_count,
},
)
self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION)
return {
"notified": False,
"notifications": None,
"reason": "no_valid_github_app_found",
}
result = self._get_repository_service_for_notify(
commit, repoid, commitid, milestone, current_yaml=current_yaml, **kwargs
)
if isinstance(result, dict):
return result
repository_service, installation_name_to_use = result

if current_yaml is None:
current_yaml = async_to_sync(get_current_yaml)(commit, repository_service)
else:
current_yaml = UserYaml.from_dict(current_yaml)

try:
ci_results = self.fetch_and_update_whether_ci_passed(
repository_service, commit, current_yaml
)
except TorngitClientError as ex:
log.info(
"Unable to fetch CI results due to a client problem. Not notifying user",
extra={
"repoid": commit.repoid,
"commit": commit.commitid,
"code": ex.code,
},
)
self.log_checkpoint(UploadFlow.NOTIF_GIT_CLIENT_ERROR)
self._call_upload_breadcrumb_task(
commit_sha=commit.commitid,
repo_id=commit.repoid,
milestone=milestone,
error=Errors.GIT_CLIENT_ERROR,
)
return {
"notified": False,
"notifications": None,
"reason": "not_able_fetch_ci_result",
}
except TorngitServerFailureError:
log.info(
"Unable to fetch CI results due to server issues. Not notifying user",
extra={"repoid": commit.repoid, "commit": commit.commitid},
)
self.log_checkpoint(UploadFlow.NOTIF_GIT_SERVICE_ERROR)
self._call_upload_breadcrumb_task(
commit_sha=commit.commitid,
repo_id=commit.repoid,
milestone=milestone,
error=Errors.GIT_CLIENT_ERROR,
)
return {
"notified": False,
"notifications": None,
"reason": "server_issues_ci_result",
}
ci_result = self._fetch_ci_results_or_error(
repository_service, commit, current_yaml, milestone
)
if isinstance(ci_result, dict):
return ci_result
ci_results = ci_result

if self.should_wait_longer(current_yaml, commit, ci_results):
log.info(
"Not sending notifications yet because we are waiting for CI to finish",
extra={"repoid": commit.repoid, "commit": commit.commitid},
)
ghapp_default_installations = list(
filter(
lambda obj: obj.name == installation_name_to_use
and obj.is_configured(),
commit.repository.author.github_app_installations or [],
)
)
rely_on_webhook_ghapp = ghapp_default_installations != [] and any(
obj.is_repo_covered_by_integration(commit.repository)
for obj in ghapp_default_installations
)
rely_on_webhook_legacy = commit.repository.using_integration
if (
rely_on_webhook_ghapp
or rely_on_webhook_legacy
or commit.repository.hookid
):
# rely on the webhook, but still retry in case we miss the webhook
max_retries = 5
countdown = (60 * 3) * 2**self.request.retries
else:
max_retries = 10
countdown = 15 * 2**self.request.retries
return self._attempt_retry(
max_retries=max_retries,
countdown=countdown,
current_yaml=current_yaml,
commit=commit,
**kwargs,
return self._handle_ci_wait_retry(
commit, current_yaml, installation_name_to_use, **kwargs
)

report_service = ReportService(
Expand Down Expand Up @@ -531,6 +396,191 @@ def run_impl_within_lock(
)
return {"notified": False, "notifications": None}

def _fetch_commit(self, db_session: Session, repoid: int, commitid: str) -> Commit:
    """Load the commit identified by ``repoid`` and ``commitid``.

    Args:
        db_session: Session used to query the ``Commit`` table.
        repoid: Id of the repository the commit belongs to.
        commitid: Identifier of the commit to look up.

    Returns:
        The first ``Commit`` row matching both filters.

    Raises:
        AssertionError: If no matching commit exists in the database.
    """
    commit = (
        db_session.query(Commit)
        .filter(Commit.repoid == repoid, Commit.commitid == commitid)
        .first()
    )
    # Raise explicitly rather than via ``assert``: assert statements are
    # removed when Python runs with ``-O``, which would let ``None`` escape
    # to callers that immediately dereference the commit.
    if not commit:
        raise AssertionError("Commit not found in database.")
    return commit

def _get_repository_service_for_notify(
    self, commit, repoid, commitid, milestone, current_yaml=None, **kwargs
):
    """Resolve the provider service used to notify about ``commit``.

    Returns:
        ``(repository_service, installation_name)`` when the service could
        be built. If the repository has no valid bot, the failure is
        recorded (commit error, checkpoint, breadcrumb) and a result dict
        with ``reason="no_valid_bot"`` is returned. If no configured app is
        available, the outcome of ``_handle_no_configured_apps`` is
        returned instead. Callers tell the cases apart by checking whether
        the result is a dict.
    """
    try:
        installation_name = get_installation_name_for_owner_for_task(
            self.name, commit.repository.author
        )
        provider_service = get_repo_provider_service_for_specific_commit(
            commit, installation_name
        )
    except RepositoryWithoutValidBotError:
        save_commit_error(commit, error_code=CommitErrorTypes.REPO_BOT_INVALID.value)
        log_context = {"repoid": repoid, "commit": commitid}
        log.warning(
            "Unable to start notifications because repo doesn't have a valid bot",
            extra=log_context,
        )
        self.log_checkpoint(UploadFlow.NOTIF_NO_VALID_INTEGRATION)
        self._call_upload_breadcrumb_task(
            commit_sha=commitid,
            repo_id=repoid,
            milestone=milestone,
            error=Errors.REPO_MISSING_VALID_BOT,
        )
        return {
            "notified": False,
            "notifications": None,
            "reason": "no_valid_bot",
        }
    except NoConfiguredAppsAvailable as exp:
        # Retry-or-give-up logic for missing apps lives in its own helper.
        return self._handle_no_configured_apps(
            commit, exp, repoid, commitid, milestone, current_yaml, **kwargs
        )
    else:
        return provider_service, installation_name

def _handle_no_configured_apps(
    self, commit, exp, repoid, commitid, milestone, current_yaml, **kwargs
):
    """React to a ``NoConfiguredAppsAvailable`` error raised for ``commit``.

    When ``exp`` reports at least one rate-limited app, a breadcrumb is
    emitted and the task is handed to ``_attempt_retry`` (whose result is
    returned). Otherwise the situation is logged, a checkpoint is recorded
    and a result dict with ``reason="no_valid_github_app_found"`` is
    returned.
    """
    if exp.rate_limited_count <= 0:
        # Nothing is merely rate limited (apps may be suspended), so there
        # is no point in retrying: we cannot talk to GitHub at all.
        log.warning(
            "We can't find an app to communicate with GitHub. Not notifying.",
            extra={
                "repoid": repoid,
                "commit": commitid,
                "apps_available": exp.apps_count,
                "apps_suspended": exp.suspended_count,
            },
        )
        self.log_checkpoint(UploadFlow.NOTIF_NO_APP_INSTALLATION)
        return {
            "notified": False,
            "notifications": None,
            "reason": "no_valid_github_app_found",
        }

    # At least one usable app exists but is rate limited right now, so try
    # again later. The delay is never shorter than one minute.
    countdown_seconds = max(60, get_seconds_to_next_hour())
    retry_log_context = {
        "repoid": repoid,
        "commit": commitid,
        "apps_available": exp.apps_count,
        "apps_rate_limited": exp.rate_limited_count,
        "apps_suspended": exp.suspended_count,
        "countdown_seconds": countdown_seconds,
    }
    log.warning(
        "Unable to start notifications. Retrying again later.",
        extra=retry_log_context,
    )
    self._call_upload_breadcrumb_task(
        commit_sha=commitid,
        repo_id=repoid,
        milestone=milestone,
        error=Errors.INTERNAL_APP_RATE_LIMITED,
    )
    return self._attempt_retry(
        max_retries=10,
        countdown=countdown_seconds,
        current_yaml=current_yaml,
        commit=commit,
        **kwargs,
    )

def _fetch_ci_results_or_error(
    self, repository_service, commit, current_yaml, milestone
):
    """Fetch the CI status for ``commit``, turning provider errors into results.

    Returns:
        Whatever ``fetch_and_update_whether_ci_passed`` returns on success.
        On ``TorngitClientError`` or ``TorngitServerFailureError`` the
        failure is logged, a checkpoint and breadcrumb are recorded, and a
        ``{"notified": False, ...}`` dict carrying the reason is returned.
        Callers distinguish the two outcomes with an ``isinstance(..., dict)``
        check, so this presumes the success value is never a dict.
    """

    def _give_up(checkpoint, reason):
        # Common tail of both failure paths: checkpoint, breadcrumb, result.
        self.log_checkpoint(checkpoint)
        self._call_upload_breadcrumb_task(
            commit_sha=commit.commitid,
            repo_id=commit.repoid,
            milestone=milestone,
            error=Errors.GIT_CLIENT_ERROR,
        )
        return {"notified": False, "notifications": None, "reason": reason}

    try:
        return self.fetch_and_update_whether_ci_passed(
            repository_service, commit, current_yaml
        )
    except TorngitClientError as ex:
        client_log_context = {
            "repoid": commit.repoid,
            "commit": commit.commitid,
            "code": ex.code,
        }
        log.info(
            "Unable to fetch CI results due to a client problem. Not notifying user",
            extra=client_log_context,
        )
        return _give_up(
            UploadFlow.NOTIF_GIT_CLIENT_ERROR, "not_able_fetch_ci_result"
        )
    except TorngitServerFailureError:
        log.info(
            "Unable to fetch CI results due to server issues. Not notifying user",
            extra={"repoid": commit.repoid, "commit": commit.commitid},
        )
        # NOTE(review): the breadcrumb for a server failure also reports
        # Errors.GIT_CLIENT_ERROR - confirm whether that is intentional.
        return _give_up(
            UploadFlow.NOTIF_GIT_SERVICE_ERROR, "server_issues_ci_result"
        )

def _handle_ci_wait_retry(
    self, commit, current_yaml, installation_name_to_use, **kwargs
):
    """Schedule a retry because CI for ``commit`` has not finished yet.

    The retry budget depends on whether a webhook is expected to re-trigger
    notifications: if the repository is covered by a configured GitHub app
    installation named ``installation_name_to_use``, uses the legacy
    integration, or has a hook id, fewer retries with a longer back-off are
    used. Otherwise more retries with a shorter back-off are used. The
    back-off doubles with every retry already made.

    Returns:
        Whatever ``_attempt_retry`` returns.
    """
    log.info(
        "Not sending notifications yet because we are waiting for CI to finish",
        extra={"repoid": commit.repoid, "commit": commit.commitid},
    )
    repository = commit.repository
    matching_installations = [
        installation
        for installation in (repository.author.github_app_installations or [])
        if installation.name == installation_name_to_use
        and installation.is_configured()
    ]
    # ``any`` over an empty list is already False, so no emptiness check.
    covered_by_ghapp = any(
        installation.is_repo_covered_by_integration(repository)
        for installation in matching_installations
    )
    uses_legacy_integration = repository.using_integration
    webhook_expected = (
        covered_by_ghapp or uses_legacy_integration or repository.hookid
    )

    if webhook_expected:
        # rely on the webhook, but still retry in case we miss the webhook
        max_retries, base_delay = 5, 60 * 3
    else:
        max_retries, base_delay = 10, 15
    # presumably seconds, as with the other ``countdown`` values - confirm
    countdown = base_delay * 2**self.request.retries

    return self._attempt_retry(
        max_retries=max_retries,
        countdown=countdown,
        current_yaml=current_yaml,
        commit=commit,
        **kwargs,
    )

def is_using_codecov_commenter(
self, repository_service: TorngitBaseAdapter
) -> bool:
Expand Down
Loading