diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java index c34f85a..d57da96 100644 --- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java +++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java @@ -167,6 +167,12 @@ private static CommandLine parseArgs(String[] args) throws ParseException { o.setRequired(false); options.addOption(o); + // Accept a minModificationTimeMillis. Don't process files before this time. + o = new Option("m", "minModificationTimeMillis", true, + "The minimum modification time of the file to be processed"); + o.setArgName("minModificationTimeMillis"); + o.setRequired(false); + options.addOption(o); // Debugging options.addOption("d", "debug", false, "switch on DEBUG log level"); @@ -285,6 +291,22 @@ public int run(String[] args) throws Exception { throw new ProcessingException("Caught NumberFormatException during conversion " + " of maxFileSize to long", nfe); } + + // Grab the minModificationTimeMillis argument + long minModificationTimeMillis = 0; + + if (commandLine.getOptionValue("m") != null) { + try { + minModificationTimeMillis = Long.parseLong(commandLine.getOptionValue("m")); + LOG.info("Using specified start time for filtering history files: " + minModificationTimeMillis); + } catch (NumberFormatException nfe) { + throw new IllegalArgumentException( + "minModificationTimeMillis has to be an epoch time (long). Can't be: " + + commandLine.getOptionValue("m"), nfe); + } + } + + LOG.info("minModificationTimeMillis: " + minModificationTimeMillis); ProcessRecordService processRecordService = new ProcessRecordService( hbaseConf); @@ -298,20 +320,30 @@ public int run(String[] args) throws Exception { if (!forceAllFiles) { lastProcessRecord = processRecordService .getLastSuccessfulProcessRecord(cluster); + } else { + //discard minModificationTimeMillis arguemnt given if all files + //are to be forced. + minModificationTimeMillis = 0; } - long minModificationTimeMillis = 0; + // Start of this time period is the end of the last period. if (lastProcessRecord != null) { - // Start of this time period is the end of the last period. - minModificationTimeMillis = lastProcessRecord - .getMaxModificationTimeMillis(); + LOG.info("lastProcessRecord time: " + lastProcessRecord.getMaxModificationTimeMillis()); + // Choose the maximum of the two. + if (minModificationTimeMillis < lastProcessRecord + .getMaxModificationTimeMillis()) { + minModificationTimeMillis = lastProcessRecord + .getMaxModificationTimeMillis(); + LOG.info("lastProcessRecord is greater than minModificationTimeMillis. Using that as minimum time: " + + minModificationTimeMillis); + } } // Do a sanity check. The end time of the last scan better not be later // than when we started processing. if (minModificationTimeMillis > processingStartMillis) { throw new RuntimeException( - "The last processing record has maxModificationMillis later than now: " + "Job start time is lesser than the minimum modification time of files to read. Failing." + lastProcessRecord); }