Skip to content
This repository was archived by the owner on Jan 15, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion bin/etl/hraven-etl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ hbaseconfdir=${HBASE_CONF_DIR:-$HBASE_HOME/conf}
# HDFS directories for processing and loading job history data
historyRawDir=/yarn/history/done/
historyProcessingDir=/hraven/processing/
pathExclusionFilter=distcp
pathInclusionFilter=rmcuser
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these look like some specific internal users you run in your environment.
We may have to leave this out of the generic setup, and you may have to add it in your internal deployment yourself.
Alternatively, we can see if this can be picked up from hraven-conf.xml so that the shell script and the config are separated.

#######################################################

#If costfile is empty, fill it with default values
Expand All @@ -65,7 +67,7 @@ create_pidfile $HRAVEN_PID_DIR
trap 'cleanup_pidfile_and_exit $HRAVEN_PID_DIR' INT TERM EXIT

# Pre-process
$home/jobFilePreprocessor.sh $hadoopconfdir $historyRawDir $historyProcessingDir $cluster $batchsize $defaultrawfilesizelimit
$home/jobFilePreprocessor.sh $hadoopconfdir $historyRawDir $historyProcessingDir $cluster $batchsize $defaultrawfilesizelimit $pathExclusionFilter $pathInclusionFilter

# Load
$home/jobFileLoader.sh $hadoopconfdir $mapredmaxsplitsize $schedulerpoolname $cluster $historyProcessingDir
Expand Down
6 changes: 3 additions & 3 deletions bin/etl/jobFilePreprocessor.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
# Usage ./jobFilePreprocessor.sh [hadoopconfdir]
# [historyrawdir] [historyprocessingdir] [cluster] [batchsize]

