diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index ed86b343b..14733dfd0 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -33,6 +33,8 @@ from beeflow.common.cli import NaturalOrderGroup from beeflow.common.connection import Connection from beeflow.common import paths +from beeflow.common.parser import CwlParser # pylint: disable=C0415 # Costly import +from beeflow.common.parser.parser import CwlParseError from beeflow.common.object_models import generate_workflow_id from beeflow.client import remote_client from beeflow.wf_manager.models import ( @@ -364,12 +366,9 @@ def is_parent(parent, path): wf_path = wf_path.resolve() workdir = workdir.resolve() - # Make sure the workdir is an absolute path and exists + # Make sure the workdir is an absolute path workdir = os.path.expanduser(workdir) workdir = os.path.abspath(workdir) - if not os.path.exists(workdir): - error_exit(f'Workflow working directory "{workdir}" doesn\'t exist') - # Make sure the workdir is not in /var or /var/tmp if os.path.commonpath([os.path.realpath("/tmp"), workdir]) == os.path.realpath( "/tmp" @@ -380,6 +379,8 @@ def is_parent(parent, path): ): error_exit('Workflow working directory cannot be in "/var/tmp"') + # Create workdir if it does not exist + os.makedirs(workdir, exist_ok=True) tarball_path = "" workflow = None encoded_tarball = None @@ -420,12 +421,17 @@ def is_parent(parent, path): main_cwl_path = untar_wf_path / pathlib.Path(main_cwl).name yaml_path = untar_wf_path / pathlib.Path(yaml_file).name - from beeflow.common.parser import CwlParser # pylint: disable=C0415 # Costly import parser = CwlParser() workflow_id = generate_workflow_id() - workflow, tasks = parser.parse_workflow( - workflow_id, str(main_cwl_path), job=str(yaml_path), workdir=workdir - ) + try: + workflow, tasks = parser.parse_workflow( + workflow_id, str(main_cwl_path), job=str(yaml_path), workdir=workdir + ) + + except CwlParseError as err: + 
print(f"{type(err).__name__}: {err}") + return None, [] + with open(package_path, "rb") as f: encoded_tarball = base64.b64encode(f.read()).decode("utf-8") shutil.rmtree(untar_path) @@ -772,6 +778,7 @@ def reexecute( # Make sure the workdir is an absolute path and exists workdir = os.path.expanduser(workdir) workdir = os.path.abspath(workdir) + if not os.path.exists(workdir): error_exit(f'Workflow working directory "{workdir}" doesn\'t exist') cwl_path = pathlib.Path(tempfile.mkdtemp()) diff --git a/beeflow/common/parser/parser.py b/beeflow/common/parser/parser.py index 9d0dae79d..d3fe9c84c 100644 --- a/beeflow/common/parser/parser.py +++ b/beeflow/common/parser/parser.py @@ -131,8 +131,8 @@ def resolve_input(input_, type_): ) for output in self.cwl.outputs ] - workflow_hints = self.parse_requirements(self.cwl.hints, as_hints=True) - workflow_requirements = self.parse_requirements(self.cwl.requirements) + workflow_hints = self.parse_requirements(self.cwl.hints, workdir, as_hints=True) + workflow_requirements = self.parse_requirements(self.cwl.requirements, workdir) workflow = Workflow( name=workflow_name, @@ -179,10 +179,10 @@ def parse_step(self, step, workflow_id, workdir): step_name = os.path.basename(step_id).split(".")[0] step_command = step_cwl.baseCommand step_inputs = self.parse_step_inputs(step.in_, step_cwl.inputs) - step_requirements = self.parse_requirements(step.requirements) - step_requirements.extend(self.parse_requirements(step_cwl.requirements)) - step_hints = self.parse_requirements(step.hints, as_hints=True) - step_hints.extend(self.parse_requirements(step_cwl.hints, as_hints=True)) + step_requirements = self.parse_requirements(step.requirements, workdir) + step_requirements.extend(self.parse_requirements(step_cwl.requirements, workdir)) + step_hints = self.parse_requirements(step.hints, workdir, as_hints=True) + step_hints.extend(self.parse_requirements(step_cwl.hints, workdir, as_hints=True)) step_workdir = resolve_step_workdir( get_requirement( 
step_requirements, @@ -377,16 +377,30 @@ def parse_step_outputs(cwl_out, step_outputs, stdout, stderr, workdir=None): outputs.append(StepOutput(id=out, type=output_type, value=None, glob=glob)) return outputs - def _read_requirement_file(self, key, items): + def _read_requirement_file(self, key, items, workdir, stepdir=None): """Read in a requirement file and replace it in the parsed items.""" - base_path = os.path.dirname(self.path) + # pre and post scripts are relative to the step workdir + # if the step defines no workdir, fall back to the workflow workdir + if key in {"pre_script", "post_script"}: + if stepdir is None: + base_path = workdir + else: + base_path = stepdir + else: + base_path = os.path.dirname(self.path) fname = items[key] - path = os.path.join(base_path, fname) + fpath = Path(fname) + if fname.startswith("~"): + fpath = fpath.expanduser() + if fpath.is_absolute(): + path = fpath + else: + path = os.path.join(base_path, fpath) try: with open(path, encoding="utf-8") as fp: items[key] = fp.read() except FileNotFoundError: - msg = f"Could not find a file for {key}: {fname}" + msg = f"Could not find a file for {key}: {path}" raise CwlParseError(msg) from None if key in {"pre_script", "post_script"}: self._validate_prepost_shell_env(key, items, fname) @@ -418,7 +432,7 @@ def _validate_prepost_shell_env(self, key, items, fname): rm_line = "\n".join(rm_line) items.update({key: rm_line}) - def parse_requirements(self, requirements, as_hints=False): + def parse_requirements(self, requirements, workdir, as_hints=False): """Parse CWL hints/requirements.
:param requirements: the CWL requirements @@ -431,6 +445,10 @@ def parse_requirements(self, requirements, as_hints=False): if not requirements: return reqs if as_hints: + # The pre and post scripts are relative to the working directory for the step + stepdir = next((req["workdir"] for req in requirements if "workdir" in req), None) + if stepdir: + stepdir = os.path.expanduser(stepdir) for req in requirements: items = {} for k, v in req.items(): @@ -441,11 +459,11 @@ def parse_requirements(self, requirements, as_hints=False): items[k] = str(v) # Load in the dockerfile at parse time if "dockerFile" in items: - self._read_requirement_file("dockerFile", items) + self._read_requirement_file("dockerFile", items, workdir) # Load in pre/post scripts and make sure shell option is defined in cwl file if "pre_script" in items and items["enabled"]: if "shell" in items: - self._read_requirement_file("pre_script", items) + self._read_requirement_file("pre_script", items, workdir, stepdir=stepdir) else: msg = ( "pre script enabled but shell option undefined in cwl file." @@ -453,12 +471,12 @@ def parse_requirements(self, requirements, as_hints=False): raise CwlParseError(msg) from None if "post_script" in items and items["enabled"]: if "shell" in items: - self._read_requirement_file("post_script", items) + self._read_requirement_file("post_script", items, workdir, stepdir=stepdir) else: msg = "post script enabled but shell option undefined in cwl file." 
raise CwlParseError(msg) from None if "beeflow:bindMounts" in items: - self._read_requirement_file("beeflow:bindMounts", items) + self._read_requirement_file("beeflow:bindMounts", items, workdir) reqs.append(Hint(class_=req["class"], params=items)) else: for req in requirements: diff --git a/beeflow/tests/test_parser.py b/beeflow/tests/test_parser.py index e541d1028..591cd3919 100644 --- a/beeflow/tests/test_parser.py +++ b/beeflow/tests/test_parser.py @@ -52,8 +52,8 @@ def test_parse_workflow_yaml(self): def test_parse_workflow_script(self): """Test parsing of workflow with a YAML input job file.""" - cwl_wf_file = find("beeflow/data/cwl/bee_workflows/clamr-ffmpeg-build_script/clamr_wf.cwl") # pylint: disable=C0301 - cwl_job_yaml = find("beeflow/data/cwl/bee_workflows/clamr-ffmpeg-build_script/clamr_job.yml") # pylint: disable=C0301 + cwl_wf_file = find("ci/test_workflows/pre-post-script/workflow.cwl") + cwl_job_yaml = find("ci/test_workflows/pre-post-script/input.yml") workflow_id = generate_workflow_id() @@ -66,8 +66,8 @@ def test_parse_workflow_script(self): def test_parse_workflow_validate_script(self): """Test parsing of workflow and validate pre/post script files.""" - cwl_wf_file = find("beeflow/data/cwl/bee_workflows/clamr-ffmpeg-validate_script/clamr_wf.cwl") # pylint: disable=C0301 - cwl_job_yaml = find("beeflow/data/cwl/bee_workflows/clamr-ffmpeg-validate_script/clamr_job.yml") # pylint: disable=C0301 + cwl_wf_file = find("ci/test_workflows/pre-post-script-validate/workflow.cwl") + cwl_job_yaml = find("ci/test_workflows/pre-post-script-validate/input.yml") workflow_id = generate_workflow_id() @@ -77,7 +77,7 @@ def test_parse_workflow_validate_script(self): self.assertEqual(context.exception.args[0], "No shebang line found in pre_run.sh") def test_parse_workflow_validate_shell(self): - """Test parsing of workflow and check shell option matches pre/post script shebang line.""" + """Test parsing of workflow and check shell option pre/post script shebang 
line.""" cwl_wf_file = find("ci/test_workflows/shell_validate/workflow.cwl") cwl_job_yaml = find("ci/test_workflows/shell_validate/input.yml") @@ -86,7 +86,10 @@ def test_parse_workflow_validate_shell(self): with self.assertRaises(Exception) as context: self.parser.parse_workflow(workflow_id, cwl_wf_file, cwl_job_yaml) - self.assertEqual(context.exception.args[0], "CWL file shell #!/bin/bash does not match post.sh shell #!/bin/bashoo") # pylint: disable=C0301 + self.assertEqual( + context.exception.args[0], + "CWL file shell #!/bin/bash does not match post.sh shell #!/bin/bash", + ) def test_parse_workflow_json(self): """Test parsing of workflow with a JSON input job file.""" @@ -439,10 +442,10 @@ def test_parse_workflow_missing_input(self): ), ], ) -def test_parse_requirements_hints(requirements, exp_reqs): +def test_parse_requirements_hints(requirements, workdir, exp_reqs): """Regression test parse_requirements when as_hints=True.""" parser = CwlParser() - reqs = parser.parse_requirements(requirements, as_hints=True) + reqs = parser.parse_requirements(requirements, workdir, as_hints=True) assert reqs == exp_reqs diff --git a/ci/test_workflows/pre-post-script-validate/input.yml b/ci/test_workflows/pre-post-script-validate/input.yml new file mode 100644 index 000000000..eaf9dc0e3 --- /dev/null +++ b/ci/test_workflows/pre-post-script-validate/input.yml @@ -0,0 +1 @@ +sleep_time: 1 diff --git a/ci/test_workflows/pre-post-script-validate/post.sh b/ci/test_workflows/pre-post-script-validate/post.sh new file mode 100644 index 000000000..3322f2820 --- /dev/null +++ b/ci/test_workflows/pre-post-script-validate/post.sh @@ -0,0 +1,3 @@ +touch post.txt + + diff --git a/ci/test_workflows/pre-post-script-validate/pre.sh b/ci/test_workflows/pre-post-script-validate/pre.sh new file mode 100644 index 000000000..139e9084a --- /dev/null +++ b/ci/test_workflows/pre-post-script-validate/pre.sh @@ -0,0 +1 @@ +touch pre.txt diff --git 
a/ci/test_workflows/pre-post-script-validate/workflow.cwl b/ci/test_workflows/pre-post-script-validate/workflow.cwl new file mode 100644 index 000000000..392548ebb --- /dev/null +++ b/ci/test_workflows/pre-post-script-validate/workflow.cwl @@ -0,0 +1,34 @@ +class: Workflow +cwlVersion: v1.0 + +inputs: + sleep_time: int + +outputs: + step0_stdout: + type: File + outputSource: step0/step0_stdout + +steps: + step0: + run: + class: CommandLineTool + baseCommand: sleep + stdout: step0_stdout.txt + inputs: + sleep_time: + type: int + inputBinding: + position: 0 + outputs: + step0_stdout: + type: stdout + in: + sleep_time: sleep_time + out: [step0_stdout] + hints: + beeflow:ScriptRequirement: + enabled: true + pre_script: "ci/test_workflows/pre-post-script/pre.sh" + post_script: "ci/test_workflows/pre-post-script/post.sh" + shell: "/bin/bash" diff --git a/ci/test_workflows/pre-post-script/post.sh b/ci/test_workflows/pre-post-script/post.sh index 550a41cfb..4acc63abf 100644 --- a/ci/test_workflows/pre-post-script/post.sh +++ b/ci/test_workflows/pre-post-script/post.sh @@ -1,3 +1,4 @@ #!/bin/bash - touch post.txt + + diff --git a/ci/test_workflows/pre-post-script/workflow.cwl b/ci/test_workflows/pre-post-script/workflow.cwl index e1661a9c7..392548ebb 100644 --- a/ci/test_workflows/pre-post-script/workflow.cwl +++ b/ci/test_workflows/pre-post-script/workflow.cwl @@ -29,6 +29,6 @@ steps: hints: beeflow:ScriptRequirement: enabled: true - pre_script: "pre.sh" - post_script: "post.sh" + pre_script: "ci/test_workflows/pre-post-script/pre.sh" + post_script: "ci/test_workflows/pre-post-script/post.sh" shell: "/bin/bash" diff --git a/ci/test_workflows/shell_validate/workflow.cwl b/ci/test_workflows/shell_validate/workflow.cwl index e1661a9c7..392548ebb 100644 --- a/ci/test_workflows/shell_validate/workflow.cwl +++ b/ci/test_workflows/shell_validate/workflow.cwl @@ -29,6 +29,6 @@ steps: hints: beeflow:ScriptRequirement: enabled: true - pre_script: "pre.sh" - post_script: "post.sh" + 
pre_script: "ci/test_workflows/pre-post-script/pre.sh" + post_script: "ci/test_workflows/pre-post-script/post.sh" shell: "/bin/bash" diff --git a/docs/sphinx/bee_cwl.rst b/docs/sphinx/bee_cwl.rst index 445e7e222..27f719dbd 100644 --- a/docs/sphinx/bee_cwl.rst +++ b/docs/sphinx/bee_cwl.rst @@ -267,20 +267,21 @@ Some tasks may require small additional commands for setup or teardown such as loading modules, setting up checkpointing files, perserving outputs of subtasks that have been restarted, or cleaning up after a run. The script requirement enables this by adding shell scripts that will run before -and after a task. The script must be within the workflow directory. The desired +and after a task. The desired shell interpreter must be specified in both the ``beeflow:ScriptRequirement`` section of the cwl file as well as the shebang line of the script, otherwise, an error will be returned. Furthermore, if different shell interpreters are specified, then expect -an error. Default shell environment variable is ``/bin/bash``. The pre_script is run -before a task and the post_script is run after. Currently, we only support running -scripts outside of a container. We are considering adding container support in the +an error. Default shell environment variable is ``/bin/bash``. The pre_script is run +before a task and the post_script is run after. Currently, we only support running +scripts outside of a container. We are considering adding container support in the future. +Script paths are resolved relative to the task working directory unless an absolute path is given. ScriptRequirement currently supports the following options: * ``enabled`` - Enables pre/post script support -* ``pre_script`` - Path to the pre_script relative to the workflow directory. -* ``post_script`` - Path to the post_script relative to the workflow directory. +* ``pre_script`` - Path to the pre_script, either absolute or relative to the step working directory. 
+* ``post_script`` - Path to the post_script, either absolute or relative to the step working directory. * ``shell`` - Desired shell interpreter. Must match shell interpreter defined in pre/post scripts. An example ``beeflow:ScriptRequirement`` is shown below:: diff --git a/docs/sphinx/development.rst b/docs/sphinx/development.rst index 0e2778c3c..39d7b8cee 100644 --- a/docs/sphinx/development.rst +++ b/docs/sphinx/development.rst @@ -18,19 +18,20 @@ Additional Poetry documentation: Requirement: Python version 3.11 to 3.14 ------------------------------------------ -Installation Using a Python Environment +Installation Using a Python Environment --------------------------------------- To install Poetry using a python environment, you must first set up the environment using the following -commands (Please note that the name of the environment is not limited to beedev-env). +commands (Please note that the name of the environment is not limited to beedev-env). Please note the following instructions are intended for the Bash shell. If your default shell is Zsh, -the instructions may be slightly different. +the instructions may be slightly different. .. code-block:: mkdir beedev-env python3 -m venv beedev-env - source beedev-env/bin/activate + source beedev-env/bin/activate + pip install --upgrade pip pip install poetry You can make sure Poetry is installed by using the following command: ``poetry --version``.