diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 504ddabdab52..424e884a0994 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -204,9 +204,9 @@ public List getSegments() {
     return list;
   }
 
-  public boolean swapCompactedSegments(VersionedSegmentsList versionedList,
-      ImmutableSegment result) {
-    return pipeline.swap(versionedList, result);
+  public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
+      boolean merge) {
+    return pipeline.swap(versionedList, result, merge);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index e0ba8c3ac0d8..849b67bacaae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -83,9 +83,11 @@ public VersionedSegmentsList getVersionedList() {
    * Swapping only if there were no changes to the suffix of the list while it was compacted.
    * @param versionedList tail of the pipeline that was compacted
    * @param segment new compacted segment
+   * @param merge true if the suffix is being merged (cells moved), false if copy-compacted
    * @return true iff swapped tail with new compacted segment
    */
-  public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment) {
+  public boolean swap(
+      VersionedSegmentsList versionedList, ImmutableSegment segment, boolean merge) {
     if (versionedList.getVersion() != version) {
       return false;
     }
@@ -101,13 +103,14 @@ public boolean swap(VersionedSegmentsList versionedList, ImmutableSegmen
             + versionedList.getStoreSegments().size() + ", and the number of cells in new segment is:"
             + segment.getCellsCount());
       }
-      swapSuffix(suffix,segment);
+      swapSuffix(suffix, segment, merge);
     }
     if (region != null) {
       // update the global memstore size counter
       long suffixSize = CompactingMemStore.getSegmentsSize(suffix);
       long newSize = CompactingMemStore.getSegmentSize(segment);
       long delta = suffixSize - newSize;
+      if (merge) assert delta == 0; // sanity check
       long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Suffix size: " + suffixSize + " compacted item size: " + newSize
@@ -189,10 +192,13 @@ public long getTailSize() {
     return CompactingMemStore.getSegmentSize(pipeline.peekLast());
   }
 
-  private void swapSuffix(LinkedList<ImmutableSegment> suffix, ImmutableSegment segment) {
+  private void swapSuffix(LinkedList<ImmutableSegment> suffix, ImmutableSegment segment,
+      boolean merge) {
     version++;
-    for (Segment itemInSuffix : suffix) {
-      itemInSuffix.close();
+    if (!merge) {
+      for (Segment itemInSuffix : suffix) {
+        itemInSuffix.close();
+      }
     }
     pipeline.removeAll(suffix);
     pipeline.addLast(segment);
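The swap above is the commit point of an optimistic-concurrency scheme: the compactor works against a versioned snapshot of the pipeline, and its result is installed only if no concurrent in-memory flush bumped the version in the meantime; otherwise the work is simply discarded. A minimal, self-contained sketch of the pattern (plain Java, not HBase code; all names are illustrative):

import java.util.LinkedList;
import java.util.List;

// Sketch of the versioned-swap pattern used by CompactionPipeline.swap().
class VersionedPipeline<T> {
  private final LinkedList<T> pipeline = new LinkedList<>();
  private long version = 0;

  // the compactor remembers this stamp together with its snapshot of the segments
  synchronized long snapshotVersion() {
    return version;
  }

  // called by a concurrent in-memory flush; invalidates outstanding snapshots
  synchronized void addFirst(T segment) {
    pipeline.addFirst(segment);
    version++;
  }

  // replace 'suffix' with a single substitute, but only if nothing changed meanwhile
  synchronized boolean swap(long expectedVersion, List<T> suffix, T substitute) {
    if (expectedVersion != version) {
      return false; // stale snapshot: the caller drops its result and may retry
    }
    version++;
    pipeline.removeAll(suffix);
    pipeline.addLast(substitute);
    return true;
  }
}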
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
index 3ca4b0c52b2c..69c8af31e612 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
@@ -69,6 +69,8 @@ public class HeapMemStoreLAB implements MemStoreLAB {
   private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
 
   // A queue of chunks from pool contained by this memstore LAB
+  // TODO: in the future, it would be better to have a List implementation instead of a Queue,
+  // as FIFO order is not so important here
   @VisibleForTesting
   BlockingQueue<Chunk> pooledChunkQueue = null;
   private final int chunkSize;
@@ -106,6 +108,13 @@ public HeapMemStoreLAB(Configuration conf) {
         MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
   }
 
+  /**
+   * Drains the chunks of the given queue into this LAB's pooled chunk queue.
+   * To be used for merging multiple MSLABs.
+   */
+  public void addPooledChunkQueue(BlockingQueue<Chunk> targetQueue) {
+    targetQueue.drainTo(pooledChunkQueue);
+  }
+
   /**
    * Allocate a slice of the given length.
    *
@@ -242,8 +251,8 @@ Chunk getCurrentChunk() {
     return this.curChunk.get();
   }
 
-  @VisibleForTesting
-  BlockingQueue<Chunk> getChunkQueue() {
+
+  public BlockingQueue<Chunk> getChunkQueue() {
     return this.pooledChunkQueue;
   }
 }
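One subtlety in addPooledChunkQueue() above: BlockingQueue.drainTo(Collection) removes elements from the queue it is invoked on and adds them to its argument, so the parameter is the queue being emptied, and its chunks flow into this LAB's pooledChunkQueue. A standalone demo of the direction (illustrative names, not HBase code):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DrainToDirection {
  public static void main(String[] args) {
    BlockingQueue<String> oldLabChunks = new LinkedBlockingQueue<>();
    oldLabChunks.add("chunk-1");
    oldLabChunks.add("chunk-2");
    BlockingQueue<String> pooledChunkQueue = new LinkedBlockingQueue<>();

    // drainTo() empties its receiver into its argument:
    oldLabChunks.drainTo(pooledChunkQueue);

    System.out.println(pooledChunkQueue); // [chunk-1, chunk-2]
    System.out.println(oldLabChunks);     // []
  }
}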
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index 28f14d5c2df3..5ffed0274615 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -79,7 +79,7 @@ protected ImmutableSegment(Segment segment) {
    * are going to be introduced.
    */
   protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator,
-      MemStoreLAB memStoreLAB, int numOfCells, Type type) {
+      MemStoreLAB memStoreLAB, int numOfCells, Type type, boolean doMerge) {
     super(null, // initialize the CellSet with NULL
         comparator, memStoreLAB,
@@ -88,7 +88,7 @@ protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator
         ClassSize.CELL_ARRAY_MAP_ENTRY);
     // build the true CellSet based on CellArrayMap
-    CellSet cs = createCellArrayMapSet(numOfCells, iterator);
+    CellSet cs = createCellArrayMapSet(numOfCells, iterator, doMerge);
 
     this.setCellSet(null, cs); // update the CellSet of the new Segment
     this.type = type;
@@ -194,19 +194,26 @@ public boolean flatten() {
   /////////////////////  PRIVATE METHODS  /////////////////////
   /*------------------------------------------------------------------------*/
   // Create CellSet based on CellArrayMap from compacting iterator
-  private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator iterator) {
+  private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator iterator,
+      boolean doMerge) {
     Cell[] cells = new Cell[numOfCells]; // build the Cell Array
     int i = 0;
     while (iterator.hasNext()) {
       Cell c = iterator.next();
       // The scanner behind the iterator is doing all the elimination logic
-      // now we just copy it to the new segment (also MSLAB copy)
-      cells[i] = maybeCloneWithAllocator(c);
-      boolean usedMSLAB = (cells[i] != c);
+      if (doMerge) {
+        // if this is a merge, we just move the Cell object without an MSLAB copy;
+        // the sizes still need to be updated in the new segment
+        cells[i] = c;
+      } else {
+        // otherwise we copy it to the new segment (also an MSLAB copy)
+        cells[i] = maybeCloneWithAllocator(c);
+      }
+      boolean useMSLAB = (getMemStoreLAB() != null);
       // second parameter true, because in compaction addition of the cell to new segment
       // is always successful
-      updateMetaInfo(c, true, usedMSLAB); // updates the size per cell
+      updateMetaInfo(c, true, useMSLAB); // updates the size per cell
       i++;
     }
     // build the immutable CellSet
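The doMerge branch above is where the merge saves work: the Cell reference is moved as-is, so the new segment keeps pointing into chunks owned by the old segments' MSLABs, whereas copy-compaction rewrites the cell data into the new segment's own MSLAB. This is also why CompactionPipeline.swapSuffix() must not close the old segments on merge. A self-contained sketch of the hazard, with a plain byte[] standing in for an MSLAB chunk (illustrative, not HBase code):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class MoveVsCopy {
  public static void main(String[] args) {
    byte[] chunk = "valueA".getBytes(StandardCharsets.UTF_8); // a pooled, MSLAB-like chunk
    byte[] moved = chunk;                                     // "merge": same backing storage
    byte[] copied = Arrays.copyOf(chunk, chunk.length);       // "compaction": private copy

    Arrays.fill(chunk, (byte) 0); // the chunk is returned to the pool and recycled

    // the moved reference now reads recycled garbage, the copy is unaffected:
    System.out.println(new String(moved, StandardCharsets.UTF_8));  // zeros, data lost
    System.out.println(new String(copied, StandardCharsets.UTF_8)); // valueA
  }
}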
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 470dc9c8daa0..4e3d486f0878 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -40,23 +40,24 @@
 @InterfaceAudience.Private
 class MemStoreCompactor {
 
-  // Option for external guidance whether flattening is allowed
-  static final String MEMSTORE_COMPACTOR_FLATTENING = "hbase.hregion.compacting.memstore.flatten";
-  static final boolean MEMSTORE_COMPACTOR_FLATTENING_DEFAULT = true;
-
-  // Option for external setting of the compacted structure (SkipList, CellArray, etc.)
+  // The external setting of the compacting MemStore behaviour.
+  // Compaction of the index without the data is the default.
   static final String COMPACTING_MEMSTORE_TYPE_KEY = "hbase.hregion.compacting.memstore.type";
-  static final int COMPACTING_MEMSTORE_TYPE_DEFAULT = 2; // COMPACT_TO_ARRAY_MAP as default
+  static final String COMPACTING_MEMSTORE_TYPE_DEFAULT = "index-compaction";
 
   // What percentage of the duplications is causing compaction?
   static final String COMPACTION_THRESHOLD_REMAIN_FRACTION =
       "hbase.hregion.compacting.memstore.comactPercent";
-  static final double COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT = 0.2;
+  static final double COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT = 0.3;
+
+  // Option for external guidance whether the speculative scan
+  // (to evaluate compaction efficiency) is allowed
+  static final String MEMSTORE_COMPACTOR_SPECULATIVE_SCAN
+      = "hbase.hregion.compacting.memstore.speculativeScan";
+  static final boolean MEMSTORE_COMPACTOR_SPECULATIVE_SCAN_DEFAULT = true;
 
-  // Option for external guidance whether the flattening is allowed
-  static final String MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN
-      = "hbase.hregion.compacting.memstore.avoidSpeculativeScan";
-  static final boolean MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT = false;
+  // Maximum number of segments allowed in the compaction pipeline
+  private static final int THRESHOLD_PIPELINE_SEGMENTS = 3;
 
   private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
   private CompactingMemStore compactingMemStore;
@@ -70,23 +71,30 @@ class MemStoreCompactor {
   // the limit to the size of the groups to be later provided to MemStoreCompactorIterator
   private final int compactionKVMax;
 
-  double fraction = 0.8;
+  // the upper bound on the fraction of the pre-compaction cells that may "survive"
+  // the compaction; above this fraction no compaction is done
+  double fraction = 0.7;
 
   int immutCellsNum = 0; // number of cells in the immutable segments under compaction
+
   /**
-   * Types of Compaction
+   * Types of actions to be done on the pipeline upon MemStoreCompaction invocation.
+   * Note that every value covers the previous ones, i.e. if MERGE is the action it implies
+   * that the youngest segment is going to be flattened anyway.
    */
-  private enum Type {
-    COMPACT_TO_SKIPLIST_MAP,
-    COMPACT_TO_ARRAY_MAP
+  private enum Action {
+    NOP,
+    FLATTEN,  // flatten the youngest segment in the pipeline
+    MERGE,    // merge all the segments in the pipeline into one
+    COMPACT   // copy-compact the data of all the segments in the pipeline
   }
 
-  private Type type = Type.COMPACT_TO_ARRAY_MAP;
+  private Action action = Action.FLATTEN;
 
   public MemStoreCompactor(CompactingMemStore compactingMemStore) {
     this.compactingMemStore = compactingMemStore;
-    this.compactionKVMax = compactingMemStore.getConfiguration().getInt(
-        HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
+    this.compactionKVMax = compactingMemStore.getConfiguration()
+        .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
     this.fraction = 1 - compactingMemStore.getConfiguration().getDouble(
         COMPACTION_THRESHOLD_REMAIN_FRACTION,
         COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT);
@@ -98,17 +106,20 @@ public MemStoreCompactor(CompactingMemStore compactingMemStore) {
    * is already an ongoing compaction or no segments to compact.
    */
   public boolean start() throws IOException {
-    if (!compactingMemStore.hasImmutableSegments()) return false; // no compaction on empty
+    if (!compactingMemStore.hasImmutableSegments()) { // no compaction on an empty pipeline
+      return false;
+    }
 
-    int t = compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_TYPE_KEY,
+    String memStoreType = compactingMemStore.getConfiguration().get(COMPACTING_MEMSTORE_TYPE_KEY,
        COMPACTING_MEMSTORE_TYPE_DEFAULT);
-    switch (t) {
-      case 1: type = Type.COMPACT_TO_SKIPLIST_MAP;
+    switch (memStoreType) {
+      case "index-compaction": action = Action.MERGE;
        break;
-      case 2: type = Type.COMPACT_TO_ARRAY_MAP;
+      case "data-compaction": action = Action.COMPACT;
        break;
-      default: throw new RuntimeException("Unknown type " + type); // sanity check
+      default:
+        throw new RuntimeException("Unknown memstore type " + memStoreType); // sanity check
     }
 
     // get a snapshot of the list of the segments from the pipeline,
@@ -117,7 +128,7 @@ public boolean start() throws IOException {
     immutCellsNum = versionedList.getNumOfCells();
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Starting the MemStore In-Memory Shrink of type " + type + " for store "
+      LOG.debug("Starting the In-Memory Compaction for store "
           + compactingMemStore.getStore().getColumnFamilyName());
     }
 
@@ -148,41 +159,41 @@ private void releaseResources() {
-   * returns false if we must compact. If this method returns true we
-   * still need to evaluate the compaction.
+   * Decides which action to take on the pipeline: NOP, FLATTEN, MERGE or COMPACT.
    */
-  private boolean shouldFlatten() {
-    boolean userToFlatten = // the user configurable option to flatten or not to flatten
-        compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_FLATTENING,
-            MEMSTORE_COMPACTOR_FLATTENING_DEFAULT);
-    if (userToFlatten==false) {
-      LOG.debug("In-Memory shrink is doing compaction, as user asked to avoid flattening");
-      return false; // the user doesn't want to flatten
+  private Action policy() {
+
+    if (isInterrupted.get()) {       // if the entire process was interrupted, cancel flattening;
+      return Action.NOP;             // the compaction also doesn't start when interrupted
+    }
+
+    if (action == Action.COMPACT) {  // try to compact, if it is worth it
+      // check whether running the speculative scan (to evaluate the efficiency of
+      // the compaction) is allowed by the user
+      boolean useSpeculativeScan =
+          compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_SPECULATIVE_SCAN,
+              MEMSTORE_COMPACTOR_SPECULATIVE_SCAN_DEFAULT);
+      if (!useSpeculativeScan) {
+        LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
+            + " is going to be compacted, without compaction-evaluation");
+        return Action.COMPACT;       // compact without checking the compaction expedience
+      }
+      if (worthDoingCompaction()) {
+        return Action.COMPACT;       // compact because it is worth it
+      }
     }
+
+    // compaction shouldn't happen or isn't worth it;
     // limit the number of the segments in the pipeline
     int numOfSegments = versionedList.getNumOfSegments();
-    if (numOfSegments > 3) { // hard-coded for now as it is going to move to policy
-      LOG.debug("In-Memory shrink is doing compaction, as there already are " + numOfSegments
-          + " segments in the compaction pipeline");
-      return false; // to avoid "too many open files later", compact now
-    }
-    // till here we hvae all the signs that it is possible to flatten, run the speculative scan
-    // (if allowed by the user) to check the efficiency of compaction
-    boolean avoidSpeculativeScan = // the user configurable option to avoid the speculative scan
-        compactingMemStore.getConfiguration().getBoolean(MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN,
-            MEMSTORE_COMPACTOR_AVOID_SPECULATIVE_SCAN_DEFAULT);
-    if (avoidSpeculativeScan==true) {
-      LOG.debug("In-Memory shrink is doing flattening, as user asked to avoid compaction "
-          + "evaluation");
-      return true; // flatten without checking the compaction expedience
-    }
-    try {
-      immutCellsNum = countCellsForCompaction();
-      if (immutCellsNum > fraction * versionedList.getNumOfCells()) {
-        return true;
-      }
-    } catch(Exception e) {
-      return true;
+    if (numOfSegments > THRESHOLD_PIPELINE_SEGMENTS) {
+      LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
+          + " is going to be merged, as there are " + numOfSegments + " segments");
+      action = Action.MERGE;
+      return Action.MERGE;           // to avoid too many segments, merge now
    }
-    return false;
+
+    // if none of the above applies, just flatten the newly joined segment
+    LOG.debug("The youngest segment in the In-Memory Compaction Pipeline for store "
+        + compactingMemStore.getFamilyName() + " is going to be flattened");
+    return Action.FLATTEN;
   }
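To make the thresholds in policy() concrete: with COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT = 0.3 the constructor computes fraction = 1 - 0.3 = 0.7, so copy-compaction is chosen only when the speculative scan estimates that fewer than 70% of the cells survive, i.e. at least 30% would be eliminated. A worked example with illustrative numbers:

public class CompactionThresholdExample {
  public static void main(String[] args) {
    double remainFraction = 0.3;          // COMPACTION_THRESHOLD_REMAIN_FRACTION_DEFAULT
    double fraction = 1 - remainFraction; // 0.7, as computed in the MemStoreCompactor constructor
    int cellsBefore = 1000;               // cells currently in the pipeline snapshot
    int estimatedSurvivors = 650;         // counted by the speculative scan

    // mirrors worthDoingCompaction(): compact only if survivors stay BELOW the bound
    boolean compact = estimatedSurvivors < fraction * cellsBefore; // 650 < 700 -> true
    System.out.println("compact = " + compact);
  }
}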
- LOG.debug("In-Memory compaction does not pay off - storing the flattened segment" - + " for store: " + compactingMemStore.getFamilyName()); - // Looking for Segment in the pipeline with SkipList index, to make it flat + Action nextStep = policy(); + switch (nextStep){ + case FLATTEN: // Youngest Segment in the pipeline is with SkipList index, make it flat compactingMemStore.flattenOneSegment(versionedList.getVersion()); + case NOP: // intentionally falling through return; + case MERGE: + case COMPACT: + break; + default: throw new RuntimeException("Unknown action " + action); // sanity check } - // PHASE II: create the new compacted ImmutableSegment - START COPY-COMPACTION + // Create one segment representing all segments in the compaction pipeline, + // either by compaction or by merge if (!isInterrupted.get()) { - result = compact(immutCellsNum); + result = createSubstitution(); } - // Phase III: swap the old compaction pipeline - END COPY-COMPACTION + // Substitute the pipeline with one segment if (!isInterrupted.get()) { - if (resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result)) { + if (resultSwapped = compactingMemStore.swapCompactedSegments( + versionedList, result, (action==Action.MERGE))) { // update the wal so it can be truncated and not get too long compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater } @@ -229,14 +244,10 @@ private void doCompaction() { } /**---------------------------------------------------------------------- - * The copy-compaction is the creation of the ImmutableSegment (from the relevant type) - * based on the Compactor Iterator. The new ImmutableSegment is returned. + * Creation of the ImmutableSegment either by merge or copy-compact of the segments of the + * pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned. */ - private ImmutableSegment compact(int numOfCells) throws IOException { - - LOG.debug("In-Memory compaction does pay off - The estimated number of cells " - + "after compaction is " + numOfCells + ", while number of cells before is " + versionedList - .getNumOfCells() + ". 
+  private ImmutableSegment createSubstitution() throws IOException {
 
     ImmutableSegment result = null;
     MemStoreCompactorIterator iterator =
@@ -244,17 +255,18 @@ private ImmutableSegment compact(int numOfCells) throws IOException {
         compactingMemStore.getComparator(), compactionKVMax, compactingMemStore.getStore());
     try {
-      switch (type) {
-        case COMPACT_TO_SKIPLIST_MAP:
+      switch (action) {
+        case COMPACT:
           result = SegmentFactory.instance().createImmutableSegment(
-              compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator);
+              compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
+              immutCellsNum, ImmutableSegment.Type.ARRAY_MAP_BASED);
           break;
-        case COMPACT_TO_ARRAY_MAP:
+        case MERGE:
           result = SegmentFactory.instance().createImmutableSegment(
               compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
-              numOfCells, ImmutableSegment.Type.ARRAY_MAP_BASED);
+              immutCellsNum, ImmutableSegment.Type.ARRAY_MAP_BASED,
+              versionedList.getStoreSegments());
           break;
-        default: throw new RuntimeException("Unknown type " + type); // sanity check
+        default: throw new RuntimeException("Unknown action " + action); // sanity check
       }
     } finally {
       iterator.close();
@@ -264,24 +276,35 @@
     }
   }
 
   /**----------------------------------------------------------------------
-   * Count cells to estimate the efficiency of the future compaction
+   * Estimate the efficiency of the future compaction
    */
-  private int countCellsForCompaction() throws IOException {
+  private boolean worthDoingCompaction() {
     int cnt = 0;
-    MemStoreCompactorIterator iterator =
+    MemStoreCompactorIterator iterator = null;
+
+    try {
+      iterator =
         new MemStoreCompactorIterator(
             versionedList.getStoreSegments(),
             compactingMemStore.getComparator(),
             compactionKVMax, compactingMemStore.getStore());
-    try {
       while (iterator.next() != null) {
         cnt++;
       }
+    } catch (Exception e) {
+      return false;
     } finally {
-      iterator.close();
+      if (iterator != null) iterator.close();
     }
-    return cnt;
+
+    if (cnt >= fraction * versionedList.getNumOfCells()) {
+      return false;
+    }
+
+    LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
+        + " is going to be compacted, according to the compaction-evaluation; " + cnt
+        + " cells will remain out of " + versionedList.getNumOfCells());
+    immutCellsNum = cnt;
+    return true;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java
index 2eafb421bd2d..af7f7e45aaa0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorIterator.java
@@ -86,7 +86,7 @@ public boolean hasNext() {
         return false;
       }
     }
-    return (kvsIterator.hasNext() || hasMore);
+    return kvsIterator.hasNext();
   }
 
   @Override
@@ -129,7 +129,8 @@ private StoreScanner createScanner(Store store, KeyValueScanner scanner)
   }
 
-
+  /* Refill the key-value set (should be invoked only when the KVS is empty).
+   * Returns true if the KVS is non-empty. */
   private boolean refillKVS() {
     kvs.clear(); // clear previous KVS, first initiated in the constructor
     if (!hasMore) { // if there is nothing expected next in compactingScanner
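For reference, the behaviours introduced in MemStoreCompactor are selected through the two configuration keys defined there. A minimal sketch of how a test or client would set them (mirroring the updated tests below, and assuming only hadoop-common's Configuration on the classpath):

import org.apache.hadoop.conf.Configuration;

public class CompactingMemStoreConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // "index-compaction" (the default) merges segment indexes;
    // "data-compaction" copy-compacts the data as well
    conf.set("hbase.hregion.compacting.memstore.type", "data-compaction");
    // skip the speculative compaction-efficiency scan and compact unconditionally
    conf.setBoolean("hbase.hregion.compacting.memstore.speculativeScan", false);
    System.out.println(conf.get("hbase.hregion.compacting.memstore.type"));
  }
}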
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index 6351f1358b9f..75bdf3dbc92c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 import java.io.IOException;
+import java.util.LinkedList;
 
 /**
  * A singleton store segment factory.
@@ -70,16 +71,39 @@ public MutableSegment createMutableSegment(final Configuration conf,
   }
 
   // create a new flat immutable segment from compacting the old immutable segments
-  public ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator,
+  public ImmutableSegment createImmutableSegment(
+      final Configuration conf, final CellComparator comparator,
       MemStoreCompactorIterator iterator, int numOfCells, ImmutableSegment.Type segmentType)
       throws IOException {
-    Preconditions.checkArgument(
-        segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED, "wrong immutable segment type");
+    Preconditions.checkArgument(segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED,
+        "wrong immutable segment type");
     MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
     return
-        new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType);
+        // the last parameter "false" means not to merge, but to compact the pipeline
+        // in order to create the new segment
+        new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, false);
   }
 
+  // create a new flat immutable segment from merging the old immutable segments
+  public ImmutableSegment createImmutableSegment(
+      final Configuration conf, final CellComparator comparator,
+      MemStoreCompactorIterator iterator, int numOfCells, ImmutableSegment.Type segmentType,
+      LinkedList<ImmutableSegment> segments)
+      throws IOException {
+    Preconditions.checkArgument(segmentType != ImmutableSegment.Type.SKIPLIST_MAP_BASED,
+        "wrong immutable segment type");
+    MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
+
+    for (Segment s : segments) {
+      ((HeapMemStoreLAB) memStoreLAB).addPooledChunkQueue(
+          ((HeapMemStoreLAB) s.getMemStoreLAB()).getChunkQueue());
+    }
+
+    return
+        // the last parameter "true" means to merge the compaction pipeline
+        // in order to create the new segment
+        new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, true);
+  }
 
   //****** private methods to instantiate concrete store segments **********//
   private MutableSegment generateMutableSegment(
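A note on the guards in the factory methods above: Guava's Preconditions.checkArgument throws IllegalArgumentException when the supplied condition is false. Since both the compaction and the merge paths in MemStoreCompactor pass ImmutableSegment.Type.ARRAY_MAP_BASED, the guards can only reject SKIPLIST_MAP_BASED. A minimal demo of the semantics (illustrative, assuming Guava on the classpath):

import com.google.common.base.Preconditions;

public class CheckArgumentDemo {
  enum Type { SKIPLIST_MAP_BASED, ARRAY_MAP_BASED }

  public static void main(String[] args) {
    Type segmentType = Type.ARRAY_MAP_BASED; // what both factory callers pass
    // condition is true, so the argument is accepted and nothing happens:
    Preconditions.checkArgument(segmentType != Type.SKIPLIST_MAP_BASED,
        "wrong immutable segment type");
    // condition is false, so this throws IllegalArgumentException:
    Preconditions.checkArgument(segmentType != Type.ARRAY_MAP_BASED,
        "wrong immutable segment type");
  }
}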
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index db0205e3a1b5..ebde1ea4a4e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -508,6 +508,12 @@ public void testPuttingBackChunksWithOpeningScanner()
   @Test
   public void testPuttingBackChunksWithOpeningPipelineScanner()
       throws IOException {
+
+    // set memstore to do data compaction and not to use the speculative scan
+    memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction");
+    memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.speculativeScan",
+        false);
+
     byte[] row = Bytes.toBytes("testrow");
     byte[] fam = Bytes.toBytes("testfamily");
     byte[] qf1 = Bytes.toBytes("testqualifier1");
@@ -589,6 +595,11 @@ public void testPuttingBackChunksWithOpeningPipelineScanner()
   @Test
   public void testCompaction1Bucket() throws IOException {
 
+    // set memstore to do data compaction and not to use the speculative scan
+    memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction");
+    memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.speculativeScan",
+        false);
+
     String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
 
     // test 1 bucket
@@ -616,6 +627,11 @@ public void testCompaction1Bucket() throws IOException {
   @Test
   public void testCompaction2Buckets() throws IOException {
 
+    // set memstore to do data compaction and not to use the speculative scan
+    memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction");
+    memstore.getConfiguration().setBoolean(
+        "hbase.hregion.compacting.memstore.speculativeScan", false);
+
     String[] keys1 = { "A", "A", "B", "C" };
     String[] keys2 = { "A", "B", "D" };
 
@@ -660,6 +676,11 @@ public void testCompaction2Buckets() throws IOException {
   @Test
   public void testCompaction3Buckets() throws IOException {
 
+    // set memstore to do data compaction and not to use the speculative scan
+    memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "data-compaction");
+    memstore.getConfiguration().setBoolean(
+        "hbase.hregion.compacting.memstore.speculativeScan", false);
+
     String[] keys1 = { "A", "A", "B", "C" };
     String[] keys2 = { "A", "B", "D" };
     String[] keys3 = { "D", "B", "B" };
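A quick sanity computation for the testMerging case below: an index merge consolidates the segments' indexes but performs no duplicate elimination, so every inserted cell survives and the expected count is simply the sum of the three key batches (the arrays are the ones used in the test):

public class MergeCountSanity {
  public static void main(String[] args) {
    String[] keys1 = { "A", "A", "B", "C", "F", "H" };
    String[] keys2 = { "A", "B", "D", "G", "I", "J" };
    String[] keys3 = { "D", "B", "B", "E" };
    // merge does no duplicate elimination, so all cells remain:
    System.out.println(keys1.length + keys2.length + keys3.length); // 16
  }
}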
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
index 1933343b0012..2a9cb1f4e4ae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
@@ -67,7 +67,9 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     compactingSetUp();
     Configuration conf = HBaseConfiguration.create();
 
-    conf.setLong("hbase.hregion.compacting.memstore.type", 2); // compact to CellArrayMap
+    // set memstore to do data compaction and not to use the speculative scan
+    conf.set("hbase.hregion.compacting.memstore.type", "data-compaction");
+    conf.setBoolean("hbase.hregion.compacting.memstore.speculativeScan", false);
 
     this.memstore = new CompactingMemStore(conf, CellComparator.COMPARATOR, store,
@@ -221,17 +223,16 @@ public void testCompaction3Buckets() throws IOException {
   }
 
   //////////////////////////////////////////////////////////////////////////////
-  // Flattening tests
+  // Merging tests
   //////////////////////////////////////////////////////////////////////////////
   @Test
-  public void testFlattening() throws IOException {
+  public void testMerging() throws IOException {
 
     String[] keys1 = { "A", "A", "B", "C", "F", "H" };
     String[] keys2 = { "A", "B", "D", "G", "I", "J" };
     String[] keys3 = { "D", "B", "B", "E" };
 
-    // set flattening to true
-    memstore.getConfiguration().setBoolean("hbase.hregion.compacting.memstore.flatten", true);
+    memstore.getConfiguration().set("hbase.hregion.compacting.memstore.type", "index-compaction");
 
     addRowsByKeys(memstore, keys1);
 
@@ -264,7 +265,7 @@
     for (Segment s : memstore.getSegments()) {
       counter += s.getCellsCount();
     }
-    assertEquals(10,counter);
+    assertEquals(16, counter);
 
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
     ImmutableSegment s = memstore.getSnapshot();