diff --git a/src/dflow/argo_objects.py b/src/dflow/argo_objects.py
index 67a3e402..85c99668 100644
--- a/src/dflow/argo_objects.py
+++ b/src/dflow/argo_objects.py
@@ -2,6 +2,7 @@
 import json
 import logging
 import os
+import shutil
 import tempfile
 import time
 from collections import UserDict, UserList
@@ -166,6 +167,11 @@ def modify_output_parameter(
                 path = tmpdir + "/" + name
                 with open(path, "w") as f:
                     f.write(jsonpickle.dumps(value))
+                if config["mode"] == "debug":
+                    path = upload_s3(path, debug_func=shutil.copy)
+                    self.outputs.artifacts[
+                        "dflow_bigpar_" + name].local_path = path
+                    return
                 key = upload_s3(path)
                 s3 = S3Artifact(key=key)
                 if s3_config["repo_type"] == "s3":
diff --git a/src/dflow/io.py b/src/dflow/io.py
index 4a5ae15b..8eae61b8 100644
--- a/src/dflow/io.py
+++ b/src/dflow/io.py
@@ -432,8 +432,6 @@ def __init__(
         if "value" in kwargs:
             self.value = kwargs["value"]
         self.save_as_artifact = save_as_artifact
-        if config["mode"] == "debug":
-            self.save_as_artifact = False
         self.path = path
         self.source = source
         for k, v in kwargs.items():
@@ -846,8 +844,6 @@ def __init__(
         self.value_from_expression = value_from_expression
         self.save_as_artifact = save_as_artifact
         self.save_both = save_both
-        if config["mode"] == "debug":
-            self.save_as_artifact = False
         if "default" in kwargs:
             self.default = kwargs["default"]
         if "value" in kwargs:
diff --git a/src/dflow/python/op.py b/src/dflow/python/op.py
index c09fa1f6..a22463db 100644
--- a/src/dflow/python/op.py
+++ b/src/dflow/python/op.py
@@ -312,8 +312,7 @@ def __call__(self, *args, **op_in):
             if hasattr(s, "default"):
                 kw["value"] = s.default
             if isinstance(s, BigParameter):
-                kw["save_as_artifact"] = (
-                    config["mode"] != "debug")
+                kw["save_as_artifact"] = True
             else:
                 kw["type"] = s
             dag.inputs.parameters[n] = InputParameter(**kw)
@@ -330,8 +329,7 @@ def __call__(self, *args, **op_in):
             if hasattr(s, "default"):
                 kw["default"] = s.default
             if isinstance(s, BigParameter):
-                kw["save_as_artifact"] = (
-                    config["mode"] != "debug")
+                kw["save_as_artifact"] = True
             else:
                 kw["type"] = s
             kw["value_from_parameter"] = outputs.get(n)
diff --git a/src/dflow/python/python_op_template.py b/src/dflow/python/python_op_template.py
index 448afcb7..83b51b6c 100644
--- a/src/dflow/python/python_op_template.py
+++ b/src/dflow/python/python_op_template.py
@@ -266,12 +266,12 @@ def __init__(self,
             elif isinstance(sign, BigParameter):
                 if hasattr(sign, "default"):
                     self.inputs.parameters[name] = InputParameter(
-                        save_as_artifact=config["mode"] != "debug",
+                        save_as_artifact=True,
                         path="%s/inputs/parameters/" % self.tmp_root + name,
                         type=sign.type, value=sign.default)
                 else:
                     self.inputs.parameters[name] = InputParameter(
-                        save_as_artifact=config["mode"] != "debug",
+                        save_as_artifact=True,
                         path="%s/inputs/parameters/" % self.tmp_root + name,
                         type=sign.type)
             elif isinstance(sign, Parameter):
@@ -303,7 +303,7 @@ def __init__(self,
                     % (self.tmp_root, name), default="")
             elif isinstance(sign, BigParameter):
                 self.outputs.parameters[name] = OutputParameter(
-                    save_as_artifact=config["mode"] != "debug",
+                    save_as_artifact=True,
                     value_from_path="%s/outputs/parameters/"
                     % self.tmp_root + name, type=sign.type)
             elif isinstance(sign, Parameter):
@@ -582,8 +582,7 @@ def render_script(self):
             if self.skip_slice_input and slices is not None:
                 slices = "(None if {{inputs.parameters.dflow_skip_slice_"\
                     "input}} else %s)" % slices
-            if isinstance(sign, BigParameter) and \
-                    config["mode"] != "debug":
+            if isinstance(sign, BigParameter):
                 script += "    input['%s'] = handle_input_parameter('%s',"\
                     " '', input_sign['%s'], %s, r'%s')\n" \
                     % (name, name, name, slices, self.tmp_root)
diff --git a/src/dflow/python/utils.py b/src/dflow/python/utils.py
index 1b0bb6d2..8dc3188c 100644
--- a/src/dflow/python/utils.py
+++ b/src/dflow/python/utils.py
@@ -181,7 +181,7 @@ def handle_input_parameter(name, value, sign, slices=None, data_root="/tmp"):
             for item in jsonpickle.loads(value):
                 dflow_list += jsonpickle.loads(item)
             obj = convert_dflow_list(dflow_list)
-    elif isinstance(sign, BigParameter) and config["mode"] != "debug":
+    elif isinstance(sign, BigParameter):
         with open(data_root + "/inputs/parameters/" + name, "r") as f:
             content = jsonpickle.loads(f.read())
         obj = content
@@ -330,7 +330,7 @@ def handle_output_parameter(name, value, sign, slices=None, data_root="/tmp"):
             res = [{"dflow_list_item": value, "order": slices}]
         with open(data_root + '/outputs/parameters/' + name, 'w') as f:
             f.write(jsonpickle.dumps(res))
-    elif isinstance(sign, BigParameter) and config["mode"] != "debug":
+    elif isinstance(sign, BigParameter):
         with open(data_root + "/outputs/parameters/" + name, "w") as f:
             f.write(jsonpickle.dumps(value))
     else:
diff --git a/src/dflow/step.py b/src/dflow/step.py
index 96dbf72f..144e2884 100644
--- a/src/dflow/step.py
+++ b/src/dflow/step.py
@@ -7,6 +7,7 @@
 import sys
 import tarfile
 import time
+import uuid
 from copy import copy, deepcopy
 from typing import Any, Dict, List, Optional, Union
 
@@ -1486,6 +1487,55 @@ def run(self, scope, context=None, order=None):
             self.phase = "Skipped"
             return
 
+        for name, par in list(self.inputs.parameters.items()):
+            if par.save_as_artifact:
+                if hasattr(par, "value"):
+                    path = os.path.abspath(
+                        os.path.join("..", config["debug_artifact_dir"],
+                                     "upload/%s/%s" % (uuid.uuid4(), name)))
+                    os.makedirs(os.path.dirname(path), exist_ok=True)
+                    with open(path, "w") as f:
+                        f.write(jsonpickle.dumps(par.value))
+                    self.inputs.artifacts["dflow_bigpar_" + name] = \
+                        InputArtifact(path=par.path, source=S3Artifact(path))
+                elif par.source is not None:
+                    self.inputs.artifacts["dflow_bigpar_" + name] = \
+                        InputArtifact(path=par.path,
+                                      source=get_var(par.source, scope))
+                if "dflow_bigpar_" + name not in \
+                        self.template.inputs.artifacts:
+                    self.template.inputs.artifacts["dflow_bigpar_" + name] = \
+                        InputArtifact(path=par.path)
+                if name in self.template.inputs.parameters:
+                    del self.template.inputs.parameters[name]
+                del self.inputs.parameters[name]
+
+        for name, par in list(self.outputs.parameters.items()):
+            if par.save_as_artifact:
+                if name in self.template.outputs.parameters:
+                    par = self.template.outputs.parameters[name]
+                    kwargs = {
+                        "global_name": "dflow_bigpar_" + par.global_name
+                        if par.global_name is not None else None
+                    }
+                    if par.value_from_path is not None:
+                        art = OutputArtifact(
+                            path=par.value_from_path, **kwargs)
+                    elif par.value_from_parameter is not None:
+                        art = OutputArtifact(
+                            _from=str(par.value_from_parameter), **kwargs)
+                    elif par.value_from_expression is not None:
+                        art = OutputArtifact(from_expression=str(
+                            par.value_from_expression), **kwargs)
+                    self.template.outputs.artifacts[
+                        "dflow_bigpar_" + name] = art
+                    if not par.save_both:
+                        del self.template.outputs.parameters[name]
+                self.outputs.artifacts["dflow_bigpar_" + name] = deepcopy(
+                    self.template.outputs.artifacts["dflow_bigpar_" + name])
+                if not par.save_both:
+                    del self.outputs.parameters[name]
+
         # source input parameters
         parameters = InputParameters({k: copy(v) for k, v in
                                      self.inputs.parameters.items()})
@@ -1783,13 +1833,18 @@ def record_output_parameters(self, stepdir, parameters):
                 value = jsonpickle.dumps(par.value)
             with open(par_path, "w") as f:
                 f.write(value)
-            if par.type is not None:
+            if par.type is not None or par.global_name is not None:
+                metadata = {}
+                if par.type is not None:
+                    metadata["type"] = type_to_str(par.type)
+                if par.global_name is not None:
+                    metadata["globalName"] = par.global_name
                 os.makedirs(os.path.join(
                     stepdir, "outputs/parameters/.dflow"), exist_ok=True)
                 with open(os.path.join(
                         stepdir, "outputs/parameters/.dflow/%s" % name),
                         "w") as f:
-                    f.write(jsonpickle.dumps({"type": type_to_str(par.type)}))
+                    f.write(jsonpickle.dumps(metadata))
             if par.global_name is not None:
                 os.makedirs(os.path.join(stepdir, "../outputs/parameters"),
                             exist_ok=True)
@@ -1807,6 +1862,13 @@ def record_output_artifacts(self, stepdir, artifacts):
             art_path = os.path.join(stepdir, "outputs/artifacts/%s" % name)
             force_link(art.local_path, art_path)
             if art.global_name is not None:
+                metadata = {"globalName": art.global_name}
+                os.makedirs(os.path.join(
+                    stepdir, "outputs/artifacts/.dflow"), exist_ok=True)
+                with open(os.path.join(
+                        stepdir, "outputs/artifacts/.dflow/%s" % name),
+                        "w") as f:
+                    f.write(jsonpickle.dumps(metadata))
                 os.makedirs(os.path.join(stepdir, "../outputs/artifacts"),
                             exist_ok=True)
                 global_art_path = os.path.join(
@@ -2326,6 +2388,9 @@ def get_var(expr, scope):
         return None  # ignore
     elif fields == ["workflow", "name"]:
         return InputParameter(value=scope.workflow_id)
+    elif fields[:3] == ["workflow", "outputs", "artifacts"]:
+        return LocalArtifact("%s/../outputs/artifacts/%s" % (
+            scope.stepdir, fields[3]))
     else:
         raise RuntimeError("Not supported: %s" % expr)
 
diff --git a/src/dflow/utils.py b/src/dflow/utils.py
index 42f58022..519aade3 100644
--- a/src/dflow/utils.py
+++ b/src/dflow/utils.py
@@ -87,7 +87,15 @@ def download_artifact(
         skip_exists: skip files with the same MD5
     """
     if getattr(artifact, "local_path", None) is not None:
-        if config["debug_copy_method"] == "symlink":
+        if os.path.isfile(artifact.local_path):
+            target = os.path.join(path, os.path.basename(artifact.local_path))
+            if config["debug_copy_method"] == "symlink":
+                force_link(artifact.local_path, target)
+            elif config["debug_copy_method"] == "link":
+                try_link(artifact.local_path, target)
+            elif config["debug_copy_method"] == "copy":
+                shutil.copy(artifact.local_path, target)
+        elif config["debug_copy_method"] == "symlink":
             linktree(artifact.local_path, path)
         elif config["debug_copy_method"] == "link":
             merge_dir(artifact.local_path, path, try_link)
diff --git a/src/dflow/workflow.py b/src/dflow/workflow.py
index c4d3018c..c36ce021 100644
--- a/src/dflow/workflow.py
+++ b/src/dflow/workflow.py
@@ -269,6 +269,8 @@ def submit(
                     os.makedirs(os.path.join(stepdir, io, "parameters"),
                                 exist_ok=True)
                     for name, par in step[io].parameters.items():
+                        if hasattr(par, "save_as_artifact"):
+                            continue
                         with open(os.path.join(stepdir, io, "parameters",
                                                name), "w") as f:
                             value = par.recover()["value"]
@@ -276,15 +278,28 @@ def submit(
                                 f.write(value)
                             else:
                                 f.write(jsonpickle.dumps(value))
-                        if par.type is not None:
+                        if par.type is not None or hasattr(
+                                par, "globalName"):
+                            metadata = {"type": type_to_str(par.type)}
+                            if hasattr(par, "globalName"):
+                                metadata["globalName"] = par.globalName
                             os.makedirs(os.path.join(
                                 stepdir, io, "parameters/.dflow"),
                                 exist_ok=True)
                             with open(os.path.join(
                                     stepdir, io, "parameters/.dflow",
                                     name), "w") as f:
-                                f.write(jsonpickle.dumps({
-                                    "type": type_to_str(par.type)}))
+                                f.write(jsonpickle.dumps(metadata))
+                        if hasattr(par, "globalName"):
+                            os.makedirs(os.path.join(
+                                wfdir, io, "parameters"), exist_ok=True)
+                            global_par_path = os.path.join(
+                                wfdir, io, "parameters", par.globalName)
+                            if os.path.exists(global_par_path):
+                                os.remove(global_par_path)
+                            os.symlink(os.path.join(
+                                stepdir, io, "parameters", name),
+                                global_par_path)
 
                     os.makedirs(os.path.join(stepdir, io, "artifacts"),
                                 exist_ok=True)
@@ -321,6 +336,24 @@
                         else:
                             os.symlink(art.local_path, os.path.join(
                                 stepdir, io, "artifacts", name))
+                        if hasattr(art, "globalName"):
+                            metadata = {"globalName": art.globalName}
+                            os.makedirs(os.path.join(
+                                stepdir, io, "artifacts", ".dflow"),
+                                exist_ok=True)
+                            with open(os.path.join(
+                                    stepdir, io, "artifacts", ".dflow",
+                                    name), "w") as f:
+                                f.write(jsonpickle.dumps(metadata))
+                            os.makedirs(os.path.join(
+                                wfdir, io, "artifacts"), exist_ok=True)
+                            global_art_path = os.path.join(
+                                wfdir, io, "artifacts", art.globalName)
+                            if os.path.exists(global_art_path):
+                                os.remove(global_art_path)
+                            os.symlink(os.path.join(
+                                stepdir, io, "artifacts", name),
+                                global_art_path)
 
         cwd = os.getcwd()
         os.chdir(wfdir)
@@ -1117,25 +1150,35 @@ def query_step(
                        with open(os.path.join(stepdir, io, "parameters",
                                               p), "r") as f:
                            val = f.read()
-                        _type = None
+                        metadata = {"type": None}
                        if os.path.exists(os.path.join(
                                stepdir, io, "parameters/.dflow", p)):
                            with open(os.path.join(
                                    stepdir, io, "parameters/.dflow", p),
                                    "r") as f:
-                                _type = json.load(f)["type"]
+                                metadata.update(json.load(f))
                        # for backward compatible
-                        if _type not in ["str", str(str)]:
+                        if metadata["type"] not in ["str", str(str)]:
                            val = jsonpickle.loads(val)
                        step[io]["parameters"].append({
-                            "name": p, "value": val, "type": _type})
+                            "name": p, "value": val, **metadata})
                if os.path.exists(os.path.join(stepdir, io, "artifacts")):
                    for a in os.listdir(os.path.join(
                            stepdir, io, "artifacts")):
+                        if a == ".dflow":
+                            continue
+                        metadata = {}
+                        if os.path.exists(os.path.join(
+                                stepdir, io, "artifacts/.dflow", a)):
+                            with open(os.path.join(
+                                    stepdir, io, "artifacts/.dflow", a),
+                                    "r") as f:
+                                metadata = json.load(f)
                        step[io]["artifacts"].append({
                            "name": a,
                            "local_path": os.path.abspath(os.path.join(
                                stepdir, io, "artifacts", a)),
+                            **metadata,
                        })
            step = ArgoStep(step, self.id)
            step_list.append(step)
@@ -1246,6 +1289,8 @@ def query_global_outputs(self) -> ArgoWorkflow:
        arts = os.path.join(wfdir, "outputs", "artifacts")
        if os.path.exists(arts):
            for a in os.listdir(arts):
+                if a == ".dflow":
+                    continue
                outputs["artifacts"].append({
                    "name": a,
                    "local_path": os.path.abspath(os.path.join(arts, a))})
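For reference, a minimal usage sketch of the behaviour this change enables (not part of the diff; the OP and step names below are made up for illustration, only the dflow imports and config keys come from the library): with config["mode"] = "debug", a BigParameter now keeps save_as_artifact=True, so its value is written to a local file and passed through a dflow_bigpar_* artifact instead of being downgraded to a plain parameter.

from dflow import Step, Workflow, config
from dflow.python import (OP, OPIO, OPIOSign, BigParameter,
                          PythonOPTemplate)

config["mode"] = "debug"  # run the workflow locally instead of on Argo


class Double(OP):  # hypothetical example OP
    @classmethod
    def get_input_sign(cls):
        return OPIOSign({"data": BigParameter(dict)})

    @classmethod
    def get_output_sign(cls):
        return OPIOSign({"data": BigParameter(dict)})

    @OP.exec_sign_check
    def execute(self, op_in: OPIO) -> OPIO:
        # in debug mode the big parameter now arrives via a local artifact file
        return OPIO({"data": {k: 2 * v for k, v in op_in["data"].items()}})


wf = Workflow(name="bigpar-debug")
wf.add(Step(name="double",
            template=PythonOPTemplate(Double, image="python:3.8"),
            parameters={"data": {"x": 1}}))
wf.submit()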