Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pandaharvester/harvestercore/worker_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Integer values for errors must be not less than 1000

import re
from typing import Optional

######## Error Code Message Pattern Maps (CMPMs) ########

Expand Down Expand Up @@ -172,7 +173,7 @@ def __init__(self, code_pattern_map: dict):
self._code_pattern_map = code_pattern_map.copy()
self._pattern_code_map = {v: k for k, v in self._code_pattern_map.items()}

def get_error_code(self, message: str) -> int | None:
def get_error_code(self, message: str) -> Optional[int]:
"""
Get the error code for a given message based on the defined patterns.

Expand Down
8 changes: 6 additions & 2 deletions pandaharvester/harvestersubmitter/submitter_common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import random
from math import log1p
from typing import List, Optional, Tuple

#########################
# Pilot related functions
Expand Down Expand Up @@ -115,8 +116,11 @@ def get_resource_type(resource_type_name, is_unified_queue, all_resource_types,

# Compute weight of each CE according to worker stat, return tuple(dict, total weight score)
def get_ce_weighting(
ce_endpoint_list: list | None = None, worker_ce_all_tuple: tuple | None = None, is_slave_queue: bool = False, fairshare_percent: int = 50
) -> tuple:
ce_endpoint_list: Optional[List] = None,
worker_ce_all_tuple: Optional[Tuple] = None,
is_slave_queue: bool = False,
fairshare_percent: int = 50,
) -> Tuple:
"""
Compute the weighting of each CE based on worker statistics and throughput.

Expand Down
16 changes: 7 additions & 9 deletions pandaharvester/harvestersweeper/superfacility_sweeper.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
import os
import json
import requests
import shutil

from pandaharvester.harvestersweeper.base_sweeper import BaseSweeper
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestermisc.superfacility_utils import SuperfacilityClient

from pandaharvester.harvestersweeper.base_sweeper import BaseSweeper

baseLogger = core_utils.setup_logger("superfacility_sweeper")


class SuperfacilitySweeper(BaseSweeper):
def __init__(self, **kwargs):
BaseSweeper.__init__(self, **kwargs)
self.cred_dir = kwarg.get("superfacility_cred_dir")
self.cred_dir = kwargs.get("superfacility_cred_dir")
self.sf_client = SuperfacilityClient(self.cred_dir)

def kill_worker(self, workspec):
Expand All @@ -30,7 +28,7 @@ def kill_worker(self, workspec):
tmpLog.error(errStr)
return False, errStr

if data.get('status') == 'success':
if data.get("status") == "success":
tmpLog.info(f"Succeeded to kill workerID={workspec.workerID} batchID={workspec.workerID}")
else:
errStr = f"Failed to cancel job {jobid}: status: {data.get('status')}"
Expand All @@ -44,11 +42,11 @@ def sweep_worker(self, workspec):
if ap and os.path.exists(ap):
try:
shutil.rmtree(ap)
logger.info(f"Removed directory {ap}")
tmpLog.info(f"Removed directory {ap}")
except Exception as e:
err = f"Failed to remove {ap}: {e}"
logger.error(err)
tmpLog.error(err)
return False, err
else:
logger.info("Access point already removed or none provided.")
tmpLog.info("Access point already removed or none provided.")
return True, ""
1 change: 1 addition & 0 deletions pandaharvester/harvestertest/stageInTest_dpb.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sys
import time

from pandaharvester.harvestercore.job_spec import JobSpec
from pandaharvester.harvestercore.plugin_factory import PluginFactory
Expand Down
4 changes: 2 additions & 2 deletions pandaharvester/harvesterzipper/base_zipper.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def ssh_make_one_zip(self, arg_dict):
retCode = p0.returncode
if retCode != 0:
msgStr = f"failed to make tmpargfile remotely with {stdOut}:{stdErr}"
tmp_log.error(msgStr)
self.zip_tmp_log.error(msgStr)
return False, f"failed to zip with {msgStr}"
stdOut_str = stdOut if (isinstance(stdOut, str) or stdOut is None) else stdOut.decode()
tmpargfile_name = stdOut_str.strip("\n")
Expand Down Expand Up @@ -333,7 +333,7 @@ def ssh_make_one_zip(self, arg_dict):
retCode = p1a.returncode
if retCode != 0:
msgStr = f"failed to delete tmpargfile remotely with {stdOut}:{stdErr}"
tmp_log.error(msgStr)
self.zip_tmp_log.error(msgStr)
del p1a, stdOut, stdErr
gc.collect()
# avoid overwriting
Expand Down
8 changes: 1 addition & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ dependencies = [
'panda-pilot >= 2.7.2.1',
]

requires-python = ">=3.10"
classifiers = [
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
]

[project.optional-dependencies]
kubernetes = ['kubernetes', 'pyyaml']
mysql = ['mysqlclient']
Expand Down Expand Up @@ -83,4 +77,4 @@ aggressive = 3
profile = "black"

[tool.flynt]
line-length = 160
line-length = 160