Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void readPartition(PartitionPersistenceServiceProto.ReadPartitionRequest
try {
PartitionByteBuffer partitionByteBuffer = persistenceAdapter.openPartition(fromGrpc(request.getKey()));
try {
builder.setData(ByteString.copyFrom(partitionByteBuffer.getByteBuffer()));
builder.setData(partitionByteBuffer.withRead(ByteString::copyFrom));
} finally {
partitionByteBuffer.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.huebert.iotfsdb.api.ui.service.PlotData;
import org.huebert.iotfsdb.api.ui.service.SearchParser;
import org.huebert.iotfsdb.service.QueryService;
import org.huebert.iotfsdb.service.TimeConverter;
import org.huebert.iotfsdb.stats.CaptureStats;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Controller;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.huebert.iotfsdb.IotfsdbProperties;
import org.huebert.iotfsdb.api.schema.PartitionPeriod;
Expand All @@ -25,7 +24,6 @@

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.FileSystem;
Expand Down Expand Up @@ -193,7 +191,10 @@ public PartitionByteBuffer openPartition(@NotNull @Valid PartitionKey key) {
long fileSize = Files.size(path);
FileChannel fileChannel = FileChannel.open(path, openOptions);
MappedByteBuffer byteBuffer = fileChannel.map(readOnly ? READ_ONLY : READ_WRITE, 0, fileSize);
return new FileByteBuffer(fileChannel, byteBuffer);
return PartitionByteBuffer.builder()
.fileChannel(fileChannel)
.byteBuffer(byteBuffer)
.build();
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -225,26 +226,4 @@ private Path getPartitionPath(PartitionKey key) {
return getSeriesRoot(key.seriesId()).resolve(key.partitionId());
}

@AllArgsConstructor
private static class FileByteBuffer implements PartitionByteBuffer {

private final FileChannel fileChannel;

private final MappedByteBuffer byteBuffer;

@Override
public ByteBuffer getByteBuffer() {
return byteBuffer.slice();
}

@Override
public void close() {
try {
fileChannel.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.huebert.iotfsdb.api.schema.SeriesFile;
import org.huebert.iotfsdb.service.PartitionKey;
Expand All @@ -31,7 +30,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {

private final SetMultimap<String, PartitionKey> partitionMap = HashMultimap.create();

private final ConcurrentMap<PartitionKey, MemoryPartitionByteBuffer> byteBufferMap = new ConcurrentHashMap<>();
private final ConcurrentMap<PartitionKey, PartitionByteBuffer> byteBufferMap = new ConcurrentHashMap<>();

@PostConstruct
public void postConstruct() {
Expand Down Expand Up @@ -60,8 +59,9 @@ public Set<PartitionKey> getPartitions(@NotNull @Valid SeriesFile seriesFile) {

@Override
public void createPartition(@NotNull @Valid PartitionKey key, @Positive long size) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect((int) size);
byteBufferMap.put(key, new MemoryPartitionByteBuffer(byteBuffer));
byteBufferMap.put(key, PartitionByteBuffer.builder()
.byteBuffer(ByteBuffer.allocateDirect((int) size))
.build());
}

@Override
Expand All @@ -74,20 +74,4 @@ public void close() {
// Do nothing
}

@AllArgsConstructor
private static class MemoryPartitionByteBuffer implements PartitionByteBuffer {

private final ByteBuffer byteBuffer;

@Override
public ByteBuffer getByteBuffer() {
return byteBuffer.slice();
}

@Override
public void close() {
// Do nothing
}
}

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,51 @@
package org.huebert.iotfsdb.persistence;

import lombok.Builder;
import lombok.Data;
import org.huebert.iotfsdb.service.LockUtil;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;

@Builder
public class PartitionByteBuffer {

private final FileChannel fileChannel;

private final ByteBuffer byteBuffer;

private final ReadWriteLock lock = new ReentrantReadWriteLock();

public <T> T withRead(Function<ByteBuffer, T> function) {
Result<T> result = new Result<>();
LockUtil.withRead(lock, () -> result.setValue(function.apply(byteBuffer)));
return result.getValue();
}

public interface PartitionByteBuffer {
public void withWrite(Consumer<ByteBuffer> consumer) {
LockUtil.withWrite(lock, () -> consumer.accept(byteBuffer));
}

ByteBuffer getByteBuffer();
public void close() {
if (fileChannel != null) {
LockUtil.withWrite(lock, () -> {
try {
fileChannel.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
}

void close();
@Data
private static class Result<T> {
private T value;
}

}
29 changes: 10 additions & 19 deletions src/main/java/org/huebert/iotfsdb/service/CloneService.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@
import org.huebert.iotfsdb.api.schema.SeriesData;
import org.huebert.iotfsdb.api.schema.SeriesDefinition;
import org.huebert.iotfsdb.api.schema.SeriesFile;
import org.huebert.iotfsdb.persistence.PartitionByteBuffer;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;

import java.nio.ByteBuffer;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

Expand Down Expand Up @@ -78,15 +77,12 @@ public void updateDefinition(@NotBlank String id, @NotNull @Valid SeriesDefiniti
/**
 * Reads every slot stored in the given partition and converts it to a list of
 * {@link SeriesData}, walking time forward from the partition range's lower
 * endpoint by one interval per value.
 *
 * <p>NOTE(review): null slots are included in the result (unlike the import
 * path, which filters them) — presumably intentional so a clone preserves
 * gaps; confirm.
 */
private List<SeriesData> convertPartition(PartitionKey partition) {
    PartitionRange range = partitionService.getRange(partition);
    List<SeriesData> result = new ArrayList<>();
    // Locking is delegated to PartitionByteBuffer.withRead inside getStream.
    PartitionByteBuffer buffer = dataService.getBuffer(partition).orElseThrow();
    ZonedDateTime current = TimeConverter.toUtc(range.getRange().lowerEndpoint());
    for (Number number : range.getStream(buffer)) {
        result.add(new SeriesData(current, number));
        current = current.plus(range.getInterval());
    }
    return result;
}

Expand All @@ -113,14 +109,9 @@ private SeriesFile cloneSeriesFile(String sourceId, String destinationId) {
/**
 * Copies the contents of a source partition into the partition with the same
 * partition id under the destination series, creating the destination buffer
 * with the source's size and adapter.
 *
 * <p>NOTE(review): this nests the source's read lock inside the destination's
 * write lock. That is deadlock-free only if no code path ever nests these two
 * locks in the opposite order — TODO confirm.
 */
private void clonePartition(PartitionKey sourceKey, String destinationId) {
    PartitionKey destinationKey = new PartitionKey(destinationId, sourceKey.partitionId());
    PartitionRange sourceRange = partitionService.getRange(sourceKey);
    PartitionByteBuffer sourceBuffer = dataService.getBuffer(sourceKey).orElseThrow();
    PartitionByteBuffer destinationBuffer = dataService.getBuffer(destinationKey, sourceRange.getSize(), sourceRange.getAdapter());
    // Bulk copy: ByteBuffer.put(ByteBuffer) transfers the source's remaining bytes.
    destinationBuffer.withWrite(db -> sourceBuffer.withRead(db::put));
}

}
18 changes: 9 additions & 9 deletions src/main/java/org/huebert/iotfsdb/service/DataService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -92,32 +91,33 @@ public Set<PartitionKey> getPartitions(@NotBlank String seriesId) {
return seriesPartitions.getOrDefault(seriesId, Set.of());
}

public Optional<ByteBuffer> getBuffer(@Valid @NotNull PartitionKey key) {
public Optional<PartitionByteBuffer> getBuffer(@Valid @NotNull PartitionKey key) {
if (partitionNotExists(key)) {
return Optional.empty();
}
return Optional.of(partitionCache.getUnchecked(key).getByteBuffer());
return Optional.of(partitionCache.getUnchecked(key));
}

public ByteBuffer getBuffer(@Valid @NotNull PartitionKey key, @NotNull @Positive Long size, @NotNull PartitionAdapter adapter) {
public PartitionByteBuffer getBuffer(@Valid @NotNull PartitionKey key, @NotNull @Positive Long size, @NotNull PartitionAdapter adapter) {
if (partitionNotExists(key)) {
LockUtil.withLock(stripedLocks.get(key.seriesId()), () -> {
if (partitionNotExists(key)) {

persistenceAdapter.createPartition(key, adapter.getTypeSize() * size);

PartitionByteBuffer partitionByteBuffer = persistenceAdapter.openPartition(key);
ByteBuffer byteBuffer = partitionByteBuffer.getByteBuffer();
for (int i = 0; i < size; i++) {
adapter.put(byteBuffer, i, null);
}
partitionByteBuffer.withWrite(b -> {
for (int i = 0; i < size; i++) {
adapter.put(b, i, null);
}
});
partitionCache.put(key, partitionByteBuffer);

seriesPartitions.computeIfAbsent(key.seriesId(), k -> ConcurrentHashMap.newKeySet()).add(key);
}
});
}
return partitionCache.getUnchecked(key).getByteBuffer();
return partitionCache.getUnchecked(key);
}

private boolean partitionNotExists(PartitionKey key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private void writeSeriesFileToZip(ZipOutputStream zos, SeriesFile seriesFile) th

private void writePartitionsToZip(ZipOutputStream zos, String seriesId) throws IOException {
for (PartitionKey key : dataService.getPartitions(seriesId)) {
partitionService.getRange(key).withRead(() -> addToZip(zos, key.seriesId(), key.partitionId(), dataService.getBuffer(key).map(ByteBuffer::array).orElseThrow()));
addToZip(zos, key.seriesId(), key.partitionId(), dataService.getBuffer(key).map(pbb -> pbb.withRead(ByteBuffer::array)).orElseThrow());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private static List<SeriesData> convertPartition(PersistenceAdapter adapter, Par
try {
List<SeriesData> result = new ArrayList<>();
ZonedDateTime current = TimeConverter.toUtc(partitionRange.getRange().lowerEndpoint());
Iterator<Number> iterator = partitionRange.getStream(pbb.getByteBuffer()).iterator();
Iterator<Number> iterator = partitionRange.getStream(pbb).iterator();
while (iterator.hasNext()) {
Number value = iterator.next();
if (value != null) {
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/org/huebert/iotfsdb/service/InsertService.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
import org.huebert.iotfsdb.api.schema.SeriesDefinition;
import org.huebert.iotfsdb.api.schema.SeriesFile;
import org.huebert.iotfsdb.partition.PartitionAdapter;
import org.huebert.iotfsdb.persistence.PartitionByteBuffer;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;

import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -74,19 +74,19 @@ public void insert(@Valid @NotNull InsertRequest request) {
private void insertIntoPartition(PartitionKey key, List<SeriesData> data, Collector<Number, ?, Number> collector) {
PartitionRange details = partitionService.getRange(key);
PartitionAdapter adapter = details.getAdapter();
details.withWrite(() -> {
ByteBuffer buffer = dataService.getBuffer(key, details.getSize(), adapter);
PartitionByteBuffer buffer = dataService.getBuffer(key, details.getSize(), adapter);
buffer.withWrite(b -> {
for (SeriesData value : data) {
LocalDateTime local = TimeConverter.toUtc(value.getTime());
int index = details.getIndex(local);
Number putValue = value.getValue();
if (collector != null) {
putValue = Stream.concat(
adapter.getStream(buffer, index, 1),
adapter.getStream(b, index, 1),
Stream.of(putValue)
).collect(collector);
}
adapter.put(buffer, index, putValue);
adapter.put(b, index, putValue);
}
});
}
Expand Down
Loading
Loading