diff --git a/bin/generate_job.py b/bin/generate_job.py
index 1d099da..b45ddab 100755
--- a/bin/generate_job.py
+++ b/bin/generate_job.py
@@ -123,6 +123,7 @@
     ('targetdb', None),
     ('stagingdb', None),
     ('backdate', '7'),
+    ('retention', '7'),
    ('schema', None),
    ('table', None),
    ('mapper', None),
@@ -180,6 +181,9 @@
     },
     'incremental-ingest-frozen': {
         'workflow': 'incremental-ingest-frozen',
+    },
+    'data-retention-hdfs': {
+        'workflow': 'data-retention-hdfs',
     }
 }
@@ -188,37 +192,43 @@
         'ingest-full': '00:01',
         'ingest-increment': '00:01',
         'transform-full': '00:30',
-        'transform-increment': '00:30'
+        'transform-increment': '00:30',
+        'data-retention-hdfs': '14:30'
     },
     'SIEBEL_NOVA': {
         'ingest-full': '03:00',
         'ingest-increment': '03:00',
         'transform-full': '05:00',
-        'transform-increment': '05:00'
+        'transform-increment': '05:00',
+        'data-retention-hdfs': '14:30'
     },
     'BRM_NOVA': {
         'ingest-full': '03:01',
         'ingest-increment': '03:01',
         'transform-full': '05:00',
-        'transform-increment': '05:00'
+        'transform-increment': '05:00',
+        'data-retention-hdfs': '14:30'
     },
     'GRANITE': {
         'ingest-full': '03:01',
         'ingest-increment': '03:01',
         'transform-full': '05:00',
-        'transform-increment': '05:00'
+        'transform-increment': '05:00',
+        'data-retention-hdfs': '14:30'
     },
     'NIS': {
         'ingest-full': '03:01',
         'ingest-increment': '03:01',
         'transform-full': '05:00',
-        'transform-increment': '05:00'
+        'transform-increment': '05:00',
+        'data-retention-hdfs': '14:30'
     },
     'PORTAL': {
         'ingest-full': '03:01',
         'ingest-increment': '03:01',
         'transform-full': '05:00',
-        'transform-increment': '05:00'
+        'transform-increment': '05:00',
+        'data-retention-hdfs': '14:30'
     }
 }
@@ -227,13 +237,13 @@
         'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/ingest_date=${YEAR}-${MONTH}-${DAY}',
         'format': 'parquet',
         'exec_time': '00:00',
-        'retention': 365
+        'retention': 7
     },
     'increment-retention': {
         'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/INCREMENT/ingest_date=${YEAR}-${MONTH}-${DAY}',
         'format': 'parquet',
         'exec_time': '00:00',
-        'retention': 365
+        'retention': 7
     },
     'full': {
         'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/CURRENT/',
diff --git a/bin/generate_job_files.py b/bin/generate_job_files.py
index 13c6380..d767120 100755
--- a/bin/generate_job_files.py
+++ b/bin/generate_job_files.py
@@ -63,6 +63,29 @@
'''
+falcon_hivefeed_template = '''
+<feed name="%(feed_name)s" description="" xmlns="uri:falcon:feed:0.1">
+    <tags>entity_type=feed,format=%(feed_format)s,stage=%(stage)s,source=%(source_name)s,schema=%(schema)s,table=%(table)s,feed_type=%(feed_type)s</tags>
+    <availabilityFlag>_SUCCESS</availabilityFlag>
+    <frequency>days(1)</frequency>
+    <timezone>GMT+08:00</timezone>
+    <clusters>
+        <cluster name="primary" type="source">
+            <validity start="%(start_utc)s" end="2099-12-31T00:00Z"/>
+            %(retention)s
+        </cluster>
+    </clusters>
+    <table uri="%(feed_path)s" />
+    <ACL owner="trace" group="hadoop" permission="0755"/>
+    <schema location="/none" provider="none"/>
+</feed>
+'''
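+# %(retention)s above is filled by falcon_hivefeed() below with a <retention .../>
+# element, or with an empty string when no retention period is set.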
@@ -162,10 +185,10 @@
 FEEDS = {
     'full-retention': {
-        'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/instance_date=${YEAR}-${MONTH}-${DAY}',
+        'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/ingest_date=${YEAR}-${MONTH}-${DAY}',
         'format': 'parquet',
         'exec_time': '00:00',
-        'retention': 365
+        'retention': 7
     },
# 'increment-retention': {
# 'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/INCREMENT/instance_date=${YEAR}-${MONTH}-${DAY}',
@@ -185,6 +208,16 @@
# },
}
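+# Hive-catalog feeds: register the ingest_date partitions of the *_HISTORY tables
+# with Falcon so it can enforce their retention window.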
+HIVE_FEEDS = {
+    'hive-retention': {
+        'path': 'catalog:%(targetdb)s_HISTORY:%(schema)s_%(table)s#ingest_date=${YEAR}-${MONTH}-${DAY}',
+        'format': 'orc',
+        'exec_time': '00:00',
+        'retention': 365
+    }
+}
+
+
ARTIFACTS='files-artifacts/'
FOREVER=36135
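+# 36135 days is roughly 99 years; it acts as the "retain forever" default below.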
@@ -316,6 +349,38 @@ def falcon_feed(stage, properties, feed, feed_path, feed_format,
     job = falcon_feed_template % params
     return params, job
+def write_falcon_hivefeed(storedir, stage, properties, feed, feed_path,
+                          feed_format, exec_time='00:00', retention=FOREVER):
+    filename = '%(source_name)s-%(schema)s-%(table)s.xml' % properties
+    if not os.path.exists(storedir):
+        os.makedirs(storedir)
+    with open('%s/%s' % (storedir, filename), 'w') as f:
+        params, job = falcon_hivefeed(stage, properties, feed,
+                                      feed_path, feed_format, exec_time, retention)
+        f.write(job)
+
+def falcon_hivefeed(stage, properties, feed, feed_path, feed_format,
+                    exec_time='00:00', retention=FOREVER):
+    # Render the Falcon <retention/> element only when a retention period is given.
+    if retention is not None:
+        rt = '<retention limit="days(%s)" action="delete"/>' % retention
+    else:
+        rt = ''
+    params = {
+        'schema': properties['schema'],
+        'table': properties['table'],
+        'source_name': properties['source_name'],
+        'start_utc': generate_utc_time(exec_time),
+        'feed_name': default_feed_name(stage, properties, feed),
+        'feed_path': feed_path % properties,
+        'feed_type': feed,
+        'feed_format': feed_format,
+        'stage': stage,
+        'retention': rt
+    }
+    job = falcon_hivefeed_template % params
+    return params, job
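+# Example: retention=365 renders <retention limit="days(365)" action="delete"/>
+# into the feed XML; passing retention=None omits the element entirely.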
+
+
 def main():
     argparser = argparse.ArgumentParser(description='Generate oozie and falcon configurations for ingestion')
     argparser.add_argument('tabletsv', help='TSV of the table list')
@@ -369,6 +434,15 @@ def main():
                                  feed_opts['path'], feed_opts['format'],
                                  feed_opts['exec_time'],
                                  feed_opts.get('retention', FOREVER))
+        for feed, feed_opts in HIVE_FEEDS.items():
+            opts = params.copy()
+            opts['prefix'] = conf['prefix']
+            opts['targetdb'] = conf['targetdb'] % params
+            storedir = '%s/%s-falconfeed-%s' % (ARTIFACTS, stage, feed)
+            write_falcon_hivefeed(storedir, stage, opts, feed,
+                                  feed_opts['path'], feed_opts['format'],
+                                  feed_opts['exec_time'],
+                                  feed_opts.get('retention', FOREVER))
 if __name__ == '__main__':
     main()
diff --git a/settings.yml.sample b/settings.yml.sample
new file mode 100644
index 0000000..01ec567
--- /dev/null
+++ b/settings.yml.sample
@@ -0,0 +1,4 @@
+yarn:
+  hostname: <>
+  cluster: <>
+  queue: <>
\ No newline at end of file
diff --git a/workflow/data-retention-hdfs/conf/oozie.xml b/workflow/data-retention-hdfs/conf/oozie.xml
new file mode 100644
index 0000000..deb54e6
--- /dev/null
+++ b/workflow/data-retention-hdfs/conf/oozie.xml
@@ -0,0 +1,22 @@
+<configuration>
+    <property>
+        <name>oozie.launcher.mapreduce.job.queuename</name>
+        <value>oozie</value>
+    </property>
+    <property>
+        <name>mapreduce.job.queuename</name>
+        <value>oozie</value>
+    </property>
+    <property>
+        <name>mapred.job.queuename</name>
+        <value>oozie</value>
+    </property>
+    <property>
+        <name>mapred.reduce.tasks</name>
+        <value>-1</value>
+    </property>
+    <property>
+        <name>tez.queue.name</name>
+        <value>batch</value>
+    </property>
+</configuration>
diff --git a/workflow/data-retention-hdfs/data-retention.py b/workflow/data-retention-hdfs/data-retention.py
new file mode 100644
index 0000000..00c2f5a
--- /dev/null
+++ b/workflow/data-retention-hdfs/data-retention.py
@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+
+from subprocess import Popen, PIPE
+import time
+import traceback
+import argparse
+
+from datetime import datetime, timedelta, date
+
+parser = argparse.ArgumentParser()
+parser.add_argument("--sourcedir", help="Source directory")
+parser.add_argument("--retention", help="Days for data retention")
+args = parser.parse_args()
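+# Example invocation (paths illustrative):
+#   python data-retention.py --sourcedir /user/trace/development/source/NIS/SCH_TBL --retention 7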
+
+OPTIONS = [args.sourcedir, args.sourcedir+'/INCREMENT']
+
+INGEST_DATE = []
+
+for i in OPTIONS:
+    lines = []
+    try:
+        list_files = Popen(['hdfs', 'dfs', '-ls', i], stdout=PIPE)
+        list_files.wait()
+        lines = list_files.stdout.read().strip().split('\n')
+    except:
+        traceback.print_exc()
+
+    for p in lines:
+        if 'ingest_date' in p:
+            INGEST_DATE.append(p)
+
+base_date = (datetime.now() - timedelta(days=int(args.retention))).strftime('%Y-%m-%d')
+# The partition path is the last whitespace-separated field of each `-ls` line;
+# keep only partitions older than the retention cutoff.
+to_be_deleted = [i.split(' ')[-1] for i in INGEST_DATE
+                 if i.split(' ')[-1].split('=')[1] < base_date]
+
+if to_be_deleted:
+    print("To be deleted")
+    print '\n'.join(to_be_deleted)
+
+    for item in to_be_deleted:
+        try:
+            delete_folder = Popen(['hdfs', 'dfs', '-rm', '-r', item], stdout=PIPE)
+            delete_folder.wait()
+        except:
+            traceback.print_exc()
diff --git a/workflow/data-retention-hdfs/workflow.xml b/workflow/data-retention-hdfs/workflow.xml
new file mode 100644
index 0000000..be74b47
--- /dev/null
+++ b/workflow/data-retention-hdfs/workflow.xml
@@ -0,0 +1,41 @@
+<workflow-app name="data-retention-hdfs" xmlns="uri:oozie:workflow:1.0">
+    <parameters>
+        <property>
+            <name>prefix</name>
+            <value>/user/trace/development/</value>
+        </property>
+        <property>
+            <name>sourcedir</name>
+            <value>${prefix}/source/${source_name}/${schema}_${table}/</value>
+        </property>
+        <property>
+            <name>retention</name>
+            <value>7</value>
+        </property>
+    </parameters>
+    <start to="dataRetention"/>
+    <action name="dataRetention">
+        <shell xmlns="uri:oozie:shell-action:1.0">
+            <resource-manager>${resourceManager}</resource-manager>
+            <name-node>${nameNode}</name-node>
+            <job-xml>conf/oozie.xml</job-xml>
+            <exec>python</exec>
+            <argument>data-retention.py</argument>
+            <argument>--sourcedir</argument>
+            <argument>${sourcedir}</argument>
+            <argument>--retention</argument>
+            <argument>${retention}</argument>
+            <env-var>HADOOP_USER_NAME=${wf:user()}</env-var>
+            <file>data-retention.py</file>
+        </shell>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+    <kill name="fail">
+        <message>${wf:errorMessage(wf:lastErrorNode())}</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>
diff --git a/workflow/ingest-files-raw-current/conf/oozie.xml b/workflow/ingest-files-raw-current/conf/oozie.xml
new file mode 100644
index 0000000..deb54e6
--- /dev/null
+++ b/workflow/ingest-files-raw-current/conf/oozie.xml
@@ -0,0 +1,22 @@
+<configuration>
+    <property>
+        <name>oozie.launcher.mapreduce.job.queuename</name>
+        <value>oozie</value>
+    </property>
+    <property>
+        <name>mapreduce.job.queuename</name>
+        <value>oozie</value>
+    </property>
+    <property>
+        <name>mapred.job.queuename</name>
+        <value>oozie</value>
+    </property>
+    <property>
+        <name>mapred.reduce.tasks</name>
+        <value>-1</value>
+    </property>
+    <property>
+        <name>tez.queue.name</name>
+        <value>batch</value>
+    </property>
+</configuration>
diff --git a/workflow/ingest-files-raw-current/conf/oraoop-site.xml b/workflow/ingest-files-raw-current/conf/oraoop-site.xml
new file mode 100755
index 0000000..21c3d18
--- /dev/null
+++ b/workflow/ingest-files-raw-current/conf/oraoop-site.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put OraOop-specific properties in this file. -->
+
+<configuration>
+
+    <property>
+        <name>oraoop.oracle.session.initialization.statements</name>
+        <value>alter session disable parallel query;
+               alter session set "_serial_direct_read"=true;
+               alter session set tracefile_identifier=oraoop;
+               --alter session set events '10046 trace name context forever, level 8';
+               alter session set time_zone = '{oracle.sessionTimeZone|GMT}';
+        </value>
+        <description>A semicolon-delimited list of Oracle statements that are executed, in order, to initialize each Oracle session.
+            Use {[property_name]|[default_value]} characters to refer to a Sqoop/Hadoop configuration property.
+            If the property does not exist, the specified default value will be used.
+            E.g. {oracle.sessionTimeZone|GMT} will equate to the value of the property named "oracle.sessionTimeZone" or
+            to "GMT" if this property has not been set.
+        </description>
+    </property>
+
+    <property>
+        <name>mapred.map.tasks.speculative.execution</name>
+        <value>false</value>
+        <description>Speculative execution is disabled to prevent redundant load on the Oracle database.</description>
+    </property>
+
+    <property>
+        <name>oraoop.import.hint</name>
+        <value>NO_INDEX(t)</value>
+        <description>Hint to add to the SELECT statement for an IMPORT job.
+            The table will have an alias of t which can be used in the hint.
+            By default the NO_INDEX hint is applied to stop the use of an index.
+            To override this in oraoop-site.xml set the value to a blank string.
+        </description>
+    </property>
+
+</configuration>
diff --git a/workflow/ingest-files-raw-current/date_helper.py b/workflow/ingest-files-raw-current/date_helper.py
new file mode 100755
index 0000000..53bf3ac
--- /dev/null
+++ b/workflow/ingest-files-raw-current/date_helper.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python
+
+"""Get today's date"""
+from datetime import datetime
+
+def main():
+    """Main function to get today's date"""
+    now = datetime.now()
+    utcnow = datetime.utcnow()
+    fmt = '%Y-%m-%d %H:%M:%S.0'
+    out = {
+        'NOW': now.strftime(fmt),
+        'UTCNOW': utcnow.strftime(fmt),
+        'DATE': now.strftime('%Y-%m-%d')
+    }
+
+    # KEY=VALUE lines let an Oozie shell action with <capture-output/> expose these
+    # values via wf:actionData('getDates').
+    print '\n'.join(['%s=%s' % (k, v) for k, v in out.items()])
+
+if __name__ == '__main__':
+    main()
diff --git a/workflow/ingest-files-raw-current/workflow.xml b/workflow/ingest-files-raw-current/workflow.xml
new file mode 100644
index 0000000..9e16992
--- /dev/null
+++ b/workflow/ingest-files-raw-current/workflow.xml
@@ -0,0 +1,81 @@
+<workflow-app name="ingest-files-raw-current" xmlns="uri:oozie:workflow:1.0">
+    <parameters>
+        <property>
+            <name>prefix</name>
+            <value>/user/trace/development/</value>
+        </property>
+        <property>
+            <name>stagingdb</name>
+            <value>staging_dev</value>
+        </property>
+        <property>
+            <name>targetdb</name>
+            <value>${source_name}_dev</value>
+        </property>
+        <property>
+            <name>outputdir</name>
+            <value>${prefix}/source/${source_name}/${table}/</value>
+        </property>
+        <property>
+            <name>sourcedir</name>
+            <value>${prefix}/source_files/${source_name}/${table}/</value>
+        </property>
+        <property>
+            <name>staging_tbl</name>
+            <value>${source_name}_${schema}_${table}</value>
+        </property>
+        <property>
+            <name>reconcile</name>
+            <value>append</value>
+        </property>
+    </parameters>
+    <start to="getDates"/>
+    <action name="getDates">
+        <shell xmlns="uri:oozie:shell-action:1.0">
+            <resource-manager>${resourceManager}</resource-manager>
+            <name-node>${nameNode}</name-node>
+            <exec>python</exec>
+            <argument>date_helper.py</argument>
+            <file>date_helper.py</file>
+            <capture-output/>
+        </shell>
+        <ok to="clearCurrent"/>
+        <error to="fail"/>
+    </action>
+    <action name="clearCurrent">
+        <fs>
+            <name-node>${nameNode}</name-node>
+            <delete path="${nameNode}/${sourcedir}/RAW/CURRENT"/>
+        </fs>
+        <ok to="copyToCurrent"/>
+        <error to="fail"/>
+    </action>
+    <action name="copyToCurrent">
+        <distcp xmlns="uri:oozie:distcp-action:1.0">
+            <resource-manager>${resourceManager}</resource-manager>
+            <name-node>${nameNode}</name-node>
+            <arg>-Dmapreduce.job.queuename=distcp</arg>
+            <arg>${nameNode}/${sourcedir}/RAW/ingest_date=${wf:actionData('getDates')['DATE']}</arg>
+            <arg>${nameNode}/${sourcedir}/RAW/CURRENT</arg>
+        </distcp>
+        <ok to="flagCurrent"/>
+        <error to="fail"/>
+    </action>
+    <action name="flagCurrent">
+        <fs>
+            <name-node>${nameNode}</name-node>
+            <touchz path="${nameNode}/${sourcedir}/RAW/CURRENT/_SUCCESS"/>
+        </fs>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+    <kill name="fail">
+        <message>${wf:errorMessage(wf:lastErrorNode())}</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>
diff --git a/workflow/transform-files-full/date_helper.py b/workflow/transform-files-full/date_helper.py
new file mode 100755
index 0000000..53bf3ac
--- /dev/null
+++ b/workflow/transform-files-full/date_helper.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python
+
+"""Get today's date"""
+from datetime import datetime
+
+def main():
+    """Main function to get today's date"""
+    now = datetime.now()
+    utcnow = datetime.utcnow()
+    fmt = '%Y-%m-%d %H:%M:%S.0'
+    out = {
+        'NOW': now.strftime(fmt),
+        'UTCNOW': utcnow.strftime(fmt),
+        'DATE': now.strftime('%Y-%m-%d')
+    }
+
+    # KEY=VALUE lines let an Oozie shell action with <capture-output/> expose these
+    # values via wf:actionData('getDates').
+    print '\n'.join(['%s=%s' % (k, v) for k, v in out.items()])
+
+if __name__ == '__main__':
+    main()
diff --git a/workflow/transform-files-full/files-full-parquet.py b/workflow/transform-files-full/files-full-parquet.py
new file mode 100644
index 0000000..c24e517
--- /dev/null
+++ b/workflow/transform-files-full/files-full-parquet.py
@@ -0,0 +1,67 @@
+import argparse
+
+# Create the Spark/Hive contexts only if they were not already provided.
+if 'sc' not in globals():
+    from pyspark.context import SparkContext
+    from pyspark.sql import HiveContext
+
+    sc = SparkContext(appName='FilesToParquet')
+    sqlContext = HiveContext(sc)
+
+parser = argparse.ArgumentParser()
+parser.add_argument("--sourcedir", help="source directory")
+parser.add_argument("--outputdir", help="output directory")
+parser.add_argument("--ingestdate", help="ingest date")
+parser.add_argument("--schema", help="Schema")
+parser.add_argument("--db", help="database")
+parser.add_argument("--table", help="table")
+args = parser.parse_args()
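+# Typically launched by the transform-files-full workflow via an Oozie spark action,
+# e.g. (illustrative values): spark-submit files-full-parquet.py --sourcedir ...
+#   --outputdir ... --ingestdate 2019-04-26 --schema OSM --db NIS_dev --table FOO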
+
+options = {
+    'sourcedir': args.sourcedir,
+    'outputdir': args.outputdir,
+    'ingestdate': args.ingestdate,
+    'schema': args.schema,
+    'db': args.db,
+    'table': args.table,
+}
+
+def process(record, schema):
+    """Split one raw file into rows; OSM records are delimited by lines starting with '1-'."""
+    split_text = []
+    if schema == 'OSM':
+        d = "1-"
+        for i, e in enumerate(record.split("\n1-")):
+            if i > 0:
+                split_text.append(d + e)
+            else:
+                split_text.append(e)
+    else:
+        split_text = [row for row in record.split("\n") if row]
+    return split_text
+
+f = sc.wholeTextFiles('%(sourcedir)s/RAW/ingest_date=%(ingestdate)s' % options)
+schema = options['schema']
+stage1 = f.collect()
+# Assumes a single, non-empty input file whose first row is the '~^'-delimited header.
+split_text = process(stage1[0][1], schema)
+header = split_text[0].split("~^")
+split_text.pop(0)
+rows = [row.split("~^") for row in split_text if row]
+
+df_writer = sqlContext.createDataFrame(rows, header)
+
+df_writer.registerTempTable("df_writer")
+sqlContext.sql("create database if not exists ingest_%(db)s" % options)
+sqlContext.sql("create table if not exists ingest_%(db)s.%(schema)s_%(table)s_schema \
+    row format serde \
+    'org.apache.hadoop.hive.serde2.avro.AvroSerDe' \
+    stored as avro \
+    tblproperties ( \
+        'avro.schema.url'='%(sourcedir)s/PARQUET/.metadata/schema.avsc' \
+    )" % options)
+sqlContext.sql("create external table if not exists \
+    ingest_%(db)s.%(schema)s_%(table)s_current \
+    like ingest_%(db)s.%(schema)s_%(table)s_schema \
+    stored as parquet \
+    location '%(outputdir)s/CURRENT'" % options)
+sqlContext.sql("insert overwrite table ingest_%(db)s.%(schema)s_%(table)s_current \
+    select * from df_writer" % options)
diff --git a/workflow/transform-files-full/workflow.xml b/workflow/transform-files-full/workflow.xml
new file mode 100644
index 0000000..34fa49d
--- /dev/null
+++ b/workflow/transform-files-full/workflow.xml
@@ -0,0 +1,197 @@
+<workflow-app name="transform-files-full" xmlns="uri:oozie:workflow:1.0">
+    <parameters>
+        <property>
+            <name>prefix</name>
+            <value>/user/trace/development/</value>
+        </property>
+        <property>
+            <name>stagingdb</name>
+            <value>staging_dev</value>
+        </property>
+        <property>
+            <name>targetdb</name>
+            <value>${source_name}_dev</value>
+        </property>
+        <property>
+            <name>outputdir</name>
+            <value>${prefix}/source/${source_name}/${schema}_${table}/</value>
+        </property>
+        <property>
+            <name>sourcedir</name>
+            <value>${prefix}/source_files/${source_name}/${schema}_${table}/</value>
+        </property>
+        <property>
+            <name>staging_tbl</name>
+            <value>${source_name}_${schema}_${table}</value>
+        </property>
+        <property>
+            <name>reconcile</name>
+            <value>append</value>
+        </property>
+    </parameters>
+    <start to="getDates"/>
+    <action name="getDates">
+        <shell xmlns="uri:oozie:shell-action:1.0">
+            <resource-manager>${resourceManager}</resource-manager>
+            <name-node>${nameNode}</name-node>
+            <exec>python</exec>
+            <argument>date_helper.py</argument>
+            <file>date_helper.py</file>
+            <capture-output/>
+        </shell>
+        <ok to="clearPrevious"/>
+        <error to="fail"/>
+    </action>
+    <action name="clearPrevious">
+        <fs>
+            <name-node>${nameNode}</name-node>
+            <delete path="${nameNode}/${outputdir}/PREVIOUS"/>
+        </fs>
+        <ok to="backupCurrent"/>
+        <error to="fail"/>
+    </action>
+    <action name="backupCurrent">
+        <distcp xmlns="uri:oozie:distcp-action:1.0">
+            <resource-manager>${resourceManager}</resource-manager>
+            <name-node>${nameNode}</name-node>
+            <arg>${nameNode}/${outputdir}/CURRENT/</arg>
+            <arg>${nameNode}/${outputdir}/PREVIOUS</arg>
+        </distcp>
+        <ok to="filesFullParquet"/>
+        <error to="fail"/>
+    </action>
+    <action name="filesFullParquet">
+        <spark xmlns="uri:oozie:spark-action:1.0">
+            <resource-manager>${resourceManager}</resource-manager>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-client</master>
+            <name>spark-submit</name>
+            <jar>files-full-parquet.py</jar>
+            <arg>--sourcedir</arg>
+            <arg>${sourcedir}</arg>
+            <arg>--outputdir</arg>
+            <arg>${outputdir}</arg>
+            <arg>--ingestdate</arg>
+            <arg>${wf:actionData('getDates')['DATE']}</arg>
+            <arg>--schema</arg>
+            <arg>${schema}</arg>
+            <arg>--db</arg>
+            <arg>${targetdb}</arg>
+            <arg>--table</arg>
+            <arg>${table}</arg>
+            <file>files-full-parquet.py</file>
+        </spark>
+        <ok to="clearIngestDate"/>
+        <error to="fail"/>
+    </action>
+    <action name="clearIngestDate">
+        <fs>
+            <name-node>${nameNode}</name-node>
+            <delete path="${nameNode}/${outputdir}/ingest_date=${wf:actionData('getDates')['DATE']}"/>
+        </fs>
+        <ok to="snapshotIngestDate"/>
+        <error to="fail"/>
+    </action>
+    <action name="snapshotIngestDate">
+        <distcp xmlns="uri:oozie:distcp-action:1.0">
+            <resource-manager>${resourceManager}</resource-manager>
+            <name-node>${nameNode}</name-node>
+            <arg>${nameNode}/${outputdir}/CURRENT/</arg>
+            <arg>${nameNode}/${outputdir}/ingest_date=${wf:actionData('getDates')['DATE']}</arg>
+        </distcp>
+        <ok to="flagIngestDate"/>
+        <error to="fail"/>
+    </action>
+    <action name="flagIngestDate">
+        <fs>
+            <name-node>${nameNode}</name-node>
+            <touchz path="${nameNode}/${outputdir}/ingest_date=${wf:actionData('getDates')['DATE']}/_SUCCESS"/>
+        </fs>
+        <ok to="loadHive"/>
+        <error to="fail"/>
+    </action>
+    <action name="loadHive">
+        <hive xmlns="uri:oozie:hive-action:1.0">
+            <resource-manager>${resourceManager}</resource-manager>
+            <name-node>${nameNode}</name-node>
+            <query>
+SET tez.queue.name=batch;
+
+CREATE DATABASE IF NOT EXISTS ${targetdb};
+
+CREATE DATABASE IF NOT EXISTS INGEST_${targetdb};
+
+CREATE DATABASE IF NOT EXISTS ${targetdb}_HISTORY;
+
+CREATE TABLE IF NOT EXISTS INGEST_${targetdb}.${schema}_${table}_SCHEMA
+ROW FORMAT SERDE
+  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+STORED AS AVRO
+TBLPROPERTIES (
+'avro.schema.url'='${nameNode}/${sourcedir}/PARQUET/.metadata/schema.avsc'
+);
+
+CREATE EXTERNAL TABLE IF NOT EXISTS INGEST_${targetdb}.${schema}_${table}_CURRENT
+LIKE INGEST_${targetdb}.${schema}_${table}_SCHEMA
+STORED AS PARQUET LOCATION '${outputdir}/CURRENT';
+
+CREATE TABLE IF NOT EXISTS ${targetdb}.${schema}_${table}
+LIKE INGEST_${targetdb}.${schema}_${table}_SCHEMA
+STORED AS ORC;
+
+INSERT OVERWRITE TABLE ${targetdb}.${schema}_${table}
+SELECT *
+FROM INGEST_${targetdb}.${schema}_${table}_CURRENT;
+
+CREATE TABLE IF NOT EXISTS INGEST_${targetdb}.${schema}_${table}_HISTORYSCHEMA
+PARTITIONED BY (ingest_date STRING)
+ROW FORMAT SERDE
+  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+STORED AS AVRO
+TBLPROPERTIES (
+'avro.schema.url'='${nameNode}/${sourcedir}/PARQUET/.metadata/schema.avsc'
+);
+
+CREATE TABLE IF NOT EXISTS ${targetdb}_HISTORY.${schema}_${table}
+LIKE INGEST_${targetdb}.${schema}_${table}_HISTORYSCHEMA
+STORED AS ORC;
+
+INSERT OVERWRITE TABLE ${targetdb}_HISTORY.${schema}_${table}
+PARTITION (ingest_date='${wf:actionData('getDates')['DATE']}')
+SELECT * FROM ${targetdb}.${schema}_${table};
+            </query>
+        </hive>
+        <ok to="cleanupPrevious"/>
+        <error to="fail"/>
+    </action>
+    <action name="cleanupPrevious">
+        <fs>
+            <name-node>${nameNode}</name-node>
+            <delete path="${nameNode}/${outputdir}/PREVIOUS"/>
+        </fs>
+        <ok to="flagCurrent"/>
+        <error to="fail"/>
+    </action>
+    <action name="flagCurrent">
+        <fs>
+            <name-node>${nameNode}</name-node>
+            <touchz path="${nameNode}/${outputdir}/CURRENT/_SUCCESS"/>
+        </fs>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+    <kill name="fail">
+        <message>${wf:errorMessage(wf:lastErrorNode())}</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>
diff --git a/yarn_usage.py b/yarn_usage.py
new file mode 100644
index 0000000..6050253
--- /dev/null
+++ b/yarn_usage.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+# Author : Adi Yusman
+# Email : adiyusman.yusof@cimb.com
+# Date Modified : 2019-04-26
+# Sample command : python yarn_usage.py --settings settings.yml --start yyyy-mm-dd --end yyyy-mm-dd
+# Purpose : list the queries/jobs executed in Hive, for cluster-usage reporting,
+#           via the Cloudera Manager "YARN > Applications" API.
+###############################################################################
+
+import csv
+import urllib.request
+import json
+import argparse
+import time
+import yaml
+
+class Config(object):
+
+    value = ''
+
+    def __init__(self, config_file):
+        with open(config_file, 'r') as stream:
+            try:
+                self.value = yaml.safe_load(stream)
+            except yaml.YAMLError as exc:
+                print(exc)
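+# Config('settings.yml').value is the parsed YAML dict (see settings.yml.sample).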
+
+def retrieve_data(apps):
+    # Filter required data
+    appID = apps['applicationId']
+    name = apps['name'].replace('\n', ' ').replace('\r', ' ')
+    user = apps['user']
+    pool = apps['pool']
+    state = apps['state']
+    startTime = apps['startTime']
+    endTime = apps['endTime']
+
+    # Handle the case where the hive query key does not exist
+    if 'hive_query_string' in apps['attributes']:
+        hiveString = apps['attributes']['hive_query_string'].replace('\n', ' ').replace('\r', ' ')
+    else:
+        hiveString = ''
+    data = [appID, name, user, pool, state, startTime, endTime, hiveString]
+    return data
+
+def dump_csv(data, filename):
+    # Append one row to the csv
+    with open(filename, mode='a') as outfile:
+        outfile_writer = csv.writer(outfile, delimiter='|', quotechar='"', quoting=csv.QUOTE_MINIMAL)
+        outfile_writer.writerow(data)
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--start", help="start date in 'yyyy-mm-dd' format", dest="start")
+    parser.add_argument("--end", help="end date in 'yyyy-mm-dd' format", dest="end")
+    parser.add_argument("--settings", help="Python YAML settings file", dest="settings")
+    args = parser.parse_args()
+
+    startTime = args.start
+    endTime = args.end
+    settings = args.settings
+
+    config = Config(settings)
+    yarn = config.value['yarn']
+
+    if startTime <= endTime:
+        url = "https://"+yarn['hostname']+":7183/api/v7/clusters/"+yarn['cluster']+"/services/yarn/yarnApplications?from="+startTime+"T07:00:59.321Z&to="+endTime+"T07:00:59.321Z&filter=pool=root."+yarn['queue']+"&limit=1000"
+        response = urllib.request.urlopen(url)
+        data = json.loads(response.read())
+
+        # Prepare output file
+        path = 'outfile/'
+        filename = path+'outfile-'+time.strftime("%Y%m%d%H%M")+'.csv'
+        with open(filename, mode='w') as outfile:
+            outfile_writer = csv.writer(outfile, delimiter='|', quotechar='"', quoting=csv.QUOTE_MINIMAL)
+            outfile_writer.writerow(['APPID', 'NAME', 'USER', 'POOL', 'STATE', 'STARTTIME', 'ENDTIME', 'HIVESTRING'])
+
+        # Start processing; use a separate name per row so the response dict is not shadowed
+        for apps in data['applications']:
+            row = retrieve_data(apps)
+            dump_csv(row, filename)
+
+    else:
+        print("Enter valid start/end dates")
+
+if __name__ == '__main__':
+    main()
+