From 90aefac49bb85dc815791105e65c602979079aaa Mon Sep 17 00:00:00 2001 From: Tadashi Maeno Date: Tue, 30 Jul 2019 11:18:22 +0200 Subject: [PATCH 1/3] Update README.md --- README.md | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 1508c691..0f6391b3 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,5 @@ -# Harvester +# OBSOLETE -Harvester is a resource-facing service between the PanDA server and collection of pilots. -It is a lightweight stateless service running on a VObox or an edge node of HPC centers -to provide a uniform view for various resources. - -For a detailed description and installation instructions, please check out this project's wiki tab: -https://github.com/PanDAWMS/panda-harvester/wiki +Moved to https://github.com/HSF/harvester ---------- From ba9bf326c66af1794e50771cff5355990655baab Mon Sep 17 00:00:00 2001 From: fahui Date: Tue, 30 Jul 2019 17:41:11 +0800 Subject: [PATCH 2/3] add ssh ziper --- pandaharvester/commit_timestamp.py | 2 +- .../harvesterpreparator/gridftp_preparator.py | 4 +-- .../harvesterstager/gridftp_stager.py | 4 +-- pandaharvester/harvesterzipper/ssh_zipper.py | 29 +++++++++++++++++++ 4 files changed, 34 insertions(+), 5 deletions(-) create mode 100644 pandaharvester/harvesterzipper/ssh_zipper.py diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index 32b6ce6d..27edac0b 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "26-07-2019 07:58:24 on master (by tmaeno)" +timestamp = "30-07-2019 09:41:11 on contrib_cern (by fahui)" diff --git a/pandaharvester/harvesterpreparator/gridftp_preparator.py b/pandaharvester/harvesterpreparator/gridftp_preparator.py index 7cacb543..0eb3258a 100644 --- a/pandaharvester/harvesterpreparator/gridftp_preparator.py +++ b/pandaharvester/harvesterpreparator/gridftp_preparator.py @@ -19,14 +19,14 @@ "preparator": { "name": "GridFtpPreparator", "module": "pandaharvester.harvesterpreparator.gridftp_preparator", - # base path for source GridFTP server + # base path for source GridFTP server "srcBasePath": "gsiftp://dcdum02.aglt2.org/pnfs/aglt2.org/atlasdatadisk/rucio/", # base path for destination GridFTP server "dstBasePath": "gsiftp://dcgftp.usatlas.bnl.gov:2811/pnfs/usatlas.bnl.gov/atlasscratchdisk/rucio/", # base path for local access to the copied files "localBasePath": "/data/rucio", # max number of attempts - maxAttempts: 3, + "maxAttempts": 3, # options for globus-url-copy "gulOpts": "-cred /tmp/x509_u1234 -sync -sync-level 3 -verify-checksum -v" } diff --git a/pandaharvester/harvesterstager/gridftp_stager.py b/pandaharvester/harvesterstager/gridftp_stager.py index 48f1158f..e5110185 100644 --- a/pandaharvester/harvesterstager/gridftp_stager.py +++ b/pandaharvester/harvesterstager/gridftp_stager.py @@ -20,7 +20,7 @@ "stager":{ "name":"GridFtpStager", "module":"pandaharvester.harvesterstager.gridftp_stager", - "objstoreID_ES":117, + "objstoreID":117, # base path for local access to the files to be copied "srcOldBasePath":"/tmp/workdirs", # base path for access through source GridFTP server to the files to be copied @@ -28,7 +28,7 @@ # base path for destination GridFTP server "dstBasePath":"gsiftp://dcgftp.usatlas.bnl.gov:2811/pnfs/usatlas.bnl.gov/atlasscratchdisk/rucio", # max number of attempts - maxAttempts: 3, + "maxAttempts": 3, # options for globus-url-copy "gulOpts":"-verify-checksum -v" } diff --git a/pandaharvester/harvesterzipper/ssh_zipper.py b/pandaharvester/harvesterzipper/ssh_zipper.py new file mode 100644 index 00000000..854d1f5d --- /dev/null +++ b/pandaharvester/harvesterzipper/ssh_zipper.py @@ -0,0 +1,29 @@ +from pandaharvester.harvestercore import core_utils +from .base_zipper import BaseZipper + +# logger +_logger = core_utils.setup_logger('ssh_zipper') + + +# ssh plugin for zipper +class SshZipper(BaseZipper): + # constructor + def __init__(self, **kwarg): + BaseZipper.__init__(self, **kwarg) + + # zip output files + def zip_output(self, jobspec): + tmpLog = self.make_logger(_logger, 'PandaID={0}'.format(jobspec.PandaID), + method_name='zip_output') + return self.ssh_zip_output(jobspec, tmpLog) + + # asynchronous zip output + def async_zip_output(self, jobspec): + tmpLog = self.make_logger(_logger, 'PandaID={0}'.format(jobspec.PandaID), + method_name='zip_output') + # not really asynchronous as two staged zipping is not implemented in this plugin + return self.ssh_zip_output(jobspec, tmpLog) + + # post zipping + def post_zip_output(self, jobspec): + return True, '' From a8bb8d9730500694fc4e00ebd879ae3cb5fce06a Mon Sep 17 00:00:00 2001 From: fahui Date: Thu, 1 Aug 2019 17:17:12 +0800 Subject: [PATCH 3/3] Fix watcher --- pandaharvester/commit_timestamp.py | 2 +- pandaharvester/harvesterbody/watcher.py | 135 +++++++++--------- templates/panda_harvester.cfg.rpmnew.template | 3 + 3 files changed, 72 insertions(+), 68 deletions(-) diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index be2a59b5..3ad82f87 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "30-07-2019 10:33:18 on contrib_cern (by fahui)" +timestamp = "01-08-2019 09:17:12 on contrib_cern (by fahui)" diff --git a/pandaharvester/harvesterbody/watcher.py b/pandaharvester/harvesterbody/watcher.py index 87c8e795..fbfa1c0c 100644 --- a/pandaharvester/harvesterbody/watcher.py +++ b/pandaharvester/harvesterbody/watcher.py @@ -63,11 +63,12 @@ def execute(self): timeNow = datetime.datetime.utcnow() if os.path.exists(logFilePath): # get latest timestamp + tmpLogDuration = None try: p = subprocess.Popen(['tail', '-1', logFilePath], stdout=subprocess.PIPE, stderr=subprocess.PIPE) line = p.communicate()[0] - tmpLastTime = datetime.datetime.strptime(line[:23], "%Y-%m-%d %H:%M:%S,%f") + tmpLastTime = datetime.datetime.strptime(str(line[:23], 'utf-8'), "%Y-%m-%d %H:%M:%S,%f") except Exception: tmpLastTime = None # get processing time for last 1000 queries @@ -76,11 +77,11 @@ def execute(self): logFilePath), stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) line = p.communicate()[0] - firstTime = datetime.datetime.strptime(line[:23], "%Y-%m-%d %H:%M:%S,%f") + firstTime = datetime.datetime.strptime(str(line[:23], 'utf-8'), "%Y-%m-%d %H:%M:%S,%f") if tmpLastTime is not None: tmpLogDuration = tmpLastTime - firstTime - except Exception: - pass + except Exception as e: + mainLog.warning('Skip with error {0}: {1}'.format(e.__class__.__name__, e)) tmpMsg = 'log={0} : last message at {0}. '.format(logFileName, tmpLastTime) if tmpLogDuration is not None: tmpMsg += '{0} messages took {1} sec'.format(harvester_config.watcher.nMessages, @@ -92,69 +93,69 @@ def execute(self): if tmpLogDuration is not None and (logDuration is None or logDuration < tmpLogDuration): logDuration = tmpLogDuration logDurationName = logFileName - # check timestamp - doAction = False - if harvester_config.watcher.maxStalled > 0 and lastTime is not None and \ - timeNow - lastTime > datetime.timedelta(seconds=harvester_config.watcher.maxStalled): - mainLog.warning('last log message is too old in {0}. seems to be stalled'.format(lastTimeName)) - doAction = True - elif harvester_config.watcher.maxDuration > 0 and logDuration is not None and \ - logDuration.total_seconds() > harvester_config.watcher.maxDuration: - mainLog.warning('slow message generation in {0}. seems to be a performance issue'.format( - logDurationName)) - doAction = True - # take action - if doAction: - # email - if 'email' in harvester_config.watcher.actions.split(','): - # get pass phrase - toSkip = False - mailUser = None - mailPass = None - if harvester_config.watcher.mailUser != '' and \ - harvester_config.watcher.mailPassword != '': - envName = harvester_config.watcher.passphraseEnv - if envName not in os.environ: - tmpMsg = '{0} is undefined in etc/sysconfig/panda_harvester'.format(envName) - mainLog.error(tmpMsg) - toSkip = True - else: - key = os.environ[envName] - mailUser = core_utils.decrypt_string(key, harvester_config.watcher.mailUser) - mailPass = core_utils.decrypt_string(key, harvester_config.watcher.mailPassword) - if not toSkip: - # message - msgBody = 'harvester {0} '.format(harvester_config.master.harvester_id) - msgBody += 'is having a problem on {0} '.format(socket.getfqdn()) - msgBody += 'at {0} (UTC)'.format(datetime.datetime.utcnow()) - message = MIMEText(msgBody) - message['Subject'] = "Harvester Alarm" - message['From'] = harvester_config.watcher.mailFrom - message['To'] = harvester_config.watcher.mailTo - # send email - mainLog.debug('sending email to {0}'.format(harvester_config.watcher.mailTo)) - server = smtplib.SMTP(harvester_config.watcher.mailServer, - harvester_config.watcher.mailPort) - if hasattr(harvester_config.watcher, 'mailUseSSL') and \ - harvester_config.watcher.mailUseSSL is True: - server.starttls() - if mailUser is not None and mailPass is not None: - server.login(mailUser, mailPass) - server.ehlo() - server.sendmail(harvester_config.watcher.mailFrom, - harvester_config.watcher.mailTo.split(','), - message.as_string()) - server.quit() - # kill - if 'kill' in harvester_config.watcher.actions.split(','): - # send USR2 fist - mainLog.debug('sending SIGUSR2') - os.killpg(os.getpgrp(), signal.SIGUSR2) - time.sleep(60) - mainLog.debug('sending SIGKILL') - os.killpg(os.getpgrp(), signal.SIGKILL) - else: - mainLog.debug('skip as {0} is missing'.format(logFileName)) + # check timestamp + doAction = False + if harvester_config.watcher.maxStalled > 0 and lastTime is not None and \ + timeNow - lastTime > datetime.timedelta(seconds=harvester_config.watcher.maxStalled): + mainLog.warning('last log message is too old in {0}. seems to be stalled'.format(lastTimeName)) + doAction = True + elif harvester_config.watcher.maxDuration > 0 and logDuration is not None and \ + logDuration.total_seconds() > harvester_config.watcher.maxDuration: + mainLog.warning('slow message generation in {0}. seems to be a performance issue'.format( + logDurationName)) + doAction = True + # take action + if doAction: + # email + if 'email' in harvester_config.watcher.actions.split(','): + # get pass phrase + toSkip = False + mailUser = None + mailPass = None + if harvester_config.watcher.mailUser != '' and \ + harvester_config.watcher.mailPassword != '': + envName = harvester_config.watcher.passphraseEnv + if envName not in os.environ: + tmpMsg = '{0} is undefined in etc/sysconfig/panda_harvester'.format(envName) + mainLog.error(tmpMsg) + toSkip = True + else: + key = os.environ[envName] + mailUser = core_utils.decrypt_string(key, harvester_config.watcher.mailUser) + mailPass = core_utils.decrypt_string(key, harvester_config.watcher.mailPassword) + if not toSkip: + # message + msgBody = 'harvester {0} '.format(harvester_config.master.harvester_id) + msgBody += 'is having a problem on {0} '.format(socket.getfqdn()) + msgBody += 'at {0} (UTC)'.format(datetime.datetime.utcnow()) + message = MIMEText(msgBody) + message['Subject'] = "Harvester Alarm" + message['From'] = harvester_config.watcher.mailFrom + message['To'] = harvester_config.watcher.mailTo + # send email + mainLog.debug('sending email to {0}'.format(harvester_config.watcher.mailTo)) + server = smtplib.SMTP(harvester_config.watcher.mailServer, + harvester_config.watcher.mailPort) + if hasattr(harvester_config.watcher, 'mailUseSSL') and \ + harvester_config.watcher.mailUseSSL is True: + server.starttls() + if mailUser is not None and mailPass is not None: + server.login(mailUser, mailPass) + server.ehlo() + server.sendmail(harvester_config.watcher.mailFrom, + harvester_config.watcher.mailTo.split(','), + message.as_string()) + server.quit() + # kill + if 'kill' in harvester_config.watcher.actions.split(','): + # send USR2 fist + mainLog.debug('sending SIGUSR2') + os.killpg(os.getpgrp(), signal.SIGUSR2) + time.sleep(60) + mainLog.debug('sending SIGKILL') + os.killpg(os.getpgrp(), signal.SIGKILL) + else: + mainLog.debug('No action needed for {0}'.format(logFileName)) except IOError: mainLog.debug('skip as locked by another thread or too early to check') except Exception: diff --git a/templates/panda_harvester.cfg.rpmnew.template b/templates/panda_harvester.cfg.rpmnew.template index d677feb3..4f15e0c8 100644 --- a/templates/panda_harvester.cfg.rpmnew.template +++ b/templates/panda_harvester.cfg.rpmnew.template @@ -735,6 +735,9 @@ keepMissed = 24 [watcher] +# a comma-concatenated list of file name of logs to watch (default: panda-db_proxy.log) +logFileNameList = panda-db_proxy.log + # action is taken when the last message is older than maxStalled sec. set 0 to disable the action maxStalled = 300