diff --git a/pandaharvester/harvestercore/job_spec.py b/pandaharvester/harvestercore/job_spec.py
index b7d8dd56..fc3dd5e7 100644
--- a/pandaharvester/harvestercore/job_spec.py
+++ b/pandaharvester/harvestercore/job_spec.py
@@ -414,7 +414,6 @@ def get_job_attributes_for_panda(self):
         if self.jobAttributes is None:
             return data
         # extract only panda attributes
-        # FIXME use set literal for python >=2.7
         panda_attributes = {
             "token": "token",
             "transExitCode": "trans_exit_code",
diff --git a/pandaharvester/harvestermonitor/superfacility_monitor.py b/pandaharvester/harvestermonitor/superfacility_monitor.py
index c781ebe4..53e9fa9d 100644
--- a/pandaharvester/harvestermonitor/superfacility_monitor.py
+++ b/pandaharvester/harvestermonitor/superfacility_monitor.py
@@ -31,15 +31,28 @@ def check_workers(self, workspec_list):
 
             try:
                 r = self.sf_client.get(f"/compute/jobs/perlmutter/{jobid}?sacct=true&cached=false")
+                r.raise_for_status()
                 data = r.json()
-                #FIXME: How to handle httperror?
             except requests.HTTPError as e:
-                newStatus = WorkSpec.ST_failed
-                retList.append((WorkSpec.ST_failed, f"can not get query slurm job {jobid} due to {e}"))
+                tmpLog.error(f"HTTP error querying slurm job {jobid}: {e}")
+                retList.append((WorkSpec.ST_failed, f"HTTP error querying slurm job {jobid}: {e}"))
+                continue
+            except requests.RequestException as e:
+                tmpLog.error(f"Request error querying slurm job {jobid}: {e}")
+                retList.append((WorkSpec.ST_failed, f"Request error querying slurm job {jobid}: {e}"))
+                continue
+            except (KeyError, IndexError, ValueError) as e:
+                tmpLog.error(f"Error parsing response for slurm job {jobid}: {e}")
+                retList.append((WorkSpec.ST_failed, f"Error parsing response for slurm job {jobid}: {e}"))
+                continue
+
+            try:
+                batchStatus = data["output"][0]['state'].upper()
+            except (KeyError, IndexError) as e:
+                tmpLog.error(f"Missing state information in response for slurm job {jobid}: {e}")
+                retList.append((WorkSpec.ST_failed, f"Missing state information in response for slurm job {jobid}: {e}"))
                 continue
 
-            batchStatus = data["output"][0]['state'].upper()
-            #FIXME: Are these mapping correct? Some do not exist, and some seem mismatch
             if batchStatus in ["RUNNING", "COMPLETING", "STOPPED", "SUSPENDED"]:
                 newStatus = WorkSpec.ST_running
             elif batchStatus in ["COMPLETED", "PREEMPTED", "TIMEOUT"]:
@@ -49,6 +62,7 @@ def check_workers(self, workspec_list):
             elif batchStatus in ["CONFIGURING", "PENDING"]:
                 newStatus = WorkSpec.ST_submitted
             else:
+                tmpLog.warning(f"Unknown batch status {batchStatus} for job {jobid}, marking as failed")
                 newStatus = WorkSpec.ST_failed
             tmpLog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}")
             retList.append((newStatus, ""))
diff --git a/pandaharvester/harvesterzipper/simple_zipper.py b/pandaharvester/harvesterzipper/simple_zipper.py
index ca558ff1..42a94b4e 100644
--- a/pandaharvester/harvesterzipper/simple_zipper.py
+++ b/pandaharvester/harvesterzipper/simple_zipper.py
@@ -14,15 +14,47 @@ def __init__(self, **kwarg):
 
     # zip output files
     def zip_output(self, jobspec):
