From cf24e7ce019bb6e6202412d0c406f7cf70b3db9e Mon Sep 17 00:00:00 2001
From: Chet Nieter
Date: Wed, 7 Dec 2016 14:40:40 -0500
Subject: [PATCH 1/8] Added method to call json conversion script.

Added a method to the NWChem workflow that calls the json conversion
script. Currently calling it from the upload_output method.
---
 .../hpccloud/taskflow/nwchem/__init__.py | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/server/taskflows/hpccloud/taskflow/nwchem/__init__.py b/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
index f7ad4ea6..ae90f496 100644
--- a/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
+++ b/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
@@ -26,6 +26,7 @@
 from cumulus.tasks.job import submit_job, _monitor_jobs
 from cumulus.tasks.job import download_job_input_folders
 from cumulus.tasks.job import upload_job_output_to_folder, job_directory
+from cumulus.tasks.job import _put_script
 from cumulus.transport import get_connection

 from hpccloud.taskflow.utility import *
@@ -84,6 +85,17 @@ def create_geometry_symlink(task, job, cluster, fileName):
     with get_connection(task.taskflow.girder_token, cluster) as conn:
         conn.execute('ln -s %s %s' % (filePath, linkPath))

+def create_json_output(task, job, cluster):
+    job_dir = job_directory(cluster, job)
+    cmds = ['cd %s' % job_dir]
+    outFile = '%s-%s.o%s' % (job['name'], os.path.basename(job_dir), job['queueJobId'])
+    nwchem_cmd = 'python /opt/NWChemOutputToJson/NWChemJsonConversion.py %s\n' % outFile
+    cmds.append(nwchem_cmd)
+
+    with get_connection(task.taskflow.girder_token, cluster) as conn:
+        cmd = _put_script(conn, '\n'.join(cmds))
+        conn.execute(cmd)
+
 @cumulus.taskflow.task
 def setup_input(task, *args, **kwargs):
     input_folder_id = kwargs['input']['folder']['id']
@@ -131,7 +143,6 @@ def create_job(task, upstream_result):
     task.taskflow.logger.info('Create NWChem job.')
     input_folder_id = upstream_result['input']['folder']['id']

-    # TODO: setup command to run with mpi
     body = {
         'name': 'nwchem_run',
         'commands': [
@@ -210,7 +221,6 @@ def monitor_nwchem_job(task, upstream_result):
     task.throws=(Retry,),
     job = upstream_result['job']

-    # TODO - We are currently reaching in and used a 'private' function
     _monitor_jobs(task, cluster, [job], girder_token=girder_token, monitor_interval=30)

     return upstream_result
@@ -222,6 +232,8 @@ def upload_output(task, upstream_result):
     cluster = upstream_result['cluster']
     job = upstream_result['job']

+    create_json_output(task, job, cluster)
+
     client = create_girder_client(
         task.taskflow.girder_api_url, task.taskflow.girder_token)
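Note: create_json_output in this patch stages a small shell script on the cluster through cumulus's _put_script helper and then runs it over the SSH connection. A minimal sketch of the script text it assembles, using hypothetical job values (the real ones come from the Girder job document and from job_directory(cluster, job)):

    import os

    # Hypothetical stand-ins; the real values come from the job document
    # and from job_directory(cluster, job).
    job = {'name': 'nwchem_run', 'queueJobId': '42'}
    job_dir = '/home/cluster-user/taskflows/abc123'

    out_file = '%s-%s.o%s' % (job['name'], os.path.basename(job_dir), job['queueJobId'])
    script = '\n'.join([
        'cd %s' % job_dir,
        'python /opt/NWChemOutputToJson/NWChemJsonConversion.py %s\n' % out_file,
    ])
    # 'script' is the text that _put_script would copy to the cluster and
    # conn.execute would then run.
    print(script)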
From 70e29e3cb2be98c73b8d29c53e7711d4e56a63ca Mon Sep 17 00:00:00 2001
From: Chet Nieter
Date: Thu, 8 Dec 2016 10:02:48 -0500
Subject: [PATCH 2/8] Added redirect to nwchem submission script.

Added a redirect to the nwchem submission script. This means the file
holding the standard output from nwchem will have a name that is
independent of the job scheduler used, making it easier to run the json
converter on it.
---
 server/taskflows/hpccloud/taskflow/nwchem/__init__.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/taskflows/hpccloud/taskflow/nwchem/__init__.py b/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
index ae90f496..da625cfb 100644
--- a/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
+++ b/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
@@ -88,7 +88,7 @@ def create_geometry_symlink(task, job, cluster, fileName):
 def create_json_output(task, job, cluster):
     job_dir = job_directory(cluster, job)
     cmds = ['cd %s' % job_dir]
-    outFile = '%s-%s.o%s' % (job['name'], os.path.basename(job_dir), job['queueJobId'])
+    outFile = '%s.out' % (job['name'])
     nwchem_cmd = 'python /opt/NWChemOutputToJson/NWChemJsonConversion.py %s\n' % outFile
     cmds.append(nwchem_cmd)

@@ -146,7 +146,7 @@ def create_job(task, upstream_result):
     body = {
         'name': 'nwchem_run',
         'commands': [
-            "mpiexec -n %s nwchem input/%s" % (
+            "mpiexec -n %s nwchem input/%s &> nwchem_run.out" % (
                 upstream_result['numberOfProcs'],
                 upstream_result['nwFilename'])
         ],
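Note: with the redirect in place, the command submitted for the job looks roughly like the sketch below (the values are hypothetical; the real ones come from upstream_result). '&>' is a bash-ism; the portable spelling would be '> nwchem_run.out 2>&1'.

    # Sketch of the command string placed in the job body after this patch.
    number_of_procs = 8           # hypothetical; normally upstream_result['numberOfProcs']
    nw_filename = 'example.nw'    # hypothetical; normally upstream_result['nwFilename']

    command = "mpiexec -n %s nwchem input/%s &> nwchem_run.out" % (
        number_of_procs, nw_filename)
    print(command)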
From dc65096e20a66f647fb9e084e516df32cd9c56bb Mon Sep 17 00:00:00 2001
From: Chet Nieter
Date: Thu, 8 Dec 2016 10:44:35 -0500
Subject: [PATCH 3/8] Moved json conversion to its own task.

Moved the json conversion from the upload_output task to its own task.
---
 .../hpccloud/taskflow/nwchem/__init__.py | 30 +++++++++++--------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/server/taskflows/hpccloud/taskflow/nwchem/__init__.py b/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
index da625cfb..a7865f76 100644
--- a/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
+++ b/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
@@ -72,6 +72,7 @@ def start(self, *args, **kwargs):
             submit.s() | \
             submit_nwchem_job.s() | \
             monitor_nwchem_job.s().set(queue='monitor') | \
+            create_json_output.s() | \
             upload_output.s()
         )
         super(NWChemTaskFlow, self).start(self, *args, **kwargs)
@@ -85,16 +86,6 @@ def create_geometry_symlink(task, job, cluster, fileName):
     with get_connection(task.taskflow.girder_token, cluster) as conn:
         conn.execute('ln -s %s %s' % (filePath, linkPath))

-def create_json_output(task, job, cluster):
-    job_dir = job_directory(cluster, job)
-    cmds = ['cd %s' % job_dir]
-    outFile = '%s.out' % (job['name'])
-    nwchem_cmd = 'python /opt/NWChemOutputToJson/NWChemJsonConversion.py %s\n' % outFile
-    cmds.append(nwchem_cmd)
-
-    with get_connection(task.taskflow.girder_token, cluster) as conn:
-        cmd = _put_script(conn, '\n'.join(cmds))
-        conn.execute(cmd)

 @cumulus.taskflow.task
 def setup_input(task, *args, **kwargs):
     input_folder_id = kwargs['input']['folder']['id']
@@ -225,6 +216,23 @@ def monitor_nwchem_job(task, upstream_result):

     return upstream_result

+@cumulus.taskflow.task
+def create_json_output(task, upstream_result):
+    task.logger.info('Converting nwchem output to json format.')
+    cluster = upstream_result['cluster']
+    job = upstream_result['job']
+    job_dir = job_directory(cluster, job)
+    cmds = ['cd %s' % job_dir]
+    outFile = '%s.out' % (job['name'])
+    conversion_cmd = 'python /opt/NWChemOutputToJson/NWChemJsonConversion.py %s\n' % outFile
+    cmds.append(conversion_cmd)
+
+    with get_connection(task.taskflow.girder_token, cluster) as conn:
+        cmd = _put_script(conn, '\n'.join(cmds))
+        conn.execute(cmd)
+
+    return upstream_result
+
 @cumulus.taskflow.task
 def upload_output(task, upstream_result):
     task.taskflow.logger.info('Uploading results from cluster')
     cluster = upstream_result['cluster']
     job = upstream_result['job']

-    create_json_output(task, job, cluster)
-
     client = create_girder_client(
         task.taskflow.girder_api_url, task.taskflow.girder_token)

From 1d359452ed53b4eccd753cc065b38eed3c8a0adc Mon Sep 17 00:00:00 2001
From: Chet Nieter
Date: Mon, 12 Dec 2016 16:35:15 -0500
Subject: [PATCH 4/8] Initial Dockerfile for nwchem-json container.

Initial Dockerfile for a container to hold the nwchem json converter
and eventually avogadro. I was able to run the json conversion by
running docker run with -v to mount the directory with the nwchem
output file.
---
 docker/Dockerfile | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)
 create mode 100644 docker/Dockerfile

diff --git a/docker/Dockerfile b/docker/Dockerfile
new file mode 100644
index 00000000..6b7e1caf
--- /dev/null
+++ b/docker/Dockerfile
@@ -0,0 +1,18 @@
+FROM ubuntu:16.04
+MAINTAINER Chet Nieter
+
+#
+# Run using docker run -rm -v :/hpccloud
+#
+
+RUN apt-get -y update && apt-get install -y \
+    git \
+    python \
+&& rm -rf /var/lib/apt/lits/*
+
+RUN mkdir /hpccloud
+WORKDIR /hpccloud
+
+RUN git clone https://github.com/wadejong/NWChemOutputToJson.git /opt/NWChemOutputToJson
+
+CMD ["python", "/opt/NWChemOutputToJson/NWChemJsonConversion.py", "/hpccloud/nwchem_run.out"]
\ No newline at end of file
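Note: at this stage the output filename is baked into the image through CMD, so the container only needs the directory containing nwchem_run.out mounted at /hpccloud. A rough sketch of the manual invocation the Dockerfile comment describes, assuming a locally built image tagged nwchem-json (the tag and host directory are hypothetical, and the comment's '-rm' is presumably meant to be '--rm'):

    import subprocess

    output_dir = '/path/to/nwchem/output'   # hypothetical host directory with nwchem_run.out

    # Mount the output directory at /hpccloud; the image's CMD then runs
    # NWChemJsonConversion.py on /hpccloud/nwchem_run.out.
    subprocess.check_call(['docker', 'run', '--rm',
                           '-v', '%s:/hpccloud' % output_dir,
                           'nwchem-json'])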
From 23df0c42346bca3abe55a2be5b3ed42a2707dc94 Mon Sep 17 00:00:00 2001
From: Chet Nieter
Date: Wed, 14 Dec 2016 16:22:33 -0500
Subject: [PATCH 5/8] Using docker container for nwchem json conversion.

Switched to running the json conversion script for nwchem in a Docker
container. The nwchem task now copies the output file to the server,
runs the docker container to generate the json, and then copies the
json output back to the cluster. Several things still need to be
cleaned up, such as the hard-coded settings for the docker container.
---
 .../hpccloud/taskflow/nwchem/__init__.py | 46 +++++++++++++++----
 1 file changed, 38 insertions(+), 8 deletions(-)

diff --git a/server/taskflows/hpccloud/taskflow/nwchem/__init__.py b/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
index a7865f76..4cf45a44 100644
--- a/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
+++ b/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
@@ -16,8 +16,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ###############################################################################
+import tempfile
 import json
 import os
+import subprocess
+import shutil

 from jsonpath_rw import parse
 from celery.exceptions import Retry
@@ -222,14 +225,41 @@ def create_json_output(task, upstream_result):
     cluster = upstream_result['cluster']
     job = upstream_result['job']
     job_dir = job_directory(cluster, job)
-    cmds = ['cd %s' % job_dir]
-    outFile = '%s.out' % (job['name'])
-    conversion_cmd = 'python /opt/NWChemOutputToJson/NWChemJsonConversion.py %s\n' % outFile
-    cmds.append(conversion_cmd)
-
-    with get_connection(task.taskflow.girder_token, cluster) as conn:
-        cmd = _put_script(conn, '\n'.join(cmds))
-        conn.execute(cmd)
+    out_file = '%s.out' % (job['name'])
+
+    try:
+        # Copy the nwchem output to server
+        tmp_dir = tempfile.mkdtemp()
+        cluster_path = os.path.join(job_dir, out_file)
+        local_path = os.path.join(tmp_dir, out_file)
+        with open(local_path, 'w') as local_fp:
+            with get_connection(task.taskflow.girder_token, cluster) as conn:
+                with conn.get(cluster_path) as remote_fp:
+                    local_fp.write(remote_fp.read())
+
+        # Run docker container to post-process results - need to add docker image to upstream_result
+        command = ['docker', 'run', '--rm', '-v', '%s:/hpccloud' % tmp_dir, 'chetnieter/nwchem-postprocess']
+        p = subprocess.Popen(args=command, stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE)
+        stdout, stderr = p.communicate()
+
+        if p.returncode != 0:
+            print('Error running Docker container.')
+            print('STDOUT: ' + stdout)
+            print('STDERR: ' + stderr)
+            raise Exception('Docker returned code {}.'.format(p.returncode))
+
+        # Copy json file back to cluster?
+        cluster_path = os.path.join(job_dir, out_file + '.json')
+        local_path = os.path.join(tmp_dir, out_file + '.json')
+        with get_connection(task.taskflow.girder_token, cluster) as conn:
+            with open(local_path, 'r') as local_fp:
+                conn.put(local_fp, cluster_path)
+
+    # Delete temporary storage
+    finally:
+        if os.path.exists(tmp_dir):
+            shutil.rmtree(tmp_dir)

     return upstream_result
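Note: the in-diff comment above ('need to add docker image to upstream_result') flags the hard-coded image name. A hedged sketch of what that could look like; the 'dockerImage' key is hypothetical and not part of the patch:

    DEFAULT_IMAGE = 'chetnieter/nwchem-postprocess'

    def docker_command(tmp_dir, upstream_result):
        # Fall back to the hard-coded image until a real setting is wired through.
        image = upstream_result.get('dockerImage', DEFAULT_IMAGE)
        return ['docker', 'run', '--rm', '-v', '%s:/hpccloud' % tmp_dir, image]

    print(docker_command('/tmp/scratch', {}))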
From e8a050cc206ca233a6d973612622d1f562bda61f Mon Sep 17 00:00:00 2001
From: Chet Nieter
Date: Thu, 15 Dec 2016 16:50:17 -0500
Subject: [PATCH 6/8] Passing in nwchem output filename on command line.

Now passing in the nwchem output filename to the docker run command
rather than having it hard-coded in the docker image.
---
 docker/Dockerfile                                     | 2 +-
 server/taskflows/hpccloud/taskflow/nwchem/__init__.py | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/docker/Dockerfile b/docker/Dockerfile
index 6b7e1caf..24c9563c 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -15,4 +15,4 @@ WORKDIR /hpccloud

 RUN git clone https://github.com/wadejong/NWChemOutputToJson.git /opt/NWChemOutputToJson

-CMD ["python", "/opt/NWChemOutputToJson/NWChemJsonConversion.py", "/hpccloud/nwchem_run.out"]
\ No newline at end of file
+ENTRYPOINT ["python", "/opt/NWChemOutputToJson/NWChemJsonConversion.py"]
\ No newline at end of file
diff --git a/server/taskflows/hpccloud/taskflow/nwchem/__init__.py b/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
index 4cf45a44..fc6291cb 100644
--- a/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
+++ b/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
@@ -238,7 +238,8 @@ def create_json_output(task, upstream_result):
                     local_fp.write(remote_fp.read())

         # Run docker container to post-process results - need to add docker image to upstream_result
-        command = ['docker', 'run', '--rm', '-v', '%s:/hpccloud' % tmp_dir, 'chetnieter/nwchem-postprocess']
+        command = ['docker', 'run', '--rm', '-v', '%s:/hpccloud' % tmp_dir,
+                   'chetnieter/nwchem-postprocess', out_file]
         p = subprocess.Popen(args=command, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
         stdout, stderr = p.communicate()

From 0407b9efac9a74adef2cb9c5bfc04a90ecd52885 Mon Sep 17 00:00:00 2001
From: Chet Nieter
Date: Fri, 16 Dec 2016 14:36:19 -0500
Subject: [PATCH 7/8] Clean up before pausing work.

Some minor clean up including fixing a comment and removing a python
module that is not being used.
---
 docker/Dockerfile                                     | 3 ++-
 server/taskflows/hpccloud/taskflow/nwchem/__init__.py | 1 -
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docker/Dockerfile b/docker/Dockerfile
index 24c9563c..bd37a054 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -2,7 +2,8 @@ FROM ubuntu:16.04
 MAINTAINER Chet Nieter

 #
-# Run using docker run -rm -v :/hpccloud
+# Run using the following command in teh
+# docker run -rm -v :/hpccloud
 #

 RUN apt-get -y update && apt-get install -y \
diff --git a/server/taskflows/hpccloud/taskflow/nwchem/__init__.py b/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
index fc6291cb..d38cc7cf 100644
--- a/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
+++ b/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
@@ -29,7 +29,6 @@
 from cumulus.tasks.job import submit_job, _monitor_jobs
 from cumulus.tasks.job import download_job_input_folders
 from cumulus.tasks.job import upload_job_output_to_folder, job_directory
-from cumulus.tasks.job import _put_script
 from cumulus.transport import get_connection

 from hpccloud.taskflow.utility import *
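Note: the CMD-to-ENTRYPOINT switch in patch 6 is what lets the output filename be passed on the docker run command line: arguments placed after the image name are appended to an ENTRYPOINT, whereas they would have replaced a CMD. A rough sketch of the resulting invocation (the host scratch directory is a hypothetical example; the image name and mount point come from the patches):

    import subprocess

    tmp_dir = '/tmp/nwchem-scratch'    # hypothetical host directory holding the .out file
    out_file = 'nwchem_run.out'

    # out_file is appended to the image's ENTRYPOINT, so inside the container
    # this runs: python /opt/NWChemOutputToJson/NWChemJsonConversion.py nwchem_run.out
    # with /hpccloud (the WORKDIR) as the current directory.
    subprocess.check_call(['docker', 'run', '--rm',
                           '-v', '%s:/hpccloud' % tmp_dir,
                           'chetnieter/nwchem-postprocess', out_file])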
From a7bd2df4e8d305f7691dc08abb26a5094afbba0e Mon Sep 17 00:00:00 2001
From: Chet Nieter
Date: Mon, 19 Dec 2016 13:58:08 -0500
Subject: [PATCH 8/8] Changes from code review.

Some changes from code review. This includes moving the docker file to
a more appropriate location, in a sub-folder that reflects the
associated taskflow. Also passing stderr and stdout from any failed
calls to docker in the nwchem task flow. Using a context manager for
the temporary directory, which adds a dependency on the backports
module. Uploading the json output directly to girder rather than
copying it back to the cluster still needs to be done.
---
 requirements.txt                            | 1 +
 {docker => server/docker/nwchem}/Dockerfile | 0
 .../hpccloud/taskflow/nwchem/__init__.py    | 17 ++++++-----------
 3 files changed, 7 insertions(+), 11 deletions(-)
 rename {docker => server/docker/nwchem}/Dockerfile (100%)

diff --git a/requirements.txt b/requirements.txt
index d6ece693..d74f1c62 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
+backports.tempfile==1.0rc1
 jsonschema==2.4.0
 jsonpath-rw==1.4.0
 jsonpath-rw-ext==1.0.0
diff --git a/docker/Dockerfile b/server/docker/nwchem/Dockerfile
similarity index 100%
rename from docker/Dockerfile
rename to server/docker/nwchem/Dockerfile
diff --git a/server/taskflows/hpccloud/taskflow/nwchem/__init__.py b/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
index d38cc7cf..b8a0711d 100644
--- a/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
+++ b/server/taskflows/hpccloud/taskflow/nwchem/__init__.py
@@ -16,11 +16,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ###############################################################################
-import tempfile
 import json
 import os
 import subprocess
 import shutil
+from backports.tempfile import TemporaryDirectory

 from jsonpath_rw import parse
 from celery.exceptions import Retry
@@ -226,9 +226,8 @@ def create_json_output(task, upstream_result):
     job_dir = job_directory(cluster, job)
     out_file = '%s.out' % (job['name'])

-    try:
+    with TemporaryDirectory() as tmp_dir:
         # Copy the nwchem output to server
-        tmp_dir = tempfile.mkdtemp()
         cluster_path = os.path.join(job_dir, out_file)
         local_path = os.path.join(tmp_dir, out_file)
         with open(local_path, 'w') as local_fp:
@@ -244,9 +243,10 @@ def create_json_output(task, upstream_result):
         stdout, stderr = p.communicate()

         if p.returncode != 0:
-            print('Error running Docker container.')
-            print('STDOUT: ' + stdout)
-            print('STDERR: ' + stderr)
+            task
+            task.logger.error('Error running Docker container.')
+            task.logger.error('STDOUT: ' + stdout)
+            task.logger.error('STDERR: ' + stderr)
             raise Exception('Docker returned code {}.'.format(p.returncode))

         # Copy json file back to cluster?
@@ -256,11 +256,6 @@ def create_json_output(task, upstream_result):
         cluster_path = os.path.join(job_dir, out_file + '.json')
         local_path = os.path.join(tmp_dir, out_file + '.json')
         with get_connection(task.taskflow.girder_token, cluster) as conn:
             with open(local_path, 'r') as local_fp:
                 conn.put(local_fp, cluster_path)

-    # Delete temporary storage
-    finally:
-        if os.path.exists(tmp_dir):
-            shutil.rmtree(tmp_dir)
-
     return upstream_result

 @cumulus.taskflow.task