Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions bin/generate_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
('targetdb', None),
('stagingdb', None),
('backdate', '7'),
('retention', '7'),
('schema', None),
('table', None),
('mapper', None),
Expand Down Expand Up @@ -180,6 +181,9 @@
},
'incremental-ingest-frozen': {
'workflow': 'incremental-ingest-frozen',
},
'data-retention-hdfs': {
'workflow': 'data-retention-hdfs',
}
}

Expand All @@ -188,37 +192,43 @@
'ingest-full': '00:01',
'ingest-increment': '00:01',
'transform-full': '00:30',
'transform-increment': '00:30'
'transform-increment': '00:30',
'data-retention-hdfs': '14:30'
},
'SIEBEL_NOVA': {
'ingest-full': '03:00',
'ingest-increment': '03:00',
'transform-full': '05:00',
'transform-increment': '05:00'
'transform-increment': '05:00',
'data-retention-hdfs': '14:30'
},
'BRM_NOVA': {
'ingest-full': '03:01',
'ingest-increment': '03:01',
'transform-full': '05:00',
'transform-increment': '05:00'
'transform-increment': '05:00',
'data-retention-hdfs': '14:30'
},
'GRANITE': {
'ingest-full': '03:01',
'ingest-increment': '03:01',
'transform-full': '05:00',
'transform-increment': '05:00'
'transform-increment': '05:00',
'data-retention-hdfs': '14:30'
},
'NIS': {
'ingest-full': '03:01',
'ingest-increment': '03:01',
'transform-full': '05:00',
'transform-increment': '05:00'
'transform-increment': '05:00',
'data-retention-hdfs': '14:30'
},
'PORTAL': {
'ingest-full': '03:01',
'ingest-increment': '03:01',
'transform-full': '05:00',
'transform-increment': '05:00'
'transform-increment': '05:00',
'data-retention-hdfs': '14:30'
}
}

