diff --git a/go/Makefile b/go/Makefile index 799fd07..0558b86 100644 --- a/go/Makefile +++ b/go/Makefile @@ -81,6 +81,7 @@ fmt-go: cd examples/json/batch && go fmt ./... cd examples/proto/single && go fmt ./... cd examples/proto/batch && go fmt ./... + cd examples/arrow && go fmt ./... fmt-rust: @echo "Formatting Rust code..." @@ -95,6 +96,7 @@ lint-go: cd examples/json/batch && go vet ./... cd examples/proto/single && go vet ./... cd examples/proto/batch && go vet ./... + cd examples/arrow && go vet ./... lint-rust: @echo "Linting Rust code..." @@ -120,6 +122,7 @@ examples: build cd examples/json/batch && go build -o json-batch main.go cd examples/proto/single && go build -o proto-single main.go cd examples/proto/batch && go build -o proto-batch main.go + cd examples/arrow && go build -o arrow main.go @echo "✓ Examples built successfully" release: diff --git a/go/NEXT_CHANGELOG.md b/go/NEXT_CHANGELOG.md index 892b6b6..d964b8f 100644 --- a/go/NEXT_CHANGELOG.md +++ b/go/NEXT_CHANGELOG.md @@ -4,17 +4,18 @@ ### New Features and Improvements +**[Experimental] Arrow Flight Ingestion**: Added experimental Arrow Flight support for high-throughput Apache Arrow RecordBatch ingestion -### Deprecations +- New `CreateArrowStream` and `CreateArrowStreamWithHeadersProvider` methods on `ZerobusSdk` +- New `ZerobusArrowStream` type with `IngestBatch`, `WaitForOffset`, `Flush`, `Close`, and `GetUnackedBatches` methods +- Configurable IPC compression via `ArrowStreamConfigurationOptions.IpcCompression` (supports `LZ4Frame` and `Zstd`) +### Deprecations ### Bug Fixes ### Documentation - ### Internal Changes - ### API Changes - diff --git a/go/README.md b/go/README.md index 6f5d7af..12e5c20 100644 --- a/go/README.md +++ b/go/README.md @@ -33,6 +33,7 @@ We are keen to hear feedback from you on this SDK. 
Please [file issues](https:// - [Configuration Options](#configuration-options) - [Error Handling](#error-handling) - [Examples](#examples) +- [Arrow Flight Ingestion (Experimental)](#arrow-flight-ingestion-experimental) - [Tests](#tests) - [Performance Benchmarks](#performance-benchmarks) - [Best Practices](#best-practices) @@ -64,6 +65,7 @@ This SDK wraps the [Rust SDK](https://github.com/databricks/zerobus-sdk/tree/mai - **Configurable timeouts and retry policies** - **Immediate offset returns** for ingested records - **Graceful stream management** - Proper flushing and resource cleanup +- **[Experimental] Arrow Flight ingestion** - High-throughput Apache Arrow RecordBatch ingestion via the Arrow Flight protocol ## Getting Started Choose your installation path: @@ -748,6 +750,9 @@ The `examples/` directory contains complete, runnable examples organized by form - **`examples/proto/single/`** - Single record ingestion with protobuf - **`examples/proto/batch/`** - Batch ingestion with protobuf +**Arrow Flight Examples (Experimental):** +- **`examples/arrow/`** - Arrow RecordBatch ingestion via Arrow Flight protocol + **To run an example:** ```bash @@ -767,6 +772,14 @@ go run main.go Each example includes detailed comments and demonstrates best practices for production use. See [`examples/README.md`](examples/README.md) for complete setup instructions, prerequisites, and detailed comparisons between examples. +## Arrow Flight Ingestion (Experimental) + +> **Experimental/Unsupported**: Arrow Flight ingestion is experimental and not yet supported for production use. The API may change in future releases. + +The SDK supports high-throughput ingestion of Apache Arrow RecordBatches via the Arrow Flight protocol. This is useful for pipelines that already work with columnar Arrow data. + +See [`examples/arrow/`](examples/arrow/) for a complete working example and setup instructions. 
+ ## Tests Tests are located in the repository and can be run using: @@ -1247,6 +1260,77 @@ func (e *ZerobusError) Retryable() bool Returns `true` if the error can be automatically recovered by the SDK. +### `ZerobusArrowStream` (Experimental) + +Represents an active Arrow Flight ingestion stream. + +**Methods:** + +```go +func (s *ZerobusArrowStream) IngestBatch(ipcBytes []byte) (int64, error) +``` + +Ingests an Arrow RecordBatch serialized as Arrow IPC stream bytes. Returns the logical offset. + +```go +func (s *ZerobusArrowStream) WaitForOffset(offset int64) error +``` + +Blocks until the server acknowledges the batch at the given offset. + +```go +func (s *ZerobusArrowStream) Flush() error +``` + +Waits for all pending batches to be acknowledged. + +```go +func (s *ZerobusArrowStream) Close() error +``` + +Flushes and closes the stream. + +```go +func (s *ZerobusArrowStream) GetUnackedBatches() ([][]byte, error) +``` + +Returns unacknowledged batches as Arrow IPC bytes. Call only after stream failure. + +### `ArrowStreamConfigurationOptions` (Experimental) + +```go +type ArrowStreamConfigurationOptions struct { + MaxInflightBatches uint64 + Recovery bool + RecoveryTimeoutMs uint64 + RecoveryBackoffMs uint64 + RecoveryRetries uint32 + IpcCompression IpcCompressionType // None, LZ4Frame, or Zstd +} +``` + +Use `DefaultArrowStreamConfigurationOptions()` to get sensible defaults. + +**`ZerobusSdk` Arrow methods:** + +```go +func (s *ZerobusSdk) CreateArrowStream( + tableName string, + schemaIpcBytes []byte, + clientID, clientSecret string, + options *ArrowStreamConfigurationOptions, +) (*ZerobusArrowStream, error) +``` + +```go +func (s *ZerobusSdk) CreateArrowStreamWithHeadersProvider( + tableName string, + schemaIpcBytes []byte, + headersProvider HeadersProvider, + options *ArrowStreamConfigurationOptions, +) (*ZerobusArrowStream, error) +``` + ## Building from Source This section is for contributors and those who need to build the SDK from source. 
If you just want to use the SDK, see [Installation](#installation) instead. diff --git a/go/arrow_ffi.go b/go/arrow_ffi.go new file mode 100644 index 0000000..bee6531 --- /dev/null +++ b/go/arrow_ffi.go @@ -0,0 +1,396 @@ +package zerobus + +/* +#cgo linux,amd64 LDFLAGS: ${SRCDIR}/lib/linux_amd64/libzerobus_ffi.a -ldl -lpthread -lm -lresolv -lgcc_s +#cgo linux,arm64 LDFLAGS: ${SRCDIR}/lib/linux_arm64/libzerobus_ffi.a -ldl -lpthread -lm -lresolv -lgcc_s +#cgo darwin,amd64 LDFLAGS: ${SRCDIR}/lib/darwin_amd64/libzerobus_ffi.a -framework CoreFoundation -framework Security -liconv +#cgo darwin,arm64 LDFLAGS: ${SRCDIR}/lib/darwin_arm64/libzerobus_ffi.a -framework CoreFoundation -framework Security -liconv +#cgo windows,amd64 LDFLAGS: ${SRCDIR}/lib/windows_amd64/libzerobus_ffi.a -lws2_32 -luserenv -lbcrypt -lntdll +#cgo CFLAGS: -I${SRCDIR}/../rust/ffi -Wno-implicit-function-declaration + +#include +#include +#include +#include + +// Opaque SDK handle (forward declaration matches ffi.go's translation unit). +typedef struct CZerobusSdk { uint8_t _private[0]; } CZerobusSdk; + +// Result type (identical layout to CResult in ffi.go). +typedef struct CResult { + bool success; + char *error_message; + bool is_retryable; +} CResult; + +// Header types used by the headers-provider callback (same as ffi.go). +typedef struct CHeader { + char *key; + char *value; +} CHeader; + +typedef struct CHeaders { + CHeader *headers; + uintptr_t count; + char *error_message; +} CHeaders; + +typedef CHeaders (*HeadersProviderCallback)(void *user_data); + +// Arrow stream opaque handle. +typedef struct CArrowStream { uint8_t _private[0]; } CArrowStream; + +// Arrow stream configuration options. 
+// ipc_compression: -1 = None, 0 = LZ4_FRAME, 1 = ZSTD +typedef struct CArrowStreamConfigurationOptions { + uintptr_t max_inflight_batches; + bool recovery; + uint64_t recovery_timeout_ms; + uint64_t recovery_backoff_ms; + uint32_t recovery_retries; + uint64_t server_lack_of_ack_timeout_ms; + uint64_t flush_timeout_ms; + uint64_t connection_timeout_ms; + int32_t ipc_compression; +} CArrowStreamConfigurationOptions; + +// Array of Arrow IPC-encoded batches returned by get_unacked_batches. +typedef struct CArrowBatchArray { + uint8_t **batches; + uintptr_t *lengths; + uintptr_t count; +} CArrowBatchArray; + +// Forward declare the Go-exported headers callback (defined in ffi.go). +extern void goGetHeaders(void *userData, CHeader **headers, uintptr_t *count, char **error); + +// C callback wrapper for arrow streams — calls the same Go function as the +// regular stream callback. We define it here as a static so it exists in this +// translation unit without conflicting with ffi.go's cHeadersCallback. +static CHeaders cArrowHeadersCallback(void *userData) { + CHeader *headers = NULL; + uintptr_t count = 0; + char *error = NULL; + goGetHeaders(userData, &headers, &count, &error); + CHeaders result; + result.headers = headers; + result.count = count; + result.error_message = error; + return result; +} + +static HeadersProviderCallback getArrowHeadersCallback() { + return (HeadersProviderCallback)cArrowHeadersCallback; +} + +// Arrow FFI function declarations. 
+extern CArrowStream *zerobus_sdk_create_arrow_stream( + CZerobusSdk *sdk, + const char *table_name, + const uint8_t *schema_ipc_bytes, + uintptr_t schema_ipc_len, + const char *client_id, + const char *client_secret, + const CArrowStreamConfigurationOptions *options, + CResult *result); + +extern CArrowStream *zerobus_sdk_create_arrow_stream_with_headers_provider( + CZerobusSdk *sdk, + const char *table_name, + const uint8_t *schema_ipc_bytes, + uintptr_t schema_ipc_len, + HeadersProviderCallback headers_callback, + void *user_data, + const CArrowStreamConfigurationOptions *options, + CResult *result); + +extern void zerobus_arrow_stream_free(CArrowStream *stream); +extern int64_t zerobus_arrow_stream_ingest_batch(CArrowStream *stream, const uint8_t *ipc_bytes, uintptr_t ipc_len, CResult *result); +extern bool zerobus_arrow_stream_wait_for_offset(CArrowStream *stream, int64_t offset, CResult *result); +extern bool zerobus_arrow_stream_flush(CArrowStream *stream, CResult *result); +extern bool zerobus_arrow_stream_close(CArrowStream *stream, CResult *result); +extern CArrowBatchArray zerobus_arrow_stream_get_unacked_batches(CArrowStream *stream, CResult *result); +extern void zerobus_arrow_free_batch_array(CArrowBatchArray array); +extern bool zerobus_arrow_stream_is_closed(CArrowStream *stream); +extern CArrowStreamConfigurationOptions zerobus_arrow_get_default_config(void); + +// Shared cleanup helper (from ffi.go's translation unit). +extern void zerobus_free_error_message(char *message); +*/ +import "C" +import ( + "runtime" + "runtime/cgo" + "sync" + "unsafe" +) + +// arrowStreamHandleRegistry holds cgo.Handles for arrow streams created with a +// custom headers provider so the Go object stays alive for the stream lifetime. +var ( + arrowStreamHandleRegistry = make(map[unsafe.Pointer]cgo.Handle) + arrowStreamHandleRegistryMu sync.Mutex +) + +// arrowFfiResult converts a C.CResult (from this translation unit) to a Go error. 
+func arrowFfiResult(cres C.CResult) error { + if cres.success { + return nil + } + var message string + if cres.error_message != nil { + message = C.GoString(cres.error_message) + C.zerobus_free_error_message(cres.error_message) + } else { + message = "unknown error" + } + return &ZerobusError{ + Message: message, + IsRetryable: bool(cres.is_retryable), + } +} + +// convertArrowConfigToC converts Go arrow config to the C representation. +// Zero-valued fields fall back to defaults. +func convertArrowConfigToC(opts *ArrowStreamConfigurationOptions) C.CArrowStreamConfigurationOptions { + if opts == nil { + return C.zerobus_arrow_get_default_config() + } + d := DefaultArrowStreamConfigurationOptions() + + maxInflight := opts.MaxInflightBatches + if maxInflight == 0 { + maxInflight = d.MaxInflightBatches + } + recovery := opts.Recovery + if opts.RecoveryTimeoutMs == 0 && opts.RecoveryBackoffMs == 0 && opts.RecoveryRetries == 0 { + recovery = d.Recovery + } + recoveryTimeout := opts.RecoveryTimeoutMs + if recoveryTimeout == 0 { + recoveryTimeout = d.RecoveryTimeoutMs + } + recoveryBackoff := opts.RecoveryBackoffMs + if recoveryBackoff == 0 { + recoveryBackoff = d.RecoveryBackoffMs + } + recoveryRetries := opts.RecoveryRetries + if recoveryRetries == 0 { + recoveryRetries = d.RecoveryRetries + } + serverAckTimeout := opts.ServerLackOfAckTimeoutMs + if serverAckTimeout == 0 { + serverAckTimeout = d.ServerLackOfAckTimeoutMs + } + flushTimeout := opts.FlushTimeoutMs + if flushTimeout == 0 { + flushTimeout = d.FlushTimeoutMs + } + connTimeout := opts.ConnectionTimeoutMs + if connTimeout == 0 { + connTimeout = d.ConnectionTimeoutMs + } + ipcCompression := opts.IPCCompression + if ipcCompression == 0 && opts.IPCCompression == 0 { + ipcCompression = d.IPCCompression + } + + return C.CArrowStreamConfigurationOptions{ + max_inflight_batches: C.uintptr_t(maxInflight), + recovery: C.bool(recovery), + recovery_timeout_ms: C.uint64_t(recoveryTimeout), + recovery_backoff_ms: 
C.uint64_t(recoveryBackoff), + recovery_retries: C.uint32_t(recoveryRetries), + server_lack_of_ack_timeout_ms: C.uint64_t(serverAckTimeout), + flush_timeout_ms: C.uint64_t(flushTimeout), + connection_timeout_ms: C.uint64_t(connTimeout), + ipc_compression: C.int32_t(ipcCompression), + } +} + +// sdkCreateArrowStream creates an Arrow Flight stream with OAuth authentication. +func sdkCreateArrowStream( + sdkPtr unsafe.Pointer, + tableName string, + schemaIpcBytes []byte, + clientID string, + clientSecret string, + options *ArrowStreamConfigurationOptions, +) (unsafe.Pointer, error) { + cTableName := C.CString(tableName) + defer C.free(unsafe.Pointer(cTableName)) + cClientID := C.CString(clientID) + defer C.free(unsafe.Pointer(cClientID)) + cClientSecret := C.CString(clientSecret) + defer C.free(unsafe.Pointer(cClientSecret)) + + var pinner runtime.Pinner + defer pinner.Unpin() + + cSchema := (*C.uint8_t)(unsafe.SliceData(schemaIpcBytes)) + pinner.Pin(cSchema) + + cOpts := convertArrowConfigToC(options) + var cres C.CResult + ptr := C.zerobus_sdk_create_arrow_stream( + (*C.CZerobusSdk)(sdkPtr), + cTableName, + cSchema, + C.uintptr_t(len(schemaIpcBytes)), + cClientID, + cClientSecret, + &cOpts, + &cres, + ) + if ptr == nil { + return nil, arrowFfiResult(cres) + } + return unsafe.Pointer(ptr), nil +} + +// sdkCreateArrowStreamWithHeadersProvider creates an Arrow Flight stream with a custom headers provider. 
func sdkCreateArrowStreamWithHeadersProvider(
	sdkPtr unsafe.Pointer,
	tableName string,
	schemaIpcBytes []byte,
	headersProvider HeadersProvider,
	options *ArrowStreamConfigurationOptions,
) (unsafe.Pointer, error) {
	cTableName := C.CString(tableName)
	defer C.free(unsafe.Pointer(cTableName))

	var pinner runtime.Pinner
	defer pinner.Unpin()

	// Pin the schema bytes so the GC cannot move them while C reads them.
	// NOTE(review): Pinner.Pin panics on a nil pointer, so an empty
	// schemaIpcBytes slice would panic here; callers validate non-empty input.
	cSchema := (*C.uint8_t)(unsafe.SliceData(schemaIpcBytes))
	pinner.Pin(cSchema)

	// A cgo.Handle keeps headersProvider reachable from Go for the lifetime of
	// the native stream; its integer value travels through C as the callback's
	// opaque user_data.
	handle := cgo.NewHandle(headersProvider)
	// Reinterpret the uintptr-typed handle's bits as an unsafe.Pointer purely
	// to cross the C boundary — the value is never dereferenced on either side.
	handlePtr := *(*unsafe.Pointer)(unsafe.Pointer(&handle))

	cOpts := convertArrowConfigToC(options)
	var cres C.CResult
	ptr := C.zerobus_sdk_create_arrow_stream_with_headers_provider(
		(*C.CZerobusSdk)(sdkPtr),
		cTableName,
		cSchema,
		C.uintptr_t(len(schemaIpcBytes)),
		C.getArrowHeadersCallback(),
		handlePtr,
		&cOpts,
		&cres,
	)
	if ptr == nil {
		// The native stream was never created, so release the handle now;
		// otherwise arrowStreamFree releases it when the stream is freed.
		handle.Delete()
		return nil, arrowFfiResult(cres)
	}

	// Record the handle keyed by the stream pointer so arrowStreamFree can
	// release it when the stream is destroyed.
	arrowStreamHandleRegistryMu.Lock()
	arrowStreamHandleRegistry[unsafe.Pointer(ptr)] = handle
	arrowStreamHandleRegistryMu.Unlock()

	return unsafe.Pointer(ptr), nil
}

