diff --git a/java/NEXT_CHANGELOG.md b/java/NEXT_CHANGELOG.md index b570c30..3f0d502 100644 --- a/java/NEXT_CHANGELOG.md +++ b/java/NEXT_CHANGELOG.md @@ -6,6 +6,14 @@ ### New Features and Improvements +- **Arrow Flight Support (Experimental)**: Added support for ingesting Apache Arrow `VectorSchemaRoot` batches via Arrow Flight protocol + - **Note**: Arrow Flight is not yet supported by default from the Zerobus server side. + - New `ZerobusArrowStream` class with `ingestBatch()`, `waitForOffset()`, `flush()`, `close()`, `getUnackedBatches()` methods + - New `ArrowStreamConfigurationOptions` for configuring Arrow streams (max inflight batches, recovery, timeouts) + - New `createArrowStream()` and `recreateArrowStream()` methods on `ZerobusSdk` + - Accepts `VectorSchemaRoot` directly via `ingestBatch()` (IPC serialization handled internally) + - Arrow is opt-in: add `arrow-vector` and `arrow-memory-netty` as dependencies (provided scope, `>= 15.0.0`) + ### Bug Fixes - Fixed proto generation tool to skip reserved field numbers 19000-19999 for tables with more than 19000 columns @@ -14,9 +22,20 @@ ### Internal Changes +- Added `arrow-vector` 17.0.0 as provided dependency for Arrow Flight support +- Added `arrow-memory-netty` 17.0.0 as test dependency for integration tests +- Uses existing JNI Arrow Flight bindings from Rust SDK (`nativeCreateArrowStream`, `nativeIngestBatch`, etc.) 
+ ### Breaking Changes ### Deprecations ### API Changes +- Added `createArrowStream(String tableName, Schema schema, String clientId, String clientSecret)` to `ZerobusSdk` +- Added `createArrowStream(String tableName, Schema schema, String clientId, String clientSecret, ArrowStreamConfigurationOptions options)` to `ZerobusSdk` +- Added `recreateArrowStream(ZerobusArrowStream closedStream)` to `ZerobusSdk` +- Added `ZerobusArrowStream` class with methods: `ingestBatch()`, `waitForOffset()`, `flush()`, `close()`, `getUnackedBatches()`, `isClosed()`, `getTableName()`, `getOptions()` +- Added `ArrowStreamConfigurationOptions` class with fields: `maxInflightBatches`, `recovery`, `recoveryTimeoutMs`, `recoveryBackoffMs`, `recoveryRetries`, `serverLackOfAckTimeoutMs`, `flushTimeoutMs`, `connectionTimeoutMs` +- Added optional dependency: `org.apache.arrow:arrow-vector >= 15.0.0` (provided scope) + diff --git a/java/examples/arrow/ArrowIngestionExample.java b/java/examples/arrow/ArrowIngestionExample.java new file mode 100644 index 0000000..c039506 --- /dev/null +++ b/java/examples/arrow/ArrowIngestionExample.java @@ -0,0 +1,169 @@ +package com.databricks.zerobus.examples.arrow; + +import com.databricks.zerobus.*; +import java.util.Arrays; +import java.util.List; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.LargeVarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +/** + * Arrow Flight ingestion example. + * + *

Demonstrates ingesting columnar data using Apache Arrow record batches via the Arrow Flight + * protocol. This provides high-performance ingestion for large datasets. + * + *

Prerequisites: + * + *

+ * + *

Run with: {@code java -cp com.databricks.zerobus.examples.arrow.ArrowIngestionExample} + */ +public class ArrowIngestionExample { + + public static void main(String[] args) throws Exception { + String serverEndpoint = System.getenv("ZEROBUS_SERVER_ENDPOINT"); + String workspaceUrl = System.getenv("DATABRICKS_WORKSPACE_URL"); + String tableName = System.getenv("ZEROBUS_TABLE_NAME"); + String clientId = System.getenv("DATABRICKS_CLIENT_ID"); + String clientSecret = System.getenv("DATABRICKS_CLIENT_SECRET"); + + if (serverEndpoint == null + || workspaceUrl == null + || tableName == null + || clientId == null + || clientSecret == null) { + System.err.println("Error: Required environment variables not set."); + System.err.println( + "Set: ZEROBUS_SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL, ZEROBUS_TABLE_NAME,"); + System.err.println(" DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET"); + System.exit(1); + } + + System.out.println("=== Arrow Flight Ingestion Example ===\n"); + + // Define the Arrow schema matching the Delta table + Schema schema = + new Schema( + Arrays.asList( + Field.nullable("device_name", ArrowType.LargeUtf8.INSTANCE), + Field.nullable("temp", new ArrowType.Int(32, true)), + Field.nullable("humidity", new ArrowType.Int(64, true)))); + + try (BufferAllocator allocator = new RootAllocator(); + ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl)) { + + // === Single batch ingestion === + System.out.println("--- Single Batch Ingestion ---"); + + ZerobusArrowStream stream = + sdk.createArrowStream(tableName, schema, clientId, clientSecret).join(); + + try { + try (VectorSchemaRoot batch = VectorSchemaRoot.create(schema, allocator)) { + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); + IntVector tempVector = (IntVector) batch.getVector("temp"); + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); + + int rowCount = 5; + batch.allocateNew(); + for (int i = 0; i < rowCount; i++) { + 
nameVector.setSafe(i, ("arrow-device-" + i).getBytes()); + tempVector.setSafe(i, 20 + i); + humidityVector.setSafe(i, 50 + i); + } + batch.setRowCount(rowCount); + + long offset = stream.ingestBatch(batch); + stream.waitForOffset(offset); + System.out.println( + " " + rowCount + " rows ingested and acknowledged (offset: " + offset + ")"); + } + + // === Multiple batch ingestion === + System.out.println("\n--- Multiple Batch Ingestion ---"); + + long lastOffset = -1; + for (int batchNum = 0; batchNum < 3; batchNum++) { + try (VectorSchemaRoot batch = VectorSchemaRoot.create(schema, allocator)) { + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); + IntVector tempVector = (IntVector) batch.getVector("temp"); + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); + + int rowCount = 10; + batch.allocateNew(); + for (int i = 0; i < rowCount; i++) { + nameVector.setSafe(i, ("arrow-batch-" + batchNum + "-row-" + i).getBytes()); + tempVector.setSafe(i, 30 + i); + humidityVector.setSafe(i, 60 + i); + } + batch.setRowCount(rowCount); + + lastOffset = stream.ingestBatch(batch); + } + } + stream.flush(); + System.out.println(" 3 batches (30 rows total) ingested and flushed"); + + // === Custom options === + System.out.println("\n--- Custom Options ---"); + + ArrowStreamConfigurationOptions customOptions = + ArrowStreamConfigurationOptions.builder() + .setMaxInflightBatches(2000) + .setFlushTimeoutMs(600000) + .setRecovery(true) + .setRecoveryRetries(5) + .build(); + System.out.println( + " maxInflightBatches: " + customOptions.maxInflightBatches()); + System.out.println(" flushTimeoutMs: " + customOptions.flushTimeoutMs()); + System.out.println(" recoveryRetries: " + customOptions.recoveryRetries()); + + } finally { + stream.close(); + } + + // === Demonstrate getUnackedBatches and recreateArrowStream === + System.out.println("\n--- Unacked Batches (after close) ---"); + + List unackedBatches = stream.getUnackedBatches(); 
+ System.out.println(" Unacked batches: " + unackedBatches.size()); + System.out.println(" (Expected 0 after successful flush/close)"); + + System.out.println("\n--- Recreate Arrow Stream ---"); + + ZerobusArrowStream newStream = sdk.recreateArrowStream(stream).join(); + try { + try (VectorSchemaRoot batch = VectorSchemaRoot.create(schema, allocator)) { + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); + IntVector tempVector = (IntVector) batch.getVector("temp"); + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); + + batch.allocateNew(); + nameVector.setSafe(0, "arrow-recreated".getBytes()); + tempVector.setSafe(0, 99); + humidityVector.setSafe(0, 99); + batch.setRowCount(1); + + long offset = newStream.ingestBatch(batch); + newStream.waitForOffset(offset); + System.out.println(" 1 row ingested on recreated stream (offset: " + offset + ")"); + } + } finally { + newStream.close(); + } + + System.out.println("\n=== Arrow Flight Example Complete ==="); + } + } +} diff --git a/java/pom.xml b/java/pom.xml index 1eef956..6f647eb 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -28,6 +28,7 @@ https://github.com/databricks/zerobus-sdk-java/tree/main + 17.0.0 1.8 1.8 UTF-8 @@ -45,6 +46,21 @@ slf4j-api 2.0.17 + + + org.apache.arrow + arrow-vector + ${arrow.version} + provided + + + + org.apache.arrow + arrow-memory-netty + ${arrow.version} + test + org.slf4j diff --git a/java/src/main/java/com/databricks/zerobus/ArrowStreamConfigurationOptions.java b/java/src/main/java/com/databricks/zerobus/ArrowStreamConfigurationOptions.java new file mode 100644 index 0000000..690d5b2 --- /dev/null +++ b/java/src/main/java/com/databricks/zerobus/ArrowStreamConfigurationOptions.java @@ -0,0 +1,289 @@ +package com.databricks.zerobus; + +/** + * Configuration options for Arrow Flight streams. + * + *