+        """Zip output files. This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs,
+        to make a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs of
+        associated files to be zipped. The path of each associated file is available in associated
+        file's FileSpec.path. Once zip files are made, their FileSpec.path, FileSpec.fsize and
+        FileSpec.chksum need to be set.
+
+        :param jobspec: job specifications
+        :type jobspec: JobSpec
+        :return: A tuple of return code (True for success, False otherwise) and error dialog
+        :rtype: (bool, string)
+        """
         tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
         return self.simple_zip_output(jobspec, tmpLog)
 
     # asynchronous zip output
     def async_zip_output(self, jobspec):
+        """Zip output files asynchronously. This method is followed by post_zip_output(),
+        which is typically useful to trigger an asynchronous zipping mechanism such as batch job.
+        This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs, to make
+        a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs
+        of associated files to be zipped. The path of each associated file is available in associated
+        file's FileSpec.path.
+
+        :param jobspec: job specifications
+        :type jobspec: JobSpec
+        :return: A tuple of return code (True for success, False for fatal, None for pending) and error dialog
+        :rtype: (bool, string)
+        """
         tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
         # not really asynchronous as two staged zipping is not implemented in this plugin
         return self.simple_zip_output(jobspec, tmpLog)
 
     # post zipping
     def post_zip_output(self, jobspec):
+        """This method is executed after async_zip_output(), to do post-processing for zipping.
+        Once zip files are made, this method needs to look over jobspec.outFiles to set their
+        FileSpec.path, FileSpec.fsize, and FileSpec.chksum.
+
+        :param jobspec: job specifications
+        :type jobspec: JobSpec
+        :return: A tuple of return code (True for success, False for fatal, None for pending) and error dialog
+        :rtype: (bool, string)
+        """
         return True, ""
diff --git a/pandaharvester/harvesterzipper/ssh_zipper.py b/pandaharvester/harvesterzipper/ssh_zipper.py
index d0322290..f09d92b9 100644
--- a/pandaharvester/harvesterzipper/ssh_zipper.py
+++ b/pandaharvester/harvesterzipper/ssh_zipper.py
@@ -14,15 +14,47 @@ def __init__(self, **kwarg):
 
     # zip output files
     def zip_output(self, jobspec):
+        """Zip output files. This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs,
+        to make a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs of
+        associated files to be zipped. The path of each associated file is available in associated
+        file's FileSpec.path. Once zip files are made, their FileSpec.path, FileSpec.fsize and
+        FileSpec.chksum need to be set.
+
+        :param jobspec: job specifications
+        :type jobspec: JobSpec
+        :return: A tuple of return code (True for success, False otherwise) and error dialog
+        :rtype: (bool, string)
+        """
         tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
         return self.ssh_zip_output(jobspec, tmpLog)
 
     # asynchronous zip output
     def async_zip_output(self, jobspec):
+        """Zip output files asynchronously. This method is followed by post_zip_output(),
+        which is typically useful to trigger an asynchronous zipping mechanism such as batch job.
+        This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs, to make
+        a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs
+        of associated files to be zipped. The path of each associated file is available in associated
+        file's FileSpec.path.
+
+        :param jobspec: job specifications
+        :type jobspec: JobSpec
+        :return: A tuple of return code (True for success, False for fatal, None for pending) and error dialog
+        :rtype: (bool, string)
+        """
         tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
         # not really asynchronous as two staged zipping is not implemented in this plugin
         return self.ssh_zip_output(jobspec, tmpLog)
 
     # post zipping
     def post_zip_output(self, jobspec):
+        """This method is executed after async_zip_output(), to do post-processing for zipping.
+        Once zip files are made, this method needs to look over jobspec.outFiles to set their
+        FileSpec.path, FileSpec.fsize, and FileSpec.chksum.
+
+        :param jobspec: job specifications
+        :type jobspec: JobSpec
+        :return: A tuple of return code (True for success, False for fatal, None for pending) and error dialog
+        :rtype: (bool, string)
+        """
         return True, ""