Expand All @@ -227,13 +237,13 @@
'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/ingest_date=${YEAR}-${MONTH}-${DAY}',
'format': 'parquet',
'exec_time': '00:00',
'retention': 365
'retention': 7
},
'increment-retention': {
'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/INCREMENT/ingest_date=${YEAR}-${MONTH}-${DAY}',
'format': 'parquet',
'exec_time': '00:00',
'retention': 365
'retention': 7
},
'full': {
'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/CURRENT/',
Expand Down
78 changes: 76 additions & 2 deletions bin/generate_job_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,29 @@
</properties>
</feed>
'''
falcon_hivefeed_template = '''
<feed xmlns='uri:falcon:feed:0.1' name='%(feed_name)s'>
<tags>entity_type=feed,format=%(feed_format)s,stage=%(stage)s,source=%(source_name)s,schema=%(schema)s,table=%(table)s,feed_type=%(feed_type)s</tags>
<availabilityFlag>_SUCCESS</availabilityFlag>
<frequency>days(1)</frequency>
<timezone>GMT+08:00</timezone>
<late-arrival cut-off='hours(18)'/>
<clusters>
<cluster name='TMDATALAKEP' type='source'>
<validity start='%(start_utc)s' end='2099-12-31T00:00Z'/>
%(retention)s
</cluster>
</clusters>
<table uri="%(feed_path)s"/>
<ACL owner='trace' group='users' permission='0x755'/>
<schema location='/none' provider='/none'/>
<properties>
<property name='queueName' value='oozie'></property>
<property name='jobPriority' value='NORMAL'></property>
<property name="oozie.processing.timezone" value="UTC" />
</properties>
</feed>
'''



Expand Down Expand Up @@ -162,10 +185,10 @@

FEEDS = {
'full-retention': {
'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/instance_date=${YEAR}-${MONTH}-${DAY}',
'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/ingest_date=${YEAR}-${MONTH}-${DAY}',
'format': 'parquet',
'exec_time': '00:00',
'retention': 365
'retention': 7
},
# 'increment-retention': {
# 'path': '%(prefix)s/source/%(source_name)s/%(schema)s_%(table)s/INCREMENT/instance_date=${YEAR}-${MONTH}-${DAY}',
Expand All @@ -185,6 +208,16 @@
# },
}

# Hive-catalog retention feeds, keyed by feed name.  `path` is a Falcon
# `catalog:` URI template expanded with the job properties (note it targets
# the %(targetdb)s_HISTORY database), `retention` is in days, and
# `exec_time` seeds the feed's validity start time.
HIVE_FEEDS = {
    'hive-retention' : {
        'path': 'catalog:%(targetdb)s_HISTORY:%(schema)s_%(table)s#ingest_date=${YEAR}-${MONTH}-${DAY}',
        'format': 'orc',
        'exec_time': '00:00',
        'retention': 365
    }
}


ARTIFACTS='files-artifacts/'

FOREVER=36135
Expand Down Expand Up @@ -316,6 +349,38 @@ def falcon_feed(stage, properties, feed, feed_path, feed_format,
job = falcon_feed_template % params
return params, job

def write_falcon_hivefeed(storedir, stage, properties, feed, feed_path,
        feed_format, exec_time='00:00', retention=FOREVER):
    """Render a Falcon hive-feed entity and write it under *storedir*.

    The output filename is derived from the source/schema/table entries in
    *properties*; *storedir* is created if missing.  All rendering arguments
    are passed through to falcon_hivefeed().
    """
    filename = '%(source_name)s-%(schema)s-%(table)s.xml' % properties
    # Render first: opening the file before rendering would leave a
    # truncated/empty XML artifact behind if falcon_hivefeed() raised.
    params, job = falcon_hivefeed(stage, properties, feed,
        feed_path, feed_format, exec_time, retention)
    if not os.path.exists(storedir):
        os.makedirs(storedir)
    with open('%s/%s' % (storedir, filename), 'w') as f:
        f.write(job)

def falcon_hivefeed(stage, properties, feed, feed_path, feed_format,
        exec_time='00:00', retention=FOREVER):
    """Build the Falcon hive-feed XML for one table.

    Returns a (params, xml) pair: the substitution dict fed into the
    template, and the rendered feed entity.  A *retention* of None omits
    the <retention> element entirely; any other value becomes a
    days-based delete policy.
    """
    retention_xml = (
        '' if retention is None
        else "<retention limit='days(%s)' action='delete'/>" % retention)
    params = dict(
        schema=properties['schema'],
        table=properties['table'],
        source_name=properties['source_name'],
        start_utc=generate_utc_time(exec_time),
        feed_name=default_feed_name(stage, properties, feed),
        feed_path=feed_path % properties,
        feed_type=feed,
        feed_format=feed_format,
        stage=stage,
        retention=retention_xml,
    )
    return params, falcon_hivefeed_template % params


def main():
argparser = argparse.ArgumentParser(description='Generate oozie and falcon configurations for ingestion')
argparser.add_argument('tabletsv', help='TSV of the table list')
Expand Down Expand Up @@ -369,6 +434,15 @@ def main():
feed_opts['path'], feed_opts['format'],
feed_opts['exec_time'],
feed_opts.get('retention', FOREVER))
for feed, feed_opts in HIVE_FEEDS.items():
opts = params.copy()
opts['prefix'] = conf['prefix']
opts['targetdb'] = conf['targetdb'] % params
storedir = '%s/%s-falconfeed-%s' % (ARTIFACTS, stage, feed)
write_falcon_hivefeed(storedir, stage, opts, feed,
feed_opts['path'], feed_opts['format'],
feed_opts['exec_time'],
feed_opts.get('retention', FOREVER))

if __name__ == '__main__':
main()
4 changes: 4 additions & 0 deletions settings.yml.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
yarn:
hostname: <<hostnamehere>>
cluster: <<clusternamehere>>
queue: <<queuename>>
22 changes: 22 additions & 0 deletions workflow/data-retention-hdfs/conf/oozie.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<!-- Launcher/job configuration for the data-retention-hdfs shell action:
     MR launcher and job run on the 'oozie' queue, Tez work on 'batch'.
     mapred.* duplicates mapreduce.* for older Hadoop-1-style clients. -->
<configuration>
<property>
<name>oozie.launcher.mapreduce.job.queuename</name>
<value>oozie</value>
</property>
<property>
<name>mapreduce.job.queuename</name>
<value>oozie</value>
</property>
<property>
<name>mapred.job.queuename</name>
<value>oozie</value>
</property>
<property>
<!-- -1 lets the framework pick the reducer count -->
<name>mapred.reduce.tasks</name>
<value>-1</value>
</property>
<property>
<name>tez.queue.name</name>
<value>batch</value>
</property>
</configuration>
44 changes: 44 additions & 0 deletions workflow/data-retention-hdfs/data-retention.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env python
"""Evict aged HDFS partitions.

Lists ``ingest_date=YYYY-MM-DD`` partition directories under --sourcedir
(and its INCREMENT sub-directory) via ``hdfs dfs -ls`` and removes every
partition whose date is older than --retention days.
"""

from subprocess import Popen, PIPE
import traceback
import argparse

from datetime import datetime, timedelta

parser = argparse.ArgumentParser()
parser.add_argument("--sourcedir", help="Source directory")
parser.add_argument("--retention", help="Days for data retention")
args = parser.parse_args()

# Both the full-load and incremental layouts keep dated partitions.
SCAN_DIRS = [args.sourcedir, args.sourcedir + '/INCREMENT']

INGEST_DATE = []

for directory in SCAN_DIRS:
    try:
        # communicate() drains stdout while the process runs, avoiding the
        # deadlock that wait()-then-read() can hit on large listings.
        list_files = Popen(['hdfs', 'dfs', '-ls', directory], stdout=PIPE)
        out, _ = list_files.communicate()
        lines = out.decode('utf-8', 'replace').strip().split('\n')
    except Exception:
        # Best-effort: one directory may legitimately be missing (e.g. no
        # INCREMENT dir for full-only tables); log and scan the next one.
        traceback.print_exc()
        lines = []
    for p in lines:
        if 'ingest_date' in p:
            INGEST_DATE.append(p)

base_date = (datetime.now() - timedelta(days=int(args.retention))).strftime('%Y-%m-%d')
# The last space-separated field of `hdfs dfs -ls` output is the path;
# ISO dates compare correctly as plain strings.
to_be_deleted = []
for entry in INGEST_DATE:
    path = entry.split(' ')[-1]
    if path.split('=')[1] < base_date:
        to_be_deleted.append(path)

if to_be_deleted:
    print("To be deleted")
    print('\n'.join(to_be_deleted))

for item in to_be_deleted:
    try:
        delete_folder = Popen(['hdfs', 'dfs', '-rm', '-r', item], stdout=PIPE)
        delete_folder.communicate()
    except Exception:
        traceback.print_exc()
41 changes: 41 additions & 0 deletions workflow/data-retention-hdfs/workflow.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!-- Oozie workflow that runs data-retention.py as a shell action to purge
     HDFS partitions older than ${retention} days for one table. -->
<workflow-app xmlns="uri:oozie:workflow:0.5" name="data-retention-hdfs-${source_name}-${schema}-${table}">
<parameters>
<property>
<name>prefix</name>
<value>/user/trace/development/</value>
</property>
<property>
<!-- Table root; the script also scans its INCREMENT subdirectory -->
<name>sourcedir</name>
<value>${prefix}/source/${source_name}/${schema}_${table}/</value>
</property>
<property>
<!-- Retention window in days -->
<name>retention</name>
<value>7</value>
</property>
</parameters>

<start to="evictData"/>
<action name="evictData">
<shell
xmlns="uri:oozie:shell-action:0.3">
<job-tracker>${resourceManager}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>conf/oozie.xml</job-xml>
<exec>python</exec>
<argument>data-retention.py</argument>
<argument>--sourcedir</argument>
<argument>${sourcedir}</argument>
<argument>--retention</argument>
<argument>${retention}</argument>
<!-- Run HDFS commands as the workflow owner, not the launcher user -->
<env-var>HADOOP_USER_NAME=${wf:user()}</env-var>
<!-- Ship the script into the action's working directory -->
<file>data-retention.py</file>
</shell>
<ok to="end"/>
<error to="kill"/>
</action>
<kill name="kill">
<message>${wf:errorMessage(wf:lastErrorNode())}</message>
</kill>
<end name="end"/>
</workflow-app>
22 changes: 22 additions & 0 deletions workflow/ingest-files-raw-current/conf/oozie.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<!-- Launcher/job configuration for the ingest-files-raw-current action:
     MR launcher and job run on the 'oozie' queue, Tez work on 'batch'.
     mapred.* duplicates mapreduce.* for older Hadoop-1-style clients. -->
<configuration>
<property>
<name>oozie.launcher.mapreduce.job.queuename</name>
<value>oozie</value>
</property>
<property>
<name>mapreduce.job.queuename</name>
<value>oozie</value>
</property>
<property>
<name>mapred.job.queuename</name>
<value>oozie</value>
</property>
<property>
<!-- -1 lets the framework pick the reducer count -->
<name>mapred.reduce.tasks</name>
<value>-1</value>
</property>
<property>
<name>tez.queue.name</name>
<value>batch</value>
</property>
</configuration>
Loading