Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/main/java/com/upserve/uppend/BlockedLongs.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.util.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
import java.util.stream.*;

public class BlockedLongs implements AutoCloseable, Flushable {
Expand Down Expand Up @@ -214,7 +213,14 @@ public LongStream values(Long pos){
return LongStream.empty();
}

return Arrays.stream(valuesArray(pos));
long[] longs = valuesArray(pos);


return Arrays.stream(longs);

// return StreamSupport.longStream(
// Spliterators.spliterator(longs, 0, longs.length, Spliterator.CONCURRENT | Spliterator.IMMUTABLE |Spliterator.NONNULL),
// true);
}

public long[] valuesArray(Long pos) {
Expand Down Expand Up @@ -259,6 +265,8 @@ public long[] valuesArray(Long pos) {
}
}

// Lazy values is much slower in Performance tests with a large number of blocks.
// Might be workable with a custom spliterator that was block aware for parallelization
public LongStream lazyValues(Long pos) {
log.trace("streaming values from {} at {}", file, pos);

Expand Down
22 changes: 11 additions & 11 deletions src/main/java/com/upserve/uppend/blobs/MappedPage.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@
import static java.lang.Integer.min;

/**
* Mapped Byte Buffer backed implementation of Page
* {@link ByteBuffer} backed implementation of {@link Page}
* The buffer used in the constructor must be thread local
* Buffer are not thread safe!
*/
public class MappedPage implements Page {
private final MappedByteBuffer buffer;
private final ByteBuffer buffer;
private final int pageSize;
private final int startingPosition;

/**
* Constructor for a MappedPage
*
* @param buffer the mapped byte buffer representing a page of a file
* @param buffer a {@link ThreadLocal} {@link ByteBuffer} (mapped from a file) containing a {@link Page} of a {@link VirtualPageFile}
* @param startingPosition the starting offset in a larger buffer
* @param pageSize the size of the page to create
*/
public MappedPage(MappedByteBuffer buffer, int startingPosition, int pageSize) {
public MappedPage(ByteBuffer buffer, int startingPosition, int pageSize) {
this.pageSize = pageSize;
this.buffer = buffer;
this.startingPosition = startingPosition;
Expand All @@ -30,7 +32,7 @@ public MappedPage(MappedByteBuffer buffer, int startingPosition, int pageSize) {
*
* @param buffer the mapped byte buffer representing a page of a file
*/
public MappedPage(MappedByteBuffer buffer) {
public MappedPage(ByteBuffer buffer) {
this(buffer, 0, buffer.capacity());
}

Expand All @@ -42,9 +44,8 @@ public int get(int pagePosition, byte[] dst, int bufferOffset) {
final int actualRead = min(desiredRead, availableToRead);

// Make a local buffer with local position
ByteBuffer localBuffer = buffer.duplicate();
localBuffer.position(pagePosition + startingPosition);
localBuffer.get(dst, bufferOffset, actualRead);
buffer.position(pagePosition + startingPosition);
buffer.get(dst, bufferOffset, actualRead);

return actualRead;
}
Expand All @@ -56,9 +57,8 @@ public int put(int pagePosition, byte[] src, int bufferOffset) {
final int actualWrite = min(desiredWrite, availableToWrite);

// Make a local buffer with local position
ByteBuffer localBuffer = buffer.duplicate();
localBuffer.position(pagePosition + startingPosition);
localBuffer.put(src, bufferOffset, actualWrite);
buffer.position(pagePosition + startingPosition);
buffer.put(src, bufferOffset, actualWrite);

return actualWrite;
}
Expand Down
36 changes: 22 additions & 14 deletions src/main/java/com/upserve/uppend/blobs/VirtualPageFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class VirtualPageFile implements Closeable {
private static final int PAGE_TABLE_SIZE = 1000;

private static final int MAX_BUFFERS = 1024 * 64; // 128 TB per partition for 2Gb Bufffers
private final MappedByteBuffer[] mappedByteBuffers;
private final ThreadLocal[] localMappedByteBuffers;
private final int bufferSize;

private final Path filePath;
Expand Down Expand Up @@ -96,8 +96,7 @@ public Path getFilePath() {
@Override
public void close() throws IOException {
if (!channel.isOpen()) return;
Arrays.fill(mappedByteBuffers, null);

Arrays.fill(localMappedByteBuffers, null);
if (!readOnly) {
channel.truncate(nextPagePosition.get());
}
Expand Down Expand Up @@ -239,7 +238,7 @@ MappedPage mappedPage(long startPosition) {
final int mapIndex = (int) (postHeaderPosition / bufferSize);
final int mapPosition = (int) (postHeaderPosition % bufferSize);

MappedByteBuffer bigbuffer = ensureBuffered(mapIndex);
ByteBuffer bigbuffer = ensureBuffered(mapIndex);

return new MappedPage(bigbuffer, mapPosition, pageSize);
}
Expand All @@ -262,7 +261,7 @@ public VirtualPageFile(Path filePath, int virtualFiles, int pageSize, int target
this.virtualFiles = virtualFiles;
this.pageSize = pageSize;

this.mappedByteBuffers = new MappedByteBuffer[MAX_BUFFERS];
this.localMappedByteBuffers = new ThreadLocal[MAX_BUFFERS];

if (targetBufferSize < (pageSize)) throw new IllegalArgumentException("Target buffer size " + targetBufferSize + " must be larger than a page " + pageSize);

Expand Down Expand Up @@ -460,22 +459,28 @@ private void allocatePage(int virtualFileNumber, int currentPageCount, int pageN
pageAllocationCount.add(pagesToAllocate);
}

private MappedByteBuffer ensureBuffered(int bufferIndex) {
MappedByteBuffer buffer = mappedByteBuffers[bufferIndex];
if (buffer == null) {
synchronized (mappedByteBuffers) {
buffer = mappedByteBuffers[bufferIndex];
if (buffer == null) {
@SuppressWarnings("unchecked")
private ByteBuffer ensureBuffered(int bufferIndex) {
ThreadLocal local = localMappedByteBuffers[bufferIndex];
if (local == null) {
synchronized (localMappedByteBuffers) {
local = localMappedByteBuffers[bufferIndex];
if (local == null) {
long bufferStart = ((long) bufferIndex * bufferSize) + totalHeaderSize;
try {
buffer = channel.map(FileChannel.MapMode.READ_ONLY, bufferStart, bufferSize);
ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, bufferStart, bufferSize);
local = ThreadLocal.withInitial(buffer::duplicate);
local.set(buffer);
localMappedByteBuffers[bufferIndex] = local;
} catch (IOException e) {
throw new UncheckedIOException("Unable to map buffer for index " + bufferIndex + " at (" + bufferStart + " start position) in file " + filePath, e);
}
mappedByteBuffers[bufferIndex] = buffer;
}
}
}

ByteBuffer buffer = (ByteBuffer) local.get();

return buffer;
}

Expand Down Expand Up @@ -513,6 +518,7 @@ private LongBuffer ensurePageTable(int pageNumber) {
}

// Called during initialize only - no need to synchronize
@SuppressWarnings("unchecked")
private void preloadBuffers(long nextPagePosition){
for (int bufferIndex=0; bufferIndex<MAX_BUFFERS; bufferIndex++){
long bufferStart = ((long) bufferIndex * bufferSize) + totalHeaderSize;
Expand All @@ -521,7 +527,9 @@ private void preloadBuffers(long nextPagePosition){

try {
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, bufferStart, bufferSize);
mappedByteBuffers[bufferIndex] = buffer;
ThreadLocal local = ThreadLocal.withInitial(buffer::duplicate);
local.set(buffer);
localMappedByteBuffers[bufferIndex] = local;
} catch (IOException e) {
throw new UncheckedIOException("Unable to preload mapped buffer for index " + bufferIndex + " at (" + bufferStart + " start position) in file " + filePath, e);
}
Expand Down
31 changes: 3 additions & 28 deletions src/test/java/com/upserve/uppend/AppendOnlyStoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.concurrent.*;
import java.util.stream.Collectors;

import static com.upserve.uppend.TestHelper.genBytes;
import static org.junit.Assert.*;

public class AppendOnlyStoreTest {
Expand Down Expand Up @@ -389,37 +390,11 @@ private void tester(int number, int size) {

assertEquals(inputBytes.size(), outputBytes.size());

inputBytes.sort(AppendOnlyStoreTest::compareByteArrays);
outputBytes.sort(AppendOnlyStoreTest::compareByteArrays);
inputBytes.sort(TestHelper::compareByteArrays);
outputBytes.sort(TestHelper::compareByteArrays);

for (int i = 0; i < number; i++) {
assertArrayEquals("input and output byte arrays differ at index " + i, inputBytes.get(i), outputBytes.get(i));
}
}

private static int compareByteArrays(byte[] o1, byte[] o2) {
if (o1 == null) {
if (o2 == null) {
return 0;
}
return -1;
}
if (o2 == null) {
return 1;
}
for (int i = 0; i < o1.length && i < o2.length; i++) {
int v1 = 0xff & o1[i];
int v2 = 0xff & o2[i];
if (v1 != v2) {
return v1 < v2 ? -1 : 1;
}
}
return Integer.compare(o1.length, o2.length);
}

private byte[] genBytes(int len) {
byte[] bytes = new byte[len];
new Random().nextBytes(bytes);
return bytes;
}
}
28 changes: 28 additions & 0 deletions src/test/java/com/upserve/uppend/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import org.slf4j.*;

import java.lang.reflect.*;
import java.util.Random;
import java.util.concurrent.*;

public class TestHelper {

public static void resetLogger(Class clazz, String fieldName) throws Exception {
setLogger(clazz, fieldName, LoggerFactory.getLogger(clazz));
}
Expand Down Expand Up @@ -43,4 +45,30 @@ public static CounterStoreBuilder getDefaultCounterStoreTestBuilder() {
.withLookupPageSize(16 * 1024);
}

public static int compareByteArrays(byte[] o1, byte[] o2) {
if (o1 == null) {
if (o2 == null) {
return 0;
}
return -1;
}
if (o2 == null) {
return 1;
}
for (int i = 0; i < o1.length && i < o2.length; i++) {
int v1 = 0xff & o1[i];
int v2 = 0xff & o2[i];
if (v1 != v2) {
return v1 < v2 ? -1 : 1;
}
}
return Integer.compare(o1.length, o2.length);
}

public static byte[] genBytes(int len) {
byte[] bytes = new byte[len];
ThreadLocalRandom.current().nextBytes(bytes);
return bytes;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.upserve.uppend.performance;

import org.junit.*;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.*;
import java.util.stream.*;

import static com.upserve.uppend.performance.StreamTimerMethods.*;

// Parallel is slower - usually about 2X, but variable!
public class ArrayLongStreamTest {

private static final int values = 10_000_000;

private final long[] longs = new long[values];

private final int repeats = 5;

private final Supplier<LongStream> parallelStream = () -> Arrays.stream(longs).parallel();
private final Supplier<LongStream> sequentialStream = () -> Arrays.stream(longs).sequential();

@Before
public void loadStore() {
Arrays.setAll(longs, (v) -> ThreadLocalRandom.current().nextLong(0, 512));
long val = Arrays.stream(longs, 0, 100).parallel().sum();
}

@Test
public void sumTest() {
for (int i=0; i<repeats; i++) {
parallelTime(sum(parallelStream));
sequentialTime(sum(sequentialStream));
}
}

@Test
public void forEachAdderTest() {
for (int i=0; i<repeats; i++) {
parallelTime(forEachAdder(parallelStream));
sequentialTime(forEachAdder(sequentialStream));
}
}

@Test
public void groupByCountingTest() {
for (int i=0; i<repeats; i++) {
parallelTime(groupByCounting(parallelStream, false));
sequentialTime(groupByCounting(sequentialStream, false));
}
}

@Test
public void concurrentGroupByCountingTest() {
for (int i=0; i<repeats; i++) {
parallelTime(groupByCounting(parallelStream, true));
sequentialTime(groupByCounting(sequentialStream, true));
}
}



@Test
public void flatMapTest(){
for (int i=0; i<repeats; i++) {
parallelTime(flatMapSum(LongStream.of(1,2,3,4,5,6,7,8,9,10).flatMap(val -> Arrays.stream(longs)).parallel()));
sequentialTime(flatMapSum(LongStream.of(1,2,3,4,5,6,7,8,9,10).flatMap(val -> Arrays.stream(longs)).sequential()));
}
}
}
Loading