From efe39e6308ea05300c47f82cb608769121833b57 Mon Sep 17 00:00:00 2001 From: Fred Park Date: Mon, 19 Jan 2026 11:31:21 -0800 Subject: [PATCH] feat: Add getter for fetching pre-batch buffer sizes --- CHANGELOG.md | 4 + client/README.md | 34 ++ .../java/com/influxdb/client/WriteApi.java | 51 +++ .../com/influxdb/client/WriteOptions.java | 31 +- .../client/internal/AbstractWriteClient.java | 51 ++- .../client/internal/WriteApiImpl.java | 36 +++ .../FlowableBufferTimedFlushable.java | 37 ++- .../client/PreBatchBufferSizeTest.java | 303 ++++++++++++++++++ .../com/influxdb/client/WriteOptionsTest.java | 3 + .../FlowableBufferTimedFlushableTest.java | 207 ++++++++++++ 10 files changed, 749 insertions(+), 8 deletions(-) create mode 100644 client/src/test/java/com/influxdb/client/PreBatchBufferSizeTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index bfafe20d584..c954082a4f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 7.6.0 [unreleased] +### Features + +- [TODO](): Add getter for fetching pre-batch buffer sizes + ## 7.5.0 [2026-01-13] ### Features diff --git a/client/README.md b/client/README.md index 883f7626639..3dc02b6fc46 100644 --- a/client/README.md +++ b/client/README.md @@ -613,6 +613,7 @@ The writes are processed in batches which are configurable by `WriteOptions`: | **backpressureStrategy** | the strategy to deal with buffer overflow | DROP_OLDEST | | **captureBackpressureData** | whether to capture affected data points in backpressure events | false | | **concatMapPrefetch** | the number of upstream items to prefetch for the concatMapMaybe operator | 2 | +| **enableBufferTracking** | whether to enable pre-batch buffer size tracking via `getPreBatchBufferSize()` | false | There is also a synchronous blocking version of `WriteApi` - [WriteApiBlocking](#writing-data-using-synchronous-blocking-api). @@ -680,6 +681,39 @@ writeApi.listenEvents(BackpressureEvent.class, backpressureEvent -> { Note: Disabling `captureBackpressureData` can improve performance when backpressure data capture is not needed. +##### Buffer Size Monitoring + +The `WriteApi` provides methods to monitor the number of data points waiting in the pre-batch buffer before being sent to InfluxDB. This is useful for monitoring write throughput and detecting potential bottlenecks. + +Each unique combination of bucket, organization, precision, and consistency has its own independent buffer. + +```java +WriteApi writeApi = influxDBClient.makeWriteApi(WriteOptions.builder() + .batchSize(1000) + .flushInterval(1000) + .build()); + +// Write some data +writeApi.writeRecord("my-bucket", "my-org", WritePrecision.NS, "measurement,tag=value field=1"); + +// Get buffer size for a specific destination +int bufferSize = writeApi.getPreBatchBufferSize("my-bucket", "my-org", WritePrecision.NS); +System.out.println("Points waiting to be batched: " + bufferSize); + +// Get all buffer sizes across all destinations +Map allSizes = writeApi.getPreBatchBufferSizes(); +allSizes.forEach((params, size) -> + System.out.println(params + ": " + size + " points")); +``` + +To enable pre-batch buffer tracking: + +```java +WriteOptions options = WriteOptions.builder() + .enableBufferTracking(true) + .build(); +``` + #### Writing data ##### By POJO diff --git a/client/src/main/java/com/influxdb/client/WriteApi.java b/client/src/main/java/com/influxdb/client/WriteApi.java index 3045f50437d..8d16962fc0c 100644 --- a/client/src/main/java/com/influxdb/client/WriteApi.java +++ b/client/src/main/java/com/influxdb/client/WriteApi.java @@ -22,10 +22,12 @@ package com.influxdb.client; import java.util.List; +import java.util.Map; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; +import com.influxdb.client.domain.WriteConsistency; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import com.influxdb.client.write.WriteParameters; @@ -283,6 +285,55 @@ ListenerRegistration listenEvents(@Nonnull final */ void flush(); + /** + * Returns the current pre-batch buffer size for the specified destination. + * This represents the number of data points waiting to be batched for the given + * bucket, organization, precision, and consistency combination. + * + *

