Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
from beeflow.common.cli import NaturalOrderGroup
from beeflow.common.connection import Connection
from beeflow.common import paths
from beeflow.common.parser import CwlParser # pylint: disable=C0415 # Costly import
from beeflow.common.parser.parser import CwlParseError
from beeflow.common.object_models import generate_workflow_id
from beeflow.client import remote_client
from beeflow.wf_manager.models import (
Expand Down Expand Up @@ -364,12 +366,9 @@ def is_parent(parent, path):
wf_path = wf_path.resolve()
workdir = workdir.resolve()

# Make sure the workdir is an absolute path and exists
# Make sure the workdir is an absolute path
workdir = os.path.expanduser(workdir)
workdir = os.path.abspath(workdir)
if not os.path.exists(workdir):
error_exit(f'Workflow working directory "{workdir}" doesn\'t exist')

# Make sure the workdir is not in /var or /var/tmp
if os.path.commonpath([os.path.realpath("/tmp"), workdir]) == os.path.realpath(
"/tmp"
Expand All @@ -380,6 +379,8 @@ def is_parent(parent, path):
):
error_exit('Workflow working directory cannot be in "/var/tmp"')

# Create workdir if it does not exist
os.makedirs(workdir, exist_ok=True)
tarball_path = ""
workflow = None
encoded_tarball = None
Expand Down Expand Up @@ -420,12 +421,17 @@ def is_parent(parent, path):
main_cwl_path = untar_wf_path / pathlib.Path(main_cwl).name
yaml_path = untar_wf_path / pathlib.Path(yaml_file).name

from beeflow.common.parser import CwlParser # pylint: disable=C0415 # Costly import
parser = CwlParser()
workflow_id = generate_workflow_id()
workflow, tasks = parser.parse_workflow(
workflow_id, str(main_cwl_path), job=str(yaml_path), workdir=workdir
)
try:
workflow, tasks = parser.parse_workflow(
workflow_id, str(main_cwl_path), job=str(yaml_path), workdir=workdir
)

except CwlParseError as err:
print(f"{type(err).__name__}: {err}")
return None, []

with open(package_path, "rb") as f:
encoded_tarball = base64.b64encode(f.read()).decode("utf-8")
shutil.rmtree(untar_path)
Expand Down Expand Up @@ -772,6 +778,7 @@ def reexecute(
# Make sure the workdir is an absolute path and exists
workdir = os.path.expanduser(workdir)
workdir = os.path.abspath(workdir)

if not os.path.exists(workdir):
error_exit(f'Workflow working directory "{workdir}" doesn\'t exist')
cwl_path = pathlib.Path(tempfile.mkdtemp())
Expand Down
48 changes: 33 additions & 15 deletions beeflow/common/parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ def resolve_input(input_, type_):
)
for output in self.cwl.outputs
]
workflow_hints = self.parse_requirements(self.cwl.hints, as_hints=True)
workflow_requirements = self.parse_requirements(self.cwl.requirements)
workflow_hints = self.parse_requirements(self.cwl.hints, workdir, as_hints=True)
workflow_requirements = self.parse_requirements(self.cwl.requirements, workdir)

