Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ public List<Segment> getSegments() {
return list;
}

public boolean swapCompactedSegments(VersionedSegmentsList versionedList,
ImmutableSegment result) {
return pipeline.swap(versionedList, result);
public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
boolean merge) {
return pipeline.swap(versionedList, result, merge);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ public VersionedSegmentsList getVersionedList() {
* Swapping only if there were no changes to the suffix of the list while it was compacted.
* @param versionedList tail of the pipeline that was compacted
* @param segment new compacted segment
* @param merge
* @return true iff swapped tail with new compacted segment
*/
public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment) {
public boolean swap(
VersionedSegmentsList versionedList, ImmutableSegment segment, boolean merge) {
if (versionedList.getVersion() != version) {
return false;
}
Expand All @@ -101,13 +103,14 @@ public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segmen
+ versionedList.getStoreSegments().size()
+ ", and the number of cells in new segment is:" + segment.getCellsCount());
}
swapSuffix(suffix,segment);
swapSuffix(suffix,segment, merge);
}
if (region != null) {
// update the global memstore size counter
long suffixSize = CompactingMemStore.getSegmentsSize(suffix);
long newSize = CompactingMemStore.getSegmentSize(segment);
long delta = suffixSize - newSize;
if (merge) assert(delta==0); // sanity check
long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta);
if (LOG.isDebugEnabled()) {
LOG.debug("Suffix size: " + suffixSize + " compacted item size: " + newSize
Expand Down Expand Up @@ -189,10 +192,13 @@ public long getTailSize() {
return CompactingMemStore.getSegmentSize(pipeline.peekLast());
}

private void swapSuffix(LinkedList<ImmutableSegment> suffix, ImmutableSegment segment) {
private void swapSuffix(LinkedList<ImmutableSegment> suffix, ImmutableSegment segment,
boolean merge) {
version++;
for(Segment itemInSuffix : suffix) {
itemInSuffix.close();
if (!merge) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having the merge flag affecting the swap suffix seems wrong
applying the close in the outer scope seems to be better.
this change could be deferred to the policy jira, but if there's no policy jira, should do it here.

for (Segment itemInSuffix : suffix) {
itemInSuffix.close();
}
}
pipeline.removeAll(suffix);
pipeline.addLast(segment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class HeapMemStoreLAB implements MemStoreLAB {

private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
// A queue of chunks from pool contained by this memstore LAB
// TODO: in the future, it would be better to have List implementation instead of Queue,
// as FIFO order is not so important here
@VisibleForTesting
BlockingQueue<PooledChunk> pooledChunkQueue = null;
private final int chunkSize;
Expand Down Expand Up @@ -106,6 +108,13 @@ public HeapMemStoreLAB(Configuration conf) {
MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
}

/**
* To be used for merging multiple MSLABs
*/
public void addPooledChunkQueue(BlockingQueue<PooledChunk> targetQueue) {
targetQueue.drainTo(pooledChunkQueue);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the complexity here O(#items in queue)? How many items in queue, roughly? Could this become a bottleneck in merging/compaction?
Not important for this patch but maybe add a todo comment to change this to a set or list instead of queue

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The complexity is O(#items in targetQueue). Can not say how many items are in the queue as it depends on how many chunks we have per segment and this can vary. But I don't believe there are so many chunks that this would become a bottleneck, however this is for us to see later with stress benchmarking...

Added the comment

}

/**
* Allocate a slice of the given length.
*
Expand Down Expand Up @@ -242,8 +251,8 @@ Chunk getCurrentChunk() {
return this.curChunk.get();
}

@VisibleForTesting
BlockingQueue<PooledChunk> getChunkQueue() {

public BlockingQueue<PooledChunk> getChunkQueue() {
return this.pooledChunkQueue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected ImmutableSegment(Segment segment) {
* are going to be introduced.
*/
protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator iterator,
MemStoreLAB memStoreLAB, int numOfCells, Type type) {
MemStoreLAB memStoreLAB, int numOfCells, Type type, boolean doMerge) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all these doMerge flags should be part of the policy state and should not be passed throughout the code.
@sanastas this is not an AI for you, just a statement.


super(null, // initiailize the CellSet with NULL
comparator, memStoreLAB,
Expand All @@ -88,7 +88,7 @@ protected ImmutableSegment(CellComparator comparator, MemStoreCompactorIterator
ClassSize.CELL_ARRAY_MAP_ENTRY);

// build the true CellSet based on CellArrayMap
CellSet cs = createCellArrayMapSet(numOfCells, iterator);
CellSet cs = createCellArrayMapSet(numOfCells, iterator, doMerge);

this.setCellSet(null, cs); // update the CellSet of the new Segment
this.type = type;
Expand Down Expand Up @@ -194,19 +194,26 @@ public boolean flatten() {
///////////////////// PRIVATE METHODS /////////////////////
/*------------------------------------------------------------------------*/
// Create CellSet based on CellArrayMap from compacting iterator
private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator iterator) {
private CellSet createCellArrayMapSet(int numOfCells, MemStoreCompactorIterator iterator,
boolean doMerge) {

Cell[] cells = new Cell[numOfCells]; // build the Cell Array
int i = 0;
while (iterator.hasNext()) {
Cell c = iterator.next();
// The scanner behind the iterator is doing all the elimination logic
// now we just copy it to the new segment (also MSLAB copy)
cells[i] = maybeCloneWithAllocator(c);
boolean usedMSLAB = (cells[i] != c);
if (doMerge) {
// if this is merge we just move the Cell object without copying MSLAB
// the sizes still need to be updated in the new segment
cells[i] = c;
} else {
// now we just copy it to the new segment (also MSLAB copy)
cells[i] = maybeCloneWithAllocator(c);
}
boolean useMSLAB = (getMemStoreLAB()!=null);
// second parameter true, because in compaction addition of the cell to new segment
// is always successful
updateMetaInfo(c, true, usedMSLAB); // updates the size per cell
updateMetaInfo(c, true, useMSLAB); // updates the size per cell
i++;
}
// build the immutable CellSet
Expand Down
Loading