Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ fmt-go:
cd examples/json/batch && go fmt ./...
cd examples/proto/single && go fmt ./...
cd examples/proto/batch && go fmt ./...
cd examples/arrow && go fmt ./...

fmt-rust:
@echo "Formatting Rust code..."
Expand All @@ -95,6 +96,7 @@ lint-go:
cd examples/json/batch && go vet ./...
cd examples/proto/single && go vet ./...
cd examples/proto/batch && go vet ./...
cd examples/arrow && go vet ./...

lint-rust:
@echo "Linting Rust code..."
Expand All @@ -120,6 +122,7 @@ examples: build
cd examples/json/batch && go build -o json-batch main.go
cd examples/proto/single && go build -o proto-single main.go
cd examples/proto/batch && go build -o proto-batch main.go
cd examples/arrow && go build -o arrow main.go
@echo "✓ Examples built successfully"

release:
Expand Down
9 changes: 5 additions & 4 deletions go/NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@

### New Features and Improvements

**[Experimental] Arrow Flight Ingestion**: Added experimental Arrow Flight support for high-throughput Apache Arrow RecordBatch ingestion

### Deprecations
- New `CreateArrowStream` and `CreateArrowStreamWithHeadersProvider` methods on `ZerobusSdk`
- New `ZerobusArrowStream` type with `IngestBatch`, `WaitForOffset`, `Flush`, `Close`, and `GetUnackedBatches` methods
- Configurable IPC compression via `ArrowStreamConfigurationOptions.IpcCompression` (supports `LZ4Frame` and `Zstd`)

### Deprecations

### Bug Fixes

### Documentation


### Internal Changes


### API Changes

84 changes: 84 additions & 0 deletions go/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ We are keen to hear feedback from you on this SDK. Please [file issues](https://
- [Configuration Options](#configuration-options)
- [Error Handling](#error-handling)
- [Examples](#examples)
- [Arrow Flight Ingestion (Experimental)](#arrow-flight-ingestion-experimental)
- [Tests](#tests)
- [Performance Benchmarks](#performance-benchmarks)
- [Best Practices](#best-practices)
Expand Down Expand Up @@ -64,6 +65,7 @@ This SDK wraps the [Rust SDK](https://github.com/databricks/zerobus-sdk/tree/mai
- **Configurable timeouts and retry policies**
- **Immediate offset returns** for ingested records
- **Graceful stream management** - Proper flushing and resource cleanup
- **[Experimental] Arrow Flight ingestion** - High-throughput Apache Arrow RecordBatch ingestion via the Arrow Flight protocol
## Getting Started

Choose your installation path:
Expand Down Expand Up @@ -748,6 +750,9 @@ The `examples/` directory contains complete, runnable examples organized by form
- **`examples/proto/single/`** - Single record ingestion with protobuf
- **`examples/proto/batch/`** - Batch ingestion with protobuf

**Arrow Flight Examples (Experimental):**
- **`examples/arrow/`** - Arrow RecordBatch ingestion via Arrow Flight protocol

**To run an example:**

```bash
Expand All @@ -767,6 +772,14 @@ go run main.go

Each example includes detailed comments and demonstrates best practices for production use. See [`examples/README.md`](examples/README.md) for complete setup instructions, prerequisites, and detailed comparisons between examples.

## Arrow Flight Ingestion (Experimental)

> **Experimental/Unsupported**: Arrow Flight ingestion is experimental and not yet supported for production use. The API may change in future releases.

The SDK supports high-throughput ingestion of Apache Arrow RecordBatches via the Arrow Flight protocol. This is useful for pipelines that already work with columnar Arrow data.

See [`examples/arrow/`](examples/arrow/) for a complete working example and setup instructions.

## Tests

Tests are located in the repository and can be run using:
Expand Down Expand Up @@ -1247,6 +1260,77 @@ func (e *ZerobusError) Retryable() bool

Returns `true` if the error can be automatically recovered by the SDK.

### `ZerobusArrowStream` (Experimental)

Represents an active Arrow Flight ingestion stream.

**Methods:**

```go
func (s *ZerobusArrowStream) IngestBatch(ipcBytes []byte) (int64, error)
```

Ingests an Arrow RecordBatch serialized as Arrow IPC stream bytes. Returns the logical offset.

```go
func (s *ZerobusArrowStream) WaitForOffset(offset int64) error
```

Blocks until the server acknowledges the batch at the given offset.

```go
func (s *ZerobusArrowStream) Flush() error
```

Waits for all pending batches to be acknowledged.

```go
func (s *ZerobusArrowStream) Close() error
```

Flushes and closes the stream.

```go
func (s *ZerobusArrowStream) GetUnackedBatches() ([][]byte, error)
```

Returns unacknowledged batches as Arrow IPC bytes. Call only after stream failure.

### `ArrowStreamConfigurationOptions` (Experimental)

```go
type ArrowStreamConfigurationOptions struct {
MaxInflightBatches uint64
Recovery bool
RecoveryTimeoutMs uint64
RecoveryBackoffMs uint64
RecoveryRetries uint32
IpcCompression IpcCompressionType // None, LZ4Frame, or Zstd
}
```

Use `DefaultArrowStreamConfigurationOptions()` to get sensible defaults.

**`ZerobusSdk` Arrow methods:**

```go
func (s *ZerobusSdk) CreateArrowStream(
tableName string,
schemaIpcBytes []byte,
clientID, clientSecret string,
options *ArrowStreamConfigurationOptions,
) (*ZerobusArrowStream, error)
```

```go
func (s *ZerobusSdk) CreateArrowStreamWithHeadersProvider(
tableName string,
schemaIpcBytes []byte,
headersProvider HeadersProvider,
options *ArrowStreamConfigurationOptions,
) (*ZerobusArrowStream, error)
```

## Building from Source

This section is for contributors and those who need to build the SDK from source. If you just want to use the SDK, see [Installation](#installation) instead.
Expand Down
Loading
Loading