Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions java/NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@

### New Features and Improvements

- **Arrow Flight Support (Experimental)**: Added support for ingesting Apache Arrow `VectorSchemaRoot` batches via Arrow Flight protocol
- **Note**: Arrow Flight is not yet supported by default from the Zerobus server side.
- New `ZerobusArrowStream` class with `ingestBatch()`, `waitForOffset()`, `flush()`, `close()`, `getUnackedBatches()` methods
- New `ArrowStreamConfigurationOptions` for configuring Arrow streams (max inflight batches, recovery, timeouts)
- New `createArrowStream()` and `recreateArrowStream()` methods on `ZerobusSdk`
- Accepts `VectorSchemaRoot` directly via `ingestBatch()` (IPC serialization handled internally)
  - Arrow is opt-in: the SDK declares Arrow in `provided` scope, so users must supply `arrow-vector` (`>= 15.0.0`) and a memory allocator such as `arrow-memory-netty` on their own classpath

### Bug Fixes

- Fixed proto generation tool to skip reserved field numbers 19000-19999 for tables with more than 19000 columns
Expand All @@ -14,9 +22,20 @@

### Internal Changes

- Added `arrow-vector` 17.0.0 as provided dependency for Arrow Flight support
- Added `arrow-memory-netty` 17.0.0 as test dependency for integration tests
- Uses existing JNI Arrow Flight bindings from Rust SDK (`nativeCreateArrowStream`, `nativeIngestBatch`, etc.)

### Breaking Changes

### Deprecations

### API Changes

- Added `createArrowStream(String tableName, Schema schema, String clientId, String clientSecret)` to `ZerobusSdk`
- Added `createArrowStream(String tableName, Schema schema, String clientId, String clientSecret, ArrowStreamConfigurationOptions options)` to `ZerobusSdk`
- Added `recreateArrowStream(ZerobusArrowStream closedStream)` to `ZerobusSdk`
- Added `ZerobusArrowStream` class with methods: `ingestBatch()`, `waitForOffset()`, `flush()`, `close()`, `getUnackedBatches()`, `isClosed()`, `getTableName()`, `getOptions()`
- Added `ArrowStreamConfigurationOptions` class with fields: `maxInflightBatches`, `recovery`, `recoveryTimeoutMs`, `recoveryBackoffMs`, `recoveryRetries`, `serverLackOfAckTimeoutMs`, `flushTimeoutMs`, `connectionTimeoutMs`
- Added optional dependency: `org.apache.arrow:arrow-vector >= 15.0.0` (provided scope)

169 changes: 169 additions & 0 deletions java/examples/arrow/ArrowIngestionExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package com.databricks.zerobus.examples.arrow;

import com.databricks.zerobus.*;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.LargeVarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

/**
 * Arrow Flight ingestion example.
 *
 * <p>Demonstrates ingesting columnar data using Apache Arrow record batches via the Arrow Flight
 * protocol. This provides high-performance ingestion for large datasets.
 *
 * <p>Prerequisites:
 *
 * <ul>
 *   <li>A Delta table with columns: device_name (STRING), temp (INT), humidity (BIGINT)
 *   <li>Apache Arrow Java libraries on the classpath (arrow-vector, arrow-memory-netty)
 * </ul>
 *
 * <p>Run with: {@code java -cp <classpath> com.databricks.zerobus.examples.arrow.ArrowIngestionExample}
 */
public class ArrowIngestionExample {