/**
 * Configuration options for Arrow Flight streams.
 *
 * <p>These settings control Arrow Flight stream behavior: performance tuning (in-flight batch
 * limit), automatic recovery, and the various operation timeouts. Instances are immutable; create
 * them with the builder:
 *
 * <pre>{@code
 * ArrowStreamConfigurationOptions options = ArrowStreamConfigurationOptions.builder()
 *     .setMaxInflightBatches(2000)
 *     .setRecovery(true)
 *     .setFlushTimeoutMs(600000)
 *     .build();
 * }</pre>
 *
 * @see ZerobusArrowStream
 * @see ZerobusSdk#createArrowStream
 */
public class ArrowStreamConfigurationOptions {

  // Defaults applied when a value is not configured explicitly on the builder.
  private static final int DEFAULT_MAX_INFLIGHT_BATCHES = 1000;
  private static final boolean DEFAULT_RECOVERY = true;
  private static final long DEFAULT_RECOVERY_TIMEOUT_MS = 15000;
  private static final long DEFAULT_RECOVERY_BACKOFF_MS = 2000;
  private static final int DEFAULT_RECOVERY_RETRIES = 4;
  private static final long DEFAULT_SERVER_LACK_OF_ACK_TIMEOUT_MS = 60000;
  private static final long DEFAULT_FLUSH_TIMEOUT_MS = 300000;
  private static final long DEFAULT_CONNECTION_TIMEOUT_MS = 30000;

  private final int maxInflightBatches;
  private final boolean recovery;
  private final long recoveryTimeoutMs;
  private final long recoveryBackoffMs;
  private final int recoveryRetries;
  private final long serverLackOfAckTimeoutMs;
  private final long flushTimeoutMs;
  private final long connectionTimeoutMs;

  /** Snapshots the builder's state; reachable only via {@link #builder()} / {@link #getDefault()}. */
  private ArrowStreamConfigurationOptions(ArrowStreamConfigurationOptionsBuilder builder) {
    this.maxInflightBatches = builder.maxInflightBatches;
    this.recovery = builder.recovery;
    this.recoveryTimeoutMs = builder.recoveryTimeoutMs;
    this.recoveryBackoffMs = builder.recoveryBackoffMs;
    this.recoveryRetries = builder.recoveryRetries;
    this.serverLackOfAckTimeoutMs = builder.serverLackOfAckTimeoutMs;
    this.flushTimeoutMs = builder.flushTimeoutMs;
    this.connectionTimeoutMs = builder.connectionTimeoutMs;
  }

  /**
   * Returns how many batches the SDK may accept and send before waiting for server
   * acknowledgments. Larger values raise throughput at the cost of memory.
   *
   * @return the maximum number of in-flight batches
   */
  public int maxInflightBatches() {
    return maxInflightBatches;
  }

  /**
   * Returns whether the SDK automatically reconnects and resends unacknowledged batches after a
   * stream failure.
   *
   * @return true if automatic recovery is enabled, false otherwise
   */
  public boolean recovery() {
    return recovery;
  }

  /**
   * Returns the timeout for recovery operations.
   *
   * @return the recovery timeout in milliseconds
   */
  public long recoveryTimeoutMs() {
    return recoveryTimeoutMs;
  }

  /**
   * Returns the backoff delay between recovery attempts.
   *
   * @return the recovery backoff delay in milliseconds
   */
  public long recoveryBackoffMs() {
    return recoveryBackoffMs;
  }

  /**
   * Returns the maximum number of recovery attempts.
   *
   * @return the maximum number of recovery attempts
   */
  public int recoveryRetries() {
    return recoveryRetries;
  }

  /**
   * Returns the timeout for server acknowledgment.
   *
   * @return the server acknowledgment timeout in milliseconds
   */
  public long serverLackOfAckTimeoutMs() {
    return serverLackOfAckTimeoutMs;
  }

