diff --git a/src/DIRAC/Resources/Computing/ARCComputingElement.py b/src/DIRAC/Resources/Computing/ARCComputingElement.py index 26f38aa7099..55ca03a2cd2 100755 --- a/src/DIRAC/Resources/Computing/ARCComputingElement.py +++ b/src/DIRAC/Resources/Computing/ARCComputingElement.py @@ -259,26 +259,27 @@ def _writeXRSL(self, executableFile, inputs=None, outputs=None, executables=None xrslOutputs += '(%s "")' % (outputFile) xrsl = """ -&(executable="%(executable)s") -(inputFiles=(%(executable)s "%(executableFile)s") %(xrslInputAdditions)s) -(stdout="%(diracStamp)s.out") -(stderr="%(diracStamp)s.err") -(outputFiles=%(xrslOutputFiles)s) -(queue=%(queue)s) -%(xrslMPAdditions)s -%(xrslExecutables)s -%(xrslExtraString)s - """ % { - "executableFile": executableFile, - "executable": os.path.basename(executableFile), - "xrslInputAdditions": xrslInputs, - "diracStamp": diracStamp, - "queue": self.arcQueue, - "xrslOutputFiles": xrslOutputs, - "xrslMPAdditions": xrslMPAdditions, - "xrslExecutables": xrslExecutables, - "xrslExtraString": self.xrslExtraString, - } +&(executable="{executable}") +(inputFiles=({executable} "{executableFile}") {xrslInputAdditions}) +(stdout="{diracStamp}.out") +(stderr="{diracStamp}.err") +(environment=("DIRAC_PILOT_STAMP" "{diracStamp}")) +(outputFiles={xrslOutputFiles}) +(queue={queue}) +{xrslMPAdditions} +{xrslExecutables} +{xrslExtraString} + """.format( + executableFile=executableFile, + executable=os.path.basename(executableFile), + xrslInputAdditions=xrslInputs, + diracStamp=diracStamp, + queue=self.arcQueue, + xrslOutputFiles=xrslOutputs, + xrslMPAdditions=xrslMPAdditions, + xrslExecutables=xrslExecutables, + xrslExtraString=self.xrslExtraString, + ) return xrsl, diracStamp diff --git a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py index dbe4f0ebbae..a058c1e0b37 100644 --- a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py +++ 
b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py @@ -159,13 +159,14 @@ def __init__(self, ceUniqueID): self.remoteScheddOptions = "" ############################################################################# - def __writeSub(self, executable, nJobs, location, processors): + def __writeSub(self, executable, nJobs, location, processors, pilotStamps): """Create the Sub File for submission. :param str executable: name of the script to execute :param int nJobs: number of desired jobs :param str location: directory that should contain the result of the jobs :param int processors: number of CPU cores to allocate + :param list pilotStamps: list of pilot stamps (strings) """ self.log.debug("Working directory: %s " % self.workingDirectory) @@ -204,7 +205,7 @@ def __writeSub(self, executable, nJobs, location, processors): output = $(Cluster).$(Process).out error = $(Cluster).$(Process).err log = $(Cluster).$(Process).log -environment = "HTCONDOR_JOBID=$(Cluster).$(Process)" +environment = "HTCONDOR_JOBID=$(Cluster).$(Process) DIRAC_PILOT_STAMP=$(stamp)" initialdir = %(initialDir)s grid_resource = condor %(ceName)s %(ceName)s:9619 transfer_output_files = "" @@ -215,7 +216,7 @@ def __writeSub(self, executable, nJobs, location, processors): %(extraString)s -Queue %(nJobs)s +Queue stamp in %(pilotStampList)s """ % dict( executable=executable, @@ -226,6 +227,7 @@ def __writeSub(self, executable, nJobs, location, processors): initialDir=os.path.join(self.workingDirectory, location), localScheddOptions=localScheddOptions, targetUniverse=targetUniverse, + pilotStampList=",".join(pilotStamps) ) subFile.write(sub) subFile.close() @@ -268,7 +270,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1): # We randomize the location of the pilot output and log, because there are just too many of them location = logDir(self.ceName, commonJobStampPart) nProcessors = self.ceParameters.get("NumberOfProcessors", 1) - subName = self.__writeSub(executableFile, numberOfJobs, 
location, nProcessors) + subName = self.__writeSub(executableFile, numberOfJobs, location, nProcessors, jobStamps) cmd = ["condor_submit", "-terse", subName] # the options for submit to remote are different than the other remoteScheddOptions diff --git a/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py index cf0314d67da..8f78599b96d 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/FileCacheLoggingPlugin.py @@ -21,6 +21,8 @@ def __init__(self): """ # UUID pattern self.pattern = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") + # pilot stamp pattern + self.stamppattern = re.compile(r"^[0-9A-F]{8}$") self.meta = {} logPath = os.path.join(os.getcwd(), "pilotlogs") self.meta["LogPath"] = logPath @@ -35,7 +37,7 @@ def sendMessage(self, message, pilotUUID, vo): :param message: text to log in json format :type message: str - :param pilotUUID: pilot id. + :param pilotUUID: pilot id. Optimally it should be a pilot stamp if available, otherwise a generated UUID. :type pilotUUID: a valid pilot UUID :param vo: VO name of a pilot which sent the message. :type vo: str @@ -43,10 +45,10 @@ def sendMessage(self, message, pilotUUID, vo): :rtype: dict """ - res = self.pattern.match(pilotUUID) + res = self._verifyUUIDPattern(pilotUUID) if not res: - sLog.error("Pilot UUID does not match the UUID pattern. ", f"UUID: {pilotUUID}, pattern {self.pattern}") return S_ERROR("Pilot UUID is invalid") + dirname = os.path.join(self.meta["LogPath"], vo) try: if not os.path.exists(dirname): @@ -83,9 +85,9 @@ def finaliseLogs(self, payload, logfile, vo): """ returnCode = json.loads(payload).get("retCode", 0) - res = self.pattern.match(logfile) + + res = self._verifyUUIDPattern(logfile) if not res: - sLog.error("Pilot UUID does not match the UUID pattern. 
", f"UUID: {logfile}, pattern {self.pattern}") return S_ERROR("Pilot UUID is invalid") try: @@ -107,3 +109,23 @@ def getMeta(self): if "LogPath" in self.meta: return S_OK(self.meta) return S_ERROR("No Pilot logging directory defined") + + def _verifyUUIDPattern(self, logfile): + """ + Verify if the name of the log file matches the required pattern. + + :param logfile: name of the log file to check + :type logfile: str + :return: re.match result + :rtype: re.Match object or None. + """ + + res = self.stamppattern.match(logfile) + if not res: + res = self.pattern.match(logfile) + if not res: + sLog.error( + "Pilot UUID does not match the UUID or stamp pattern. ", + f"UUID: {logfile}, pilot stamp pattern {self.stamppattern}, UUID pattern {self.pattern}", + ) + return res diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py index 3942c563e75..a002c6b57a6 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py @@ -333,8 +333,10 @@ def pilotWrapperScript( # now finally launching the pilot script (which should be called dirac-pilot.py) # get the setup name an -z, if present to get remote logging in place opt = "%s" -# generate pilot UUID -UUID = str(uuid1()) +# try to get a pilot stamp from the environment: +UUID = os.environ.get('DIRAC_PILOT_STAMP') +if UUID is None: + UUID = str(uuid1()) opt = opt + " --pilotUUID " + UUID args = opt.split()