6 changes: 6 additions & 0 deletions src/dflow/argo_objects.py
@@ -2,6 +2,7 @@
import json
import logging
import os
import shutil
import tempfile
import time
from collections import UserDict, UserList
@@ -166,6 +167,11 @@ def modify_output_parameter(
path = tmpdir + "/" + name
with open(path, "w") as f:
f.write(jsonpickle.dumps(value))
if config["mode"] == "debug":
path = upload_s3(path, debug_func=shutil.copy)
self.outputs.artifacts[
"dflow_bigpar_" + name].local_path = path
return
key = upload_s3(path)
s3 = S3Artifact(key=key)
if s3_config["repo_type"] == "s3":
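The hunk above makes ArgoStep.modify_output_parameter usable in debug mode: the new value is copied to a local file via upload_s3(debug_func=shutil.copy) and recorded as the local_path of the corresponding "dflow_bigpar_" artifact instead of being uploaded to S3. A minimal sketch of how this might be exercised; the workflow id, step name, and parameter name are illustrative assumptions, not taken from this PR.

```python
from dflow import Workflow, config

config["mode"] = "debug"

# Attach to an existing debug-mode workflow (hypothetical id) and rewrite a
# big output parameter of one of its steps (hypothetical step/parameter names).
wf = Workflow(id="bigpar-debug-example")
step = wf.query_step(name="producer")[0]
# With this change the value lands in a local file referenced by the
# "dflow_bigpar_data" output artifact rather than an S3 key.
step.modify_output_parameter("data", list(range(10)))
```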
4 changes: 0 additions & 4 deletions src/dflow/io.py
@@ -432,8 +432,6 @@ def __init__(
if "value" in kwargs:
self.value = kwargs["value"]
self.save_as_artifact = save_as_artifact
if config["mode"] == "debug":
self.save_as_artifact = False
self.path = path
self.source = source
for k, v in kwargs.items():
@@ -846,8 +844,6 @@ def __init__(
self.value_from_expression = value_from_expression
self.save_as_artifact = save_as_artifact
self.save_both = save_both
if config["mode"] == "debug":
self.save_as_artifact = False
if "default" in kwargs:
self.default = kwargs["default"]
if "value" in kwargs:
6 changes: 2 additions & 4 deletions src/dflow/python/op.py
@@ -312,8 +312,7 @@ def __call__(self, *args, **op_in):
if hasattr(s, "default"):
kw["value"] = s.default
if isinstance(s, BigParameter):
kw["save_as_artifact"] = (
config["mode"] != "debug")
kw["save_as_artifact"] = True
else:
kw["type"] = s
dag.inputs.parameters[n] = InputParameter(**kw)
@@ -330,8 +329,7 @@ def __call__(self, *args, **op_in):
if hasattr(s, "default"):
kw["default"] = s.default
if isinstance(s, BigParameter):
kw["save_as_artifact"] = (
config["mode"] != "debug")
kw["save_as_artifact"] = True
else:
kw["type"] = s
kw["value_from_parameter"] = outputs.get(n)
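The two hunks above stop switching save_as_artifact off in debug mode for OP fields declared as BigParameter, so big parameters are file-backed in every mode. A minimal sketch of such an OP, assuming the usual dflow.python signature classes; the class and field names are illustrative.

```python
from dflow.python import OP, OPIO, OPIOSign, BigParameter


class Duplicate(OP):
    @classmethod
    def get_input_sign(cls):
        return OPIOSign({"data": BigParameter(list)})

    @classmethod
    def get_output_sign(cls):
        return OPIOSign({"data": BigParameter(list)})

    @OP.exec_sign_check
    def execute(self, op_in: OPIO) -> OPIO:
        # The big parameter travels through a file-backed artifact, so its
        # size is not limited by the Argo/etcd parameter size cap.
        return OPIO({"data": op_in["data"] * 2})
```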
9 changes: 4 additions & 5 deletions src/dflow/python/python_op_template.py
@@ -266,12 +266,12 @@ def __init__(self,
elif isinstance(sign, BigParameter):
if hasattr(sign, "default"):
self.inputs.parameters[name] = InputParameter(
save_as_artifact=config["mode"] != "debug",
save_as_artifact=True,
path="%s/inputs/parameters/" % self.tmp_root + name,
type=sign.type, value=sign.default)
else:
self.inputs.parameters[name] = InputParameter(
save_as_artifact=config["mode"] != "debug",
save_as_artifact=True,
path="%s/inputs/parameters/" % self.tmp_root + name,
type=sign.type)
elif isinstance(sign, Parameter):
@@ -303,7 +303,7 @@ def __init__(self,
% (self.tmp_root, name), default="")
elif isinstance(sign, BigParameter):
self.outputs.parameters[name] = OutputParameter(
save_as_artifact=config["mode"] != "debug",
save_as_artifact=True,
value_from_path="%s/outputs/parameters/" % self.tmp_root
+ name, type=sign.type)
elif isinstance(sign, Parameter):
@@ -582,8 +582,7 @@ def render_script(self):
if self.skip_slice_input and slices is not None:
slices = "(None if {{inputs.parameters.dflow_skip_slice_"\
"input}} else %s)" % slices
if isinstance(sign, BigParameter) and \
config["mode"] != "debug":
if isinstance(sign, BigParameter):
script += " input['%s'] = handle_input_parameter('%s',"\
" '', input_sign['%s'], %s, r'%s')\n" \
% (name, name, name, slices, self.tmp_root)
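With the hunks above, a PythonOPTemplate built from an OP that declares a BigParameter keeps the parameter file-backed even when config["mode"] is "debug". A small sketch, reusing the hypothetical Duplicate OP from the previous example:

```python
from dflow import config
from dflow.python import PythonOPTemplate

config["mode"] = "debug"
tmpl = PythonOPTemplate(Duplicate, image="python:3.10")
# Before this PR both of these were False in debug mode; now they stay True.
print(tmpl.inputs.parameters["data"].save_as_artifact)
print(tmpl.outputs.parameters["data"].save_as_artifact)
```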
4 changes: 2 additions & 2 deletions src/dflow/python/utils.py
@@ -181,7 +181,7 @@ def handle_input_parameter(name, value, sign, slices=None, data_root="/tmp"):
for item in jsonpickle.loads(value):
dflow_list += jsonpickle.loads(item)
obj = convert_dflow_list(dflow_list)
elif isinstance(sign, BigParameter) and config["mode"] != "debug":
elif isinstance(sign, BigParameter):
with open(data_root + "/inputs/parameters/" + name, "r") as f:
content = jsonpickle.loads(f.read())
obj = content
@@ -330,7 +330,7 @@ def handle_output_parameter(name, value, sign, slices=None, data_root="/tmp"):
res = [{"dflow_list_item": value, "order": slices}]
with open(data_root + '/outputs/parameters/' + name, 'w') as f:
f.write(jsonpickle.dumps(res))
elif isinstance(sign, BigParameter) and config["mode"] != "debug":
elif isinstance(sign, BigParameter):
with open(data_root + "/outputs/parameters/" + name, "w") as f:
f.write(jsonpickle.dumps(value))
else:
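The two hunks above make handle_input_parameter and handle_output_parameter treat BigParameter identically in debug and default mode: the value is jsonpickle-dumped to `<data_root>/outputs/parameters/<name>` on the producing side and loaded back from a matching file on the consuming side. A self-contained sketch of that round trip; the data_root and parameter name are illustrative.

```python
import os

import jsonpickle

data_root = "/tmp/dflow-example"  # illustrative; dflow passes its own tmp root
os.makedirs(data_root + "/outputs/parameters", exist_ok=True)

# Producing side: serialize the big value to a plain file.
with open(data_root + "/outputs/parameters/data", "w") as f:
    f.write(jsonpickle.dumps(list(range(5))))

# Consuming side: load it back from the file.
with open(data_root + "/outputs/parameters/data", "r") as f:
    value = jsonpickle.loads(f.read())
print(value)  # [0, 1, 2, 3, 4]
```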
69 changes: 67 additions & 2 deletions src/dflow/step.py
@@ -7,6 +7,7 @@
import sys
import tarfile
import time
import uuid
from copy import copy, deepcopy
from typing import Any, Dict, List, Optional, Union

@@ -1486,6 +1487,55 @@ def run(self, scope, context=None, order=None):
self.phase = "Skipped"
return

for name, par in list(self.inputs.parameters.items()):
if par.save_as_artifact:
if hasattr(par, "value"):
path = os.path.abspath(
os.path.join("..", config["debug_artifact_dir"],
"upload/%s/%s" % (uuid.uuid4(), name)))
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
f.write(jsonpickle.dumps(par.value))
self.inputs.artifacts["dflow_bigpar_" + name] = \
InputArtifact(path=par.path, source=S3Artifact(path))
elif par.source is not None:
self.inputs.artifacts["dflow_bigpar_" + name] = \
InputArtifact(path=par.path,
source=get_var(par.source, scope))
if "dflow_bigpar_" + name not in \
self.template.inputs.artifacts:
self.template.inputs.artifacts["dflow_bigpar_" + name] = \
InputArtifact(path=par.path)
if name in self.template.inputs.parameters:
del self.template.inputs.parameters[name]
del self.inputs.parameters[name]

for name, par in list(self.outputs.parameters.items()):
if par.save_as_artifact:
if name in self.template.outputs.parameters:
par = self.template.outputs.parameters[name]
kwargs = {
"global_name": "dflow_bigpar_" + par.global_name
if par.global_name is not None else None
}
if par.value_from_path is not None:
art = OutputArtifact(
path=par.value_from_path, **kwargs)
elif par.value_from_parameter is not None:
art = OutputArtifact(
_from=str(par.value_from_parameter), **kwargs)
elif par.value_from_expression is not None:
art = OutputArtifact(from_expression=str(
par.value_from_expression), **kwargs)
self.template.outputs.artifacts[
"dflow_bigpar_" + name] = art
if not par.save_both:
del self.template.outputs.parameters[name]
self.outputs.artifacts["dflow_bigpar_" + name] = deepcopy(
self.template.outputs.artifacts["dflow_bigpar_" + name])
if not par.save_both:
del self.outputs.parameters[name]

# source input parameters
parameters = InputParameters({k: copy(v) for k, v in
self.inputs.parameters.items()})
@@ -1783,13 +1833,18 @@ def record_output_parameters(self, stepdir, parameters):
value = jsonpickle.dumps(par.value)
with open(par_path, "w") as f:
f.write(value)
if par.type is not None:
if par.type is not None or par.global_name is not None:
metadata = {}
if par.type is not None:
metadata["type"] = type_to_str(par.type)
if par.global_name is not None:
metadata["globalName"] = par.global_name
os.makedirs(os.path.join(
stepdir, "outputs/parameters/.dflow"), exist_ok=True)
with open(os.path.join(
stepdir, "outputs/parameters/.dflow/%s" % name),
"w") as f:
f.write(jsonpickle.dumps({"type": type_to_str(par.type)}))
f.write(jsonpickle.dumps(metadata))
if par.global_name is not None:
os.makedirs(os.path.join(stepdir, "../outputs/parameters"),
exist_ok=True)
@@ -1807,6 +1862,13 @@ def record_output_artifacts(self, stepdir, artifacts):
art_path = os.path.join(stepdir, "outputs/artifacts/%s" % name)
force_link(art.local_path, art_path)
if art.global_name is not None:
metadata = {"globalName": art.global_name}
os.makedirs(os.path.join(
stepdir, "outputs/artifacts/.dflow"), exist_ok=True)
with open(os.path.join(
stepdir, "outputs/artifacts/.dflow/%s" % name),
"w") as f:
f.write(jsonpickle.dumps(metadata))
os.makedirs(os.path.join(stepdir, "../outputs/artifacts"),
exist_ok=True)
global_art_path = os.path.join(
@@ -2326,6 +2388,9 @@ def get_var(expr, scope):
return None # ignore
elif fields == ["workflow", "name"]:
return InputParameter(value=scope.workflow_id)
elif fields[:3] == ["workflow", "outputs", "artifacts"]:
return LocalArtifact("%s/../outputs/artifacts/%s" % (
scope.stepdir, fields[3]))
else:
raise RuntimeError("Not supported: %s" % expr)

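The large hunk in Step.run converts each save_as_artifact parameter into a "dflow_bigpar_<name>" input/output artifact at run time in debug mode, staging uploaded values under config["debug_artifact_dir"]; the later hunks add globalName metadata to recorded outputs and let get_var resolve workflow-level output artifacts. A hedged end-to-end sketch that passes a big parameter between two steps in debug mode, reusing the hypothetical Duplicate OP from above:

```python
from dflow import Step, Workflow, config
from dflow.python import PythonOPTemplate

config["mode"] = "debug"
config["debug_artifact_dir"] = "."  # staging dir for uploaded big-parameter files

wf = Workflow(name="bigpar-debug")
producer = Step("producer",
                PythonOPTemplate(Duplicate, image="python:3.10"),
                parameters={"data": list(range(3))})
consumer = Step("consumer",
                PythonOPTemplate(Duplicate, image="python:3.10"),
                parameters={"data": producer.outputs.parameters["data"]})
wf.add(producer)
wf.add(consumer)
wf.submit()
```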
10 changes: 9 additions & 1 deletion src/dflow/utils.py
@@ -87,7 +87,15 @@ def download_artifact(
skip_exists: skip files with the same MD5
"""
if getattr(artifact, "local_path", None) is not None:
if config["debug_copy_method"] == "symlink":
if os.path.isfile(artifact.local_path):
target = os.path.join(path, os.path.basename(artifact.local_path))
if config["debug_copy_method"] == "symlink":
force_link(artifact.local_path, target)
elif config["debug_copy_method"] == "link":
try_link(artifact.local_path, target)
elif config["debug_copy_method"] == "copy":
shutil.copy(artifact.local_path, target)
elif config["debug_copy_method"] == "symlink":
linktree(artifact.local_path, path)
elif config["debug_copy_method"] == "link":
merge_dir(artifact.local_path, path, try_link)
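The hunk above lets download_artifact handle a debug-mode artifact whose local_path is a single file, copying or linking it according to config["debug_copy_method"] ("symlink", "link", or "copy"). A hedged sketch continuing from the debug workflow above; the step and artifact names are illustrative.

```python
from dflow import config, download_artifact

config["debug_copy_method"] = "copy"  # hard-copy instead of (sym)linking

step = wf.query_step(name="producer")[0]
# "dflow_bigpar_data" is the file-backed artifact behind the big parameter "data".
download_artifact(step.outputs.artifacts["dflow_bigpar_data"], path="./results")
```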
59 changes: 52 additions & 7 deletions src/dflow/workflow.py
@@ -269,22 +269,37 @@ def submit(
os.makedirs(os.path.join(stepdir, io, "parameters"),
exist_ok=True)
for name, par in step[io].parameters.items():
if hasattr(par, "save_as_artifact"):
continue
with open(os.path.join(stepdir, io, "parameters",
name), "w") as f:
value = par.recover()["value"]
if isinstance(value, str):
f.write(value)
else:
f.write(jsonpickle.dumps(value))
if par.type is not None:
if par.type is not None or hasattr(
par, "globalName"):
metadata = {"type": type_to_str(par.type)}
if hasattr(par, "globalName"):
metadata["globalName"] = par.globalName
os.makedirs(os.path.join(
stepdir, io, "parameters/.dflow"),
exist_ok=True)
with open(os.path.join(
stepdir, io, "parameters/.dflow",
name), "w") as f:
f.write(jsonpickle.dumps({
"type": type_to_str(par.type)}))
f.write(jsonpickle.dumps(metadata))
if hasattr(par, "globalName"):
os.makedirs(os.path.join(
wfdir, io, "parameters"), exist_ok=True)
global_par_path = os.path.join(
wfdir, io, "parameters", par.globalName)
if os.path.exists(global_par_path):
os.remove(global_par_path)
os.symlink(os.path.join(
stepdir, io, "parameters", name),
global_par_path)

os.makedirs(os.path.join(stepdir, io, "artifacts"),
exist_ok=True)
@@ -321,6 +336,24 @@ def submit(
else:
os.symlink(art.local_path, os.path.join(
stepdir, io, "artifacts", name))
if hasattr(art, "globalName"):
metadata = {"globalName": art.globalName}
os.makedirs(os.path.join(
stepdir, io, "artifacts", ".dflow"),
exist_ok=True)
with open(os.path.join(
stepdir, io, "artifacts", ".dflow",
name), "w") as f:
f.write(jsonpickle.dumps(metadata))
os.makedirs(os.path.join(
wfdir, io, "artifacts"), exist_ok=True)
global_art_path = os.path.join(
wfdir, io, "artifacts", art.globalName)
if os.path.exists(global_art_path):
os.remove(global_art_path)
os.symlink(os.path.join(
stepdir, io, "artifacts", name),
global_art_path)

cwd = os.getcwd()
os.chdir(wfdir)
@@ -1117,25 +1150,35 @@ def query_step(
with open(os.path.join(stepdir, io, "parameters",
p), "r") as f:
val = f.read()
_type = None
metadata = {"type": None}
if os.path.exists(os.path.join(
stepdir, io, "parameters/.dflow", p)):
with open(os.path.join(
stepdir, io, "parameters/.dflow", p),
"r") as f:
_type = json.load(f)["type"]
metadata.update(json.load(f))
# for backward compatibility
if _type not in ["str", str(str)]:
if metadata["type"] not in ["str", str(str)]:
val = jsonpickle.loads(val)
step[io]["parameters"].append({
"name": p, "value": val, "type": _type})
"name": p, "value": val, **metadata})
if os.path.exists(os.path.join(stepdir, io, "artifacts")):
for a in os.listdir(os.path.join(stepdir, io,
"artifacts")):
if a == ".dflow":
continue
metadata = {}
if os.path.exists(os.path.join(
stepdir, io, "artifacts/.dflow", a)):
with open(os.path.join(
stepdir, io, "artifacts/.dflow", a),
"r") as f:
metadata = json.load(f)
step[io]["artifacts"].append({
"name": a,
"local_path": os.path.abspath(os.path.join(
stepdir, io, "artifacts", a)),
**metadata,
})
step = ArgoStep(step, self.id)
step_list.append(step)
@@ -1246,6 +1289,8 @@ def query_global_outputs(self) -> ArgoWorkflow:
arts = os.path.join(wfdir, "outputs", "artifacts")
if os.path.exists(arts):
for a in os.listdir(arts):
if a == ".dflow":
continue
outputs["artifacts"].append({
"name": a,
"local_path": os.path.abspath(os.path.join(arts, a))})
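The workflow.py hunks write per-output metadata (type, globalName) into ".dflow" sub-directories, symlink global parameters and artifacts under the workflow directory, surface that metadata from query_step, and skip the ".dflow" entries when listing global outputs. A hedged sketch for inspecting the resulting layout on disk, assuming the debug workflow directory is named after the workflow id and created under the submitting directory:

```python
import os

wfdir = os.path.abspath(wf.id)  # assumption: debug workflow dir == workflow id
for kind in ("parameters", "artifacts"):
    d = os.path.join(wfdir, "outputs", kind)
    if not os.path.isdir(d):
        continue
    for name in os.listdir(d):
        if name == ".dflow":  # per-output metadata such as globalName lives here
            continue
        print(kind, name, "->", os.path.realpath(os.path.join(d, name)))
```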