From b091e681cfd1bfd8ddc8563c333c500972149096 Mon Sep 17 00:00:00 2001 From: Angad Singh Date: Sun, 29 Jun 2014 14:33:41 +0530 Subject: [PATCH 1/2] * Path exclusion/inclusion filtering * Added logic to FileLister.pruneListBySize method to prune orphan job confs and renamed to just pruneList (+1 squashed commit) --- bin/etl/hraven-etl.sh | 4 +- bin/etl/jobFilePreprocessor.sh | 6 +- .../com/twitter/hraven/etl/FileLister.java | 46 ++++++++--- ...FileModifiedRangeSubstringPathFilter.java} | 78 ++++++++++++++++--- .../hraven/etl/JobFilePartitioner.java | 2 +- .../hraven/etl/JobFilePreprocessor.java | 50 +++++++++++- .../twitter/hraven/etl/TestFileLister.java | 8 +- 7 files changed, 160 insertions(+), 34 deletions(-) rename hraven-etl/src/main/java/com/twitter/hraven/etl/{JobFileModifiedRangePathFilter.java => JobFileModifiedRangeSubstringPathFilter.java} (65%) diff --git a/bin/etl/hraven-etl.sh b/bin/etl/hraven-etl.sh index b2e2caa..62900a6 100755 --- a/bin/etl/hraven-etl.sh +++ b/bin/etl/hraven-etl.sh @@ -42,6 +42,8 @@ hbaseconfdir=${HBASE_CONF_DIR:-$HBASE_HOME/conf} # HDFS directories for processing and loading job history data historyRawDir=/yarn/history/done/ historyProcessingDir=/hraven/processing/ +pathExclusionFilter=distcp +pathInclusionFilter=rmcuser ####################################################### #If costfile is empty, fill it with default values @@ -65,7 +67,7 @@ create_pidfile $HRAVEN_PID_DIR trap 'cleanup_pidfile_and_exit $HRAVEN_PID_DIR' INT TERM EXIT # Pre-process -$home/jobFilePreprocessor.sh $hadoopconfdir $historyRawDir $historyProcessingDir $cluster $batchsize $defaultrawfilesizelimit +$home/jobFilePreprocessor.sh $hadoopconfdir $historyRawDir $historyProcessingDir $cluster $batchsize $defaultrawfilesizelimit $pathExclusionFilter $pathInclusionFilter # Load $home/jobFileLoader.sh $hadoopconfdir $mapredmaxsplitsize $schedulerpoolname $cluster $historyProcessingDir diff --git a/bin/etl/jobFilePreprocessor.sh b/bin/etl/jobFilePreprocessor.sh 
index fd478b6..bf6452a 100755 --- a/bin/etl/jobFilePreprocessor.sh +++ b/bin/etl/jobFilePreprocessor.sh @@ -19,9 +19,9 @@ # Usage ./jobFilePreprocessor.sh [hadoopconfdir] # [historyrawdir] [historyprocessingdir] [cluster] [batchsize] -if [ $# -ne 6 ] +if [ $# -lt 6 ] then - echo "Usage: `basename $0` [hadoopconfdir] [historyrawdir] [historyprocessingdir] [cluster] [batchsize] [defaultrawfilesizelimit]" + echo "Usage: `basename $0` [hadoopconfdir] [historyrawdir] [historyprocessingdir] [cluster] [batchsize] [defaultrawfilesizelimit] [[pathExclusionFilter]] [[pathInclusionFilter]]" exit 1 fi @@ -39,4 +39,4 @@ fi create_pidfile $HRAVEN_PID_DIR trap 'cleanup_pidfile_and_exit $HRAVEN_PID_DIR' INT TERM EXIT -hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFilePreprocessor -libjars=$LIBJARS -d -i $2 -o $3 -c $4 -b $5 -s $6 \ No newline at end of file +hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFilePreprocessor -libjars=$LIBJARS -d -i $2 -o $3 -c $4 -b $5 -s $6 -ex $7 -ix $8 diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/FileLister.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/FileLister.java index 91fc66a..21cc056 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/FileLister.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/FileLister.java @@ -51,7 +51,7 @@ public FileLister() { * */ private static void traverseDirs(List fileStatusesList, FileSystem hdfs, - Path inputPath, JobFileModifiedRangePathFilter jobFileModifiedRangePathFilter) + Path inputPath, JobFileModifiedRangeSubstringPathFilter jobFileModifiedRangePathFilter) throws IOException { // get all the files and dirs in the current dir @@ -83,7 +83,7 @@ private static void traverseDirs(List fileStatusesList, FileSystem h * @throws IOException */ public static FileStatus[] listFiles (boolean recurse, FileSystem hdfs, Path inputPath, - JobFileModifiedRangePathFilter jobFileModifiedRangePathFilter) throws IOException + 
JobFileModifiedRangeSubstringPathFilter jobFileModifiedRangePathFilter) throws IOException { if (recurse) { List fileStatusesList = new ArrayList(); @@ -113,7 +113,7 @@ public static FileStatus[] listFiles (boolean recurse, FileSystem hdfs, Path inp * @throws IOException */ public static FileStatus[] getListFilesToProcess(long maxFileSize, boolean recurse, - FileSystem hdfs, Path inputPath, JobFileModifiedRangePathFilter pathFilter) + FileSystem hdfs, Path inputPath, JobFileModifiedRangeSubstringPathFilter pathFilter) throws IOException { LOG.info(" in getListFilesToProcess maxFileSize=" + maxFileSize @@ -124,10 +124,13 @@ public static FileStatus[] getListFilesToProcess(long maxFileSize, boolean recur return new FileStatus[0]; } return pruneFileListBySize(maxFileSize, origList, hdfs, inputPath); + return pruneFileList(maxFileSize, origList); } /** * prunes the given list/array of files based on their sizes + * also prunes orphan job conf xml files which do not have + * corresponding history files * * @param maxFileSize -max #bytes to be stored in an hbase cell * @param origList - input list of files to be processed @@ -135,20 +138,36 @@ public static FileStatus[] getListFilesToProcess(long maxFileSize, boolean recur * @param inputPath - root dir of the path containing history files * @return - pruned array of FileStatus of files to be processed */ - static FileStatus[] pruneFileListBySize(long maxFileSize, FileStatus[] origList, FileSystem hdfs, - Path inputPath) { - LOG.info("Pruning orig list of size " + origList.length + " for source" + inputPath.toUri()); + static FileStatus[] pruneFileList(long maxFileSize, FileStatus[] origList) { + LOG.info("Pruning orig list of size " + origList.length); long fileSize = 0L; + List prunedFileList = new ArrayList(); Set toBeRemovedJobId = new HashSet(); + Set historyFileJobIds = new HashSet(); + for (int i = 0; i < origList.length; i++) { fileSize = origList[i].getLen(); // check if hbase can store this file if yes, consider 
it for processing if (fileSize <= maxFileSize) { prunedFileList.add(origList[i]); + + // Maintain a list of history file jobIds to prune orphan xml files + // which do not have a corresponding history file. This is required for + // the path exclusion and inclusion filters to work properly, + // as exclusion filters do not filter on job conf files, resulting + // in a lot of orphan job conf files being loaded. + JobFile jFile = getJobFileFromPath(origList[i].getPath()); + if (jFile != null) { + if (jFile.isJobHistoryFile()) { + String jobId = jFile.getJobid(); + if (jobId != null) + historyFileJobIds.add(jobId); + } + } } else { Path hugeFile = origList[i].getPath(); LOG.info("In getListFilesToProcess filesize " + fileSize + " has exceeded maxFileSize " @@ -158,6 +177,7 @@ static FileStatus[] pruneFileListBySize(long maxFileSize, FileStatus[] origList, toBeRemovedJobId.add(getJobIdFromPath(hugeFile)); } } + if (prunedFileList.size() == 0) { LOG.info("Found no files worth processing. Returning 0 sized array"); return new FileStatus[0]; @@ -166,13 +186,12 @@ static FileStatus[] pruneFileListBySize(long maxFileSize, FileStatus[] origList, String jobId = null; ListIterator it = prunedFileList.listIterator(); while (it.hasNext()) { - if (toBeRemovedJobId.size() == 0) { - // no files to remove - break; - } Path curFile = it.next().getPath(); jobId = getJobIdFromPath(curFile); - if (toBeRemovedJobId.contains(jobId)) { + + //Remove job if jobId found in toBeRemovedJobIds (for which either file was 'huge') + //or if this file did not have corresponding history file to go with it + if (toBeRemovedJobId.contains(jobId) || !historyFileJobIds.contains(jobId)) { LOG.info("Removing from prunedList " + curFile.toUri()); it.remove(); /* @@ -200,4 +219,9 @@ static String getJobIdFromPath(Path aPath) { } return jobId; } + + static JobFile getJobFileFromPath(Path aPath) { + String fileName = aPath.getName(); + return new JobFile(fileName); + } } diff --git 
a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileModifiedRangePathFilter.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileModifiedRangeSubstringPathFilter.java similarity index 65% rename from hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileModifiedRangePathFilter.java rename to hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileModifiedRangeSubstringPathFilter.java index c283a89..4097e8a 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileModifiedRangePathFilter.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileModifiedRangeSubstringPathFilter.java @@ -23,13 +23,14 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobHistory; /** * Pathfilter that allows only files that are named correctly and are modified * within a certain time range. * */ -public class JobFileModifiedRangePathFilter extends JobFilePathFilter { +public class JobFileModifiedRangeSubstringPathFilter extends JobFilePathFilter { /** * The minimum modification time of a file to be accepted in milliseconds @@ -47,7 +48,12 @@ public class JobFileModifiedRangePathFilter extends JobFilePathFilter { * The configuration of this processing job (not the files we are processing). */ private final Configuration myConf; - private static Log LOG = LogFactory.getLog(JobFileModifiedRangePathFilter.class); + + private String[] pathExclusionFilter; + + private String[] pathInclusionFilter; + + private static Log LOG = LogFactory.getLog(JobFileModifiedRangeSubstringPathFilter.class); /** * Constructs a filter that accepts only JobFiles with lastModification time @@ -62,11 +68,9 @@ public class JobFileModifiedRangePathFilter extends JobFilePathFilter { * maximum modification time of a file to be accepted in milliseconds * since January 1, 1970 UTC (including). 
*/ - public JobFileModifiedRangePathFilter(Configuration myConf, + public JobFileModifiedRangeSubstringPathFilter(Configuration myConf, long minModificationTimeMillis, long maxModificationTimeMillis) { - this.myConf = myConf; - this.minModificationTimeMillis = minModificationTimeMillis; - this.maxModificationTimeMillis = maxModificationTimeMillis; + this(myConf, minModificationTimeMillis, maxModificationTimeMillis, null, null); } /** @@ -79,11 +83,35 @@ public JobFileModifiedRangePathFilter(Configuration myConf, * The minimum modification time of a file to be accepted in * milliseconds since January 1, 1970 UTC (excluding). */ - public JobFileModifiedRangePathFilter(Configuration myConf, + public JobFileModifiedRangeSubstringPathFilter(Configuration myConf, long minModificationTimeMillis) { this(myConf, minModificationTimeMillis, Long.MAX_VALUE); } + /** + * Constructs a filter that accepts only JobFiles with lastModification time + * at least the specified minimum. Also accepts a simple substring to exclude. + * + * @param myConf + * used to be able to go from a path to a FileStatus. + * @param minModificationTimeMillis + * The minimum modification time of a file to be accepted in + * milliseconds since January 1, 1970 UTC (excluding). + * @param maxModificationTimeMillis The + * maximum modification time of a file to be accepted in milliseconds + * since January 1, 1970 UTC (including). 
+ * @param pathExclusionFilter + * files with this substring path will be excluded + */ + public JobFileModifiedRangeSubstringPathFilter(Configuration myConf, long minModificationTimeMillis, long maxModificationTimeMillis, + String[] pathExclusionFilter, String[] pathInclusionFilter) { + this.myConf = myConf; + this.minModificationTimeMillis = minModificationTimeMillis; + this.maxModificationTimeMillis = maxModificationTimeMillis; + this.pathExclusionFilter = pathExclusionFilter; + this.pathInclusionFilter = pathInclusionFilter; + } + /* * (non-Javadoc) * @@ -96,25 +124,53 @@ public boolean accept(Path path) { if (!super.accept(path)) { return false; } - + JobFile jobFile = new JobFile(path.getName()); if (jobFile.isJobConfFile() || jobFile.isJobHistoryFile()) { + if (jobFile.isJobHistoryFile()) { + if (!includesPathSubstrings(path) || !excludesPathSubstrings(path)) { + return false; + } + } try { FileSystem fs = path.getFileSystem(myConf); FileStatus fileStatus = fs.getFileStatus(path); long fileModificationTimeMillis = fileStatus.getModificationTime(); return accept(fileModificationTimeMillis); } catch (IOException e) { - throw new ImportException("Cannot determine file modification time of " - + path.getName(), e); + throw new ImportException("Cannot determine file modification time of " + path.getName(), e); } } else { // Reject anything that does not match a job conf filename. 
- LOG.info(" Not a valid job conf / job history file "+ path.getName()); + LOG.info(" Not a valid job conf / job history file " + path.getName()); return false; } } + private boolean excludesPathSubstrings(Path path) { + if (pathExclusionFilter == null) + return true; + + for (String s: pathExclusionFilter) { + if (path.toString().indexOf(s) != -1) + return false; + } + + return true; + } + + private boolean includesPathSubstrings(Path path) { + if (pathInclusionFilter == null) + return true; + boolean matches = false; + for (String s: pathInclusionFilter) { + if (path.toString().indexOf(s) != -1) + matches = true; + } + + return matches; + } + /** * @param fileModificationTimeMillis * in milliseconds since January 1, 1970 UTC diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePartitioner.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePartitioner.java index f93d7d8..ef255ad 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePartitioner.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePartitioner.java @@ -316,7 +316,7 @@ private void processHDFSSources(Path inputPath) throws IOException { // Accept only jobFiles and only those that fall in the desired range of // modification time. 
- JobFileModifiedRangePathFilter jobFileModifiedRangePathFilter = new JobFileModifiedRangePathFilter( + JobFileModifiedRangeSubstringPathFilter jobFileModifiedRangePathFilter = new JobFileModifiedRangeSubstringPathFilter( myConf, 0L); ContentSummary contentSummary = hdfs.getContentSummary(inputPath); diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java index c34f85a..64e4270 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java @@ -18,6 +18,8 @@ import java.io.IOException; import java.util.Arrays; import java.util.Date; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -48,7 +50,7 @@ import com.twitter.hraven.etl.ProcessRecordService; import com.twitter.hraven.util.BatchUtil; import com.twitter.hraven.etl.FileLister; -import com.twitter.hraven.etl.JobFileModifiedRangePathFilter; +import com.twitter.hraven.etl.JobFileModifiedRangeSubstringPathFilter; /** * Command line tool that can be run on a periodic basis (like daily, hourly, or @@ -167,6 +169,21 @@ private static CommandLine parseArgs(String[] args) throws ParseException { o.setRequired(false); options.addOption(o); + + // Path substring to exclude from job history files + o = new Option("ex", "pathExclusionFilter", true, + "Comma seperated list of path substrings to exclude from job history files"); + o.setArgName("pathExclusionFilter"); + o.setRequired(false); + options.addOption(o); + + // Path substring to include in job history files (inclusions take precedence over exclusions) + o = new Option("ix", "pathInclusionFilter", true, + "Comma seperated list of path substrings to include in job history files"); + o.setArgName("pathInclusionFilter"); + o.setRequired(false); + 
options.addOption(o); + // Debugging options.addOption("d", "debug", false, "switch on DEBUG log level"); @@ -315,10 +332,37 @@ public int run(String[] args) throws Exception { + lastProcessRecord); } + // Grab the pathExclusionFilter and pathInclusionFilter + String[] pathExclusionFilter = null; + + if (commandLine.getOptionValue("ex") != null) { + try { + pathExclusionFilter = commandLine.getOptionValue("ex").split(","); + LOG.info("pathExclusionFilter: " + Arrays.toString(pathExclusionFilter)); + } catch (Exception e) { + throw new RuntimeException( + "Please provide a comma seperated list of substrings to exclude. This is invalid: " + + commandLine.getOptionValue("ex")); + } + } + + String[] pathInclusionFilter = null; + + if (commandLine.getOptionValue("ix") != null) { + try { + pathInclusionFilter = commandLine.getOptionValue("ix").split(","); + LOG.info("pathInclusionFilter: " + Arrays.toString(pathInclusionFilter)); + } catch (Exception e) { + throw new RuntimeException( + "Please provide a comma seperated list of substrings to include. This is invalid: " + + commandLine.getOptionValue("ix")); + } + } + // Accept only jobFiles and only those that fall in the desired range of // modification time. 
- JobFileModifiedRangePathFilter jobFileModifiedRangePathFilter = new JobFileModifiedRangePathFilter( - hbaseConf, minModificationTimeMillis); + JobFileModifiedRangeSubstringPathFilter jobFileModifiedRangePathFilter = new JobFileModifiedRangeSubstringPathFilter( + hbaseConf, minModificationTimeMillis, Long.MAX_VALUE, pathExclusionFilter, pathInclusionFilter); String timestamp = Constants.TIMESTAMP_FORMAT.format(new Date( minModificationTimeMillis)); diff --git a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestFileLister.java b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestFileLister.java index 9e508d4..eff7db9 100644 --- a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestFileLister.java +++ b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestFileLister.java @@ -70,7 +70,7 @@ public void testPruneFileListBySize() throws IOException { assertTrue(hdfs.exists(expPath)); origList[1] = hdfs.getFileStatus(expPath); - FileStatus [] prunedList = FileLister.pruneFileListBySize(maxFileSize, origList, hdfs, inputPath); + FileStatus [] prunedList = FileLister.pruneFileList(maxFileSize, origList); assertNotNull(prunedList); assertTrue(prunedList.length == 0); @@ -87,7 +87,7 @@ public void testPruneFileListBySize() throws IOException { assertTrue(hdfs.exists(emptyConfFile)); origList[1] = hdfs.getFileStatus(emptyConfFile); - prunedList = FileLister.pruneFileListBySize(maxFileSize, origList, hdfs, inputPath); + prunedList = FileLister.pruneFileList(maxFileSize, origList); assertNotNull(prunedList); assertTrue(prunedList.length == 2); @@ -129,7 +129,7 @@ public void testPruneFileListRemovingConfFromPruneList() throws IOException { assertTrue(hdfs.exists(expPath)); origList[1] = hdfs.getFileStatus(expPath); - FileStatus [] prunedList = FileLister.pruneFileListBySize(maxFileSize, origList, hdfs, inputPath); + FileStatus [] prunedList = FileLister.pruneFileList(maxFileSize, origList); assertNotNull(prunedList); assertTrue(prunedList.length == 0); } @@ -235,7 +235,7 @@ 
public void testPruneFileListMultipleFilesAlreadyMovedCases() throws IOException assertTrue(hdfs.exists(emptyConfFile4)); origList[11] = hdfs.getFileStatus(emptyConfFile4); - FileStatus [] prunedList = FileLister.pruneFileListBySize(maxFileSize, origList, hdfs, inputPath); + FileStatus [] prunedList = FileLister.pruneFileList(maxFileSize, origList); assertNotNull(prunedList); assertTrue(prunedList.length == 4); } From 3594db9e2821e5ab9ce5292ac6e27db4c66a8bdd Mon Sep 17 00:00:00 2001 From: Angad Singh Date: Sun, 29 Jun 2014 17:20:12 +0530 Subject: [PATCH 2/2] small fix to make travis build pass. --- hraven-etl/src/main/java/com/twitter/hraven/etl/FileLister.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/FileLister.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/FileLister.java index 21cc056..8b31da3 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/FileLister.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/FileLister.java @@ -123,7 +123,6 @@ public static FileStatus[] getListFilesToProcess(long maxFileSize, boolean recur LOG.info(" No files found, orig list returning 0"); return new FileStatus[0]; } - return pruneFileListBySize(maxFileSize, origList, hdfs, inputPath); return pruneFileList(maxFileSize, origList); }