From 3e8d940afaeae0dce020c069cbda9fe71d0e3180 Mon Sep 17 00:00:00 2001 From: Jason Huebert Date: Wed, 23 Jul 2025 11:17:35 -0500 Subject: [PATCH 1/2] feat: Refactor partition buffer handling with new PartitionByteBuffer class --- .../internal/PartitionPersistenceService.java | 2 +- .../persistence/FilePersistenceAdapter.java | 29 ++---------- .../persistence/MemoryPersistenceAdapter.java | 24 ++-------- .../persistence/PartitionByteBuffer.java | 46 +++++++++++++++++-- .../huebert/iotfsdb/service/CloneService.java | 10 ++-- .../huebert/iotfsdb/service/DataService.java | 18 ++++---- .../iotfsdb/service/ExportService.java | 2 +- .../iotfsdb/service/ImportService.java | 2 +- .../iotfsdb/service/InsertService.java | 28 +++++------ .../iotfsdb/service/PartitionRange.java | 12 ++--- .../huebert/iotfsdb/service/QueryService.java | 2 +- 11 files changed, 90 insertions(+), 85 deletions(-) diff --git a/src/main/java/org/huebert/iotfsdb/api/grpc/internal/PartitionPersistenceService.java b/src/main/java/org/huebert/iotfsdb/api/grpc/internal/PartitionPersistenceService.java index 00ccabd0..7ed5e72a 100644 --- a/src/main/java/org/huebert/iotfsdb/api/grpc/internal/PartitionPersistenceService.java +++ b/src/main/java/org/huebert/iotfsdb/api/grpc/internal/PartitionPersistenceService.java @@ -78,7 +78,7 @@ public void readPartition(PartitionPersistenceServiceProto.ReadPartitionRequest try { PartitionByteBuffer partitionByteBuffer = persistenceAdapter.openPartition(fromGrpc(request.getKey())); try { - builder.setData(ByteString.copyFrom(partitionByteBuffer.getByteBuffer())); + builder.setData(partitionByteBuffer.withRead(ByteString::copyFrom)); } finally { partitionByteBuffer.close(); } diff --git a/src/main/java/org/huebert/iotfsdb/persistence/FilePersistenceAdapter.java b/src/main/java/org/huebert/iotfsdb/persistence/FilePersistenceAdapter.java index 90e24d91..8e2ab024 100644 --- a/src/main/java/org/huebert/iotfsdb/persistence/FilePersistenceAdapter.java +++ b/src/main/java/org/huebert/iotfsdb/persistence/FilePersistenceAdapter.java @@ -10,7 +10,6 @@ import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Positive; -import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.huebert.iotfsdb.IotfsdbProperties; import org.huebert.iotfsdb.api.schema.PartitionPeriod; @@ -25,7 +24,6 @@ import java.io.IOException; import java.io.RandomAccessFile; -import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.FileSystem; @@ -193,7 +191,10 @@ public PartitionByteBuffer openPartition(@NotNull @Valid PartitionKey key) { long fileSize = Files.size(path); FileChannel fileChannel = FileChannel.open(path, openOptions); MappedByteBuffer byteBuffer = fileChannel.map(readOnly ? READ_ONLY : READ_WRITE, 0, fileSize); - return new FileByteBuffer(fileChannel, byteBuffer); + return PartitionByteBuffer.builder() + .fileChannel(fileChannel) + .byteBuffer(byteBuffer) + .build(); } catch (IOException e) { throw new RuntimeException(e); } @@ -225,26 +226,4 @@ private Path getPartitionPath(PartitionKey key) { return getSeriesRoot(key.seriesId()).resolve(key.partitionId()); } - @AllArgsConstructor - private static class FileByteBuffer implements PartitionByteBuffer { - - private final FileChannel fileChannel; - - private final MappedByteBuffer byteBuffer; - - @Override - public ByteBuffer getByteBuffer() { - return byteBuffer.slice(); - } - - @Override - public void close() { - try { - fileChannel.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - } diff --git a/src/main/java/org/huebert/iotfsdb/persistence/MemoryPersistenceAdapter.java b/src/main/java/org/huebert/iotfsdb/persistence/MemoryPersistenceAdapter.java index 2a8795b2..a32acd6e 100644 --- a/src/main/java/org/huebert/iotfsdb/persistence/MemoryPersistenceAdapter.java +++ b/src/main/java/org/huebert/iotfsdb/persistence/MemoryPersistenceAdapter.java @@ -7,7 +7,6 @@ import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Positive; -import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.huebert.iotfsdb.api.schema.SeriesFile; import org.huebert.iotfsdb.service.PartitionKey; @@ -31,7 +30,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { private final SetMultimap partitionMap = HashMultimap.create(); - private final ConcurrentMap byteBufferMap = new ConcurrentHashMap<>(); + private final ConcurrentMap byteBufferMap = new ConcurrentHashMap<>(); @PostConstruct public void postConstruct() { @@ -60,8 +59,9 @@ public Set getPartitions(@NotNull @Valid SeriesFile seriesFile) { @Override public void createPartition(@NotNull @Valid PartitionKey key, @Positive long size) { - ByteBuffer byteBuffer = ByteBuffer.allocateDirect((int) size); - byteBufferMap.put(key, new MemoryPartitionByteBuffer(byteBuffer)); + byteBufferMap.put(key, PartitionByteBuffer.builder() + .byteBuffer(ByteBuffer.allocateDirect((int) size)) + .build()); } @Override @@ -74,20 +74,4 @@ public void close() { // Do nothing } - @AllArgsConstructor - private static class MemoryPartitionByteBuffer implements PartitionByteBuffer { - - private final ByteBuffer byteBuffer; - - @Override - public ByteBuffer getByteBuffer() { - return byteBuffer.slice(); - } - - @Override - public void close() { - // Do nothing - } - } - } diff --git a/src/main/java/org/huebert/iotfsdb/persistence/PartitionByteBuffer.java b/src/main/java/org/huebert/iotfsdb/persistence/PartitionByteBuffer.java index 956a6439..9b4399ff 100644 --- a/src/main/java/org/huebert/iotfsdb/persistence/PartitionByteBuffer.java +++ b/src/main/java/org/huebert/iotfsdb/persistence/PartitionByteBuffer.java @@ -1,11 +1,51 @@ package org.huebert.iotfsdb.persistence; +import lombok.Builder; +import lombok.Data; +import org.huebert.iotfsdb.service.LockUtil; + +import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; +import java.util.function.Function; + +@Builder +public class PartitionByteBuffer { + + private final FileChannel fileChannel; + + private final ByteBuffer byteBuffer; + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + public T withRead(Function function) { + Result result = new Result<>(); + LockUtil.withRead(lock, () -> result.setValue(function.apply(byteBuffer))); + return result.getValue(); + } -public interface PartitionByteBuffer { + public void withWrite(Consumer consumer) { + LockUtil.withWrite(lock, () -> consumer.accept(byteBuffer)); + } - ByteBuffer getByteBuffer(); + public void close() { + if (fileChannel != null) { + LockUtil.withWrite(lock, () -> { + try { + fileChannel.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + } - void close(); + @Data + private static class Result { + private T value; + } } diff --git a/src/main/java/org/huebert/iotfsdb/service/CloneService.java b/src/main/java/org/huebert/iotfsdb/service/CloneService.java index e8050b46..845ed94a 100644 --- a/src/main/java/org/huebert/iotfsdb/service/CloneService.java +++ b/src/main/java/org/huebert/iotfsdb/service/CloneService.java @@ -9,10 +9,10 @@ import org.huebert.iotfsdb.api.schema.SeriesData; import org.huebert.iotfsdb.api.schema.SeriesDefinition; import org.huebert.iotfsdb.api.schema.SeriesFile; +import org.huebert.iotfsdb.persistence.PartitionByteBuffer; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; -import java.nio.ByteBuffer; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.HashMap; @@ -79,7 +79,7 @@ private List convertPartition(PartitionKey partition) { PartitionRange range = partitionService.getRange(partition); List result = new ArrayList<>(); range.withRead(() -> { - ByteBuffer buffer = dataService.getBuffer(partition).orElseThrow(); + PartitionByteBuffer buffer = dataService.getBuffer(partition).orElseThrow(); ZonedDateTime current = TimeConverter.toUtc(range.getRange().lowerEndpoint()); Iterator iterator = range.getStream(buffer).iterator(); while (iterator.hasNext()) { @@ -115,10 +115,10 @@ private void clonePartition(PartitionKey sourceKey, String destinationId) { PartitionRange sourceRange = partitionService.getRange(sourceKey); PartitionRange destinationRange = partitionService.getRange(destinationKey); sourceRange.withRead(() -> { - ByteBuffer sourceBuffer = dataService.getBuffer(sourceKey).orElseThrow(); + PartitionByteBuffer sourceBuffer = dataService.getBuffer(sourceKey).orElseThrow(); destinationRange.withWrite(() -> { - ByteBuffer destinationBuffer = dataService.getBuffer(destinationKey, sourceRange.getSize(), sourceRange.getAdapter()); - destinationBuffer.put(sourceBuffer); + PartitionByteBuffer destinationBuffer = dataService.getBuffer(destinationKey, sourceRange.getSize(), sourceRange.getAdapter()); + destinationBuffer.withWrite(db -> sourceBuffer.withRead(db::put)); }); }); } diff --git a/src/main/java/org/huebert/iotfsdb/service/DataService.java b/src/main/java/org/huebert/iotfsdb/service/DataService.java index 12ef36e7..15d2bab7 100644 --- a/src/main/java/org/huebert/iotfsdb/service/DataService.java +++ b/src/main/java/org/huebert/iotfsdb/service/DataService.java @@ -18,7 +18,6 @@ import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -92,14 +91,14 @@ public Set getPartitions(@NotBlank String seriesId) { return seriesPartitions.getOrDefault(seriesId, Set.of()); } - public Optional getBuffer(@Valid @NotNull PartitionKey key) { + public Optional getBuffer(@Valid @NotNull PartitionKey key) { if (partitionNotExists(key)) { return Optional.empty(); } - return Optional.of(partitionCache.getUnchecked(key).getByteBuffer()); + return Optional.of(partitionCache.getUnchecked(key)); } - public ByteBuffer getBuffer(@Valid @NotNull PartitionKey key, @NotNull @Positive Long size, @NotNull PartitionAdapter adapter) { + public PartitionByteBuffer getBuffer(@Valid @NotNull PartitionKey key, @NotNull @Positive Long size, @NotNull PartitionAdapter adapter) { if (partitionNotExists(key)) { LockUtil.withLock(stripedLocks.get(key.seriesId()), () -> { if (partitionNotExists(key)) { @@ -107,17 +106,18 @@ public ByteBuffer getBuffer(@Valid @NotNull PartitionKey key, @NotNull @Positive persistenceAdapter.createPartition(key, adapter.getTypeSize() * size); PartitionByteBuffer partitionByteBuffer = persistenceAdapter.openPartition(key); - ByteBuffer byteBuffer = partitionByteBuffer.getByteBuffer(); - for (int i = 0; i < size; i++) { - adapter.put(byteBuffer, i, null); - } + partitionByteBuffer.withWrite(b -> { + for (int i = 0; i < size; i++) { + adapter.put(b, i, null); + } + }); partitionCache.put(key, partitionByteBuffer); seriesPartitions.computeIfAbsent(key.seriesId(), k -> ConcurrentHashMap.newKeySet()).add(key); } }); } - return partitionCache.getUnchecked(key).getByteBuffer(); + return partitionCache.getUnchecked(key); } private boolean partitionNotExists(PartitionKey key) { diff --git a/src/main/java/org/huebert/iotfsdb/service/ExportService.java b/src/main/java/org/huebert/iotfsdb/service/ExportService.java index d0a5c266..b2c3784b 100644 --- a/src/main/java/org/huebert/iotfsdb/service/ExportService.java +++ b/src/main/java/org/huebert/iotfsdb/service/ExportService.java @@ -56,7 +56,7 @@ private void writeSeriesFileToZip(ZipOutputStream zos, SeriesFile seriesFile) th private void writePartitionsToZip(ZipOutputStream zos, String seriesId) throws IOException { for (PartitionKey key : dataService.getPartitions(seriesId)) { - partitionService.getRange(key).withRead(() -> addToZip(zos, key.seriesId(), key.partitionId(), dataService.getBuffer(key).map(ByteBuffer::array).orElseThrow())); + partitionService.getRange(key).withRead(() -> addToZip(zos, key.seriesId(), key.partitionId(), dataService.getBuffer(key).map(pbb -> pbb.withRead(ByteBuffer::array)).orElseThrow())); } } diff --git a/src/main/java/org/huebert/iotfsdb/service/ImportService.java b/src/main/java/org/huebert/iotfsdb/service/ImportService.java index ff2fc5e8..a0c5b197 100644 --- a/src/main/java/org/huebert/iotfsdb/service/ImportService.java +++ b/src/main/java/org/huebert/iotfsdb/service/ImportService.java @@ -66,7 +66,7 @@ private static List convertPartition(PersistenceAdapter adapter, Par try { List result = new ArrayList<>(); ZonedDateTime current = TimeConverter.toUtc(partitionRange.getRange().lowerEndpoint()); - Iterator iterator = partitionRange.getStream(pbb.getByteBuffer()).iterator(); + Iterator iterator = partitionRange.getStream(pbb).iterator(); while (iterator.hasNext()) { Number value = iterator.next(); if (value != null) { diff --git a/src/main/java/org/huebert/iotfsdb/service/InsertService.java b/src/main/java/org/huebert/iotfsdb/service/InsertService.java index 599a04fb..e6f647e3 100644 --- a/src/main/java/org/huebert/iotfsdb/service/InsertService.java +++ b/src/main/java/org/huebert/iotfsdb/service/InsertService.java @@ -9,10 +9,10 @@ import org.huebert.iotfsdb.api.schema.SeriesDefinition; import org.huebert.iotfsdb.api.schema.SeriesFile; import org.huebert.iotfsdb.partition.PartitionAdapter; +import org.huebert.iotfsdb.persistence.PartitionByteBuffer; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; -import java.nio.ByteBuffer; import java.time.LocalDateTime; import java.util.HashMap; import java.util.List; @@ -75,19 +75,21 @@ private void insertIntoPartition(PartitionKey key, List data, Collec PartitionRange details = partitionService.getRange(key); PartitionAdapter adapter = details.getAdapter(); details.withWrite(() -> { - ByteBuffer buffer = dataService.getBuffer(key, details.getSize(), adapter); - for (SeriesData value : data) { - LocalDateTime local = TimeConverter.toUtc(value.getTime()); - int index = details.getIndex(local); - Number putValue = value.getValue(); - if (collector != null) { - putValue = Stream.concat( - adapter.getStream(buffer, index, 1), - Stream.of(putValue) - ).collect(collector); + PartitionByteBuffer buffer = dataService.getBuffer(key, details.getSize(), adapter); + buffer.withWrite(b -> { + for (SeriesData value : data) { + LocalDateTime local = TimeConverter.toUtc(value.getTime()); + int index = details.getIndex(local); + Number putValue = value.getValue(); + if (collector != null) { + putValue = Stream.concat( + adapter.getStream(b, index, 1), + Stream.of(putValue) + ).collect(collector); + } + adapter.put(b, index, putValue); } - adapter.put(buffer, index, putValue); - } + }); }); } diff --git a/src/main/java/org/huebert/iotfsdb/service/PartitionRange.java b/src/main/java/org/huebert/iotfsdb/service/PartitionRange.java index 9d5f11b9..68c88327 100644 --- a/src/main/java/org/huebert/iotfsdb/service/PartitionRange.java +++ b/src/main/java/org/huebert/iotfsdb/service/PartitionRange.java @@ -6,12 +6,12 @@ import jakarta.validation.constraints.NotNull; import lombok.Getter; import org.huebert.iotfsdb.partition.PartitionAdapter; +import org.huebert.iotfsdb.persistence.PartitionByteBuffer; -import java.nio.ByteBuffer; import java.time.Duration; import java.time.LocalDateTime; +import java.util.List; import java.util.concurrent.locks.ReadWriteLock; -import java.util.stream.Stream; public class PartitionRange { @@ -56,15 +56,15 @@ public PartitionRange(PartitionKey key, Range range, Duration int size = getIndex(range.upperEndpoint()) + 1; } - public Stream getStream(ByteBuffer buffer) { - return adapter.getStream(buffer, 0, (int) size); + public List getStream(PartitionByteBuffer buffer) { + return buffer.withRead(b -> adapter.getStream(b, 0, (int) size).toList()); } - public Stream getStream(ByteBuffer buffer, Range current) { + public List getStream(PartitionByteBuffer buffer, Range current) { Range intersection = range.intersection(current); int fromIndex = getIndex(intersection.lowerEndpoint()); int toIndex = getIndex(intersection.upperEndpoint()); - return adapter.getStream(buffer, fromIndex, toIndex - fromIndex + 1); + return buffer.withRead(b -> adapter.getStream(b, fromIndex, toIndex - fromIndex + 1).toList()); } public int getIndex(LocalDateTime dateTime) { diff --git a/src/main/java/org/huebert/iotfsdb/service/QueryService.java b/src/main/java/org/huebert/iotfsdb/service/QueryService.java index e140455b..91e8fc7c 100644 --- a/src/main/java/org/huebert/iotfsdb/service/QueryService.java +++ b/src/main/java/org/huebert/iotfsdb/service/QueryService.java @@ -86,7 +86,7 @@ private SeriesData findDataOverPartitions(Collector collector private Stream findDataFromPartition(PartitionRange partitionRange, Range current) { return dataService.getBuffer(partitionRange.getKey()) - .map(b -> partitionRange.getStream(b, current)) + .map(b -> partitionRange.getStream(b, current).stream()) .orElse(Stream.empty()); } From db1ba2fe05d71b18dc8fe46cce79a248e8708635 Mon Sep 17 00:00:00 2001 From: Jason Huebert Date: Wed, 23 Jul 2025 11:25:52 -0500 Subject: [PATCH 2/2] feat: Simplify partition handling by removing unnecessary locks and refactoring read/write operations --- .../iotfsdb/api/ui/DataUiController.java | 1 - .../huebert/iotfsdb/service/CloneService.java | 27 ++++++------------ .../iotfsdb/service/ExportService.java | 2 +- .../iotfsdb/service/InsertService.java | 28 +++++++++---------- .../iotfsdb/service/PartitionRange.java | 16 +---------- .../iotfsdb/service/PartitionService.java | 3 +- .../huebert/iotfsdb/service/QueryService.java | 13 +++------ 7 files changed, 29 insertions(+), 61 deletions(-) diff --git a/src/main/java/org/huebert/iotfsdb/api/ui/DataUiController.java b/src/main/java/org/huebert/iotfsdb/api/ui/DataUiController.java index 735827eb..2d418d86 100644 --- a/src/main/java/org/huebert/iotfsdb/api/ui/DataUiController.java +++ b/src/main/java/org/huebert/iotfsdb/api/ui/DataUiController.java @@ -12,7 +12,6 @@ import org.huebert.iotfsdb.api.ui.service.PlotData; import org.huebert.iotfsdb.api.ui.service.SearchParser; import org.huebert.iotfsdb.service.QueryService; -import org.huebert.iotfsdb.service.TimeConverter; import org.huebert.iotfsdb.stats.CaptureStats; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Controller; diff --git a/src/main/java/org/huebert/iotfsdb/service/CloneService.java b/src/main/java/org/huebert/iotfsdb/service/CloneService.java index 845ed94a..e4471309 100644 --- a/src/main/java/org/huebert/iotfsdb/service/CloneService.java +++ b/src/main/java/org/huebert/iotfsdb/service/CloneService.java @@ -16,7 +16,6 @@ import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.UUID; @@ -78,15 +77,12 @@ public void updateDefinition(@NotBlank String id, @NotNull @Valid SeriesDefiniti private List convertPartition(PartitionKey partition) { PartitionRange range = partitionService.getRange(partition); List result = new ArrayList<>(); - range.withRead(() -> { - PartitionByteBuffer buffer = dataService.getBuffer(partition).orElseThrow(); - ZonedDateTime current = TimeConverter.toUtc(range.getRange().lowerEndpoint()); - Iterator iterator = range.getStream(buffer).iterator(); - while (iterator.hasNext()) { - result.add(new SeriesData(current, iterator.next())); - current = current.plus(range.getInterval()); - } - }); + PartitionByteBuffer buffer = dataService.getBuffer(partition).orElseThrow(); + ZonedDateTime current = TimeConverter.toUtc(range.getRange().lowerEndpoint()); + for (Number number : range.getStream(buffer)) { + result.add(new SeriesData(current, number)); + current = current.plus(range.getInterval()); + } return result; } @@ -113,14 +109,9 @@ private SeriesFile cloneSeriesFile(String sourceId, String destinationId) { private void clonePartition(PartitionKey sourceKey, String destinationId) { PartitionKey destinationKey = new PartitionKey(destinationId, sourceKey.partitionId()); PartitionRange sourceRange = partitionService.getRange(sourceKey); - PartitionRange destinationRange = partitionService.getRange(destinationKey); - sourceRange.withRead(() -> { - PartitionByteBuffer sourceBuffer = dataService.getBuffer(sourceKey).orElseThrow(); - destinationRange.withWrite(() -> { - PartitionByteBuffer destinationBuffer = dataService.getBuffer(destinationKey, sourceRange.getSize(), sourceRange.getAdapter()); - destinationBuffer.withWrite(db -> sourceBuffer.withRead(db::put)); - }); - }); + PartitionByteBuffer sourceBuffer = dataService.getBuffer(sourceKey).orElseThrow(); + PartitionByteBuffer destinationBuffer = dataService.getBuffer(destinationKey, sourceRange.getSize(), sourceRange.getAdapter()); + destinationBuffer.withWrite(db -> sourceBuffer.withRead(db::put)); } } diff --git a/src/main/java/org/huebert/iotfsdb/service/ExportService.java b/src/main/java/org/huebert/iotfsdb/service/ExportService.java index b2c3784b..20cddfca 100644 --- a/src/main/java/org/huebert/iotfsdb/service/ExportService.java +++ b/src/main/java/org/huebert/iotfsdb/service/ExportService.java @@ -56,7 +56,7 @@ private void writeSeriesFileToZip(ZipOutputStream zos, SeriesFile seriesFile) th private void writePartitionsToZip(ZipOutputStream zos, String seriesId) throws IOException { for (PartitionKey key : dataService.getPartitions(seriesId)) { - partitionService.getRange(key).withRead(() -> addToZip(zos, key.seriesId(), key.partitionId(), dataService.getBuffer(key).map(pbb -> pbb.withRead(ByteBuffer::array)).orElseThrow())); + addToZip(zos, key.seriesId(), key.partitionId(), dataService.getBuffer(key).map(pbb -> pbb.withRead(ByteBuffer::array)).orElseThrow()); } } diff --git a/src/main/java/org/huebert/iotfsdb/service/InsertService.java b/src/main/java/org/huebert/iotfsdb/service/InsertService.java index e6f647e3..78e70a0d 100644 --- a/src/main/java/org/huebert/iotfsdb/service/InsertService.java +++ b/src/main/java/org/huebert/iotfsdb/service/InsertService.java @@ -74,22 +74,20 @@ public void insert(@Valid @NotNull InsertRequest request) { private void insertIntoPartition(PartitionKey key, List data, Collector collector) { PartitionRange details = partitionService.getRange(key); PartitionAdapter adapter = details.getAdapter(); - details.withWrite(() -> { - PartitionByteBuffer buffer = dataService.getBuffer(key, details.getSize(), adapter); - buffer.withWrite(b -> { - for (SeriesData value : data) { - LocalDateTime local = TimeConverter.toUtc(value.getTime()); - int index = details.getIndex(local); - Number putValue = value.getValue(); - if (collector != null) { - putValue = Stream.concat( - adapter.getStream(b, index, 1), - Stream.of(putValue) - ).collect(collector); - } - adapter.put(b, index, putValue); + PartitionByteBuffer buffer = dataService.getBuffer(key, details.getSize(), adapter); + buffer.withWrite(b -> { + for (SeriesData value : data) { + LocalDateTime local = TimeConverter.toUtc(value.getTime()); + int index = details.getIndex(local); + Number putValue = value.getValue(); + if (collector != null) { + putValue = Stream.concat( + adapter.getStream(b, index, 1), + Stream.of(putValue) + ).collect(collector); } - }); + adapter.put(b, index, putValue); + } }); } diff --git a/src/main/java/org/huebert/iotfsdb/service/PartitionRange.java b/src/main/java/org/huebert/iotfsdb/service/PartitionRange.java index 68c88327..be18f545 100644 --- a/src/main/java/org/huebert/iotfsdb/service/PartitionRange.java +++ b/src/main/java/org/huebert/iotfsdb/service/PartitionRange.java @@ -11,7 +11,6 @@ import java.time.Duration; import java.time.LocalDateTime; import java.util.List; -import java.util.concurrent.locks.ReadWriteLock; public class PartitionRange { @@ -32,10 +31,6 @@ public class PartitionRange { @NotNull private final PartitionAdapter adapter; - @Getter - @NotNull - private final ReadWriteLock rwLock; - private final long intervalMillis; private final LocalDateTime lowerEndpoint; @@ -43,12 +38,11 @@ public class PartitionRange { @Getter private final long size; - public PartitionRange(PartitionKey key, Range range, Duration interval, PartitionAdapter adapter, ReadWriteLock rwLock) { + public PartitionRange(PartitionKey key, Range range, Duration interval, PartitionAdapter adapter) { this.key = key; this.range = range; this.interval = interval; this.adapter = adapter; - this.rwLock = rwLock; intervalMillis = interval.toMillis(); lowerEndpoint = range.lowerEndpoint(); @@ -71,12 +65,4 @@ public int getIndex(LocalDateTime dateTime) { return (int) (Duration.between(lowerEndpoint, dateTime).toMillis() / intervalMillis); } - public void withRead(LockUtil.RunnableWithException runnable) { - LockUtil.withRead(rwLock, runnable); - } - - public void withWrite(LockUtil.RunnableWithException runnable) { - LockUtil.withWrite(rwLock, runnable); - } - } diff --git a/src/main/java/org/huebert/iotfsdb/service/PartitionService.java b/src/main/java/org/huebert/iotfsdb/service/PartitionService.java index 5adb87e1..b32e0384 100644 --- a/src/main/java/org/huebert/iotfsdb/service/PartitionService.java +++ b/src/main/java/org/huebert/iotfsdb/service/PartitionService.java @@ -33,7 +33,6 @@ import java.util.EnumSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; @Validated @Slf4j @@ -95,7 +94,7 @@ private PartitionRange calculateRange(PartitionKey key) { public static PartitionRange calculateRange(SeriesDefinition definition, PartitionKey key) { Range range = getRange(definition, key.partitionId()); PartitionAdapter adapter = getAdapter(definition); - return new PartitionRange(key, range, definition.getIntervalDuration(), adapter, new ReentrantReadWriteLock()); + return new PartitionRange(key, range, definition.getIntervalDuration(), adapter); } private static Range getRange(SeriesDefinition definition, String partitionId) { diff --git a/src/main/java/org/huebert/iotfsdb/service/QueryService.java b/src/main/java/org/huebert/iotfsdb/service/QueryService.java index 91e8fc7c..51cb931b 100644 --- a/src/main/java/org/huebert/iotfsdb/service/QueryService.java +++ b/src/main/java/org/huebert/iotfsdb/service/QueryService.java @@ -73,15 +73,10 @@ private FindDataResponse findDataForSeries(@Valid @NotNull FindDataRequest reque private SeriesData findDataOverPartitions(Collector collector, RangeMap rangeMap, Range current) { Range local = TimeConverter.toUtc(current); Collection covered = rangeMap.subRangeMap(local).asMapOfRanges().values(); - covered.forEach(c -> c.getRwLock().readLock().lock()); - try { - Number value = covered.stream() - .flatMap(pr -> findDataFromPartition(pr, local)) - .collect(collector); - return new SeriesData(current.lowerEndpoint(), value); - } finally { - covered.forEach(c -> c.getRwLock().readLock().unlock()); - } + Number value = covered.stream() + .flatMap(pr -> findDataFromPartition(pr, local)) + .collect(collector); + return new SeriesData(current.lowerEndpoint(), value); } private Stream findDataFromPartition(PartitionRange partitionRange, Range current) {