Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 37 additions & 39 deletions pysis/workertools/baseWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,92 +2,90 @@
import os
import json
import pdb
import logging
from logging import FileHandler
from logging.handlers import TimedRotatingFileHandler
from pysis.workertools.api import APIFileSaver
from logging_utils import S3MetricFileHandler

class APITokenException(Exception):
    """API-token-related failure (name-based; no raise sites visible in this chunk)."""
    pass


class BaseWorker(object):
    """Shared plumbing for workers: API-backed configuration plus logging.

    Subclasses get:
      - ``self.config``: dict keyed by configuration key; each entry is a dict
        with ``value`` (coerced per declared type), ``type`` and ``id``.
      - ``self.logger``: daily-rotating on-disk worker log.
      - ``self.metric_logger``: metric log written locally and uploaded to S3
        when its handler is closed.
    """

    WORKER_LOG_FILE = '/tmp/worker.log'
    METRIC_LOG_FILE = '/tmp/metrics.log'

    def __init__(self, workerID, environment):
        self.env = environment
        self.uuid = workerID
        # The API client must exist before loadConfiguration(), which uses it.
        self.api = SIS()
        self.configuration_id = None
        self.config = self.loadConfiguration()
        self.fileSaver = APIFileSaver(apiDBConnection=None,
                                      accessKeyID=self.config['aws-access-key-id']['value'],
                                      secretAccessKey=self.config['aws-secret-access-key']['value'])
        self.logger = self.init_logger()
        self.metric_logger = self.init_metric_logger()

    def init_metric_logger(self):
        """Build the metric logger; its file is uploaded to S3 on handler close."""
        metric_logger = logging.getLogger('worker.metric.{}'.format(self.__class__.__name__))
        metric_logger.setLevel(logging.DEBUG)
        # S3MetricFileHandler is itself a FileHandler, so a single handler both
        # writes the local file and uploads it from close().  (Passing
        # self.config as the first argument, as before, crashes: the handler's
        # signature expects a file path.)  mode='w' overwrites each run, as we
        # store the result in the DB when done.
        s3_handler = S3MetricFileHandler(BaseWorker.METRIC_LOG_FILE,
                                         file_saver=self.fileSaver, mode='w')
        metric_logger.addHandler(s3_handler)
        return metric_logger

    def init_logger(self):
        """Build the general worker logger with daily on-disk rotation."""
        logger = logging.getLogger('worker')
        logger.setLevel(logging.DEBUG)
        disk_handler = TimedRotatingFileHandler(BaseWorker.WORKER_LOG_FILE, when='d')
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        disk_handler.setFormatter(formatter)
        logger.addHandler(disk_handler)
        return logger

    def loadConfiguration(self):
        """Fetch this worker's configuration values for the current environment.

        Returns a dict mapping config key -> {'value': coerced value,
        'type': declared type, 'id': value id}.  Also records
        ``self.configuration_id`` (identical across all values).
        """
        self.worker = self.api.workers.get(uuid=self.uuid)
        configValues = self.worker.getConfigurationValues(environment=self.env)
        # Coerce by declared type; unknown types are stored as strings.
        # NOTE: dict.get takes its default positionally — `default=` is a TypeError.
        coercers = {'integer': int, 'json': json.loads}
        config = {}
        for value in configValues:
            config[value.key] = {
                'type': value.type,
                'value': coercers.get(value.type, str)(value.value),
                'id': value.id,
            }
            # Same for every value; keeping the last one seen is sufficient.
            self.configuration_id = value.configuration_id
        return config

    def updateConfigurationValue(self, key, value):
        """Update the configuration value under *key* locally and via the API."""
        configValue = self.config[key]
        configValue['value'] = value
        # Send type and id along so the API update completes properly.
        self.worker.updateConfigurationValue(self.configuration_id,
                                             configValue['id'],
                                             value,
                                             configValue['type'])

    def getConfigurationValue(self, key):
        """Return the coerced value stored under *key*; KeyError if absent."""
        return self.config[key]['value']

    def createConfigurationValue(self, key, value, value_type):
        """Create a configuration value via the API and cache it locally."""
        res = self.worker.createConfigurationValue(self.configuration_id, key, value, value_type)
        self.config[key] = {'value': value, 'id': res.id, 'type': value_type}

    def deleteConfigurationValue(self, key, value):
        """Delete *value* (a config-entry dict holding 'id') stored under *key*."""
        self.worker.deleteConfigurationValue(self.configuration_id, value['id'])
        # TODO: API provides no way of checking if deletion was successful
        del self.config[key]

88 changes: 88 additions & 0 deletions pysis/workertools/logging_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from logging import Handler, FileHandler
from os.path import join
from os import environ
import logging
import pika
from json import dumps


