From 71e2025dece07c1efc9ebd71288aa0ed4bec2592 Mon Sep 17 00:00:00 2001 From: anastas Date: Tue, 30 Aug 2016 14:11:32 +0300 Subject: [PATCH 1/5] Merging the pipeline --- .../hbase/regionserver/HeapMemStoreLAB.java | 11 +++++-- .../hbase/regionserver/ImmutableSegment.java | 19 ++++++++---- .../hbase/regionserver/MemStoreCompactor.java | 23 +++++++++----- .../MemStoreCompactorIterator.java | 5 ++-- .../hbase/regionserver/SegmentFactory.java | 30 +++++++++++++++---- .../regionserver/TestCompactingMemStore.java | 2 ++ 6 files changed, 68 insertions(+), 22 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java index 3ca4b0c52b2c..4a2c6f2cea14 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java @@ -106,6 +106,13 @@ public HeapMemStoreLAB(Configuration conf) { MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); } + /** + * To be used for merging multiple MSLABs + */ + public void addPooledChunkQueue(BlockingQueue queueToBeAdded) { + queueToBeAdded.drainTo(pooledChunkQueue); + } + /** * Allocate a slice of the given length. * @@ -242,8 +249,8 @@ Chunk getCurrentChunk() { return this.curChunk.get(); } - @VisibleForTesting - BlockingQueue getChunkQueue() { + //@VisibleForTesting + public BlockingQueue getChunkQueue() { return this.pooledChunkQueue; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 28f14d5c2df3..30bf818730f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -79,7 +79,7 @@ protected ImmutableSegment(Segment segment) { * are going to be introduced. */ protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator, - MemStoreLAB memStoreLAB, int numOfCells, Type type) { + MemStoreLAB memStoreLAB, int numOfCells, Type type, boolean merge) { super(null, // initiailize the CellSet with NULL comparator, memStoreLAB, @@ -88,7 +88,7 @@ protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator ClassSize.CELL_ARRAY_MAP_ENTRY); // build the true CellSet based on CellArrayMap - CellSet cs = createCellArrayMapSet(numOfCells, iterator); + CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge); this.setCellSet(null, cs); // update the CellSet of the new Segment this.type = type; @@ -194,16 +194,23 @@ public boolean flatten() { ///////////////////// PRIVATE METHODS ///////////////////// /*------------------------------------------------------------------------*/ // Create CellSet based on CellArrayMap from compacting iterator - private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator iterator) { + private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator iterator, + boolean merge) { Cell[] cells = new Cell[numOfCells]; // build the Cell Array int i = 0; while (iterator.hasNext()) { Cell c = iterator.next(); // The scanner behind the iterator is doing all the elimination logic - // now we just copy it to the new segment (also MSLAB copy) - cells[i] = maybeCloneWithAllocator(c); - boolean usedMSLAB = (cells[i] != c); + if (!merge) { + // now we just copy it to the new segment (also MSLAB copy) + cells[i] = maybeCloneWithAllocator(c); + } else { + // if this is merge we just move the Cell object without copying MSLAB + // the sizes still need to be updated in the new segment + cells[i] = c; + } + boolean usedMSLAB = (getMemStoreLAB()!=null); // second parameter true, because in compaction addition of the cell to new segment // is always successful updateMetaInfo(c, true, usedMSLAB); // updates the size per cell diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 470dc9c8daa0..c70cfc1a3daf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -74,14 +74,15 @@ class MemStoreCompactor { int immutCellsNum = 0; // number of immutable for compaction cells /** - * Types of Compaction + * Types of actions to be done on the pipeline upon MemStoreCompaction invocation */ - private enum Type { + private enum ActionUponInvocation { COMPACT_TO_SKIPLIST_MAP, - COMPACT_TO_ARRAY_MAP + COMPACT_TO_ARRAY_MAP, + MERGE_TO_ARRAY_MAP } - private Type type = Type.COMPACT_TO_ARRAY_MAP; + private ActionUponInvocation type = ActionUponInvocation.COMPACT_TO_ARRAY_MAP; public MemStoreCompactor(CompactingMemStore compactingMemStore) { this.compactingMemStore = compactingMemStore; @@ -104,9 +105,9 @@ public boolean start() throws IOException { COMPACTING_MEMSTORE_TYPE_DEFAULT); switch (t) { - case 1: type = Type.COMPACT_TO_SKIPLIST_MAP; + case 1: type = ActionUponInvocation.COMPACT_TO_SKIPLIST_MAP; break; - case 2: type = Type.COMPACT_TO_ARRAY_MAP; + case 2: type = ActionUponInvocation.COMPACT_TO_ARRAY_MAP; break; default: throw new RuntimeException("Unknown type " + type); // sanity check } @@ -149,6 +150,8 @@ private void releaseResources() { * still need to evaluate the compaction. */ private boolean shouldFlatten() { + if (isInterrupted.get()) // if the entire process is interrupted refuse to flatten + return false; // the compaction also doesn't start when interrupted boolean userToFlatten = // the user configurable option to flatten or not to flatten compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_FLATTENING, MEMSTORE_COMPACTOR_FLATTENING_DEFAULT); @@ -160,8 +163,9 @@ private boolean shouldFlatten() { // limit the number of the segments in the pipeline int numOfSegments = versionedList.getNumOfSegments(); if (numOfSegments > 3) { // hard-coded for now as it is going to move to policy - LOG.debug("In-Memory shrink is doing compaction, as there already are " + numOfSegments + LOG.debug("In-Memory shrink is doing MERGE, as there already are " + numOfSegments + " segments in the compaction pipeline"); + type = ActionUponInvocation.MERGE_TO_ARRAY_MAP; return false; // to avoid "too many open files later", compact now } // till here we hvae all the signs that it is possible to flatten, run the speculative scan @@ -254,6 +258,11 @@ private ImmutableSegment compact(int numOfCells) throws IOException { compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, numOfCells, ImmutableSegment.Type.ARRAY_MAP_BASED); break; + case MERGE_TO_ARRAY_MAP: + result = SegmentFactory.instance().createImmutableSegment( + compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, + numOfCells, ImmutableSegment.Type.ARRAY_MAP_BASED, versionedList.getStoreSegments()); + break; default: throw new RuntimeException("Unknown type " + type); // sanity check } } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java index 2eafb421bd2d..329652741aee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java @@ -86,7 +86,7 @@ public boolean hasNext() { return false; } } - return (kvsIterator.hasNext() || hasMore); + return kvsIterator.hasNext(); } @Override @@ -129,7 +129,8 @@ private StoreScanner createScanner(Store store, KeyValueScanner scanner) } - + /* Refill kev-value set (should be invoked only when KVS is empty) + * Returns false in case there is nothing left and true otherwise */ private boolean refillKVS() { kvs.clear(); // clear previous KVS, first initiated in the constructor if (!hasMore) { // if there is nothing expected next in compactingScanner diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index 6351f1358b9f..a1d040d2a451 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; import java.io.IOException; +import java.util.LinkedList; /** * A singleton store segment factory. @@ -70,16 +71,35 @@ public MutableSegment createMutableSegment(final Configuration conf, } // create new flat immutable segment from compacting old immutable segment - public ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator, + public ImmutableSegment createImmutableSegment( + final Configuration conf, final CellComparator comparator, MemStoreCompactorIterator iterator, int numOfCells, ImmutableSegment.Type segmentType) throws IOException { - Preconditions.checkArgument( - segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED, "wrong immutable segment type"); - MemStoreLAB memStoreLAB = getMemStoreLAB(conf); + Preconditions.checkArgument(segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED, + "wrong immutable segment type"); + MemStoreLAB memStoreLAB = getMemStoreLAB(conf);; return - new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType); + new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, false); } + // create new flat immutable segment from merging old immutable segment + public ImmutableSegment createImmutableSegment( + final Configuration conf, final CellComparator comparator, + MemStoreCompactorIterator iterator, int numOfCells, ImmutableSegment.Type segmentType, + LinkedList segments) + throws IOException { + Preconditions.checkArgument(segmentType != ImmutableSegment.Type.ARRAY_MAP_BASED, + "wrong immutable segment type"); + MemStoreLAB memStoreLAB = getMemStoreLAB(conf);; + + for (Segment s: segments){ + ((HeapMemStoreLAB)memStoreLAB).addPooledChunkQueue( + ((HeapMemStoreLAB)s.getMemStoreLAB()).getChunkQueue()); + } + + return + new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, true); + } //****** private methods to instantiate concrete store segments **********// private MutableSegment generateMutableSegment( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index db0205e3a1b5..b6298ab398c7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -616,6 +616,8 @@ public void testCompaction1Bucket() throws IOException { @Test public void testCompaction2Buckets() throws IOException { + memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.flatten", false); + String[] keys1 = { "A", "A", "B", "C" }; String[] keys2 = { "A", "B", "D" }; From f499e50d20e135759a2487f2d536e126f125d80d Mon Sep 17 00:00:00 2001 From: anastas Date: Sun, 4 Sep 2016 11:58:03 +0300 Subject: [PATCH 2/5] After Eddie's review comments applied --- .../hbase/regionserver/HeapMemStoreLAB.java | 4 +- .../hbase/regionserver/ImmutableSegment.java | 12 ++--- .../hbase/regionserver/MemStoreCompactor.java | 49 ++++++++++--------- .../MemStoreCompactorIterator.java | 2 +- .../hbase/regionserver/SegmentFactory.java | 4 +- 5 files changed, 37 insertions(+), 34 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java index 4a2c6f2cea14..df0090fcf3f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java @@ -109,8 +109,8 @@ public HeapMemStoreLAB(Configuration conf) { /** * To be used for merging multiple MSLABs */ - public void addPooledChunkQueue(BlockingQueue queueToBeAdded) { - queueToBeAdded.drainTo(pooledChunkQueue); + public void addPooledChunkQueue(BlockingQueue targetQueue) { + targetQueue.drainTo(pooledChunkQueue); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index 30bf818730f9..f6caa9107b38 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -79,7 +79,7 @@ protected ImmutableSegment(Segment segment) { * are going to be introduced. */ protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator, - MemStoreLAB memStoreLAB, int numOfCells, Type type, boolean merge) { + MemStoreLAB memStoreLAB, int numOfCells, Type type, boolean doMerge) { super(null, // initiailize the CellSet with NULL comparator, memStoreLAB, @@ -88,7 +88,7 @@ protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator ClassSize.CELL_ARRAY_MAP_ENTRY); // build the true CellSet based on CellArrayMap - CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge); + CellSet cs = createCellArrayMapSet(numOfCells, iterator, doMerge); this.setCellSet(null, cs); // update the CellSet of the new Segment this.type = type; @@ -202,13 +202,13 @@ private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator while (iterator.hasNext()) { Cell c = iterator.next(); // The scanner behind the iterator is doing all the elimination logic - if (!merge) { - // now we just copy it to the new segment (also MSLAB copy) - cells[i] = maybeCloneWithAllocator(c); - } else { + if (merge) { // if this is merge we just move the Cell object without copying MSLAB // the sizes still need to be updated in the new segment cells[i] = c; + } else { + // now we just copy it to the new segment (also MSLAB copy) + cells[i] = maybeCloneWithAllocator(c); } boolean usedMSLAB = (getMemStoreLAB()!=null); // second parameter true, because in compaction addition of the cell to new segment diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index c70cfc1a3daf..e04350b41dc4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -46,7 +46,7 @@ class MemStoreCompactor { // Option for external setting of the compacted structure (SkipList, CellArray, etc.) static final String COMPACTING_MEMSTORE_TYPE_KEY = "hbase.hregion.compacting.memstore.type"; - static final int COMPACTING_MEMSTORE_TYPE_DEFAULT = 2; // COMPACT_TO_ARRAY_MAP as default + static final int COMPACTING_MEMSTORE_TYPE_DEFAULT = 2; // COMPACT_DATA_TO_ARRAY_MAP as default // What percentage of the duplications is causing compaction? static final String COMPACTION_THRESHOLD_REMAIN_FRACTION @@ -76,13 +76,13 @@ class MemStoreCompactor { /** * Types of actions to be done on the pipeline upon MemStoreCompaction invocation */ - private enum ActionUponInvocation { - COMPACT_TO_SKIPLIST_MAP, - COMPACT_TO_ARRAY_MAP, - MERGE_TO_ARRAY_MAP + private enum Action { + COMPACT_DATA_TO_SKIPLIST_MAP, + COMPACT_DATA_TO_ARRAY_MAP, + COMPACT_INDEX_TO_ARRAY_MAP } - private ActionUponInvocation type = ActionUponInvocation.COMPACT_TO_ARRAY_MAP; + private Action action = Action.COMPACT_DATA_TO_ARRAY_MAP; public MemStoreCompactor(CompactingMemStore compactingMemStore) { this.compactingMemStore = compactingMemStore; @@ -101,15 +101,15 @@ public MemStoreCompactor(CompactingMemStore compactingMemStore) { public boolean start() throws IOException { if (!compactingMemStore.hasImmutableSegments()) return false; // no compaction on empty - int t = compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_TYPE_KEY, + int memStoreType = compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_TYPE_KEY, COMPACTING_MEMSTORE_TYPE_DEFAULT); - switch (t) { - case 1: type = ActionUponInvocation.COMPACT_TO_SKIPLIST_MAP; + switch (memStoreType) { + case 1: action = Action.COMPACT_DATA_TO_SKIPLIST_MAP; break; - case 2: type = ActionUponInvocation.COMPACT_TO_ARRAY_MAP; + case 2: action = Action.COMPACT_DATA_TO_ARRAY_MAP; break; - default: throw new RuntimeException("Unknown type " + type); // sanity check + default: throw new RuntimeException("Unknown action " + action); // sanity check } // get a snapshot of the list of the segments from the pipeline, @@ -118,7 +118,7 @@ public boolean start() throws IOException { immutCellsNum = versionedList.getNumOfCells(); if (LOG.isDebugEnabled()) { - LOG.debug("Starting the MemStore In-Memory Shrink of type " + type + " for store " + LOG.debug("Starting the MemStore In-Memory Shrink of type " + action + " for store " + compactingMemStore.getStore().getColumnFamilyName()); } @@ -150,31 +150,34 @@ private void releaseResources() { * still need to evaluate the compaction. */ private boolean shouldFlatten() { - if (isInterrupted.get()) // if the entire process is interrupted refuse to flatten + + if (isInterrupted.get()) // if the entire process is interrupted cancel flattening return false; // the compaction also doesn't start when interrupted + boolean userToFlatten = // the user configurable option to flatten or not to flatten compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_FLATTENING, MEMSTORE_COMPACTOR_FLATTENING_DEFAULT); if (userToFlatten==false) { - LOG.debug("In-Memory shrink is doing compaction, as user asked to avoid flattening"); + LOG.debug("In-Memory Compaction compacts the data, as user asked to avoid flattening"); return false; // the user doesn't want to flatten } // limit the number of the segments in the pipeline int numOfSegments = versionedList.getNumOfSegments(); if (numOfSegments > 3) { // hard-coded for now as it is going to move to policy - LOG.debug("In-Memory shrink is doing MERGE, as there already are " + numOfSegments + LOG.debug("In-Memory Compaction merges the segments, as there already are " + numOfSegments + " segments in the compaction pipeline"); - type = ActionUponInvocation.MERGE_TO_ARRAY_MAP; + action = Action.COMPACT_INDEX_TO_ARRAY_MAP; return false; // to avoid "too many open files later", compact now } + // till here we hvae all the signs that it is possible to flatten, run the speculative scan // (if allowed by the user) to check the efficiency of compaction boolean avoidSpeculativeScan = // the user configurable option to avoid the speculative scan compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN, MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT); if (avoidSpeculativeScan==true) { - LOG.debug("In-Memory shrink is doing flattening, as user asked to avoid compaction " + LOG.debug("In-Memory Compaction flattens the new segment, as user asked to avoid compaction " + "evaluation"); return true; // flatten without checking the compaction expedience } @@ -233,7 +236,7 @@ private void doCompaction() { } /**---------------------------------------------------------------------- - * The copy-compaction is the creation of the ImmutableSegment (from the relevant type) + * The copy-compaction is the creation of the ImmutableSegment (from the relevant action) * based on the Compactor Iterator. The new ImmutableSegment is returned. */ private ImmutableSegment compact(int numOfCells) throws IOException { @@ -248,22 +251,22 @@ private ImmutableSegment compact(int numOfCells) throws IOException { compactingMemStore.getComparator(), compactionKVMax, compactingMemStore.getStore()); try { - switch (type) { - case COMPACT_TO_SKIPLIST_MAP: + switch (action) { + case COMPACT_DATA_TO_SKIPLIST_MAP: result = SegmentFactory.instance().createImmutableSegment( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator); break; - case COMPACT_TO_ARRAY_MAP: + case COMPACT_DATA_TO_ARRAY_MAP: result = SegmentFactory.instance().createImmutableSegment( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, numOfCells, ImmutableSegment.Type.ARRAY_MAP_BASED); break; - case MERGE_TO_ARRAY_MAP: + case COMPACT_INDEX_TO_ARRAY_MAP: result = SegmentFactory.instance().createImmutableSegment( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, numOfCells, ImmutableSegment.Type.ARRAY_MAP_BASED, versionedList.getStoreSegments()); break; - default: throw new RuntimeException("Unknown type " + type); // sanity check + default: throw new RuntimeException("Unknown action " + action); // sanity check } } finally { iterator.close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java index 329652741aee..af7f7e45aaa0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java @@ -130,7 +130,7 @@ private StoreScanner createScanner(Store store, KeyValueScanner scanner) /* Refill kev-value set (should be invoked only when KVS is empty) - * Returns false in case there is nothing left and true otherwise */ + * Returns true if KVS is non-empty */ private boolean refillKVS() { kvs.clear(); // clear previous KVS, first initiated in the constructor if (!hasMore) { // if there is nothing expected next in compactingScanner diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index a1d040d2a451..8df5dc4ff3be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -77,7 +77,7 @@ public ImmutableSegment createImmutableSegment( throws IOException { Preconditions.checkArgument(segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED, "wrong immutable segment type"); - MemStoreLAB memStoreLAB = getMemStoreLAB(conf);; + MemStoreLAB memStoreLAB = getMemStoreLAB(conf); return new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, false); } @@ -90,7 +90,7 @@ public ImmutableSegment createImmutableSegment( throws IOException { Preconditions.checkArgument(segmentType != ImmutableSegment.Type.ARRAY_MAP_BASED, "wrong immutable segment type"); - MemStoreLAB memStoreLAB = getMemStoreLAB(conf);; + MemStoreLAB memStoreLAB = getMemStoreLAB(conf); for (Segment s: segments){ ((HeapMemStoreLAB)memStoreLAB).addPooledChunkQueue( From 0782b7bf0ceaeaddaffc7faaf0cb2c3aa1f35369 Mon Sep 17 00:00:00 2001 From: anastas Date: Wed, 7 Sep 2016 15:53:52 +0300 Subject: [PATCH 3/5] Eshcar's code review fixes --- .../hbase/regionserver/HeapMemStoreLAB.java | 4 +- .../hbase/regionserver/MemStoreCompactor.java | 174 +++++++++--------- .../hbase/regionserver/SegmentFactory.java | 6 +- 3 files changed, 98 insertions(+), 86 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java index df0090fcf3f7..69c8af31e612 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java @@ -69,6 +69,8 @@ public class HeapMemStoreLAB implements MemStoreLAB { private AtomicReference curChunk = new AtomicReference(); // A queue of chunks from pool contained by this memstore LAB + // TODO: in the future, it would be better to have List implementation instead of Queue, + // as FIFO order is not so important here @VisibleForTesting BlockingQueue pooledChunkQueue = null; private final int chunkSize; @@ -249,7 +251,7 @@ Chunk getCurrentChunk() { return this.curChunk.get(); } - //@VisibleForTesting + public BlockingQueue getChunkQueue() { return this.pooledChunkQueue; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index e04350b41dc4..ae37da31a5c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -40,24 +40,25 @@ @InterfaceAudience.Private class MemStoreCompactor { - // Option for external guidance whether flattening is allowed - static final String MEMSTORE_COMPACTOR_FLATTENING = "hbase.hregion.compacting.memstore.flatten"; - static final boolean MEMSTORE_COMPACTOR_FLATTENING_DEFAULT = true; - - // Option for external setting of the compacted structure (SkipList, CellArray, etc.) + // The external setting of the compacting MemStore behaviour + // Compaction of the index without the data is the default static final String COMPACTING_MEMSTORE_TYPE_KEY = "hbase.hregion.compacting.memstore.type"; - static final int COMPACTING_MEMSTORE_TYPE_DEFAULT = 2; // COMPACT_DATA_TO_ARRAY_MAP as default + static final String COMPACTING_MEMSTORE_TYPE_DEFAULT = "index-compaction"; // What percentage of the duplications is causing compaction? static final String COMPACTION_THRESHOLD_REMAIN_FRACTION = "hbase.hregion.compacting.memstore.comactPercent"; - static final double COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT = 0.2; + static final double COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT = 0.3; - // Option for external guidance whether the flattening is allowed + // Option for external guidance whether the speculative scan + // (to evaluate compaction efficiency) is allowed static final String MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN = "hbase.hregion.compacting.memstore.avoidSpeculativeScan"; static final boolean MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT = false; + // Maximal number of the segments in the compaction pipeline + private static final int THRESHOLD_PIPELINE_SEGMENTS = 3; + private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class); private CompactingMemStore compactingMemStore; @@ -70,24 +71,26 @@ class MemStoreCompactor { // the limit to the size of the groups to be later provided to MemStoreCompactorIterator private final int compactionKVMax; - double fraction = 0.8; + double fraction = 0.7; int immutCellsNum = 0; // number of immutable for compaction cells + /** * Types of actions to be done on the pipeline upon MemStoreCompaction invocation */ private enum Action { - COMPACT_DATA_TO_SKIPLIST_MAP, - COMPACT_DATA_TO_ARRAY_MAP, - COMPACT_INDEX_TO_ARRAY_MAP + NOP, + FLATTEN, + MERGE, + COMPACT } - private Action action = Action.COMPACT_DATA_TO_ARRAY_MAP; + private Action action = Action.FLATTEN; public MemStoreCompactor(CompactingMemStore compactingMemStore) { this.compactingMemStore = compactingMemStore; - this.compactionKVMax = compactingMemStore.getConfiguration().getInt( - HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); + this.compactionKVMax = compactingMemStore.getConfiguration() + .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); this.fraction = 1 - compactingMemStore.getConfiguration().getDouble( COMPACTION_THRESHOLD_REMAIN_FRACTION, COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT); @@ -99,17 +102,19 @@ public MemStoreCompactor(CompactingMemStore compactingMemStore) { * is already an ongoing compaction or no segments to compact. */ public boolean start() throws IOException { - if (!compactingMemStore.hasImmutableSegments()) return false; // no compaction on empty + if (!compactingMemStore.hasImmutableSegments()) // no compaction on empty pipeline + return false; - int memStoreType = compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_TYPE_KEY, + String memStoreType = compactingMemStore.getConfiguration().get(COMPACTING_MEMSTORE_TYPE_KEY, COMPACTING_MEMSTORE_TYPE_DEFAULT); switch (memStoreType) { - case 1: action = Action.COMPACT_DATA_TO_SKIPLIST_MAP; + case "index-compaction": action = Action.MERGE; break; - case 2: action = Action.COMPACT_DATA_TO_ARRAY_MAP; + case "data-compaction": action = Action.COMPACT; break; - default: throw new RuntimeException("Unknown action " + action); // sanity check + default: + throw new RuntimeException("Unknown action " + action); // sanity check } // get a snapshot of the list of the segments from the pipeline, @@ -118,7 +123,7 @@ public boolean start() throws IOException { immutCellsNum = versionedList.getNumOfCells(); if (LOG.isDebugEnabled()) { - LOG.debug("Starting the MemStore In-Memory Shrink of type " + action + " for store " + LOG.debug("Starting the In-Memory Compaction for store " + compactingMemStore.getStore().getColumnFamilyName()); } @@ -149,47 +154,42 @@ private void releaseResources() { * returns false if we must compact. If this method returns true we * still need to evaluate the compaction. */ - private boolean shouldFlatten() { + private Action policy() { if (isInterrupted.get()) // if the entire process is interrupted cancel flattening - return false; // the compaction also doesn't start when interrupted - - boolean userToFlatten = // the user configurable option to flatten or not to flatten - compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_FLATTENING, - MEMSTORE_COMPACTOR_FLATTENING_DEFAULT); - if (userToFlatten==false) { - LOG.debug("In-Memory Compaction compacts the data, as user asked to avoid flattening"); - return false; // the user doesn't want to flatten + return Action.NOP; // the compaction also doesn't start when interrupted + + if (action == Action.COMPACT) { // try to compact if it worth it + // check if running the speculative scan (to check the efficiency of compaction) + // is allowed by the user + boolean avoidSpeculativeScan = + compactingMemStore.getConfiguration().getBoolean( + MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN, + MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT); + if (avoidSpeculativeScan==true) { + LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() + + " is going to be compacted, without compaction-evaluation"); + return Action.COMPACT; // compact without checking the compaction expedience + } + if (worthDoingCompaction()) { + return Action.COMPACT; // compact because it worth it + } } + // compaction shouldn't happen or doesn't worth it // limit the number of the segments in the pipeline int numOfSegments = versionedList.getNumOfSegments(); - if (numOfSegments > 3) { // hard-coded for now as it is going to move to policy - LOG.debug("In-Memory Compaction merges the segments, as there already are " + numOfSegments - + " segments in the compaction pipeline"); - action = Action.COMPACT_INDEX_TO_ARRAY_MAP; - return false; // to avoid "too many open files later", compact now + if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) { + LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() + + " is going to be merged, as there are " + numOfSegments + " segments"); + action = Action.MERGE; + return Action.MERGE; // to avoid too many segments, merge now } - // till here we hvae all the signs that it is possible to flatten, run the speculative scan - // (if allowed by the user) to check the efficiency of compaction - boolean avoidSpeculativeScan = // the user configurable option to avoid the speculative scan - compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN, - MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT); - if (avoidSpeculativeScan==true) { - LOG.debug("In-Memory Compaction flattens the new segment, as user asked to avoid compaction " - + "evaluation"); - return true; // flatten without checking the compaction expedience - } - try { - immutCellsNum = countCellsForCompaction(); - if (immutCellsNum > fraction * versionedList.getNumOfCells()) { - return true; - } - } catch(Exception e) { - return true; - } - return false; + // if nothing of the above, then just flatten the newly joined segment + LOG.debug("The youngest segment in the in-Memory Compaction Pipeline for store " + + compactingMemStore.getFamilyName() + " is going to be flattened"); + return Action.FLATTEN; } /**---------------------------------------------------------------------- @@ -202,22 +202,25 @@ private void doCompaction() { boolean resultSwapped = false; try { - // PHASE I: estimate the compaction expedience - EVALUATE COMPACTION - if (shouldFlatten()) { - // too much cells "survive" the possible compaction, we do not want to compact! - LOG.debug("In-Memory compaction does not pay off - storing the flattened segment" - + " for store: " + compactingMemStore.getFamilyName()); - // Looking for Segment in the pipeline with SkipList index, to make it flat + Action nextStep = policy(); + switch (nextStep){ + case FLATTEN: // Youngest Segment in the pipeline is with SkipList index, make it flat compactingMemStore.flattenOneSegment(versionedList.getVersion()); + case NOP: // intentionally falling through return; + case MERGE: + case COMPACT: + break; + default: throw new RuntimeException("Unknown action " + action); // sanity check } - // PHASE II: create the new compacted ImmutableSegment - START COPY-COMPACTION + // Create one segment representing all segments in the compaction pipeline, + // either by compaction or by merge if (!isInterrupted.get()) { - result = compact(immutCellsNum); + result = createSubstitution(); } - // Phase III: swap the old compaction pipeline - END COPY-COMPACTION + // Substitute the pipeline with one segment if (!isInterrupted.get()) { if (resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result)) { // update the wal so it can be truncated and not get too long @@ -236,14 +239,10 @@ private void doCompaction() { } /**---------------------------------------------------------------------- - * The copy-compaction is the creation of the ImmutableSegment (from the relevant action) - * based on the Compactor Iterator. The new ImmutableSegment is returned. + * Creation of the ImmutableSegment either by merge or copy-compact of the segments of the + * pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned. */ - private ImmutableSegment compact(int numOfCells) throws IOException { - - LOG.debug("In-Memory compaction does pay off - The estimated number of cells " - + "after compaction is " + numOfCells + ", while number of cells before is " + versionedList - .getNumOfCells() + ". The fraction of remaining cells should be: " + fraction); + private ImmutableSegment createSubstitution() throws IOException { ImmutableSegment result = null; MemStoreCompactorIterator iterator = @@ -252,19 +251,15 @@ private ImmutableSegment compact(int numOfCells) throws IOException { compactionKVMax, compactingMemStore.getStore()); try { switch (action) { - case COMPACT_DATA_TO_SKIPLIST_MAP: - result = SegmentFactory.instance().createImmutableSegment( - compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator); - break; - case COMPACT_DATA_TO_ARRAY_MAP: + case COMPACT: result = SegmentFactory.instance().createImmutableSegment( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, - numOfCells, ImmutableSegment.Type.ARRAY_MAP_BASED); + immutCellsNum, ImmutableSegment.Type.ARRAY_MAP_BASED); break; - case COMPACT_INDEX_TO_ARRAY_MAP: + case MERGE: result = SegmentFactory.instance().createImmutableSegment( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, - numOfCells, ImmutableSegment.Type.ARRAY_MAP_BASED, versionedList.getStoreSegments()); + immutCellsNum, ImmutableSegment.Type.ARRAY_MAP_BASED, versionedList.getStoreSegments()); break; default: throw new RuntimeException("Unknown action " + action); // sanity check } @@ -276,24 +271,35 @@ private ImmutableSegment compact(int numOfCells) throws IOException { } /**---------------------------------------------------------------------- - * Count cells to estimate the efficiency of the future compaction + * Estimate the efficiency of the future compaction */ - private int countCellsForCompaction() throws IOException { + private boolean worthDoingCompaction() { int cnt = 0; - MemStoreCompactorIterator iterator = + MemStoreCompactorIterator iterator = null; + + try { + iterator = new MemStoreCompactorIterator( versionedList.getStoreSegments(), compactingMemStore.getComparator(), compactionKVMax, compactingMemStore.getStore()); - try { while (iterator.next() != null) { cnt++; } + } catch(Exception e) { + return false; } finally { - iterator.close(); + if (iterator!=null) iterator.close(); } - return cnt; + if (cnt >= fraction * versionedList.getNumOfCells()) + return false; + + LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() + + " is going to be compacted, according to compaction-evaluation, " + cnt + + " cells will remain out of " + versionedList.getNumOfCells()); + immutCellsNum = cnt; + return true ; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index 8df5dc4ff3be..75bdf3dbc92c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -79,7 +79,9 @@ public ImmutableSegment createImmutableSegment( "wrong immutable segment type"); MemStoreLAB memStoreLAB = getMemStoreLAB(conf); return - new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, false); + // the last parameter "false" means not to merge, but to compact the pipeline + // in order to create the new segment + new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, false); } // create new flat immutable segment from merging old immutable segment @@ -98,6 +100,8 @@ public ImmutableSegment createImmutableSegment( } return + // the last parameter "true" means to merge the compaction pipeline + // in order to create the new segment new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, true); } //****** private methods to instantiate concrete store segments **********// From 449b51cbeae61cd6a176169fb6660bd0f592d780 Mon Sep 17 00:00:00 2001 From: anastas Date: Thu, 8 Sep 2016 14:10:05 +0300 Subject: [PATCH 4/5] Debugging for regression --- .../hbase/regionserver/MemStoreCompactor.java | 2 +- .../regionserver/TestCompactingMemStore.java | 15 ++++++++++++++- .../TestCompactingToCellArrayMapMemStore.java | 13 +++++++------ 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index ae37da31a5c7..448a8e893c02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -114,7 +114,7 @@ public boolean start() throws IOException { case "data-compaction": action = Action.COMPACT; break; default: - throw new RuntimeException("Unknown action " + action); // sanity check + throw new RuntimeException("Unknown memstore type " + memStoreType); // sanity check } // get a snapshot of the list of the segments from the pipeline, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index b6298ab398c7..d4843317e162 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -589,6 +589,11 @@ public void testPuttingBackChunksWithOpeningPipelineScanner() @Test public void testCompaction1Bucket() throws IOException { + // set memstore to do data compaction and not to use the speculative scan + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.avoidSpeculativeScan", + true); + String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 // test 1 bucket @@ -616,7 +621,10 @@ public void testCompaction1Bucket() throws IOException { @Test public void testCompaction2Buckets() throws IOException { - memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.flatten", false); + // set memstore to do data compaction and not to use the speculative scan + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + memstore.getConfiguration().setBoolean( + "hbase.hregion.compacting.memstore.avoidSpeculativeScan", true); String[] keys1 = { "A", "A", "B", "C" }; String[] keys2 = { "A", "B", "D" }; @@ -662,6 +670,11 @@ public void testCompaction2Buckets() throws IOException { @Test public void testCompaction3Buckets() throws IOException { + // set memstore to do data compaction and not to use the speculative scan + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + memstore.getConfiguration().setBoolean( + "hbase.hregion.compacting.memstore.avoidSpeculativeScan", true); + String[] keys1 = { "A", "A", "B", "C" }; String[] keys2 = { "A", "B", "D" }; String[] keys3 = { "D", "B", "B" }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index 1933343b0012..c3fae3bff6c2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -67,7 +67,9 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore compactingSetUp(); Configuration conf = HBaseConfiguration.create(); - conf.setLong("hbase.hregion.compacting.memstore.type", 2); // compact to CellArrayMap + // set memstore to do data compaction and not to use the speculative scan + conf.set("hbase.hregion.compacting.memstore.type", "data-compaction"); + conf.setBoolean("hbase.hregion.compacting.memstore.avoidSpeculativeScan", true); this.memstore = new CompactingMemStore(conf, CellComparator.COMPARATOR, store, @@ -221,17 +223,16 @@ public void testCompaction3Buckets() throws IOException { } ////////////////////////////////////////////////////////////////////////////// - // Flattening tests + // Merging tests ////////////////////////////////////////////////////////////////////////////// @Test - public void testFlattening() throws IOException { + public void testMerging() throws IOException { String[] keys1 = { "A", "A", "B", "C", "F", "H"}; String[] keys2 = { "A", "B", "D", "G", "I", "J"}; String[] keys3 = { "D", "B", "B", "E" }; - // set flattening to true - memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.flatten", true); + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "index-compaction"); addRowsByKeys(memstore, keys1); @@ -264,7 +265,7 @@ public void testFlattening() throws IOException { for ( Segment s : memstore.getSegments()) { counter += s.getCellsCount(); } - assertEquals(10,counter); + assertEquals(16,counter); MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot ImmutableSegment s = memstore.getSnapshot(); From e928406596f3632a94af0ac34eac9eb858b51fec Mon Sep 17 00:00:00 2001 From: anastas Date: Sat, 10 Sep 2016 23:16:16 +0300 Subject: [PATCH 5/5] fix after debugging --- .../regionserver/CompactingMemStore.java | 6 ++-- .../regionserver/CompactionPipeline.java | 16 ++++++--- .../hbase/regionserver/ImmutableSegment.java | 8 ++--- .../hbase/regionserver/MemStoreCompactor.java | 33 +++++++++++-------- .../regionserver/TestCompactingMemStore.java | 14 +++++--- .../TestCompactingToCellArrayMapMemStore.java | 2 +- 6 files changed, 48 insertions(+), 31 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 504ddabdab52..424e884a0994 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -204,9 +204,9 @@ public List getSegments() { return list; } - public boolean swapCompactedSegments(VersionedSegmentsList versionedList, - ImmutableSegment result) { - return pipeline.swap(versionedList, result); + public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, + boolean merge) { + return pipeline.swap(versionedList, result, merge); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index e0ba8c3ac0d8..849b67bacaae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -83,9 +83,11 @@ public VersionedSegmentsList getVersionedList() { * Swapping only if there were no changes to the suffix of the list while it was compacted. * @param versionedList tail of the pipeline that was compacted * @param segment new compacted segment + * @param merge * @return true iff swapped tail with new compacted segment */ - public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment) { + public boolean swap( + VersionedSegmentsList versionedList, ImmutableSegment segment, boolean merge) { if (versionedList.getVersion() != version) { return false; } @@ -101,13 +103,14 @@ public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segmen + versionedList.getStoreSegments().size() + ", and the number of cells in new segment is:" + segment.getCellsCount()); } - swapSuffix(suffix,segment); + swapSuffix(suffix,segment, merge); } if (region != null) { // update the global memstore size counter long suffixSize = CompactingMemStore.getSegmentsSize(suffix); long newSize = CompactingMemStore.getSegmentSize(segment); long delta = suffixSize - newSize; + if (merge) assert(delta==0); // sanity check long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta); if (LOG.isDebugEnabled()) { LOG.debug("Suffix size: " + suffixSize + " compacted item size: " + newSize @@ -189,10 +192,13 @@ public long getTailSize() { return CompactingMemStore.getSegmentSize(pipeline.peekLast()); } - private void swapSuffix(LinkedList suffix, ImmutableSegment segment) { + private void swapSuffix(LinkedList suffix, ImmutableSegment segment, + boolean merge) { version++; - for(Segment itemInSuffix : suffix) { - itemInSuffix.close(); + if (!merge) { + for (Segment itemInSuffix : suffix) { + itemInSuffix.close(); + } } pipeline.removeAll(suffix); pipeline.addLast(segment); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index f6caa9107b38..5ffed0274615 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -195,14 +195,14 @@ public boolean flatten() { /*------------------------------------------------------------------------*/ // Create CellSet based on CellArrayMap from compacting iterator private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator iterator, - boolean merge) { + boolean doMerge) { Cell[] cells = new Cell[numOfCells]; // build the Cell Array int i = 0; while (iterator.hasNext()) { Cell c = iterator.next(); // The scanner behind the iterator is doing all the elimination logic - if (merge) { + if (doMerge) { // if this is merge we just move the Cell object without copying MSLAB // the sizes still need to be updated in the new segment cells[i] = c; @@ -210,10 +210,10 @@ private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator // now we just copy it to the new segment (also MSLAB copy) cells[i] = maybeCloneWithAllocator(c); } - boolean usedMSLAB = (getMemStoreLAB()!=null); + boolean useMSLAB = (getMemStoreLAB()!=null); // second parameter true, because in compaction addition of the cell to new segment // is always successful - updateMetaInfo(c, true, usedMSLAB); // updates the size per cell + updateMetaInfo(c, true, useMSLAB); // updates the size per cell i++; } // build the immutable CellSet diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 448a8e893c02..4e3d486f0878 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -52,9 +52,9 @@ class MemStoreCompactor { // Option for external guidance whether the speculative scan // (to evaluate compaction efficiency) is allowed - static final String MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN - = "hbase.hregion.compacting.memstore.avoidSpeculativeScan"; - static final boolean MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT = false; + static final String MEMSTORE_COMPACTOR_SPECULATIVE_SCAN + = "hbase.hregion.compacting.memstore.speculativeScan"; + static final boolean MEMSTORE_COMPACTOR_SPECULATIVE_SCAN_DEFAULT = true; // Maximal number of the segments in the compaction pipeline private static final int THRESHOLD_PIPELINE_SEGMENTS = 3; @@ -71,18 +71,22 @@ class MemStoreCompactor { // the limit to the size of the groups to be later provided to MemStoreCompactorIterator private final int compactionKVMax; + // the upper bound on the percentage of the pre-compaction amount of cells, + // which are going to "survive" after compaction; no compaction for percentage above this double fraction = 0.7; int immutCellsNum = 0; // number of immutable for compaction cells /** - * Types of actions to be done on the pipeline upon MemStoreCompaction invocation + * Types of actions to be done on the pipeline upon MemStoreCompaction invocation. + * Note that every value covers the previous ones, i.e. if MERGE is the action it implies + * that the youngest segment is going to be flatten anyway. */ private enum Action { NOP, - FLATTEN, - MERGE, - COMPACT + FLATTEN, // flatten the youngest segment in the pipeline + MERGE, // merge all the segments in the pipeline into one + COMPACT // copy-compact the data of all the segments in the pipeline } private Action action = Action.FLATTEN; @@ -102,8 +106,9 @@ public MemStoreCompactor(CompactingMemStore compactingMemStore) { * is already an ongoing compaction or no segments to compact. */ public boolean start() throws IOException { - if (!compactingMemStore.hasImmutableSegments()) // no compaction on empty pipeline + if (!compactingMemStore.hasImmutableSegments()) { // no compaction on empty pipeline return false; + } String memStoreType = compactingMemStore.getConfiguration().get(COMPACTING_MEMSTORE_TYPE_KEY, COMPACTING_MEMSTORE_TYPE_DEFAULT); @@ -162,11 +167,10 @@ private Action policy() { if (action == Action.COMPACT) { // try to compact if it worth it // check if running the speculative scan (to check the efficiency of compaction) // is allowed by the user - boolean avoidSpeculativeScan = - compactingMemStore.getConfiguration().getBoolean( - MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN, - MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT); - if (avoidSpeculativeScan==true) { + boolean useSpeculativeScan = + compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_SPECULATIVE_SCAN, + MEMSTORE_COMPACTOR_SPECULATIVE_SCAN_DEFAULT); + if (useSpeculativeScan==false) { LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName() + " is going to be compacted, without compaction-evaluation"); return Action.COMPACT; // compact without checking the compaction expedience @@ -222,7 +226,8 @@ private void doCompaction() { // Substitute the pipeline with one segment if (!isInterrupted.get()) { - if (resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result)) { + if (resultSwapped = compactingMemStore.swapCompactedSegments( + versionedList, result, (action==Action.MERGE))) { // update the wal so it can be truncated and not get too long compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index d4843317e162..ebde1ea4a4e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -508,6 +508,12 @@ public void testPuttingBackChunksWithOpeningScanner() @Test public void testPuttingBackChunksWithOpeningPipelineScanner() throws IOException { + + // set memstore to do data compaction and not to use the speculative scan + memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); + memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.speculativeScan", + false); + byte[] row = Bytes.toBytes("testrow"); byte[] fam = Bytes.toBytes("testfamily"); byte[] qf1 = Bytes.toBytes("testqualifier1"); @@ -591,8 +597,8 @@ public void testCompaction1Bucket() throws IOException { // set memstore to do data compaction and not to use the speculative scan memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); - memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.avoidSpeculativeScan", - true); + memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.speculativeScan", + false); String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4 @@ -624,7 +630,7 @@ public void testCompaction2Buckets() throws IOException { // set memstore to do data compaction and not to use the speculative scan memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); memstore.getConfiguration().setBoolean( - "hbase.hregion.compacting.memstore.avoidSpeculativeScan", true); + "hbase.hregion.compacting.memstore.speculativeScan", false); String[] keys1 = { "A", "A", "B", "C" }; String[] keys2 = { "A", "B", "D" }; @@ -673,7 +679,7 @@ public void testCompaction3Buckets() throws IOException { // set memstore to do data compaction and not to use the speculative scan memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction"); memstore.getConfiguration().setBoolean( - "hbase.hregion.compacting.memstore.avoidSpeculativeScan", true); + "hbase.hregion.compacting.memstore.speculativeScan", false); String[] keys1 = { "A", "A", "B", "C" }; String[] keys2 = { "A", "B", "D" }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index c3fae3bff6c2..2a9cb1f4e4ae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -69,7 +69,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore // set memstore to do data compaction and not to use the speculative scan conf.set("hbase.hregion.compacting.memstore.type", "data-compaction"); - conf.setBoolean("hbase.hregion.compacting.memstore.avoidSpeculativeScan", true); + conf.setBoolean("hbase.hregion.compacting.memstore.speculativeScan", false); this.memstore = new CompactingMemStore(conf, CellComparator.COMPARATOR, store,