Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions pandaharvester/harvestermisc/k8s_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ def create_job_from_yaml(

tmp_log.debug(f"creating job {yaml_content}")

rsp = self.batchv1.create_namespaced_job(body=yaml_content, namespace=self.namespace)
rsp = self.batchv1.create_namespaced_job(body=yaml_content, namespace=self.namespace, _request_timeout=(5, 20))
Copy link

Copilot AI Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hardcoded timeout values (5, 20) should be made configurable. Consider defining these as class constants or configuration parameters to allow adjustment without code changes. This is especially important for production environments where network conditions may vary.

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout tuple format (5, 20) represents (connect_timeout, read_timeout) in the Kubernetes Python client. However, the 5-second connect timeout may be too aggressive for some network environments, especially when the Kubernetes API server is under heavy load. Consider increasing the connect timeout to at least 10 seconds or making it configurable based on deployment requirements.

Suggested change
rsp = self.batchv1.create_namespaced_job(body=yaml_content, namespace=self.namespace, _request_timeout=(5, 20))
rsp = self.batchv1.create_namespaced_job(body=yaml_content, namespace=self.namespace, _request_timeout=(10, 20))

Copilot uses AI. Check for mistakes.
return rsp, yaml_content

def generate_ls_from_wsl(self, workspec_list=[]):
Expand Down Expand Up @@ -339,7 +339,7 @@ def get_pods_info(self, label_selector):
tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="get_pods_info")

try:
pods = self.corev1.list_namespaced_pod(namespace=self.namespace, label_selector=label_selector)
pods = self.corev1.list_namespaced_pod(namespace=self.namespace, label_selector=label_selector, _request_timeout=(5, 20))
except Exception as _e:
tmp_log.error(f"Failed call to list_namespaced_pod with: {_e}")
return None # None needs to be treated differently than [] by the caller
Expand Down Expand Up @@ -384,7 +384,7 @@ def get_jobs_info(self, label_selector):
jobs_dict = {}

try:
jobs = self.batchv1.list_namespaced_job(namespace=self.namespace, label_selector=label_selector)
jobs = self.batchv1.list_namespaced_job(namespace=self.namespace, label_selector=label_selector, _request_timeout=(5, 20))

for job in jobs.items:
name = job.metadata.name
Expand Down Expand Up @@ -423,7 +423,7 @@ def delete_pods(self, pod_name_list):
for pod_name in pod_name_list:
rsp = {"name": pod_name}
try:
self.corev1.delete_namespaced_pod(name=pod_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0)
self.corev1.delete_namespaced_pod(name=pod_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0, _request_timeout=(5, 20))
except ApiException as _e:
rsp["errMsg"] = "" if _e.status == 404 else _e.reason
except Exception as _e:
Expand All @@ -439,13 +439,15 @@ def delete_job(self, job_name):
tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name} job_name={job_name}", method_name="delete_job")
tmp_log.debug(f"Going to delete JOB {job_name}")
try:
self.batchv1.delete_namespaced_job(name=job_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0)
self.batchv1.delete_namespaced_job(name=job_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0, _request_timeout=(5, 20))
tmp_log.debug(f"Deleted JOB {job_name}")
except Exception as _e:
tmp_log.error(f"Failed to delete JOB {job_name} with: {_e}")

def delete_config_map(self, config_map_name):
self.corev1.delete_namespaced_config_map(name=config_map_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0)
self.corev1.delete_namespaced_config_map(
name=config_map_name, namespace=self.namespace, body=self.deletev1, grace_period_seconds=0, _request_timeout=(5, 20)
)

def set_affinity(self, yaml_content, use_affinity, use_anti_affinity):
if not use_affinity and not use_anti_affinity:
Expand Down Expand Up @@ -512,11 +514,11 @@ def create_or_patch_secret(self, file_list, secret_name):
body = client.V1Secret(data=data, metadata=metadata)
try:
try:
rsp = self.corev1.patch_namespaced_secret(name=secret_name, body=body, namespace=self.namespace)
rsp = self.corev1.patch_namespaced_secret(name=secret_name, body=body, namespace=self.namespace, _request_timeout=(5, 20))
tmp_log.debug("Patched secret")
except ApiException as e:
tmp_log.debug(f"Exception when patching secret: {e} . Try to create secret instead...")
rsp = self.corev1.create_namespaced_secret(body=body, namespace=self.namespace)
rsp = self.corev1.create_namespaced_secret(body=body, namespace=self.namespace, _request_timeout=(5, 20))
tmp_log.debug("Created secret")
except Exception as e:
tmp_log.error(f"Exception when patching or creating secret: {e}.")
Expand Down Expand Up @@ -550,7 +552,7 @@ def create_configmap(self, work_spec):
config_map = client.V1ConfigMap(api_version="v1", kind="ConfigMap", data=data, metadata=metadata)

# create the configmap object in K8s
api_response = self.corev1.create_namespaced_config_map(namespace=self.namespace, body=config_map)
api_response = self.corev1.create_namespaced_config_map(namespace=self.namespace, body=config_map, _request_timeout=(5, 20))
Copy link

Copilot AI Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable api_response is not used.

Copilot uses AI. Check for mistakes.
tmp_log.debug(f"Created configmap for worker id: {worker_id}")
return True

Expand Down Expand Up @@ -578,11 +580,11 @@ def create_or_patch_configmap_starter(self):
config_map = client.V1ConfigMap(api_version="v1", kind="ConfigMap", data=data, metadata=metadata)

try:
api_response = self.corev1.patch_namespaced_config_map(name=name, body=config_map, namespace=self.namespace)
api_response = self.corev1.patch_namespaced_config_map(name=name, body=config_map, namespace=self.namespace, _request_timeout=(5, 20))
Copy link

Copilot AI Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

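This assignment to `api_response` is unnecessary: the value is never read in this method, and on the exception path it is simply overwritten by the `create_namespaced_config_map` call. The call can be made without binding its result.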

Copilot uses AI. Check for mistakes.
tmp_log.debug("Patched pilots-starter config_map")
except ApiException as e:
tmp_log.debug(f"Exception when patching pilots-starter config_map: {e} . Try to create it instead...")
api_response = self.corev1.create_namespaced_config_map(namespace=self.namespace, body=config_map)
api_response = self.corev1.create_namespaced_config_map(namespace=self.namespace, body=config_map, _request_timeout=(5, 20))
Copy link

Copilot AI Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable api_response is not used.

Copilot uses AI. Check for mistakes.
tmp_log.debug("Created pilots-starter config_map")
return True

Expand All @@ -593,7 +595,7 @@ def create_or_patch_configmap_starter(self):
def get_pod_logs(self, pod_name, previous=False):
tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="get_pod_logs")
try:
rsp = self.corev1.read_namespaced_pod_log(name=pod_name, namespace=self.namespace, previous=previous)
rsp = self.corev1.read_namespaced_pod_log(name=pod_name, namespace=self.namespace, previous=previous, _request_timeout=(5, 20))
Copy link

Copilot AI Jan 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`read_namespaced_pod_log` can return a large amount of data, so the 20-second read timeout may be insufficient for pods with extensive logs. Consider using a longer read timeout for this specific operation, or implementing streaming/pagination for log retrieval.

Suggested change
rsp = self.corev1.read_namespaced_pod_log(name=pod_name, namespace=self.namespace, previous=previous, _request_timeout=(5, 20))
rsp = self.corev1.read_namespaced_pod_log(
name=pod_name,
namespace=self.namespace,
previous=previous,
_request_timeout=(5, 300),
)

Copilot uses AI. Check for mistakes.
tmp_log.debug(f"Log file retrieved for {pod_name}")
except Exception as e:
tmp_log.debug(f"Exception when getting logs for pod {pod_name} : {e}. Skipped")
Expand Down
Loading