class QueueHandler(Handler):
    """Logging handler that publishes metric records to a RabbitMQ topic exchange."""

    def __init__(self, config, file_saver):
        super(QueueHandler, self).__init__()
        credentials = pika.PlainCredentials(config['rabbit-user']['value'], config['rabbit-pass']['value'])
        params = pika.ConnectionParameters(host=config['rabbit-host']['value'], credentials=credentials)
        # Keep the connection so close() can actually release it.
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.rabbit_prefix = config['rabbit-prefix']['value']
        self.exchange_name = self.rabbit_prefix + '.wms_metrics'
        # NOTE(review): newer pika versions spell this keyword `exchange_type` — verify.
        self.channel.exchange_declare(exchange=self.exchange_name, type='topic')

    def emit(self, record):
        try:
            # A LogRecord is not JSON-serializable; serialize its payload instead
            # (dict payloads as JSON, anything else as the formatted message).
            payload = record.msg if isinstance(record.msg, dict) else record.getMessage()
            self.channel.publish(exchange=self.exchange_name,
                                 routing_key='metrics',
                                 body=dumps(payload))
        except Exception:
            self.handleError(record)

    def flush(self):
        # Publishing is synchronous; nothing buffered to flush.
        pass

    def close(self):
        try:
            self.channel.close()
            self.connection.close()
        except Exception as e:
            # Lazy %-formatting; the original passed `e` with no placeholder.
            logging.error('Unable to close publishing channel: %s', e)
        finally:
            # Deregister from the logging module's handler bookkeeping.
            Handler.close(self)

class MetricFileHandler(FileHandler):
    """FileHandler that JSON-encodes dict messages and runs a save hook on close."""

    def emit(self, record):
        # Dict payloads are rewritten as JSON so each log line is machine-readable.
        payload = record.msg
        if type(payload) is dict:
            record.msg = dumps(payload)
        super(MetricFileHandler, self).emit(record)

    def close(self):
        super(MetricFileHandler, self).close()
        self.save_file()

    def save_file(self):
        # Hook for subclasses that persist the closed log file elsewhere.
        pass


class MockHandler(MetricFileHandler):
    """Test double: 'saves' by writing a marker file instead of uploading anywhere."""

    def __init__(self, filename, saved_file_name, mode='a', encoding=None, delay=0):
        # Start super() from MockHandler: the original used
        # super(MetricFileHandler, ...), silently skipping MetricFileHandler in
        # the MRO — harmless only while that class defines no __init__.
        super(MockHandler, self).__init__(filename=filename, mode=mode, encoding=encoding, delay=delay)
        self.saved_file_name = saved_file_name

    def save_file(self):
        # Marker content the tests assert on.
        with open(self.saved_file_name, 'w') as saved:
            saved.write('saved')


class S3MetricFileHandler(MetricFileHandler):
    """Metric log handler that uploads the closed log file to S3 via a file saver."""

    def __init__(self, file_name, file_saver=None, mode='a', encoding=None, delay=0):
        super(S3MetricFileHandler, self).__init__(filename=file_name, mode=mode, encoding=encoding, delay=delay)
        self.file_saver = file_saver
        self.local_file_name = file_name
        # TODO -> need a meaningful name, maybe based on the docker container
        self.s3_file_name = join('workers/joblogs', self.get_container_id())

    def get_container_id(self):
        """Best-effort container id: HOSTNAME env var, else /proc cgroup, else a default."""
        if 'HOSTNAME' in environ:
            return environ['HOSTNAME'].strip()
        try:
            # `with` closes the handle; the original leaked it.
            with open('/proc/self/cgroup') as cg:
                lines = [line.split('/')[-1].strip() for line in cg]
        except IOError:
            lines = []
        # Drop empty entries before deciding (the original built this filtered
        # set as `uid` but then never used it).
        candidates = [l for l in lines if l]
        unique = set(candidates)
        if len(unique) == 1:
            return candidates[0]
        if len(unique) > 1:
            logging.warning('More than one container id candidate in cgroup data')
            return candidates[0]
        logging.warning('Unable to deduce container id from docker')
        return 'DEFAULT_HOST_NAME'

    def save_file(self):
        # Upload only when a saver was supplied; otherwise act as a plain file log.
        if self.file_saver is not None:
            self.file_saver.saveFile(fileName=self.s3_file_name,
                                     file_category_id=None,
                                     local_filepath=self.local_file_name)
45 changes: 45 additions & 0 deletions test/test_metric_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import unittest
import os
import logging
from pysis.workertools.logging_utils import MockHandler

LOG_FILE = '/tmp/test.log'
SAVED_FILE = '/tmp/saved_handler'

FIRST_LINE = {"foo": "bar"}
SECOND_LINE = {"baz": "quux"}


