diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java index 82a55f12d..bcff2a6bb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java @@ -426,11 +426,14 @@ public void ioCommandCompleted(IOCommand command) { * threads. * * @param fraction the fraction of processing threads to remain active. This - * number is in range [0, 1] + * number is in range (0, 1] */ public void updateActiveThreadsFraction(double fraction) { checkState(fraction >= 0 && fraction <= 1); - int numActiveThreads = (int) (numProcessingThreads * fraction); + // Making sure the number of active threads is not set to 0 to ensure + // progress in computation + int numActiveThreads = Math.max( + (int) (Math.ceil(numProcessingThreads * fraction)), 1); if (LOG.isInfoEnabled()) { LOG.info("updateActiveThreadsFraction: updating the number of active " + "threads to " + numActiveThreads); @@ -489,7 +492,7 @@ public void processingThreadFinish() { */ public void updateRequestsCreditFraction(double fraction) { checkState(fraction >= 0 && fraction <= 1); - short newCredit = (short) (maxRequestsCredit * fraction); + short newCredit = (short) (Math.ceil(maxRequestsCredit * fraction)); if (LOG.isInfoEnabled()) { LOG.info("updateRequestsCreditFraction: updating the credit to " + newCredit); diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java index 1233183e6..7ff647145 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java @@ -63,6 +63,10 @@ * x4: bytes received due to messages * x5: bytes loaded/stored from/to disk due to OOC. 
* + * If the estimation error is high, the oracle falls back to a threshold-based + * mechanism based on JVM's pessimistic memory usage report (refer to + * {@link ThresholdBasedOracle} to see how an oracle can depend entirely on + * such reports) */ public class MemoryEstimatorOracle implements OutOfCoreOracle { /** Memory check interval in msec */ @@ -78,6 +82,14 @@ public class MemoryEstimatorOracle implements OutOfCoreOracle { new FloatConfOption("giraph.garbageEstimator.manualGCPressure", 0.95f, "The threshold above which GC is called manually if Full GC has not " + "happened in a while"); + /** + * If full GC is not called within this interval, in case of tight memory + * pressure, we should call GC manually + */ + public static final LongConfOption MANUAL_GC_INTERVAL = + new LongConfOption("giraph.memoryEstimator.manualGCInterval", 20000, + "The amount of time passed from the last full GC to call GC " + + "manually if memory is tight (in milliseconds)"); /** Used to detect a high memory pressure situation */ public static final FloatConfOption GC_MINIMUM_RECLAIM_FRACTION = new FloatConfOption("giraph.garbageEstimator.gcReclaimFraction", 0.05f, @@ -103,11 +115,32 @@ public class MemoryEstimatorOracle implements OutOfCoreOracle { public static final FloatConfOption CREDIT_LOW_THRESHOLD = new FloatConfOption("giraph.creditLowThreshold", 0.90f, "If mem-usage is below this threshold, credit is set to max"); - /** OOC starts if mem-usage is above this threshold */ + /** OOC starts being aggressive if mem-usage is above this threshold */ + public static final FloatConfOption OOC_THRESHOLD_STORE = + new FloatConfOption("giraph.oocThresholdStore", 0.95f, + "If mem-usage is above this threshold, out of core threads " + + "starts aggressively writing data to disk"); + /** + * OOC threads start using disk if mem-usage is above this threshold. 
If + * mem-usage is below this threshold, OOC threads bring critical (unprocessed) + * data to memory + */ public static final FloatConfOption OOC_THRESHOLD = new FloatConfOption("giraph.oocThreshold", 0.90f, "If mem-usage is above this threshold, out of core threads starts " + - "writing data to disk"); + "writing data to disk, but they wont be aggressive"); + /** + * OOC threads start bringing data back to memory if mem-usage is below this + * threshold + */ + public static final FloatConfOption OOC_THRESHOLD_LOAD = + new FloatConfOption("giraph.oocThresholdLoad", 0.85f, + "If mem-usage is below this threshold, out of core threads " + + "starts reading data from disk"); + /** Maximum tolerable error by memory estimator mechanism */ + public static final FloatConfOption MEM_ESTIMATOR_ACCURACY = + new FloatConfOption("giraph.MemEstimatorAccuracy", 0.1f, + "Maximum tolerable error by memory estimation mechanism"); /** Logger */ private static final Logger LOG = @@ -127,6 +160,12 @@ public class MemoryEstimatorOracle implements OutOfCoreOracle { private final float creditLowThreshold; /** Cached value for {@link #OOC_THRESHOLD} */ private final float oocThreshold; + /** Cached value for {@link #OOC_THRESHOLD_LOAD} */ + private final float oocThresholdLoad; + /** Cached value for {@link #OOC_THRESHOLD_STORE} */ + private final float oocThresholdStore; + /** Cached value for {@link #MEM_ESTIMATOR_ACCURACY} */ + private final float memEstAcc; /** Reference to running OOC engine */ private final OutOfCoreEngine oocEngine; @@ -134,21 +173,33 @@ public class MemoryEstimatorOracle implements OutOfCoreOracle { private final MemoryEstimator memoryEstimator; /** Keeps track of the number of bytes stored/loaded by OOC */ private final AtomicLong oocBytesInjected = new AtomicLong(0); - /** How many bytes to offload */ - private final AtomicLong numBytesToOffload = new AtomicLong(0); - /** Current state of the OOC */ - private volatile State state = State.STABLE; /** Timestamp of 
the last major GC */ private volatile long lastMajorGCTime = 0; + /** + * Lock to avoid interleaving of resetting data structures in memory estimator + * with modifying those data structures + */ + private Lock lock = new ReentrantLock(); + /** State of the oracle */ + private volatile OracleState oracleState = OracleState.NEUTRAL; + /** Whether OOC has offloaded any data since the beginning of the superstep */ + private volatile boolean hasOffloaded = false; + /** The last time where memory usage was not identified as 'high' */ + private volatile long lastNonHighPressureTimeMillis = Long.MAX_VALUE; /** - * Different states the OOC can be in. + * Different states the oracle can be in. */ - private enum State { - /** No offloading */ - STABLE, - /** Current offloading */ - OFFLOADING, + private enum OracleState { + /** No offloading yet, no estimation exists */ + NEUTRAL, + /** An estimation exists and the accuracy is good enough */ + MEM_ESTIMATOR, + /** + * There has been data offloaded to disk since the beginning of the + * superstep, but a good estimation does not exist + */ + THRESHOLD_BASED, } /** @@ -170,38 +221,68 @@ public MemoryEstimatorOracle(ImmutableClassesGiraphConfiguration conf, this.creditHighThreshold = CREDIT_HIGH_THRESHOLD.get(conf); this.creditLowThreshold = CREDIT_LOW_THRESHOLD.get(conf); this.oocThreshold = OOC_THRESHOLD.get(conf); + this.oocThresholdLoad = OOC_THRESHOLD_LOAD.get(conf); + this.oocThresholdStore = OOC_THRESHOLD_STORE.get(conf); + this.memEstAcc = MEM_ESTIMATOR_ACCURACY.get(conf); final long checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf); + final long manualGCInterval = MANUAL_GC_INTERVAL.get(conf); ThreadUtils.startThread(new Runnable() { @Override public void run() { while (true) { - long oldGenUsageEstimate = memoryEstimator.getUsageEstimate(); MemoryUsage usage = getOldGenUsed(); - if (oldGenUsageEstimate > 0) { - updateRates(oldGenUsageEstimate, usage.getMax()); - } else { - long time = System.currentTimeMillis(); - 
if (time - lastMajorGCTime >= 10000) { - double used = (double) usage.getUsed() / usage.getMax(); - if (used > manualGCMemoryPressure) { - if (LOG.isInfoEnabled()) { - LOG.info( - "High memory pressure with no full GC from the JVM. " + - "Calling GC manually. Used fraction of old-gen is " + - String.format("%.2f", used) + "."); - } - System.gc(); - time = System.currentTimeMillis() - time; - usage = getOldGenUsed(); - used = (double) usage.getUsed() / usage.getMax(); - if (LOG.isInfoEnabled()) { - LOG.info("Manual GC done. It took " + - String.format("%.2f", time / 1000.0) + - " seconds. Used fraction of old-gen is " + + // If we are following an OOC policy (based on either + // memory-estimation or threshold-based), we update the + // number of active threads and network credits. + if (oracleState == OracleState.MEM_ESTIMATOR) { + updateRates(memoryEstimator.getUsageEstimate(), usage.getMax()); + } else if (oracleState == OracleState.THRESHOLD_BASED) { + updateRates(usage.getUsed(), usage.getMax()); + } + + // We check intervals of major GCs. If a long enough interval has + // passed and either of the following + // satisfies: + // - Memory usage in old-gen is high, + // - We have constantly been at high memory pressure (i.e. + // offloading data if possible), + // we call manual GC. + + long time = System.currentTimeMillis(); + if (time - lastMajorGCTime >= manualGCInterval) { + double used = (double) usage.getUsed() / usage.getMax(); + if ( + // Memory usage is at a critical point and JVM's GC is almost + // about to kick in. + used > manualGCMemoryPressure || + // Or, memory pressure is high (i.e. enough to start offloading + // to disk), and it has been high for a long enough period. + ((double) getUsageEstimate(usage) / usage.getMax() > + oocThreshold && + time - lastNonHighPressureTimeMillis >= 3 * manualGCInterval) || + // Or, oracle is following the ThresholdBased scheme and long + // enough has passed since last major GC. 
We should call manual + // GC hoping for memory estimation accuracy to be good this + // time. + (oracleState == OracleState.THRESHOLD_BASED && + time - lastMajorGCTime >= 5 * manualGCInterval)) { + if (LOG.isInfoEnabled()) { + LOG.info( + "High memory pressure with no full GC from the JVM. " + + "Calling GC manually. Used fraction of old-gen is " + String.format("%.2f", used) + "."); - } + } + System.gc(); + time = System.currentTimeMillis() - time; + usage = getOldGenUsed(); + used = (double) usage.getUsed() / usage.getMax(); + if (LOG.isInfoEnabled()) { + LOG.info("Manual GC done. It took " + + String.format("%.2f", time / 1000.0) + + " seconds. Used fraction of old-gen is " + + String.format("%.2f", used) + "."); } } } @@ -217,6 +298,27 @@ public void run() { .createUncaughtExceptionHandler()); } + /** + * Finds the best possible estimation of usage at the moment based on the + * oracle's state. If the memory-estimation mechanism is accurate enough, we + * rely on that. If ThresholdBased scheme is used, we solely rely on JVM's + * report. Otherwise, we don't have any good estimation and return -1. + * + * @param oldGenUsage MemoryUsage object obtained from JVM in case we are + * using ThresholdBased scheme + * @return memory usage estimate + */ + private long getUsageEstimate(MemoryUsage oldGenUsage) { + if (oracleState == OracleState.MEM_ESTIMATOR) { + return memoryEstimator.getUsageEstimate(); + } else if (oracleState == OracleState.THRESHOLD_BASED) { + return oldGenUsage.getUsed(); + } else { + return -1; + } + } + + /** * Resets all the counters used in the memory estimation. This is called at * the beginning of a new superstep. 
@@ -231,8 +333,16 @@ public void run() { public void startIteration() { AbstractEdgeStore.PROGRESS_COUNTER.reset(); oocBytesInjected.set(0); - memoryEstimator.clear(); - memoryEstimator.setCurrentSuperstep(oocEngine.getSuperstep()); + oracleState = OracleState.NEUTRAL; + hasOffloaded = false; + lastNonHighPressureTimeMillis = Long.MAX_VALUE; + lock.lock(); + try { + memoryEstimator.clear(); + memoryEstimator.setCurrentSuperstep(oocEngine.getSuperstep()); + } finally { + lock.unlock(); + } oocEngine.updateRequestsCreditFraction(1); oocEngine.updateActiveThreadsFraction(1); } @@ -240,28 +350,45 @@ public void startIteration() { @Override public IOAction[] getNextIOActions() { - if (state == State.OFFLOADING) { - return new IOAction[]{ - IOAction.STORE_MESSAGES_AND_BUFFERS, IOAction.STORE_PARTITION}; - } - long oldGenUsage = memoryEstimator.getUsageEstimate(); + long oldGenUsage = -1; MemoryUsage usage = getOldGenUsed(); + if (oracleState == OracleState.MEM_ESTIMATOR) { + oldGenUsage = memoryEstimator.getUsageEstimate(); + } else if (oracleState == OracleState.THRESHOLD_BASED) { + oldGenUsage = usage.getUsed(); + } if (oldGenUsage > 0) { double usageEstimate = (double) oldGenUsage / usage.getMax(); - if (usageEstimate > oocThreshold) { + if (usageEstimate < oocThreshold) { + lastNonHighPressureTimeMillis = System.currentTimeMillis(); + } + if (usageEstimate < oocThresholdLoad) { + return new IOAction[]{IOAction.LOAD_PARTITION}; + } else if (usageEstimate < oocThreshold) { + return new IOAction[]{ + IOAction.LOAD_UNPROCESSED_PARTITION, + IOAction.LOAD_PARTITION}; + } else if (usageEstimate < oocThresholdStore) { return new IOAction[]{ IOAction.STORE_MESSAGES_AND_BUFFERS, - IOAction.STORE_PARTITION}; + IOAction.STORE_PROCESSED_PARTITION}; } else { - return new IOAction[]{IOAction.LOAD_PARTITION}; + return new IOAction[]{ + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.STORE_PARTITION}; } } else { + lastNonHighPressureTimeMillis = System.currentTimeMillis(); return new 
IOAction[]{IOAction.LOAD_PARTITION}; } } @Override public boolean approve(IOCommand command) { + if (!(command instanceof WaitIOCommand) && + !(command instanceof LoadPartitionIOCommand) && !hasOffloaded) { + hasOffloaded = true; + } return true; } @@ -269,20 +396,8 @@ public boolean approve(IOCommand command) { public void commandCompleted(IOCommand command) { if (command instanceof LoadPartitionIOCommand) { oocBytesInjected.getAndAdd(command.bytesTransferred()); - if (state == State.OFFLOADING) { - numBytesToOffload.getAndAdd(command.bytesTransferred()); - } } else if (!(command instanceof WaitIOCommand)) { oocBytesInjected.getAndAdd(0 - command.bytesTransferred()); - if (state == State.OFFLOADING) { - numBytesToOffload.getAndAdd(0 - command.bytesTransferred()); - } - } - - if (state == State.OFFLOADING && numBytesToOffload.get() <= 0) { - numBytesToOffload.set(0); - state = State.STABLE; - updateRates(-1, 1); } } @@ -323,52 +438,64 @@ public synchronized void gcCompleted( long usedMemoryEstimate = memoryEstimator.getUsageEstimate(); long usedMemoryReal = after.getUsed(); if (usedMemoryEstimate >= 0) { + double error = (double) Math.abs(usedMemoryEstimate - usedMemoryReal) / + usedMemoryReal; if (LOG.isInfoEnabled()) { LOG.info("gcCompleted: estimate=" + usedMemoryEstimate + " real=" + - usedMemoryReal + " error=" + - ((double) Math.abs(usedMemoryEstimate - usedMemoryReal) / - usedMemoryReal * 100)); + usedMemoryReal + " error=" + (error * 100)); + } + if (error < memEstAcc) { + oracleState = OracleState.MEM_ESTIMATOR; + } else { + if (oracleState == OracleState.MEM_ESTIMATOR) { + if (hasOffloaded) { + oracleState = OracleState.THRESHOLD_BASED; + } else { + oracleState = OracleState.NEUTRAL; + } + } } } - // Number of edges loaded so far (if in input superstep) - long edgesLoaded = oocEngine.getSuperstep() >= 0 ? 
0 : - EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count(); - // Number of vertices loaded so far (if in input superstep) - long verticesLoaded = oocEngine.getSuperstep() >= 0 ? 0 : - VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count(); - // Number of vertices computed (if either in compute or store phase) - long verticesComputed = WorkerProgress.get().getVerticesComputed() + - WorkerProgress.get().getVerticesStored() + - AbstractEdgeStore.PROGRESS_COUNTER.getProgress(); - // Number of bytes received - long receivedBytes = - oocEngine.getNetworkMetrics().getBytesReceivedPerSuperstep(); - // Number of OOC bytes - long oocBytes = oocBytesInjected.get(); - - memoryEstimator.addRecord(getOldGenUsed().getUsed(), edgesLoaded, - verticesLoaded, verticesComputed, receivedBytes, oocBytes); + lock.lock(); + try { + // Number of edges loaded so far (if in input superstep) + long edgesLoaded = memoryEstimator.getCurrentSuperstep() >= 0 ? 0 : + EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count(); + // Number of vertices loaded so far (if in input superstep) + long verticesLoaded = memoryEstimator.getCurrentSuperstep() >= 0 ? 
0 : + VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count(); + // Number of vertices computed (if either in compute or store phase) + long verticesComputed = WorkerProgress.get().getVerticesComputed() + + WorkerProgress.get().getVerticesStored() + + AbstractEdgeStore.PROGRESS_COUNTER.getProgress(); + // Number of bytes received + long receivedBytes = + oocEngine.getNetworkMetrics().getBytesReceivedPerSuperstep(); + // Number of OOC bytes + long oocBytes = oocBytesInjected.get(); + + memoryEstimator.addRecord(getOldGenUsed().getUsed(), edgesLoaded, + verticesLoaded, verticesComputed, receivedBytes, oocBytes); + } finally { + lock.unlock(); + } long garbage = before.getUsed() - after.getUsed(); long maxMem = after.getMax(); long memUsed = after.getUsed(); - boolean isTight = (maxMem - memUsed) < 2 * gcReclaimFraction * maxMem && - garbage < gcReclaimFraction * maxMem; + boolean highMemoryUsage = + (maxMem - memUsed) < 2 * gcReclaimFraction * maxMem; + boolean isTight = highMemoryUsage && garbage < gcReclaimFraction * maxMem; boolean predictionExist = memoryEstimator.getUsageEstimate() > 0; - if (isTight && !predictionExist) { + if (isTight && !predictionExist && oracleState == OracleState.NEUTRAL) { + oracleState = OracleState.THRESHOLD_BASED; if (LOG.isInfoEnabled()) { - LOG.info("gcCompleted: garbage=" + garbage + " memUsed=" + - memUsed + " maxMem=" + maxMem); + LOG.info("gcCompleted: Tight memory usage, no prediction exists, " + + "fall back to threshold-based until prediction is valid. garbage=" + + garbage + " memUsed=" + memUsed + " maxMem=" + maxMem); } - numBytesToOffload.set((long) (2 * gcReclaimFraction * maxMem) - - (maxMem - memUsed)); - if (LOG.isInfoEnabled()) { - LOG.info("gcCompleted: tight memory usage. 
Starting to offload " + - "until " + numBytesToOffload.get() + " bytes are offloaded"); - } - state = State.OFFLOADING; - updateRates(1, 1); + updateRates(memUsed, maxMem); } } } @@ -483,6 +610,10 @@ public void setCurrentSuperstep(long superstep) { this.currentSuperstep = superstep; } + public long getCurrentSuperstep() { + return this.currentSuperstep; + } + /** * Given the current state of computation (i.e. current edges loaded, * vertices computed etc) and the current model (i.e. the regression @@ -787,14 +918,6 @@ private Boolean refineCoefficient(int coefIndex, double lowerBound, */ private static boolean calculateRegression(double[] coefficient, List validColumnIndices, OLSMultipleLinearRegression mlr) { - - if (coefficient.length != validColumnIndices.size()) { - LOG.warn("There are " + coefficient.length + - " coefficients, but " + validColumnIndices.size() + - " valid columns in the regression"); - return false; - } - double[] beta = mlr.estimateRegressionParameters(); Arrays.fill(coefficient, 0); for (int i = 0; i < validColumnIndices.size(); ++i) {