diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index ae9f5a69..a0348a26 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "28-01-2026 09:17:13 on flin (by mightqxc)"
+timestamp = "13-02-2026 07:45:33 on flin (by mightqxc)"
diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py
index c0378a7f..967b1b49 100644
--- a/pandaharvester/harvestercore/db_proxy.py
+++ b/pandaharvester/harvestercore/db_proxy.py
@@ -273,7 +273,8 @@ def execute(self, sql, varmap=None):
             conLock.release()
         # return
         if harvester_config.db.verbose:
-            self.verbLog.debug("thr={0} {1} sql=[{2}]".format(self.thrName, sw.get_elapsed_time(), newSQL.replace("\n", " ").strip()))
+            sql_str = newSQL.replace("\n", " ").strip()
+            self.verbLog.debug(f"thr={self.thrName} {sw.get_elapsed_time()} sql=[{sql_str}]")
         return retVal
 
     # wrapper for executemany
@@ -1129,6 +1130,7 @@ def get_num_jobs_to_fetch(self, n_queues: int, interval: int, queue_config_mappe
         # initialize
         n_queue_limit_job_eval = nQueueLimitJob
         n_queue_limit_job_cores_eval = nQueueLimitJobCores
+        n_queue_limit_job_cores_min_eval = nQueueLimitJobCoresMin
         # dynamic nQueueLimitJob
         if nQueueLimitJobRatio is not None:
             n_queue_limit_job_by_ratio = int(job_stats_map["running"]["n"] * nQueueLimitJobRatio / 100)
@@ -1139,11 +1141,18 @@ def get_num_jobs_to_fetch(self, n_queues: int, interval: int, queue_config_mappe
                 n_queue_limit_job_eval = min(n_queue_limit_job_eval, n_queue_limit_job_by_ratio)
         if nQueueLimitJobCoresRatio is not None:
             n_queue_limit_cores_by_ratio = int(job_stats_map["running"]["core"] * nQueueLimitJobCoresRatio / 100)
-            if nQueueLimitJobCoresMin is not None and n_queue_limit_cores_by_ratio < nQueueLimitJobCoresMin:
+            if nQueueLimitJobMin is not None:
+                # get n_queue_limit_job_cores_min_eval from nQueueLimitJobMin if nQueueLimitJobCoresMin is not set to ensure the minimum cores (1 core per job)
+                n_queue_limit_job_cores_min_base = nQueueLimitJobMin * 1
+                if n_queue_limit_job_cores_min_eval is None:
+                    n_queue_limit_job_cores_min_eval = n_queue_limit_job_cores_min_base
+                else:
+                    n_queue_limit_job_cores_min_eval = max(n_queue_limit_job_cores_min_eval, n_queue_limit_job_cores_min_base)
+            if n_queue_limit_job_cores_min_eval is not None and n_queue_limit_cores_by_ratio < n_queue_limit_job_cores_min_eval:
                 if n_queue_limit_job_cores_eval is not None:
-                    n_queue_limit_job_cores_eval = min(n_queue_limit_job_cores_eval, nQueueLimitJobCoresMin)
+                    n_queue_limit_job_cores_eval = min(n_queue_limit_job_cores_eval, n_queue_limit_job_cores_min_eval)
                 else:
-                    n_queue_limit_job_cores_eval = nQueueLimitJobCoresMin
+                    n_queue_limit_job_cores_eval = n_queue_limit_job_cores_min_eval
             else:
                 if n_queue_limit_job_cores_eval is not None:
                     n_queue_limit_job_cores_eval = min(n_queue_limit_job_cores_eval, n_queue_limit_cores_by_ratio)
@@ -4870,7 +4879,9 @@ def get_worker_limits(self, site_name: str, queue_config) -> tuple[dict, dict]:
         n_queue_limit_worker_eval = nQueueLimitWorker if nQueueLimitWorker is not None else maxWorkers
         n_queue_limit_worker_per_rt_eval = n_queue_limit_worker_eval
         n_queue_limit_worker_cores_eval = nQueueLimitWorkerCores
+        n_queue_limit_worker_cores_min_eval = nQueueLimitWorkerCoresMin
         n_queue_limit_worker_mem_eval = nQueueLimitWorkerMemory
+        n_queue_limit_worker_mem_min_eval = nQueueLimitWorkerMemoryMin
         # dynamic n_queue_limit_worker_eval
         if nQueueLimitWorkerRatio is not None:
             n_queue_limit_worker_by_ratio = int(worker_stats_map["running"]["n"] * nQueueLimitWorkerRatio / 100)
@@ -4882,11 +4893,18 @@ def get_worker_limits(self, site_name: str, queue_config) -> tuple[dict, dict]:
                 n_queue_limit_worker_per_rt_eval = n_queue_limit_worker_eval
         if nQueueLimitWorkerCoresRatio is not None:
             n_queue_limit_cores_by_ratio = int(worker_stats_map["running"]["core"] * nQueueLimitWorkerCoresRatio / 100)
-            if nQueueLimitWorkerCoresMin is not None and n_queue_limit_cores_by_ratio < nQueueLimitWorkerCoresMin:
+            if nQueueLimitWorkerMin is not None:
+                # get n_queue_limit_worker_cores_min_eval from nQueueLimitWorkerMin if nQueueLimitWorkerCoresMin is not set to ensure the minimum cores (1 core per worker)
+                n_queue_limit_worker_cores_min_base = int(nQueueLimitWorkerMin * 1)
+                if n_queue_limit_worker_cores_min_eval is None:
+                    n_queue_limit_worker_cores_min_eval = n_queue_limit_worker_cores_min_base
+                else:
+                    n_queue_limit_worker_cores_min_eval = max(n_queue_limit_worker_cores_min_eval, n_queue_limit_worker_cores_min_base)
+            if n_queue_limit_worker_cores_min_eval is not None and n_queue_limit_cores_by_ratio < n_queue_limit_worker_cores_min_eval:
                 if n_queue_limit_worker_cores_eval is not None:
-                    n_queue_limit_worker_cores_eval = min(n_queue_limit_worker_cores_eval, nQueueLimitWorkerCoresMin)
+                    n_queue_limit_worker_cores_eval = min(n_queue_limit_worker_cores_eval, n_queue_limit_worker_cores_min_eval)
                 else:
-                    n_queue_limit_worker_cores_eval = nQueueLimitWorkerCoresMin
+                    n_queue_limit_worker_cores_eval = n_queue_limit_worker_cores_min_eval
             else:
                 if n_queue_limit_worker_cores_eval is not None:
                     n_queue_limit_worker_cores_eval = min(n_queue_limit_worker_cores_eval, n_queue_limit_cores_by_ratio)
@@ -4894,11 +4912,18 @@ def get_worker_limits(self, site_name: str, queue_config) -> tuple[dict, dict]:
                 n_queue_limit_worker_cores_eval = n_queue_limit_cores_by_ratio
         if nQueueLimitWorkerMemoryRatio is not None:
             n_queue_limit_mem_by_ratio = int(worker_stats_map["running"]["mem"] * nQueueLimitWorkerMemoryRatio / 100)
-            if nQueueLimitWorkerMemoryMin is not None and n_queue_limit_mem_by_ratio < nQueueLimitWorkerMemoryMin:
+            if nQueueLimitWorkerMin is not None:
+                # get n_queue_limit_worker_mem_min_eval from nQueueLimitWorkerMin if nQueueLimitWorkerMemoryMin is not set to ensure the minimum memory (1000 MB per worker)
+                n_queue_limit_worker_mem_min_base = int(nQueueLimitWorkerMin * 1000)
+                if n_queue_limit_worker_mem_min_eval is None:
+                    n_queue_limit_worker_mem_min_eval = n_queue_limit_worker_mem_min_base
+                else:
+                    n_queue_limit_worker_mem_min_eval = max(n_queue_limit_worker_mem_min_eval, n_queue_limit_worker_mem_min_base)
+            if n_queue_limit_worker_mem_min_eval is not None and n_queue_limit_mem_by_ratio < n_queue_limit_worker_mem_min_eval:
                 if n_queue_limit_worker_mem_eval is not None:
-                    n_queue_limit_worker_mem_eval = min(n_queue_limit_worker_mem_eval, nQueueLimitWorkerMemoryMin)
+                    n_queue_limit_worker_mem_eval = min(n_queue_limit_worker_mem_eval, n_queue_limit_worker_mem_min_eval)
                 else:
-                    n_queue_limit_worker_mem_eval = nQueueLimitWorkerMemoryMin
+                    n_queue_limit_worker_mem_eval = n_queue_limit_worker_mem_min_eval
             else:
                 if n_queue_limit_worker_mem_eval is not None:
                     n_queue_limit_worker_mem_eval = min(n_queue_limit_worker_mem_eval, n_queue_limit_mem_by_ratio)
diff --git a/pandaharvester/harvestercore/worker_errors.py b/pandaharvester/harvestercore/worker_errors.py
index c97cf018..a95cdf29 100644
--- a/pandaharvester/harvestercore/worker_errors.py
+++ b/pandaharvester/harvestercore/worker_errors.py
@@ -39,6 +39,7 @@
     3649: r"File stage-out failed: .*",
     # 3.6.8 other transfer or http errors
     3681: r"curl_easy_perform.* failed .*",
+    3689: r"HTTP response .*",
     # 3.8 Harvester reserved condor errors
     3811: r"condor job .* not found",
     3812: r"cannot get JobStatus of job .*",
@@ -111,6 +112,9 @@
     5570: r"SYSTEM_PERIODIC_HOLD",
     5571: r".* Second start not allowed",  # INFN-CNAF
     5572: r"job aborted due to .*",  # FZK-LCG2
+    5573: r"Job runtime longer than reserved",  # DESY-HH_NAF
+    5574: r"Memory usage higher than .*",  # DESY-HH_NAF
+    5575: r"Failed to create session directory",  # CYFRONET_EOS, praguelcg2_Karolina_MCORE
     # 5.6 Slurm errors
     5601: r"submission command failed \(exit code = .*\).*",
     5602: r"no jobId in submission script's output .*",
@@ -137,8 +141,9 @@
 CMPM_Kill_Remove = {
     # 8.9 Condor RemoveReason
     # 8.9.7 Miscellaneous Condor RemoveReason
-    8970: r"Python-initiated action\. \(by user .*\)",
+    8970: r"Python-initiated action\.* \(by user .*\)",
     8971: r"via condor_rm .*",
+    8979: r"PeriodicRemove .*",  # catch-all for PeriodicRemove condor job classad
     # 8.9.8+ Condor RemoveReason reserved on Schedds (when HoldReason is insignificant)
     8981: r"removed by SYSTEM_PERIODIC_REMOVE due to job restarted undesirably",
     8982: r"removed by SYSTEM_PERIODIC_REMOVE due to job held time exceeded .*",