# pysis/workertools/baseWorker.py
import os
import json
import pdb
import logging
from logging import FileHandler
from logging.handlers import TimedRotatingFileHandler
from pysis.workertools.api import APIFileSaver
# BUGFIX: use the absolute package path; a bare ``import logging_utils``
# only resolves when the current directory happens to be this package
# (the tests already import ``pysis.workertools.logging_utils``).
from pysis.workertools.logging_utils import S3MetricFileHandler

# NOTE(review): ``SIS`` is referenced below but not imported in this hunk;
# presumably imported at the very top of the file — confirm.


class APITokenException(Exception):
    """Raised when an API token is missing or rejected."""
    pass


class BaseWorker(object):
    """Base class for workers.

    Loads per-environment configuration from the SIS API and wires up two
    loggers: a rotating worker log on disk and a metric log that is pushed
    to S3 when its handler is closed.
    """

    WORKER_LOG_FILE = '/tmp/worker.log'
    METRIC_LOG_FILE = '/tmp/metrics.log'

    def __init__(self, workerID, environment):
        self.env = environment
        self.uuid = workerID
        # BUGFIX: self.api must be assigned BEFORE loadConfiguration(),
        # which calls self.api.workers.get(...); the previous ordering
        # raised AttributeError on every construction.  Likewise
        # configuration_id is initialised first so the value stored by
        # loadConfiguration() is not clobbered back to None afterwards.
        self.api = SIS()
        self.configuration_id = None
        self.config = self.loadConfiguration()
        self.fileSaver = APIFileSaver(
            apiDBConnection=None,
            accessKeyID=self.config['aws-access-key-id']['value'],
            secretAccessKey=self.config['aws-secret-access-key']['value'])
        self.logger = self.init_logger()
        self.metric_logger = self.init_metric_logger()

    def init_metric_logger(self):
        """Return the metric logger for this worker class.

        Records go to METRIC_LOG_FILE and the file is uploaded to S3 when
        the handler is closed (see S3MetricFileHandler.save_file).
        """
        metric_logger = logging.getLogger(
            'worker.metric.{}'.format(self.__class__.__name__))
        metric_logger.setLevel(logging.DEBUG)
        # BUGFIX: the handler signature is (file_name, file_saver=None, ...);
        # passing the raw config dict as the file name crashed
        # FileHandler.__init__.  One handler now covers both the on-disk
        # copy and the S3 upload — the previous extra plain FileHandler on
        # the same path wrote every record twice.
        # mode='w': overwrite each time as we store it in the DB when done.
        s3_handler = S3MetricFileHandler(BaseWorker.METRIC_LOG_FILE,
                                         file_saver=self.fileSaver,
                                         mode='w')
        metric_logger.addHandler(s3_handler)
        return metric_logger

    def init_logger(self):
        """Return the general worker logger (daily-rotating disk file)."""
        logger = logging.getLogger('worker')
        logger.setLevel(logging.DEBUG)
        disk_handler = TimedRotatingFileHandler(BaseWorker.WORKER_LOG_FILE,
                                                when='d')
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        disk_handler.setFormatter(formatter)
        logger.addHandler(disk_handler)
        return logger

    def loadConfiguration(self):
        """Fetch this worker's configuration for the current environment.

        Returns a dict mapping config key -> {'type', 'value', 'id'} with
        the value coerced according to its declared type.  Also records
        self.configuration_id as a side effect.
        """
        self.worker = self.api.workers.get(uuid=self.uuid)
        print(self.worker.label)
        configValues = self.worker.getConfigurationValues(environment=self.env)
        config = {}
        # coerce the stored string by declared type; anything unknown stays str
        converters = {'integer': int, 'json': json.loads}
        for value in configValues:
            # BUGFIX: dict.get() accepts its default positionally only —
            # ``get(key, default=str)`` raised TypeError.
            convert = converters.get(value.type, str)
            config[value.key] = {'type': value.type,
                                 'value': convert(value.value),
                                 'id': value.id}
            # save configuration_id
            # should be the same each time
            # dumb, but whatever
            self.configuration_id = value.configuration_id
        return config

    def updateConfigurationValue(self, key, value):
        """Update a config value locally and on the API."""
        configValue = self.config[key]
        configValue['value'] = value
        # send along type so update completes properly
        self.worker.updateConfigurationValue(self.configuration_id,
                                             configValue['id'],
                                             value,
                                             configValue['type'])

    def getConfigurationValue(self, key):
        """Return the coerced value stored under *key*."""
        return self.config[key]['value']

    def createConfigurationValue(self, key, value, value_type):
        """Create a config value on the API and mirror it locally."""
        res = self.worker.createConfigurationValue(self.configuration_id,
                                                   key, value, value_type)
        self.config[key] = {'value': value, 'id': res.id, 'type': value_type}

    def deleteConfigurationValue(self, key, value):
        """Delete a config value on the API and locally."""
        self.worker.deleteConfigurationValue(self.configuration_id, value['id'])
        # TODO: API provides no way of checking if deletion was successful
        del self.config[key]
