Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 7.6.0 [unreleased]

### Features

- [TODO](): Add getter for fetching pre-batch buffer sizes

## 7.5.0 [2026-01-13]

### Features
Expand Down
34 changes: 34 additions & 0 deletions client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ The writes are processed in batches which are configurable by `WriteOptions`:
| **backpressureStrategy** | the strategy to deal with buffer overflow | DROP_OLDEST |
| **captureBackpressureData** | whether to capture affected data points in backpressure events | false |
| **concatMapPrefetch** | the number of upstream items to prefetch for the concatMapMaybe operator | 2 |
| **enableBufferTracking** | whether to enable pre-batch buffer size tracking via `getPreBatchBufferSize()` | false |

There is also a synchronous blocking version of `WriteApi` - [WriteApiBlocking](#writing-data-using-synchronous-blocking-api).

Expand Down Expand Up @@ -680,6 +681,39 @@ writeApi.listenEvents(BackpressureEvent.class, backpressureEvent -> {

Note: Disabling `captureBackpressureData` can improve performance when backpressure data capture is not needed.

##### Buffer Size Monitoring

The `WriteApi` provides methods to monitor the number of data points waiting in the pre-batch buffer before being sent to InfluxDB. This is useful for monitoring write throughput and detecting potential bottlenecks.

Each unique combination of bucket, organization, precision, and consistency has its own independent buffer.

```java
WriteApi writeApi = influxDBClient.makeWriteApi(WriteOptions.builder()
.batchSize(1000)
.flushInterval(1000)
.build());

// Write some data
writeApi.writeRecord("my-bucket", "my-org", WritePrecision.NS, "measurement,tag=value field=1");

// Get buffer size for a specific destination
int bufferSize = writeApi.getPreBatchBufferSize("my-bucket", "my-org", WritePrecision.NS);
System.out.println("Points waiting to be batched: " + bufferSize);

// Get all buffer sizes across all destinations
Map<WriteParameters, Integer> allSizes = writeApi.getPreBatchBufferSizes();
allSizes.forEach((params, size) ->
System.out.println(params + ": " + size + " points"));
```

To enable pre-batch buffer tracking:

```java
WriteOptions options = WriteOptions.builder()
.enableBufferTracking(true)
.build();
```

#### Writing data

##### By POJO
Expand Down
51 changes: 51 additions & 0 deletions client/src/main/java/com/influxdb/client/WriteApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
package com.influxdb.client;

import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

import com.influxdb.client.domain.WriteConsistency;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.client.write.WriteParameters;
Expand Down Expand Up @@ -283,6 +285,55 @@ <T extends AbstractWriteEvent> ListenerRegistration listenEvents(@Nonnull final
*/
void flush();

/**
* Returns the current pre-batch buffer size for the specified destination.
* This represents the number of data points waiting to be batched for the given
* bucket, organization, precision, and consistency combination.
*
* <p>
* Note: Each unique combination of (bucket, org, precision, consistency) has its own
* independent pre-batch buffer. The parameters must match exactly how the data was written.
* </p>
*
* @param bucket the destination bucket
* @param org the destination organization
* @param precision the write precision
* @return current number of points waiting to be batched, or 0 if no buffer exists
*/
int getPreBatchBufferSize(@Nonnull final String bucket,
@Nonnull final String org,
@Nonnull final WritePrecision precision);

/**
* Returns the current pre-batch buffer size for the specified destination.
*
* @param bucket the destination bucket
* @param org the destination organization
* @param precision the write precision
* @param consistency the write consistency (for InfluxDB Enterprise clusters)
* @return current number of points waiting to be batched, or 0 if no buffer exists
*/
int getPreBatchBufferSize(@Nonnull final String bucket,
@Nonnull final String org,
@Nonnull final WritePrecision precision,
@Nullable final WriteConsistency consistency);

/**
* Returns the current pre-batch buffer size for the specified destination.
*
* @param params the write parameters identifying the destination
* @return current number of points waiting to be batched, or 0 if no buffer exists
*/
int getPreBatchBufferSize(@Nonnull final WriteParameters params);

/**
* Returns a snapshot of all current pre-batch buffer sizes.
*
* @return map of WriteParameters to current buffer size
*/
@Nonnull
Map<WriteParameters, Integer> getPreBatchBufferSizes();

/**
* Close threads for asynchronous batch writing.
*/
Expand Down
31 changes: 30 additions & 1 deletion client/src/main/java/com/influxdb/client/WriteOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
* <li>bufferLimit = 10_000</li>
* <li>concatMapPrefetch = 2</li>
* <li>captureBackpressureData = false</li>
* <li>enableBufferTracking = false</li>
* </ul>
* <p>
* The default backpressure strategy is {@link BackpressureOverflowStrategy#DROP_OLDEST}.
Expand All @@ -66,6 +67,7 @@ public final class WriteOptions implements WriteApi.RetryOptions {
public static final int DEFAULT_BUFFER_LIMIT = 10000;
public static final int DEFAULT_CONCAT_MAP_PREFETCH = 2;
public static final boolean DEFAULT_CAPTURE_BACKPRESSURE_DATA = false;
public static final boolean DEFAULT_ENABLE_BUFFER_TRACKING = false;

/**
* Default configuration with values that are consistent with Telegraf.
Expand All @@ -85,6 +87,7 @@ public final class WriteOptions implements WriteApi.RetryOptions {
private final Scheduler writeScheduler;
private final BackpressureOverflowStrategy backpressureStrategy;
private final boolean captureBackpressureData;
private final boolean enableBufferTracking;

/**
* @return the number of data point to collect in batch
Expand Down Expand Up @@ -214,6 +217,14 @@ public boolean getCaptureBackpressureData() {
return captureBackpressureData;
}

/**
* @return whether to enable pre-batch buffer size tracking
* @see WriteOptions.Builder#enableBufferTracking(boolean)
*/
public boolean getEnableBufferTracking() {
return enableBufferTracking;
}

private WriteOptions(@Nonnull final Builder builder) {

Arguments.checkNotNull(builder, "WriteOptions.Builder");
Expand All @@ -231,6 +242,7 @@ private WriteOptions(@Nonnull final Builder builder) {
writeScheduler = builder.writeScheduler;
backpressureStrategy = builder.backpressureStrategy;
captureBackpressureData = builder.captureBackpressureData;
enableBufferTracking = builder.enableBufferTracking;
}

/**
Expand Down Expand Up @@ -262,6 +274,7 @@ public static class Builder {
private Scheduler writeScheduler = Schedulers.newThread();
private BackpressureOverflowStrategy backpressureStrategy = BackpressureOverflowStrategy.DROP_OLDEST;
private boolean captureBackpressureData = DEFAULT_CAPTURE_BACKPRESSURE_DATA;
private boolean enableBufferTracking = DEFAULT_ENABLE_BUFFER_TRACKING;

/**
* Set the number of data point to collect in batch.
Expand Down Expand Up @@ -455,6 +468,22 @@ public Builder captureBackpressureData(final boolean captureBackpressureData) {
return this;
}

/**
* Set whether to enable pre-batch buffer size tracking.
*
* When enabled, the WriteApi tracks the number of data points waiting in the pre-batch buffer
* for each destination (bucket, org, precision, consistency combination). This allows monitoring
* buffer sizes via {@link WriteApi#getPreBatchBufferSize} and {@link WriteApi#getPreBatchBufferSizes}.
*
* @param enableBufferTracking whether to enable buffer size tracking.
* @return {@code this}
*/
@Nonnull
public Builder enableBufferTracking(final boolean enableBufferTracking) {
this.enableBufferTracking = enableBufferTracking;
return this;
}

/**
* Build an instance of WriteOptions.
*
Expand All @@ -466,4 +495,4 @@ public WriteOptions build() {
return new WriteOptions(this);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
package com.influxdb.client.internal;

import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.logging.Level;
Expand Down Expand Up @@ -90,6 +93,7 @@ public abstract class AbstractWriteClient extends AbstractRestClient implements
private final Collection<AutoCloseable> autoCloseables;

private AtomicBoolean finished = new AtomicBoolean(false);
private final ConcurrentHashMap<WriteParameters, AtomicInteger> preBatchBufferSizes = new ConcurrentHashMap<>();

public AbstractWriteClient(@Nonnull final WriteOptions writeOptions,
@Nonnull final InfluxDBClientOptions options,
Expand Down Expand Up @@ -124,15 +128,30 @@ public AbstractWriteClient(@Nonnull final WriteOptions writeOptions,
//
// Use Buffer to create Batch Items
//
.compose(source ->
new FlowableBufferTimedFlushable<>(
.compose(source -> {
if (writeOptions.getEnableBufferTracking()) {
AtomicInteger groupSize = preBatchBufferSizes.computeIfAbsent(
group.getKey(), k -> new AtomicInteger(0));
return new FlowableBufferTimedFlushable<>(
source,
flushPublisher,
writeOptions.getFlushInterval(),
TimeUnit.MILLISECONDS,
writeOptions.getBatchSize(), processorScheduler,
ArrayListSupplier.asSupplier(),
groupSize::set
);
} else {
return new FlowableBufferTimedFlushable<>(
source,
flushPublisher,
writeOptions.getFlushInterval(),
TimeUnit.MILLISECONDS,
writeOptions.getBatchSize(), processorScheduler,
ArrayListSupplier.asSupplier()
))
);
}
})
//
// Collect Batch items into one Write Item
//
Expand Down Expand Up @@ -193,6 +212,32 @@ public void flush() {
flushPublisher.offer(true);
}

/**
* Returns the current pre-batch buffer size for the specified destination.
* This represents the number of data points waiting to be batched for the given
* bucket, organization, precision, and consistency combination.
*
* @param params the write parameters identifying the destination
* @return current number of points waiting to be batched, or 0 if no buffer exists
*/
public int getPreBatchBufferSize(@Nonnull final WriteParameters params) {
Arguments.checkNotNull(params, "WriteParameters");
AtomicInteger size = preBatchBufferSizes.get(params);
return size != null ? size.get() : 0;
}

/**
* Returns a snapshot of all current pre-batch buffer sizes.
*
* @return map of WriteParameters to current buffer size
*/
@Nonnull
public Map<WriteParameters, Integer> getPreBatchBufferSizes() {
ConcurrentHashMap<WriteParameters, Integer> result = new ConcurrentHashMap<>();
preBatchBufferSizes.forEach((key, value) -> result.put(key, value.get()));
return result;
}

public void close() {

LOG.log(Level.FINE, "Flushing any cached BatchWrites before shutdown.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import com.influxdb.client.InfluxDBClientOptions;
import com.influxdb.client.WriteApi;
import com.influxdb.client.WriteOptions;
import com.influxdb.client.domain.WriteConsistency;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.service.WriteService;
import com.influxdb.client.write.Point;
Expand Down Expand Up @@ -269,6 +271,40 @@ public <T extends AbstractWriteEvent> ListenerRegistration listenEvents(@Nonnull
return subscribe::dispose;
}

@Override
public int getPreBatchBufferSize(@Nonnull final String bucket,
@Nonnull final String org,
@Nonnull final WritePrecision precision) {
Arguments.checkNonEmpty(bucket, "bucket");
Arguments.checkNonEmpty(org, "org");
Arguments.checkNotNull(precision, "WritePrecision");

return getPreBatchBufferSize(new WriteParameters(bucket, org, precision));
}

@Override
public int getPreBatchBufferSize(@Nonnull final String bucket,
@Nonnull final String org,
@Nonnull final WritePrecision precision,
@Nullable final WriteConsistency consistency) {
Arguments.checkNonEmpty(bucket, "bucket");
Arguments.checkNonEmpty(org, "org");
Arguments.checkNotNull(precision, "WritePrecision");

return getPreBatchBufferSize(new WriteParameters(bucket, org, precision, consistency));
}

@Override
public int getPreBatchBufferSize(@Nonnull final WriteParameters params) {
return super.getPreBatchBufferSize(params);
}

@Override
@Nonnull
public Map<WriteParameters, Integer> getPreBatchBufferSizes() {
return super.getPreBatchBufferSizes();
}

@Override
public void close() {
super.close();
Expand Down
Loading