diff --git a/pandaharvester/harvestermisc/k8s_utils.py b/pandaharvester/harvestermisc/k8s_utils.py index 18fb7c01..850d139e 100644 --- a/pandaharvester/harvestermisc/k8s_utils.py +++ b/pandaharvester/harvestermisc/k8s_utils.py @@ -282,7 +282,7 @@ def create_job_from_yaml( tmp_log.debug(f"creating job {yaml_content}") - rsp = self.batchv1.create_namespaced_job(body=yaml_content, namespace=self.namespace) + rsp = self.batchv1.create_namespaced_job(body=yaml_content, namespace=self.namespace, _request_timeout=(5, 20)) return rsp, yaml_content def generate_ls_from_wsl(self, workspec_list=[]): @@ -339,7 +339,7 @@ def get_pods_info(self, label_selector): tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="get_pods_info") try: - pods = self.corev1.list_namespaced_pod(namespace=self.namespace, label_selector=label_selector) + pods = self.corev1.list_namespaced_pod(namespace=self.namespace, label_selector=label_selector, _request_timeout=(5, 20)) except Exception as _e: tmp_log.error(f"Failed call to list_namespaced_pod with: {_e}") return None # None needs to be treated differently than [] by the caller @@ -384,7 +384,7 @@ def get_jobs_info(self, label_selector): jobs_dict = {} try: - jobs = self.batchv1.list_namespaced_job(namespace=self.namespace, label_selector=label_selector) + jobs = self.batchv1.list_namespaced_job(namespace=self.namespace, label_selector=label_selector, _request_timeout=(5, 20)) for job in jobs.items: name = job.metadata.name @@ -423,7 +423,7 @@ def delete_pods(self, pod_name_list): for pod_name in pod_name_list: rsp = {"name": pod_name} try: - self.corev1.delete_namespaced_pod(name=pod_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0) + self.corev1.delete_namespaced_pod(name=pod_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0, _request_timeout=(5, 20)) except ApiException as _e: rsp["errMsg"] = "" if _e.status == 404 else _e.reason except Exception as _e: @@ -439,13 +439,15 @@ def delete_job(self, job_name): tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name} job_name={job_name}", method_name="delete_job") tmp_log.debug(f"Going to delete JOB {job_name}") try: - self.batchv1.delete_namespaced_job(name=job_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0) + self.batchv1.delete_namespaced_job(name=job_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0, _request_timeout=(5, 20)) tmp_log.debug(f"Deleted JOB {job_name}") except Exception as _e: tmp_log.error(f"Failed to delete JOB {job_name} with: {_e}") def delete_config_map(self, config_map_name): - self.corev1.delete_namespaced_config_map(name=config_map_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0) + self.corev1.delete_namespaced_config_map( + name=config_map_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0, _request_timeout=(5, 20) + ) def set_affinity(self, yaml_content, use_affinity, use_anti_affinity): if not use_affinity and not use_anti_affinity: @@ -512,11 +514,11 @@ def create_or_patch_secret(self, file_list, secret_name): body = client.V1Secret(data=data, metadata=metadata) try: try: - rsp = self.corev1.patch_namespaced_secret(name=secret_name, body=body, namespace=self.namespace) + rsp = self.corev1.patch_namespaced_secret(name=secret_name, body=body, namespace=self.namespace, _request_timeout=(5, 20)) tmp_log.debug("Patched secret") except ApiException as e: tmp_log.debug(f"Exception when patching secret: {e} . Try to create secret instead...") - rsp = self.corev1.create_namespaced_secret(body=body, namespace=self.namespace) + rsp = self.corev1.create_namespaced_secret(body=body, namespace=self.namespace, _request_timeout=(5, 20)) tmp_log.debug("Created secret") except Exception as e: tmp_log.error(f"Exception when patching or creating secret: {e}.") @@ -550,7 +552,7 @@ def create_configmap(self, work_spec): config_map = client.V1ConfigMap(api_version="v1", kind="ConfigMap", data=data, metadata=metadata) # create the configmap object in K8s - api_response = self.corev1.create_namespaced_config_map(namespace=self.namespace, body=config_map) + api_response = self.corev1.create_namespaced_config_map(namespace=self.namespace, body=config_map, _request_timeout=(5, 20)) tmp_log.debug(f"Created configmap for worker id: {worker_id}") return True @@ -578,11 +580,11 @@ def create_or_patch_configmap_starter(self): config_map = client.V1ConfigMap(api_version="v1", kind="ConfigMap", data=data, metadata=metadata) try: - api_response = self.corev1.patch_namespaced_config_map(name=name, body=config_map, namespace=self.namespace) + api_response = self.corev1.patch_namespaced_config_map(name=name, body=config_map, namespace=self.namespace, _request_timeout=(5, 20)) tmp_log.debug("Patched pilots-starter config_map") except ApiException as e: tmp_log.debug(f"Exception when patching pilots-starter config_map: {e} . Try to create it instead...") - api_response = self.corev1.create_namespaced_config_map(namespace=self.namespace, body=config_map) + api_response = self.corev1.create_namespaced_config_map(namespace=self.namespace, body=config_map, _request_timeout=(5, 20)) tmp_log.debug("Created pilots-starter config_map") return True @@ -593,7 +595,7 @@ def create_or_patch_configmap_starter(self): def get_pod_logs(self, pod_name, previous=False): tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="get_pod_logs") try: - rsp = self.corev1.read_namespaced_pod_log(name=pod_name, namespace=self.namespace, previous=previous) + rsp = self.corev1.read_namespaced_pod_log(name=pod_name, namespace=self.namespace, previous=previous, _request_timeout=(5, 20)) tmp_log.debug(f"Log file retrieved for {pod_name}") except Exception as e: tmp_log.debug(f"Exception when getting logs for pod {pod_name} : {e}. Skipped")