if [ $# -ne 6 ]
if [ $# -lt 6 ]
then
echo "Usage: `basename $0` [hadoopconfdir] [historyrawdir] [historyprocessingdir] [cluster] [batchsize] [defaultrawfilesizelimit]"
echo "Usage: `basename $0` [hadoopconfdir] [historyrawdir] [historyprocessingdir] [cluster] [batchsize] [defaultrawfilesizelimit] [[pathExclusionFilter]] [[pathInclusionFilter]]"
exit 1
fi

Expand All @@ -39,4 +39,4 @@ fi
create_pidfile $HRAVEN_PID_DIR
trap 'cleanup_pidfile_and_exit $HRAVEN_PID_DIR' INT TERM EXIT

hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFilePreprocessor -libjars=$LIBJARS -d -i $2 -o $3 -c $4 -b $5 -s $6
hadoop --config $1 jar $hravenEtlJar com.twitter.hraven.etl.JobFilePreprocessor -libjars=$LIBJARS -d -i $2 -o $3 -c $4 -b $5 -s $6 -ex $7 -ix $8
47 changes: 35 additions & 12 deletions hraven-etl/src/main/java/com/twitter/hraven/etl/FileLister.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public FileLister() {
*
*/
private static void traverseDirs(List<FileStatus> fileStatusesList, FileSystem hdfs,
Path inputPath, JobFileModifiedRangePathFilter jobFileModifiedRangePathFilter)
Path inputPath, JobFileModifiedRangeSubstringPathFilter jobFileModifiedRangePathFilter)
throws IOException
{
// get all the files and dirs in the current dir
Expand Down Expand Up @@ -83,7 +83,7 @@ private static void traverseDirs(List<FileStatus> fileStatusesList, FileSystem h
* @throws IOException
*/
public static FileStatus[] listFiles (boolean recurse, FileSystem hdfs, Path inputPath,
JobFileModifiedRangePathFilter jobFileModifiedRangePathFilter) throws IOException
JobFileModifiedRangeSubstringPathFilter jobFileModifiedRangePathFilter) throws IOException
{
if (recurse) {
List<FileStatus> fileStatusesList = new ArrayList<FileStatus>();
Expand Down Expand Up @@ -113,7 +113,7 @@ public static FileStatus[] listFiles (boolean recurse, FileSystem hdfs, Path inp
* @throws IOException
*/
public static FileStatus[] getListFilesToProcess(long maxFileSize, boolean recurse,
FileSystem hdfs, Path inputPath, JobFileModifiedRangePathFilter pathFilter)
FileSystem hdfs, Path inputPath, JobFileModifiedRangeSubstringPathFilter pathFilter)
throws IOException {

LOG.info(" in getListFilesToProcess maxFileSize=" + maxFileSize
Expand All @@ -123,32 +123,50 @@ public static FileStatus[] getListFilesToProcess(long maxFileSize, boolean recur
LOG.info(" No files found, orig list returning 0");
return new FileStatus[0];
}
return pruneFileListBySize(maxFileSize, origList, hdfs, inputPath);
return pruneFileList(maxFileSize, origList);
}

/**
* prunes the given list/array of files based on their sizes
* also prunes orphan job conf xml files which do not have
* corresponding history files
*
* @param maxFileSize -max #bytes to be stored in an hbase cell
* @param origList - input list of files to be processed
* @param hdfs - filesystem to be looked at
* @param inputPath - root dir of the path containing history files
* @return - pruned array of FileStatus of files to be processed
*/
static FileStatus[] pruneFileListBySize(long maxFileSize, FileStatus[] origList, FileSystem hdfs,
Path inputPath) {
LOG.info("Pruning orig list of size " + origList.length + " for source" + inputPath.toUri());
static FileStatus[] pruneFileList(long maxFileSize, FileStatus[] origList) {
LOG.info("Pruning orig list of size " + origList.length);

long fileSize = 0L;

List<FileStatus> prunedFileList = new ArrayList<FileStatus>();

Set<String> toBeRemovedJobId = new HashSet<String>();
Set<String> historyFileJobIds = new HashSet<String>();

for (int i = 0; i < origList.length; i++) {
fileSize = origList[i].getLen();

// check if hbase can store this file if yes, consider it for processing
if (fileSize <= maxFileSize) {
prunedFileList.add(origList[i]);

// Maintain a list of history file jobIds to prune orphan xml files
// which do not have a corresponding history file. This is required for
// the path exclusion and inclusion filters to work properly,
// as exclusion filters do not filter on job conf files, resulting
// in a lot of orphan job conf files being loaded.
JobFile jFile = getJobFileFromPath(origList[i].getPath());
if (jFile != null) {
if (jFile.isJobHistoryFile()) {
String jobId = jFile.getJobid();
if (jobId != null)
historyFileJobIds.add(jobId);
}
}
} else {
Path hugeFile = origList[i].getPath();
LOG.info("In getListFilesToProcess filesize " + fileSize + " has exceeded maxFileSize "
Expand All @@ -158,6 +176,7 @@ static FileStatus[] pruneFileListBySize(long maxFileSize, FileStatus[] origList,
toBeRemovedJobId.add(getJobIdFromPath(hugeFile));
}
}

if (prunedFileList.size() == 0) {
LOG.info("Found no files worth processing. Returning 0 sized array");
return new FileStatus[0];
Expand All @@ -166,13 +185,12 @@ static FileStatus[] pruneFileListBySize(long maxFileSize, FileStatus[] origList,
String jobId = null;
ListIterator<FileStatus> it = prunedFileList.listIterator();
while (it.hasNext()) {
if (toBeRemovedJobId.size() == 0) {
// no files to remove
break;
}
Path curFile = it.next().getPath();
jobId = getJobIdFromPath(curFile);
if (toBeRemovedJobId.contains(jobId)) {

//Remove job if jobId found in toBeRemovedJobIds (for which either file was 'huge')
//or if this file did not have corresponding history file to go with it
if (toBeRemovedJobId.contains(jobId) || !historyFileJobIds.contains(jobId)) {
LOG.info("Removing from prunedList " + curFile.toUri());
it.remove();
/*
Expand Down Expand Up @@ -200,4 +218,9 @@ static String getJobIdFromPath(Path aPath) {
}
return jobId;
}

/**
 * Wraps the file-name component of the given path in a {@link JobFile}
 * so callers can interrogate it (history file vs. conf file, job id).
 *
 * @param aPath path whose file name is to be parsed
 * @return a {@code JobFile} constructed from the path's file name
 */
static JobFile getJobFileFromPath(Path aPath) {
  return new JobFile(aPath.getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobHistory;

/**
* Pathfilter that allows only files that are named correctly and are modified
* within a certain time range.
*
*/
public class JobFileModifiedRangePathFilter extends JobFilePathFilter {
public class JobFileModifiedRangeSubstringPathFilter extends JobFilePathFilter {

/**
* The minimum modification time of a file to be accepted in milliseconds
Expand All @@ -47,7 +48,12 @@ public class JobFileModifiedRangePathFilter extends JobFilePathFilter {
* The configuration of this processing job (not the files we are processing).
*/
private final Configuration myConf;
private static Log LOG = LogFactory.getLog(JobFileModifiedRangePathFilter.class);

private String[] pathExclusionFilter;

private String[] pathInclusionFilter;

private static Log LOG = LogFactory.getLog(JobFileModifiedRangeSubstringPathFilter.class);

/**
* Constructs a filter that accepts only JobFiles with lastModification time
Expand All @@ -62,11 +68,9 @@ public class JobFileModifiedRangePathFilter extends JobFilePathFilter {
* maximum modification time of a file to be accepted in milliseconds
* since January 1, 1970 UTC (including).
*/
public JobFileModifiedRangePathFilter(Configuration myConf,
public JobFileModifiedRangeSubstringPathFilter(Configuration myConf,
long minModificationTimeMillis, long maxModificationTimeMillis) {
this.myConf = myConf;
this.minModificationTimeMillis = minModificationTimeMillis;
this.maxModificationTimeMillis = maxModificationTimeMillis;
this(myConf, minModificationTimeMillis, maxModificationTimeMillis, null, null);
}

/**
Expand All @@ -79,11 +83,35 @@ public JobFileModifiedRangePathFilter(Configuration myConf,
* The minimum modification time of a file to be accepted in
* milliseconds since January 1, 1970 UTC (excluding).
*/
public JobFileModifiedRangePathFilter(Configuration myConf,
public JobFileModifiedRangeSubstringPathFilter(Configuration myConf,
long minModificationTimeMillis) {
this(myConf, minModificationTimeMillis, Long.MAX_VALUE);
}

/**
 * Constructs a filter that accepts only JobFiles whose last-modification
 * time falls within the given range and, for job history files, whose path
 * passes the optional substring inclusion and exclusion filters.
 *
 * When both filters are supplied, a history file is accepted only if its
 * path contains at least one inclusion substring AND contains none of the
 * exclusion substrings. Job conf files are not subject to the substring
 * filters (see {@code accept(Path)}).
 *
 * @param myConf
 *          used to be able to go from a path to a FileStatus.
 * @param minModificationTimeMillis
 *          The minimum modification time of a file to be accepted in
 *          milliseconds since January 1, 1970 UTC (excluding).
 * @param maxModificationTimeMillis
 *          The maximum modification time of a file to be accepted in
 *          milliseconds since January 1, 1970 UTC (including).
 * @param pathExclusionFilter
 *          files whose path contains any of these substrings are excluded;
 *          may be null to disable exclusion filtering.
 * @param pathInclusionFilter
 *          only files whose path contains at least one of these substrings
 *          are included; may be null to disable inclusion filtering.
 */
public JobFileModifiedRangeSubstringPathFilter(Configuration myConf,
    long minModificationTimeMillis, long maxModificationTimeMillis,
    String[] pathExclusionFilter, String[] pathInclusionFilter) {
  this.myConf = myConf;
  this.minModificationTimeMillis = minModificationTimeMillis;
  this.maxModificationTimeMillis = maxModificationTimeMillis;
  this.pathExclusionFilter = pathExclusionFilter;
  this.pathInclusionFilter = pathInclusionFilter;
}

/*
* (non-Javadoc)
*
Expand All @@ -96,25 +124,53 @@ public boolean accept(Path path) {
if (!super.accept(path)) {
return false;
}

JobFile jobFile = new JobFile(path.getName());
if (jobFile.isJobConfFile() || jobFile.isJobHistoryFile()) {
if (jobFile.isJobHistoryFile()) {
if (!includesPathSubstrings(path) || !excludesPathSubstrings(path)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The double negative with an or is really hard to read here, then the methods have indexOf != with false and true return statements.
Don't you mean, if it does not get included, or it gets excluded, then filter out (return false) ?
The excludesPathSubstrings and includePathSubstrings need some javadoc to make this simpler to read.

return false;
}
}
try {
FileSystem fs = path.getFileSystem(myConf);
FileStatus fileStatus = fs.getFileStatus(path);
long fileModificationTimeMillis = fileStatus.getModificationTime();
return accept(fileModificationTimeMillis);
} catch (IOException e) {
throw new ImportException("Cannot determine file modification time of "
+ path.getName(), e);
throw new ImportException("Cannot determine file modification time of " + path.getName(), e);
}
} else {
// Reject anything that does not match a job conf filename.
LOG.info(" Not a valid job conf / job history file "+ path.getName());
LOG.info(" Not a valid job conf / job history file " + path.getName());
return false;
}
}

private boolean excludesPathSubstrings(Path path) {
if (pathExclusionFilter == null)
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need curly braces around the then block for ifs.
Otherwise we get later logic issues if merges create two lines behind an if and only the first one is executed conditionally. Same here and below.


for (String s: pathExclusionFilter) {
if (path.toString().indexOf(s) != -1)
return false;
}

return true;
}

private boolean includesPathSubstrings(Path path) {
if (pathInclusionFilter == null)
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curly braces

boolean matches = false;
for (String s: pathInclusionFilter) {
if (path.toString().indexOf(s) != -1)
matches = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curlies

}

return matches;
}

/**
* @param fileModificationTimeMillis
* in milliseconds since January 1, 1970 UTC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ private void processHDFSSources(Path inputPath) throws IOException {

// Accept only jobFiles and only those that fall in the desired range of
// modification time.
JobFileModifiedRangePathFilter jobFileModifiedRangePathFilter = new JobFileModifiedRangePathFilter(
JobFileModifiedRangeSubstringPathFilter jobFileModifiedRangePathFilter = new JobFileModifiedRangeSubstringPathFilter(
myConf, 0L);

ContentSummary contentSummary = hdfs.getContentSummary(inputPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
Expand Down Expand Up @@ -48,7 +50,7 @@
import com.twitter.hraven.etl.ProcessRecordService;
import com.twitter.hraven.util.BatchUtil;
import com.twitter.hraven.etl.FileLister;
import com.twitter.hraven.etl.JobFileModifiedRangePathFilter;
import com.twitter.hraven.etl.JobFileModifiedRangeSubstringPathFilter;

/**
* Command line tool that can be run on a periodic basis (like daily, hourly, or
Expand Down Expand Up @@ -167,6 +169,21 @@ private static CommandLine parseArgs(String[] args) throws ParseException {
o.setRequired(false);
options.addOption(o);


// Path substring to exclude from job history files
o = new Option("ex", "pathExclusionFilter", true,
"Comma seperated list of path substrings to exclude from job history files");
o.setArgName("pathExclusionFilter");
o.setRequired(false);
options.addOption(o);

// Path substring to include in job history files (inclusions take precedence over exclusions)
o = new Option("ix", "pathInclusionFilter", true,
"Comma seperated list of path substrings to include in job history files");
o.setArgName("pathInclusionFilter");
o.setRequired(false);
options.addOption(o);

// Debugging
options.addOption("d", "debug", false, "switch on DEBUG log level");

Expand Down Expand Up @@ -315,10 +332,37 @@ public int run(String[] args) throws Exception {
+ lastProcessRecord);
}

// Grap the pathExclusionFilter and pathInclusionFilter
String[] pathExclusionFilter = null;

if (commandLine.getOptionValue("ex") != null) {
try {
pathExclusionFilter = commandLine.getOptionValue("ex").split(",");
LOG.info("pathExclusionFilter: " + Arrays.toString(pathExclusionFilter));
} catch (Exception e) {
throw new RuntimeException(
"Please provide a comma seperated list of substrings to exclude. This is invalid: "
+ commandLine.getOptionValue("ex"));
}
}

String[] pathInclusionFilter = null;

if (commandLine.getOptionValue("ix") != null) {
try {
pathInclusionFilter = commandLine.getOptionValue("ix").split(",");
LOG.info("pathInclusionFilter: " + Arrays.toString(pathInclusionFilter));
} catch (Exception e) {
throw new RuntimeException(
"Please provide a comma seperated list of substrings to include. This is invalid: "
+ commandLine.getOptionValue("ix"));
}
}

// Accept only jobFiles and only those that fall in the desired range of
// modification time.
JobFileModifiedRangePathFilter jobFileModifiedRangePathFilter = new JobFileModifiedRangePathFilter(
hbaseConf, minModificationTimeMillis);
JobFileModifiedRangeSubstringPathFilter jobFileModifiedRangePathFilter = new JobFileModifiedRangeSubstringPathFilter(
hbaseConf, minModificationTimeMillis, Long.MAX_VALUE, pathExclusionFilter, pathInclusionFilter);

String timestamp = Constants.TIMESTAMP_FORMAT.format(new Date(
minModificationTimeMillis));
Expand Down
Loading