  /**
   * Entry point. Reads connection settings from environment variables, then demonstrates
   * single-batch ingestion, multi-batch ingestion with flush, custom stream options,
   * inspecting unacked batches after close, and recreating a closed stream.
   *
   * @param args unused
   * @throws Exception if stream creation, ingestion, or acknowledgment fails
   */
  public static void main(String[] args) throws Exception {
    String serverEndpoint = System.getenv("ZEROBUS_SERVER_ENDPOINT");
    String workspaceUrl = System.getenv("DATABRICKS_WORKSPACE_URL");
    String tableName = System.getenv("ZEROBUS_TABLE_NAME");
    String clientId = System.getenv("DATABRICKS_CLIENT_ID");
    String clientSecret = System.getenv("DATABRICKS_CLIENT_SECRET");

    if (serverEndpoint == null
        || workspaceUrl == null
        || tableName == null
        || clientId == null
        || clientSecret == null) {
      System.err.println("Error: Required environment variables not set.");
      System.err.println(
          "Set: ZEROBUS_SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL, ZEROBUS_TABLE_NAME,");
      System.err.println("     DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET");
      System.exit(1);
    }

    System.out.println("=== Arrow Flight Ingestion Example ===\n");

    // Define the Arrow schema matching the Delta table.
    // STRING -> LargeUtf8, INT -> Int(32, signed), BIGINT -> Int(64, signed).
    Schema schema =
        new Schema(
            Arrays.asList(
                Field.nullable("device_name", ArrowType.LargeUtf8.INSTANCE),
                Field.nullable("temp", new ArrowType.Int(32, true)),
                Field.nullable("humidity", new ArrowType.Int(64, true))));

    try (BufferAllocator allocator = new RootAllocator();
        ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl)) {

      // === Single batch ingestion ===
      System.out.println("--- Single Batch Ingestion ---");

      ZerobusArrowStream stream =
          sdk.createArrowStream(tableName, schema, clientId, clientSecret).join();

      try {
        try (VectorSchemaRoot batch = VectorSchemaRoot.create(schema, allocator)) {
          LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name");
          IntVector tempVector = (IntVector) batch.getVector("temp");
          BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity");

          int rowCount = 5;
          batch.allocateNew();
          for (int i = 0; i < rowCount; i++) {
            // Explicit UTF-8: getBytes() without a charset uses the platform default,
            // which would corrupt non-ASCII data on some systems.
            nameVector.setSafe(i, ("arrow-device-" + i).getBytes(StandardCharsets.UTF_8));
            tempVector.setSafe(i, 20 + i);
            humidityVector.setSafe(i, 50 + i);
          }
          batch.setRowCount(rowCount);

          long offset = stream.ingestBatch(batch);
          // Block until the server acknowledges this batch's offset.
          stream.waitForOffset(offset);
          System.out.println(
              "  " + rowCount + " rows ingested and acknowledged (offset: " + offset + ")");
        }

        // === Multiple batch ingestion ===
        System.out.println("\n--- Multiple Batch Ingestion ---");

        for (int batchNum = 0; batchNum < 3; batchNum++) {
          try (VectorSchemaRoot batch = VectorSchemaRoot.create(schema, allocator)) {
            LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name");
            IntVector tempVector = (IntVector) batch.getVector("temp");
            BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity");

            int rowCount = 10;
            batch.allocateNew();
            for (int i = 0; i < rowCount; i++) {
              nameVector.setSafe(
                  i, ("arrow-batch-" + batchNum + "-row-" + i).getBytes(StandardCharsets.UTF_8));
              tempVector.setSafe(i, 30 + i);
              humidityVector.setSafe(i, 60 + i);
            }
            batch.setRowCount(rowCount);

            // Per-batch offsets are not needed here; flush() below waits for all acks.
            stream.ingestBatch(batch);
          }
        }
        stream.flush();
        System.out.println("  3 batches (30 rows total) ingested and flushed");

        // === Custom options ===
        System.out.println("\n--- Custom Options ---");

        ArrowStreamConfigurationOptions customOptions =
            ArrowStreamConfigurationOptions.builder()
                .setMaxInflightBatches(2000)
                .setFlushTimeoutMs(600000)
                .setRecovery(true)
                .setRecoveryRetries(5)
                .build();
        System.out.println(
            "  maxInflightBatches: " + customOptions.maxInflightBatches());
        System.out.println("  flushTimeoutMs: " + customOptions.flushTimeoutMs());
        System.out.println("  recoveryRetries: " + customOptions.recoveryRetries());

      } finally {
        stream.close();
      }

      // === Demonstrate getUnackedBatches and recreateArrowStream ===
      // Intentionally called on the closed stream: unacked batches remain readable
      // after close so they can be replayed on a recreated stream.
      System.out.println("\n--- Unacked Batches (after close) ---");

      List<byte[]> unackedBatches = stream.getUnackedBatches();
      System.out.println("  Unacked batches: " + unackedBatches.size());
      System.out.println("  (Expected 0 after successful flush/close)");

      System.out.println("\n--- Recreate Arrow Stream ---");

      ZerobusArrowStream newStream = sdk.recreateArrowStream(stream).join();
      try {
        try (VectorSchemaRoot batch = VectorSchemaRoot.create(schema, allocator)) {
          LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name");
          IntVector tempVector = (IntVector) batch.getVector("temp");
          BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity");

          batch.allocateNew();
          nameVector.setSafe(0, "arrow-recreated".getBytes(StandardCharsets.UTF_8));
          tempVector.setSafe(0, 99);
          humidityVector.setSafe(0, 99);
          batch.setRowCount(1);

          long offset = newStream.ingestBatch(batch);
          newStream.waitForOffset(offset);
          System.out.println("  1 row ingested on recreated stream (offset: " + offset + ")");
        }
      } finally {
        newStream.close();
      }

      System.out.println("\n=== Arrow Flight Example Complete ===");
    }
  }
}
16 changes: 16 additions & 0 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<url>https://github.com/databricks/zerobus-sdk-java/tree/main</url>
</scm>
<properties>
<arrow.version>17.0.0</arrow.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand All @@ -45,6 +46,21 @@
<artifactId>slf4j-api</artifactId>
<version>2.0.17</version>
</dependency>
<!-- Apache Arrow (provided - users must supply arrow-vector >= 15.0.0 and a memory
allocator such as arrow-memory-netty). Only needed for Arrow Flight ingestion. -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
<scope>provided</scope>
</dependency>
<!-- Apache Arrow memory allocator (for tests only) -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>${arrow.version}</version>
<scope>test</scope>
</dependency>
<!-- SLF4J Simple Implementation (for tests only) -->
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Loading
Loading