# pysis/workertools/logging_utils.py
from logging import Handler, FileHandler
from os.path import join
from os import environ
from json import dumps
import logging

import pika


class QueueHandler(Handler):
    """Logging handler that publishes metric records to a RabbitMQ
    topic exchange named ``<prefix>.wms_metrics``."""

    def __init__(self, config, file_saver):
        # ``file_saver`` is accepted for interface parity with the file
        # handlers but is not used here.
        super(QueueHandler, self).__init__()
        credentials = pika.PlainCredentials(config['rabbit-user']['value'],
                                            config['rabbit-pass']['value'])
        params = pika.ConnectionParameters(host=config['rabbit-host']['value'],
                                           credentials=credentials)
        connection = pika.BlockingConnection(params)
        self.channel = connection.channel()
        self.rabbit_prefix = config['rabbit-prefix']['value']
        self.exchange_name = self.rabbit_prefix + '.wms_metrics'
        # NOTE(review): newer pika (>=0.11) renamed ``type=`` to
        # ``exchange_type=`` — confirm the pinned pika version.
        self.channel.exchange_declare(exchange=self.exchange_name, type='topic')

    def emit(self, record):
        try:
            # BUGFIX: a LogRecord object is not JSON-serializable; publish
            # the message payload (callers log dicts) instead.  Also use
            # the documented ``basic_publish`` API.
            self.channel.basic_publish(exchange=self.exchange_name,
                                       routing_key='metrics',
                                       body=dumps(record.msg))
        except Exception:
            # never let a metrics failure kill the worker
            self.handleError(record)

    def flush(self):
        # nothing buffered locally; publishing happens in emit()
        pass

    def close(self):
        try:
            self.channel.close()
        except Exception as e:
            # BUGFIX: logging.error('msg', e) treats ``e`` as a %-format
            # argument with no placeholder; give it one.
            logging.error('Unable to close publishing channel: %s', e)
        super(QueueHandler, self).close()


class MetricFileHandler(FileHandler):
    """FileHandler that JSON-encodes dict messages and persists the file
    (via save_file) when the handler is closed."""

    def emit(self, record):
        if isinstance(record.msg, dict):
            record.msg = dumps(record.msg)
        super(MetricFileHandler, self).emit(record)

    def close(self):
        super(MetricFileHandler, self).close()
        self.save_file()

    def save_file(self):
        # hook for subclasses; base class does not persist anywhere
        pass


class MockHandler(MetricFileHandler):
    """Test double: 'persists' by writing a marker file."""

    def __init__(self, filename, saved_file_name, mode='a', encoding=None,
                 delay=0):
        super(MockHandler, self).__init__(filename=filename, mode=mode,
                                          encoding=encoding, delay=delay)
        self.saved_file_name = saved_file_name

    def save_file(self):
        with open(self.saved_file_name, 'w') as saved:
            saved.write('saved')


class S3MetricFileHandler(MetricFileHandler):
    """Metric handler that uploads the log file to S3 on close."""

    def __init__(self, file_name, file_saver=None, mode='a', encoding=None,
                 delay=0):
        super(S3MetricFileHandler, self).__init__(filename=file_name,
                                                  mode=mode,
                                                  encoding=encoding,
                                                  delay=delay)
        self.file_saver = file_saver
        self.local_file_name = file_name
        # TODO -> need a meaningful name, maybe based on the docker container
        self.s3_file_name = join('workers/joblogs', self.get_container_id())

    def get_container_id(self):
        """Best-effort container id: $HOSTNAME, else /proc/self/cgroup,
        else a fixed fallback."""
        if 'HOSTNAME' in environ:
            return environ['HOSTNAME'].strip()
        # BUGFIX: close the file, and drop empty lines BEFORE the
        # uniqueness check (the old code filtered into an unused ``uid``
        # set but tested the unfiltered list, so a blank cgroup line made
        # a single id look ambiguous).
        with open('/proc/self/cgroup') as cg:
            candidates = [line.split('/')[-1].strip() for line in cg]
        candidates = [c for c in candidates if c]
        unique = set(candidates)
        if len(unique) == 1:
            return candidates[0]
        elif len(unique) > 1:
            logging.warning('More than one container id candidate: %s', unique)
            return candidates[0]
        logging.warning('Unable to deduce container id from docker')
        return 'DEFAULT_HOST_NAME'

    def save_file(self):
        if self.file_saver is not None:
            self.file_saver.saveFile(fileName=self.s3_file_name,
                                     file_category_id=None,
                                     local_filepath=self.local_file_name)


# --- test/test_metric_handler.py (continues onto the next source line) ---
import unittest
import os

from pysis.workertools.logging_utils import MockHandler