+ * Note: Each unique combination of (bucket, org, precision, consistency) has its own + * independent pre-batch buffer. The parameters must match exactly how the data was written. + *

+ * + * @param bucket the destination bucket + * @param org the destination organization + * @param precision the write precision + * @return current number of points waiting to be batched, or 0 if no buffer exists + */ + int getPreBatchBufferSize(@Nonnull final String bucket, + @Nonnull final String org, + @Nonnull final WritePrecision precision); + + /** + * Returns the current pre-batch buffer size for the specified destination. + * + * @param bucket the destination bucket + * @param org the destination organization + * @param precision the write precision + * @param consistency the write consistency (for InfluxDB Enterprise clusters) + * @return current number of points waiting to be batched, or 0 if no buffer exists + */ + int getPreBatchBufferSize(@Nonnull final String bucket, + @Nonnull final String org, + @Nonnull final WritePrecision precision, + @Nullable final WriteConsistency consistency); + + /** + * Returns the current pre-batch buffer size for the specified destination. + * + * @param params the write parameters identifying the destination + * @return current number of points waiting to be batched, or 0 if no buffer exists + */ + int getPreBatchBufferSize(@Nonnull final WriteParameters params); + + /** + * Returns a snapshot of all current pre-batch buffer sizes. + * + * @return map of WriteParameters to current buffer size + */ + @Nonnull + Map getPreBatchBufferSizes(); + /** * Close threads for asynchronous batch writing. */ diff --git a/client/src/main/java/com/influxdb/client/WriteOptions.java b/client/src/main/java/com/influxdb/client/WriteOptions.java index bb0a428bc35..4ed0c10d8fd 100644 --- a/client/src/main/java/com/influxdb/client/WriteOptions.java +++ b/client/src/main/java/com/influxdb/client/WriteOptions.java @@ -45,6 +45,7 @@ *
  • bufferLimit = 10_000
  • *
  • concatMapPrefetch = 2
  • *
  • captureBackpressureData = false
  • + *
  • enableBufferTracking = false
  • * *

    * The default backpressure strategy is {@link BackpressureOverflowStrategy#DROP_OLDEST}. @@ -66,6 +67,7 @@ public final class WriteOptions implements WriteApi.RetryOptions { public static final int DEFAULT_BUFFER_LIMIT = 10000; public static final int DEFAULT_CONCAT_MAP_PREFETCH = 2; public static final boolean DEFAULT_CAPTURE_BACKPRESSURE_DATA = false; + public static final boolean DEFAULT_ENABLE_BUFFER_TRACKING = false; /** * Default configuration with values that are consistent with Telegraf. @@ -85,6 +87,7 @@ public final class WriteOptions implements WriteApi.RetryOptions { private final Scheduler writeScheduler; private final BackpressureOverflowStrategy backpressureStrategy; private final boolean captureBackpressureData; + private final boolean enableBufferTracking; /** * @return the number of data point to collect in batch @@ -214,6 +217,14 @@ public boolean getCaptureBackpressureData() { return captureBackpressureData; } + /** + * @return whether to enable pre-batch buffer size tracking + * @see WriteOptions.Builder#enableBufferTracking(boolean) + */ + public boolean getEnableBufferTracking() { + return enableBufferTracking; + } + private WriteOptions(@Nonnull final Builder builder) { Arguments.checkNotNull(builder, "WriteOptions.Builder"); @@ -231,6 +242,7 @@ private WriteOptions(@Nonnull final Builder builder) { writeScheduler = builder.writeScheduler; backpressureStrategy = builder.backpressureStrategy; captureBackpressureData = builder.captureBackpressureData; + enableBufferTracking = builder.enableBufferTracking; } /** @@ -262,6 +274,7 @@ public static class Builder { private Scheduler writeScheduler = Schedulers.newThread(); private BackpressureOverflowStrategy backpressureStrategy = BackpressureOverflowStrategy.DROP_OLDEST; private boolean captureBackpressureData = DEFAULT_CAPTURE_BACKPRESSURE_DATA; + private boolean enableBufferTracking = DEFAULT_ENABLE_BUFFER_TRACKING; /** * Set the number of data point to collect in batch. @@ -455,6 +468,22 @@ public Builder captureBackpressureData(final boolean captureBackpressureData) { return this; } + /** + * Set whether to enable pre-batch buffer size tracking. + * + * When enabled, the WriteApi tracks the number of data points waiting in the pre-batch buffer + * for each destination (bucket, org, precision, consistency combination). This allows monitoring + * buffer sizes via {@link WriteApi#getPreBatchBufferSize} and {@link WriteApi#getPreBatchBufferSizes}. + * + * @param enableBufferTracking whether to enable buffer size tracking. + * @return {@code this} + */ + @Nonnull + public Builder enableBufferTracking(final boolean enableBufferTracking) { + this.enableBufferTracking = enableBufferTracking; + return this; + } + /** * Build an instance of WriteOptions. * @@ -466,4 +495,4 @@ public WriteOptions build() { return new WriteOptions(this); } } -} \ No newline at end of file +} diff --git a/client/src/main/java/com/influxdb/client/internal/AbstractWriteClient.java b/client/src/main/java/com/influxdb/client/internal/AbstractWriteClient.java index 004f997c4e1..8d1025e1a3a 100644 --- a/client/src/main/java/com/influxdb/client/internal/AbstractWriteClient.java +++ b/client/src/main/java/com/influxdb/client/internal/AbstractWriteClient.java @@ -22,10 +22,13 @@ package com.influxdb.client.internal; import java.util.Collection; +import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Supplier; import java.util.logging.Level; @@ -90,6 +93,7 @@ public abstract class AbstractWriteClient extends AbstractRestClient implements private final Collection autoCloseables; private AtomicBoolean finished = new AtomicBoolean(false); + private final ConcurrentHashMap preBatchBufferSizes = new ConcurrentHashMap<>(); public AbstractWriteClient(@Nonnull final WriteOptions writeOptions, @Nonnull final InfluxDBClientOptions options, @@ -124,15 +128,30 @@ public AbstractWriteClient(@Nonnull final WriteOptions writeOptions, // // Use Buffer to create Batch Items // - .compose(source -> - new FlowableBufferTimedFlushable<>( + .compose(source -> { + if (writeOptions.getEnableBufferTracking()) { + AtomicInteger groupSize = preBatchBufferSizes.computeIfAbsent( + group.getKey(), k -> new AtomicInteger(0)); + return new FlowableBufferTimedFlushable<>( + source, + flushPublisher, + writeOptions.getFlushInterval(), + TimeUnit.MILLISECONDS, + writeOptions.getBatchSize(), processorScheduler, + ArrayListSupplier.asSupplier(), + groupSize::set + ); + } else { + return new FlowableBufferTimedFlushable<>( source, flushPublisher, writeOptions.getFlushInterval(), TimeUnit.MILLISECONDS, writeOptions.getBatchSize(), processorScheduler, ArrayListSupplier.asSupplier() - )) + ); + } + }) // // Collect Batch items into one Write Item // @@ -193,6 +212,32 @@ public void flush() { flushPublisher.offer(true); } + /** + * Returns the current pre-batch buffer size for the specified destination. + * This represents the number of data points waiting to be batched for the given + * bucket, organization, precision, and consistency combination. + * + * @param params the write parameters identifying the destination + * @return current number of points waiting to be batched, or 0 if no buffer exists + */ + public int getPreBatchBufferSize(@Nonnull final WriteParameters params) { + Arguments.checkNotNull(params, "WriteParameters"); + AtomicInteger size = preBatchBufferSizes.get(params); + return size != null ? size.get() : 0; + } + + /** + * Returns a snapshot of all current pre-batch buffer sizes. + * + * @return map of WriteParameters to current buffer size + */ + @Nonnull + public Map getPreBatchBufferSizes() { + ConcurrentHashMap result = new ConcurrentHashMap<>(); + preBatchBufferSizes.forEach((key, value) -> result.put(key, value.get())); + return result; + } + public void close() { LOG.log(Level.FINE, "Flushing any cached BatchWrites before shutdown."); diff --git a/client/src/main/java/com/influxdb/client/internal/WriteApiImpl.java b/client/src/main/java/com/influxdb/client/internal/WriteApiImpl.java index 3801b97c868..459b795c60f 100644 --- a/client/src/main/java/com/influxdb/client/internal/WriteApiImpl.java +++ b/client/src/main/java/com/influxdb/client/internal/WriteApiImpl.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -31,6 +32,7 @@ import com.influxdb.client.InfluxDBClientOptions; import com.influxdb.client.WriteApi; import com.influxdb.client.WriteOptions; +import com.influxdb.client.domain.WriteConsistency; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.service.WriteService; import com.influxdb.client.write.Point; @@ -269,6 +271,40 @@ public ListenerRegistration listenEvents(@Nonnull return subscribe::dispose; } + @Override + public int getPreBatchBufferSize(@Nonnull final String bucket, + @Nonnull final String org, + @Nonnull final WritePrecision precision) { + Arguments.checkNonEmpty(bucket, "bucket"); + Arguments.checkNonEmpty(org, "org"); + Arguments.checkNotNull(precision, "WritePrecision"); + + return getPreBatchBufferSize(new WriteParameters(bucket, org, precision)); + } + + @Override + public int getPreBatchBufferSize(@Nonnull final String bucket, + @Nonnull final String org, + @Nonnull final WritePrecision precision, + @Nullable final WriteConsistency consistency) { + Arguments.checkNonEmpty(bucket, "bucket"); + Arguments.checkNonEmpty(org, "org"); + Arguments.checkNotNull(precision, "WritePrecision"); + + return getPreBatchBufferSize(new WriteParameters(bucket, org, precision, consistency)); + } + + @Override + public int getPreBatchBufferSize(@Nonnull final WriteParameters params) { + return super.getPreBatchBufferSize(params); + } + + @Override + @Nonnull + public Map getPreBatchBufferSizes() { + return super.getPreBatchBufferSizes(); + } + @Override public void close() { super.close(); diff --git a/client/src/main/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushable.java b/client/src/main/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushable.java index c487d8980be..6d61733d0ef 100644 --- a/client/src/main/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushable.java +++ b/client/src/main/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushable.java @@ -4,6 +4,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Flowable; @@ -48,6 +49,7 @@ public final class FlowableBufferTimedFlushable> ex final Supplier bufferSupplier; final int maxSize; final boolean restartTimerOnMaxSize; + final Consumer bufferSizeConsumer; public FlowableBufferTimedFlushable(Publisher source, Publisher flusher, @@ -56,6 +58,17 @@ public FlowableBufferTimedFlushable(Publisher source, int maxSize, Scheduler scheduler, Supplier bufferSupplier) { + this(source, flusher, timespan, unit, maxSize, scheduler, bufferSupplier, null); + } + + public FlowableBufferTimedFlushable(Publisher source, + Publisher flusher, + long timespan, + TimeUnit unit, + int maxSize, + Scheduler scheduler, + Supplier bufferSupplier, + Consumer bufferSizeConsumer) { this.source = source; this.flusher = flusher; this.timespan = timespan; @@ -65,11 +78,12 @@ public FlowableBufferTimedFlushable(Publisher source, this.bufferSupplier = bufferSupplier; this.maxSize = maxSize; this.restartTimerOnMaxSize = true; + this.bufferSizeConsumer = bufferSizeConsumer; } @Override public @NonNull Publisher apply(@NonNull final Flowable upstream) { - return new FlowableBufferTimedFlushable<>(upstream, flusher, timeskip, unit, maxSize, scheduler, bufferSupplier); + return new FlowableBufferTimedFlushable<>(upstream, flusher, timeskip, unit, maxSize, scheduler, bufferSupplier, bufferSizeConsumer); } @Override @@ -78,7 +92,7 @@ protected void subscribeActual(@NonNull final Subscriber subscriber) source.subscribe(new BufferExactBoundedSubscriber<>( new SerializedSubscriber<>(subscriber), bufferSupplier, - timespan, unit, maxSize, restartTimerOnMaxSize, w, flusher + timespan, unit, maxSize, restartTimerOnMaxSize, w, flusher, bufferSizeConsumer )); } @@ -92,6 +106,7 @@ static final class BufferExactBoundedSubscriber flusher; + final Consumer bufferSizeConsumer; U buffer; @@ -107,7 +122,8 @@ static final class BufferExactBoundedSubscriber actual, Supplier bufferSupplier, long timespan, TimeUnit unit, int maxSize, - boolean restartOnMaxSize, Worker w, Publisher flusher) { + boolean restartOnMaxSize, Worker w, Publisher flusher, + Consumer bufferSizeConsumer) { super(actual, new MpscLinkedQueue<>()); this.bufferSupplier = bufferSupplier; this.timespan = timespan; @@ -116,6 +132,7 @@ static final class BufferExactBoundedSubscriber records = Arrays.asList( + "measurement,tag=value field=1i 1", + "measurement,tag=value field=2i 2", + "measurement,tag=value field=3i 3", + "measurement,tag=value field=4i 4", + "measurement,tag=value field=5i 5" + ); + writeApi.writeRecords("test-bucket", "test-org", WritePrecision.NS, records); + + Thread.sleep(100); + + Assertions.assertThat(writeApi.getPreBatchBufferSize("test-bucket", "test-org", WritePrecision.NS)) + .isEqualTo(5); + } + + @Test + void testBufferResetsAfterFlush() throws InterruptedException { + mockServer.enqueue(createResponse("{}")); + + writeApi = influxDBClient.makeWriteApi(WriteOptions.builder() + .batchSize(100) + .flushInterval(10_000) + .enableBufferTracking(true) + .writeScheduler(scheduler) + .build()); + + WriteEventListener listener = new WriteEventListener<>(); + writeApi.listenEvents(WriteSuccessEvent.class, listener); + + writeApi.writeRecord("test-bucket", "test-org", WritePrecision.NS, + "measurement,tag=value field=1i 1"); + + writeApi.flush(); + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + listener.awaitCount(1); + + Assertions.assertThat(writeApi.getPreBatchBufferSize("test-bucket", "test-org", WritePrecision.NS)) + .isEqualTo(0); + } + + @Test + void testBufferResetsWhenBatchSizeReached() throws InterruptedException { + mockServer.enqueue(createResponse("{}")); + + writeApi = influxDBClient.makeWriteApi(WriteOptions.builder() + .batchSize(3) + .flushInterval(10_000) + .enableBufferTracking(true) + .build()); + + Assertions.assertThat(writeApi.getPreBatchBufferSize("test-bucket", "test-org", WritePrecision.NS)) + .isEqualTo(0); + + WriteEventListener listener = new WriteEventListener<>(); + writeApi.listenEvents(WriteSuccessEvent.class, listener); + + writeApi.writeRecord("test-bucket", "test-org", WritePrecision.NS, "m,t=v f=1i 1"); + writeApi.writeRecord("test-bucket", "test-org", WritePrecision.NS, "m,t=v f=2i 2"); + Assertions.assertThat(writeApi.getPreBatchBufferSize("test-bucket", "test-org", WritePrecision.NS)) + .isEqualTo(2); + + writeApi.writeRecord("test-bucket", "test-org", WritePrecision.NS, "m,t=v f=3i 3"); + listener.awaitCount(1); + Assertions.assertThat(writeApi.getPreBatchBufferSize("test-bucket", "test-org", WritePrecision.NS)) + .isEqualTo(0); + } + + @Test + void testDifferentDestinationsTrackedSeparately() throws InterruptedException { + mockServer.enqueue(createResponse("{}")); + mockServer.enqueue(createResponse("{}")); + mockServer.enqueue(createResponse("{}")); + + writeApi = influxDBClient.makeWriteApi(WriteOptions.builder() + .batchSize(100) + .flushInterval(10_000) + .enableBufferTracking(true) + .writeScheduler(scheduler) + .build()); + + // Different bucket/org + writeApi.writeRecord("bucket1", "org1", WritePrecision.NS, "m,t=v f=1i 1"); + // Different precision + writeApi.writeRecord("bucket1", "org1", WritePrecision.MS, "m,t=v f=2i 2"); + // Different bucket + writeApi.writeRecord("bucket2", "org1", WritePrecision.NS, "m,t=v f=3i 3"); + + Thread.sleep(100); + + Assertions.assertThat(writeApi.getPreBatchBufferSize("bucket1", "org1", WritePrecision.NS)).isEqualTo(1); + Assertions.assertThat(writeApi.getPreBatchBufferSize("bucket1", "org1", WritePrecision.MS)).isEqualTo(1); + Assertions.assertThat(writeApi.getPreBatchBufferSize("bucket2", "org1", WritePrecision.NS)).isEqualTo(1); + + Map sizes = writeApi.getPreBatchBufferSizes(); + Assertions.assertThat(sizes).hasSize(3); + } + + @Test + void testConsistencyDistinguishesDifferentBuffers() { + writeApi = influxDBClient.makeWriteApi(WriteOptions.builder() + .batchSize(100) + .flushInterval(10_000) + .build()); + + WriteParameters paramsOne = new WriteParameters("bucket", "org", WritePrecision.NS, WriteConsistency.ONE); + WriteParameters paramsAll = new WriteParameters("bucket", "org", WritePrecision.NS, WriteConsistency.ALL); + + // Different consistency values should be different keys + Assertions.assertThat(paramsOne).isNotEqualTo(paramsAll); + + // Both should return 0 independently + Assertions.assertThat(writeApi.getPreBatchBufferSize(paramsOne)).isEqualTo(0); + Assertions.assertThat(writeApi.getPreBatchBufferSize(paramsAll)).isEqualTo(0); + + // Null consistency should work + Assertions.assertThat(writeApi.getPreBatchBufferSize("bucket", "org", WritePrecision.NS, null)) + .isEqualTo(0); + } + + @Test + void testInputValidation() { + writeApi = influxDBClient.makeWriteApi(WriteOptions.builder() + .batchSize(100) + .flushInterval(10_000) + .build()); + + // Empty bucket - 3 param overload + Assertions.assertThatThrownBy(() -> + writeApi.getPreBatchBufferSize("", "org", WritePrecision.NS)) + .isInstanceOf(IllegalArgumentException.class); + + // Empty org - 3 param overload + Assertions.assertThatThrownBy(() -> + writeApi.getPreBatchBufferSize("bucket", "", WritePrecision.NS)) + .isInstanceOf(IllegalArgumentException.class); + + // Null precision + Assertions.assertThatThrownBy(() -> + writeApi.getPreBatchBufferSize("bucket", "org", null)) + .isInstanceOf(NullPointerException.class); + + // Null WriteParameters + Assertions.assertThatThrownBy(() -> + writeApi.getPreBatchBufferSize(null)) + .isInstanceOf(NullPointerException.class); + + // Empty bucket - 4 param overload + Assertions.assertThatThrownBy(() -> + writeApi.getPreBatchBufferSize("", "org", WritePrecision.NS, WriteConsistency.ALL)) + .isInstanceOf(IllegalArgumentException.class); + + // Empty org - 4 param overload + Assertions.assertThatThrownBy(() -> + writeApi.getPreBatchBufferSize("bucket", "", WritePrecision.NS, WriteConsistency.ALL)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testBufferTrackingDisabled() throws InterruptedException { + mockServer.enqueue(createResponse("{}")); + + // Buffer tracking is disabled by default + writeApi = influxDBClient.makeWriteApi(WriteOptions.builder() + .batchSize(100) + .flushInterval(10_000) + .writeScheduler(scheduler) + .build()); + + // Write some records + writeApi.writeRecord("test-bucket", "test-org", WritePrecision.NS, + "measurement,tag=value field=1i 1"); + writeApi.writeRecord("test-bucket", "test-org", WritePrecision.NS, + "measurement,tag=value field=2i 2"); + + Thread.sleep(100); + + // Buffer size should always be 0 when tracking is disabled + Assertions.assertThat(writeApi.getPreBatchBufferSize("test-bucket", "test-org", WritePrecision.NS)) + .isEqualTo(0); + Assertions.assertThat(writeApi.getPreBatchBufferSizes()) + .isEmpty(); + } +} diff --git a/client/src/test/java/com/influxdb/client/WriteOptionsTest.java b/client/src/test/java/com/influxdb/client/WriteOptionsTest.java index 22f35a30fb9..e4f14d582d7 100644 --- a/client/src/test/java/com/influxdb/client/WriteOptionsTest.java +++ b/client/src/test/java/com/influxdb/client/WriteOptionsTest.java @@ -49,6 +49,7 @@ void defaults() { Assertions.assertThat(writeOptions.getWriteScheduler()).isEqualTo(Schedulers.newThread()); Assertions.assertThat(writeOptions.getBackpressureStrategy()).isEqualTo(BackpressureOverflowStrategy.DROP_OLDEST); Assertions.assertThat(writeOptions.getCaptureBackpressureData()).isFalse(); + Assertions.assertThat(writeOptions.getEnableBufferTracking()).isFalse(); } @Test @@ -67,6 +68,7 @@ void configure() { .writeScheduler(Schedulers.computation()) .backpressureStrategy(BackpressureOverflowStrategy.ERROR) .captureBackpressureData(true) + .enableBufferTracking(true) .build(); Assertions.assertThat(writeOptions.getBatchSize()).isEqualTo(10_000); @@ -81,6 +83,7 @@ void configure() { Assertions.assertThat(writeOptions.getWriteScheduler()).isEqualTo(Schedulers.computation()); Assertions.assertThat(writeOptions.getBackpressureStrategy()).isEqualTo(BackpressureOverflowStrategy.ERROR); Assertions.assertThat(writeOptions.getCaptureBackpressureData()).isTrue(); + Assertions.assertThat(writeOptions.getEnableBufferTracking()).isTrue(); } @Test diff --git a/client/src/test/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushableTest.java b/client/src/test/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushableTest.java index 4d9b248df8a..b47a86dc865 100644 --- a/client/src/test/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushableTest.java +++ b/client/src/test/java/com/influxdb/client/internal/flowable/FlowableBufferTimedFlushableTest.java @@ -5,8 +5,10 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.FlowableTransformer; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.internal.util.ArrayListSupplier; import io.reactivex.rxjava3.processors.PublishProcessor; @@ -75,4 +77,209 @@ public void byFlusher() { subscription.dispose(); } + + @Test + public void applyWithBufferSizeConsumer() { + AtomicInteger lastReportedSize = new AtomicInteger(-1); + + // Create the transformer instance + FlowableTransformer> transformer = + new FlowableBufferTimedFlushable<>( + Flowable.empty(), // dummy source, will be replaced by apply() + PublishProcessor.create(), + 1_000, + TimeUnit.SECONDS, + 4, + new TestScheduler(), + ArrayListSupplier.asSupplier(), + lastReportedSize::set + ); + + // this calls apply() + Flowable.just(1, 2, 3, 4, 5, 6, 7, 8) + .compose(transformer) + .test() + .assertResult( + Arrays.asList(1, 2, 3, 4), + Arrays.asList(5, 6, 7, 8), + Collections.emptyList() + ); + } + + @Test + public void bufferSizeConsumerNotifiedOnAdd() { + PublishProcessor flushPublisher = PublishProcessor.create(); + AtomicInteger lastReportedSize = new AtomicInteger(-1); + + List> results = new ArrayList<>(); + + PublishProcessor publisher = PublishProcessor.create(); + Disposable subscription = publisher + .compose(source -> + new FlowableBufferTimedFlushable<>( + source, + flushPublisher, + 1_000, + TimeUnit.SECONDS, + 10, // Large batch size so items accumulate + new TestScheduler(), + ArrayListSupplier.asSupplier(), + lastReportedSize::set + )) + .subscribe(results::add); + + // Add items and verify size is reported + publisher.offer(1); + Assertions.assertThat(lastReportedSize.get()).isEqualTo(1); + + publisher.offer(2); + Assertions.assertThat(lastReportedSize.get()).isEqualTo(2); + + publisher.offer(3); + Assertions.assertThat(lastReportedSize.get()).isEqualTo(3); + + subscription.dispose(); + } + + @Test + public void bufferSizeConsumerNotifiedZeroOnFlush() { + PublishProcessor flushPublisher = PublishProcessor.create(); + AtomicInteger lastReportedSize = new AtomicInteger(-1); + + List> results = new ArrayList<>(); + + PublishProcessor publisher = PublishProcessor.create(); + Disposable subscription = publisher + .compose(source -> + new FlowableBufferTimedFlushable<>( + source, + flushPublisher, + 1_000, + TimeUnit.SECONDS, + 10, + new TestScheduler(), + ArrayListSupplier.asSupplier(), + lastReportedSize::set + )) + .subscribe(results::add); + + // Add items + publisher.offer(1); + publisher.offer(2); + Assertions.assertThat(lastReportedSize.get()).isEqualTo(2); + + // Flush - should report 0 + flushPublisher.offer(true); + Assertions.assertThat(lastReportedSize.get()).isEqualTo(0); + + subscription.dispose(); + } + + @Test + public void bufferSizeConsumerNotifiedZeroOnBatchSizeReached() { + PublishProcessor flushPublisher = PublishProcessor.create(); + AtomicInteger lastReportedSize = new AtomicInteger(-1); + List reportedSizes = new ArrayList<>(); + + List> results = new ArrayList<>(); + + PublishProcessor publisher = PublishProcessor.create(); + Disposable subscription = publisher + .compose(source -> + new FlowableBufferTimedFlushable<>( + source, + flushPublisher, + 1_000, + TimeUnit.SECONDS, + 3, // Small batch size + new TestScheduler(), + ArrayListSupplier.asSupplier(), + size -> { + lastReportedSize.set(size); + reportedSizes.add(size); + } + )) + .subscribe(results::add); + + // Add items up to batch size + publisher.offer(1); + Assertions.assertThat(lastReportedSize.get()).isEqualTo(1); + + publisher.offer(2); + Assertions.assertThat(lastReportedSize.get()).isEqualTo(2); + + // Third item reaches batch size - should flush and report 0 + publisher.offer(3); + Assertions.assertThat(lastReportedSize.get()).isEqualTo(0); + + // Verify the batch was emitted + Assertions.assertThat(results).hasSize(1); + Assertions.assertThat(results.get(0)).isEqualTo(Arrays.asList(1, 2, 3)); + + subscription.dispose(); + } + + @Test + public void bufferSizeConsumerNullIsAllowed() { + // Test that null consumer doesn't cause issues + Flowable.just(1, 2, 3, 4) + .compose(source -> + new FlowableBufferTimedFlushable<>( + source, + PublishProcessor.create(), + 1_000, + TimeUnit.SECONDS, + 2, + new TestScheduler(), + ArrayListSupplier.asSupplier(), + null // null consumer + )) + .test() + .assertResult( + Arrays.asList(1, 2), + Arrays.asList(3, 4), + Collections.emptyList() + ); + } + + @Test + public void bufferSizeConsumerNotifiedOnTimerFlush() { + PublishProcessor flushPublisher = PublishProcessor.create(); + TestScheduler scheduler = new TestScheduler(); + AtomicInteger lastReportedSize = new AtomicInteger(-1); + + List> results = new ArrayList<>(); + + PublishProcessor publisher = PublishProcessor.create(); + Disposable subscription = publisher + .compose(source -> + new FlowableBufferTimedFlushable<>( + source, + flushPublisher, + 1, // 1 second flush interval + TimeUnit.SECONDS, + 100, // Large batch size + scheduler, + ArrayListSupplier.asSupplier(), + lastReportedSize::set + )) + .subscribe(results::add); + + // Add items + publisher.offer(1); + publisher.offer(2); + Assertions.assertThat(lastReportedSize.get()).isEqualTo(2); + + // Advance time by exactly 1 second to trigger single timer flush + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + // Should report 0 after timer flush + Assertions.assertThat(lastReportedSize.get()).isEqualTo(0); + + // Verify exactly one batch was emitted with our items + Assertions.assertThat(results).hasSize(1); + Assertions.assertThat(results.get(0)).isEqualTo(Arrays.asList(1, 2)); + + subscription.dispose(); + } }