  /**
   * Returns the timeout for flush operations.
   *
   * @return the flush timeout in milliseconds
   */
  public long flushTimeoutMs() {
    return flushTimeoutMs;
  }

  /**
   * Returns the timeout for establishing a connection.
   *
   * @return the connection timeout in milliseconds
   */
  public long connectionTimeoutMs() {
    return connectionTimeoutMs;
  }

  /**
   * Returns an options instance populated entirely with default values.
   *
   * @return the default Arrow stream configuration options
   */
  public static ArrowStreamConfigurationOptions getDefault() {
    return builder().build();
  }

  /**
   * Returns a new builder whose settings start at the defaults.
   *
   * @return a new builder
   */
  public static ArrowStreamConfigurationOptionsBuilder builder() {
    return new ArrowStreamConfigurationOptionsBuilder();
  }

  /**
   * Builder for creating {@link ArrowStreamConfigurationOptions} instances.
   *
   * <p>Every parameter has a sensible default if not specified.
   */
  public static class ArrowStreamConfigurationOptionsBuilder {

    private int maxInflightBatches = DEFAULT_MAX_INFLIGHT_BATCHES;
    private boolean recovery = DEFAULT_RECOVERY;
    private long recoveryTimeoutMs = DEFAULT_RECOVERY_TIMEOUT_MS;
    private long recoveryBackoffMs = DEFAULT_RECOVERY_BACKOFF_MS;
    private int recoveryRetries = DEFAULT_RECOVERY_RETRIES;
    private long serverLackOfAckTimeoutMs = DEFAULT_SERVER_LACK_OF_ACK_TIMEOUT_MS;
    private long flushTimeoutMs = DEFAULT_FLUSH_TIMEOUT_MS;
    private long connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;

    private ArrowStreamConfigurationOptionsBuilder() {}

    /**
     * Sets the maximum number of batches that can be in flight.
     *
     * @param maxInflightBatches the maximum number of in-flight batches
     * @return this builder for method chaining
     */
    public ArrowStreamConfigurationOptionsBuilder setMaxInflightBatches(int maxInflightBatches) {
      this.maxInflightBatches = maxInflightBatches;
      return this;
    }

    /**
     * Sets whether automatic recovery is enabled.
     *
     * @param recovery true to enable automatic recovery, false to disable
     * @return this builder for method chaining
     */
    public ArrowStreamConfigurationOptionsBuilder setRecovery(boolean recovery) {
      this.recovery = recovery;
      return this;
    }

    /**
     * Sets the timeout for recovery operations.
     *
     * @param recoveryTimeoutMs the recovery timeout in milliseconds
     * @return this builder for method chaining
     */
    public ArrowStreamConfigurationOptionsBuilder setRecoveryTimeoutMs(long recoveryTimeoutMs) {
      this.recoveryTimeoutMs = recoveryTimeoutMs;
      return this;
    }

    /**
     * Sets the backoff delay between recovery attempts.
     *
     * @param recoveryBackoffMs the recovery backoff delay in milliseconds
     * @return this builder for method chaining
     */
    public ArrowStreamConfigurationOptionsBuilder setRecoveryBackoffMs(long recoveryBackoffMs) {
      this.recoveryBackoffMs = recoveryBackoffMs;
      return this;
    }

    /**
     * Sets the maximum number of recovery attempts.
     *
     * @param recoveryRetries the maximum number of recovery attempts
     * @return this builder for method chaining
     */
    public ArrowStreamConfigurationOptionsBuilder setRecoveryRetries(int recoveryRetries) {
      this.recoveryRetries = recoveryRetries;
      return this;
    }

    /**
     * Sets the timeout for server acknowledgment.
     *
     * @param serverLackOfAckTimeoutMs the server acknowledgment timeout in milliseconds
     * @return this builder for method chaining
     */
    public ArrowStreamConfigurationOptionsBuilder setServerLackOfAckTimeoutMs(
        long serverLackOfAckTimeoutMs) {
      this.serverLackOfAckTimeoutMs = serverLackOfAckTimeoutMs;
      return this;
    }

    /**
     * Sets the timeout for flush operations.
     *
     * @param flushTimeoutMs the flush timeout in milliseconds
     * @return this builder for method chaining
     */
    public ArrowStreamConfigurationOptionsBuilder setFlushTimeoutMs(long flushTimeoutMs) {
      this.flushTimeoutMs = flushTimeoutMs;
      return this;
    }

    /**
     * Sets the timeout for establishing a connection.
     *
     * @param connectionTimeoutMs the connection timeout in milliseconds
     * @return this builder for method chaining
     */
    public ArrowStreamConfigurationOptionsBuilder setConnectionTimeoutMs(long connectionTimeoutMs) {
      this.connectionTimeoutMs = connectionTimeoutMs;
      return this;
    }

    /**
     * Builds an immutable options instance from the configured settings.
     *
     * @return a new ArrowStreamConfigurationOptions with the configured settings
     */
    public ArrowStreamConfigurationOptions build() {
      return new ArrowStreamConfigurationOptions(this);
    }
  }
}

This class provides high-performance columnar data ingestion using the Apache Arrow Flight + * protocol. Data is sent as Arrow {@link VectorSchemaRoot} batches, which are automatically + * serialized to Arrow IPC format for transmission over the wire. + * + *