LOG_FILE = '/tmp/test.log'
SAVED_FILE = '/tmp/saved_handler'

FIRST_LINE = {"foo": "bar"}
SECOND_LINE = {"baz": "quux"}


class HandlerTest(unittest.TestCase):
    """MockHandler writes JSON lines and 'saves' a marker file on close."""

    def delete_files(self):
        for path in (LOG_FILE, SAVED_FILE):
            if os.path.isfile(path):
                os.remove(path)

    def setUp(self):
        handler = MockHandler(filename=LOG_FILE, saved_file_name=SAVED_FILE)
        logger = logging.getLogger("test")
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)
        logger.info(FIRST_LINE)
        logger.info(SECOND_LINE)
        # BUGFIX: ``del logger`` / ``del handler`` do NOT close the handler
        # (the logging module keeps its own references), so save_file() was
        # not guaranteed to have run before the assertions.  Close and
        # detach explicitly; detaching also keeps the shared "test" logger
        # clean across test runs.
        logger.removeHandler(handler)
        handler.close()

    def tearDown(self):
        self.delete_files()

    def test_write_and_save(self):
        import json
        self.assertTrue(os.path.isfile(LOG_FILE))
        self.assertTrue(os.path.isfile(SAVED_FILE))
        self.assertEqual('saved', open(SAVED_FILE).read().strip())
        saved_info = open(LOG_FILE).readlines()
        self.assertEqual(2, len(saved_info))
        for i, expected in enumerate([FIRST_LINE, SECOND_LINE]):
            self.assertEqual(json.loads(saved_info[i]), expected)
# test/test_metric_names.py
import unittest
import inspect
import os
import logging
import json

from pysis.workertools.baseWorker import BaseWorker
from pysis.workertools.logging_utils import MockHandler

LOG_FILE = '/tmp/test.log'
SAVED_FILE = '/tmp/saved'


class BaseMockDatabaseWorker(BaseWorker):
    """BaseWorker stand-in that wires up only the metric logger
    (deliberately skips BaseWorker.__init__ — no API access in tests)."""

    def __init__(self):
        self.metric_logger = self.init_metric_logger()

    def init_metric_logger(self):
        metric_logger = logging.getLogger(
            'worker.metric.{}'.format(self.__class__.__name__))
        metric_logger.setLevel(logging.DEBUG)
        # overwrite each time as we store it in the DB when done
        metrics_disk_handler = MockHandler(filename=LOG_FILE,
                                           saved_file_name=SAVED_FILE,
                                           mode='w')
        metric_logger.addHandler(metrics_disk_handler)
        return metric_logger

    def onExit(self):
        pass

    # NOTE: the two methods below must stay structurally identical —
    # calling_frame[1][3] depends on the exact call depth, so extracting
    # a shared helper would change which frame is reported.
    def selectRowsByBlock(self):
        current_frame = inspect.currentframe()
        calling_frame = inspect.getouterframes(current_frame, 2)
        calling_frame_name = calling_frame[1][3]
        self.metric_logger.info({'foo': 'block', 'name': calling_frame_name})

    def selectRowsByRange(self):
        current_frame = inspect.currentframe()
        calling_frame = inspect.getouterframes(current_frame, 2)
        calling_frame_name = calling_frame[1][3]
        self.metric_logger.info({'foo': 'range', 'name': calling_frame_name})


class MockWorker(BaseMockDatabaseWorker):
    def getFoo(self):
        self.selectRowsByBlock()

    def getBar(self):
        self.selectRowsByRange()


class TestMethodName(unittest.TestCase):
    """The metric record carries the name of the public method that
    triggered the select."""

    def delete_files(self):
        for path in (LOG_FILE, SAVED_FILE):
            if os.path.isfile(path):
                os.remove(path)

    def setUp(self):
        worker = MockWorker()
        worker.getFoo()
        worker.getBar()
        # BUGFIX: ``del worker`` does not close the handler (the logging
        # module holds references), so the flush/upload in close() was not
        # guaranteed to run.  Closing + detaching also stops the shared
        # 'worker.metric.MockWorker' logger from accumulating one handler
        # per MockWorker() instantiation.
        for handler in list(worker.metric_logger.handlers):
            worker.metric_logger.removeHandler(handler)
            handler.close()

    def tearDown(self):
        self.delete_files()

    def test_writes_log(self):
        self.assertTrue(os.path.isfile(LOG_FILE))
        data = [json.loads(line.strip()) for line in open(LOG_FILE)]
        self.assertEqual(2, len(data))
        self.assertEqual({'foo': 'block', 'name': 'getFoo'}, data[0])
        self.assertEqual({'foo': 'range', 'name': 'getBar'}, data[1])

        self.assertTrue(os.path.isfile(SAVED_FILE))
        self.assertEqual('saved', open(SAVED_FILE).read().strip())