Skip to content
This repository was archived by the owner on Mar 3, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 168 additions & 27 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ version = "0.11.0"
license = "Apache-2.0"

[workspace.dependencies]

logos = "0.12.1"
serde = { version = "1.0.159", features = ["derive", "rc"] }
ahash = "0.8.3"
anyhow = { version = "1.0.70", features = ["backtrace"] }
approx = "0.5.1"
Expand Down Expand Up @@ -63,10 +66,10 @@ index_vec = { version = "0.1.3", features = ["serde"] }
indoc = "1.0.9"
insta = { version = "1.29.0", features = ["ron", "yaml", "json"] }
inventory = "0.3.8"
kafka = "0.9.0"
itertools = "0.11.0"
lalrpop = "0.20.0"
lalrpop-util = "0.20.0"
logos = "0.12.1"
lz4 = "1.24.0"
lz4-sys = "1.9.4"
num = "0.4.0"
Expand Down Expand Up @@ -97,7 +100,7 @@ pulsar = { version = "5.1.0", default-features = false, features = [
rand = "0.8.5"
regex = "1.9.3"
reqwest = { version = "0.11.14", features = ["native-tls-vendored"] }
serde = { version = "1.0.159", features = ["derive", "rc"] }
schema_registry_converter = { version = "3.1.0", features = ["avro"] }
serde_json = "1.0.95"
serde_yaml = "0.9.19"
sha2 = "0.10.6"
Expand Down Expand Up @@ -149,7 +152,7 @@ features = ["lz4"]

[profile.release]
lto = "thin"
debug = 0 # Set this to 1 or 2 to get more useful backtraces from debugger
debug = 0 # Set this to 1 or 2 to get more useful backtraces from debugger
codegen-units = 1

# Enable max optimizations for dependencies, but not for our code
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ test/int/run-api-postgres-s3-docker:
cd tests/integration/api && ENV=local-docker DB_DIALECT="postgres" OBJECT_STORE_TYPE=s3 OBJECT_STORE_PATH=/data go run github.com/onsi/ginkgo/v2/ginkgo -vv ./...

test/int/run-api:
cd tests/integration/api && ENV=local-local go run github.com/onsi/ginkgo/v2/ginkgo -vv ./...
cd tests/integration/api && ENV=local-local go run github.com/onsi/ginkgo/v2/ginkgo -tags dynamic -vv ./...

test/int/run-api-s3:
cd tests/integration/api && ENV=local-local OBJECT_STORE_TYPE=s3 OBJECT_STORE_PATH=/data go run github.com/onsi/ginkgo/v2/ginkgo -vv ./...
Expand Down
24 changes: 23 additions & 1 deletion clients/python/src/kaskada/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import sys
from pathlib import Path
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, List, Optional, Union

import google.protobuf.wrappers_pb2 as wrappers
import grpc
Expand Down Expand Up @@ -39,6 +39,18 @@ def __init__(
self._tenant = tenant
self._namespace = namespace
self._topic_name = topic_name


class KafkaTableSource(TableSource):
    """A table source backed by a Kafka topic.

    The stored attributes are read when building the table's source
    configuration (see `create_table`).
    """

    def __init__(self, hosts: List[str], topic: str, avro_schema: str):
        """Initializes a Kafka table source.

        Args:
            hosts: Kafka bootstrap hosts to connect to.
            topic: Name of the Kafka topic to read from.
            avro_schema: Avro schema definition (a JSON string) for the
                topic's messages.
        """
        self._hosts = hosts
        self._topic = topic
        self._avro_schema = avro_schema


def get_table_name(
Expand Down Expand Up @@ -188,6 +200,16 @@ def create_table(
}
}
}
elif isinstance(source, KafkaTableSource):
table_args["source"] = {
"kafka": {
"config": {
"hosts": source._hosts,
"topic": source._topic,
"avro_schema": source._avro_schema
}
}
}
else:
raise ValueError("invalid table source provided")

Expand Down
4 changes: 2 additions & 2 deletions crates/sparrow-main/src/serve/file_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub(crate) async fn get_source_metadata(
pub(crate) async fn get_pulsar_metadata(
pc: &PulsarConfig,
) -> error_stack::Result<SourceMetadata, Error> {
let metadata = RawMetadata::try_from_pulsar_subscription(pc)
let metadata = RawMetadata::try_from_pulsar_config(pc)
.await
.attach_printable_lazy(|| format!("Pulsar Source: {:?}", pc))
.change_context(Error::Schema(format!(
Expand All @@ -123,7 +123,7 @@ pub(crate) async fn get_pulsar_metadata(
pub(crate) async fn get_kafka_metadata(
kc: &KafkaConfig,
) -> error_stack::Result<SourceMetadata, Error> {
let metadata = RawMetadata::try_from_kafka_subscription(kc)
let metadata = RawMetadata::try_from_kafka_config(kc)
.await
.attach_printable_lazy(|| format!("Kafka Source: {:?}", kc))
.change_context(Error::Schema(format!(
Expand Down
4 changes: 3 additions & 1 deletion crates/sparrow-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ half.workspace = true
hashbrown.workspace = true
inventory.workspace = true
itertools.workspace = true
kafka.workspace = true
lz4 = { workspace = true, optional = true }
num-traits.workspace = true
object_store.workspace = true
Expand All @@ -52,9 +53,10 @@ pin-project.workspace = true
prost-wkt-types.workspace = true
pulsar = { workspace = true, optional = true }
reqwest.workspace = true
schema_registry_converter.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
serde.workspace = true
sha2.workspace = true
smallvec.workspace = true
sparrow-api = { path = "../sparrow-api" }
Expand Down
1 change: 1 addition & 0 deletions crates/sparrow-runtime/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl Batch {
);

let key_triples = KeyTriples::try_from(&data)?;
tracing::debug!("created batch with: {:?}", key_triples);
Self::try_new_with_bounds(
data,
key_triples.first().context("First key triple")?,
Expand Down
4 changes: 4 additions & 0 deletions crates/sparrow-runtime/src/execute/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ impl OperationExecutor {
// We could attempt to determine whether we needed to execute sequentially...
// but for now it is easier to just have the single path.
'operation: while let Some(input) = recv.recv().await {
tracing::debug!("Input operation batch: {:?}", input);

#[cfg(debug_assertions)]
input
.validate_bounds()
Expand All @@ -287,6 +289,8 @@ impl OperationExecutor {
.into_report()
.change_context(Error::internal())?;

tracing::debug!("Output operation batch: {:?}", output);

// For each batch produced by the operation, write it to each channel.
// We currently do this synchronously in the order channels subscribed.
// We could attempt to use a select to send this to channels in the
Expand Down
19 changes: 18 additions & 1 deletion crates/sparrow-runtime/src/execute/operation/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::execute::operation::{InputBatch, Operation, OperationContext};
use crate::execute::progress_reporter::ProgressUpdate;
use crate::execute::{error, Error};
use crate::key_hash_index::KeyHashIndex;
use crate::read::stream_reader::kafka_stream_reader;
use crate::stream_reader::stream_reader;
use crate::table_reader::table_reader;
use crate::Batch;
Expand Down Expand Up @@ -67,6 +68,7 @@ impl Operation for ScanOperation {
.create_input(batch?)
.into_report()
.change_context(Error::PreprocessNextInput)?;
tracing::debug!("Executing next batch: {:?}", input);

// Send progress update
if self
Expand Down Expand Up @@ -279,7 +281,22 @@ impl ScanOperation {

input_stream
}
v1alpha::source::Source::Kafka(_) => todo!(),
v1alpha::source::Source::Kafka(k) => {
let input_stream = kafka_stream_reader(
context,
table_info,
requested_slice.as_ref(),
projected_columns,
// TODO: Fix flight recorder
FlightRecorder::disabled(),
k,
)
.await
.change_context(Error::internal_msg("failed to create stream reader"))?
.map_err(|e| e.change_context(Error::internal_msg("failed to read batch")))
.boxed();
input_stream
}
};

// Currently configures the stream for the following cases:
Expand Down
1 change: 1 addition & 0 deletions crates/sparrow-runtime/src/execute/operation/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl SelectOperation {
.filter(unfiltered_column.as_ref())
.context("filter for select operation")
})?;
tracing::debug!("Select Output: {:?}", output);
Ok(output)
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/sparrow-runtime/src/execute/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ pub(super) fn write(
};


tracing::debug!("Writing out batch: {:?}", batch);

if batch.num_rows() > max_batch_size {
for start in (0..batch.num_rows()).step_by(max_batch_size) {
let end = (start + max_batch_size).min(batch.num_rows());
Expand Down
77 changes: 72 additions & 5 deletions crates/sparrow-runtime/src/metadata/raw_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use arrow::array::ArrowPrimitiveType;
use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef, TimestampMillisecondType};
use error_stack::{IntoReport, IntoReportCompat, ResultExt};
use sparrow_arrow::avro::from_avro_schema;
use tempfile::NamedTempFile;

use sparrow_api::kaskada::v1alpha::source_data::{self, Source};
Expand Down Expand Up @@ -32,6 +33,10 @@ pub enum Error {
PulsarSchema(String),
#[display(fmt = "unsupport column detected: '{_0}")]
UnsupportedColumn(String),
#[display(fmt = "no kafka schema config")]
MissingKafkaSchemaConfig,
#[display(fmt = "unable to parse kafka avro schema: {_0}")]
KafkaSchema(String),
}

impl error_stack::Context for Error {}
Expand Down Expand Up @@ -59,6 +64,14 @@ pub struct PulsarMetadata {
pub sparrow_metadata: RawMetadata,
}

/// Metadata derived from the Avro schema configured on a Kafka source.
pub struct KafkaMetadata {
    /// The user-facing schema, converted from the configured Avro schema.
    pub user_schema: SchemaRef,

    /// The Sparrow `RawMetadata` derived from the converted schema.
    pub sparrow_metadata: RawMetadata,

    /// The schema as parsed by `avro_rs` from the same schema string.
    // NOTE(review): presumably retained for decoding messages from the
    // topic — confirm at usage sites.
    pub kafka_avro_schema: avro_rs::Schema,
}

impl RawMetadata {
pub async fn try_from(
source: &Source,
Expand All @@ -78,9 +91,7 @@ impl RawMetadata {
}
}

pub async fn try_from_pulsar_subscription(
ps: &PulsarConfig,
) -> error_stack::Result<Self, Error> {
pub async fn try_from_pulsar_config(ps: &PulsarConfig) -> error_stack::Result<Self, Error> {
// The `_publish_time` is metadata on the pulsar message, and required
// by the `prepare` step. However, that is not part of the user's schema.
// The prepare path calls `try_from_pulsar` directly, so for all other cases
Expand All @@ -94,8 +105,9 @@ impl RawMetadata {
.sparrow_metadata)
}

pub async fn try_from_kafka_subscription(_: &KafkaConfig) -> error_stack::Result<Self, Error> {
todo!()
/// TODO: Docs
pub async fn try_from_kafka_config(config: &KafkaConfig) -> error_stack::Result<Self, Error> {
Ok(Self::try_from_kafka(&config).await?.sparrow_metadata)
}

/// Create `RawMetadata` from a raw schema.
Expand Down Expand Up @@ -212,6 +224,36 @@ impl RawMetadata {

Self::from_raw_schema(Arc::new(raw_schema))
}

pub(crate) async fn try_from_kafka(
config: &KafkaConfig,
) -> error_stack::Result<KafkaMetadata, Error> {
let schema = config
.schema
.to_owned()
.ok_or(Error::MissingKafkaSchemaConfig)?;
match schema {
sparrow_api::kaskada::v1alpha::kafka_config::Schema::AvroSchema(avro_schema) => {
let parsed_schema: avro_schema::schema::Schema = serde_json::from_str(&avro_schema)
.into_report()
.change_context_lazy(|| Error::KafkaSchema(avro_schema.to_owned()))?;
// let parsed_schema = Arc::new(parsed_schema);
let converted_schema = Arc::new(
from_avro_schema(&parsed_schema)
.change_context(Error::KafkaSchema("".to_owned()))?,
);
let avro_schema = avro_rs::Schema::parse_str(&avro_schema)
.into_report()
.change_context_lazy(|| Error::KafkaSchema(avro_schema.to_owned()))?;
Ok(KafkaMetadata {
user_schema: converted_schema.clone(),
sparrow_metadata: Self::from_raw_schema(converted_schema.clone())?,
kafka_avro_schema: avro_schema,
})
}
sparrow_api::kaskada::v1alpha::kafka_config::Schema::SchemaRegistryUrl(_) => todo!(),
}
}
}

/// Converts the schema to a table schema
Expand Down Expand Up @@ -267,6 +309,7 @@ mod tests {
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use sparrow_api::kaskada::v1alpha::KafkaConfig;

use crate::RawMetadata;

Expand Down Expand Up @@ -401,4 +444,28 @@ mod tests {
}
}
}

#[tokio::test]
async fn test_raw_metadata_from_kafka_schema() {
let schema_def = "{\"type\": \"record\", \"name\": \"MyRecord\", \"fields\": [{\"name\": \"time\", \"type\":\"long\"}, {\"name\": \"id\", \"type\": \"long\"}, {\"name\": \"my_val\", \"type\": \"long\"}]}";
let expected = Arc::new(Schema::new(vec![
Field::new("time", DataType::Int64, false),
Field::new("id", DataType::Int64, false),
Field::new("my_val", DataType::Int64, false),
]));

let kafka_config = KafkaConfig {
hosts: vec![],
topic: "awkward-topic".to_string(),
schema: Some(
sparrow_api::kaskada::v1alpha::kafka_config::Schema::AvroSchema(
schema_def.to_owned(),
),
),
};
let metadata = RawMetadata::try_from_kafka_config(&kafka_config)
.await
.unwrap();
assert_eq!(metadata.raw_schema, expected);
}
}
2 changes: 2 additions & 0 deletions crates/sparrow-runtime/src/prepare/execute_input_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ pub async fn prepare_input<'a>(
Ok(async_stream::try_stream! {
let mut input_buffer = InputBuffer::new();
while let Some(unfiltered_batch) = reader.next().await {
tracing::debug!("Execute Input Stream: {:?}", unfiltered_batch);
let unfiltered_batch = unfiltered_batch.into_report().change_context(Error::PreparingColumn)?;
let unfiltered_rows = unfiltered_batch.num_rows();

Expand Down Expand Up @@ -291,6 +292,7 @@ pub async fn prepare_input<'a>(
None
}
};
tracing::debug!("Prepared record batch: {:?}", record_batch);
yield record_batch
}

Expand Down
Loading