From 7d10fa629f2ea431952b442fb8442e345a2cc7c1 Mon Sep 17 00:00:00 2001 From: Eder Ignatowicz Date: Sun, 1 Feb 2026 11:39:50 -0500 Subject: [PATCH] bug(backend): Pipeline Version Link Returns "Cannot retrieve pipeline version" Error Fix race condition in upload_pipeline() that caused "Cannot retrieve pipeline version" errors when creating new pipelines. The previous code created a pipeline (which auto-creates a default version), then created a second version with a random name, then tried to delete the default version by sorting versions by created_at timestamp. If both versions had the same timestamp (common with second-precision timestamps), the sort order was undefined and could delete the wrong version - the one we wanted to keep. The returned version ID then pointed to a deleted version. Simplified the flow to just use the default version that upload_pipeline() creates, eliminating the race condition entirely: - New pipelines: use the default version created by upload_pipeline() - Existing pipelines: upload a new version with random name (unchanged) Signed-off-by: Eder Ignatowicz --- backend/kale/common/kfputils.py | 32 ++++++++++---------------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/backend/kale/common/kfputils.py b/backend/kale/common/kfputils.py index fb7845e29..68d124c5c 100644 --- a/backend/kale/common/kfputils.py +++ b/backend/kale/common/kfputils.py @@ -126,39 +126,27 @@ def upload_pipeline(pipeline_package_path: str, pipeline_name: str, client = _get_kfp_client(host) log.info("Uploading pipeline '%s'...", pipeline_name) pipeline_id = get_pipeline_id(pipeline_name, host=host) - version_name = utils.random_string() if not pipeline_id: - # The first version of the pipeline is set to the pipeline name value. - # To work around this, upload the first pipeline, then another one - # with a proper version name. Finally delete the original pipeline. + # New pipeline: upload_pipeline creates a default version automatically upp = client.upload_pipeline( pipeline_package_path=pipeline_package_path, pipeline_name=pipeline_name) pipeline_id = upp.pipeline_id - log.info("Uploaded Pipeline '%s' id: %s", pipeline_name, pipeline_id) - upv = client.upload_pipeline_version( - pipeline_package_path=pipeline_package_path, - pipeline_version_name=version_name, - pipeline_id=pipeline_id) - # delete the first version which has the same name as the pipeline + log.info("Uploaded pipeline '%s' with id: %s", pipeline_name, pipeline_id) + # Get the default version that was created with the pipeline versions = client.list_pipeline_versions(pipeline_id=pipeline_id) - sorted_versions = sorted(versions.pipeline_versions, - key=lambda v: v.created_at) - delete_vid = sorted_versions[0].pipeline_version_id - client.delete_pipeline_version( - pipeline_id=pipeline_id, - pipeline_version_id=delete_vid - ) - log.info("Deleted pipeline version with name '%s' and ID: %s", - pipeline_name, upp.pipeline_id) + version_id = versions.pipeline_versions[0].pipeline_version_id else: + # Existing pipeline: upload a new version + version_name = utils.random_string() upv = client.upload_pipeline_version( pipeline_package_path=pipeline_package_path, pipeline_version_name=version_name, pipeline_id=pipeline_id) - log.info("Successfully uploaded version '%s' for pipeline '%s'.", - version_name, pipeline_name) - return pipeline_id, upv.pipeline_version_id + version_id = upv.pipeline_version_id + log.info("Uploaded version '%s' for pipeline '%s'.", + version_name, pipeline_name) + return pipeline_id, version_id def run_pipeline(experiment_name: str, pipeline_id: str, run_name: str = None,