// arrowStreamFree frees an Arrow Flight stream and its associated handle.
// Safe to call with nil; the headers-provider handle (if any) is released
// before the native stream is destroyed.
func arrowStreamFree(ptr unsafe.Pointer) {
	if ptr == nil {
		return
	}
	arrowStreamHandleRegistryMu.Lock()
	if handle, ok := arrowStreamHandleRegistry[ptr]; ok {
		handle.Delete()
		delete(arrowStreamHandleRegistry, ptr)
	}
	arrowStreamHandleRegistryMu.Unlock()
	C.zerobus_arrow_stream_free((*C.CArrowStream)(ptr))
}

// arrowStreamIngestBatch sends one Arrow IPC-encoded batch to the stream.
// Returns the logical offset assigned to this batch.
func arrowStreamIngestBatch(streamPtr unsafe.Pointer, ipcBytes []byte) (int64, error) {
	// Reject empty input up front: it is never a valid IPC payload, and the
	// pin below would otherwise act on a nil/unspecified pointer.
	if len(ipcBytes) == 0 {
		return -1, &ZerobusError{Message: "empty IPC bytes", IsRetryable: false}
	}

	// Pin the payload for the duration of the C call so the GC cannot move it.
	var pinner runtime.Pinner
	defer pinner.Unpin()
	cBytes := (*C.uint8_t)(unsafe.SliceData(ipcBytes))
	pinner.Pin(cBytes)

	var cres C.CResult
	offset := C.zerobus_arrow_stream_ingest_batch(
		(*C.CArrowStream)(streamPtr),
		cBytes,
		C.uintptr_t(len(ipcBytes)),
		&cres,
	)
	// A negative offset is the C side's failure signal; details are in cres.
	if offset < 0 {
		return -1, arrowFfiResult(cres)
	}
	return int64(offset), nil
}

// arrowStreamWaitForOffset blocks until the server acknowledges the given offset.
func arrowStreamWaitForOffset(streamPtr unsafe.Pointer, offset int64) error {
	var cres C.CResult
	ok := C.zerobus_arrow_stream_wait_for_offset(
		(*C.CArrowStream)(streamPtr),
		C.int64_t(offset),
		&cres,
	)
	if !ok {
		return arrowFfiResult(cres)
	}
	return nil
}

// arrowStreamFlush flushes all pending batches and waits for acknowledgment.
func arrowStreamFlush(streamPtr unsafe.Pointer) error {
	var cres C.CResult
	ok := C.zerobus_arrow_stream_flush((*C.CArrowStream)(streamPtr), &cres)
	if !ok {
		return arrowFfiResult(cres)
	}
	return nil
}

// arrowStreamClose gracefully closes the stream.
// Note: this only closes the native stream; arrowStreamFree must still be
// called to release the handle and the stream object itself.
func arrowStreamClose(streamPtr unsafe.Pointer) error {
	var cres C.CResult
	ok := C.zerobus_arrow_stream_close((*C.CArrowStream)(streamPtr), &cres)
	if !ok {
		return arrowFfiResult(cres)
	}
	return nil
}

