1 change: 0 additions & 1 deletion pandaharvester/harvestercore/job_spec.py
@@ -414,7 +414,6 @@ def get_job_attributes_for_panda(self):
if self.jobAttributes is None:
return data
# extract only panda attributes
# FIXME use set literal for python >=2.7
panda_attributes = {
"token": "token",
"transExitCode": "trans_exit_code",
24 changes: 19 additions & 5 deletions pandaharvester/harvestermonitor/superfacility_monitor.py
@@ -31,15 +31,28 @@ def check_workers(self, workspec_list):

try:
r = self.sf_client.get(f"/compute/jobs/perlmutter/{jobid}?sacct=true&cached=false")
r.raise_for_status()
data = r.json()
#FIXME: How to handle httperror?
except requests.HTTPError as e:
newStatus = WorkSpec.ST_failed
retList.append((WorkSpec.ST_failed, f"can not get query slurm job {jobid} due to {e}"))
tmpLog.error(f"HTTP error querying slurm job {jobid}: {e}")
retList.append((WorkSpec.ST_failed, f"HTTP error querying slurm job {jobid}: {e}"))
continue
except requests.RequestException as e:
tmpLog.error(f"Request error querying slurm job {jobid}: {e}")
retList.append((WorkSpec.ST_failed, f"Request error querying slurm job {jobid}: {e}"))
continue
except (KeyError, IndexError, ValueError) as e:
tmpLog.error(f"Error parsing response for slurm job {jobid}: {e}")
retList.append((WorkSpec.ST_failed, f"Error parsing response for slurm job {jobid}: {e}"))
continue

try:
batchStatus = data["output"][0]['state'].upper()
except (KeyError, IndexError) as e:
tmpLog.error(f"Missing state information in response for slurm job {jobid}: {e}")
retList.append((WorkSpec.ST_failed, f"Missing state information in response for slurm job {jobid}: {e}"))
continue

batchStatus = data["output"][0]['state'].upper()
#FIXME: Are these mapping correct? Some do not exist, and some seem mismatch
if batchStatus in ["RUNNING", "COMPLETING", "STOPPED", "SUSPENDED"]:
newStatus = WorkSpec.ST_running
elif batchStatus in ["COMPLETED", "PREEMPTED", "TIMEOUT"]:
@@ -49,6 +62,7 @@ def check_workers(self, workspec_list):
elif batchStatus in ["CONFIGURING", "PENDING"]:
newStatus = WorkSpec.ST_submitted
else:
tmpLog.warning(f"Unknown batch status {batchStatus} for job {jobid}, marking as failed")
newStatus = WorkSpec.ST_failed
tmpLog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}")
retList.append((newStatus, ""))
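
To make the new control flow easier to follow, here is a small standalone sketch of the same query-and-map pattern. The names query_slurm_job and the simplified status constants are assumptions for illustration, not the Harvester WorkSpec API, and the branches hidden in the collapsed part of the diff (e.g. cancelled states) are not reproduced.

import requests

# Simplified stand-ins for the WorkSpec.ST_* constants (assumption, not the Harvester API)
ST_RUNNING, ST_FINISHED, ST_SUBMITTED, ST_FAILED = "running", "finished", "submitted", "failed"

# SLURM-state -> worker-state table mirroring the visible branches in check_workers()
STATE_MAP = {
    **dict.fromkeys(["RUNNING", "COMPLETING", "STOPPED", "SUSPENDED"], ST_RUNNING),
    **dict.fromkeys(["COMPLETED", "PREEMPTED", "TIMEOUT"], ST_FINISHED),
    **dict.fromkeys(["CONFIGURING", "PENDING"], ST_SUBMITTED),
}

def query_slurm_job(session, base_url, jobid):
    """Query a SLURM job over HTTP and map its state to a worker status.

    Returns (status, message); every failure path reports ST_FAILED with a reason,
    matching the continue-on-error behaviour of the patched monitor.
    """
    try:
        r = session.get(f"{base_url}/compute/jobs/perlmutter/{jobid}?sacct=true&cached=false")
        r.raise_for_status()          # turn 4xx/5xx responses into requests.HTTPError
        data = r.json()
    except requests.HTTPError as e:
        return ST_FAILED, f"HTTP error querying slurm job {jobid}: {e}"
    except requests.RequestException as e:
        return ST_FAILED, f"Request error querying slurm job {jobid}: {e}"
    except ValueError as e:           # malformed JSON body
        return ST_FAILED, f"Error parsing response for slurm job {jobid}: {e}"

    try:
        batch_status = data["output"][0]["state"].upper()
    except (KeyError, IndexError) as e:
        return ST_FAILED, f"Missing state information in response for slurm job {jobid}: {e}"

    # Unknown states fall through to failed, as in the new else branch
    return STATE_MAP.get(batch_status, ST_FAILED), ""

Keeping the state table in a dict makes the unknown-state fallback explicit via .get(..., ST_FAILED), which is the same behaviour as the new else branch that logs a warning before failing the worker.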
32 changes: 32 additions & 0 deletions pandaharvester/harvesterzipper/simple_zipper.py
@@ -14,15 +14,47 @@ def __init__(self, **kwarg):

# zip output files
def zip_output(self, jobspec):
"""Zip output files. This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs,
to make a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs of
associated files to be zipped. The path of each associated file is available in associated
file's FileSpec.path. Once zip files are made, their FileSpec.path, FileSpec.fsize and
FileSpec.chksum need to be set.

:param jobspec: job specifications
:type jobspec: JobSpec
:return: A tuple of return code (True for success, False otherwise) and error dialog
:rtype: (bool, string)
"""
tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
return self.simple_zip_output(jobspec, tmpLog)

# asynchronous zip output
def async_zip_output(self, jobspec):
"""Zip output files asynchronously. This method is followed by post_zip_output(),
which is typically useful to trigger an asynchronous zipping mechanism such as batch job.
This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs, to make
a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs
of associated files to be zipped. The path of each associated file is available in associated
file's FileSpec.path.

:param jobspec: job specifications
:type jobspec: JobSpec
:return: A tuple of return code (True for success, False for fatal, None for pending) and error dialog
:rtype: (bool, string)
"""
tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
# not really asynchronous since two-stage zipping is not implemented in this plugin
return self.simple_zip_output(jobspec, tmpLog)

# post zipping
def post_zip_output(self, jobspec):
"""This method is executed after async_zip_output(), to do post-processing for zipping.
Once zip files are made, this method needs to look over jobspec.outFiles to set their
FileSpec.path, FileSpec.fsize, and FileSpec.chksum.

:param jobspec: job specifications
:type jobspec: JobSpec
:return: A tuple of return code (True for success, False for fatal, None for pending) and error dialog
:rtype: (bool, string)
"""
return True, ""
32 changes: 32 additions & 0 deletions pandaharvester/harvesterzipper/ssh_zipper.py
@@ -14,15 +14,47 @@ def __init__(self, **kwarg):

# zip output files
def zip_output(self, jobspec):
"""Zip output files. This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs,
to make a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs of
associated files to be zipped. The path of each associated file is available in associated
file's FileSpec.path. Once zip files are made, their FileSpec.path, FileSpec.fsize and
FileSpec.chksum need to be set.

:param jobspec: job specifications
:type jobspec: JobSpec
:return: A tuple of return code (True for success, False otherwise) and error dialog
:rtype: (bool, string)
"""
tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
return self.ssh_zip_output(jobspec, tmpLog)

# asynchronous zip output
def async_zip_output(self, jobspec):
"""Zip output files asynchronously. This method is followed by post_zip_output(),
which is typically useful to trigger an asynchronous zipping mechanism such as batch job.
This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs, to make
a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs
of associated files to be zipped. The path of each associated file is available in associated
file's FileSpec.path.

:param jobspec: job specifications
:type jobspec: JobSpec
:return: A tuple of return code (True for success, False for fatal, None for pending) and error dialog
:rtype: (bool, string)
"""
tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
# not really asynchronous since two-stage zipping is not implemented in this plugin
return self.ssh_zip_output(jobspec, tmpLog)

# post zipping
def post_zip_output(self, jobspec):
"""This method is executed after async_zip_output(), to do post-processing for zipping.
Once zip files are made, this method needs to look over jobspec.outFiles to set their
FileSpec.path, FileSpec.fsize, and FileSpec.chksum.

:param jobspec: job specifications
:type jobspec: JobSpec
:return: A tuple of return code (True for success, False for fatal, None for pending) and error dialog
:rtype: (bool, string)
"""
return True, ""