From 66994e9d853cd6358432b2eace0647764c64e3c2 Mon Sep 17 00:00:00 2001 From: Rudra Tiwari Date: Thu, 4 Dec 2025 13:59:28 +1100 Subject: [PATCH 1/3] Add docstrings to SimpleZipper and SshZipper classes Add comprehensive docstrings to zip_output, async_zip_output, and post_zip_output methods in SimpleZipper and SshZipper classes to match the documentation style used in DummyZipper. This improves code maintainability and API documentation consistency. --- .../harvesterzipper/simple_zipper.py | 32 +++++++++++++++++++ pandaharvester/harvesterzipper/ssh_zipper.py | 32 +++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/pandaharvester/harvesterzipper/simple_zipper.py b/pandaharvester/harvesterzipper/simple_zipper.py index ca558ff1..42a94b4e 100644 --- a/pandaharvester/harvesterzipper/simple_zipper.py +++ b/pandaharvester/harvesterzipper/simple_zipper.py @@ -14,15 +14,47 @@ def __init__(self, **kwarg): # zip output files def zip_output(self, jobspec): + """Zip output files. This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs, + to make a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs of + associated files to be zipped. The path of each associated file is available in associated + file's FileSpec.path. Once zip files are made, their FileSpec.path, FileSpec.fsize and + FileSpec.chksum need to be set. + + :param jobspec: job specifications + :type jobspec: JobSpec + :return: A tuple of return code (True for success, False otherwise) and error dialog + :rtype: (bool, string) + """ tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output") return self.simple_zip_output(jobspec, tmpLog) # asynchronous zip output def async_zip_output(self, jobspec): + """Zip output files asynchronously. This method is followed by post_zip_output(), + which is typically useful to trigger an asynchronous zipping mechanism such as batch job.
+ This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs, to make + a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs + of associated files to be zipped. The path of each associated file is available in associated + file's FileSpec.path. + + :param jobspec: job specifications + :type jobspec: JobSpec + :return: A tuple of return code (True for success, False for fatal, None for pending) and error dialog + :rtype: (bool, string) + """ tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output") # not really asynchronous as two staged zipping is not implemented in this plugin return self.simple_zip_output(jobspec, tmpLog) # post zipping def post_zip_output(self, jobspec): + """This method is executed after async_zip_output(), to do post-processing for zipping. + Once zip files are made, this method needs to look over jobspec.outFiles to set their + FileSpec.path, FileSpec.fsize, and FileSpec.chksum. + + :param jobspec: job specifications + :type jobspec: JobSpec + :return: A tuple of return code (True for success, False for fatal, None for pending) and error dialog + :rtype: (bool, string) + """ return True, "" diff --git a/pandaharvester/harvesterzipper/ssh_zipper.py b/pandaharvester/harvesterzipper/ssh_zipper.py index d0322290..f09d92b9 100644 --- a/pandaharvester/harvesterzipper/ssh_zipper.py +++ b/pandaharvester/harvesterzipper/ssh_zipper.py @@ -14,15 +14,47 @@ def __init__(self, **kwarg): # zip output files def zip_output(self, jobspec): + """Zip output files. This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs, + to make a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs of + associated files to be zipped. The path of each associated file is available in associated + file's FileSpec.path. Once zip files are made, their FileSpec.path, FileSpec.fsize and + FileSpec.chksum need to be set.
+ + :param jobspec: job specifications + :type jobspec: JobSpec + :return: A tuple of return code (True for success, False otherwise) and error dialog + :rtype: (bool, string) + """ tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output") return self.ssh_zip_output(jobspec, tmpLog) # asynchronous zip output def async_zip_output(self, jobspec): + """Zip output files asynchronously. This method is followed by post_zip_output(), + which is typically useful to trigger an asynchronous zipping mechanism such as batch job. + This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs, to make + a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs + of associated files to be zipped. The path of each associated file is available in associated + file's FileSpec.path. + + :param jobspec: job specifications + :type jobspec: JobSpec + :return: A tuple of return code (True for success, False for fatal, None for pending) and error dialog + :rtype: (bool, string) + """ tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output") # not really asynchronous as two staged zipping is not implemented in this plugin return self.ssh_zip_output(jobspec, tmpLog) # post zipping def post_zip_output(self, jobspec): + """This method is executed after async_zip_output(), to do post-processing for zipping. + Once zip files are made, this method needs to look over jobspec.outFiles to set their + FileSpec.path, FileSpec.fsize, and FileSpec.chksum.
+ + :param jobspec: job specifications + :type jobspec: JobSpec + :return: A tuple of return code (True for success, False for fatal, None for pending) and error dialog + :rtype: (bool, string) + """ return True, "" From 6894bfda9f0b181da770f536ee777c0d13e4723a Mon Sep 17 00:00:00 2001 From: Rudra Tiwari Date: Thu, 4 Dec 2025 13:59:33 +1100 Subject: [PATCH 2/3] Improve error handling in SuperfacilityMonitor Enhance error handling in check_workers method to properly catch and handle HTTPError, RequestException, and JSON parsing errors. Add explicit raise_for_status call and improve error messages with proper logging. Add validation for response data structure before accessing nested fields. Add warning log for unknown batch status values. --- .../harvestermonitor/superfacility_monitor.py | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/pandaharvester/harvestermonitor/superfacility_monitor.py b/pandaharvester/harvestermonitor/superfacility_monitor.py index c781ebe4..53e9fa9d 100644 --- a/pandaharvester/harvestermonitor/superfacility_monitor.py +++ b/pandaharvester/harvestermonitor/superfacility_monitor.py @@ -31,15 +31,28 @@ def check_workers(self, workspec_list): try: r = self.sf_client.get(f"/compute/jobs/perlmutter/{jobid}?sacct=true&cached=false") + r.raise_for_status() data = r.json() - #FIXME: How to handle httperror?
except requests.HTTPError as e: - newStatus = WorkSpec.ST_failed - retList.append((WorkSpec.ST_failed, f"can not get query slurm job {jobid} due to {e}")) + tmpLog.error(f"HTTP error querying slurm job {jobid}: {e}") + retList.append((WorkSpec.ST_failed, f"HTTP error querying slurm job {jobid}: {e}")) + continue + except requests.RequestException as e: + tmpLog.error(f"Request error querying slurm job {jobid}: {e}") + retList.append((WorkSpec.ST_failed, f"Request error querying slurm job {jobid}: {e}")) + continue + except (KeyError, IndexError, ValueError) as e: + tmpLog.error(f"Error parsing response for slurm job {jobid}: {e}") + retList.append((WorkSpec.ST_failed, f"Error parsing response for slurm job {jobid}: {e}")) + continue + + try: + batchStatus = data["output"][0]['state'].upper() + except (KeyError, IndexError) as e: + tmpLog.error(f"Missing state information in response for slurm job {jobid}: {e}") + retList.append((WorkSpec.ST_failed, f"Missing state information in response for slurm job {jobid}: {e}")) + continue - batchStatus = data["output"][0]['state'].upper() - #FIXME: Are these mapping correct? Some do not exist, and some seem mismatch if batchStatus in ["RUNNING", "COMPLETING", "STOPPED", "SUSPENDED"]: newStatus = WorkSpec.ST_running elif batchStatus in ["COMPLETED", "PREEMPTED", "TIMEOUT"]: @@ -49,6 +62,7 @@ def check_workers(self, workspec_list): elif batchStatus in ["CONFIGURING", "PENDING"]: newStatus = WorkSpec.ST_submitted else: + tmpLog.warning(f"Unknown batch status {batchStatus} for job {jobid}, marking as failed") newStatus = WorkSpec.ST_failed tmpLog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}") retList.append((newStatus, "")) From 820ab28d92f0a8fbf21237271db2b5ff35af8b26 Mon Sep 17 00:00:00 2001 From: Rudra Tiwari Date: Thu, 4 Dec 2025 13:59:38 +1100 Subject: [PATCH 3/3] Remove outdated FIXME comment in job_spec.py Remove FIXME comment about Python 2.7 set literal usage as Python 2.7 has been end of life since 2020.
The current dictionary implementation is correct and appropriate for the use case. --- pandaharvester/harvestercore/job_spec.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pandaharvester/harvestercore/job_spec.py b/pandaharvester/harvestercore/job_spec.py index b7d8dd56..fc3dd5e7 100644 --- a/pandaharvester/harvestercore/job_spec.py +++ b/pandaharvester/harvestercore/job_spec.py @@ -414,7 +414,6 @@ def get_job_attributes_for_panda(self): if self.jobAttributes is None: return data # extract only panda attributes - # FIXME use set literal for python >=2.7 panda_attributes = { "token": "token", "transExitCode": "trans_exit_code",