From 0944a26bc0423e274f0def3ee64802ee8520b61e Mon Sep 17 00:00:00 2001 From: mightqxc Date: Mon, 26 Jan 2026 12:16:47 +0100 Subject: [PATCH 1/2] htcondor: compatibility for new condor python api verion >= 25 --- .../harvestermisc/htcondor_utils.py | 47 +++++++++++-------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/pandaharvester/harvestermisc/htcondor_utils.py b/pandaharvester/harvestermisc/htcondor_utils.py index 19d4262b..b8702861 100644 --- a/pandaharvester/harvestermisc/htcondor_utils.py +++ b/pandaharvester/harvestermisc/htcondor_utils.py @@ -18,13 +18,19 @@ from pandaharvester.harvestercore.core_utils import SingletonWithID from pandaharvester.harvestercore.fifos import SpecialFIFOBase -# condor python or command api +# === Condor ==================================================== + +CONDOR_API_TYPE = "python" +CONDOR_API_VERSION = 1 + try: - import htcondor + # try to import htcondor version 2 for htcondor version >= 25 + import htcondor2 as htcondor + + CONDOR_API_VERSION = 2 except ImportError: - CONDOR_API = "command" -else: - CONDOR_API = "python" + import htcondor + # =============================================================== @@ -289,7 +295,7 @@ def __init__(self, submissionHost, *args, **kwargs): # Initialize tmpLog.debug("Initializing client") self.lock = threading.Lock() - self.condor_api = CONDOR_API + self.condor_api_type = CONDOR_API_TYPE self.condor_schedd = None self.condor_pool = None # Parse condor command remote options from workspec @@ -305,9 +311,10 @@ def __init__(self, submissionHost, *args, **kwargs): except ValueError: tmpLog.error(f"Invalid submissionHost: {self.submissionHost} . Skipped") # Use Python API or fall back to command - if self.condor_api == "python": + if self.condor_api_type == "python": try: - self.secman = htcondor.SecMan() + if CONDOR_API_VERSION == 1: + self.secman = htcondor.SecMan() self.renew_session(init=True) except Exception as e: tmpLog.error(f"Error when using htcondor Python API. Exception {e.__class__.__name__}: {e}") @@ -321,7 +328,8 @@ def renew_session(self, retry=3, init=False): # Clear security session if not initialization if not init: tmpLog.info("Renew condor session") - self.secman.invalidateAllSessions() + if CONDOR_API_VERSION == 1: + self.secman.invalidateAllSessions() # Recreate collector and schedd object i_try = 1 while i_try <= retry: @@ -346,7 +354,8 @@ def renew_session(self, retry=3, init=False): tmpLog.warning(f"Retry {i_try} times. Still failed. Skipped") return False i_try += 1 - self.secman.invalidateAllSessions() + if CONDOR_API_VERSION == 1: + self.secman.invalidateAllSessions() time.sleep(3) # Sleep time.sleep(3) @@ -396,7 +405,7 @@ def get_all(self, batchIDs_dict=None, allJobs=False, to_update_cache=False): # Get all tmpLog.debug("Start") job_ads_all_dict = {} - if self.condor_api == "python": + if self.condor_api_type == "python": try: job_ads_all_dict = self.query_with_python(batchIDs_dict, allJobs, to_update_cache) except Exception as e: @@ -490,7 +499,7 @@ def query_with_python(self, batchIDs_dict=None, allJobs=False, to_update_cache=F # query from cache def cache_query(constraint=None, projection=CONDOR_JOB_ADS_LIST, timeout=60): - # query from condor xquery and update cache to fifo + # query from condor query and update cache to fifo def update_cache(lockInterval=90): tmpLog.debug("update_cache") # acquire lock with score timestamp @@ -499,13 +508,13 @@ def update_cache(lockInterval=90): if lock_key is not None: # acquired lock, update from condor schedd tmpLog.debug("got lock, updating cache") - jobs_iter_orig = self.schedd.xquery(constraint=constraint, projection=projection) + jobs_iter_orig = self.schedd.query(constraint=constraint, projection=projection) jobs_iter = [] for job in jobs_iter_orig: try: jobs_iter.append(dict(job)) except Exception as e: - tmpLog.error(f"In updating cache schedd xquery; got exception {e.__class__.__name__}: {e} ; {repr(job)}") + tmpLog.error(f"In updating cache schedd query; got exception {e.__class__.__name__}: {e} ; {repr(job)}") timeNow = time.time() cache_fifo.put(jobs_iter, timeNow) self.cache = (jobs_iter, timeNow) @@ -621,7 +630,7 @@ def cleanup_cache(timeout=60): return jobs_iter # query method options - query_method_list = [self.schedd.xquery] + query_method_list = [self.schedd.query] if self.cacheEnable: cache_fifo = CondorQCacheFifo(target=self.submissionHost, id=f"{self.submissionHost},{get_ident()}") query_method_list.insert(0, cache_query) @@ -660,7 +669,7 @@ def cleanup_cache(timeout=60): try: job_ads_dict = dict(job) except Exception as e: - tmpLog.error(f"In doing schedd xquery or history; got exception {e.__class__.__name__}: {e} ; {repr(job)}") + tmpLog.error(f"In doing schedd query or history; got exception {e.__class__.__name__}: {e} ; {repr(job)}") batchid = get_batchid_from_job(job_ads_dict) condor_job_id = f"{self.submissionHost}#{batchid}" job_ads_all_dict[condor_job_id] = job_ads_dict @@ -703,7 +712,7 @@ def submit(self, jdl_list, use_spool=False): # Get all tmpLog.debug("Start") job_ads_all_dict = {} - if self.condor_api == "python": + if self.condor_api_type == "python": try: # TODO: submit_with_python will meet segfault or c++ error after many times of submission; need help from condor team # TODO: submit_with_python_proces has no such error but spawns some processes that will not terminate after harvester stops @@ -854,7 +863,7 @@ def remove(self, batchIDs_list=[]): # Get all tmpLog.debug("Start") job_ads_all_dict = {} - if self.condor_api == "python": + if self.condor_api_type == "python": try: retVal = self.remove_with_python(batchIDs_list) except Exception as e: @@ -944,7 +953,7 @@ def remove_with_python(self, batchIDs_list=[]): clusterids_set = set([get_job_id_tuple_from_batchid(batchid)[0] for batchid in batchIDs_list]) clusterids_str = ",".join(list(clusterids_set)) constraint = f"member(ClusterID, {{{clusterids_str}}}) && JobStatus =!= 3 && JobStatus =!= 4" - jobs_iter = self.schedd.xquery(constraint=constraint, projection=CONDOR_JOB_ADS_LIST) + jobs_iter = self.schedd.query(constraint=constraint, projection=CONDOR_JOB_ADS_LIST) all_batchid_map = {} ok_batchid_list = [] ng_batchid_list = [] From 58272a113b281f9aff44acdf28331ffee4c93a01 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Tue, 27 Jan 2026 16:46:51 +0100 Subject: [PATCH 2/2] v0.7.3 --- pandaharvester/panda_pkg_info.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/panda_pkg_info.py b/pandaharvester/panda_pkg_info.py index e7b05778..8ad31d59 100644 --- a/pandaharvester/panda_pkg_info.py +++ b/pandaharvester/panda_pkg_info.py @@ -1 +1 @@ -release_version = "0.7.2" +release_version = "0.7.3"