workflow = Workflow(
name=workflow_name,
Expand Down Expand Up @@ -179,10 +179,10 @@ def parse_step(self, step, workflow_id, workdir):
step_name = os.path.basename(step_id).split(".")[0]
step_command = step_cwl.baseCommand
step_inputs = self.parse_step_inputs(step.in_, step_cwl.inputs)
step_requirements = self.parse_requirements(step.requirements)
step_requirements.extend(self.parse_requirements(step_cwl.requirements))
step_hints = self.parse_requirements(step.hints, as_hints=True)
step_hints.extend(self.parse_requirements(step_cwl.hints, as_hints=True))
step_requirements = self.parse_requirements(step.requirements, workdir)
step_requirements.extend(self.parse_requirements(step_cwl.requirements, workdir))
step_hints = self.parse_requirements(step.hints, workdir, as_hints=True)
step_hints.extend(self.parse_requirements(step_cwl.hints, workdir, as_hints=True))
step_workdir = resolve_step_workdir(
get_requirement(
step_requirements,
Expand Down Expand Up @@ -377,16 +377,30 @@ def parse_step_outputs(cwl_out, step_outputs, stdout, stderr, workdir=None):
outputs.append(StepOutput(id=out, type=output_type, value=None, glob=glob))
return outputs

def _read_requirement_file(self, key, items):
def _read_requirement_file(self, key, items, workdir, stepdir=None):
"""Read in a requirement file and replace it in the parsed items."""
base_path = os.path.dirname(self.path)
# pre and post scripts are relative to the step workdir
# if step workdir is not not absolute then use workdir/stepdir
if key in {"pre_script", "post_script"}:
if stepdir is None:
base_path = workdir
else:
base_path = stepdir
else:
base_path = os.path.dirname(self.path)
fname = items[key]
path = os.path.join(base_path, fname)
fpath = Path(fname)
if fname.startswith("~"):
fpath = fpath.expanduser()
if fpath.is_absolute():
path = fpath
else:
path = os.path.join(base_path, fpath)
try:
with open(path, encoding="utf-8") as fp:
items[key] = fp.read()
except FileNotFoundError:
msg = f"Could not find a file for {key}: {fname}"
msg = f"Could not find a file for {key}: {path}"
raise CwlParseError(msg) from None
if key in {"pre_script", "post_script"}:
self._validate_prepost_shell_env(key, items, fname)
Expand Down Expand Up @@ -418,7 +432,7 @@ def _validate_prepost_shell_env(self, key, items, fname):
rm_line = "\n".join(rm_line)
items.update({key: rm_line})

def parse_requirements(self, requirements, as_hints=False):
def parse_requirements(self, requirements, workdir, as_hints=False):
"""Parse CWL hints/requirements.

:param requirements: the CWL requirements
Expand All @@ -431,6 +445,10 @@ def parse_requirements(self, requirements, as_hints=False):
if not requirements:
return reqs
if as_hints:
# The pre and post scripts are relative to the working directory for the step
stepdir = next((req["workdir"] for req in requirements if "workdir" in req), None)
if stepdir:
stepdir = os.path.expanduser(stepdir)
for req in requirements:
items = {}
for k, v in req.items():
Expand All @@ -441,24 +459,24 @@ def parse_requirements(self, requirements, as_hints=False):
items[k] = str(v)
# Load in the dockerfile at parse time
if "dockerFile" in items:
self._read_requirement_file("dockerFile", items)
self._read_requirement_file("dockerFile", items, workdir)
# Load in pre/post scripts and make sure shell option is defined in cwl file
if "pre_script" in items and items["enabled"]:
if "shell" in items:
self._read_requirement_file("pre_script", items)
self._read_requirement_file("pre_script", items, workdir, stepdir=stepdir)
else:
msg = (
"pre script enabled but shell option undefined in cwl file."
)
raise CwlParseError(msg) from None
if "post_script" in items and items["enabled"]:
if "shell" in items:
self._read_requirement_file("post_script", items)
self._read_requirement_file("post_script", items, workdir, stepdir=stepdir)
else:
msg = "post script enabled but shell option undefined in cwl file."
raise CwlParseError(msg) from None
if "beeflow:bindMounts" in items:
self._read_requirement_file("beeflow:bindMounts", items)
self._read_requirement_file("beeflow:bindMounts", items, workdir)
reqs.append(Hint(class_=req["class"], params=items))
else:
for req in requirements:
Expand Down
19 changes: 11 additions & 8 deletions beeflow/tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ def test_parse_workflow_yaml(self):

def test_parse_workflow_script(self):
"""Test parsing of workflow with a YAML input job file."""
cwl_wf_file = find("beeflow/data/cwl/bee_workflows/clamr-ffmpeg-build_script/clamr_wf.cwl") # pylint: disable=C0301
cwl_job_yaml = find("beeflow/data/cwl/bee_workflows/clamr-ffmpeg-build_script/clamr_job.yml") # pylint: disable=C0301
cwl_wf_file = find("ci/test_workflows/pre-post-script/workflow.cwl")
cwl_job_yaml = find("ci/test_workflows/pre-post-script/input.yml")

workflow_id = generate_workflow_id()

Expand All @@ -66,8 +66,8 @@ def test_parse_workflow_script(self):

def test_parse_workflow_validate_script(self):
"""Test parsing of workflow and validate pre/post script files."""
cwl_wf_file = find("beeflow/data/cwl/bee_workflows/clamr-ffmpeg-validate_script/clamr_wf.cwl") # pylint: disable=C0301
cwl_job_yaml = find("beeflow/data/cwl/bee_workflows/clamr-ffmpeg-validate_script/clamr_job.yml") # pylint: disable=C0301
cwl_wf_file = find("ci/test_workflows/pre-post-script-validate/workflow.cwl")
cwl_job_yaml = find("ci/test_workflows/pre-post-script-validate/input.yml")

workflow_id = generate_workflow_id()

Expand All @@ -77,7 +77,7 @@ def test_parse_workflow_validate_script(self):
self.assertEqual(context.exception.args[0], "No shebang line found in pre_run.sh")

def test_parse_workflow_validate_shell(self):
"""Test parsing of workflow and check shell option matches pre/post script shebang line."""
"""Test parsing of workflow and check shell option pre/post script shebang line."""
cwl_wf_file = find("ci/test_workflows/shell_validate/workflow.cwl")
cwl_job_yaml = find("ci/test_workflows/shell_validate/input.yml")

Expand All @@ -86,7 +86,10 @@ def test_parse_workflow_validate_shell(self):
with self.assertRaises(Exception) as context:
self.parser.parse_workflow(workflow_id, cwl_wf_file, cwl_job_yaml)

self.assertEqual(context.exception.args[0], "CWL file shell #!/bin/bash does not match post.sh shell #!/bin/bashoo") # pylint: disable=C0301
self.assertEqual(
context.exception.args[0],
"CWL file shell #!/bin/bash does not match post.sh shell #!/bin/bash",
)

def test_parse_workflow_json(self):
"""Test parsing of workflow with a JSON input job file."""
Expand Down Expand Up @@ -439,10 +442,10 @@ def test_parse_workflow_missing_input(self):
),
],
)
def test_parse_requirements_hints(requirements, exp_reqs):
def test_parse_requirements_hints(requirements, workdir, exp_reqs):
"""Regression test parse_requirements when as_hints=True."""
parser = CwlParser()
reqs = parser.parse_requirements(requirements, as_hints=True)
reqs = parser.parse_requirements(requirements, workdir, as_hints=True)
assert reqs == exp_reqs


Expand Down
1 change: 1 addition & 0 deletions ci/test_workflows/pre-post-script-validate/input.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sleep_time: 1
3 changes: 3 additions & 0 deletions ci/test_workflows/pre-post-script-validate/post.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
touch post.txt


1 change: 1 addition & 0 deletions ci/test_workflows/pre-post-script-validate/pre.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
touch pre.txt
34 changes: 34 additions & 0 deletions ci/test_workflows/pre-post-script-validate/workflow.cwl
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
class: Workflow
cwlVersion: v1.0

inputs:
sleep_time: int

outputs:
step0_stdout:
type: File
outputSource: step0/step0_stdout

steps:
step0:
run:
class: CommandLineTool
baseCommand: sleep
stdout: step0_stdout.txt
inputs:
sleep_time:
type: int
inputBinding:
position: 0
outputs:
step0_stdout:
type: stdout
in:
sleep_time: sleep_time
out: [step0_stdout]
hints:
beeflow:ScriptRequirement:
enabled: true
pre_script: "ci/test_workflows/pre-post-script/pre.sh"
post_script: "ci/test_workflows/pre-post-script/post.sh"
shell: "/bin/bash"
3 changes: 2 additions & 1 deletion ci/test_workflows/pre-post-script/post.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/bin/bash

touch post.txt


4 changes: 2 additions & 2 deletions ci/test_workflows/pre-post-script/workflow.cwl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ steps:
hints:
beeflow:ScriptRequirement:
enabled: true
pre_script: "pre.sh"
post_script: "post.sh"
pre_script: "ci/test_workflows/pre-post-script/pre.sh"
post_script: "ci/test_workflows/pre-post-script/post.sh"
shell: "/bin/bash"
4 changes: 2 additions & 2 deletions ci/test_workflows/shell_validate/workflow.cwl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ steps:
hints:
beeflow:ScriptRequirement:
enabled: true
pre_script: "pre.sh"
post_script: "post.sh"
pre_script: "ci/test_workflows/pre-post-script/pre.sh"
post_script: "ci/test_workflows/pre-post-script/post.sh"
shell: "/bin/bash"
13 changes: 7 additions & 6 deletions docs/sphinx/bee_cwl.rst
Original file line number Diff line number Diff line change
Expand Up @@ -267,20 +267,21 @@ Some tasks may require small additional commands for setup or teardown such as
loading modules, setting up checkpointing files, perserving outputs of subtasks
that have been restarted, or cleaning up after a run.
The script requirement enables this by adding shell scripts that will run before
and after a task. The script must be within the workflow directory. The desired
and after a task. The desired
shell interpreter must be specified in both the ``beeflow:ScriptRequirement`` section
of the cwl file as well as the shebang line of the script, otherwise, an error will be
returned. Furthermore, if different shell interpreters are specified, then expect
an error. Default shell environment variable is ``/bin/bash``. The pre_script is run
before a task and the post_script is run after. Currently, we only support running
scripts outside of a container. We are considering adding container support in the
an error. Default shell environment variable is ``/bin/bash``. The pre_script is run
before a task and the post_script is run after. Currently, we only support running
scripts outside of a container. We are considering adding container support in the
future.
The scripts are assumed to be relative to the task working directory unless specified with an absolute path.

ScriptRequirement currently supports the following options:

* ``enabled`` - Enables pre/post script support
* ``pre_script`` - Path to the pre_script relative to the workflow directory.
* ``post_script`` - Path to the post_script relative to the workflow directory.
* ``pre_script`` - Path to the pre_script relative to the step working directory or specified absolute path.
* ``post_script`` - Path to the post_script relative to the step working directory or specified absolute path.
* ``shell`` - Desired shell interpreter. Must match shell interpreter defined in pre/post scripts.

An example ``beeflow:ScriptRequirement`` is shown below::
Expand Down
9 changes: 5 additions & 4 deletions docs/sphinx/development.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ Additional Poetry documentation:
Requirement: Python version 3.11 to 3.14
------------------------------------------

Installation Using a Python Environment
Installation Using a Python Environment
---------------------------------------
To install Poetry using a python environment, you must first set up the environment using the following
commands (Please note that the name of the environment is not limited to beedev-env).
commands (Please note that the name of the environment is not limited to beedev-env).

Please note the following instructions are intended for the Bash shell. If your default shell is Zsh,
the instructions may be slightly different.
the instructions may be slightly different.

.. code-block::

mkdir beedev-env
python3 -m venv beedev-env
source beedev-env/bin/activate
source beedev-env/bin/activate
pip install --upgrade pip
pip install poetry

You can make sure Poetry is installed by using the following command: ``poetry --version``.
Expand Down
Loading