Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
.bsp/
.bazelbsp/

# Tools
pyrefly.toml

# OS
.DS_Store
Thumbs.db
Expand Down
20 changes: 20 additions & 0 deletions python/NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,35 @@

### New Features and Improvements

- **Arrow Flight Support (Experimental)**: Added support for ingesting `pyarrow.RecordBatch` and `pyarrow.Table` objects via Arrow Flight protocol
- **Note**: Arrow Flight is not yet enabled by default on the Zerobus server side.
- New `ZerobusArrowStream` class (sync in `zerobus.sdk.sync`, async in `zerobus.sdk.aio`) with `ingest_batch()`, `wait_for_offset()`, `flush()`, `close()`, `get_unacked_batches()` methods
- New `ArrowStreamConfigurationOptions` for configuring Arrow streams (max inflight batches, recovery, timeouts)
- New `create_arrow_stream()` and `recreate_arrow_stream()` methods on both sync and async `ZerobusSdk`
- Accepts both `pyarrow.RecordBatch` and `pyarrow.Table` (Tables are combined to a single batch internally)
- Arrow is opt-in: install via `pip install databricks-zerobus-ingest-sdk[arrow]` (requires `pyarrow>=14.0.0`)
- Arrow types gated behind `_core.arrow` submodule — not loaded unless pyarrow is installed
- Available from both `zerobus.sdk.sync` and `zerobus.sdk.aio`, and re-exported from top-level `zerobus` package

### Bug Fixes

### Documentation

### Internal Changes

- Bumped Rust SDK dependency to v1.0.1 with `arrow-flight` feature
- Added `arrow-ipc`, `arrow-schema`, `arrow-array` (v56.2.0) Rust dependencies for IPC serialization
- Added PyO3 arrow module (`arrow.rs`) with `ArrowStreamConfigurationOptions`, `ZerobusArrowStream`, `AsyncZerobusArrowStream` pyclasses
- Added Python-side serialization helpers in `zerobus.sdk.shared.arrow` (`_serialize_schema`, `_serialize_batch`, `_deserialize_batch`)

### Breaking Changes

### Deprecations

### API Changes

- Added `create_arrow_stream(table_name, schema, client_id, client_secret, options=None, headers_provider=None)` to sync and async `ZerobusSdk`
- Added `recreate_arrow_stream(old_stream)` to sync and async `ZerobusSdk`
- Added `ZerobusArrowStream` class (sync and async variants) with methods: `ingest_batch()`, `wait_for_offset()`, `flush()`, `close()`, `get_unacked_batches()`, properties: `is_closed`, `table_name`
- Added `ArrowStreamConfigurationOptions` class with fields: `max_inflight_batches`, `recovery`, `recovery_timeout_ms`, `recovery_backoff_ms`, `recovery_retries`, `server_lack_of_ack_timeout_ms`, `flush_timeout_ms`, `connection_timeout_ms`
- Added optional dependency: `pyarrow>=14.0.0` via `pip install databricks-zerobus-ingest-sdk[arrow]`
200 changes: 200 additions & 0 deletions python/examples/async_example_arrow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
"""
Asynchronous Ingestion Example - Arrow Flight Mode

This example demonstrates record ingestion using the asynchronous API with Arrow Flight.

Record Type Mode: Arrow (RecordBatch)
- Records are sent as pyarrow RecordBatches
- Uses Arrow Flight protocol for columnar data transfer
- Best for structured/columnar data, DataFrames, Parquet workflows

Requirements:
pip install databricks-zerobus-ingest-sdk[arrow]

Note: Arrow Flight support is experimental and not yet supported for production use.
"""

import asyncio
import logging
import os
import time

import pyarrow as pa

from zerobus.sdk.aio import ZerobusSdk
from zerobus.sdk.shared.arrow import ArrowStreamConfigurationOptions

# Configure logging
# Root logger at INFO so SDK progress messages are visible when running the example.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


# Configuration - update these with your values
# Each value falls back to a placeholder; main() refuses to run while the
# placeholders are still in place.
SERVER_ENDPOINT = os.getenv("ZEROBUS_SERVER_ENDPOINT", "https://your-shard-id.zerobus.region.cloud.databricks.com")
UNITY_CATALOG_ENDPOINT = os.getenv("DATABRICKS_WORKSPACE_URL", "https://your-workspace.cloud.databricks.com")
TABLE_NAME = os.getenv("ZEROBUS_TABLE_NAME", "catalog.schema.table")

# For OAuth authentication
CLIENT_ID = os.getenv("DATABRICKS_CLIENT_ID", "your-oauth-client-id")
CLIENT_SECRET = os.getenv("DATABRICKS_CLIENT_SECRET", "your-oauth-client-secret")

# Number of batches to ingest
NUM_BATCHES = 10
ROWS_PER_BATCH = 100

# Define the Arrow schema
# Every RecordBatch/Table ingested on the stream must match this schema.
SCHEMA = pa.schema(
    [
        ("device_name", pa.large_utf8()),
        ("temp", pa.int32()),
        ("humidity", pa.int64()),
    ]
)


def create_sample_batch(batch_index):
    """Build one synthetic sensor-reading batch.

    The global row index continues across batches so values vary between
    calls; the result is a pyarrow.RecordBatch of ROWS_PER_BATCH rows that
    conforms to SCHEMA.
    """
    base = batch_index * ROWS_PER_BATCH
    row_ids = [base + offset for offset in range(ROWS_PER_BATCH)]
    columns = {
        "device_name": [f"sensor-{row_id % 10}" for row_id in row_ids],
        "temp": [20 + (row_id % 15) for row_id in row_ids],
        "humidity": [50 + (row_id % 40) for row_id in row_ids],
    }
    return pa.record_batch(columns, schema=SCHEMA)


