From b46e2949ef3a2140936245401428dd1d0de49816 Mon Sep 17 00:00:00 2001 From: adixsyukri Date: Thu, 27 Apr 2017 12:44:57 +0800 Subject: [PATCH 1/2] add workflow data-retention-hdfs, transform-files-full, ingest-files-raw-current --- bin/generate_job.py | 26 ++- bin/generate_job_files.py | 78 ++++++- workflow/data-retention-hdfs/conf/oozie.xml | 22 ++ .../data-retention-hdfs/data-retention.py | 44 ++++ workflow/data-retention-hdfs/workflow.xml | 41 ++++ .../ingest-files-raw-current/conf/oozie.xml | 22 ++ .../conf/oraoop-site.xml | 104 +++++++++ .../ingest-files-raw-current/date_helper.py | 20 ++ .../ingest-files-raw-current/workflow.xml | 81 +++++++ workflow/transform-files-full/date_helper.py | 20 ++ .../files-full-parquet.py | 67 ++++++ workflow/transform-files-full/workflow.xml | 197 ++++++++++++++++++ 12 files changed, 712 insertions(+), 10 deletions(-) create mode 100644 workflow/data-retention-hdfs/conf/oozie.xml create mode 100644 workflow/data-retention-hdfs/data-retention.py create mode 100644 workflow/data-retention-hdfs/workflow.xml create mode 100644 workflow/ingest-files-raw-current/conf/oozie.xml create mode 100755 workflow/ingest-files-raw-current/conf/oraoop-site.xml create mode 100755 workflow/ingest-files-raw-current/date_helper.py create mode 100644 workflow/ingest-files-raw-current/workflow.xml create mode 100755 workflow/transform-files-full/date_helper.py create mode 100644 workflow/transform-files-full/files-full-parquet.py create mode 100644 workflow/transform-files-full/workflow.xml diff --git a/bin/generate_job.py b/bin/generate_job.py index 1d099da..b45ddab 100755 --- a/bin/generate_job.py +++ b/bin/generate_job.py @@ -123,6 +123,7 @@ ('targetdb', None), ('stagingdb', None), ('backdate', '7'), + ('retention', '7'), ('schema', None), ('table', None), ('mapper', None), @@ -180,6 +181,9 @@ }, 'incremental-ingest-frozen': { 'workflow': 'incremental-ingest-frozen', + }, + 'data-retention-hdfs': { + 'workflow': 'data-retention-hdfs', } } @@ -188,37 +192,43 @@ 'ingest-full': '00:01', 'ingest-increment': '00:01', 'transform-full': '00:30', - 'transform-increment': '00:30' + 'transform-increment': '00:30', + 'data-retention-hdfs': '14:30' }, 'SIEBEL_NOVA': { 'ingest-full': '03:00', 'ingest-increment': '03:00', 'transform-full': '05:00', - 'transform-increment': '05:00' + 'transform-increment': '05:00', + 'data-retention-hdfs': '14:30' }, 'BRM_NOVA': { 'ingest-full': '03:01', 'ingest-increment': '03:01', 'transform-full': '05:00', - 'transform-increment': '05:00' + 'transform-increment': '05:00', + 'data-retention-hdfs': '14:30' }, 'GRANITE': { 'ingest-full': '03:01', 'ingest-increment': '03:01', 'transform-full': '05:00', - 'transform-increment': '05:00' + 'transform-increment': '05:00', + 'data-retention-hdfs': '14:30' }, 'NIS': { 'ingest-full': '03:01', 'ingest-increment': '03:01', 'transform-full': '05:00', - 'transform-increment': '05:00' + 'transform-increment': '05:00', + 'data-retention-hdfs': '14:30' }, 'PORTAL': { 'ingest-full': '03:01', 'ingest-increment': '03:01', 'transform-full': '05:00', - 'transform-increment': '05:00' + 'transform-increment': '05:00', + 'data-retention-hdfs': '14:30' } } @@ -227,13 +237,13 @@ 'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/ingest_date=${YEAR}-${MONTH}-${DAY}', 'format': 'parquet', 'exec_time': '00:00', - 'retention': 365 + 'retention': 7 }, 'increment-retention': { 'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/INCREMENT/ingest_date=${YEAR}-${MONTH}-${DAY}', 'format': 'parquet', 'exec_time': 
'00:00', - 'retention': 365 + 'retention': 7 }, 'full': { 'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/CURRENT/', diff --git a/bin/generate_job_files.py b/bin/generate_job_files.py index 13c6380..d767120 100755 --- a/bin/generate_job_files.py +++ b/bin/generate_job_files.py @@ -63,6 +63,29 @@ ''' +falcon_hivefeed_template = ''' + + entity_type=feed,format=%(feed_format)s,stage=%(stage)s,source=%(source_name)s,schema=%(schema)s,table=%(table)s,feed_type=%(feed_type)s + _SUCCESS + days(1) + GMT+08:00 + + + + + %(retention)s + + + + + + + + + + + +''' @@ -162,10 +185,10 @@ FEEDS = { 'full-retention': { - 'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/instance_date=${YEAR}-${MONTH}-${DAY}', + 'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/ingest_date=${YEAR}-${MONTH}-${DAY}', 'format': 'parquet', 'exec_time': '00:00', - 'retention': 365 + 'retention': 7 }, # 'increment-retention': { # 'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/INCREMENT/instance_date=${YEAR}-${MONTH}-${DAY}', @@ -185,6 +208,16 @@ # }, } +HIVE_FEEDS = { + 'hive-retention' : { + 'path': 'catalog:%(targetdb)s_HISTORY:%(schema)s_%(table)s#ingest_date=${YEAR}-${MONTH}-${DAY}', + 'format': 'orc', + 'exec_time': '00:00', + 'retention': 365 + } +} + + ARTIFACTS='files-artifacts/' FOREVER=36135 @@ -316,6 +349,38 @@ def falcon_feed(stage, properties, feed, feed_path, feed_format, job = falcon_feed_template % params return params, job +def write_falcon_hivefeed(storedir, stage, properties, feed, feed_path, + feed_format, exec_time='00:00', retention=FOREVER): + filename = '%(source_name)s-%(schema)s-%(table)s.xml' % properties + if not os.path.exists(storedir): + os.makedirs(storedir) + with open('%s/%s' % (storedir, filename), 'w') as f: + params, job = falcon_hivefeed(stage, properties, feed, + feed_path, feed_format, exec_time, retention) + f.write(job) + +def falcon_hivefeed(stage, properties, feed, feed_path, feed_format, + exec_time='00:00', retention=FOREVER): + if retention is not None: + rt = "" % retention + else: + rt = '' + params = { + 'schema': properties['schema'], + 'table': properties['table'], + 'source_name': properties['source_name'], + 'start_utc': generate_utc_time(exec_time), + 'feed_name': default_feed_name(stage, properties, feed), + 'feed_path': feed_path % properties, + 'feed_type': feed, + 'feed_format': feed_format, + 'stage': stage, + 'retention': rt + } + job = falcon_hivefeed_template % params + return params, job + + def main(): argparser = argparse.ArgumentParser(description='Generate oozie and falcon configurations for ingestion') argparser.add_argument('tabletsv', help='TSV of the table list') @@ -369,6 +434,15 @@ def main(): feed_opts['path'], feed_opts['format'], feed_opts['exec_time'], feed_opts.get('retention', FOREVER)) + for feed, feed_opts in HIVE_FEEDS.items(): + opts = params.copy() + opts['prefix'] = conf['prefix'] + opts['targetdb'] = conf['targetdb'] % params + storedir = '%s/%s-falconfeed-%s' % (ARTIFACTS, stage, feed) + write_falcon_hivefeed(storedir, stage, opts, feed, + feed_opts['path'], feed_opts['format'], + feed_opts['exec_time'], + feed_opts.get('retention', FOREVER)) if __name__ == '__main__': main() diff --git a/workflow/data-retention-hdfs/conf/oozie.xml b/workflow/data-retention-hdfs/conf/oozie.xml new file mode 100644 index 0000000..deb54e6 --- /dev/null +++ b/workflow/data-retention-hdfs/conf/oozie.xml @@ -0,0 +1,22 @@ + + + oozie.launcher.mapreduce.job.queuename + oozie + + + mapreduce.job.queuename + 
oozie + + + mapred.job.queuename + oozie + + + mapred.reduce.tasks + -1 + + + tez.queue.name + batch + + diff --git a/workflow/data-retention-hdfs/data-retention.py b/workflow/data-retention-hdfs/data-retention.py new file mode 100644 index 0000000..00c2f5a --- /dev/null +++ b/workflow/data-retention-hdfs/data-retention.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python + +from subprocess import Popen, PIPE +import time +import traceback +import argparse + +from datetime import datetime, timedelta, date + +parser = argparse.ArgumentParser() +parser.add_argument("--sourcedir", help="Source directory") +parser.add_argument("--retention", help="Days for data retention") +args = parser.parse_args() + +OPTIONS = [args.sourcedir, args.sourcedir+'/INCREMENT'] + +INGEST_DATE = [] + +for i in OPTIONS: + try: + list_files = Popen(['hdfs','dfs','-ls',i], stdout=PIPE) + list_files.wait() + + lines = list_files.stdout.read().strip().split('\n') + except: + traceback.print_exc() + + for p in lines: + if 'ingest_date' in p: + INGEST_DATE.append(p) + +base_date = (datetime.now() - timedelta(days=int(args.retention))).strftime('%Y-%m-%d') +to_be_deleted = [i.split(' ')[len(i.split(' '))-1] for i in INGEST_DATE if i.split(' ')[len(i.split(' '))-1].split('=')[1] < base_date] + +if to_be_deleted: + print("To be deleted") + print '\n'.join([i for i in to_be_deleted]) + + for item in to_be_deleted: + try: + delete_folder = Popen(['hdfs','dfs','-rm','-r', item], stdout=PIPE) + delete_folder.wait() + except: + traceback.print_exc() diff --git a/workflow/data-retention-hdfs/workflow.xml b/workflow/data-retention-hdfs/workflow.xml new file mode 100644 index 0000000..be74b47 --- /dev/null +++ b/workflow/data-retention-hdfs/workflow.xml @@ -0,0 +1,41 @@ + + + + + prefix + /user/trace/development/ + + + sourcedir + ${prefix}/source/${source_name}/${schema}_${table}/ + + + retention + 7 + + + + + + + ${resourceManager} + ${nameNode} + conf/oozie.xml + python + data-retention.py + --sourcedir + ${sourcedir} + --retention + ${retention} + HADOOP_USER_NAME=${wf:user()} + data-retention.py + + + + + + ${wf:errorMessage(wf:lastErrorNode())} + + + diff --git a/workflow/ingest-files-raw-current/conf/oozie.xml b/workflow/ingest-files-raw-current/conf/oozie.xml new file mode 100644 index 0000000..deb54e6 --- /dev/null +++ b/workflow/ingest-files-raw-current/conf/oozie.xml @@ -0,0 +1,22 @@ + + + oozie.launcher.mapreduce.job.queuename + oozie + + + mapreduce.job.queuename + oozie + + + mapred.job.queuename + oozie + + + mapred.reduce.tasks + -1 + + + tez.queue.name + batch + + diff --git a/workflow/ingest-files-raw-current/conf/oraoop-site.xml b/workflow/ingest-files-raw-current/conf/oraoop-site.xml new file mode 100755 index 0000000..21c3d18 --- /dev/null +++ b/workflow/ingest-files-raw-current/conf/oraoop-site.xml @@ -0,0 +1,104 @@ + + + + + + + + + + oraoop.oracle.session.initialization.statements + alter session disable parallel query; + alter session set "_serial_direct_read"=true; + alter session set tracefile_identifier=oraoop; + --alter session set events '10046 trace name context forever, level 8'; + alter session set time_zone = '{oracle.sessionTimeZone|GMT}'; + + A semicolon-delimited list of Oracle statements that are executed, in order, to initialize each Oracle session. + Use {[property_name]|[default_value]} characters to refer to a Sqoop/Hadoop configuration property. + If the property does not exist, the specified default value will be used. + E.g. 
{oracle.sessionTimeZone|GMT} will equate to the value of the property named "oracle.sessionTimeZone" or + to "GMT" if this property has not been set. + + + + + mapred.map.tasks.speculative.execution + false + Speculative execution is disabled to prevent redundant load on the Oracle database. + + + + + oraoop.import.hint + NO_INDEX(t) + Hint to add to the SELECT statement for an IMPORT job. + The table will have an alias of t which can be used in the hint. + By default the NO_INDEX hint is applied to stop the use of an index. + To override this in oraoop-site.xml set the value to a blank string. + + + + + + + + + + + + diff --git a/workflow/ingest-files-raw-current/date_helper.py b/workflow/ingest-files-raw-current/date_helper.py new file mode 100755 index 0000000..53bf3ac --- /dev/null +++ b/workflow/ingest-files-raw-current/date_helper.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python + +"""Get today's date""" +from datetime import datetime + +def main(): + """Main function to get today's date""" + now = datetime.now() + utcnow = datetime.utcnow() + fmt = '%Y-%m-%d %H:%M:%S.0' + out = { + 'NOW': now.strftime(fmt), + 'UTCNOW': utcnow.strftime(fmt), + 'DATE': now.strftime('%Y-%m-%d') + } + + print '\n'.join(['%s=%s' % (k, v) for k, v in out.items()]) + +if __name__ == '__main__': + main() diff --git a/workflow/ingest-files-raw-current/workflow.xml b/workflow/ingest-files-raw-current/workflow.xml new file mode 100644 index 0000000..9e16992 --- /dev/null +++ b/workflow/ingest-files-raw-current/workflow.xml @@ -0,0 +1,81 @@ + + + + prefix + /user/trace/development/ + + + stagingdb + staging_dev + + + targetdb + ${source_name}_dev + + + outputdir + ${prefix}/source/${source_name}/${table}/ + + + sourcedir + ${prefix}/source_files/${source_name}/${table}/ + + + staging_tbl + ${source_name}_${schema}_${table} + + + reconcile + append + + + + + + + ${resourceManager} + ${nameNode} + python + date_helper.py + date_helper.py + + + + + + + + ${nameNode} + + + + + + + + + ${resourceManager} + ${nameNode} + -Dmapreduce.job.queuename=distcp + ${nameNode}/${sourcedir}/RAW/ingest_date=${wf:actionData('getDates')['DATE']} + ${nameNode}/${sourcedir}/RAW/CURRENT + + + + + + + ${nameNode} + + + + + + + ${wf:errorMessage(wf:lastErrorNode())} + + + diff --git a/workflow/transform-files-full/date_helper.py b/workflow/transform-files-full/date_helper.py new file mode 100755 index 0000000..53bf3ac --- /dev/null +++ b/workflow/transform-files-full/date_helper.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python + +"""Get today's date""" +from datetime import datetime + +def main(): + """Main function to get today's date""" + now = datetime.now() + utcnow = datetime.utcnow() + fmt = '%Y-%m-%d %H:%M:%S.0' + out = { + 'NOW': now.strftime(fmt), + 'UTCNOW': utcnow.strftime(fmt), + 'DATE': now.strftime('%Y-%m-%d') + } + + print '\n'.join(['%s=%s' % (k, v) for k, v in out.items()]) + +if __name__ == '__main__': + main() diff --git a/workflow/transform-files-full/files-full-parquet.py b/workflow/transform-files-full/files-full-parquet.py new file mode 100644 index 0000000..c24e517 --- /dev/null +++ b/workflow/transform-files-full/files-full-parquet.py @@ -0,0 +1,67 @@ +import argparse + +if not 'sc' in globals(): + from pyspark.context import SparkContext + from pyspark.sql import HiveContext + + sc = SparkContext(appName='FilesToParquet') + sqlContext = HiveContext(sc) + +parser = argparse.ArgumentParser() +parser.add_argument("--sourcedir", help="source directory") +parser.add_argument("--outputdir", help="output directory") 
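+# These CLI arguments mirror the <spark> action added in
+# workflow/transform-files-full/workflow.xml below; a typical launch (all
+# values here are only illustrative placeholders) would look like:
+#   spark-submit files-full-parquet.py --sourcedir <hdfs source dir> \
+#     --outputdir <hdfs output dir> --ingestdate 2017-04-27 \
+#     --schema OSM --db <targetdb> --table <table>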
+parser.add_argument("--ingestdate", help="ingest date") +parser.add_argument("--schema", help="Schema") +parser.add_argument("--db", help="database") +parser.add_argument("--table", help="table") +args = parser.parse_args() + +options = { + 'sourcedir':args.sourcedir, + 'outputdir':args.outputdir, + 'ingestdate':args.ingestdate, + 'schema':args.schema, + 'db':args.db, + 'table':args.table, + } + +def process(record,schema): + split_text = [] + if schema == 'OSM': + d = "1-" + for i,e in enumerate(record.split("\n1-")): + if i > 0: + split_text.append(d+e) + else: + split_text.append(e) + else: + split_text = [row for row in record.split("\n") if row] + return split_text + +f = sc.wholeTextFiles('%(sourcedir)s/RAW/ingest_date=%(ingestdate)s' % options) +schema = options['schema'] +stage1 = f.collect() +split_text = process(stage1[0][1],schema) +header = split_text[0].split("~^") +split_text.pop(0) +if split_text: + rows = [row.split("~^") for row in split_text if row] + +df_writer = sqlContext.createDataFrame(rows,header) + +df_writer.registerTempTable("df_writer") +sqlContext.sql("create database if not exists ingest_%(db)s" % options) +sqlContext.sql("create table if not exists ingest_%(db)s.%(schema)s_%(table)s_schema \ + row format serde \ + 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' \ + stored as avro \ + tblproperties ( \ + 'avro.schema.url'='%(sourcedir)s/PARQUET/.metadata/schema.avsc' \ + )" % options) +sqlContext.sql("create external table if not exists \ + ingest_%(db)s.%(schema)s_%(table)s_current \ + like ingest_%(db)s.%(schema)s_%(table)s_schema \ + stored as parquet \ + location '%(outputdir)s/CURRENT'" % options) +sqlContext.sql("insert overwrite table ingest_%(db)s.%(schema)s_%(table)s_current \ + select * from df_writer" % options) diff --git a/workflow/transform-files-full/workflow.xml b/workflow/transform-files-full/workflow.xml new file mode 100644 index 0000000..34fa49d --- /dev/null +++ b/workflow/transform-files-full/workflow.xml @@ -0,0 +1,197 @@ + + + + prefix + /user/trace/development/ + + + stagingdb + staging_dev + + + targetdb + ${source_name}_dev + + + outputdir + ${prefix}/source/${source_name}/${schema}_${table}/ + + + sourcedir + ${prefix}/source_files/${source_name}/${schema}_${table}/ + + + staging_tbl + ${source_name}_${schema}_${table} + + + reconcile + append + + + + + + ${resourceManager} + ${nameNode} + python + date_helper.py + date_helper.py + + + + + + + + ${nameNode} + + + + + + + + ${resourceManager} + ${nameNode} + ${nameNode}/${outputdir}/CURRENT/ + ${nameNode}/${outputdir}/PREVIOUS + + + + + + + ${resourceManager} + ${nameNode} + yarn-client + spark-submit + files-full-parquet.py + --sourcedir + ${sourcedir} + --outputdir + ${outputdir} + --ingestdate + ${wf:actionData('getDates')['DATE']} + --schema + ${schema} + --db + ${targetdb} + --table + ${table} + files-full-parquet.py + + + + + + + ${nameNode} + + + + + + + + ${resourceManager} + ${nameNode} + ${nameNode}/${outputdir}/CURRENT/ + ${nameNode}/${outputdir}/ingest_date=${wf:actionData('getDates')['DATE']} + + + + + + + ${nameNode} + + + + + + + + ${resourceManager} + ${nameNode} + SET tez.queue.name=batch; + +CREATE DATABASE IF NOT EXISTS ${targetdb}; + +CREATE DATABASE IF NOT EXISTS INGEST_${targetdb}; + +CREATE DATABASE IF NOT EXISTS ${targetdb}_HISTORY; + +CREATE TABLE IF NOT EXISTS INGEST_${targetdb}.${schema}_${table}_SCHEMA + ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' +STORED AS AVRO +TBLPROPERTIES ( 
+'avro.schema.url'='${nameNode}/${sourcedir}/PARQUET/.metadata/schema.avsc' +); + +CREATE EXTERNAL TABLE IF NOT EXISTS INGEST_${targetdb}.${schema}_${table}_CURRENT +LIKE INGEST_${targetdb}.${schema}_${table}_SCHEMA +STORED AS PARQUET LOCATION '${outputdir}/CURRENT'; + +CREATE TABLE IF NOT EXISTS ${targetdb}.${schema}_${table} +LIKE INGEST_${targetdb}.${schema}_${table}_SCHEMA +STORED AS ORC; + +INSERT OVERWRITE TABLE ${targetdb}.${schema}_${table} +SELECT * +FROM INGEST_${targetdb}.${schema}_${table}_CURRENT; + +CREATE TABLE IF NOT EXISTS INGEST_${targetdb}.${schema}_${table}_HISTORYSCHEMA +PARTITIONED BY (ingest_date STRING) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' +STORED AS AVRO +TBLPROPERTIES ( +'avro.schema.url'='${nameNode}/${sourcedir}/PARQUET/.metadata/schema.avsc' +); + +CREATE TABLE IF NOT EXISTS ${targetdb}_HISTORY.${schema}_${table} +LIKE INGEST_${targetdb}.${schema}_${table}_HISTORYSCHEMA +STORED AS ORC; + +INSERT OVERWRITE TABLE ${targetdb}_HISTORY.${schema}_${table} +PARTITION (ingest_date='${wf:actionData('getDates')['DATE']}') +SELECT * FROM ${targetdb}.${schema}_${table}; + + + + + + + + ${nameNode} + + + + + + + + + ${nameNode} + + + + + + + + + ${wf:errorMessage(wf:lastErrorNode())} + + + From b06164f7549b75aa2205997bafe8f093f9375482 Mon Sep 17 00:00:00 2001 From: adixsyukri Date: Thu, 31 Oct 2019 15:32:41 +0800 Subject: [PATCH 2/2] add yarn usage monitoring tool --- settings.yml.sample | 4 +++ yarn_usage.py | 88 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) create mode 100644 settings.yml.sample create mode 100644 yarn_usage.py diff --git a/settings.yml.sample b/settings.yml.sample new file mode 100644 index 0000000..01ec567 --- /dev/null +++ b/settings.yml.sample @@ -0,0 +1,4 @@ +yarn: + hostname: <> + cluster: <> + queue: <> \ No newline at end of file diff --git a/yarn_usage.py b/yarn_usage.py new file mode 100644 index 0000000..6050253 --- /dev/null +++ b/yarn_usage.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python +# Author : Adi Yusman +# Email : adiyusman.yusof@cimb.com +# Date Modified : 2019-04-26 +# Sample command : python yarn-usage.py --settings settings.yml --start yyyy-mm-dd --end yyyy-mm-dd +# Purpose: to get list of all queries or job executed in hive for cluster user +# usage. 
Cloudera Manager YARN > Applications API
+###############################################################################
+
+import csv
+import urllib.request
+import json
+import argparse
+import time
+import os
+import yaml
+
+class Config(object):
+
+    value = ''
+
+    def __init__(self, config_file):
+        with open(config_file, 'r') as stream:
+            try:
+                self.value = yaml.safe_load(stream)
+            except yaml.YAMLError as exc:
+                print(exc)
+
+def retrieve_data(apps):
+    # Filter required data
+    appID = apps['applicationId']
+    name = apps['name'].replace('\n',' ').replace('\r',' ')
+    user = apps['user']
+    pool = apps['pool']
+    state = apps['state']
+    startTime = apps['startTime']
+    endTime = apps['endTime']
+
+    # Handle a missing hive query key
+    if 'hive_query_string' in apps['attributes']:
+        hiveString = apps['attributes']['hive_query_string'].replace('\n',' ').replace('\r',' ')
+    else:
+        hiveString = ''
+    data = [appID,name,user,pool,state,startTime,endTime,hiveString]
+    return data
+
+def dump_csv(data,filename):
+    # Dump one row of data to the csv output file
+    with open(filename, mode='a') as outfile:
+        outfile_writer = csv.writer(outfile, delimiter='|', quotechar='"', quoting=csv.QUOTE_MINIMAL)
+        outfile_writer.writerow(data)
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--start", help="start date in 'yyyy-mm-dd' format",dest="start")
+    parser.add_argument("--end", help="end date in 'yyyy-mm-dd' format",dest="end")
+    parser.add_argument("--settings", help="Python YAML settings file",dest="settings")
+    args = parser.parse_args()
+
+    startTime = args.start
+    endTime = args.end
+    settings = args.settings
+
+    config = Config(settings)
+    yarn = config.value['yarn']
+
+    if startTime <= endTime:
+        url = "https://"+yarn['hostname']+":7183/api/v7/clusters/"+yarn['cluster']+"/services/yarn/yarnApplications?from="+startTime+"T07:00:59.321Z&to="+endTime+"T07:00:59.321Z&filter=pool=root."+yarn['queue']+"&limit=1000"
+        response = urllib.request.urlopen(url)
+        data = json.loads(response.read())
+
+        # Prepare output file (create the output directory if it does not exist yet)
+        path = 'outfile/'
+        if not os.path.exists(path):
+            os.makedirs(path)
+        filename = path+'outfile-'+time.strftime("%Y%m%d%H%M")+'.csv'
+        with open(filename, mode='w') as outfile:
+            outfile_writer = csv.writer(outfile, delimiter='|', quotechar='"', quoting=csv.QUOTE_MINIMAL)
+            outfile_writer.writerow(['APPID', 'NAME','USER','POOL','STATE','STARTTIME','ENDTIME','HIVESTRING'])
+
+        # Start processing: one CSV row per YARN application
+        for apps in data['applications']:
+            row = retrieve_data(apps)
+            dump_csv(row,filename)
+
+    else:
+        print("Enter a valid date range: start date must not be after end date")
+
+if __name__ == '__main__':
+    main()
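Note on the retention logic in workflow/data-retention-hdfs/data-retention.py above: partition directories are named ingest_date=YYYY-MM-DD, so ISO date strings sort chronologically and a plain string comparison against the cut-off date is enough. A minimal sketch of that check, using hypothetical HDFS paths (the real script builds its list from `hdfs dfs -ls` output before calling `hdfs dfs -rm -r`):

from datetime import datetime, timedelta

retention_days = 7  # matches the workflow's default <retention> property
cutoff = (datetime.now() - timedelta(days=retention_days)).strftime('%Y-%m-%d')

partitions = [
    '/user/trace/development/source/NIS/SCHEMA_TABLE/ingest_date=2017-04-01',
    '/user/trace/development/source/NIS/SCHEMA_TABLE/ingest_date=2017-04-26',
]

# Same comparison data-retention.py applies: a partition is expired once the
# date string after '=' falls before the cut-off date.
expired = [p for p in partitions if p.split('=')[1] < cutoff]
print(expired)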