Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions src/DIRAC/Resources/Computing/ARCComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,26 +259,27 @@ def _writeXRSL(self, executableFile, inputs=None, outputs=None, executables=None
xrslOutputs += '(%s "")' % (outputFile)

xrsl = """
&(executable="%(executable)s")
(inputFiles=(%(executable)s "%(executableFile)s") %(xrslInputAdditions)s)
(stdout="%(diracStamp)s.out")
(stderr="%(diracStamp)s.err")
(outputFiles=%(xrslOutputFiles)s)
(queue=%(queue)s)
%(xrslMPAdditions)s
%(xrslExecutables)s
%(xrslExtraString)s
""" % {
"executableFile": executableFile,
"executable": os.path.basename(executableFile),
"xrslInputAdditions": xrslInputs,
"diracStamp": diracStamp,
"queue": self.arcQueue,
"xrslOutputFiles": xrslOutputs,
"xrslMPAdditions": xrslMPAdditions,
"xrslExecutables": xrslExecutables,
"xrslExtraString": self.xrslExtraString,
}
&(executable="{executable}")
(inputFiles=({executable} "{executableFile}") {xrslInputAdditions})
(stdout="{diracStamp}.out")
(stderr="{diracStamp}.err")
(environment=("DIRAC_PILOT_STAMP" "{diracStamp}"))
(outputFiles={xrslOutputFiles})
(queue={queue})
{xrslMPAdditions}
{xrslExecutables}
{xrslExtraString}
""".format(
executableFile=executableFile,
executable=os.path.basename(executableFile),
xrslInputAdditions=xrslInputs,
diracStamp=diracStamp,
queue=self.arcQueue,
xrslOutputFiles=xrslOutputs,
xrslMPAdditions=xrslMPAdditions,
xrslExecutables=xrslExecutables,
xrslExtraString=self.xrslExtraString,
)

return xrsl, diracStamp

Expand Down
10 changes: 6 additions & 4 deletions src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,14 @@ def __init__(self, ceUniqueID):
self.remoteScheddOptions = ""

#############################################################################
def __writeSub(self, executable, nJobs, location, processors):
def __writeSub(self, executable, nJobs, location, processors, pilotStamps):
"""Create the Sub File for submission.

:param str executable: name of the script to execute
:param int nJobs: number of desired jobs
:param str location: directory that should contain the result of the jobs
:param int processors: number of CPU cores to allocate
:param list pilotStamps: list of pilot stamps (strings)
"""

self.log.debug("Working directory: %s " % self.workingDirectory)
Expand Down Expand Up @@ -204,7 +205,7 @@ def __writeSub(self, executable, nJobs, location, processors):
output = $(Cluster).$(Process).out
error = $(Cluster).$(Process).err
log = $(Cluster).$(Process).log
environment = "HTCONDOR_JOBID=$(Cluster).$(Process)"
environment = "HTCONDOR_JOBID=$(Cluster).$(Process) DIRAC_PILOT_STAMP=$(stamp)"
initialdir = %(initialDir)s
grid_resource = condor %(ceName)s %(ceName)s:9619
transfer_output_files = ""
Expand All @@ -215,7 +216,7 @@ def __writeSub(self, executable, nJobs, location, processors):

%(extraString)s

Queue %(nJobs)s
Queue stamp in %(pilotStampList)s

""" % dict(
executable=executable,
Expand All @@ -226,6 +227,7 @@ def __writeSub(self, executable, nJobs, location, processors):
initialDir=os.path.join(self.workingDirectory, location),
localScheddOptions=localScheddOptions,
targetUniverse=targetUniverse,
pilotStampList=",".join(pilotStamps)
)
subFile.write(sub)
subFile.close()
Expand Down Expand Up @@ -268,7 +270,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
# We randomize the location of the pilot output and log, because there are just too many of them
location = logDir(self.ceName, commonJobStampPart)
nProcessors = self.ceParameters.get("NumberOfProcessors", 1)
subName = self.__writeSub(executableFile, numberOfJobs, location, nProcessors)
subName = self.__writeSub(executableFile, numberOfJobs, location, nProcessors, jobStamps)

cmd = ["condor_submit", "-terse", subName]
# the options for submit to remote are different than the other remoteScheddOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ def __init__(self):
"""
# UUID pattern
self.pattern = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$")
# pilot stamp pattern
self.stamppattern = re.compile(r"^[0-9A-F]{8}$")
self.meta = {}
logPath = os.path.join(os.getcwd(), "pilotlogs")
self.meta["LogPath"] = logPath
Expand All @@ -35,18 +37,18 @@ def sendMessage(self, message, pilotUUID, vo):

:param message: text to log in json format
:type message: str
:param pilotUUID: pilot id.
:param pilotUUID: pilot id. Optimally it should be a pilot stamp if available, otherwise a generated UUID.
:type pilotUUID: a valid pilot UUID
:param vo: VO name of a pilot which sent the message.
:type vo: str
:return: S_OK or S_ERROR
:rtype: dict
"""

res = self.pattern.match(pilotUUID)
res = self._verifyUUIDPattern(pilotUUID)
if not res:
sLog.error("Pilot UUID does not match the UUID pattern. ", f"UUID: {pilotUUID}, pattern {self.pattern}")
return S_ERROR("Pilot UUID is invalid")

dirname = os.path.join(self.meta["LogPath"], vo)
try:
if not os.path.exists(dirname):
Expand Down Expand Up @@ -83,9 +85,9 @@ def finaliseLogs(self, payload, logfile, vo):
"""

returnCode = json.loads(payload).get("retCode", 0)
res = self.pattern.match(logfile)

res = self._verifyUUIDPattern(logfile)
if not res:
sLog.error("Pilot UUID does not match the UUID pattern. ", f"UUID: {logfile}, pattern {self.pattern}")
return S_ERROR("Pilot UUID is invalid")

try:
Expand All @@ -107,3 +109,23 @@ def getMeta(self):
if "LogPath" in self.meta:
return S_OK(self.meta)
return S_ERROR("No Pilot logging directory defined")

def _verifyUUIDPattern(self, logfile):
"""
Verify if the name of the log file matches the required pattern.

:param name: file name
:type name: str
:return: re.match result
:rtype: re.Match object or None.
"""

res = self.stamppattern.match(logfile)
if not res:
res = self.pattern.match(logfile)
if not res:
sLog.error(
"Pilot UUID does not match the UUID or stamp pattern. ",
f"UUID: {logfile}, pilot stamp pattern {self.stamppattern}, UUID pattern {self.pattern}",
)
return res
6 changes: 4 additions & 2 deletions src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,10 @@ def pilotWrapperScript(
# now finally launching the pilot script (which should be called dirac-pilot.py)
# get the setup name an -z, if present to get remote logging in place
opt = "%s"
# generate pilot UUID
UUID = str(uuid1())
# try to get a pilot stamp from the environment:
UUID = os.environ.get('DIRAC_PILOT_STAMP')
if UUID is None:
UUID = str(uuid1())
opt = opt + " --pilotUUID " + UUID

args = opt.split()
Expand Down