From 5b73e6105188f7a3f0c628ce7c7936c767313c83 Mon Sep 17 00:00:00 2001 From: Andrei Tsaregorodtsev Date: Tue, 11 Oct 2022 13:27:30 +0200 Subject: [PATCH 1/7] feat: pass pilot stamp to the pilot environment --- .../Resources/Computing/HTCondorCEComputingElement.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py index dbe4f0ebbae..0bba0bc8d3d 100644 --- a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py +++ b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py @@ -159,13 +159,14 @@ def __init__(self, ceUniqueID): self.remoteScheddOptions = "" ############################################################################# - def __writeSub(self, executable, nJobs, location, processors): + def __writeSub(self, executable, nJobs, location, processors, pilotStamps): """Create the Sub File for submission. :param str executable: name of the script to execute :param int nJobs: number of desired jobs :param str location: directory that should contain the result of the jobs :param int processors: number of CPU cores to allocate + :param list pilotStamps: list of pilot stamps (strings) """ self.log.debug("Working directory: %s " % self.workingDirectory) @@ -204,7 +205,7 @@ def __writeSub(self, executable, nJobs, location, processors): output = $(Cluster).$(Process).out error = $(Cluster).$(Process).err log = $(Cluster).$(Process).log -environment = "HTCONDOR_JOBID=$(Cluster).$(Process)" +environment = "HTCONDOR_JOBID=$(Cluster).$(Process) DIRAC_PILOT_STAMP=$(stamp)" initialdir = %(initialDir)s grid_resource = condor %(ceName)s %(ceName)s:9619 transfer_output_files = "" @@ -215,7 +216,7 @@ def __writeSub(self, executable, nJobs, location, processors): %(extraString)s -Queue %(nJobs)s +Queue stamp in %(pilotStampList)s """ % dict( executable=executable, @@ -226,6 +227,7 @@ def __writeSub(self, executable, nJobs, location, processors): initialDir=os.path.join(self.workingDirectory, location), localScheddOptions=localScheddOptions, targetUniverse=targetUniverse, + pilotStampList=','.join(pilotStamps) ) subFile.write(sub) subFile.close() @@ -268,7 +270,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1): # We randomize the location of the pilot output and log, because there are just too many of them location = logDir(self.ceName, commonJobStampPart) nProcessors = self.ceParameters.get("NumberOfProcessors", 1) - subName = self.__writeSub(executableFile, numberOfJobs, location, nProcessors) + subName = self.__writeSub(executableFile, numberOfJobs, location, nProcessors, jobStamps) cmd = ["condor_submit", "-terse", subName] # the options for submit to remote are different than the other remoteScheddOptions From 5e82408bd19dd73ca98348373092c42b614b7a60 Mon Sep 17 00:00:00 2001 From: Andrei Tsaregorodtsev Date: Tue, 11 Oct 2022 13:40:54 +0200 Subject: [PATCH 2/7] fix: double string quotes --- src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py index 0bba0bc8d3d..a058c1e0b37 100644 --- a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py +++ b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py @@ -227,7 +227,7 @@ def __writeSub(self, executable, nJobs, location, processors, pilotStamps): initialDir=os.path.join(self.workingDirectory, location), localScheddOptions=localScheddOptions, targetUniverse=targetUniverse, - pilotStampList=','.join(pilotStamps) + pilotStampList=",".join(pilotStamps) ) subFile.write(sub) subFile.close() From 9376bd46126e793bd51f681f98f36484a2267a1e Mon Sep 17 00:00:00 2001 From: Andrei Tsaregorodtsev Date: Tue, 11 Oct 2022 13:47:43 +0200 Subject: [PATCH 3/7] feat: pass pilot stamp to the pilot environment --- .../Computing/ARCComputingElement.py | 41 ++++++++++--------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/src/DIRAC/Resources/Computing/ARCComputingElement.py b/src/DIRAC/Resources/Computing/ARCComputingElement.py index 26f38aa7099..594147605d3 100755 --- a/src/DIRAC/Resources/Computing/ARCComputingElement.py +++ b/src/DIRAC/Resources/Computing/ARCComputingElement.py @@ -259,26 +259,27 @@ def _writeXRSL(self, executableFile, inputs=None, outputs=None, executables=None xrslOutputs += '(%s "")' % (outputFile) xrsl = """ -&(executable="%(executable)s") -(inputFiles=(%(executable)s "%(executableFile)s") %(xrslInputAdditions)s) -(stdout="%(diracStamp)s.out") -(stderr="%(diracStamp)s.err") -(outputFiles=%(xrslOutputFiles)s) -(queue=%(queue)s) -%(xrslMPAdditions)s -%(xrslExecutables)s -%(xrslExtraString)s - """ % { - "executableFile": executableFile, - "executable": os.path.basename(executableFile), - "xrslInputAdditions": xrslInputs, - "diracStamp": diracStamp, - "queue": self.arcQueue, - "xrslOutputFiles": xrslOutputs, - "xrslMPAdditions": xrslMPAdditions, - "xrslExecutables": xrslExecutables, - "xrslExtraString": self.xrslExtraString, - } +&(executable="{executable}") +(inputFiles=({executable} "{executableFile}") {xrslInputAdditions}) +(stdout="{diracStamp}.out") +(stderr="{diracStamp}.err") +(environment="(DIRAC_PILOT_STAMP {diracStamp}")) +(outputFiles={xrslOutputFiles}) +(queue={queue}) +{xrslMPAdditions} +{xrslExecutables} +{xrslExtraString} + """.format( + executableFile=executableFile, + executable=os.path.basename(executableFile), + xrslInputAdditions=xrslInputs, + diracStamp=diracStamp, + queue=self.arcQueue, + xrslOutputFiles=xrslOutputs, + xrslMPAdditions=xrslMPAdditions, + xrslExecutables=xrslExecutables, + xrslExtraString=self.xrslExtraString, + ) return xrsl, diracStamp From b2612e999f1a675161425b4286b91688fcf45e10 Mon Sep 17 00:00:00 2001 From: martynia Date: Tue, 11 Oct 2022 17:26:56 +0100 Subject: [PATCH 4/7] modified wrapper to use pilot stamp --- .../WorkloadManagementSystem/Utilities/PilotWrapper.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py index 3942c563e75..a002c6b57a6 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py @@ -333,8 +333,10 @@ def pilotWrapperScript( # now finally launching the pilot script (which should be called dirac-pilot.py) # get the setup name an -z, if present to get remote logging in place opt = "%s" -# generate pilot UUID -UUID = str(uuid1()) +# try to get a pilot stamp from the environment: +UUID = os.environ.get('DIRAC_PILOT_STAMP') +if UUID is None: + UUID = str(uuid1()) opt = opt + " --pilotUUID " + UUID args = opt.split() From 7f703b6fd2b65a067fe2fd209740072990e36b28 Mon Sep 17 00:00:00 2001 From: martynia Date: Tue, 11 Oct 2022 19:32:31 +0100 Subject: [PATCH 5/7] add pilot stamp 8 digit HEX pattern as a valid pilot ID --- .../Service/FileCacheLoggingPlugin.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py index cf0314d67da..da579e37562 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py @@ -21,6 +21,8 @@ def __init__(self): """ # UUID pattern self.pattern = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") + # pilot stamp pattern + self.stamppattern = re.compile(r"^[0-9A-F]{8}$") self.meta = {} logPath = os.path.join(os.getcwd(), "pilotlogs") self.meta["LogPath"] = logPath @@ -35,7 +37,7 @@ def sendMessage(self, message, pilotUUID, vo): :param message: text to log in json format :type message: str - :param pilotUUID: pilot id. + :param pilotUUID: pilot id. Optimally it should be a pilot stamp if available, otherwise a generated UUID. :type pilotUUID: a valid pilot UUID :param vo: VO name of a pilot which sent the message. :type vo: str @@ -43,9 +45,14 @@ def sendMessage(self, message, pilotUUID, vo): :rtype: dict """ - res = self.pattern.match(pilotUUID) + res = self.stamppattern.match(pilotUUID) if not res: - sLog.error("Pilot UUID does not match the UUID pattern. ", f"UUID: {pilotUUID}, pattern {self.pattern}") + res = self.pattern.match(pilotUUID) + if not res: + sLog.error( + "Pilot UUID does not match the UUID or stamp pattern. ", + f"UUID: {pilotUUID}, pilot stamp pattern {self.stamppattern}, UUID pattern {self.pattern}", + ) return S_ERROR("Pilot UUID is invalid") dirname = os.path.join(self.meta["LogPath"], vo) try: From 9a9645f8518a017662b1752b8aa0d2b5856011ae Mon Sep 17 00:00:00 2001 From: martynia Date: Sun, 16 Oct 2022 18:18:38 +0100 Subject: [PATCH 6/7] fix: add pilot stamp 8 digit HEX pattern as a valid pilot ID for log fineliser --- .../Service/FileCacheLoggingPlugin.py | 33 ++++++++++++++----- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py index da579e37562..8f78599b96d 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py @@ -45,15 +45,10 @@ def sendMessage(self, message, pilotUUID, vo): :rtype: dict """ - res = self.stamppattern.match(pilotUUID) + res = self._verifyUUIDPattern(pilotUUID) if not res: - res = self.pattern.match(pilotUUID) - if not res: - sLog.error( - "Pilot UUID does not match the UUID or stamp pattern. ", - f"UUID: {pilotUUID}, pilot stamp pattern {self.stamppattern}, UUID pattern {self.pattern}", - ) return S_ERROR("Pilot UUID is invalid") + dirname = os.path.join(self.meta["LogPath"], vo) try: if not os.path.exists(dirname): @@ -90,9 +85,9 @@ def finaliseLogs(self, payload, logfile, vo): """ returnCode = json.loads(payload).get("retCode", 0) - res = self.pattern.match(logfile) + + res = self._verifyUUIDPattern(logfile) if not res: - sLog.error("Pilot UUID does not match the UUID pattern. ", f"UUID: {logfile}, pattern {self.pattern}") return S_ERROR("Pilot UUID is invalid") try: @@ -114,3 +109,23 @@ def getMeta(self): if "LogPath" in self.meta: return S_OK(self.meta) return S_ERROR("No Pilot logging directory defined") + + def _verifyUUIDPattern(self, logfile): + """ + Verify if the name of the log file matches the required pattern. + + :param name: file name + :type name: str + :return: re.match result + :rtype: re.Match object or None. + """ + + res = self.stamppattern.match(logfile) + if not res: + res = self.pattern.match(logfile) + if not res: + sLog.error( + "Pilot UUID does not match the UUID or stamp pattern. ", + f"UUID: {logfile}, pilot stamp pattern {self.stamppattern}, UUID pattern {self.pattern}", + ) + return res From 4dbdb4a48824c9f2e073f41439135dfdd08fc046 Mon Sep 17 00:00:00 2001 From: Alexander Richards Date: Wed, 23 Nov 2022 16:54:40 +0000 Subject: [PATCH 7/7] fix: ARC CE environment fix for pilot ID --- src/DIRAC/Resources/Computing/ARCComputingElement.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DIRAC/Resources/Computing/ARCComputingElement.py b/src/DIRAC/Resources/Computing/ARCComputingElement.py index 594147605d3..55ca03a2cd2 100755 --- a/src/DIRAC/Resources/Computing/ARCComputingElement.py +++ b/src/DIRAC/Resources/Computing/ARCComputingElement.py @@ -263,7 +263,7 @@ def _writeXRSL(self, executableFile, inputs=None, outputs=None, executables=None (inputFiles=({executable} "{executableFile}") {xrslInputAdditions}) (stdout="{diracStamp}.out") (stderr="{diracStamp}.err") -(environment="(DIRAC_PILOT_STAMP {diracStamp}")) +(environment=("DIRAC_PILOT_STAMP" "{diracStamp}")) (outputFiles={xrslOutputFiles}) (queue={queue}) {xrslMPAdditions}