Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 28 additions & 19 deletions pandaharvester/harvestermisc/htcondor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
from pandaharvester.harvestercore.core_utils import SingletonWithID
from pandaharvester.harvestercore.fifos import SpecialFIFOBase

# condor python or command api
# === Condor ====================================================

CONDOR_API_TYPE = "python"
CONDOR_API_VERSION = 1

try:
import htcondor
# try to import htcondor version 2 for htcondor version >= 25
import htcondor2 as htcondor

CONDOR_API_VERSION = 2
except ImportError:
CONDOR_API = "command"
else:
CONDOR_API = "python"
import htcondor


# ===============================================================

Expand Down Expand Up @@ -289,7 +295,7 @@ def __init__(self, submissionHost, *args, **kwargs):
# Initialize
tmpLog.debug("Initializing client")
self.lock = threading.Lock()
self.condor_api = CONDOR_API
self.condor_api_type = CONDOR_API_TYPE
self.condor_schedd = None
self.condor_pool = None
# Parse condor command remote options from workspec
Expand All @@ -305,9 +311,10 @@ def __init__(self, submissionHost, *args, **kwargs):
except ValueError:
tmpLog.error(f"Invalid submissionHost: {self.submissionHost} . Skipped")
# Use Python API or fall back to command
if self.condor_api == "python":
if self.condor_api_type == "python":
try:
self.secman = htcondor.SecMan()
if CONDOR_API_VERSION == 1:
self.secman = htcondor.SecMan()
self.renew_session(init=True)
except Exception as e:
tmpLog.error(f"Error when using htcondor Python API. Exception {e.__class__.__name__}: {e}")
Expand All @@ -321,7 +328,8 @@ def renew_session(self, retry=3, init=False):
# Clear security session if not initialization
if not init:
tmpLog.info("Renew condor session")
self.secman.invalidateAllSessions()
if CONDOR_API_VERSION == 1:
self.secman.invalidateAllSessions()
# Recreate collector and schedd object
i_try = 1
while i_try <= retry:
Expand All @@ -346,7 +354,8 @@ def renew_session(self, retry=3, init=False):
tmpLog.warning(f"Retry {i_try} times. Still failed. Skipped")
return False
i_try += 1
self.secman.invalidateAllSessions()
if CONDOR_API_VERSION == 1:
self.secman.invalidateAllSessions()
time.sleep(3)
# Sleep
time.sleep(3)
Expand Down Expand Up @@ -396,7 +405,7 @@ def get_all(self, batchIDs_dict=None, allJobs=False, to_update_cache=False):
# Get all
tmpLog.debug("Start")
job_ads_all_dict = {}
if self.condor_api == "python":
if self.condor_api_type == "python":
try:
job_ads_all_dict = self.query_with_python(batchIDs_dict, allJobs, to_update_cache)
except Exception as e:
Expand Down Expand Up @@ -490,7 +499,7 @@ def query_with_python(self, batchIDs_dict=None, allJobs=False, to_update_cache=F
# query from cache

def cache_query(constraint=None, projection=CONDOR_JOB_ADS_LIST, timeout=60):
# query from condor xquery and update cache to fifo
# query from condor query and update cache to fifo
def update_cache(lockInterval=90):
tmpLog.debug("update_cache")
# acquire lock with score timestamp
Expand All @@ -499,13 +508,13 @@ def update_cache(lockInterval=90):
if lock_key is not None:
# acquired lock, update from condor schedd
tmpLog.debug("got lock, updating cache")
jobs_iter_orig = self.schedd.xquery(constraint=constraint, projection=projection)
jobs_iter_orig = self.schedd.query(constraint=constraint, projection=projection)
jobs_iter = []
for job in jobs_iter_orig:
try:
jobs_iter.append(dict(job))
except Exception as e:
tmpLog.error(f"In updating cache schedd xquery; got exception {e.__class__.__name__}: {e} ; {repr(job)}")
tmpLog.error(f"In updating cache schedd query; got exception {e.__class__.__name__}: {e} ; {repr(job)}")
timeNow = time.time()
cache_fifo.put(jobs_iter, timeNow)
self.cache = (jobs_iter, timeNow)
Expand Down Expand Up @@ -621,7 +630,7 @@ def cleanup_cache(timeout=60):
return jobs_iter

# query method options
query_method_list = [self.schedd.xquery]
query_method_list = [self.schedd.query]
if self.cacheEnable:
cache_fifo = CondorQCacheFifo(target=self.submissionHost, id=f"{self.submissionHost},{get_ident()}")
query_method_list.insert(0, cache_query)
Expand Down Expand Up @@ -660,7 +669,7 @@ def cleanup_cache(timeout=60):
try:
job_ads_dict = dict(job)
except Exception as e:
tmpLog.error(f"In doing schedd xquery or history; got exception {e.__class__.__name__}: {e} ; {repr(job)}")
tmpLog.error(f"In doing schedd query or history; got exception {e.__class__.__name__}: {e} ; {repr(job)}")
batchid = get_batchid_from_job(job_ads_dict)
condor_job_id = f"{self.submissionHost}#{batchid}"
job_ads_all_dict[condor_job_id] = job_ads_dict
Expand Down Expand Up @@ -703,7 +712,7 @@ def submit(self, jdl_list, use_spool=False):
# Get all
tmpLog.debug("Start")
job_ads_all_dict = {}
if self.condor_api == "python":
if self.condor_api_type == "python":
try:
# TODO: submit_with_python will meet segfault or c++ error after many times of submission; need help from condor team
# TODO: submit_with_python_proces has no such error but spawns some processes that will not terminate after harvester stops
Expand Down Expand Up @@ -854,7 +863,7 @@ def remove(self, batchIDs_list=[]):
# Get all
tmpLog.debug("Start")
job_ads_all_dict = {}
if self.condor_api == "python":
if self.condor_api_type == "python":
try:
retVal = self.remove_with_python(batchIDs_list)
except Exception as e:
Expand Down Expand Up @@ -944,7 +953,7 @@ def remove_with_python(self, batchIDs_list=[]):
clusterids_set = set([get_job_id_tuple_from_batchid(batchid)[0] for batchid in batchIDs_list])
clusterids_str = ",".join(list(clusterids_set))
constraint = f"member(ClusterID, {{{clusterids_str}}}) && JobStatus =!= 3 && JobStatus =!= 4"
jobs_iter = self.schedd.xquery(constraint=constraint, projection=CONDOR_JOB_ADS_LIST)
jobs_iter = self.schedd.query(constraint=constraint, projection=CONDOR_JOB_ADS_LIST)
all_batchid_map = {}
ok_batchid_list = []
ng_batchid_list = []
Expand Down
2 changes: 1 addition & 1 deletion pandaharvester/panda_pkg_info.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.7.2"
release_version = "0.7.3"