5 changes: 0 additions & 5 deletions README.md
@@ -114,13 +114,8 @@ limitations imposed by the Kale data marshalling model.
## Resources

- Kale introduction [blog post](https://medium.com/kubeflow/automating-jupyter-notebook-deployments-to-kubeflow-pipelines-with-kale-a4ede38bea1f)
- Codelabs showcasing Kale working in MiniKF with Arrikto's [Rok](https://www.arrikto.com/):
- [From Notebook to Kubeflow Pipelines](https://codelabs.developers.google.com/codelabs/cloud-kubeflow-minikf-kale/#0)
- [From Notebook to Kubeflow Pipelines with HP Tuning](https://arrik.to/demowfhp)
- KubeCon NA Tutorial 2019: [From Notebook to Kubeflow Pipelines: An End-to-End Data Science Workflow](https://kccncna19.sched.com/event/Uaeq/tutorial-from-notebook-to-kubeflow-pipelines-an-end-to-end-data-science-workflow-michelle-casbon-google-stefano-fioravanzo-fondazione-bruno-kessler-ilias-katsakioris-arrikto?iframe=no&w=100%&sidebar=yes&bg=no)
/ [video](http://youtube.com/watch?v=C9rJzTzVzvQ)
- CNCF Webinar 2020: [From Notebook to Kubeflow Pipelines with MiniKF & Kale](https://www.cncf.io/webinars/from-notebook-to-kubeflow-pipelines-with-minikf-kale/)
/ [video](https://www.youtube.com/watch?v=1fX9ZFWkvvs)
- KubeCon EU Tutorial 2020: [From Notebook to Kubeflow Pipelines with HP Tuning: A Data Science Journey](https://kccnceu20.sched.com/event/ZerG/tutorial-from-notebook-to-kubeflow-pipelines-with-hp-tuning-a-data-science-journey-stefano-fioravanzo-ilias-katsakioris-arrikto)
/ [video](https://www.youtube.com/watch?v=QK0NxhyADpM)

4 changes: 2 additions & 2 deletions backend/kale/compiler.py
@@ -28,8 +28,8 @@
log = logging.getLogger(__name__)

PY_FN_TEMPLATE = "py_function_template.jinja2"
NB_FN_TEMPLATE = "new_nb_function_template.jinja2"
PIPELINE_TEMPLATE = "new_pipeline_template.jinja2"
NB_FN_TEMPLATE = "nb_function_template.jinja2"
PIPELINE_TEMPLATE = "pipeline_template.jinja2"
PIPELINE_ORIGIN = {"nb": NB_FN_TEMPLATE, "py": PY_FN_TEMPLATE}

KFP_DSL_ARTIFACT_IMPORTS = [
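
The renamed constants above point at the templates the compiler renders. As a rough orientation, template rendering with Jinja2 typically looks like the sketch below; the loader path, function name, and render arguments are assumptions for illustration, not taken from this PR:

from jinja2 import Environment, PackageLoader

# Assumed layout: templates ship inside the kale package under "templates".
_env = Environment(loader=PackageLoader("kale", "templates"))

def render_pipeline(pipeline_name, pipeline_description, **ctx):
    # PIPELINE_TEMPLATE now resolves to "pipeline_template.jinja2"
    # instead of the deleted "new_pipeline_template.jinja2".
    template = _env.get_template("pipeline_template.jinja2")
    return template.render(pipeline_name=pipeline_name,
                           pipeline_description=pipeline_description,
                           **ctx)
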
62 changes: 0 additions & 62 deletions backend/kale/templates/new_pipeline_template.jinja2

This file was deleted.

232 changes: 44 additions & 188 deletions backend/kale/templates/pipeline_template.jinja2
@@ -1,206 +1,62 @@
import json
import kfp.dsl as kfp_dsl
from kfp.dsl import Input, Output, Dataset, HTML, Metrics, ClassificationMetrics, Artifact, Model

import kfp.dsl as _kfp_dsl
import kfp.components as _kfp_components
{{ lightweight_components | join('\n\n') }}

from collections import OrderedDict
from kubernetes import client as k8s_client

{# PIPELINE LIGHTWEIGHT COMPONENTS #}
{% for func in lightweight_components -%}
{{func}}
{% endfor -%}

{# DEFINE PIPELINE TASKS FROM FUNCTIONS #}
{%- for name in pipeline.steps_names -%}
{% if base_image != '' %}
_kale_{{ name }}_op = _kfp_components.func_to_container_op({{ name }}, base_image='{{ base_image }}')
{% else %}
_kale_{{ name }}_op = _kfp_components.func_to_container_op({{ name }})
{% endif %}
{% endfor -%}

{# DECLARE PIPELINE #}
@_kfp_dsl.pipeline(
name='{{ pipeline_name }}',
description='{{ pipeline_description }}'
@kfp_dsl.pipeline(
name='{{ pipeline_name }}',
description='{{ pipeline_description }}'
)
def auto_generated_pipeline({%- for arg in pipeline.pps_names -%}
{{ arg }}='{{ (pipeline.pps_values)[loop.index-1] }}'
{%- if loop.index < pipeline.pps_values|length -%},
{%- endif -%}
{%- endfor -%}):
_kale_pvolumes_dict = OrderedDict()
_kale_volume_step_names = []
_kale_volume_name_parameters = []

{% if timeout %}
_kfp_dsl.get_pipeline_conf().set_timeout({{ timeout }})
{% endif %}

{% for vol in volumes -%}
{% set name = vol['name'] %}
{% set mountpoint = vol['mount_point'] %}
{% set pvc_size = vol['size']|string|default ('') + vol['size_type']|default ('') %}
{% set annotations = vol['annotations']|default({}) %}
{% set storage_class_name = vol['storage_class_name'] %}
_kale_annotations = {{ annotations }}

{% if vol['type'] == 'pv' %}

_kale_pvc{{ loop.index }} = k8s_client.V1PersistentVolumeClaim(
api_version="v1",
kind="PersistentVolumeClaim",
metadata=k8s_client.V1ObjectMeta(
name="{{ name }}-claim-{{ pipeline_name }}"
),
spec=k8s_client.V1PersistentVolumeClaimSpec(
volume_name="{{ name }}",
access_modes={{ vol['volume_access_mode'] }},
{%- if storage_class_name %}
storage_class_name="{{ storage_class_name }}",
{%- endif %}
resources=k8s_client.V1ResourceRequirements(
requests={"storage": "{{ pvc_size }}"}
)
)
)

_kale_vop{{ loop.index }} = _kfp_dsl.VolumeOp(
name="pvc-data{{ loop.index }}",
annotations=_kale_annotations,
k8s_resource=_kale_pvc{{ loop.index }}
)
_kale_volume = _kale_vop{{ loop.index }}.volume
_kale_volume_step_names.append(_kale_vop{{ loop.index }}.name)
_kale_volume_name_parameters.append(_kale_vop{{ loop.index }}.outputs["name"].full_name)

{% elif vol['type'] == 'pvc' %}

_kale_volume = _kfp_dsl.PipelineVolume(pvc='{{ name }}')

{% elif vol['type'] == 'new_pvc' %}

_kale_vop{{ loop.index }} = _kfp_dsl.VolumeOp(
name='create-volume-{{ loop.index }}',
resource_name='{{ name }}',
{%- if annotations %}
annotations=_kale_annotations,
{% endif -%}
modes={{ vol['volume_access_mode'] }},
{%- if storage_class_name %}
storage_class="{{ storage_class_name }}",
{%- endif %}
size='{{ pvc_size }}'
def auto_generated_pipeline(
{%- for param_name, param_info in pipeline_param_info.items() %}
{{ param_name.lower() }}: {{ param_info.type }} = {% if param_info.type == 'bool' %}{{ param_info.default }}{% else %}{{ param_info.default | tojson }}{% endif %}{% if not loop.last %},{% endif %}
{%- endfor %}
):
"""Auto-generated pipeline function."""

{% set steps_list = pipeline.steps | list %}
{% for step in steps_list %}
{{ step.name }}_task = {{ step.name }}_step(
{%- if step_inputs.get(step.name) %}
{%- for input_var in step_inputs[step.name] %}
{{ input_var }}_input_artifact={{ step_inputs_sources[step.name][input_var] }}_task.outputs["{{ input_var }}_output_artifact"]{% if not loop.last or pipeline_param_info %},{% endif %}
{%- endfor %}
{%- endif %}
{%- if pipeline_param_info %}
{%- for param_name, param_info in pipeline_param_info.items() %}
{{ param_info.clean_name }}={{ param_name.lower() }}{% if not loop.last %},{% endif %}
{%- endfor %}
{%- endif %}
)
_kale_volume = _kale_vop{{ loop.index }}.volume
_kale_volume_step_names.append(_kale_vop{{ loop.index }}.name)
_kale_volume_name_parameters.append(_kale_vop{{ loop.index }}.outputs["name"].full_name)

{% endif %}

_kale_pvolumes_dict['{{ mountpoint }}'] = _kale_volume

{% endfor %}

{% if marshal_volume %}
_kale_marshal_vop = _kfp_dsl.VolumeOp(
name="kale-marshal-volume",
resource_name="kale-marshal-pvc",
modes={{ pipeline.config.volume_access_mode }},
{%- if pipeline.config.storage_class_name %}
storage_class="{{ pipeline.config.storage_class_name }}",
{%- endif %}
size="1Gi"
)
_kale_volume_step_names.append(_kale_marshal_vop.name)
_kale_volume_name_parameters.append(_kale_marshal_vop.outputs["name"].full_name)
_kale_pvolumes_dict['{{ marshal_path }}'] = _kale_marshal_vop.volume
{% if loop.index0 > 0 %}
{%- if step_inputs.get(step.name) %}
{%- for input_var in step_inputs[step.name] %}
{{ step.name }}_task.after({{ step_inputs_sources[step.name][input_var] }}_task)
{%- endfor %}
{%- else %}
{{ step.name }}_task.after({{ steps_list[loop.index0 - 1].name }}_task)
{%- endif %}
{% endif %}

_kale_volume_step_names.sort()
_kale_volume_name_parameters.sort()
{{ step.name }}_task.set_display_name("{{ component_names[step.name] }}-step")

{% for step in pipeline.steps %}
_kale_{{ step.name }}_task = _kale_{{ step.name }}_op({{ pipeline.all_steps_parameters[step.name]|join(', ') }})\
.add_pvolumes(_kale_pvolumes_dict)\
.after({{ pipeline.pipeline_dependencies_tasks[ step.name ]|map('add_prefix', '_kale_')|map('add_suffix', '_task')|join(', ') }})
{%- if step.config.annotations %}
_kale_step_annotations = {{ step.config.annotations }}
for _kale_k, _kale_v in _kale_step_annotations.items():
_kale_{{ step.name }}_task.add_pod_annotation(_kale_k, _kale_v)
{%- endif %}
{%- if step.config.labels %}
_kale_step_labels = {{ step.config.labels }}
for _kale_k, _kale_v in _kale_step_labels.items():
_kale_{{ step.name }}_task.add_pod_label(_kale_k, _kale_v)
{%- endif %}
{%- if step.config.limits %}
_kale_step_limits = {{ step.config.limits }}
for _kale_k, _kale_v in _kale_step_limits.items():
_kale_{{ step.name }}_task.container.add_resource_limit(_kale_k, _kale_v)
{%- endif %}
{%- if step.config.retry_count %}
_kale_{{ step.name }}_task.set_retry_strategy(
num_retries={{ step.config.retry_count }},
retry_policy="Always",
backoff_duration={{ step.config.retry_interval|quote_if_not_none }},
backoff_factor={{ step.config.retry_factor }},
backoff_max_duration={{ step.config.retry_max_interval|quote_if_not_none }})
{%- endif %}
_kale_{{ step.name }}_task.container.working_dir = "{{ abs_working_dir }}"
_kale_{{ step.name }}_task.container.set_security_context(k8s_client.V1SecurityContext(run_as_user=0))
_kale_output_artifacts = {}
{%- if step.metrics %}
_kale_output_artifacts.update({'mlpipeline-metrics': '/tmp/mlpipeline-metrics.json'})
{%- for limit_key, limit_value in step.config.limits.items() %}
{%- if limit_key in ['nvidia.com/gpu', 'amd.com/gpu'] %}
{{ step.name }}_task.set_accelerator_type("{{ limit_key }}").set_accelerator_limit({{ limit_value }})
{%- endif %}
{%- if pipeline.processor.id == "nb" and step.name != "pipeline_metrics" %}
_kale_output_artifacts.update({'mlpipeline-ui-metadata': '/tmp/mlpipeline-ui-metadata.json'})
_kale_output_artifacts.update({'{{ step.name }}': '/{{ step.name }}.html'})
{%- endif %}
{%- if pipeline.processor.id == "py" and step.artifacts and step.name != "pipeline_metrics" %}
_kale_output_artifacts.update({'mlpipeline-ui-metadata': '/tmp/mlpipeline-ui-metadata.json'})
{%- for artifact in step.artifacts %}
_kale_output_artifacts.update({'{{ artifact["name"] }}': '{{ artifact["path"] }}'})
{%- endfor %}
{%- endif %}
_kale_{{ step.name }}_task.output_artifact_paths.update(_kale_output_artifacts)
_kale_{{ step.name }}_task.add_pod_label("pipelines.kubeflow.org/metadata_written", "true")
_kale_dep_names = (_kale_{{ step.name }}_task.dependent_names +
_kale_volume_step_names)
_kale_{{ step.name }}_task.add_pod_annotation(
"kubeflow-kale.org/dependent-templates", json.dumps(_kale_dep_names))
if _kale_volume_name_parameters:
_kale_{{ step.name }}_task.add_pod_annotation(
"kubeflow-kale.org/volume-name-parameters",
json.dumps(_kale_volume_name_parameters))
{% endfor %}

{# Snapshots #}
{% for vol in volumes -%}
{% if vol['snapshot'] %}
_kale_snapshot{{ loop.index }} = _kfp_dsl.VolumeSnapshotOp(
name='snapshot-volume-{{ loop.index }}',
resource_name='{{ vol['snapshot_name'] }}',
volume=_kale_vop{{ loop.index }}.volume.after({{ pipeline.get_leaf_nodes()|map('add_prefix', '_kale_')|map('add_suffix', '_task')|join(', ') }})
)
{% endif %}
{% endfor %}

{% endfor %}

{# The script will deploy the pipeline if run manually #}
if __name__ == "__main__":
pipeline_func = auto_generated_pipeline
pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)
from kfp import compiler

# Get or create an experiment and submit a pipeline run
import kfp
client = kfp.Client()
experiment = client.create_experiment('{{ experiment_name }}')
pipeline_filename = auto_generated_pipeline.__name__ + '.yaml'
compiler.Compiler().compile(auto_generated_pipeline, pipeline_filename)

# Submit a pipeline run
from kale.common import kfputils
pipeline_id, version_id = kfputils.upload_pipeline(pipeline_filename, "{{ pipeline_name }}")
run_result = kfputils.run_pipeline(experiment_name=experiment.name, pipeline_id=pipeline_id, version_id=version_id)
print(f"Pipeline compiled to {pipeline_filename}")
print("To run, upload this YAML to your KFP v2 instance or use kfp.Client().create_run_from_pipeline_func.")
39 changes: 0 additions & 39 deletions docker/jupyterlab/Dockerfile.rok

This file was deleted.