class HandlerTest(unittest.TestCase):
    """Exercises MockHandler: records land on disk and close() triggers save_file()."""

    def delete_files(self):
        # Remove both artifacts so each test starts and ends clean.
        for path in (LOG_FILE, SAVED_FILE):
            if os.path.isfile(path):
                os.remove(path)

    def setUp(self):
        self.delete_files()
        handler = MockHandler(filename=LOG_FILE, saved_file_name=SAVED_FILE, mode='w')
        logger = logging.getLogger("test")
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)
        logger.info(FIRST_LINE)
        logger.info(SECOND_LINE)
        # `del handler` does NOT close it: the logger still references it, and
        # the logging module keeps the logger alive.  Close explicitly so
        # save_file() runs, and detach so handlers don't pile up on the shared
        # "test" logger across test runs.
        logger.removeHandler(handler)
        handler.close()

    def tearDown(self):
        self.delete_files()

    def test_write_and_save(self):
        import json  # local import, as in the original module
        self.assertTrue(os.path.isfile(LOG_FILE))
        self.assertTrue(os.path.isfile(SAVED_FILE))
        self.assertEqual('saved', open(SAVED_FILE).read().strip())
        with open(LOG_FILE) as f:
            saved_info = f.readlines()
        self.assertEqual(2, len(saved_info))
        for i, expected in enumerate([FIRST_LINE, SECOND_LINE]):
            self.assertEqual(json.loads(saved_info[i]), expected)
73 changes: 73 additions & 0 deletions test/test_metric_names.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import unittest
import inspect
import os
import logging
import json
from pysis.workertools.baseWorker import BaseWorker
from pysis.workertools.logging_utils import MockHandler

LOG_FILE = '/tmp/test.log'
SAVED_FILE = '/tmp/saved'

class BaseMockDatabaseWorker(BaseWorker):
    """Minimal BaseWorker stand-in: skips API setup, keeps only the metric logger."""

    def __init__(self):
        # Deliberately not calling BaseWorker.__init__ (it needs the live API).
        self.metric_logger = self.init_metric_logger()

    def init_metric_logger(self):
        metric_logger = logging.getLogger('worker.metric.{}'.format(self.__class__.__name__))
        metric_logger.setLevel(logging.DEBUG)
        # overwrite each time as we store it in the DB when done
        metrics_disk_handler = MockHandler(filename=LOG_FILE, saved_file_name=SAVED_FILE, mode='w')
        metric_logger.addHandler(metrics_disk_handler)
        return metric_logger

    def onExit(self):
        pass

    def _calling_method_name(self):
        # Frame 0 is this helper, 1 is the selectRowsBy* wrapper, 2 is the
        # public method whose name the metric should record.
        return inspect.getouterframes(inspect.currentframe(), 2)[2][3]

    def selectRowsByBlock(self):
        # The frame-inspection code was duplicated verbatim in both select
        # methods; it now lives in _calling_method_name().
        self.metric_logger.info({'foo': 'block', 'name': self._calling_method_name()})

    def selectRowsByRange(self):
        self.metric_logger.info({'foo': 'range', 'name': self._calling_method_name()})


class MockWorker(BaseMockDatabaseWorker):
    # These method names are part of the tested contract: selectRowsBy* logs
    # the *caller's* frame name, so tests expect 'getFoo' / 'getBar' in the
    # metric records.  Do not rename.
    def getFoo(self):
        self.selectRowsByBlock()

    def getBar(self):
        self.selectRowsByRange()


class TestMethodName(unittest.TestCase):
    """Verifies metric records carry the public caller's method name."""

    def delete_files(self):
        # Remove both artifacts so each test starts and ends clean.
        for path in (LOG_FILE, SAVED_FILE):
            if os.path.isfile(path):
                os.remove(path)

    def setUp(self):
        self.delete_files()
        worker = MockWorker()
        worker.getFoo()
        worker.getBar()
        # `del worker` never closed the handlers — the logging module keeps the
        # logger (and thus its handlers) alive — so SAVED_FILE was never written
        # before the assertions ran.  Close and detach explicitly so
        # save_file() fires and handlers don't accumulate across tests.
        for handler in list(worker.metric_logger.handlers):
            worker.metric_logger.removeHandler(handler)
            handler.close()

    def tearDown(self):
        self.delete_files()

    def test_writes_log(self):
        self.assertTrue(os.path.isfile(LOG_FILE))
        with open(LOG_FILE) as f:
            data = [json.loads(line.strip()) for line in f]
        self.assertEqual(2, len(data))
        self.assertEqual({'foo': 'block', 'name': 'getFoo'}, data[0])
        self.assertEqual({'foo': 'range', 'name': 'getBar'}, data[1])

        self.assertTrue(os.path.isfile(SAVED_FILE))
        self.assertEqual('saved', open(SAVED_FILE).read().strip())