// arrowStreamGetUnackedBatches retrieves unacknowledged batches as Arrow IPC bytes.
// Each []byte is a self-contained IPC stream (schema + one record batch).
+func arrowStreamGetUnackedBatches(streamPtr unsafe.Pointer) ([][]byte, error) { + var cres C.CResult + cArray := C.zerobus_arrow_stream_get_unacked_batches((*C.CArrowStream)(streamPtr), &cres) + + if cArray.count == 0 { + if err := arrowFfiResult(cres); err != nil { + return nil, err + } + return [][]byte{}, nil + } + + batchPtrs := unsafe.Slice(cArray.batches, cArray.count) + batchLens := unsafe.Slice(cArray.lengths, cArray.count) + + result := make([][]byte, cArray.count) + for i := range result { + result[i] = C.GoBytes(unsafe.Pointer(batchPtrs[i]), C.int(batchLens[i])) + } + + C.zerobus_arrow_free_batch_array(cArray) + return result, nil +} + +// arrowStreamIsClosed returns true if the stream has been closed or failed. +func arrowStreamIsClosed(streamPtr unsafe.Pointer) bool { + return bool(C.zerobus_arrow_stream_is_closed((*C.CArrowStream)(streamPtr))) +} diff --git a/go/arrow_stream.go b/go/arrow_stream.go new file mode 100644 index 0000000..39a0512 --- /dev/null +++ b/go/arrow_stream.go @@ -0,0 +1,182 @@ +package zerobus + +// Experimental/Unsupported: Arrow Flight ingestion is experimental and not yet +// supported for production use. The API may change in future releases. +// +// Arrow Flight support lets you ingest Apache Arrow RecordBatches directly into +// Databricks Delta tables using the high-performance Arrow Flight protocol. +// +// See examples/arrow/ for a complete working example. + +import ( + "runtime" + "unsafe" +) + +// Arrow IPC compression options for ArrowStreamConfigurationOptions.IPCCompression. +const ( + // IPCCompressionNone disables Arrow IPC compression (default). + IPCCompressionNone int32 = -1 + // IPCCompressionLZ4Frame enables LZ4 frame compression. + IPCCompressionLZ4Frame int32 = 0 + // IPCCompressionZstd enables Zstandard compression. + IPCCompressionZstd int32 = 1 +) + +// ArrowStreamConfigurationOptions holds configuration for an Arrow Flight stream. 
+type ArrowStreamConfigurationOptions struct { + // Maximum number of batches in-flight (pending acknowledgment). Default: 1,000 + MaxInflightBatches uint64 + // Enable automatic stream recovery on retryable failures. Default: true + Recovery bool + // Timeout per recovery attempt in milliseconds. Default: 15,000 + RecoveryTimeoutMs uint64 + // Backoff between recovery attempts in milliseconds. Default: 2,000 + RecoveryBackoffMs uint64 + // Maximum recovery retry attempts. Default: 4 + RecoveryRetries uint32 + // Server acknowledgment timeout in milliseconds. Default: 60,000 + ServerLackOfAckTimeoutMs uint64 + // Flush timeout in milliseconds. Default: 300,000 + FlushTimeoutMs uint64 + // Connection establishment timeout in milliseconds. Default: 30,000 + ConnectionTimeoutMs uint64 + // Arrow IPC compression codec. Default: IPCCompressionNone + IPCCompression int32 +} + +// DefaultArrowStreamConfigurationOptions returns default configuration for Arrow streams. +func DefaultArrowStreamConfigurationOptions() *ArrowStreamConfigurationOptions { + return &ArrowStreamConfigurationOptions{ + MaxInflightBatches: 1_000, + Recovery: true, + RecoveryTimeoutMs: 15_000, + RecoveryBackoffMs: 2_000, + RecoveryRetries: 4, + ServerLackOfAckTimeoutMs: 60_000, + FlushTimeoutMs: 300_000, + ConnectionTimeoutMs: 30_000, + IPCCompression: IPCCompressionNone, + } +} + +// ZerobusArrowStream is an active Arrow Flight stream for ingesting Arrow RecordBatches. +// Batches are supplied as Arrow IPC stream bytes produced by the Apache Arrow Go library's ipc.Writer. +type ZerobusArrowStream struct { + ptr unsafe.Pointer +} + +// CreateArrowStream creates an Arrow Flight stream authenticated with OAuth client credentials. +// +// schemaIpcBytes must be Arrow IPC stream bytes containing only the schema (no data batches). +// Produce these with ipc.NewWriter followed by Close without writing any batches. 
+func (s *ZerobusSdk) CreateArrowStream( + tableName string, + schemaIpcBytes []byte, + clientID string, + clientSecret string, + options *ArrowStreamConfigurationOptions, +) (*ZerobusArrowStream, error) { + if s.ptr == nil { + return nil, &ZerobusError{Message: "SDK has been freed", IsRetryable: false} + } + if len(schemaIpcBytes) == 0 { + return nil, &ZerobusError{Message: "schemaIpcBytes must not be empty", IsRetryable: false} + } + + ptr, err := sdkCreateArrowStream(s.ptr, tableName, schemaIpcBytes, clientID, clientSecret, options) + if err != nil { + return nil, err + } + + stream := &ZerobusArrowStream{ptr: ptr} + runtime.SetFinalizer(stream, func(st *ZerobusArrowStream) { + st.Close() //nolint:errcheck + }) + return stream, nil +} + +// CreateArrowStreamWithHeadersProvider creates an Arrow Flight stream with a custom +// headers provider for authentication. +func (s *ZerobusSdk) CreateArrowStreamWithHeadersProvider( + tableName string, + schemaIpcBytes []byte, + headersProvider HeadersProvider, + options *ArrowStreamConfigurationOptions, +) (*ZerobusArrowStream, error) { + if s.ptr == nil { + return nil, &ZerobusError{Message: "SDK has been freed", IsRetryable: false} + } + if len(schemaIpcBytes) == 0 { + return nil, &ZerobusError{Message: "schemaIpcBytes must not be empty", IsRetryable: false} + } + + ptr, err := sdkCreateArrowStreamWithHeadersProvider(s.ptr, tableName, schemaIpcBytes, headersProvider, options) + if err != nil { + return nil, err + } + + stream := &ZerobusArrowStream{ptr: ptr} + runtime.SetFinalizer(stream, func(st *ZerobusArrowStream) { + st.Close() //nolint:errcheck + }) + return stream, nil +} + +// IngestBatch queues one Arrow RecordBatch for ingestion and returns its logical offset. +// ipcBytes must be Arrow IPC stream bytes containing exactly one RecordBatch. +// Use WaitForOffset or Flush to wait for server acknowledgment. 
+func (st *ZerobusArrowStream) IngestBatch(ipcBytes []byte) (int64, error) { + if st.ptr == nil { + return -1, &ZerobusError{Message: "Arrow stream has been closed", IsRetryable: false} + } + return arrowStreamIngestBatch(st.ptr, ipcBytes) +} + +// WaitForOffset blocks until the server acknowledges the batch at the given offset. +func (st *ZerobusArrowStream) WaitForOffset(offset int64) error { + if st.ptr == nil { + return &ZerobusError{Message: "Arrow stream has been closed", IsRetryable: false} + } + return arrowStreamWaitForOffset(st.ptr, offset) +} + +// Flush waits until all pending batches have been acknowledged by the server. +func (st *ZerobusArrowStream) Flush() error { + if st.ptr == nil { + return &ZerobusError{Message: "Arrow stream has been closed", IsRetryable: false} + } + return arrowStreamFlush(st.ptr) +} + +// Close gracefully shuts down the stream after flushing all pending batches. +// The stream must not be used after Close returns. Close is idempotent. +func (st *ZerobusArrowStream) Close() error { + if st.ptr == nil { + return nil + } + ptr := st.ptr + st.ptr = nil + + err := arrowStreamClose(ptr) + arrowStreamFree(ptr) + return err +} + +// GetUnackedBatches returns all unacknowledged batches as Arrow IPC bytes. +// Each []byte is a self-contained IPC stream (schema + one RecordBatch) that can +// be re-ingested into a new stream. Only call after the stream has closed or failed. +func (st *ZerobusArrowStream) GetUnackedBatches() ([][]byte, error) { + if st.ptr == nil { + return nil, &ZerobusError{Message: "Arrow stream has been closed", IsRetryable: false} + } + return arrowStreamGetUnackedBatches(st.ptr) +} + +// IsClosed reports whether the stream has been closed or failed. 
+func (st *ZerobusArrowStream) IsClosed() bool { + if st.ptr == nil { + return true + } + return arrowStreamIsClosed(st.ptr) +} diff --git a/go/examples/README.md b/go/examples/README.md index 23d59d6..bf2d7f4 100644 --- a/go/examples/README.md +++ b/go/examples/README.md @@ -12,6 +12,7 @@ Examples are organized by data format and ingestion pattern: | JSON Batch | JSON | Batch | `examples/json/batch/main.go` | | Proto Single | Protocol Buffers | Single-record | `examples/proto/single/main.go` | | Proto Batch | Protocol Buffers | Batch | `examples/proto/batch/main.go` | +| Arrow Flight | Arrow IPC | Batch (RecordBatch) | `examples/arrow/main.go` | ## Prerequisites @@ -141,6 +142,28 @@ for i := 0; i < 5; i++ { batchOffset, err := stream.IngestRecordsOffset(records) ``` +## Running Arrow Flight Examples (Experimental) + +> **Experimental/Unsupported**: Arrow Flight ingestion is experimental and not yet supported for production use. The API may change in future releases. + +The Arrow Flight example demonstrates ingestion of Apache Arrow RecordBatches. The schema defined in the example must match the target Delta table's column names and types. 
+ +### Prerequisites + +In addition to the general prerequisites above, install the Arrow Go library: + +```bash +go get github.com/apache/arrow-go/v18/arrow +go get github.com/apache/arrow-go/v18/arrow/array +go get github.com/apache/arrow-go/v18/arrow/ipc +go get github.com/apache/arrow-go/v18/arrow/memory +``` + +```bash +cd examples/arrow +go run main.go +``` + ## Additional Resources - [SDK Documentation](../README.md) diff --git a/go/examples/arrow/go.mod b/go/examples/arrow/go.mod new file mode 100644 index 0000000..da77819 --- /dev/null +++ b/go/examples/arrow/go.mod @@ -0,0 +1,26 @@ +module zerobus-examples/arrow + +go 1.24.0 + +require ( + github.com/apache/arrow-go/v18 v18.0.0 + github.com/databricks/zerobus-sdk/go v0.1.0 +) + +require ( + github.com/goccy/go-json v0.10.3 // indirect + github.com/google/flatbuffers v24.3.25+incompatible // indirect + github.com/klauspost/compress v1.17.11 // indirect + github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect + golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect + golang.org/x/mod v0.21.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/tools v0.26.0 // indirect + golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect +) + +// Use local zerobus module +replace github.com/databricks/zerobus-sdk/go => ../.. 
diff --git a/go/examples/arrow/go.sum b/go/examples/arrow/go.sum new file mode 100644 index 0000000..aaede1d --- /dev/null +++ b/go/examples/arrow/go.sum @@ -0,0 +1,53 @@ +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= +github.com/apache/arrow-go/v18 v18.0.0 h1:1dBDaSbH3LtulTyOVYaBCHO3yVRwjV+TZaqn3g6V7ZM= +github.com/apache/arrow-go/v18 v18.0.0/go.mod h1:t6+cWRSmKgdQ6HsxisQjok+jBpKGhRDiqcf3p0p/F+A= +github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= +github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= +github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI= +github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= +github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/cpuid/v2 v2.2.8 
h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= +github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= +golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0= +golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys 
v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ= +golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= +gonum.org/v1/gonum v0.15.1 h1:FNy7N6OUZVUaWG9pTiD+jlhdQ3lMP+/LcTpJ6+a8sQ0= +gonum.org/v1/gonum v0.15.1/go.mod h1:eZTZuRFrzu5pcyjN5wJhcIhnUdNijYxX1T2IcrOGY0o= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/go/examples/arrow/main.go b/go/examples/arrow/main.go new file mode 100644 index 0000000..3221711 --- /dev/null +++ b/go/examples/arrow/main.go @@ -0,0 +1,189 @@ +// **Experimental/Unsupported**: Arrow Flight ingestion is experimental and not yet +// supported for production use. The API may change in future releases. +// +// This example shows how to ingest Apache Arrow RecordBatches into a Databricks +// Delta table using the high-performance Arrow Flight protocol. +// +// # Required environment variables +// +// ZEROBUS_SERVER_ENDPOINT – Arrow Flight endpoint (e.g. 
"https://host:443") +// DATABRICKS_WORKSPACE_URL – Unity Catalog workspace URL +// DATABRICKS_CLIENT_ID – OAuth 2.0 client ID +// DATABRICKS_CLIENT_SECRET – OAuth 2.0 client secret +// ZEROBUS_TABLE_NAME – Fully-qualified table name (catalog.schema.table) +// +// # Run +// +// go run main.go +package main + +import ( + "bytes" + "log" + "os" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/ipc" + "github.com/apache/arrow-go/v18/arrow/memory" + zerobus "github.com/databricks/zerobus-sdk/go" +) + +func main() { + // ── 1. Read configuration from the environment ──────────────────────────── + endpoint := os.Getenv("ZEROBUS_SERVER_ENDPOINT") + ucURL := os.Getenv("DATABRICKS_WORKSPACE_URL") + clientID := os.Getenv("DATABRICKS_CLIENT_ID") + clientSecret := os.Getenv("DATABRICKS_CLIENT_SECRET") + tableName := os.Getenv("ZEROBUS_TABLE_NAME") + + if endpoint == "" || ucURL == "" || clientID == "" || clientSecret == "" || tableName == "" { + log.Fatal("Missing required environment variables: ZEROBUS_SERVER_ENDPOINT, " + + "DATABRICKS_WORKSPACE_URL, DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET, " + + "ZEROBUS_TABLE_NAME") + } + + // ── 2. Create the SDK instance ──────────────────────────────────────────── + sdk, err := zerobus.NewZerobusSdk(endpoint, ucURL) + if err != nil { + log.Fatalf("Failed to create SDK: %v", err) + } + defer sdk.Free() + + // ── 3. Define the Arrow schema ──────────────────────────────────────────── + // + // The schema must match the target Delta table's column names and types. + // Adjust the fields below to match your table. + schema := arrow.NewSchema([]arrow.Field{ + {Name: "device_name", Type: arrow.BinaryTypes.String, Nullable: false}, + {Name: "temperature", Type: arrow.PrimitiveTypes.Float64, Nullable: true}, + {Name: "humidity", Type: arrow.PrimitiveTypes.Float64, Nullable: true}, + }, nil) + + // ── 4. 
Serialize the schema to Arrow IPC bytes ──────────────────────────── + // + // Use ipc.NewWriter with no written batches; the resulting bytes contain + // only the schema message and are passed to CreateArrowStream. + schemaIPC := serializeSchema(schema) + + // ── 5. Create the Arrow Flight stream ───────────────────────────────────── + opts := zerobus.DefaultArrowStreamConfigurationOptions() + + stream, err := sdk.CreateArrowStream(tableName, schemaIPC, clientID, clientSecret, opts) + if err != nil { + log.Fatalf("Failed to create Arrow stream: %v", err) + } + defer stream.Close() + + log.Printf("Arrow Flight stream opened for table %s", tableName) + + // ── 6. Build and ingest RecordBatches ───────────────────────────────────── + // + // Build 3 batches of sensor readings and ingest them. + alloc := memory.DefaultAllocator + + batches := [][]sensorReading{ + {{device: "sensor-001", temp: 22.5, humid: 60.1}, {device: "sensor-002", temp: 21.0, humid: 58.3}}, + {{device: "sensor-001", temp: 23.1, humid: 61.0}, {device: "sensor-003", temp: 19.8, humid: 55.7}}, + {{device: "sensor-002", temp: 22.8, humid: 59.5}}, + } + + var offsets []int64 + for i, readings := range batches { + ipcBytes := serializeBatch(schema, alloc, readings) + + offset, err := stream.IngestBatch(ipcBytes) + if err != nil { + // Check whether the error is retryable. + if zbErr, ok := err.(*zerobus.ZerobusError); ok && zbErr.Retryable() { + log.Printf("Batch %d ingestion failed (retryable): %v", i, err) + } else { + log.Fatalf("Batch %d ingestion failed (non-retryable): %v", i, err) + } + continue + } + + log.Printf("Batch %d (%d rows) queued at offset %d", i, len(readings), offset) + offsets = append(offsets, offset) + } + + // ── 7. Wait for server acknowledgments ─────────────────────────────────── + // + // WaitForOffset blocks until the server confirms that the batch at the + // given offset is durably stored. Call Flush() to wait for all pending + // batches at once. 
+ log.Println("Waiting for acknowledgments...") + for _, offset := range offsets { + if err := stream.WaitForOffset(offset); err != nil { + log.Fatalf("WaitForOffset(%d) failed: %v", offset, err) + } + log.Printf("Offset %d acknowledged", offset) + } + + // ── 8. Flush and close ──────────────────────────────────────────────────── + if err := stream.Flush(); err != nil { + log.Fatalf("Flush failed: %v", err) + } + if err := stream.Close(); err != nil { + log.Fatalf("Close failed: %v", err) + } + + log.Println("All batches ingested and acknowledged successfully!") +} + +// serializeSchema serialises an Arrow schema to Arrow IPC stream bytes (no batches). +// Pass the returned bytes to CreateArrowStream / CreateArrowStreamWithHeadersProvider. +func serializeSchema(schema *arrow.Schema) []byte { + var buf bytes.Buffer + w := ipc.NewWriter(&buf, ipc.WithSchema(schema)) + if err := w.Close(); err != nil { + log.Fatalf("Failed to serialize Arrow schema: %v", err) + } + return buf.Bytes() +} + +// sensorReading holds one row of sensor data for the example. +type sensorReading struct { + device string + temp float64 + humid float64 +} + +// serializeBatch builds one Arrow RecordBatch from a slice of readings and serialises +// it to Arrow IPC stream bytes (schema + one batch). +// Pass the returned bytes to ZerobusArrowStream.IngestBatch. 
+func serializeBatch(schema *arrow.Schema, alloc memory.Allocator, readings []sensorReading) []byte { + deviceBldr := array.NewStringBuilder(alloc) + tempBldr := array.NewFloat64Builder(alloc) + humidBldr := array.NewFloat64Builder(alloc) + defer deviceBldr.Release() + defer tempBldr.Release() + defer humidBldr.Release() + + for _, r := range readings { + deviceBldr.Append(r.device) + tempBldr.Append(r.temp) + humidBldr.Append(r.humid) + } + + deviceArr := deviceBldr.NewArray() + tempArr := tempBldr.NewArray() + humidArr := humidBldr.NewArray() + defer deviceArr.Release() + defer tempArr.Release() + defer humidArr.Release() + + rec := array.NewRecord(schema, []arrow.Array{deviceArr, tempArr, humidArr}, int64(len(readings))) + defer rec.Release() + + var buf bytes.Buffer + w := ipc.NewWriter(&buf, ipc.WithSchema(schema)) + if err := w.Write(rec); err != nil { + log.Fatalf("Failed to write RecordBatch: %v", err) + } + if err := w.Close(); err != nil { + log.Fatalf("Failed to close IPC writer: %v", err) + } + + return buf.Bytes() +} diff --git a/go/tests/arrow_integration_test.go b/go/tests/arrow_integration_test.go new file mode 100644 index 0000000..e785971 --- /dev/null +++ b/go/tests/arrow_integration_test.go @@ -0,0 +1,410 @@ +package tests + +import ( + "bytes" + "testing" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/ipc" + "github.com/apache/arrow-go/v18/arrow/memory" + zerobus "github.com/databricks/zerobus-sdk/go" +) + +const arrowTestTable = "test_catalog.test_schema.arrow_table" + +// testArrowSchema returns a minimal single-column schema used by all arrow tests. +func testArrowSchema() *arrow.Schema { + return arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: false}, + }, nil) +} + +// makeSchemaIPC serialises an Arrow schema to Arrow IPC stream bytes (no batches). 
+func makeSchemaIPC(schema *arrow.Schema) []byte { + var buf bytes.Buffer + w := ipc.NewWriter(&buf, ipc.WithSchema(schema)) + w.Close() + return buf.Bytes() +} + +// makeBatchIPC serialises one RecordBatch (ids column) to Arrow IPC stream bytes. +func makeBatchIPC(schema *arrow.Schema, ids []int32) []byte { + bldr := array.NewInt32Builder(memory.DefaultAllocator) + bldr.AppendValues(ids, nil) + col := bldr.NewArray() + defer col.Release() + + rec := array.NewRecord(schema, []arrow.Array{col}, int64(len(ids))) + defer rec.Release() + + var buf bytes.Buffer + w := ipc.NewWriter(&buf, ipc.WithSchema(schema)) + w.Write(rec) //nolint:errcheck + w.Close() + return buf.Bytes() +} + +// arrowOpts returns default options with Recovery disabled and short timeouts. +func arrowOpts() *zerobus.ArrowStreamConfigurationOptions { + opts := zerobus.DefaultArrowStreamConfigurationOptions() + opts.Recovery = false + opts.FlushTimeoutMs = 15_000 + opts.ConnectionTimeoutMs = 10_000 + return opts +} + +// TestArrowStreamCreation verifies that a stream can be created against the mock server. +func TestArrowStreamCreation(t *testing.T) { + _, serverURL, stop, err := StartMockArrowServer() + if err != nil { + t.Fatalf("Failed to start Arrow mock server: %v", err) + } + defer stop() + + sdk, err := zerobus.NewZerobusSdk(serverURL, "https://mock-uc.com") + if err != nil { + t.Fatalf("Failed to create SDK: %v", err) + } + defer sdk.Free() + + schema := testArrowSchema() + stream, err := sdk.CreateArrowStreamWithHeadersProvider( + arrowTestTable, + makeSchemaIPC(schema), + &TestHeadersProvider{}, + arrowOpts(), + ) + if err != nil { + t.Fatalf("Failed to create Arrow stream: %v", err) + } + defer stream.Close() + + if stream.IsClosed() { + t.Fatal("Stream should be open immediately after creation") + } + + t.Log("Arrow stream created successfully") +} + +// TestArrowStreamIngestAndFlush verifies that batches are ingested and Flush waits for acks. 
+func TestArrowStreamIngestAndFlush(t *testing.T) { + mockServer, serverURL, stop, err := StartMockArrowServer() + if err != nil { + t.Fatalf("Failed to start Arrow mock server: %v", err) + } + defer stop() + + sdk, err := zerobus.NewZerobusSdk(serverURL, "https://mock-uc.com") + if err != nil { + t.Fatalf("Failed to create SDK: %v", err) + } + defer sdk.Free() + + schema := testArrowSchema() + stream, err := sdk.CreateArrowStreamWithHeadersProvider( + arrowTestTable, + makeSchemaIPC(schema), + &TestHeadersProvider{}, + arrowOpts(), + ) + if err != nil { + t.Fatalf("Failed to create Arrow stream: %v", err) + } + defer stream.Close() + + batch := makeBatchIPC(schema, []int32{1, 2, 3}) + mockServer.ConfigureRowsForOffset(0, 3) + mockServer.ConfigureRowsForOffset(1, 3) + + offset0, err := stream.IngestBatch(batch) + if err != nil { + t.Fatalf("Failed to ingest batch 0: %v", err) + } + if offset0 != 0 { + t.Fatalf("Expected offset 0, got %d", offset0) + } + + offset1, err := stream.IngestBatch(batch) + if err != nil { + t.Fatalf("Failed to ingest batch 1: %v", err) + } + if offset1 != 1 { + t.Fatalf("Expected offset 1, got %d", offset1) + } + + if err := stream.Flush(); err != nil { + t.Fatalf("Flush failed: %v", err) + } + + if n := mockServer.GetBatchesReceived(); n != 2 { + t.Errorf("Expected 2 batches received, got %d", n) + } + if max := mockServer.GetMaxOffsetSeen(); max != 1 { + t.Errorf("Expected max offset 1, got %d", max) + } + + t.Logf("Ingested 2 batches at offsets %d and %d", offset0, offset1) +} + +// TestArrowStreamWaitForOffset verifies WaitForOffset blocks until the server acks the batch. 
+func TestArrowStreamWaitForOffset(t *testing.T) { + mockServer, serverURL, stop, err := StartMockArrowServer() + if err != nil { + t.Fatalf("Failed to start Arrow mock server: %v", err) + } + defer stop() + + sdk, err := zerobus.NewZerobusSdk(serverURL, "https://mock-uc.com") + if err != nil { + t.Fatalf("Failed to create SDK: %v", err) + } + defer sdk.Free() + + schema := testArrowSchema() + stream, err := sdk.CreateArrowStreamWithHeadersProvider( + arrowTestTable, + makeSchemaIPC(schema), + &TestHeadersProvider{}, + arrowOpts(), + ) + if err != nil { + t.Fatalf("Failed to create Arrow stream: %v", err) + } + defer stream.Close() + + mockServer.ConfigureRowsForOffset(0, 5) + batch := makeBatchIPC(schema, []int32{10, 20, 30, 40, 50}) + + offset, err := stream.IngestBatch(batch) + if err != nil { + t.Fatalf("IngestBatch failed: %v", err) + } + + if err := stream.WaitForOffset(offset); err != nil { + t.Fatalf("WaitForOffset(%d) failed: %v", offset, err) + } + + t.Logf("Batch at offset %d acknowledged", offset) +} + +// TestArrowStreamClose verifies that Close succeeds and marks the stream closed. 
+func TestArrowStreamClose(t *testing.T) { + _, serverURL, stop, err := StartMockArrowServer() + if err != nil { + t.Fatalf("Failed to start Arrow mock server: %v", err) + } + defer stop() + + sdk, err := zerobus.NewZerobusSdk(serverURL, "https://mock-uc.com") + if err != nil { + t.Fatalf("Failed to create SDK: %v", err) + } + defer sdk.Free() + + schema := testArrowSchema() + stream, err := sdk.CreateArrowStreamWithHeadersProvider( + arrowTestTable, + makeSchemaIPC(schema), + &TestHeadersProvider{}, + arrowOpts(), + ) + if err != nil { + t.Fatalf("Failed to create Arrow stream: %v", err) + } + + if err := stream.Close(); err != nil { + t.Fatalf("Close failed: %v", err) + } + + if !stream.IsClosed() { + t.Fatal("Expected stream to be closed after Close()") + } + + t.Log("Stream closed successfully") +} + +// TestArrowStreamIdempotentClose verifies that calling Close twice does not error. +func TestArrowStreamIdempotentClose(t *testing.T) { + _, serverURL, stop, err := StartMockArrowServer() + if err != nil { + t.Fatalf("Failed to start Arrow mock server: %v", err) + } + defer stop() + + sdk, err := zerobus.NewZerobusSdk(serverURL, "https://mock-uc.com") + if err != nil { + t.Fatalf("Failed to create SDK: %v", err) + } + defer sdk.Free() + + schema := testArrowSchema() + stream, err := sdk.CreateArrowStreamWithHeadersProvider( + arrowTestTable, + makeSchemaIPC(schema), + &TestHeadersProvider{}, + arrowOpts(), + ) + if err != nil { + t.Fatalf("Failed to create Arrow stream: %v", err) + } + + if err := stream.Close(); err != nil { + t.Fatalf("First Close failed: %v", err) + } + if err := stream.Close(); err != nil { + t.Fatalf("Second Close failed: %v", err) + } + + t.Log("Idempotent close works correctly") +} + +// TestArrowStreamIngestAfterClose verifies that IngestBatch returns an error after Close. 
+func TestArrowStreamIngestAfterClose(t *testing.T) { + _, serverURL, stop, err := StartMockArrowServer() + if err != nil { + t.Fatalf("Failed to start Arrow mock server: %v", err) + } + defer stop() + + sdk, err := zerobus.NewZerobusSdk(serverURL, "https://mock-uc.com") + if err != nil { + t.Fatalf("Failed to create SDK: %v", err) + } + defer sdk.Free() + + schema := testArrowSchema() + stream, err := sdk.CreateArrowStreamWithHeadersProvider( + arrowTestTable, + makeSchemaIPC(schema), + &TestHeadersProvider{}, + arrowOpts(), + ) + if err != nil { + t.Fatalf("Failed to create Arrow stream: %v", err) + } + + if err := stream.Close(); err != nil { + t.Fatalf("Close failed: %v", err) + } + + _, err = stream.IngestBatch(makeBatchIPC(schema, []int32{1})) + if err == nil { + t.Fatal("Expected IngestBatch after Close to fail, but it succeeded") + } + + t.Log("IngestBatch correctly fails after Close") +} + +// TestArrowStreamGetUnackedBatches verifies that unacked batches can be retrieved. +func TestArrowStreamGetUnackedBatches(t *testing.T) { + _, serverURL, stop, err := StartMockArrowServer() + if err != nil { + t.Fatalf("Failed to start Arrow mock server: %v", err) + } + defer stop() + + sdk, err := zerobus.NewZerobusSdk(serverURL, "https://mock-uc.com") + if err != nil { + t.Fatalf("Failed to create SDK: %v", err) + } + defer sdk.Free() + + schema := testArrowSchema() + + // Use a very short flush timeout so Close succeeds quickly. + opts := zerobus.DefaultArrowStreamConfigurationOptions() + opts.Recovery = false + opts.ConnectionTimeoutMs = 5_000 + opts.FlushTimeoutMs = 500 + + stream, err := sdk.CreateArrowStreamWithHeadersProvider( + arrowTestTable, + makeSchemaIPC(schema), + &TestHeadersProvider{}, + opts, + ) + if err != nil { + t.Fatalf("Failed to create Arrow stream: %v", err) + } + + // Close the stream without ingesting anything; unacked list should be empty. + // (Close flushes first; with no pending batches this should succeed quickly.) 
+ if err := stream.Close(); err != nil { + // Some implementations may report an error here; only fail on unexpected errors. + t.Logf("Close returned (possibly expected): %v", err) + } + + // After close, GetUnackedBatches should return an error because the stream is closed. + _, err = stream.GetUnackedBatches() + if err == nil { + t.Fatal("Expected GetUnackedBatches to fail on closed stream, but it succeeded") + } + + t.Log("GetUnackedBatches correctly fails on closed stream") +} + +// TestArrowStreamMultipleBatches verifies ingestion of several batches with different sizes. +func TestArrowStreamMultipleBatches(t *testing.T) { + mockServer, serverURL, stop, err := StartMockArrowServer() + if err != nil { + t.Fatalf("Failed to start Arrow mock server: %v", err) + } + defer stop() + + // Give server a moment to be fully ready. + time.Sleep(200 * time.Millisecond) + + sdk, err := zerobus.NewZerobusSdk(serverURL, "https://mock-uc.com") + if err != nil { + t.Fatalf("Failed to create SDK: %v", err) + } + defer sdk.Free() + + schema := testArrowSchema() + stream, err := sdk.CreateArrowStreamWithHeadersProvider( + arrowTestTable, + makeSchemaIPC(schema), + &TestHeadersProvider{}, + arrowOpts(), + ) + if err != nil { + t.Fatalf("Failed to create Arrow stream: %v", err) + } + defer stream.Close() + + batches := [][]int32{ + {1}, + {2, 3}, + {4, 5, 6}, + {7, 8, 9, 10}, + } + + for i, ids := range batches { + mockServer.ConfigureRowsForOffset(int64(i), uint64(len(ids))) + } + + var offsets []int64 + for i, ids := range batches { + offset, err := stream.IngestBatch(makeBatchIPC(schema, ids)) + if err != nil { + t.Fatalf("IngestBatch %d failed: %v", i, err) + } + if offset != int64(i) { + t.Fatalf("Expected offset %d, got %d", i, offset) + } + offsets = append(offsets, offset) + t.Logf("Batch %d (%d rows) queued at offset %d", i, len(ids), offset) + } + + if err := stream.Flush(); err != nil { + t.Fatalf("Flush failed: %v", err) + } + + if n := mockServer.GetBatchesReceived(); 
n != len(batches) { + t.Errorf("Expected %d batches, got %d", len(batches), n) + } + + t.Logf("All %d batches successfully ingested and acknowledged", len(batches)) +} diff --git a/go/tests/arrow_mock_server.go b/go/tests/arrow_mock_server.go new file mode 100644 index 0000000..2e83d8d --- /dev/null +++ b/go/tests/arrow_mock_server.go @@ -0,0 +1,162 @@ +package tests + +import ( + "encoding/json" + "fmt" + "io" + "sync" + "time" + + "github.com/apache/arrow-go/v18/arrow/flight" +) + +// arrowBatchMeta matches FlightBatchMetadata JSON in the Rust SDK. +type arrowBatchMeta struct { + OffsetID int64 `json:"offset_id"` +} + +// arrowAckMeta matches FlightAckMetadata JSON in the Rust SDK. +type arrowAckMeta struct { + AckUpToOffset int64 `json:"ack_up_to_offset"` + AckUpToRecords uint64 `json:"ack_up_to_records"` +} + +// MockArrowFlightServer is a minimal Arrow Flight DoPut server for testing. +// It sends a stream-ready ack after the schema message, then acks each data +// batch using a configurable per-offset row count. +type MockArrowFlightServer struct { + flight.BaseFlightServer + + mu sync.Mutex + rowsPerOffset map[int64]uint64 // configured rows for each logical offset + defaultRows uint64 // fallback when offset not configured + batchesReceived int + maxOffsetSeen int64 +} + +// NewMockArrowFlightServer creates a server with defaultRows=1. +func NewMockArrowFlightServer() *MockArrowFlightServer { + return &MockArrowFlightServer{ + rowsPerOffset: make(map[int64]uint64), + defaultRows: 1, + maxOffsetSeen: -1, + } +} + +// ConfigureRowsForOffset sets the row count for a specific logical offset. +func (s *MockArrowFlightServer) ConfigureRowsForOffset(offset int64, rows uint64) { + s.mu.Lock() + defer s.mu.Unlock() + s.rowsPerOffset[offset] = rows +} + +// SetDefaultRows sets the fallback row count used for unconfigured offsets. 
+func (s *MockArrowFlightServer) SetDefaultRows(rows uint64) { + s.mu.Lock() + defer s.mu.Unlock() + s.defaultRows = rows +} + +// GetBatchesReceived returns the total number of data batches (not schema) received. +func (s *MockArrowFlightServer) GetBatchesReceived() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.batchesReceived +} + +// GetMaxOffsetSeen returns the highest logical offset received so far. +func (s *MockArrowFlightServer) GetMaxOffsetSeen() int64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.maxOffsetSeen +} + +// Reset clears server state for reuse between tests. +func (s *MockArrowFlightServer) ArrowReset() { + s.mu.Lock() + defer s.mu.Unlock() + s.rowsPerOffset = make(map[int64]uint64) + s.defaultRows = 1 + s.batchesReceived = 0 + s.maxOffsetSeen = -1 +} + +// DoPut implements flight.FlightServer. It handles one Arrow Flight DoPut call: +// 1. Reads the schema FlightData (first message, no AppMetadata). +// 2. Sends the stream-ready ack {"ack_up_to_offset":-1,"ack_up_to_records":0}. +// 3. For each subsequent data FlightData, reads offset_id from AppMetadata, +// accumulates the configured row count, and sends an ack. +func (s *MockArrowFlightServer) DoPut(stream flight.FlightService_DoPutServer) error { + // First message is the schema (FlightDataEncoderBuilder idx=0, no AppMetadata). + if _, err := stream.Recv(); err != nil { + return err + } + + // Send stream-ready ack to unblock the Rust SDK's stream creation wait. 
+ readyBytes, _ := json.Marshal(arrowAckMeta{AckUpToOffset: -1, AckUpToRecords: 0}) + if err := stream.Send(&flight.PutResult{AppMetadata: readyBytes}); err != nil { + return err + } + + var cumulativeRecords uint64 + for { + data, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + + var meta arrowBatchMeta + if err := json.Unmarshal(data.AppMetadata, &meta); err != nil { + return fmt.Errorf("arrow mock: invalid batch AppMetadata: %w", err) + } + + s.mu.Lock() + rows, ok := s.rowsPerOffset[meta.OffsetID] + if !ok { + rows = s.defaultRows + } + s.batchesReceived++ + if meta.OffsetID > s.maxOffsetSeen { + s.maxOffsetSeen = meta.OffsetID + } + s.mu.Unlock() + + cumulativeRecords += rows + + ackBytes, _ := json.Marshal(arrowAckMeta{ + AckUpToOffset: meta.OffsetID, + AckUpToRecords: cumulativeRecords, + }) + if err := stream.Send(&flight.PutResult{AppMetadata: ackBytes}); err != nil { + return err + } + } +} + +// StartMockArrowServer creates and starts a mock Arrow Flight gRPC server on a +// random local port. Returns the server, its URL, a stop function, and any error. +func StartMockArrowServer() (*MockArrowFlightServer, string, func(), error) { + mockServer := NewMockArrowFlightServer() + + srv := flight.NewServerWithMiddleware(nil) + srv.RegisterFlightService(mockServer) + + if err := srv.Init("127.0.0.1:0"); err != nil { + return nil, "", nil, fmt.Errorf("failed to init Arrow Flight server: %w", err) + } + + go func() { + srv.Serve() //nolint:errcheck + }() + + // Let the server finish binding before returning. 
+ time.Sleep(100 * time.Millisecond) + + serverURL := fmt.Sprintf("http://%s", srv.Addr().String()) + stop := func() { srv.Shutdown() } + + return mockServer, serverURL, stop, nil +} diff --git a/go/tests/go.mod b/go/tests/go.mod index e9bc2b2..ebb3017 100644 --- a/go/tests/go.mod +++ b/go/tests/go.mod @@ -3,6 +3,7 @@ module github.com/databricks/zerobus-sdk/go/tests go 1.24.0 require ( + github.com/apache/arrow-go/v18 v18.0.0 github.com/databricks/zerobus-sdk/go v0.0.0 github.com/golang/protobuf v1.5.4 google.golang.org/grpc v1.78.0 @@ -10,9 +11,21 @@ require ( ) require ( + github.com/goccy/go-json v0.10.3 // indirect + github.com/google/flatbuffers v24.3.25+incompatible // indirect + github.com/klauspost/compress v1.17.11 // indirect + github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect + golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect + golang.org/x/mod v0.29.0 // indirect golang.org/x/net v0.47.0 // indirect + golang.org/x/sync v0.18.0 // indirect golang.org/x/sys v0.38.0 // indirect + golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8 // indirect golang.org/x/text v0.31.0 // indirect + golang.org/x/tools v0.38.0 // indirect + golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect ) diff --git a/go/tests/go.sum b/go/tests/go.sum index 8a84e18..4b7fe33 100644 --- a/go/tests/go.sum +++ b/go/tests/go.sum @@ -1,13 +1,47 @@ +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= +github.com/apache/arrow-go/v18 v18.0.0 h1:1dBDaSbH3LtulTyOVYaBCHO3yVRwjV+TZaqn3g6V7ZM= +github.com/apache/arrow-go/v18 v18.0.0/go.mod h1:t6+cWRSmKgdQ6HsxisQjok+jBpKGhRDiqcf3p0p/F+A= +github.com/apache/thrift v0.21.0 
h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= +github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= +github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI= +github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= +github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod 
h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= +github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= @@ -20,12 +54,25 @@ go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6 go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= 
go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= +golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= +golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8 h1:LvzTn0GQhWuvKH/kVRS3R3bVAsdQWI7hvfLHGgh9+lU= +golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8/go.mod h1:Pi4ztBfryZoJEkyFTI5/Ocsu2jXyDr6iSdgJiYE/uwE= golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= +golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= +golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= 
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda h1:i/Q+bfisr7gq6feoJnS/DlpdwEL4ihp41fvRiM3Ork0= @@ -34,3 +81,5 @@ google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/rust/ffi/Cargo.toml b/rust/ffi/Cargo.toml index b77a308..762d58c 100644 --- a/rust/ffi/Cargo.toml +++ b/rust/ffi/Cargo.toml @@ -11,7 +11,10 @@ crate-type = ["staticlib"] [dependencies] # Core SDK logic - use local path dependency -databricks-zerobus-ingest-sdk = { path = "../sdk" } +databricks-zerobus-ingest-sdk = { path = "../sdk", features = ["arrow-flight", "testing"] } + +# Arrow IPC for serializing/deserializing RecordBatches across the FFI boundary +arrow-ipc = { version = "56.2.0", default-features = false } # FFI helpers tokio = { version = "1.42", features = ["rt", "rt-multi-thread"] } diff --git a/rust/ffi/cbindgen.toml b/rust/ffi/cbindgen.toml index 755d620..afbfc2b 100644 --- a/rust/ffi/cbindgen.toml +++ b/rust/ffi/cbindgen.toml @@ -8,6 +8,6 @@ namespace = "zerobus" cpp_compat = true [export] -include = ["CZerobusSdk", "CZerobusStream", "CResult", "CStreamConfigurationOptions", "CRecord", "CRecordArray", "CHeader", "CHeaders"] +include = ["CZerobusSdk", "CZerobusStream", "CResult", "CStreamConfigurationOptions", "CRecord", "CRecordArray", "CHeader", "CHeaders", "CArrowStream", "CArrowStreamConfigurationOptions", "CArrowBatchArray"] [export.rename] diff --git a/rust/ffi/src/lib.rs b/rust/ffi/src/lib.rs index 17e11e7..ddd46dc 100644 --- a/rust/ffi/src/lib.rs +++ b/rust/ffi/src/lib.rs @@ -13,11 +13,15 @@ use 
tokio::runtime::Runtime; use tracing_subscriber::{fmt, EnvFilter}; extern crate libc; +use arrow_ipc::{reader::StreamReader, writer::StreamWriter, CompressionType}; use async_trait::async_trait; use databricks_zerobus_ingest_sdk::databricks::zerobus::RecordType; use databricks_zerobus_ingest_sdk::{ - EncodedRecord, HeadersProvider, StreamConfigurationOptions, TableProperties, ZerobusError, - ZerobusResult, ZerobusSdk, ZerobusStream, + ArrowStreamConfigurationOptions, ArrowTableProperties, RecordBatch, ZerobusArrowStream, +}; +use databricks_zerobus_ingest_sdk::{ + EncodedRecord, HeadersProvider, NoTlsConfig, StreamConfigurationOptions, TableProperties, + ZerobusError, ZerobusResult, ZerobusSdk, ZerobusStream, }; use prost::Message; use std::sync::Arc; @@ -26,6 +30,559 @@ use std::sync::Arc; #[cfg(test)] mod tests; +// ============================================================================ +// Arrow Flight FFI +// ============================================================================ + +/// Opaque handle for an Arrow Flight stream. +#[repr(C)] +pub struct CArrowStream { + _private: [u8; 0], +} + +/// Configuration options for Arrow Flight streams. 
+/// +/// `ipc_compression`: -1 = None, 0 = LZ4_FRAME, 1 = ZSTD +#[repr(C)] +#[derive(Clone, Copy)] +pub struct CArrowStreamConfigurationOptions { + pub max_inflight_batches: usize, + pub recovery: bool, + pub recovery_timeout_ms: u64, + pub recovery_backoff_ms: u64, + pub recovery_retries: u32, + pub server_lack_of_ack_timeout_ms: u64, + pub flush_timeout_ms: u64, + pub connection_timeout_ms: u64, + /// -1 = None, 0 = LZ4_FRAME, 1 = ZSTD + pub ipc_compression: i32, +} + +impl From for ArrowStreamConfigurationOptions { + fn from(c_opts: CArrowStreamConfigurationOptions) -> Self { + let ipc_compression = match c_opts.ipc_compression { + 0 => Some(CompressionType::LZ4_FRAME), + 1 => Some(CompressionType::ZSTD), + _ => None, + }; + ArrowStreamConfigurationOptions { + max_inflight_batches: c_opts.max_inflight_batches, + recovery: c_opts.recovery, + recovery_timeout_ms: c_opts.recovery_timeout_ms, + recovery_backoff_ms: c_opts.recovery_backoff_ms, + recovery_retries: c_opts.recovery_retries, + server_lack_of_ack_timeout_ms: c_opts.server_lack_of_ack_timeout_ms, + flush_timeout_ms: c_opts.flush_timeout_ms, + connection_timeout_ms: c_opts.connection_timeout_ms, + ipc_compression, + } + } +} + +/// An array of Arrow IPC-encoded batches, returned by `zerobus_arrow_stream_get_unacked_batches`. +/// Must be freed with `zerobus_arrow_free_batch_array`. +#[repr(C)] +pub struct CArrowBatchArray { + /// Array of pointers to IPC-encoded batch bytes. + pub batches: *mut *mut u8, + /// Array of byte lengths, one per batch. + pub lengths: *mut usize, + /// Number of batches. 
+ pub count: usize, +} + +// ---- Arrow pointer validation helpers ---- + +fn validate_arrow_stream_ptr<'a>( + stream: *mut CArrowStream, +) -> Result<&'a ZerobusArrowStream, &'static str> { + if stream.is_null() { + return Err("Arrow stream pointer is null"); + } + unsafe { Ok(&*(stream as *const ZerobusArrowStream)) } +} + +fn validate_arrow_stream_ptr_mut<'a>( + stream: *mut CArrowStream, +) -> Result<&'a mut ZerobusArrowStream, &'static str> { + if stream.is_null() { + return Err("Arrow stream pointer is null"); + } + unsafe { Ok(&mut *(stream as *mut ZerobusArrowStream)) } +} + +// ---- Arrow IPC helpers ---- + +/// Deserializes an `Arc` from Arrow IPC stream bytes (schema-only stream). +#[allow(clippy::result_large_err)] +fn ipc_bytes_to_schema( + bytes: &[u8], +) -> ZerobusResult> { + use std::io::Cursor; + let cursor = Cursor::new(bytes); + let reader = StreamReader::try_new(cursor, None).map_err(|e| { + ZerobusError::InvalidArgument(format!("Failed to parse Arrow IPC schema: {e}")) + })?; + Ok(reader.schema().clone()) +} + +/// Deserializes the first `RecordBatch` from Arrow IPC stream bytes. +#[allow(clippy::result_large_err)] +fn ipc_bytes_to_record_batch(bytes: &[u8]) -> ZerobusResult { + use std::io::Cursor; + let cursor = Cursor::new(bytes); + let mut reader = StreamReader::try_new(cursor, None).map_err(|e| { + ZerobusError::InvalidArgument(format!("Failed to parse Arrow IPC stream: {e}")) + })?; + reader + .next() + .ok_or_else(|| ZerobusError::InvalidArgument("No record batch in IPC stream".to_string()))? + .map_err(|e| ZerobusError::InvalidArgument(format!("Failed to read Arrow IPC batch: {e}"))) +} + +/// Serializes a `RecordBatch` to Arrow IPC stream bytes (schema + one batch). 
+#[allow(clippy::result_large_err)] +fn record_batch_to_ipc_bytes(batch: &RecordBatch) -> ZerobusResult> { + let mut buf = Vec::new(); + let mut writer = StreamWriter::try_new(&mut buf, batch.schema().as_ref()).map_err(|e| { + ZerobusError::InvalidArgument(format!("Failed to create Arrow IPC writer: {e}")) + })?; + writer.write(batch).map_err(|e| { + ZerobusError::InvalidArgument(format!("Failed to write Arrow IPC batch: {e}")) + })?; + writer.finish().map_err(|e| { + ZerobusError::InvalidArgument(format!("Failed to finish Arrow IPC stream: {e}")) + })?; + Ok(buf) +} + +// ---- Arrow FFI functions ---- + +/// Creates an Arrow Flight stream authenticated with OAuth client credentials. +/// +/// `schema_ipc_bytes` must point to Arrow IPC stream bytes encoding only the schema +/// (write an empty IPC stream with just the schema message). +#[no_mangle] +pub extern "C" fn zerobus_sdk_create_arrow_stream( + sdk: *mut CZerobusSdk, + table_name: *const c_char, + schema_ipc_bytes: *const u8, + schema_ipc_len: usize, + client_id: *const c_char, + client_secret: *const c_char, + options: *const CArrowStreamConfigurationOptions, + result: *mut CResult, +) -> *mut CArrowStream { + let sdk_ref = match validate_sdk_ptr(sdk) { + Ok(s) => s, + Err(msg) => { + write_error_result(result, msg, false); + return ptr::null_mut(); + } + }; + + let res = RUNTIME.block_on(async { + let table_name_str = unsafe { c_str_to_string(table_name).map_err(|e| e.to_string())? }; + let client_id_str = unsafe { c_str_to_string(client_id).map_err(|e| e.to_string())? }; + let client_secret_str = + unsafe { c_str_to_string(client_secret).map_err(|e| e.to_string())? 
}; + + if schema_ipc_bytes.is_null() || schema_ipc_len == 0 { + return Err("Schema IPC bytes are required for Arrow stream".to_string()); + } + let schema_bytes = unsafe { std::slice::from_raw_parts(schema_ipc_bytes, schema_ipc_len) }; + let schema = ipc_bytes_to_schema(schema_bytes).map_err(|e| e.to_string())?; + + let table_props = ArrowTableProperties { + table_name: table_name_str, + schema, + }; + let stream_options = if !options.is_null() { + Some(unsafe { (*options).into() }) + } else { + None + }; + + let stream = sdk_ref + .create_arrow_stream( + table_props, + client_id_str, + client_secret_str, + stream_options, + ) + .await + .map_err(|e| e.to_string())?; + + let boxed = Box::new(stream); + Ok::<*mut CArrowStream, String>(Box::into_raw(boxed) as *mut CArrowStream) + }); + + match res { + Ok(ptr) => { + write_success_result(result); + ptr + } + Err(err) => { + write_error_result(result, &err, false); + ptr::null_mut() + } + } +} + +/// Creates an Arrow Flight stream with a custom headers provider callback. +/// +/// `schema_ipc_bytes` must point to Arrow IPC stream bytes encoding only the schema. +#[no_mangle] +pub extern "C" fn zerobus_sdk_create_arrow_stream_with_headers_provider( + sdk: *mut CZerobusSdk, + table_name: *const c_char, + schema_ipc_bytes: *const u8, + schema_ipc_len: usize, + headers_callback: HeadersProviderCallback, + user_data: *mut std::ffi::c_void, + options: *const CArrowStreamConfigurationOptions, + result: *mut CResult, +) -> *mut CArrowStream { + let sdk_ref = match validate_sdk_ptr(sdk) { + Ok(s) => s, + Err(msg) => { + write_error_result(result, msg, false); + return ptr::null_mut(); + } + }; + + let res = RUNTIME.block_on(async { + let table_name_str = unsafe { c_str_to_string(table_name).map_err(|e| e.to_string())? 
}; + + if schema_ipc_bytes.is_null() || schema_ipc_len == 0 { + return Err("Schema IPC bytes are required for Arrow stream".to_string()); + } + let schema_bytes = unsafe { std::slice::from_raw_parts(schema_ipc_bytes, schema_ipc_len) }; + let schema = ipc_bytes_to_schema(schema_bytes).map_err(|e| e.to_string())?; + + let table_props = ArrowTableProperties { + table_name: table_name_str, + schema, + }; + let stream_options = if !options.is_null() { + Some(unsafe { (*options).into() }) + } else { + None + }; + + let headers_provider = Arc::new(CallbackHeadersProvider::new(headers_callback, user_data)); + + let stream = sdk_ref + .create_arrow_stream_with_headers_provider( + table_props, + headers_provider, + stream_options, + ) + .await + .map_err(|e| e.to_string())?; + + let boxed = Box::new(stream); + Ok::<*mut CArrowStream, String>(Box::into_raw(boxed) as *mut CArrowStream) + }); + + match res { + Ok(ptr) => { + write_success_result(result); + ptr + } + Err(err) => { + write_error_result(result, &err, false); + ptr::null_mut() + } + } +} + +/// Frees an Arrow Flight stream instance. +#[no_mangle] +pub extern "C" fn zerobus_arrow_stream_free(stream: *mut CArrowStream) { + if !stream.is_null() { + unsafe { + let _ = Box::from_raw(stream as *mut ZerobusArrowStream); + } + } +} + +/// Ingests one Arrow RecordBatch supplied as Arrow IPC stream bytes. +/// +/// `ipc_bytes` must be a valid Arrow IPC stream (schema + one record batch). +/// Returns the logical offset assigned to this batch, or -1 on error. 
+#[no_mangle] +pub extern "C" fn zerobus_arrow_stream_ingest_batch( + stream: *mut CArrowStream, + ipc_bytes: *const u8, + ipc_len: usize, + result: *mut CResult, +) -> i64 { + if ipc_bytes.is_null() || ipc_len == 0 { + write_error_result(result, "IPC bytes are required", false); + return -1; + } + + let stream_ref = match validate_arrow_stream_ptr(stream) { + Ok(s) => s, + Err(msg) => { + write_error_result(result, msg, false); + return -1; + } + }; + + let bytes = unsafe { std::slice::from_raw_parts(ipc_bytes, ipc_len) }; + + let offset_res = RUNTIME.block_on(async { + let batch = ipc_bytes_to_record_batch(bytes)?; + stream_ref.ingest_batch(batch).await + }); + + match offset_res { + Ok(offset) => { + write_success_result(result); + offset + } + Err(err) => { + if !result.is_null() { + unsafe { + *result = CResult::error(err); + } + } + -1 + } + } +} + +/// Waits until the server acknowledges the batch at the given logical offset. +#[no_mangle] +pub extern "C" fn zerobus_arrow_stream_wait_for_offset( + stream: *mut CArrowStream, + offset: i64, + result: *mut CResult, +) -> bool { + let stream_ref = match validate_arrow_stream_ptr(stream) { + Ok(s) => s, + Err(msg) => { + write_error_result(result, msg, false); + return false; + } + }; + + let res = RUNTIME.block_on(async { stream_ref.wait_for_offset(offset).await }); + + match res { + Ok(()) => { + write_success_result(result); + true + } + Err(err) => { + if !result.is_null() { + unsafe { + *result = CResult::error(err); + } + } + false + } + } +} + +/// Flushes all pending batches and waits for their acknowledgment. 
+#[no_mangle] +pub extern "C" fn zerobus_arrow_stream_flush( + stream: *mut CArrowStream, + result: *mut CResult, +) -> bool { + let stream_ref = match validate_arrow_stream_ptr(stream) { + Ok(s) => s, + Err(msg) => { + write_error_result(result, msg, false); + return false; + } + }; + + let res = RUNTIME.block_on(async { stream_ref.flush().await }); + + match res { + Ok(()) => { + write_success_result(result); + true + } + Err(err) => { + if !result.is_null() { + unsafe { + *result = CResult::error(err); + } + } + false + } + } +} + +/// Gracefully closes the stream, flushing all pending batches first. +#[no_mangle] +pub extern "C" fn zerobus_arrow_stream_close( + stream: *mut CArrowStream, + result: *mut CResult, +) -> bool { + let stream_ref = match validate_arrow_stream_ptr_mut(stream) { + Ok(s) => s, + Err(msg) => { + write_error_result(result, msg, false); + return false; + } + }; + + let res = RUNTIME.block_on(async { stream_ref.close().await }); + + match res { + Ok(()) => { + write_success_result(result); + true + } + Err(err) => { + if !result.is_null() { + unsafe { + *result = CResult::error(err); + } + } + false + } + } +} + +/// Returns all unacknowledged batches from a closed or failed stream as Arrow IPC bytes. +/// +/// Each batch is serialized as a self-contained Arrow IPC stream (schema + one batch). +/// The returned array must be freed with `zerobus_arrow_free_batch_array`. 
+#[no_mangle] +pub extern "C" fn zerobus_arrow_stream_get_unacked_batches( + stream: *mut CArrowStream, + result: *mut CResult, +) -> CArrowBatchArray { + let empty = CArrowBatchArray { + batches: ptr::null_mut(), + lengths: ptr::null_mut(), + count: 0, + }; + + let stream_ref = match validate_arrow_stream_ptr(stream) { + Ok(s) => s, + Err(msg) => { + write_error_result(result, msg, false); + return empty; + } + }; + + let batches_res = RUNTIME.block_on(async { stream_ref.get_unacked_batches().await }); + + match batches_res { + Ok(batches) => { + if batches.is_empty() { + write_success_result(result); + return empty; + } + + let count = batches.len(); + let mut batch_ptrs: Vec<*mut u8> = Vec::with_capacity(count); + let mut batch_lens: Vec = Vec::with_capacity(count); + + for batch in &batches { + match record_batch_to_ipc_bytes(batch) { + Ok(bytes) => { + let len = bytes.len(); + let ptr = Box::into_raw(bytes.into_boxed_slice()) as *mut u8; + batch_ptrs.push(ptr); + batch_lens.push(len); + } + Err(e) => { + // Free already-allocated batches before returning error. + for (&ptr, &len) in batch_ptrs.iter().zip(batch_lens.iter()) { + if !ptr.is_null() && len > 0 { + unsafe { + let _ = Vec::from_raw_parts(ptr, len, len); + } + } + } + write_error_result(result, &e.to_string(), false); + return empty; + } + } + } + + let ptrs_ptr = batch_ptrs.as_mut_ptr(); + let lens_ptr = batch_lens.as_mut_ptr(); + std::mem::forget(batch_ptrs); + std::mem::forget(batch_lens); + + write_success_result(result); + CArrowBatchArray { + batches: ptrs_ptr, + lengths: lens_ptr, + count, + } + } + Err(err) => { + if !result.is_null() { + unsafe { + *result = CResult::error(err); + } + } + empty + } + } +} + +/// Frees a `CArrowBatchArray` returned by `zerobus_arrow_stream_get_unacked_batches`. 
+#[no_mangle] +pub extern "C" fn zerobus_arrow_free_batch_array(array: CArrowBatchArray) { + if array.count == 0 { + return; + } + unsafe { + if !array.batches.is_null() && !array.lengths.is_null() { + let ptrs = Vec::from_raw_parts(array.batches, array.count, array.count); + let lens = Vec::from_raw_parts(array.lengths, array.count, array.count); + for (&ptr, &len) in ptrs.iter().zip(lens.iter()) { + if !ptr.is_null() && len > 0 { + let _ = Vec::from_raw_parts(ptr, len, len); + } + } + } + } +} + +/// Returns whether the Arrow stream has been closed. +#[no_mangle] +pub extern "C" fn zerobus_arrow_stream_is_closed(stream: *mut CArrowStream) -> bool { + match validate_arrow_stream_ptr(stream) { + Ok(s) => s.is_closed(), + Err(_) => true, + } +} + +/// Returns the default Arrow stream configuration options. +#[no_mangle] +pub extern "C" fn zerobus_arrow_get_default_config() -> CArrowStreamConfigurationOptions { + let d = ArrowStreamConfigurationOptions::default(); + let ipc_compression = match d.ipc_compression { + Some(ct) if ct == CompressionType::LZ4_FRAME => 0, + Some(ct) if ct == CompressionType::ZSTD => 1, + _ => -1, + }; + CArrowStreamConfigurationOptions { + max_inflight_batches: d.max_inflight_batches, + recovery: d.recovery, + recovery_timeout_ms: d.recovery_timeout_ms, + recovery_backoff_ms: d.recovery_backoff_ms, + recovery_retries: d.recovery_retries, + server_lack_of_ack_timeout_ms: d.server_lack_of_ack_timeout_ms, + flush_timeout_ms: d.flush_timeout_ms, + connection_timeout_ms: d.connection_timeout_ms, + ipc_compression, + } +} + // Global Tokio runtime for handling async Rust calls static RUNTIME: Lazy = Lazy::new(|| Runtime::new().expect("Failed to create Tokio runtime")); @@ -389,11 +946,13 @@ pub extern "C" fn zerobus_sdk_new( let endpoint = unsafe { c_str_to_string(zerobus_endpoint).map_err(|e| e.to_string())? }; let catalog_url = unsafe { c_str_to_string(unity_catalog_url).map_err(|e| e.to_string())? 
}; - let sdk = ZerobusSdk::builder() - .endpoint(endpoint) - .unity_catalog_url(catalog_url) - .build() - .map_err(|e| e.to_string())?; + let mut builder = ZerobusSdk::builder() + .endpoint(endpoint.clone()) + .unity_catalog_url(catalog_url); + if endpoint.starts_with("http://") { + builder = builder.tls_config(Arc::new(NoTlsConfig)); + } + let sdk = builder.build().map_err(|e| e.to_string())?; let boxed = Box::new(sdk); Ok(Box::into_raw(boxed) as *mut CZerobusSdk) })(); @@ -1024,7 +1583,7 @@ pub extern "C" fn zerobus_free_error_message(message: *mut c_char) { } } -/// Get default configuration options +/// Get default stream configuration options #[no_mangle] pub extern "C" fn zerobus_get_default_config() -> CStreamConfigurationOptions { let default_opts = StreamConfigurationOptions::default(); diff --git a/rust/ffi/zerobus.h b/rust/ffi/zerobus.h index 52eab91..5022fb8 100644 --- a/rust/ffi/zerobus.h +++ b/rust/ffi/zerobus.h @@ -18,6 +18,43 @@ namespace zerobus { #endif // __cplusplus +/** + * Opaque handle for an Arrow Flight stream. + */ +typedef struct CArrowStream { + uint8_t _private[0]; +} CArrowStream; + +typedef struct CZerobusSdk { + uint8_t _private[0]; +} CZerobusSdk; + +/** + * Configuration options for Arrow Flight streams. 
+ * + * `ipc_compression`: -1 = None, 0 = LZ4_FRAME, 1 = ZSTD + */ +typedef struct CArrowStreamConfigurationOptions { + uintptr_t max_inflight_batches; + bool recovery; + uint64_t recovery_timeout_ms; + uint64_t recovery_backoff_ms; + uint32_t recovery_retries; + uint64_t server_lack_of_ack_timeout_ms; + uint64_t flush_timeout_ms; + uint64_t connection_timeout_ms; + /** + * -1 = None, 0 = LZ4_FRAME, 1 = ZSTD + */ + int32_t ipc_compression; +} CArrowStreamConfigurationOptions; + +typedef struct CResult { + bool success; + char *error_message; + bool is_retryable; +} CResult; + /** * A single header key-value pair for C FFI */ @@ -35,15 +72,31 @@ typedef struct CHeaders { char *error_message; } CHeaders; -typedef struct CZerobusSdk { - uint8_t _private[0]; -} CZerobusSdk; +/** + * Function pointer type for the headers provider callback + * The callback should return a CHeaders struct + * The caller is responsible for freeing the returned CHeaders using zerobus_free_headers + */ +typedef struct CHeaders (*HeadersProviderCallback)(void *user_data); -typedef struct CResult { - bool success; - char *error_message; - bool is_retryable; -} CResult; +/** + * An array of Arrow IPC-encoded batches, returned by `zerobus_arrow_stream_get_unacked_batches`. + * Must be freed with `zerobus_arrow_free_batch_array`. + */ +typedef struct CArrowBatchArray { + /** + * Array of pointers to IPC-encoded batch bytes. + */ + uint8_t **batches; + /** + * Array of byte lengths, one per batch. + */ + uintptr_t *lengths; + /** + * Number of batches. 
+ */ + uintptr_t count; +} CArrowBatchArray; typedef struct CZerobusStream { uint8_t _private[0]; @@ -64,13 +117,6 @@ typedef struct CStreamConfigurationOptions { bool has_callback_max_wait_time_ms; } CStreamConfigurationOptions; -/** - * Function pointer type for the headers provider callback - * The callback should return a CHeaders struct - * The caller is responsible for freeing the returned CHeaders using zerobus_free_headers - */ -typedef struct CHeaders (*HeadersProviderCallback)(void *user_data); - /** * Represents a single record (either Proto or JSON) */ @@ -92,6 +138,92 @@ typedef struct CRecordArray { extern "C" { #endif // __cplusplus +/** + * Creates an Arrow Flight stream authenticated with OAuth client credentials. + * + * `schema_ipc_bytes` must point to Arrow IPC stream bytes encoding only the schema + * (write an empty IPC stream with just the schema message). + */ +struct CArrowStream *zerobus_sdk_create_arrow_stream(struct CZerobusSdk *sdk, + const char *table_name, + const uint8_t *schema_ipc_bytes, + uintptr_t schema_ipc_len, + const char *client_id, + const char *client_secret, + const struct CArrowStreamConfigurationOptions *options, + struct CResult *result); + +/** + * Creates an Arrow Flight stream with a custom headers provider callback. + * + * `schema_ipc_bytes` must point to Arrow IPC stream bytes encoding only the schema. + */ +struct CArrowStream *zerobus_sdk_create_arrow_stream_with_headers_provider(struct CZerobusSdk *sdk, + const char *table_name, + const uint8_t *schema_ipc_bytes, + uintptr_t schema_ipc_len, + HeadersProviderCallback headers_callback, + void *user_data, + const struct CArrowStreamConfigurationOptions *options, + struct CResult *result); + +/** + * Frees an Arrow Flight stream instance. + */ +void zerobus_arrow_stream_free(struct CArrowStream *stream); + +/** + * Ingests one Arrow RecordBatch supplied as Arrow IPC stream bytes. + * + * `ipc_bytes` must be a valid Arrow IPC stream (schema + one record batch). 
+ * Returns the logical offset assigned to this batch, or -1 on error. + */ +int64_t zerobus_arrow_stream_ingest_batch(struct CArrowStream *stream, + const uint8_t *ipc_bytes, + uintptr_t ipc_len, + struct CResult *result); + +/** + * Waits until the server acknowledges the batch at the given logical offset. + */ +bool zerobus_arrow_stream_wait_for_offset(struct CArrowStream *stream, + int64_t offset, + struct CResult *result); + +/** + * Flushes all pending batches and waits for their acknowledgment. + */ +bool zerobus_arrow_stream_flush(struct CArrowStream *stream, struct CResult *result); + +/** + * Gracefully closes the stream, flushing all pending batches first. + */ +bool zerobus_arrow_stream_close(struct CArrowStream *stream, struct CResult *result); + +/** + * Returns all unacknowledged batches from a closed or failed stream as Arrow IPC bytes. + * + * Each batch is serialized as a self-contained Arrow IPC stream (schema + one batch). + * The returned array must be freed with `zerobus_arrow_free_batch_array`. + */ +struct CArrowBatchArray zerobus_arrow_stream_get_unacked_batches(struct CArrowStream *stream, + struct CResult *result); + +/** + * Frees a `CArrowBatchArray` returned by `zerobus_arrow_stream_get_unacked_batches`. + */ +void zerobus_arrow_free_batch_array(struct CArrowBatchArray array); + +/** + * Returns whether the Arrow stream has been closed. + */ +bool zerobus_arrow_stream_is_closed(struct CArrowStream *stream); + +/** + * Returns the default Arrow stream configuration options. + */ +struct CArrowStreamConfigurationOptions zerobus_arrow_get_default_config(void); + /** * Free headers returned from callback */ @@ -224,7 +356,7 @@ bool zerobus_stream_close(struct CZerobusStream *stream, struct CResult *result) void zerobus_free_error_message(char *message); /** - * Get default configuration options + * Get default stream configuration options */ struct CStreamConfigurationOptions zerobus_get_default_config(void);