Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ to provide a uniform view for various resources.

For a detailed description and installation instructions, please check out this project's wiki tab:
https://github.com/HSF/harvester/wiki

----------
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
timestamp = "26-07-2019 07:58:24 on master (by tmaeno)"
timestamp = "01-08-2019 09:17:12 on contrib_cern (by fahui)"
135 changes: 68 additions & 67 deletions pandaharvester/harvesterbody/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ def execute(self):
timeNow = datetime.datetime.utcnow()
if os.path.exists(logFilePath):
# get latest timestamp
tmpLogDuration = None
try:
p = subprocess.Popen(['tail', '-1', logFilePath],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
line = p.communicate()[0]
tmpLastTime = datetime.datetime.strptime(line[:23], "%Y-%m-%d %H:%M:%S,%f")
tmpLastTime = datetime.datetime.strptime(str(line[:23], 'utf-8'), "%Y-%m-%d %H:%M:%S,%f")
except Exception:
tmpLastTime = None
# get processing time for last 1000 queries
Expand All @@ -76,11 +77,11 @@ def execute(self):
logFilePath),
stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
line = p.communicate()[0]
firstTime = datetime.datetime.strptime(line[:23], "%Y-%m-%d %H:%M:%S,%f")
firstTime = datetime.datetime.strptime(str(line[:23], 'utf-8'), "%Y-%m-%d %H:%M:%S,%f")
if tmpLastTime is not None:
tmpLogDuration = tmpLastTime - firstTime
except Exception:
pass
except Exception as e:
mainLog.warning('Skip with error {0}: {1}'.format(e.__class__.__name__, e))
tmpMsg = 'log={0} : last message at {0}. '.format(logFileName, tmpLastTime)
if tmpLogDuration is not None:
tmpMsg += '{0} messages took {1} sec'.format(harvester_config.watcher.nMessages,
Expand All @@ -92,69 +93,69 @@ def execute(self):
if tmpLogDuration is not None and (logDuration is None or logDuration < tmpLogDuration):
logDuration = tmpLogDuration
logDurationName = logFileName
# check timestamp
doAction = False
if harvester_config.watcher.maxStalled > 0 and lastTime is not None and \
timeNow - lastTime > datetime.timedelta(seconds=harvester_config.watcher.maxStalled):
mainLog.warning('last log message is too old in {0}. seems to be stalled'.format(lastTimeName))
doAction = True
elif harvester_config.watcher.maxDuration > 0 and logDuration is not None and \
logDuration.total_seconds() > harvester_config.watcher.maxDuration:
mainLog.warning('slow message generation in {0}. seems to be a performance issue'.format(
logDurationName))
doAction = True
# take action
if doAction:
# email
if 'email' in harvester_config.watcher.actions.split(','):
# get pass phrase
toSkip = False
mailUser = None
mailPass = None
if harvester_config.watcher.mailUser != '' and \
harvester_config.watcher.mailPassword != '':
envName = harvester_config.watcher.passphraseEnv
if envName not in os.environ:
tmpMsg = '{0} is undefined in etc/sysconfig/panda_harvester'.format(envName)
mainLog.error(tmpMsg)
toSkip = True
else:
key = os.environ[envName]
mailUser = core_utils.decrypt_string(key, harvester_config.watcher.mailUser)
mailPass = core_utils.decrypt_string(key, harvester_config.watcher.mailPassword)
if not toSkip:
# message
msgBody = 'harvester {0} '.format(harvester_config.master.harvester_id)
msgBody += 'is having a problem on {0} '.format(socket.getfqdn())
msgBody += 'at {0} (UTC)'.format(datetime.datetime.utcnow())
message = MIMEText(msgBody)
message['Subject'] = "Harvester Alarm"
message['From'] = harvester_config.watcher.mailFrom
message['To'] = harvester_config.watcher.mailTo
# send email
mainLog.debug('sending email to {0}'.format(harvester_config.watcher.mailTo))
server = smtplib.SMTP(harvester_config.watcher.mailServer,
harvester_config.watcher.mailPort)
if hasattr(harvester_config.watcher, 'mailUseSSL') and \
harvester_config.watcher.mailUseSSL is True:
server.starttls()
if mailUser is not None and mailPass is not None:
server.login(mailUser, mailPass)
server.ehlo()
server.sendmail(harvester_config.watcher.mailFrom,
harvester_config.watcher.mailTo.split(','),
message.as_string())
server.quit()
# kill
if 'kill' in harvester_config.watcher.actions.split(','):
# send USR2 first
mainLog.debug('sending SIGUSR2')
os.killpg(os.getpgrp(), signal.SIGUSR2)
time.sleep(60)
mainLog.debug('sending SIGKILL')
os.killpg(os.getpgrp(), signal.SIGKILL)
else:
mainLog.debug('skip as {0} is missing'.format(logFileName))
# check timestamp
doAction = False
if harvester_config.watcher.maxStalled > 0 and lastTime is not None and \
timeNow - lastTime > datetime.timedelta(seconds=harvester_config.watcher.maxStalled):
mainLog.warning('last log message is too old in {0}. seems to be stalled'.format(lastTimeName))
doAction = True
elif harvester_config.watcher.maxDuration > 0 and logDuration is not None and \
logDuration.total_seconds() > harvester_config.watcher.maxDuration:
mainLog.warning('slow message generation in {0}. seems to be a performance issue'.format(
logDurationName))
doAction = True
# take action
if doAction:
# email
if 'email' in harvester_config.watcher.actions.split(','):
# get pass phrase
toSkip = False
mailUser = None
mailPass = None
if harvester_config.watcher.mailUser != '' and \
harvester_config.watcher.mailPassword != '':
envName = harvester_config.watcher.passphraseEnv
if envName not in os.environ:
tmpMsg = '{0} is undefined in etc/sysconfig/panda_harvester'.format(envName)
mainLog.error(tmpMsg)
toSkip = True
else:
key = os.environ[envName]
mailUser = core_utils.decrypt_string(key, harvester_config.watcher.mailUser)
mailPass = core_utils.decrypt_string(key, harvester_config.watcher.mailPassword)
if not toSkip:
# message
msgBody = 'harvester {0} '.format(harvester_config.master.harvester_id)
msgBody += 'is having a problem on {0} '.format(socket.getfqdn())
msgBody += 'at {0} (UTC)'.format(datetime.datetime.utcnow())
message = MIMEText(msgBody)
message['Subject'] = "Harvester Alarm"
message['From'] = harvester_config.watcher.mailFrom
message['To'] = harvester_config.watcher.mailTo
# send email
mainLog.debug('sending email to {0}'.format(harvester_config.watcher.mailTo))
server = smtplib.SMTP(harvester_config.watcher.mailServer,
harvester_config.watcher.mailPort)
if hasattr(harvester_config.watcher, 'mailUseSSL') and \
harvester_config.watcher.mailUseSSL is True:
server.starttls()
if mailUser is not None and mailPass is not None:
server.login(mailUser, mailPass)
server.ehlo()
server.sendmail(harvester_config.watcher.mailFrom,
harvester_config.watcher.mailTo.split(','),
message.as_string())
server.quit()
# kill
if 'kill' in harvester_config.watcher.actions.split(','):
# send USR2 first
mainLog.debug('sending SIGUSR2')
os.killpg(os.getpgrp(), signal.SIGUSR2)
time.sleep(60)
mainLog.debug('sending SIGKILL')
os.killpg(os.getpgrp(), signal.SIGKILL)
else:
mainLog.debug('No action needed for {0}'.format(logFileName))
except IOError:
mainLog.debug('skip as locked by another thread or too early to check')
except Exception:
Expand Down
4 changes: 2 additions & 2 deletions pandaharvester/harvesterpreparator/gridftp_preparator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
"preparator": {
"name": "GridFtpPreparator",
"module": "pandaharvester.harvesterpreparator.gridftp_preparator",
# base path for source GridFTP server
# base path for source GridFTP server
"srcBasePath": "gsiftp://dcdum02.aglt2.org/pnfs/aglt2.org/atlasdatadisk/rucio/",
# base path for destination GridFTP server
"dstBasePath": "gsiftp://dcgftp.usatlas.bnl.gov:2811/pnfs/usatlas.bnl.gov/atlasscratchdisk/rucio/",
# base path for local access to the copied files
"localBasePath": "/data/rucio",
# max number of attempts
maxAttempts: 3,
"maxAttempts": 3,
# options for globus-url-copy
"gulOpts": "-cred /tmp/x509_u1234 -sync -sync-level 3 -verify-checksum -v"
}
Expand Down
4 changes: 2 additions & 2 deletions pandaharvester/harvesterstager/gridftp_stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
"stager":{
"name":"GridFtpStager",
"module":"pandaharvester.harvesterstager.gridftp_stager",
"objstoreID_ES":117,
"objstoreID":117,
# base path for local access to the files to be copied
"srcOldBasePath":"/tmp/workdirs",
# base path for access through source GridFTP server to the files to be copied
"srcNewBasePath":"gsiftp://dcdum02.aglt2.org/pnfs/aglt2.org",
# base path for destination GridFTP server
"dstBasePath":"gsiftp://dcgftp.usatlas.bnl.gov:2811/pnfs/usatlas.bnl.gov/atlasscratchdisk/rucio",
# max number of attempts
maxAttempts: 3,
"maxAttempts": 3,
# options for globus-url-copy
"gulOpts":"-verify-checksum -v"
}
Expand Down
29 changes: 29 additions & 0 deletions pandaharvester/harvesterzipper/ssh_zipper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from pandaharvester.harvestercore import core_utils
from .base_zipper import BaseZipper

# module-level logger named 'ssh_zipper'; SshZipper methods hand it to make_logger
# to build their per-job loggers
_logger = core_utils.setup_logger('ssh_zipper')


# ssh plugin for zipper
class SshZipper(BaseZipper):
    """Zipper plugin whose zipping is delegated to ``ssh_zip_output``.

    ``make_logger`` and ``ssh_zip_output`` are not defined in this class;
    presumably they are inherited from ``BaseZipper`` (verify against the
    base class).
    """

    # constructor
    def __init__(self, **kwarg):
        """Forward all keyword arguments unchanged to ``BaseZipper``."""
        BaseZipper.__init__(self, **kwarg)

    # zip output files
    def zip_output(self, jobspec):
        """Zip the output files of ``jobspec``.

        :param jobspec: job specification; its ``PandaID`` tags the log messages.
        :return: whatever ``ssh_zip_output`` returns for this job.
        """
        tmpLog = self.make_logger(_logger, 'PandaID={0}'.format(jobspec.PandaID),
                                  method_name='zip_output')
        return self.ssh_zip_output(jobspec, tmpLog)

    # asynchronous zip output
    def async_zip_output(self, jobspec):
        """Entry point for asynchronous zipping of ``jobspec``.

        :param jobspec: job specification; its ``PandaID`` tags the log messages.
        :return: whatever ``ssh_zip_output`` returns for this job.
        """
        # label the logger with this method's own name so that log lines from
        # the async entry point can be told apart from zip_output
        tmpLog = self.make_logger(_logger, 'PandaID={0}'.format(jobspec.PandaID),
                                  method_name='async_zip_output')
        # not really asynchronous as two staged zipping is not implemented in this plugin
        return self.ssh_zip_output(jobspec, tmpLog)

    # post zipping
    def post_zip_output(self, jobspec):
        """Post-zipping step; does nothing in this plugin.

        :param jobspec: job specification (unused).
        :return: the tuple ``(True, '')``.
        """
        return True, ''
3 changes: 3 additions & 0 deletions templates/panda_harvester.cfg.rpmnew.template
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,9 @@ keepMissed = 24

[watcher]

# a comma-separated list of log file names to watch (default: panda-db_proxy.log)
logFileNameList = panda-db_proxy.log

# action is taken when the last message is older than maxStalled sec. set 0 to disable the action
maxStalled = 300

Expand Down