Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 10 additions & 22 deletions backend/kale/common/kfputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,39 +137,27 @@ def upload_pipeline(pipeline_package_path: str, pipeline_name: str,
client = _get_kfp_client(host)
log.info("Uploading pipeline '%s'...", pipeline_name)
pipeline_id = get_pipeline_id(pipeline_name, host=host)
version_name = utils.random_string()
if not pipeline_id:
# The first version of the pipeline is set to the pipeline name value.
# To work around this, upload the first pipeline, then another one
# with a proper version name. Finally delete the original pipeline.
# New pipeline: upload_pipeline creates a default version automatically
upp = client.upload_pipeline(
pipeline_package_path=pipeline_package_path,
pipeline_name=pipeline_name)
pipeline_id = upp.pipeline_id
log.info("Uploaded Pipeline '%s' id: %s", pipeline_name, pipeline_id)
upv = client.upload_pipeline_version(
pipeline_package_path=pipeline_package_path,
pipeline_version_name=version_name,
pipeline_id=pipeline_id)
# delete the first version which has the same name as the pipeline
log.info("Uploaded pipeline '%s' with id: %s", pipeline_name, pipeline_id)
# Get the default version that was created with the pipeline
versions = client.list_pipeline_versions(pipeline_id=pipeline_id)
sorted_versions = sorted(versions.pipeline_versions,
key=lambda v: v.created_at)
delete_vid = sorted_versions[0].pipeline_version_id
client.delete_pipeline_version(
pipeline_id=pipeline_id,
pipeline_version_id=delete_vid
)
log.info("Deleted pipeline version with name '%s' and ID: %s",
pipeline_name, upp.pipeline_id)
version_id = versions.pipeline_versions[0].pipeline_version_id
else:
# Existing pipeline: upload a new version
version_name = utils.random_string()
upv = client.upload_pipeline_version(
pipeline_package_path=pipeline_package_path,
pipeline_version_name=version_name,
pipeline_id=pipeline_id)
log.info("Successfully uploaded version '%s' for pipeline '%s'.",
version_name, pipeline_name)
return pipeline_id, upv.pipeline_version_id
version_id = upv.pipeline_version_id
log.info("Uploaded version '%s' for pipeline '%s'.",
version_name, pipeline_name)
return pipeline_id, version_id


def run_pipeline(experiment_name: str, pipeline_id: str, run_name: str = None,
Expand Down