Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
timestamp = "28-01-2026 09:17:13 on flin (by mightqxc)"
timestamp = "13-02-2026 07:45:33 on flin (by mightqxc)"
45 changes: 35 additions & 10 deletions pandaharvester/harvestercore/db_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ def execute(self, sql, varmap=None):
conLock.release()
# return
if harvester_config.db.verbose:
self.verbLog.debug("thr={0} {1} sql=[{2}]".format(self.thrName, sw.get_elapsed_time(), newSQL.replace("\n", " ").strip()))
sql_str = newSQL.replace("\n", " ").strip()
self.verbLog.debug(f"thr={self.thrName} {sw.get_elapsed_time()} sql=[{sql_str}]")
return retVal

# wrapper for executemany
Expand Down Expand Up @@ -1129,6 +1130,7 @@ def get_num_jobs_to_fetch(self, n_queues: int, interval: int, queue_config_mappe
# initialize
n_queue_limit_job_eval = nQueueLimitJob
n_queue_limit_job_cores_eval = nQueueLimitJobCores
n_queue_limit_job_cores_min_eval = nQueueLimitJobCoresMin
# dynamic nQueueLimitJob
if nQueueLimitJobRatio is not None:
n_queue_limit_job_by_ratio = int(job_stats_map["running"]["n"] * nQueueLimitJobRatio / 100)
Expand All @@ -1139,11 +1141,18 @@ def get_num_jobs_to_fetch(self, n_queues: int, interval: int, queue_config_mappe
n_queue_limit_job_eval = min(n_queue_limit_job_eval, n_queue_limit_job_by_ratio)
if nQueueLimitJobCoresRatio is not None:
n_queue_limit_cores_by_ratio = int(job_stats_map["running"]["core"] * nQueueLimitJobCoresRatio / 100)
if nQueueLimitJobCoresMin is not None and n_queue_limit_cores_by_ratio < nQueueLimitJobCoresMin:
if nQueueLimitJobMin is not None:
# get n_queue_limit_job_cores_min_eval from nQueueLimitJobMin if nQueueLimitJobCoresMin is not set to ensure the minimum cores (1 core per job)
n_queue_limit_job_cores_min_base = nQueueLimitJobMin * 1
if n_queue_limit_job_cores_min_eval is None:
n_queue_limit_job_cores_min_eval = n_queue_limit_job_cores_min_base
else:
n_queue_limit_job_cores_min_eval = max(n_queue_limit_job_cores_min_eval, n_queue_limit_job_cores_min_base)
if n_queue_limit_job_cores_min_eval is not None and n_queue_limit_cores_by_ratio < n_queue_limit_job_cores_min_eval:
if n_queue_limit_job_cores_eval is not None:
n_queue_limit_job_cores_eval = min(n_queue_limit_job_cores_eval, nQueueLimitJobCoresMin)
n_queue_limit_job_cores_eval = min(n_queue_limit_job_cores_eval, n_queue_limit_job_cores_min_eval)
else:
n_queue_limit_job_cores_eval = nQueueLimitJobCoresMin
n_queue_limit_job_cores_eval = n_queue_limit_job_cores_min_eval
else:
if n_queue_limit_job_cores_eval is not None:
n_queue_limit_job_cores_eval = min(n_queue_limit_job_cores_eval, n_queue_limit_cores_by_ratio)
Expand Down Expand Up @@ -4870,7 +4879,9 @@ def get_worker_limits(self, site_name: str, queue_config) -> tuple[dict, dict]:
n_queue_limit_worker_eval = nQueueLimitWorker if nQueueLimitWorker is not None else maxWorkers
n_queue_limit_worker_per_rt_eval = n_queue_limit_worker_eval
n_queue_limit_worker_cores_eval = nQueueLimitWorkerCores
n_queue_limit_worker_cores_min_eval = nQueueLimitWorkerCoresMin
n_queue_limit_worker_mem_eval = nQueueLimitWorkerMemory
n_queue_limit_worker_mem_min_eval = nQueueLimitWorkerMemoryMin
# dynamic n_queue_limit_worker_eval
if nQueueLimitWorkerRatio is not None:
n_queue_limit_worker_by_ratio = int(worker_stats_map["running"]["n"] * nQueueLimitWorkerRatio / 100)
Expand All @@ -4882,23 +4893,37 @@ def get_worker_limits(self, site_name: str, queue_config) -> tuple[dict, dict]:
n_queue_limit_worker_per_rt_eval = n_queue_limit_worker_eval
if nQueueLimitWorkerCoresRatio is not None:
n_queue_limit_cores_by_ratio = int(worker_stats_map["running"]["core"] * nQueueLimitWorkerCoresRatio / 100)
if nQueueLimitWorkerCoresMin is not None and n_queue_limit_cores_by_ratio < nQueueLimitWorkerCoresMin:
if nQueueLimitWorkerMin is not None:
# get n_queue_limit_worker_cores_min_eval from nQueueLimitWorkerMin if nQueueLimitWorkerCoresMin is not set to ensure the minimum cores (1 core per worker)
n_queue_limit_worker_cores_min_base = int(nQueueLimitWorkerMin * 1)
if n_queue_limit_worker_cores_min_eval is None:
n_queue_limit_worker_cores_min_eval = n_queue_limit_worker_cores_min_base
else:
n_queue_limit_worker_cores_min_eval = max(n_queue_limit_worker_cores_min_eval, n_queue_limit_worker_cores_min_base)
if n_queue_limit_worker_cores_min_eval is not None and n_queue_limit_cores_by_ratio < n_queue_limit_worker_cores_min_eval:
if n_queue_limit_worker_cores_eval is not None:
n_queue_limit_worker_cores_eval = min(n_queue_limit_worker_cores_eval, nQueueLimitWorkerCoresMin)
n_queue_limit_worker_cores_eval = min(n_queue_limit_worker_cores_eval, n_queue_limit_worker_cores_min_eval)
else:
n_queue_limit_worker_cores_eval = nQueueLimitWorkerCoresMin
n_queue_limit_worker_cores_eval = n_queue_limit_worker_cores_min_eval
else:
if n_queue_limit_worker_cores_eval is not None:
n_queue_limit_worker_cores_eval = min(n_queue_limit_worker_cores_eval, n_queue_limit_cores_by_ratio)
else:
n_queue_limit_worker_cores_eval = n_queue_limit_cores_by_ratio
if nQueueLimitWorkerMemoryRatio is not None:
n_queue_limit_mem_by_ratio = int(worker_stats_map["running"]["mem"] * nQueueLimitWorkerMemoryRatio / 100)
if nQueueLimitWorkerMemoryMin is not None and n_queue_limit_mem_by_ratio < nQueueLimitWorkerMemoryMin:
if nQueueLimitWorkerMin is not None:
# get n_queue_limit_worker_mem_min_eval from nQueueLimitWorkerMin if nQueueLimitWorkerMemoryMin is not set to ensure the minimum memory (1000 MB per worker)
n_queue_limit_worker_mem_min_base = int(nQueueLimitWorkerMin * 1000)
if n_queue_limit_worker_mem_min_eval is None:
n_queue_limit_worker_mem_min_eval = n_queue_limit_worker_mem_min_base
else:
n_queue_limit_worker_mem_min_eval = max(n_queue_limit_worker_mem_min_eval, n_queue_limit_worker_mem_min_base)
if n_queue_limit_worker_mem_min_eval is not None and n_queue_limit_mem_by_ratio < n_queue_limit_worker_mem_min_eval:
if n_queue_limit_worker_mem_eval is not None:
n_queue_limit_worker_mem_eval = min(n_queue_limit_worker_mem_eval, nQueueLimitWorkerMemoryMin)
n_queue_limit_worker_mem_eval = min(n_queue_limit_worker_mem_eval, n_queue_limit_worker_mem_min_eval)
else:
n_queue_limit_worker_mem_eval = nQueueLimitWorkerMemoryMin
n_queue_limit_worker_mem_eval = n_queue_limit_worker_mem_min_eval
else:
if n_queue_limit_worker_mem_eval is not None:
n_queue_limit_worker_mem_eval = min(n_queue_limit_worker_mem_eval, n_queue_limit_mem_by_ratio)
Expand Down
7 changes: 6 additions & 1 deletion pandaharvester/harvestercore/worker_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
3649: r"File stage-out failed: .*",
# 3.6.8 other transfer or http errors
3681: r"curl_easy_perform.* failed .*",
3689: r"HTTP response .*",
# 3.8 Harvester reserved condor errors
3811: r"condor job .* not found",
3812: r"cannot get JobStatus of job .*",
Expand Down Expand Up @@ -111,6 +112,9 @@
5570: r"SYSTEM_PERIODIC_HOLD",
5571: r".* Second start not allowed", # INFN-CNAF
5572: r"job aborted due to .*", # FZK-LCG2
5573: r"Job runtime longer than reserved", # DESY-HH_NAF
5574: r"Memory usage higher than .*", # DESY-HH_NAF
5575: r"Failed to create session directory", # CYFRONET_EOS, praguelcg2_Karolina_MCORE
# 5.6 Slurm errors
5601: r"submission command failed \(exit code = .*\).*",
5602: r"no jobId in submission script's output .*",
Expand All @@ -137,8 +141,9 @@
CMPM_Kill_Remove = {
# 8.9 Condor RemoveReason
# 8.9.7 Miscellaneous Condor RemoveReason
8970: r"Python-initiated action\. \(by user .*\)",
8970: r"Python-initiated action\.* \(by user .*\)",
8971: r"via condor_rm .*",
8979: r"PeriodicRemove .*", # catch-all for PeriodicRemove condor job classad
# 8.9.8+ Condor RemoveReason reserved on Schedds (when HoldReason is insignificant)
8981: r"removed by SYSTEM_PERIODIC_REMOVE due to job restarted undesirably",
8982: r"removed by SYSTEM_PERIODIC_REMOVE due to job held time exceeded .*",
Expand Down