async def main():
    """Run the asynchronous Arrow Flight ingestion example end to end.

    Steps: validate configuration, create the SDK and an Arrow stream,
    ingest NUM_BATCHES RecordBatches plus one pyarrow.Table, wait for the
    last offset to be acknowledged, then flush and close the stream.

    Raises:
        Exception: re-raises any SDK error after logging it (and, on
        ingestion failure, after reporting unacked batches for retry).
    """
    print("Starting asynchronous ingestion example (Arrow Flight Mode)...")
    print("=" * 60)

    # Check if credentials are configured — refuse to run with placeholders.
    if CLIENT_ID == "your-oauth-client-id" or CLIENT_SECRET == "your-oauth-client-secret":
        logger.error("Please set DATABRICKS_CLIENT_ID and DATABRICKS_CLIENT_SECRET environment variables")
        return

    if SERVER_ENDPOINT == "https://your-shard-id.zerobus.region.cloud.databricks.com":
        logger.error("Please set ZEROBUS_SERVER_ENDPOINT environment variable")
        return

    if TABLE_NAME == "catalog.schema.table":
        logger.error("Please set ZEROBUS_TABLE_NAME environment variable")
        return

    try:
        # Step 1: Initialize the SDK
        sdk = ZerobusSdk(SERVER_ENDPOINT, UNITY_CATALOG_ENDPOINT)
        logger.info("SDK initialized")

        # Step 2: Configure arrow stream options (all optional, shown with defaults)
        options = ArrowStreamConfigurationOptions(
            max_inflight_batches=10,
            recovery=True,
            recovery_timeout_ms=15000,
            recovery_backoff_ms=2000,
            recovery_retries=3,
        )
        logger.info("Arrow stream configuration created")

        # Step 3: Create an Arrow Flight stream
        #
        # Pass a pyarrow.Schema - the SDK handles serialization internally.
        # The SDK automatically:
        # - Includes authorization header with OAuth token
        # - Includes x-databricks-zerobus-table-name header
        stream = await sdk.create_arrow_stream(TABLE_NAME, SCHEMA, CLIENT_ID, CLIENT_SECRET, options)
        logger.info(f"Arrow stream created for table: {stream.table_name}")

        # Step 4: Ingest Arrow RecordBatches asynchronously
        logger.info(f"\nIngesting {NUM_BATCHES} batches of {ROWS_PER_BATCH} rows each...")
        start_time = time.time()
        total_rows = 0

        try:
            # ========================================================================
            # Ingest RecordBatches - each call returns an offset
            # ========================================================================
            offsets = []
            for i in range(NUM_BATCHES):
                batch = create_sample_batch(i)
                offset = await stream.ingest_batch(batch)
                offsets.append(offset)
                total_rows += batch.num_rows
                logger.info(f"  Batch {i + 1}: {batch.num_rows} rows, offset: {offset}")

            # ========================================================================
            # You can also ingest a pyarrow.Table directly
            # The SDK converts it to a single RecordBatch internally
            # ========================================================================
            table = pa.table(
                {
                    "device_name": [f"sensor-table-{i}" for i in range(50)],
                    "temp": list(range(20, 70)),
                    "humidity": list(range(50, 100)),
                },
                schema=SCHEMA,
            )
            offset = await stream.ingest_batch(table)
            offsets.append(offset)
            total_rows += table.num_rows
            logger.info(f"  Table ingested: {table.num_rows} rows, offset: {offset}")

            submit_end_time = time.time()
            submit_duration = submit_end_time - start_time
            logger.info(f"\nAll batches submitted in {submit_duration:.2f} seconds")

            # ========================================================================
            # Wait for the last offset to be acknowledged
            # ========================================================================
            logger.info(f"Waiting for offset {offsets[-1]} to be acknowledged...")
            await stream.wait_for_offset(offsets[-1])
            logger.info(f"  Offset {offsets[-1]} acknowledged")

            # Step 5: Flush and close the stream
            logger.info("\nFlushing stream...")
            await stream.flush()
            logger.info("Stream flushed")

            end_time = time.time()
            total_duration = end_time - start_time
            # Guard against a zero-length interval on very fast runs / coarse clocks.
            rows_per_second = total_rows / total_duration if total_duration > 0 else 0.0

            await stream.close()
            logger.info("Stream closed")

            # Print summary
            print("\n" + "=" * 60)
            print("Ingestion Summary:")
            print(f"  Total batches: {NUM_BATCHES + 1}")
            print(f"  Total rows: {total_rows}")
            print(f"  Submit time: {submit_duration:.2f} seconds")
            print(f"  Total time: {total_duration:.2f} seconds")
            print(f"  Throughput: {rows_per_second:.2f} rows/sec")
            # Plain string: no placeholders, so no f-prefix needed.
            print("  Record type: Arrow Flight (RecordBatch)")
            print("=" * 60)

        except Exception as e:
            logger.error(f"\nError during ingestion: {e}")

            # On failure, you can retrieve unacked batches for retry
            # NOTE(review): unacked batches are read when the stream reports
            # closed — presumably the SDK closes it on error; confirm.
            if stream.is_closed:
                unacked = await stream.get_unacked_batches()
                if unacked:
                    logger.info(f"  {len(unacked)} unacked batches available for retry")
                    for i, batch in enumerate(unacked):
                        logger.info(f"    Batch {i}: {batch.num_rows} rows, schema: {batch.schema}")

            await stream.close()
            raise

    except Exception as e:
        # This handler also receives ingestion errors re-raised above, so the
        # message must not claim the failure happened during initialization.
        logger.error(f"\nExample failed: {e}")
        raise


if __name__ == "__main__":
    # Script entry point: run the async example on a fresh event loop.
    asyncio.run(main())
Loading
Loading