Create instances using {@link ZerobusSdk#createArrowStream}: + * + *

{@code
+ * Schema schema = new Schema(Arrays.asList(
+ *     Field.nullable("name", new ArrowType.Utf8()),
+ *     Field.nullable("age", new ArrowType.Int(32, true))
+ * ));
+ *
+ * ZerobusArrowStream stream = sdk.createArrowStream(
+ *     "catalog.schema.table",
+ *     schema,
+ *     clientId,
+ *     clientSecret
+ * ).join();
+ *
+ * // Create and populate a VectorSchemaRoot, then ingest
+ * long offset = stream.ingestBatch(batch);
+ * stream.waitForOffset(offset);
+ * stream.close();
+ * }
+ * + *

Resource Management

+ * + *

This class holds native resources that are not automatically released by the garbage + * collector. You must call {@link #close()} when done to avoid native memory leaks. Use + * try-with-resources for automatic cleanup. + * + *

Thread Safety

+ * + *

This class is not thread-safe. Each stream instance should be used from a single + * thread. + * + *

Dependencies

+ * + *

This class requires Apache Arrow Java libraries on the classpath. Add {@code arrow-vector} and + * a memory allocator implementation (e.g., {@code arrow-memory-netty}) to your project + * dependencies. + * + * @see ZerobusSdk#createArrowStream(String, Schema, String, String) + * @see ArrowStreamConfigurationOptions + */ +public class ZerobusArrowStream implements AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(ZerobusArrowStream.class); + + // Ensure native library is loaded. + static { + NativeLoader.ensureLoaded(); + } + + // Native handle to the Rust Arrow stream object. + private volatile long nativeHandle; + + // Stream properties. + private final String tableName; + private final ArrowStreamConfigurationOptions options; + private final byte[] schemaIpc; + + // Credentials stored for stream recreation. + private final String clientId; + private final String clientSecret; + + // Cached unacked batches (populated on close for use in recreateArrowStream). + private volatile List cachedUnackedBatches; + + /** Package-private constructor. Use {@link ZerobusSdk#createArrowStream} to create instances. */ + ZerobusArrowStream( + long nativeHandle, + String tableName, + ArrowStreamConfigurationOptions options, + byte[] schemaIpc, + String clientId, + String clientSecret) { + this.nativeHandle = nativeHandle; + this.tableName = tableName; + this.options = options; + this.schemaIpc = schemaIpc; + this.clientId = clientId; + this.clientSecret = clientSecret; + } + + // ==================== Batch Ingestion ==================== + + /** + * Ingests an Arrow record batch and returns the offset immediately. + * + *

The batch is serialized to Arrow IPC format and sent to the server. The method returns as + * soon as the batch is queued for transmission, without waiting for server acknowledgment. Use + * {@link #waitForOffset(long)} or {@link #flush()} to wait for acknowledgment. + * + *

The batch schema must match the schema used to create this stream. + * + * @param batch the Arrow record batch to ingest + * @return the offset ID assigned to this batch + * @throws ZerobusException if the stream is closed, the schema doesn't match, or an error occurs + */ + public long ingestBatch(VectorSchemaRoot batch) throws ZerobusException { + if (batch == null) { + throw new ZerobusException("Batch must not be null"); + } + ensureOpen(); + byte[] ipcBytes = serializeBatchToIpc(batch); + return nativeIngestBatch(nativeHandle, ipcBytes); + } + + // ==================== Acknowledgment ==================== + + /** + * Waits for a specific offset to be acknowledged by the server. + * + * @param offset the offset to wait for (as returned by {@link #ingestBatch}) + * @throws ZerobusException if an error occurs or the wait times out + */ + public void waitForOffset(long offset) throws ZerobusException { + ensureOpen(); + nativeWaitForOffset(nativeHandle, offset); + } + + /** + * Flushes all pending batches and waits for their acknowledgments. + * + *

Blocks until all batches that were ingested before this call are acknowledged by the server. + * + * @throws ZerobusException if an error occurs or the flush times out + */ + public void flush() throws ZerobusException { + ensureOpen(); + nativeFlush(nativeHandle); + logger.info("All Arrow batches have been flushed"); + } + + // ==================== Lifecycle ==================== + + /** + * Closes the stream, flushing all pending batches first. + * + *

After closing, unacknowledged batches can still be retrieved via {@link + * #getUnackedBatches()} for use in stream recreation. + * + * @throws ZerobusException if an error occurs during close + */ + @Override + public void close() throws ZerobusException { + long handle = nativeHandle; + if (handle != 0) { + // Close the stream first (flushes pending batches) + nativeClose(handle); + + // Cache unacked batches before destroying the handle (for recreateArrowStream) + try { + cachedUnackedBatches = nativeGetUnackedBatches(handle); + } catch (Exception e) { + logger.warn("Failed to cache unacked batches: {}", e.getMessage()); + cachedUnackedBatches = new ArrayList<>(); + } + + // Now destroy the handle + nativeHandle = 0; + nativeDestroy(handle); + logger.info("Arrow stream closed"); + } + } + + /** + * Returns whether the stream is closed. + * + * @return true if the stream is closed + */ + public boolean isClosed() { + return nativeHandle == 0 || nativeIsClosed(nativeHandle); + } + + // ==================== Accessors ==================== + + /** + * Returns the table name for this stream. + * + * @return the fully qualified table name + */ + public String getTableName() { + return tableName; + } + + /** + * Returns the stream configuration options. + * + * @return the Arrow stream configuration options + */ + public ArrowStreamConfigurationOptions getOptions() { + return options; + } + + // ==================== Unacknowledged Batches ==================== + + /** + * Returns the unacknowledged batches as Arrow IPC byte arrays. + * + *

Each element in the returned list is a serialized Arrow IPC stream containing one record + * batch. These can be deserialized using {@code ArrowStreamReader} or re-ingested into a new + * stream via {@link ZerobusSdk#recreateArrowStream}. + * + *

After the stream is closed, this method returns cached data. + * + * @return a list of unacknowledged batches as IPC byte arrays + * @throws ZerobusException if an error occurs + */ + public List getUnackedBatches() throws ZerobusException { + if (nativeHandle == 0) { + return cachedUnackedBatches != null ? cachedUnackedBatches : new ArrayList<>(); + } + return nativeGetUnackedBatches(nativeHandle); + } + + // ==================== Package-Private Accessors ==================== + + /** Returns the IPC-serialized schema used to create this stream. */ + byte[] getSchemaIpc() { + return schemaIpc; + } + + /** Returns the client ID used to create this stream. */ + String getClientId() { + return clientId; + } + + /** Returns the client secret used to create this stream. */ + String getClientSecret() { + return clientSecret; + } + + /** + * Ingests a pre-serialized Arrow IPC batch. Package-private for use by {@link + * ZerobusSdk#recreateArrowStream} during re-ingestion of unacked batches. + */ + long ingestBatchIpc(byte[] ipcBytes) throws ZerobusException { + ensureOpen(); + return nativeIngestBatch(nativeHandle, ipcBytes); + } + + // ==================== Serialization Utilities ==================== + + /** + * Serializes an Arrow schema to IPC stream format. + * + *

Package-private for use by {@link ZerobusSdk#createArrowStream}. + */ + static byte[] serializeSchemaToIpc(Schema schema) throws ZerobusException { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (BufferAllocator allocator = new RootAllocator(); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); + ArrowStreamWriter writer = new ArrowStreamWriter(root, null, Channels.newChannel(out))) { + writer.start(); + writer.end(); + } + return out.toByteArray(); + } catch (IOException e) { + throw new ZerobusException("Failed to serialize Arrow schema to IPC: " + e.getMessage(), e); + } + } + + private static byte[] serializeBatchToIpc(VectorSchemaRoot batch) throws ZerobusException { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (ArrowStreamWriter writer = + new ArrowStreamWriter(batch, null, Channels.newChannel(out))) { + writer.start(); + writer.writeBatch(); + writer.end(); + } + return out.toByteArray(); + } catch (IOException e) { + throw new ZerobusException("Failed to serialize Arrow batch to IPC: " + e.getMessage(), e); + } + } + + private void ensureOpen() throws ZerobusException { + if (nativeHandle == 0) { + throw new ZerobusException("Arrow stream is closed"); + } + if (nativeIsClosed(nativeHandle)) { + throw new ZerobusException("Arrow stream is closed"); + } + } + + // ==================== Native methods implemented in Rust ==================== + + private static native void nativeDestroy(long handle); + + private native long nativeIngestBatch(long handle, byte[] batchData); + + private native void nativeWaitForOffset(long handle, long offset); + + private native void nativeFlush(long handle); + + private native void nativeClose(long handle); + + private native boolean nativeIsClosed(long handle); + + private native String nativeGetTableName(long handle); + + @SuppressWarnings("unchecked") + private native List nativeGetUnackedBatches(long handle); +} diff --git 
a/java/src/main/java/com/databricks/zerobus/ZerobusSdk.java b/java/src/main/java/com/databricks/zerobus/ZerobusSdk.java index d4ad928..39073aa 100644 --- a/java/src/main/java/com/databricks/zerobus/ZerobusSdk.java +++ b/java/src/main/java/com/databricks/zerobus/ZerobusSdk.java @@ -3,6 +3,7 @@ import com.google.protobuf.Message; import java.util.List; import java.util.concurrent.CompletableFuture; +import org.apache.arrow.vector.types.pojo.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +75,9 @@ public class ZerobusSdk implements AutoCloseable { private static final StreamConfigurationOptions DEFAULT_OPTIONS = StreamConfigurationOptions.getDefault(); + private static final ArrowStreamConfigurationOptions DEFAULT_ARROW_OPTIONS = + ArrowStreamConfigurationOptions.getDefault(); + // Native handle to the Rust SDK object. private volatile long nativeHandle; @@ -319,6 +323,72 @@ public CompletableFuture> return this.createStream(tableProperties, clientId, clientSecret, DEFAULT_OPTIONS); } + // ==================== Arrow Stream Creation ==================== + + /** + * Creates a new Arrow Flight stream for ingesting Arrow record batches into a table. + * + * @param tableName The fully qualified table name (catalog.schema.table). + * @param schema The Arrow schema describing the columns of the target table. + * @param clientId The OAuth client ID for authentication. + * @param clientSecret The OAuth client secret for authentication. + * @return A CompletableFuture that completes with the ZerobusArrowStream when the stream is + * ready. + */ + public CompletableFuture createArrowStream( + String tableName, Schema schema, String clientId, String clientSecret) { + return createArrowStream(tableName, schema, clientId, clientSecret, DEFAULT_ARROW_OPTIONS); + } + + /** + * Creates a new Arrow Flight stream for ingesting Arrow record batches into a table with custom + * options. 
+ * + * @param tableName The fully qualified table name (catalog.schema.table). + * @param schema The Arrow schema describing the columns of the target table. + * @param clientId The OAuth client ID for authentication. + * @param clientSecret The OAuth client secret for authentication. + * @param options Configuration options for the Arrow stream. + * @return A CompletableFuture that completes with the ZerobusArrowStream when the stream is + * ready. + */ + public CompletableFuture createArrowStream( + String tableName, + Schema schema, + String clientId, + String clientSecret, + ArrowStreamConfigurationOptions options) { + + ensureOpen(); + + ArrowStreamConfigurationOptions effectiveOptions = + options != null ? options : DEFAULT_ARROW_OPTIONS; + + logger.debug("Creating Arrow stream for table: {}", tableName); + + byte[] schemaIpc; + try { + schemaIpc = ZerobusArrowStream.serializeSchemaToIpc(schema); + } catch (ZerobusException e) { + CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally(e); + return failed; + } + + CompletableFuture handleFuture = + nativeCreateArrowStream( + nativeHandle, tableName, schemaIpc, clientId, clientSecret, effectiveOptions); + + return handleFuture.thenApply( + handle -> { + if (handle == null || handle == 0) { + throw new RuntimeException("Failed to create Arrow stream: null handle returned"); + } + return new ZerobusArrowStream( + handle, tableName, effectiveOptions, schemaIpc, clientId, clientSecret); + }); + } + // ==================== Stream Recreation ==================== /** @@ -458,6 +528,67 @@ public CompletableFuture recreateStream(ZerobusJsonStream clo }); } + /** + * Recreates an Arrow stream from a closed stream, re-ingesting unacknowledged batches. 
+ * + * @param closedStream the closed Arrow stream to recreate + * @return a CompletableFuture that completes with the new stream after unacked batches are + * re-ingested + * @throws IllegalStateException if the original stream is not closed + */ + public CompletableFuture recreateArrowStream( + ZerobusArrowStream closedStream) { + if (!closedStream.isClosed()) { + throw new IllegalStateException("Arrow stream must be closed before recreation"); + } + + ensureOpen(); + + List unackedBatches; + try { + unackedBatches = closedStream.getUnackedBatches(); + } catch (ZerobusException e) { + CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally(e); + return failed; + } + + CompletableFuture handleFuture = + nativeCreateArrowStream( + nativeHandle, + closedStream.getTableName(), + closedStream.getSchemaIpc(), + closedStream.getClientId(), + closedStream.getClientSecret(), + closedStream.getOptions()); + + return handleFuture.thenApply( + handle -> { + if (handle == null || handle == 0) { + throw new RuntimeException("Failed to recreate Arrow stream: null handle returned"); + } + ZerobusArrowStream newStream = + new ZerobusArrowStream( + handle, + closedStream.getTableName(), + closedStream.getOptions(), + closedStream.getSchemaIpc(), + closedStream.getClientId(), + closedStream.getClientSecret()); + + try { + for (byte[] batchIpc : unackedBatches) { + newStream.ingestBatchIpc(batchIpc); + } + newStream.flush(); + } catch (ZerobusException e) { + throw new RuntimeException("Failed to re-ingest unacked Arrow batches", e); + } + + return newStream; + }); + } + /** * Recreates a legacy stream from a closed stream, re-ingesting unacknowledged records. 
* @@ -561,4 +692,12 @@ private native CompletableFuture nativeCreateStream( String clientSecret, Object options, boolean isJson); + + private native CompletableFuture nativeCreateArrowStream( + long sdkHandle, + String tableName, + byte[] arrowSchema, + String clientId, + String clientSecret, + Object options); } diff --git a/java/src/test/java/com/databricks/zerobus/ArrowIntegrationTest.java b/java/src/test/java/com/databricks/zerobus/ArrowIntegrationTest.java new file mode 100644 index 0000000..fa72d4c --- /dev/null +++ b/java/src/test/java/com/databricks/zerobus/ArrowIntegrationTest.java @@ -0,0 +1,617 @@ +package com.databricks.zerobus; + +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import java.util.Arrays; +import java.util.List; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.LargeVarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.jupiter.api.*; + +/** + * Integration tests for Arrow Flight stream operations. + * + *

 * <p>These tests are skipped unless the following environment variables are set:
 *
 * <ul>
 *   <li>{@code ZEROBUS_SERVER_ENDPOINT} – the Zerobus ingest endpoint
 *   <li>{@code DATABRICKS_WORKSPACE_URL} – the Databricks workspace URL
 *   <li>{@code ZEROBUS_TABLE_NAME} – the fully qualified target table
 *   <li>{@code DATABRICKS_CLIENT_ID} – the OAuth client ID
 *   <li>{@code DATABRICKS_CLIENT_SECRET} – the OAuth client secret
 * </ul>
 *
 * <p>The target table must have columns: device_name (STRING), temp (INT), humidity (BIGINT).
 *

Run with: {@code mvn test -Dtest=ArrowIntegrationTest} + */ +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class ArrowIntegrationTest { + + private static String serverEndpoint; + private static String workspaceUrl; + private static String tableName; + private static String clientId; + private static String clientSecret; + private static boolean configAvailable; + private static boolean nativeLibraryAvailable; + + private static final Schema SCHEMA = + new Schema( + Arrays.asList( + Field.nullable("device_name", ArrowType.LargeUtf8.INSTANCE), + Field.nullable("temp", new ArrowType.Int(32, true)), + Field.nullable("humidity", new ArrowType.Int(64, true)))); + + @BeforeAll + static void checkPrerequisites() { + serverEndpoint = System.getenv("ZEROBUS_SERVER_ENDPOINT"); + workspaceUrl = System.getenv("DATABRICKS_WORKSPACE_URL"); + tableName = System.getenv("ZEROBUS_TABLE_NAME"); + clientId = System.getenv("DATABRICKS_CLIENT_ID"); + clientSecret = System.getenv("DATABRICKS_CLIENT_SECRET"); + + configAvailable = + serverEndpoint != null + && workspaceUrl != null + && tableName != null + && clientId != null + && clientSecret != null; + + if (!configAvailable) { + System.out.println( + "Arrow integration tests skipped: Required environment variables not set."); + } + + try { + NativeLoader.ensureLoaded(); + nativeLibraryAvailable = true; + } catch (UnsatisfiedLinkError | ExceptionInInitializerError e) { + nativeLibraryAvailable = false; + System.out.println( + "Arrow integration tests skipped: Native library not available - " + e.getMessage()); + } + } + + @BeforeEach + void skipIfPrerequisitesNotMet() { + assumeTrue(nativeLibraryAvailable, "Native library not available"); + assumeTrue(configAvailable, "Configuration not available"); + } + + // =================================================================================== + // Test 1: Arrow stream - single batch ingestion (5 rows) + // 
=================================================================================== + + @Test + @Order(1) + @DisplayName("Arrow stream - single batch ingestion") + void testArrowSingleBatchIngestion() throws Exception { + ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl); + + try (BufferAllocator allocator = new RootAllocator()) { + ZerobusArrowStream stream = + sdk.createArrowStream(tableName, SCHEMA, clientId, clientSecret).join(); + + try { + try (VectorSchemaRoot batch = VectorSchemaRoot.create(SCHEMA, allocator)) { + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); + IntVector tempVector = (IntVector) batch.getVector("temp"); + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); + + int rowCount = 5; + batch.allocateNew(); + for (int i = 0; i < rowCount; i++) { + nameVector.setSafe(i, ("test-arrow-single-" + i).getBytes()); + tempVector.setSafe(i, 20 + i); + humidityVector.setSafe(i, 50 + i); + } + batch.setRowCount(rowCount); + + long offset = stream.ingestBatch(batch); + stream.waitForOffset(offset); + + System.out.println( + "Arrow single batch: " + rowCount + " rows ingested (offset: " + offset + ")"); + } + } finally { + stream.close(); + } + } + + sdk.close(); + } + + // =================================================================================== + // Test 2: Arrow stream - multiple batches with flush (30 rows) + // =================================================================================== + + @Test + @Order(2) + @DisplayName("Arrow stream - multiple batches with flush") + void testArrowMultipleBatchesFlush() throws Exception { + ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl); + + try (BufferAllocator allocator = new RootAllocator()) { + ZerobusArrowStream stream = + sdk.createArrowStream(tableName, SCHEMA, clientId, clientSecret).join(); + + int totalRows = 0; + + try { + for (int batchNum = 0; batchNum < 3; batchNum++) { + try (VectorSchemaRoot batch = 
VectorSchemaRoot.create(SCHEMA, allocator)) { + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); + IntVector tempVector = (IntVector) batch.getVector("temp"); + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); + + int rowCount = 10; + batch.allocateNew(); + for (int i = 0; i < rowCount; i++) { + nameVector.setSafe(i, ("test-arrow-multi-" + batchNum + "-" + i).getBytes()); + tempVector.setSafe(i, 30 + i); + humidityVector.setSafe(i, 60 + i); + } + batch.setRowCount(rowCount); + + stream.ingestBatch(batch); + totalRows += rowCount; + } + } + + stream.flush(); + + System.out.println("Arrow multiple batches: " + totalRows + " rows flushed"); + assertEquals(30, totalRows); + } finally { + stream.close(); + } + } + + sdk.close(); + } + + // =================================================================================== + // Test 3: Arrow stream - custom options (5 rows) + // =================================================================================== + + @Test + @Order(3) + @DisplayName("Arrow stream - custom configuration options") + void testArrowCustomOptions() throws Exception { + ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl); + + ArrowStreamConfigurationOptions options = + ArrowStreamConfigurationOptions.builder() + .setMaxInflightBatches(2000) + .setRecovery(true) + .setRecoveryRetries(5) + .setFlushTimeoutMs(600000) + .setConnectionTimeoutMs(60000) + .build(); + + try (BufferAllocator allocator = new RootAllocator()) { + ZerobusArrowStream stream = + sdk.createArrowStream(tableName, SCHEMA, clientId, clientSecret, options).join(); + + try { + try (VectorSchemaRoot batch = VectorSchemaRoot.create(SCHEMA, allocator)) { + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); + IntVector tempVector = (IntVector) batch.getVector("temp"); + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); + + batch.allocateNew(); + for (int i = 
0; i < 5; i++) { + nameVector.setSafe(i, ("test-arrow-options-" + i).getBytes()); + tempVector.setSafe(i, 40 + i); + humidityVector.setSafe(i, 70 + i); + } + batch.setRowCount(5); + + long offset = stream.ingestBatch(batch); + stream.waitForOffset(offset); + } + + assertEquals(2000, stream.getOptions().maxInflightBatches()); + assertEquals(600000, stream.getOptions().flushTimeoutMs()); + assertEquals(60000, stream.getOptions().connectionTimeoutMs()); + + System.out.println("Arrow custom options: 5 rows ingested with custom config"); + } finally { + stream.close(); + } + } + + sdk.close(); + } + + // =================================================================================== + // Test 4: Arrow stream - getUnackedBatches after close (5 rows) + // =================================================================================== + + @Test + @Order(4) + @DisplayName("Arrow stream - getUnackedBatches after close") + void testArrowGetUnackedBatches() throws Exception { + ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl); + + try (BufferAllocator allocator = new RootAllocator()) { + ZerobusArrowStream stream = + sdk.createArrowStream(tableName, SCHEMA, clientId, clientSecret).join(); + + try { + try (VectorSchemaRoot batch = VectorSchemaRoot.create(SCHEMA, allocator)) { + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); + IntVector tempVector = (IntVector) batch.getVector("temp"); + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); + + batch.allocateNew(); + for (int i = 0; i < 5; i++) { + nameVector.setSafe(i, ("test-arrow-unacked-" + i).getBytes()); + tempVector.setSafe(i, 50 + i); + humidityVector.setSafe(i, 80 + i); + } + batch.setRowCount(5); + + long offset = stream.ingestBatch(batch); + stream.waitForOffset(offset); + } + } finally { + stream.close(); + } + + assertTrue(stream.isClosed(), "Stream should be closed"); + + List unackedBatches = stream.getUnackedBatches(); + 
assertNotNull(unackedBatches, "getUnackedBatches() should not return null"); + assertTrue(unackedBatches.isEmpty(), "Should be empty after successful flush/close"); + + System.out.println("Arrow unacked test: 5 rows ingested, 0 unacked after close"); + } + + sdk.close(); + } + + // =================================================================================== + // Test 5: Arrow stream - recreateArrowStream (6 rows total) + // =================================================================================== + + @Test + @Order(5) + @DisplayName("Arrow stream - recreateArrowStream") + void testRecreateArrowStream() throws Exception { + ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl); + + try (BufferAllocator allocator = new RootAllocator()) { + // Create and use first stream + ZerobusArrowStream stream = + sdk.createArrowStream(tableName, SCHEMA, clientId, clientSecret).join(); + + try { + try (VectorSchemaRoot batch = VectorSchemaRoot.create(SCHEMA, allocator)) { + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); + IntVector tempVector = (IntVector) batch.getVector("temp"); + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); + + batch.allocateNew(); + for (int i = 0; i < 3; i++) { + nameVector.setSafe(i, ("test-arrow-recreate-" + i).getBytes()); + tempVector.setSafe(i, 60 + i); + humidityVector.setSafe(i, 90 + i); + } + batch.setRowCount(3); + + long offset = stream.ingestBatch(batch); + stream.waitForOffset(offset); + } + } finally { + stream.close(); + } + + assertTrue(stream.isClosed()); + List unacked = stream.getUnackedBatches(); + assertTrue(unacked.isEmpty(), "Should have 0 unacked after successful close"); + + // Recreate the stream + ZerobusArrowStream newStream = sdk.recreateArrowStream(stream).join(); + + try { + try (VectorSchemaRoot batch = VectorSchemaRoot.create(SCHEMA, allocator)) { + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); + 
IntVector tempVector = (IntVector) batch.getVector("temp"); + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); + + batch.allocateNew(); + for (int i = 0; i < 3; i++) { + nameVector.setSafe(i, ("test-arrow-recreate-new-" + i).getBytes()); + tempVector.setSafe(i, 70 + i); + humidityVector.setSafe(i, 95 + i); + } + batch.setRowCount(3); + + long offset = newStream.ingestBatch(batch); + newStream.waitForOffset(offset); + } + } finally { + newStream.close(); + } + + System.out.println("Arrow recreate test: 6 rows total (3 + 3 on recreated stream)"); + } + + sdk.close(); + } + + // =================================================================================== + // Test 6: Arrow stream - high throughput (100 batches x 100 rows) + // =================================================================================== + + @Test + @Order(6) + @DisplayName("Arrow stream - high throughput") + void testArrowHighThroughput() throws Exception { + ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl); + + ArrowStreamConfigurationOptions options = + ArrowStreamConfigurationOptions.builder().setMaxInflightBatches(500).build(); + + try (BufferAllocator allocator = new RootAllocator()) { + ZerobusArrowStream stream = + sdk.createArrowStream(tableName, SCHEMA, clientId, clientSecret, options).join(); + + int batchCount = 100; + int rowsPerBatch = 100; + long startTime = System.currentTimeMillis(); + long lastOffset = -1; + + try { + for (int b = 0; b < batchCount; b++) { + try (VectorSchemaRoot batch = VectorSchemaRoot.create(SCHEMA, allocator)) { + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); + IntVector tempVector = (IntVector) batch.getVector("temp"); + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); + + batch.allocateNew(); + for (int i = 0; i < rowsPerBatch; i++) { + nameVector.setSafe(i, ("test-arrow-throughput-" + b + "-" + i).getBytes()); + tempVector.setSafe(i, 15 + (i % 
20)); + humidityVector.setSafe(i, 40 + (i % 50)); + } + batch.setRowCount(rowsPerBatch); + + lastOffset = stream.ingestBatch(batch); + } + } + + stream.waitForOffset(lastOffset); + + long endTime = System.currentTimeMillis(); + double durationSec = (endTime - startTime) / 1000.0; + int totalRows = batchCount * rowsPerBatch; + double rowsPerSec = totalRows / durationSec; + + System.out.printf( + "Arrow high throughput: %d rows in %.2f sec = %.0f rows/sec%n", + totalRows, durationSec, rowsPerSec); + + assertTrue(rowsPerSec > 1000, "Arrow throughput should be at least 1000 rows/sec"); + } finally { + stream.close(); + } + } + + sdk.close(); + } + + // =================================================================================== + // Test 7: Arrow stream - double close is safe + // =================================================================================== + + @Test + @Order(7) + @DisplayName("Arrow stream - double close is safe") + void testArrowDoubleClose() throws Exception { + try (ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl); + BufferAllocator allocator = new RootAllocator()) { + ZerobusArrowStream stream = + sdk.createArrowStream(tableName, SCHEMA, clientId, clientSecret).join(); + + try (VectorSchemaRoot batch = VectorSchemaRoot.create(SCHEMA, allocator)) { + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); + IntVector tempVector = (IntVector) batch.getVector("temp"); + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); + + batch.allocateNew(); + nameVector.setSafe(0, "test-double-close".getBytes()); + tempVector.setSafe(0, 1); + humidityVector.setSafe(0, 1); + batch.setRowCount(1); + + stream.ingestBatch(batch); + } + + stream.close(); + assertTrue(stream.isClosed()); + + // Second close should be a no-op, not throw + stream.close(); + assertTrue(stream.isClosed()); + + System.out.println("Arrow double close: safe"); + } + } + + // 
=================================================================================== + // Test 8: Arrow stream - isClosed state transitions + // =================================================================================== + + @Test + @Order(8) + @DisplayName("Arrow stream - isClosed state transitions") + void testArrowIsClosedStateTransitions() throws Exception { + try (ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl); + BufferAllocator allocator = new RootAllocator()) { + ZerobusArrowStream stream = + sdk.createArrowStream(tableName, SCHEMA, clientId, clientSecret).join(); + + assertFalse(stream.isClosed(), "Stream should be open after creation"); + + try (VectorSchemaRoot batch = VectorSchemaRoot.create(SCHEMA, allocator)) { + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); + IntVector tempVector = (IntVector) batch.getVector("temp"); + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); + + batch.allocateNew(); + nameVector.setSafe(0, "test-state".getBytes()); + tempVector.setSafe(0, 1); + humidityVector.setSafe(0, 1); + batch.setRowCount(1); + + long offset = stream.ingestBatch(batch); + assertFalse(stream.isClosed(), "Stream should be open during ingestion"); + + stream.waitForOffset(offset); + assertFalse(stream.isClosed(), "Stream should be open after ack"); + } + + stream.close(); + assertTrue(stream.isClosed(), "Stream should be closed after close()"); + + System.out.println("Arrow isClosed transitions: correct"); + } + } + + // =================================================================================== + // Test 9: Arrow stream - concurrent streams + // =================================================================================== + + @Test + @Order(9) + @DisplayName("Arrow stream - concurrent streams") + void testArrowConcurrentStreams() throws Exception { + try (ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl); + BufferAllocator allocator = new 
RootAllocator()) { + ZerobusArrowStream stream1 = + sdk.createArrowStream(tableName, SCHEMA, clientId, clientSecret).join(); + ZerobusArrowStream stream2 = + sdk.createArrowStream(tableName, SCHEMA, clientId, clientSecret).join(); + + try { + try (VectorSchemaRoot batch1 = VectorSchemaRoot.create(SCHEMA, allocator); + VectorSchemaRoot batch2 = VectorSchemaRoot.create(SCHEMA, allocator)) { + fillBatch(batch1, "test-concurrent-1", 1); + fillBatch(batch2, "test-concurrent-2", 1); + + long offset1 = stream1.ingestBatch(batch1); + long offset2 = stream2.ingestBatch(batch2); + + stream1.waitForOffset(offset1); + stream2.waitForOffset(offset2); + } + + System.out.println("Arrow concurrent streams: 2 rows ingested (1 per stream)"); + } finally { + stream1.close(); + stream2.close(); + } + } + } + + // =================================================================================== + // Test 10: Arrow stream - table name accessor + // =================================================================================== + + @Test + @Order(10) + @DisplayName("Arrow stream - table name and options accessors") + void testArrowAccessors() throws Exception { + ArrowStreamConfigurationOptions options = + ArrowStreamConfigurationOptions.builder().setMaxInflightBatches(999).build(); + + try (ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl)) { + ZerobusArrowStream stream = + sdk.createArrowStream(tableName, SCHEMA, clientId, clientSecret, options).join(); + + try { + assertEquals(tableName, stream.getTableName()); + assertEquals(999, stream.getOptions().maxInflightBatches()); + assertTrue(stream.getOptions().recovery()); + + System.out.println("Arrow accessors: tableName=" + stream.getTableName()); + } finally { + stream.close(); + } + } + } + + // =================================================================================== + // Test 11: Arrow stream - ingest after close throws + // 
=================================================================================== + + @Test + @Order(11) + @DisplayName("Arrow stream - ingest after close throws") + void testArrowIngestAfterCloseThrows() throws Exception { + try (ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl); + BufferAllocator allocator = new RootAllocator()) { + ZerobusArrowStream stream = + sdk.createArrowStream(tableName, SCHEMA, clientId, clientSecret).join(); + stream.close(); + + try (VectorSchemaRoot batch = VectorSchemaRoot.create(SCHEMA, allocator)) { + fillBatch(batch, "test-after-close", 1); + + assertThrows(ZerobusException.class, () -> stream.ingestBatch(batch)); + } + + System.out.println("Arrow ingest after close: correctly throws"); + } + } + + // =================================================================================== + // Test 12: Arrow stream - null batch throws + // =================================================================================== + + @Test + @Order(12) + @DisplayName("Arrow stream - null batch throws") + void testArrowNullBatchThrows() throws Exception { + try (ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl)) { + ZerobusArrowStream stream = + sdk.createArrowStream(tableName, SCHEMA, clientId, clientSecret).join(); + + try { + assertThrows(ZerobusException.class, () -> stream.ingestBatch(null)); + System.out.println("Arrow null batch: correctly throws"); + } finally { + stream.close(); + } + } + } + + // =================================================================================== + // Helper + // =================================================================================== + + private static void fillBatch(VectorSchemaRoot batch, String prefix, int rowCount) { + LargeVarCharVector nameVector = (LargeVarCharVector) batch.getVector("device_name"); + IntVector tempVector = (IntVector) batch.getVector("temp"); + BigIntVector humidityVector = (BigIntVector) batch.getVector("humidity"); + + batch.allocateNew(); 
+ for (int i = 0; i < rowCount; i++) { + nameVector.setSafe(i, (prefix + "-" + i).getBytes()); + tempVector.setSafe(i, 20 + i); + humidityVector.setSafe(i, 50 + i); + } + batch.setRowCount(rowCount); + } +}