diff --git a/Cargo.lock b/Cargo.lock index 8dcb2220f..ce6cfcb05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,9 +4,9 @@ version = 3 [[package]] name = "addr2line" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4fa78e18c64fce05e902adecd7a5eed15a5e0a3439f7b0e169f0252214865e3" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" dependencies = [ "gimli", ] @@ -156,6 +156,31 @@ dependencies = [ "backtrace", ] +[[package]] +name = "apache-avro" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cf4144857f9e4d7dd6cc4ba4c78efd2a46bad682b029bd0d91e76a021af1b2a" +dependencies = [ + "byteorder", + "digest 0.10.7", + "lazy_static", + "libflate", + "log", + "num-bigint 0.4.4", + "quad-rand", + "rand 0.8.5", + "regex", + "serde", + "serde_json", + "strum 0.24.1", + "strum_macros 0.24.3", + "thiserror", + "typed-builder 0.10.0", + "uuid 1.4.1", + "zerocopy 0.6.3", +] + [[package]] name = "approx" version = "0.3.2" @@ -644,9 +669,9 @@ dependencies = [ "strum 0.18.0", "strum_macros 0.18.0", "thiserror", - "typed-builder", + "typed-builder 0.5.1", "uuid 0.8.2", - "zerocopy", + "zerocopy 0.3.0", ] [[package]] @@ -707,9 +732,9 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.68" +version = "0.3.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4319208da049c43661739c5fade2ba182f09d1dc2299b32298d3a31692b17e12" +checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" dependencies = [ "addr2line", "cc", @@ -746,7 +771,7 @@ checksum = "454bca3db10617b88b566f205ed190aedb0e0e6dd4cad61d3988a72e8c5594cb" dependencies = [ "autocfg", "libm", - "num-bigint 0.4.3", + "num-bigint 0.4.4", "num-integer", "num-traits", "serde", @@ -1042,9 +1067,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.3.23" +version = "4.3.24" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "03aef18ddf7d879c15ce20f04826ef8418101c7e528014c3eeea13321047dca3" +checksum = "fb690e81c7840c0d7aade59f242ea3b41b9bc27bcd5997890e7702ae4b32e487" dependencies = [ "clap_builder", "clap_derive", @@ -1053,9 +1078,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.3.23" +version = "4.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8ce6fffb678c9b80a70b6b6de0aad31df727623a70fd9a842c30cd573e2fa98" +checksum = "5ed2e96bc16d8d740f6f48d663eddf4b8a0983e79210fd55479b7bcd0a69860e" dependencies = [ "anstream", "anstyle", @@ -1218,15 +1243,30 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23" +dependencies = [ + "crc-catalog 1.1.1", +] + [[package]] name = "crc" version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" dependencies = [ - "crc-catalog", + "crc-catalog 2.2.0", ] +[[package]] +name = "crc-catalog" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" + [[package]] name = "crc-catalog" version = "2.2.0" @@ -1501,9 +1541,9 @@ checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" [[package]] name = "encoding_rs" -version = "0.8.32" +version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "071a31f4ee85403370b58aca746f01041ede6f0da2730960ad001edc2b71b394" +checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" dependencies = [ "cfg-if", ] @@ -1869,9 +1909,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.27.3" +version = "0.28.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" +checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" [[package]] name = "glob" @@ -2325,6 +2365,25 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kafka" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b11c86b0c0c9a9d89b136b2938a5b46a35c40f66eced2f09c76458b17dadfc2a" +dependencies = [ + "byteorder", + "crc 2.1.0", + "flate2", + "fnv", + "openssl", + "openssl-sys", + "ref_slice", + "snap", + "thiserror", + "tracing", + "twox-hash", +] + [[package]] name = "kv-log-macro" version = "1.0.7" @@ -2712,7 +2771,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05180d69e3da0e530ba2a1dae5110317e49e3b7f3d41be227dc5f92e49ee7af" dependencies = [ - "num-bigint 0.4.3", + "num-bigint 0.4.4", "num-complex", "num-integer", "num-iter", @@ -2733,9 +2792,9 @@ dependencies = [ [[package]] name = "num-bigint" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" +checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" dependencies = [ "autocfg", "num-integer", @@ -2779,7 +2838,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" dependencies = [ "autocfg", - "num-bigint 0.4.3", + "num-bigint 0.4.4", "num-integer", "num-traits", ] @@ -2806,9 +2865,9 @@ dependencies = [ [[package]] name = "object" -version = "0.31.1" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bda667d9f2b5051b8833f59f3bf748b28ef54f850f4fcb389a252aa383866d1" +checksum = "77ac5bbd07aea88c60a577a1ce218075ffd59208b2d7ca97adf9bfc5aeb21ebe" dependencies = [ 
"memchr", ] @@ -3089,7 +3148,7 @@ dependencies = [ "hashbrown 0.14.0", "lz4", "num", - "num-bigint 0.4.3", + "num-bigint 0.4.4", "paste", "seq-macro", "snap", @@ -3547,7 +3606,7 @@ dependencies = [ "bit-vec", "bytes", "chrono", - "crc", + "crc 3.0.1", "futures", "futures-io", "futures-timer", @@ -3568,6 +3627,12 @@ dependencies = [ "uuid 1.4.1", ] +[[package]] +name = "quad-rand" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" + [[package]] name = "quick-error" version = "1.2.3" @@ -3708,6 +3773,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "ref_slice" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4ed1d73fb92eba9b841ba2aef69533a060ccc0d3ec71c90aeda5996d4afb7a9" + [[package]] name = "regex" version = "1.9.3" @@ -3959,6 +4030,21 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "schema_registry_converter" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ea1c251bea837ece3126074d9e01c5868e5ae6135946404ceb20b5b2e264f7c" +dependencies = [ + "apache-avro", + "byteorder", + "dashmap", + "futures", + "reqwest", + "serde", + "serde_json", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -4148,9 +4234,9 @@ dependencies = [ [[package]] name = "siphasher" -version = "0.3.10" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" +checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" [[package]] name = "slab" @@ -4710,6 +4796,7 @@ dependencies = [ "insta", "inventory", "itertools 0.11.0", + "kafka", "lz4", "num-traits", "object_store", @@ -4722,6 +4809,7 @@ dependencies = [ "pulsar", "rand 0.8.5", "reqwest", + "schema_registry_converter", "serde", "serde_json", "serde_yaml", @@ -4971,6 +5059,12 @@ 
version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57bd81eb48f4c437cadc685403cad539345bf703d78e63707418431cecd4522b" +[[package]] +name = "strum" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" + [[package]] name = "strum" version = "0.25.0" @@ -4989,6 +5083,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "strum_macros" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "rustversion", + "syn 1.0.109", +] + [[package]] name = "strum_macros" version = "0.25.2" @@ -5590,6 +5697,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", + "rand 0.8.5", "static_assertions", ] @@ -5604,6 +5712,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "typed-builder" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89851716b67b937e393b3daa8423e67ddfc4bbbf1654bcf05488e95e0828db0c" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "typenum" version = "1.16.0" @@ -5798,6 +5917,7 @@ checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" dependencies = [ "getrandom 0.2.10", "rand 0.8.5", + "serde", ] [[package]] @@ -6208,7 +6328,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6580539ad917b7c026220c4b3f2c08d52ce54d6ce0dc491e66002e35388fab46" dependencies = [ "byteorder", - "zerocopy-derive", + "zerocopy-derive 0.2.0", +] + +[[package]] +name = "zerocopy" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"f3b9c234616391070b0b173963ebc65a9195068e7ed3731c6edac2ec45ebe106" +dependencies = [ + "byteorder", + "zerocopy-derive 0.6.3", ] [[package]] @@ -6222,6 +6352,17 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zerocopy-derive" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f7f3a471f98d0a61c34322fbbfd10c384b07687f680d4119813713f72308d91" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + [[package]] name = "zstd" version = "0.12.4" diff --git a/Cargo.toml b/Cargo.toml index 1a6abd684..3e608325e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,9 @@ version = "0.11.0" license = "Apache-2.0" [workspace.dependencies] + +logos = "0.12.1" +serde = { version = "1.0.159", features = ["derive", "rc"] } ahash = "0.8.3" anyhow = { version = "1.0.70", features = ["backtrace"] } approx = "0.5.1" @@ -63,10 +66,10 @@ index_vec = { version = "0.1.3", features = ["serde"] } indoc = "1.0.9" insta = { version = "1.29.0", features = ["ron", "yaml", "json"] } inventory = "0.3.8" +kafka = "0.9.0" itertools = "0.11.0" lalrpop = "0.20.0" lalrpop-util = "0.20.0" -logos = "0.12.1" lz4 = "1.24.0" lz4-sys = "1.9.4" num = "0.4.0" @@ -97,7 +100,7 @@ pulsar = { version = "5.1.0", default-features = false, features = [ rand = "0.8.5" regex = "1.9.3" reqwest = { version = "0.11.14", features = ["native-tls-vendored"] } -serde = { version = "1.0.159", features = ["derive", "rc"] } +schema_registry_converter = { version = "3.1.0", features = ["avro"] } serde_json = "1.0.95" serde_yaml = "0.9.19" sha2 = "0.10.6" @@ -149,7 +152,7 @@ features = ["lz4"] [profile.release] lto = "thin" -debug = 0 # Set this to 1 or 2 to get more useful backtraces from debugger +debug = 0 # Set this to 1 or 2 to get more useful backtraces from debugger codegen-units = 1 # Enable max optimizations for dependencies, but not for our code diff --git a/Makefile b/Makefile index 4fb2a7d5e..00fabf6df 100644 --- a/Makefile +++ b/Makefile @@ -86,7 
+86,7 @@ test/int/run-api-postgres-s3-docker: cd tests/integration/api && ENV=local-docker DB_DIALECT="postgres" OBJECT_STORE_TYPE=s3 OBJECT_STORE_PATH=/data go run github.com/onsi/ginkgo/v2/ginkgo -vv ./... test/int/run-api: - cd tests/integration/api && ENV=local-local go run github.com/onsi/ginkgo/v2/ginkgo -vv ./... + cd tests/integration/api && ENV=local-local go run github.com/onsi/ginkgo/v2/ginkgo -tags dynamic -vv ./... test/int/run-api-s3: cd tests/integration/api && ENV=local-local OBJECT_STORE_TYPE=s3 OBJECT_STORE_PATH=/data go run github.com/onsi/ginkgo/v2/ginkgo -vv ./... diff --git a/clients/python/src/kaskada/table.py b/clients/python/src/kaskada/table.py index 0f2ac96d7..1227bb4e8 100644 --- a/clients/python/src/kaskada/table.py +++ b/clients/python/src/kaskada/table.py @@ -2,7 +2,7 @@ import logging import sys from pathlib import Path -from typing import Any, Dict, Optional, Union +from typing import Any, Dict, List, Optional, Union import google.protobuf.wrappers_pb2 as wrappers import grpc @@ -39,6 +39,18 @@ def __init__( self._tenant = tenant self._namespace = namespace self._topic_name = topic_name + + +class KafkaTableSource(TableSource): + def __init__( + self, + hosts: List[str], + topic: str, + avro_schema: str + ): + self._hosts = hosts + self._topic = topic + self._avro_schema = avro_schema def get_table_name( @@ -188,6 +200,16 @@ def create_table( } } } + elif isinstance(source, KafkaTableSource): + table_args["source"] = { + "kafka": { + "config": { + "hosts": source._hosts, + "topic": source._topic, + "avro_schema": source._avro_schema + } + } + } else: raise ValueError("invalid table source provided") diff --git a/crates/sparrow-main/src/serve/file_service.rs b/crates/sparrow-main/src/serve/file_service.rs index dc506b317..f3b6e155d 100644 --- a/crates/sparrow-main/src/serve/file_service.rs +++ b/crates/sparrow-main/src/serve/file_service.rs @@ -110,7 +110,7 @@ pub(crate) async fn get_source_metadata( pub(crate) async fn 
get_pulsar_metadata( pc: &PulsarConfig, ) -> error_stack::Result { - let metadata = RawMetadata::try_from_pulsar_subscription(pc) + let metadata = RawMetadata::try_from_pulsar_config(pc) .await .attach_printable_lazy(|| format!("Pulsar Source: {:?}", pc)) .change_context(Error::Schema(format!( @@ -123,7 +123,7 @@ pub(crate) async fn get_pulsar_metadata( pub(crate) async fn get_kafka_metadata( kc: &KafkaConfig, ) -> error_stack::Result { - let metadata = RawMetadata::try_from_kafka_subscription(kc) + let metadata = RawMetadata::try_from_kafka_config(kc) .await .attach_printable_lazy(|| format!("Kafka Source: {:?}", kc)) .change_context(Error::Schema(format!( diff --git a/crates/sparrow-runtime/Cargo.toml b/crates/sparrow-runtime/Cargo.toml index a5d6518c8..d54a2c775 100644 --- a/crates/sparrow-runtime/Cargo.toml +++ b/crates/sparrow-runtime/Cargo.toml @@ -43,6 +43,7 @@ half.workspace = true hashbrown.workspace = true inventory.workspace = true itertools.workspace = true +kafka.workspace = true lz4 = { workspace = true, optional = true } num-traits.workspace = true object_store.workspace = true @@ -52,9 +53,10 @@ pin-project.workspace = true prost-wkt-types.workspace = true pulsar = { workspace = true, optional = true } reqwest.workspace = true +schema_registry_converter.workspace = true +serde.workspace = true serde_json.workspace = true serde_yaml.workspace = true -serde.workspace = true sha2.workspace = true smallvec.workspace = true sparrow-api = { path = "../sparrow-api" } diff --git a/crates/sparrow-runtime/src/batch.rs b/crates/sparrow-runtime/src/batch.rs index 32e180cca..ee7009607 100644 --- a/crates/sparrow-runtime/src/batch.rs +++ b/crates/sparrow-runtime/src/batch.rs @@ -51,6 +51,7 @@ impl Batch { ); let key_triples = KeyTriples::try_from(&data)?; + tracing::debug!("created batch with: {:?}", key_triples); Self::try_new_with_bounds( data, key_triples.first().context("First key triple")?, diff --git a/crates/sparrow-runtime/src/execute/operation.rs 
b/crates/sparrow-runtime/src/execute/operation.rs index a2a558fa1..7cca195b2 100644 --- a/crates/sparrow-runtime/src/execute/operation.rs +++ b/crates/sparrow-runtime/src/execute/operation.rs @@ -264,6 +264,8 @@ impl OperationExecutor { // We could attempt to determine whether we needed to execute sequentially... // but for now it is easier to just have the single path. 'operation: while let Some(input) = recv.recv().await { + tracing::debug!("Input operation batch: {:?}", input); + #[cfg(debug_assertions)] input .validate_bounds() @@ -287,6 +289,8 @@ impl OperationExecutor { .into_report() .change_context(Error::internal())?; + tracing::debug!("Output operation batch: {:?}", output); + // For each batch produced by the operation, write it to each channel. // We currently do this synchronously in the order channels subscribed. // We could attempt to use a select to send this to channels in the diff --git a/crates/sparrow-runtime/src/execute/operation/scan.rs b/crates/sparrow-runtime/src/execute/operation/scan.rs index 1bb1c3eb4..884abb863 100644 --- a/crates/sparrow-runtime/src/execute/operation/scan.rs +++ b/crates/sparrow-runtime/src/execute/operation/scan.rs @@ -19,6 +19,7 @@ use crate::execute::operation::{InputBatch, Operation, OperationContext}; use crate::execute::progress_reporter::ProgressUpdate; use crate::execute::{error, Error}; use crate::key_hash_index::KeyHashIndex; +use crate::read::stream_reader::kafka_stream_reader; use crate::stream_reader::stream_reader; use crate::table_reader::table_reader; use crate::Batch; @@ -67,6 +68,7 @@ impl Operation for ScanOperation { .create_input(batch?) 
.into_report() .change_context(Error::PreprocessNextInput)?; + tracing::debug!("Executing next batch: {:?}", input); // Send progress update if self @@ -279,7 +281,22 @@ impl ScanOperation { input_stream } - v1alpha::source::Source::Kafka(_) => todo!(), + v1alpha::source::Source::Kafka(k) => { + let input_stream = kafka_stream_reader( + context, + table_info, + requested_slice.as_ref(), + projected_columns, + // TODO: Fix flight recorder + FlightRecorder::disabled(), + k, + ) + .await + .change_context(Error::internal_msg("failed to create stream reader"))? + .map_err(|e| e.change_context(Error::internal_msg("failed to read batch"))) + .boxed(); + input_stream + } }; // Currently configures the stream for the following cases: diff --git a/crates/sparrow-runtime/src/execute/operation/select.rs b/crates/sparrow-runtime/src/execute/operation/select.rs index f31cd886e..4097dabda 100644 --- a/crates/sparrow-runtime/src/execute/operation/select.rs +++ b/crates/sparrow-runtime/src/execute/operation/select.rs @@ -99,6 +99,7 @@ impl SelectOperation { .filter(unfiltered_column.as_ref()) .context("filter for select operation") })?; + tracing::debug!("Select Output: {:?}", output); Ok(output) } } diff --git a/crates/sparrow-runtime/src/execute/output.rs b/crates/sparrow-runtime/src/execute/output.rs index aac939595..f7fd693db 100644 --- a/crates/sparrow-runtime/src/execute/output.rs +++ b/crates/sparrow-runtime/src/execute/output.rs @@ -110,6 +110,8 @@ pub(super) fn write( }; + tracing::debug!("Writing out batch: {:?}", batch); + if batch.num_rows() > max_batch_size { for start in (0..batch.num_rows()).step_by(max_batch_size) { let end = (start + max_batch_size).min(batch.num_rows()); diff --git a/crates/sparrow-runtime/src/metadata/raw_metadata.rs b/crates/sparrow-runtime/src/metadata/raw_metadata.rs index 636efa871..5bce7e246 100644 --- a/crates/sparrow-runtime/src/metadata/raw_metadata.rs +++ b/crates/sparrow-runtime/src/metadata/raw_metadata.rs @@ -5,6 +5,7 @@ use 
std::sync::Arc; use arrow::array::ArrowPrimitiveType; use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef, TimestampMillisecondType}; use error_stack::{IntoReport, IntoReportCompat, ResultExt}; +use sparrow_arrow::avro::from_avro_schema; use tempfile::NamedTempFile; use sparrow_api::kaskada::v1alpha::source_data::{self, Source}; @@ -32,6 +33,10 @@ pub enum Error { PulsarSchema(String), #[display(fmt = "unsupport column detected: '{_0}")] UnsupportedColumn(String), + #[display(fmt = "no kafka schema config")] + MissingKafkaSchemaConfig, + #[display(fmt = "unable to parse kafka avro schema: {_0}")] + KafkaSchema(String), } impl error_stack::Context for Error {} @@ -59,6 +64,14 @@ pub struct PulsarMetadata { pub sparrow_metadata: RawMetadata, } +pub struct KafkaMetadata { + pub user_schema: SchemaRef, + + pub sparrow_metadata: RawMetadata, + + pub kafka_avro_schema: avro_rs::Schema, +} + impl RawMetadata { pub async fn try_from( source: &Source, @@ -78,9 +91,7 @@ impl RawMetadata { } } - pub async fn try_from_pulsar_subscription( - ps: &PulsarConfig, - ) -> error_stack::Result { + pub async fn try_from_pulsar_config(ps: &PulsarConfig) -> error_stack::Result { // The `_publish_time` is metadata on the pulsar message, and required // by the `prepare` step. However, that is not part of the user's schema. // The prepare path calls `try_from_pulsar` directly, so for all other cases @@ -94,8 +105,9 @@ impl RawMetadata { .sparrow_metadata) } - pub async fn try_from_kafka_subscription(_: &KafkaConfig) -> error_stack::Result { - todo!() + /// TODO: Docs + pub async fn try_from_kafka_config(config: &KafkaConfig) -> error_stack::Result { + Ok(Self::try_from_kafka(&config).await?.sparrow_metadata) } /// Create `RawMetadata` from a raw schema. 
@@ -212,6 +224,36 @@ impl RawMetadata { Self::from_raw_schema(Arc::new(raw_schema)) } + + pub(crate) async fn try_from_kafka( + config: &KafkaConfig, + ) -> error_stack::Result { + let schema = config + .schema + .to_owned() + .ok_or(Error::MissingKafkaSchemaConfig)?; + match schema { + sparrow_api::kaskada::v1alpha::kafka_config::Schema::AvroSchema(avro_schema) => { + let parsed_schema: avro_schema::schema::Schema = serde_json::from_str(&avro_schema) + .into_report() + .change_context_lazy(|| Error::KafkaSchema(avro_schema.to_owned()))?; + // let parsed_schema = Arc::new(parsed_schema); + let converted_schema = Arc::new( + from_avro_schema(&parsed_schema) + .change_context(Error::KafkaSchema("".to_owned()))?, + ); + let avro_schema = avro_rs::Schema::parse_str(&avro_schema) + .into_report() + .change_context_lazy(|| Error::KafkaSchema(avro_schema.to_owned()))?; + Ok(KafkaMetadata { + user_schema: converted_schema.clone(), + sparrow_metadata: Self::from_raw_schema(converted_schema.clone())?, + kafka_avro_schema: avro_schema, + }) + } + sparrow_api::kaskada::v1alpha::kafka_config::Schema::SchemaRegistryUrl(_) => todo!(), + } + } } /// Converts the schema to a table schema @@ -267,6 +309,7 @@ mod tests { use std::sync::Arc; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; + use sparrow_api::kaskada::v1alpha::KafkaConfig; use crate::RawMetadata; @@ -401,4 +444,28 @@ mod tests { } } } + + #[tokio::test] + async fn test_raw_metadata_from_kafka_schema() { + let schema_def = "{\"type\": \"record\", \"name\": \"MyRecord\", \"fields\": [{\"name\": \"time\", \"type\":\"long\"}, {\"name\": \"id\", \"type\": \"long\"}, {\"name\": \"my_val\", \"type\": \"long\"}]}"; + let expected = Arc::new(Schema::new(vec![ + Field::new("time", DataType::Int64, false), + Field::new("id", DataType::Int64, false), + Field::new("my_val", DataType::Int64, false), + ])); + + let kafka_config = KafkaConfig { + hosts: vec![], + topic: "awkward-topic".to_string(), + schema: Some( + 
sparrow_api::kaskada::v1alpha::kafka_config::Schema::AvroSchema( + schema_def.to_owned(), + ), + ), + }; + let metadata = RawMetadata::try_from_kafka_config(&kafka_config) + .await + .unwrap(); + assert_eq!(metadata.raw_schema, expected); + } } diff --git a/crates/sparrow-runtime/src/prepare/execute_input_stream.rs b/crates/sparrow-runtime/src/prepare/execute_input_stream.rs index 8932980fa..fc6cb3ffb 100644 --- a/crates/sparrow-runtime/src/prepare/execute_input_stream.rs +++ b/crates/sparrow-runtime/src/prepare/execute_input_stream.rs @@ -116,6 +116,7 @@ pub async fn prepare_input<'a>( Ok(async_stream::try_stream! { let mut input_buffer = InputBuffer::new(); while let Some(unfiltered_batch) = reader.next().await { + tracing::debug!("Execute Input Stream: {:?}", unfiltered_batch); let unfiltered_batch = unfiltered_batch.into_report().change_context(Error::PreparingColumn)?; let unfiltered_rows = unfiltered_batch.num_rows(); @@ -291,6 +292,7 @@ pub async fn prepare_input<'a>( None } }; + tracing::debug!("Prepared record batch: {:?}", record_batch); yield record_batch } diff --git a/crates/sparrow-runtime/src/read/stream_reader.rs b/crates/sparrow-runtime/src/read/stream_reader.rs index 13a493783..12e1e0580 100644 --- a/crates/sparrow-runtime/src/read/stream_reader.rs +++ b/crates/sparrow-runtime/src/read/stream_reader.rs @@ -6,7 +6,9 @@ use error_stack::{IntoReportCompat, ResultExt}; use futures::{Stream, StreamExt}; use hashbrown::HashSet; use sparrow_api::kaskada::v1alpha::slice_plan::Slice; -use sparrow_api::kaskada::v1alpha::{PulsarSource, PulsarSubscription}; +use sparrow_api::kaskada::v1alpha::{ + KafkaSource, KafkaSubscription, PulsarSource, PulsarSubscription, +}; use sparrow_compiler::TableInfo; use sparrow_qfr::{ activity, gauge, Activity, FlightRecorder, Gauge, PushRegistration, Registration, Registrations, @@ -44,7 +46,7 @@ inventory::submit!(®ISTRATION); /// This is hard-coded for now, but could easily be made configurable as a parameter /// to the 
table. This simple hueristic is a good start, but we can improve on this /// by statistically modeling event behavior and adapting the watermark accordingly. -const BOUNDED_LATENESS_NS: i64 = 1_000_000_000; +const BOUNDED_LATENESS_NS: i64 = 500_000_000; /// Create a stream that continually reads messages from a stream. pub(crate) async fn stream_reader( @@ -135,6 +137,93 @@ pub(crate) async fn stream_reader( }) } +/// Create a stream that continually reads messages from a stream. +pub(crate) async fn kafka_stream_reader( + context: &OperationContext, + table_info: &TableInfo, + requested_slice: Option<&Slice>, + projected_columns: Option>, + _flight_recorder: FlightRecorder, + kafka_source: &KafkaSource, +) -> error_stack::Result> + 'static, Error> { + // TODO: This should be the materialization ID, or configurable by the user. + // This will be important when restarting a consumer at a specific point. + // let pulsar_subscription = + // std::env::var("PULSAR_SUBSCRIPTION").unwrap_or("subscription-default".to_owned()); + let kafka_config = kafka_source.config.as_ref().ok_or(Error::Internal)?; + let kafka_subscription = KafkaSubscription { + config: Some(kafka_config.clone()), + group: "some-awkward-group".to_owned(), // TODO: Fix this. + }; + let kafka_metadata = RawMetadata::try_from_kafka(kafka_config) + .await + .change_context(Error::CreateStream)?; + // Verify the provided table schema matches the topic schema + verify_schema_match( + kafka_metadata.sparrow_metadata.raw_schema.clone(), + table_info.schema().clone(), + )?; + + // The projected schema should come from the table_schema, which includes converted + // timestamp column, dropped decimal columns, etc. + // i.e. any changes we make to the raw schema to be able to process rows. + let projected_schema = if let Some(columns) = &projected_columns { + projected_schema(kafka_metadata.sparrow_metadata.table_schema, columns) + .change_context(Error::CreateStream)? 
+ } else { + kafka_metadata.sparrow_metadata.table_schema + }; + + let consumer = streams::kafka::stream::consumer(&kafka_subscription) + .await + .change_context(Error::CreateStream)?; + + let stream = streams::kafka::stream::execution_stream( + kafka_metadata.kafka_avro_schema, + kafka_metadata.sparrow_metadata.raw_schema, + projected_schema.clone(), + consumer, + ); + + let table_config = table_info.config().clone(); + let bounded_lateness = if let Some(bounded_lateness) = context.bounded_lateness_ns { + bounded_lateness + } else { + BOUNDED_LATENESS_NS + }; + + let mut input_stream = prepare::execute_input_stream::prepare_input( + stream.boxed(), + table_config, + kafka_metadata.user_schema.clone(), + projected_schema, + 0, + requested_slice, + context.key_hash_inverse.clone(), + bounded_lateness, + ) + .await + .into_report() + .change_context(Error::CreateStream)?; + + Ok(async_stream::try_stream! { + loop { + if let Some(next_input) = input_stream.next().await { + let next_input = next_input.change_context(Error::ReadNextBatch)?; + tracing::debug!("Stream reader next batch: {:?}", next_input); + match next_input { + None => continue, + Some(input) => { + yield Batch::try_new_from_batch(input).into_report().change_context(Error::Internal)? + } + } + } else { + // Loop indefinitely - it's possible a batch was not produced because the watermark did not advance. + } + } + }) +} + /// Compute the projected schema from a base schema and projected columns. 
fn projected_schema( schema: SchemaRef, diff --git a/crates/sparrow-runtime/src/streams.rs b/crates/sparrow-runtime/src/streams.rs index a11a84f70..744bef6f8 100644 --- a/crates/sparrow-runtime/src/streams.rs +++ b/crates/sparrow-runtime/src/streams.rs @@ -1 +1,59 @@ +use arrow::datatypes::Schema; +use avro_rs::types::Value; +use hashbrown::HashSet; + +pub(crate) mod kafka; pub(crate) mod pulsar; + +pub struct AvroWrapper { + user_record: Value, + projected_record: Value, +} + +#[derive(derive_more::Display, Debug)] +pub enum DeserializeError { + #[display(fmt = "error reading Avro record")] + Avro, + #[display(fmt = "unsupported Avro value")] + UnsupportedType, + #[display(fmt = "internal error")] + InternalError, +} + +impl error_stack::Context for DeserializeError {} + +#[derive(Debug)] +struct DeserializeErrorWrapper(error_stack::Report); + +impl std::fmt::Display for DeserializeErrorWrapper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.0, f) + } +} + +impl std::error::Error for DeserializeErrorWrapper {} + +impl From> for DeserializeErrorWrapper { + fn from(error: error_stack::Report) -> Self { + DeserializeErrorWrapper(error) + } +} + +/// Determine needed indices given a file schema and projected schema. 
+fn get_columns_to_read(file_schema: &Schema, projected_schema: &Schema) -> Vec { + let needed_columns: HashSet<_> = projected_schema + .fields() + .iter() + .map(|field| field.name()) + .collect(); + + let mut columns = Vec::with_capacity(3 + needed_columns.len()); + + for (index, column) in file_schema.fields().iter().enumerate() { + if needed_columns.contains(column.name()) { + columns.push(index) + } + } + + columns +} diff --git a/crates/sparrow-runtime/src/streams/kafka.rs b/crates/sparrow-runtime/src/streams/kafka.rs new file mode 100644 index 000000000..e59dade62 --- /dev/null +++ b/crates/sparrow-runtime/src/streams/kafka.rs @@ -0,0 +1,2 @@ +pub(crate) mod schema; +pub(crate) mod stream; diff --git a/crates/sparrow-runtime/src/streams/kafka/schema.rs b/crates/sparrow-runtime/src/streams/kafka/schema.rs new file mode 100644 index 000000000..24376df5e --- /dev/null +++ b/crates/sparrow-runtime/src/streams/kafka/schema.rs @@ -0,0 +1,31 @@ +use schema_registry_converter::{ + async_impl::schema_registry::{get_schema_by_subject, SrSettings}, + schema_registry_common::SubjectNameStrategy, +}; + +use error_stack::{IntoReport, Result, ResultExt}; + +/// TODO: Pending Implementation +#[allow(unused)] +#[derive(Debug, derive_more::Display)] +pub enum Error { + #[allow(dead_code)] + AvroNotEnabled, + AvroSchemaConversion, + SchemaRequest, + UnsupportedSchema, +} +impl error_stack::Context for Error {} + +#[allow(unused)] +/// TODO: Pending implementation +pub async fn get_kafka_schema() -> Result { + let sr_settings = SrSettings::new(String::from("http://localhost:8085")); + let subject_name_strategy = + SubjectNameStrategy::TopicNameStrategy(String::from("my-topic"), false); + let schema = get_schema_by_subject(&sr_settings, &subject_name_strategy) + .await + .into_report() + .change_context(Error::SchemaRequest)?; + Ok(schema.schema) +} diff --git a/crates/sparrow-runtime/src/streams/kafka/stream.rs b/crates/sparrow-runtime/src/streams/kafka/stream.rs new file mode 
100644 index 000000000..6b6ca22dd --- /dev/null +++ b/crates/sparrow-runtime/src/streams/kafka/stream.rs @@ -0,0 +1,144 @@ +use std::time::SystemTime; + +use arrow::{datatypes::SchemaRef, error::ArrowError, record_batch::RecordBatch}; +use avro_rs::types::Value; +use avro_rs::Reader; +use avro_rs::Schema; +use error_stack::{IntoReport, ResultExt}; +use futures_lite::Stream; +use kafka::consumer::Consumer; +use sparrow_api::kaskada::v1alpha::KafkaSubscription; + +use crate::streams::get_columns_to_read; +use crate::streams::DeserializeError; +use crate::streams::DeserializeErrorWrapper; + +#[derive(derive_more::Display, Debug)] +pub enum Error { + #[display(fmt = "missing kafka config")] + MissingKafkaConfig, + #[display(fmt = "unable to create consume")] + CreateKafkaConsumer, +} + +impl error_stack::Context for Error {} + +pub fn execution_stream( + kafka_avro_schema: Schema, + raw_schema: SchemaRef, + projected_schema: SchemaRef, + consumer: Consumer, +) -> impl Stream> { + async_stream::try_stream! { + let mut reader = KafkaReader::new(kafka_avro_schema, raw_schema, projected_schema, consumer); + loop { + if let Some(next) = reader.next_result_async().await? { + tracing::debug!("yielded batch size {:?}", next); + yield next + } else { + // Keep looping - this may happen if we timed out trying to read from the stream + } + } + } +} + +struct KafkaReader { + kafka_avro_schema: Schema, + raw_schema: SchemaRef, + /// The projected schema; includes only columns that are needed by the query. 
+ projected_schema: SchemaRef, + /// Kafka consumer client + consumer: Consumer, +} + +impl KafkaReader { + pub fn new( + kafka_avro_schema: Schema, + raw_schema: SchemaRef, + projected_schema: SchemaRef, + consumer: Consumer, + ) -> Self { + KafkaReader { + kafka_avro_schema, + raw_schema, + projected_schema, + consumer, + } + } + + async fn next_result_async(&mut self) -> Result, ArrowError> { + tracing::debug!("reading kafka messages"); + let max_batch_size = 100000; + let mut avro_values = Vec::with_capacity(max_batch_size); + let start = SystemTime::now(); + while avro_values.len() < max_batch_size { + let since_start = SystemTime::now().duration_since(start).unwrap(); + if since_start.as_millis() > 1000 { + break; + } + let next_results = self.consumer.poll(); + let Ok(msg) = next_results else { + tracing::debug!("unable to poll kafka stream"); + break; + }; + for ms in msg.iter() { + for m in ms.messages() { + let reader = Reader::with_schema(&self.kafka_avro_schema, m.value).unwrap(); + for kafka_msg in reader { + let kafka_msg = kafka_msg.unwrap(); + tracing::debug!("read kafka message: {:?}", kafka_msg); + match kafka_msg { + Value::Record(fields) => avro_values.push(fields), + _ => { + let e = error_stack::report!(DeserializeError::UnsupportedType) + .attach_printable(format!( + "expected a record but got {:?}", + kafka_msg + )); + return Err(ArrowError::from_external_error(Box::new( + DeserializeErrorWrapper::from(e), + ))); + } + } + } + } + let _ = self.consumer.consume_messageset(ms); + } + } + tracing::debug!("read {} messages", avro_values.len()); + match avro_values.len() { + 0 => Ok(None), + _ => { + let arrow_data = sparrow_arrow::avro::avro_to_arrow(avro_values).map_err(|e| { + tracing::error!("avro_to_arrow error: {}", e); + ArrowError::from_external_error(Box::new(e)) + })?; + let batch = RecordBatch::try_new(self.raw_schema.clone(), arrow_data)?; + tracing::debug!("produced batch: {:?}", batch); + // Note that the _publish_time is dropped 
here. This field is added for the purposes of + // prepare, where the `time` column is automatically set to the `_publish_time`. + let columns_to_read = get_columns_to_read(&self.raw_schema, &self.projected_schema); + let columns: Vec<_> = columns_to_read + .iter() + .map(|index| batch.column(*index).clone()) + .collect(); + + Ok(RecordBatch::try_new(self.projected_schema.clone(), columns).map(Some)?) + } + } + } +} + +pub async fn consumer(subscription: &KafkaSubscription) -> error_stack::Result { + let config = subscription + .config + .as_ref() + .ok_or(Error::MissingKafkaConfig)?; + let consumer = Consumer::from_hosts(config.hosts.to_owned()) + .with_topic(config.topic.to_owned()) + .create() + .into_report() + .change_context(Error::CreateKafkaConsumer)?; + tracing::info!("created kafka consumer"); + Ok(consumer) +} diff --git a/crates/sparrow-runtime/src/streams/pulsar/stream.rs b/crates/sparrow-runtime/src/streams/pulsar/stream.rs index 8d519976a..e5325a3c3 100644 --- a/crates/sparrow-runtime/src/streams/pulsar/stream.rs +++ b/crates/sparrow-runtime/src/streams/pulsar/stream.rs @@ -1,12 +1,11 @@ use crate::prepare::Error; -use arrow::datatypes::Schema; +use crate::streams::{get_columns_to_read, AvroWrapper, DeserializeError, DeserializeErrorWrapper}; use arrow::error::ArrowError; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use avro_rs::types::Value; use error_stack::{IntoReport, ResultExt}; use futures::Stream; use futures_lite::stream::StreamExt; -use hashbrown::HashSet; use pulsar::consumer::InitialPosition; use pulsar::{ @@ -20,11 +19,6 @@ use std::io::Cursor; use std::time::Duration; use tokio::time::timeout; -pub struct AvroWrapper { - user_record: Value, - projected_record: Value, -} - /// Creates a pulsar stream to be used during execution in a long-lived process. /// /// This stream should not close naturally. 
It continually reads messages from the @@ -66,35 +60,6 @@ struct PulsarReader { should_include_publish_time: bool, } -#[derive(Debug)] -struct DeserializeErrorWrapper(error_stack::Report); - -impl std::fmt::Display for DeserializeErrorWrapper { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self.0, f) - } -} - -impl std::error::Error for DeserializeErrorWrapper {} - -impl From> for DeserializeErrorWrapper { - fn from(error: error_stack::Report) -> Self { - DeserializeErrorWrapper(error) - } -} - -#[derive(derive_more::Display, Debug)] -pub enum DeserializeError { - #[display(fmt = "error reading Avro record")] - Avro, - #[display(fmt = "unsupported Avro value")] - UnsupportedType, - #[display(fmt = "internal error")] - InternalError, -} - -impl error_stack::Context for DeserializeError {} - impl DeserializeMessage for AvroWrapper { type Output = error_stack::Result; @@ -342,22 +307,3 @@ pub async fn consumer( Ok(consumer) } - -/// Determine needed indices given a file schema and projected schema. -fn get_columns_to_read(file_schema: &Schema, projected_schema: &Schema) -> Vec { - let needed_columns: HashSet<_> = projected_schema - .fields() - .iter() - .map(|field| field.name()) - .collect(); - - let mut columns = Vec::with_capacity(3 + needed_columns.len()); - - for (index, column) in file_schema.fields().iter().enumerate() { - if needed_columns.contains(column.name()) { - columns.push(index) - } - } - - columns -} diff --git a/examples/kafka_materialize/README.md b/examples/kafka_materialize/README.md new file mode 100644 index 000000000..8e58de68c --- /dev/null +++ b/examples/kafka_materialize/README.md @@ -0,0 +1,16 @@ +# Kafka with Schema Registry in Docker Compose + +The goal of this readme is to document the setup instructions for running Kafka locally to use with Sparrow/Wren/Kaskada. This implementation is designed for streaming reads.
+ +## Setup + +Run a `docker compose up -d` + +## Details + +* Kafka Worker on `kafka1:9092` +* Schema Registry on `schemaregistry:8085` +* Zookeeper on `zookeeper:2181` +* Kafka Rest API on `restproxy:8082` +* Kafka SQL server on `ksql-server:8088` + diff --git a/examples/kafka_materialize/docker-compose.yaml b/examples/kafka_materialize/docker-compose.yaml new file mode 100644 index 000000000..46602642a --- /dev/null +++ b/examples/kafka_materialize/docker-compose.yaml @@ -0,0 +1,98 @@ +version: "3.0" + +services: + + worker: + image: tlberglund/kafka-workshop-worker + environment: + KAFKA_BOOTSTRAP_SERVERS: kafka1:9092 + depends_on: + - kafka1 + command: "bash -c 'sleep infinity'" + volumes: + - $PWD/worker/data:/data + + ksql-cli: + image: confluentinc/cp-ksql-cli:5.1.2 + depends_on: + - ksql-server + + ksql-server: + image: confluentinc/cp-ksql-server:5.1.2 + ports: + - 8088:8088 + depends_on: + - kafka1 + - zookeeper + - schemaregistry + environment: + KSQL_BOOTSTRAP_SERVERS: kafka1:9092 + KSQL_LISTENERS: http://0.0.0.0:8088 + KSQL_KSQL_SERVICE_ID: kafka_workshop + KSQL_CUB_KAFKA_TIMEOUT: 300 + KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schemaregistry:8085 + KSQL_KSQL_COMMIT_INTERVAL_MS: 2000 + KSQL_KSQL_CACHE_MAX_BYTES_BUFFERING: 10000000 + KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: earliest + + zookeeper: + image: confluentinc/cp-zookeeper:5.1.2 + restart: always + environment: + ZOOKEEPER_SERVER_ID: 1 + ZOOKEEPER_CLIENT_PORT: "2181" + ZOOKEEPER_TICK_TIME: "2000" + ZOOKEEPER_SERVERS: "zookeeper:22888:23888" + ports: + - "2181:2181" + + kafka1: + image: confluentinc/cp-enterprise-kafka:5.1.2 + depends_on: + - zookeeper + ports: + # Exposes 29092 for external connections to the broker + # Use kafka1:9092 for connections internal on the docker network + # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details + - "29092:29092" + environment: + KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 
PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_BROKER_ID: 1 + KAFKA_BROKER_RACK: "r1" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_DELETE_TOPIC_ENABLE: "true" + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + KAFKA_SCHEMA_REGISTRY_URL: "schemaregistry:8085" + #KAFKA_LOG4J_ROOT_LOGLEVEL: INFO + KAFKA_JMX_PORT: 9991 + + + schemaregistry: + image: confluentinc/cp-schema-registry:5.1.2 + restart: always + depends_on: + - zookeeper + environment: + SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181" + SCHEMA_REGISTRY_HOST_NAME: schemaregistry + SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085" + ports: + - 8085:8085 + + + restproxy: + image: confluentinc/cp-kafka-rest:5.1.2 + restart: always + depends_on: + - kafka1 + environment: + KAFKA_REST_ZOOKEEPER_CONNECT: "zookeeper:2181" + KAFKA_REST_LISTENERS: "http://0.0.0.0:8082" + KAFKA_REST_SCHEMA_REGISTRY_URL: "http://schemaregistry:8085" + KAFKA_REST_HOST_NAME: restproxy + KAFKA_REST_DEBUG: "true" + ports: + - 8082:8082 diff --git a/tests/integration/api/api_suite_test.go b/tests/integration/api/api_suite_test.go index ba422211a..6b54f38dc 100644 --- a/tests/integration/api/api_suite_test.go +++ b/tests/integration/api/api_suite_test.go @@ -202,6 +202,16 @@ func getPulsarConfig(topicName string) *v1alpha.PulsarConfig { } } +func getKafkaConfig(topicName string) *v1alpha.KafkaConfig { + return &v1alpha.KafkaConfig{ + Hosts: []string{"localhost:29092"}, + Topic: topicName, + Schema: &v1alpha.KafkaConfig_AvroSchema{ + AvroSchema: "{\"type\": \"record\", \"name\": \"MyRecord\", \"fields\": [{\"name\": \"time\", \"type\":\"long\"}, {\"name\": \"id\", \"type\": \"long\"}, {\"name\": \"my_val\", \"type\": \"long\"}]}", + }, + } +} + func receivePulsarMessageWithTimeout(pulsarConsumer pulsar.Consumer, ctx context.Context) pulsar.Message { timeout, timeoutCancel := 
context.WithTimeout(ctx, 250*time.Millisecond) defer timeoutCancel() diff --git a/tests/integration/api/go.mod b/tests/integration/api/go.mod index 3ee201394..9ba679f7f 100644 --- a/tests/integration/api/go.mod +++ b/tests/integration/api/go.mod @@ -18,6 +18,7 @@ require ( google.golang.org/genproto v0.0.0-20230221151758-ace64dc21148 google.golang.org/grpc v1.53.0 google.golang.org/protobuf v1.28.1 + gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2 ) require ( @@ -44,6 +45,7 @@ require ( github.com/aws/aws-sdk-go v1.44.122 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/confluentinc/confluent-kafka-go v1.9.2 // indirect github.com/danieljoos/wincred v1.1.2 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/dvsekhvalnov/jose2go v1.5.0 // indirect @@ -56,8 +58,8 @@ require ( github.com/golang-jwt/jwt v3.2.1+incompatible // indirect github.com/golang-jwt/jwt/v4 v4.4.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/snappy v0.0.3 // indirect - github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/google/pprof v0.0.0-20211008130755-947d60d73cc0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect github.com/googleapis/gax-go/v2 v2.7.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0 // indirect diff --git a/tests/integration/api/go.sum b/tests/integration/api/go.sum index abc9d8d78..2d1a42d51 100644 --- a/tests/integration/api/go.sum +++ b/tests/integration/api/go.sum @@ -147,6 +147,9 @@ github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwS github.com/GoogleCloudPlatform/cloudsql-proxy v1.29.0/go.mod h1:spvB9eLJH9dutlbPSRmHvSXXHOwGRyeXh1jVdquA2G8= github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/OneOfOne/xxhash 
v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA= +github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ= +github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc= github.com/agext/levenshtein v1.2.1 h1:QmvMAjj2aEICytGiWzmxoE0x2KZvE0fvmqMOfy2tjT8= github.com/agext/levenshtein v1.2.1/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -231,15 +234,19 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c= +github.com/confluentinc/confluent-kafka-go v1.9.2 h1:gV/GxhMBUb03tFWkN+7kdhg+zf+QUM+wVkI9zwh770Q= +github.com/confluentinc/confluent-kafka-go v1.9.2/go.mod h1:ptXNqsuDfYbAE/LBW6pnwWZElUoWxHoV8E43DCrliyo= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod 
h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0= github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -266,12 +273,17 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= +github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.9.1 h1:PS7VIOgmSVhWUEeZwTe7z7zouA22Cr590PzXKbZHOVY= github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.2.2/go.mod h1:Qh/WofXFeiAFII1aEBu529AtJo6Zg2VHscnEsbBnJ20= 
+github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o= +github.com/frankban/quicktest v1.10.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y= +github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= @@ -368,13 +380,15 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/flatbuffers v1.11.0 h1:O7CEyB8Cb3/DmtxODGtLHcEvpr81Jm5qLg/hsHnxA2A= github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.2.1-0.20190312032427-6f77996f0c42/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -415,8 +429,9 @@ github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210506205249-923b5ab0fc1a/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20211008130755-947d60d73cc0 h1:zHs+jv3LO743/zFGcByu2KmpbliCU2AhjcGgrdTwSG4= +github.com/google/pprof v0.0.0-20211008130755-947d60d73cc0/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -442,6 +457,7 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0 h1:1JYBfzqrWPcCclBwxFCPAou9n+q github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0/go.mod h1:YDZoGHuwE+ov0c8smSH49WLF3F2LaWnYYuDVd+EWrc0= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= +github.com/hamba/avro v1.5.6/go.mod h1:3vNT0RLXXpFm2Tb/5KC71ZRJlOroggq1Rcitb6k4Fr8= github.com/hanwen/go-fuse v1.0.0/go.mod h1:unqXarDXqzAk0rt98O2tVndEPIpUgLD9+rwFisZH3Ok= github.com/hanwen/go-fuse/v2 v2.1.0/go.mod h1:oRyA5eK+pvJyv5otpO/DgccS8y/RvYMaO00GgRLGryc= 
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= @@ -467,10 +483,14 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= +github.com/heetch/avro v0.3.1/go.mod h1:4xn38Oz/+hiEUTpbVfGVLfvOg0yKLlRP7Q9+gJJILgA= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/invopop/jsonschema v0.4.0/go.mod h1:O9uiLokuu0+MGFlyiaqtWxwqJm41/+8Nj0lD7A36YH0= github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= @@ -512,6 +532,11 @@ github.com/jackc/puddle v1.2.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dv github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk= github.com/jawher/mow.cli v1.2.0/go.mod h1:y+pcA3jBAdo/GIZx/0rFjw/K2bVEODP9rfZOfaiq8Ko= github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod 
h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= +github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= +github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ= +github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E= +github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= @@ -531,6 +556,7 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X github.com/jt-nti/gproto v0.0.0-20210304092907-23e645af1351 h1:jYsiD6zdBzctjZ4sDB+gGJJPB3NROHrUuCp/wUj5p9Y= github.com/jt-nti/gproto v0.0.0-20210304092907-23e645af1351/go.mod h1:yfoLDf8VFUCWSxFJsPuQT5BlqdDbGkDl5m6hzABroMI= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/juju/qthttptest v0.1.1/go.mod h1:aTlAv8TYaflIiTDIQYzxnl1QdPjAg8Q8qJMErpKy6A4= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -555,12 +581,14 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod 
h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= @@ -572,7 +600,11 @@ github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.6 h1:jbk+ZieJ0D7EVGJYpL9QTz7/YW6UHbmdnZWYyK5cdBs= github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM= github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= +github.com/linkedin/goavro/v2 v2.10.0/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= +github.com/linkedin/goavro/v2 v2.10.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= +github.com/linkedin/goavro/v2 v2.11.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= github.com/linkedin/goavro/v2 v2.12.0 h1:rIQQSj8jdAUlKQh6DttK8wCRv4t4QO09g1C4aBWXslg= github.com/linkedin/goavro/v2 
v2.12.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= @@ -633,6 +665,7 @@ github.com/namsral/flag v1.7.4-pre h1:b2ScHhoCUkbsq0d2C15Mv+VU8bl8hAXV8arnWiOHNZ github.com/namsral/flag v1.7.4-pre/go.mod h1:OXldTctbM6SWH1K899kPZcf65KxJiD7MsceFUpB5yDo= github.com/ncw/swift v1.0.52/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/nrwiersma/avro-benchmarks v0.0.0-20210913175520-21aec48c8f76/go.mod h1:iKyFMidsk/sVYONJRE372sJuX/QTRPacU7imPqqsu7g= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -663,6 +696,7 @@ github.com/pierrec/lz4/v4 v4.1.8 h1:ieHkV+i2BRzngO4Wd/3HGowuZStgq6QkPsD1eolNAO4= github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -701,8 +735,12 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= 
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= +github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= +github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= @@ -710,6 +748,7 @@ github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OK github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= +github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/secure-io/sio-go v0.3.1 h1:dNvY9awjabXTYGsTF1PiCySl9Ltofk9GA3VdWlo7rRc= @@ -752,6 +791,7 @@ github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpE github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= 
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.3.1-0.20190311161405-34c6fa2dc709/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -919,6 +959,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20200505041828-1ed23360d12c/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= @@ -1145,6 +1186,7 @@ golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb/go.mod h1:o4KQGtdN14AW+yjs golang.org/x/tools v0.0.0-20200312045724-11d5b4c81c7d/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw= golang.org/x/tools v0.0.0-20200331025713-a30bf2db82d4/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8= golang.org/x/tools v0.0.0-20200501065659-ab2804fb9c9d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20200505023115-26f46d2f7ef8/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools 
v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -1311,6 +1353,7 @@ google.golang.org/genproto v0.0.0-20220304144024-325a89244dc8/go.mod h1:kGP+zUP2 google.golang.org/genproto v0.0.0-20220310185008-1973136f34c6/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI= google.golang.org/genproto v0.0.0-20220324131243-acbaeb5b85eb/go.mod h1:hAL49I2IFola2sVEjAn7MEwsja0xp51I0tlGAf9hz4E= google.golang.org/genproto v0.0.0-20220401170504-314d38edb7de/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo= +google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= google.golang.org/genproto v0.0.0-20230221151758-ace64dc21148 h1:muK+gVBJBfFb4SejshDBlN2/UgxCCOKH9Y34ljqEGOc= google.golang.org/genproto v0.0.0-20230221151758-ace64dc21148/go.mod h1:3Dl5ZL0q0isWJt+FVcfpQyirqemEuLAK/iFvg1UP1Hw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -1341,6 +1384,7 @@ google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9K google.golang.org/grpc v1.40.1/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= +google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc= google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= @@ -1361,14 +1405,19 @@ google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= 
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/avro.v0 v0.0.0-20171217001914-a730b5802183/go.mod h1:FvqrFXt+jCsyQibeRv4xxEJBL5iG2DDW5aeJwzDiq4A= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2 h1:QAgN6OC0o7dwvyz+HML6GYm+0Pk54O91+oxGqJ/5z8I= +gopkg.in/confluentinc/confluent-kafka-go.v1 v1.8.2/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY= +gopkg.in/errgo.v1 v1.0.0/go.mod h1:CxwszS/Xz1C49Ucd2i6Zil5UToP1EmyrFhKaMVbg1mk= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/httprequest.v1 v1.2.1/go.mod h1:x2Otw96yda5+8+6ZeWwHIJTFkEHWP/qP8pJOzqEtWPM= gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= @@ -1379,7 +1428,9 @@ gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eR gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= gopkg.in/jcmturner/gokrb5.v7 v7.3.0/go.mod 
h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
 gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
+gopkg.in/retry.v1 v1.0.3/go.mod h1:FJkXmWiMaAo7xB+xhvDF59zhfjDWyzmyAxiT4dB688g=
 gopkg.in/square/go-jose.v2 v2.4.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
@@ -1388,6 +1439,7 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
diff --git a/tests/integration/api/mat_kafka_to_obj_store_test.go b/tests/integration/api/mat_kafka_to_obj_store_test.go
new file mode 100644
index 000000000..58bcc2384
--- /dev/null
+++ b/tests/integration/api/mat_kafka_to_obj_store_test.go
@@ -0,0 +1,226 @@
+package api_test
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/google/uuid"
+	v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha"
+	"github.com/kaskada-ai/kaskada/tests/integration/shared/helpers"
+	. "github.com/kaskada-ai/kaskada/tests/integration/shared/matchers"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
+)
+
+// Integration spec: creates a Kafka-backed table and a materialization that
+// writes CSV files to an object store, then publishes more messages and checks
+// the CSV output. It must be registered with Describe, not FDescribe: a focused
+// container makes Ginkgo skip every other spec in the suite.
+var _ = Describe("Materialization from Kafka to ObjectStore", Ordered, Label("kafka"), func() {
+	var (
+		ctx                   context.Context
+		cancel                context.CancelFunc
+		conn                  *grpc.ClientConn
+		err                   error
+		kafkaProducer         *kafka.Producer
+		materializationClient v1alpha.MaterializationServiceClient
+		materializationName   string
+		outputPath            string
+		outputURI             string
+		kafkaDeliverChannel   chan kafka.Event
+		tableClient           v1alpha.TableServiceClient
+		tableName             string
+		topicName             string
+	)
+
+	BeforeAll(func() {
+		// get connection to wren
+		ctx, cancel, conn = grpcConfig.GetContextCancelConnection(20)
+		ctx = metadata.AppendToOutgoingContext(ctx, "client-id", *integrationClientID)
+
+		// get a grpc client for the table & materialization services
+		tableClient = v1alpha.NewTableServiceClient(conn)
+		materializationClient = v1alpha.NewMaterializationServiceClient(conn)
+		materializationName = "mat_kafkaToObjStore"
+
+		// define the output path and make sure it is empty
+		outputPath = fmt.Sprintf("../data/output/%s/", materializationName)
+		os.RemoveAll(outputPath)
+
+		if os.Getenv("ENV") == "local-local" {
+			workDir, err := os.Getwd()
+			Expect(err).ShouldNot(HaveOccurred())
+			outputURI = fmt.Sprintf("file://%s/../data/output/%s", workDir, materializationName)
+		} else {
+			outputURI = fmt.Sprintf("file:///data/output/%s", materializationName)
+		}
+		clientID, err := uuid.NewRandom()
+		Expect(err).Should(BeNil())
+		kafkaProducer, err = kafka.NewProducer(&kafka.ConfigMap{
+			"bootstrap.servers": "localhost:29092",
+			"client.id":         clientID.String(),
+			"acks":              "all",
+		})
+		Expect(err).Should(BeNil())
+		kafkaDeliverChannel = make(chan kafka.Event, 10000)
+
+		topicName = uuid.NewString()
+
+		// create a table backed by kafka
+		tableName = "table_kafkaToObjStore"
+		tableClient.DeleteTable(ctx, &v1alpha.DeleteTableRequest{TableName: tableName})
+
+		err = kafkaProducer.Produce(&kafka.Message{
+			TopicPartition: kafka.TopicPartition{
+				Topic:     &topicName,
+				Partition: 0,
+			},
+			Value: helpers.ReadTestFile("avro/msg_0.avro"),
+		}, kafkaDeliverChannel)
+		Expect(err).ShouldNot(HaveOccurred())
+
+		e := <-kafkaDeliverChannel
+		m := e.(*kafka.Message)
+		Expect(m.TopicPartition.Error).ShouldNot(HaveOccurred())
+		kafkaProducer.Flush(1000)
+
+		table := &v1alpha.Table{
+			TableName:           tableName,
+			TimeColumnName:      "time",
+			EntityKeyColumnName: "id",
+			Source: &v1alpha.Source{
+				Source: &v1alpha.Source_Kafka{
+					Kafka: &v1alpha.KafkaSource{
+						Config: getKafkaConfig(topicName),
+					},
+				},
+			},
+		}
+		_, err = tableClient.CreateTable(ctx, &v1alpha.CreateTableRequest{Table: table})
+		Expect(err).ShouldNot(HaveOccurredGrpc(), "failed to create kafka-backed table")
+	})
+
+	AfterAll(func() {
+		// clean up items from the test
+		_, err = materializationClient.DeleteMaterialization(ctx, &v1alpha.DeleteMaterializationRequest{MaterializationName: materializationName})
+		Expect(err).ShouldNot(HaveOccurredGrpc(), "issue deleting materialization")
+		_, err = tableClient.DeleteTable(ctx, &v1alpha.DeleteTableRequest{TableName: tableName})
+		Expect(err).ShouldNot(HaveOccurredGrpc(), "issue deleting table")
+
+		cancel()
+		conn.Close()
+		// Close the producer before its delivery channel so that no delivery
+		// report can be sent on a closed channel.
+		if kafkaProducer != nil {
+			kafkaProducer.Close()
+		}
+		if kafkaDeliverChannel != nil {
+			close(kafkaDeliverChannel)
+		}
+	})
+
+	Describe("Create a materialization", func() {
+		It("Should work without error", func() {
+			createRequest := &v1alpha.CreateMaterializationRequest{
+				Materialization: &v1alpha.Materialization{
+					MaterializationName: materializationName,
+					Expression: `
+					{
+						last_id: table_kafkaToObjStore.id | last(),
+						last_time: table_kafkaToObjStore.time | last(),
+						count: table_kafkaToObjStore | count(),
+					}
+					`,
+					Destination: &v1alpha.Destination{
+						Destination: &v1alpha.Destination_ObjectStore{
+							ObjectStore: &v1alpha.ObjectStoreDestination{
+								FileType:        v1alpha.FileType_FILE_TYPE_CSV,
+								OutputPrefixUri: outputURI,
+							},
+						},
+					},
+				},
+			}
+
+			res, err := materializationClient.CreateMaterialization(ctx, createRequest)
+			Expect(err).ShouldNot(HaveOccurredGrpc())
+			Expect(res).ShouldNot(BeNil())
+			VerifyRequestDetails(res.RequestDetails)
+			Expect(res.Analysis.CanExecute).Should(BeTrue())
+			Expect(res.Materialization.MaterializationId).ShouldNot(BeEmpty())
+		})
+
+		It("Should output initial results to csv", func() {
+			err = kafkaProducer.Produce(&kafka.Message{
+				TopicPartition: kafka.TopicPartition{
+					Topic:     &topicName,
+					Partition: 0,
+				},
+				Value: helpers.ReadTestFile("avro/msg_1.avro"),
+			}, kafkaDeliverChannel)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			e := <-kafkaDeliverChannel
+			m := e.(*kafka.Message)
+			Expect(m.TopicPartition.Error).ShouldNot(HaveOccurred())
+			err = kafkaProducer.Produce(&kafka.Message{
+				TopicPartition: kafka.TopicPartition{
+					Topic:     &topicName,
+					Partition: 0,
+				},
+				Value: helpers.ReadTestFile("avro/msg_2.avro"),
+			}, kafkaDeliverChannel)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			e = <-kafkaDeliverChannel
+			m = e.(*kafka.Message)
+			Expect(m.TopicPartition.Error).ShouldNot(HaveOccurred())
+
+			err = kafkaProducer.Produce(&kafka.Message{
+				TopicPartition: kafka.TopicPartition{
+					Topic:     &topicName,
+					Partition: 0,
+				},
+				Value: helpers.ReadTestFile("avro/msg_3.avro"),
+			}, kafkaDeliverChannel)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			e = <-kafkaDeliverChannel
+			m = e.(*kafka.Message)
+			Expect(m.TopicPartition.Error).ShouldNot(HaveOccurred())
+
+			err = kafkaProducer.Produce(&kafka.Message{
+				TopicPartition: kafka.TopicPartition{
+					Topic:     &topicName,
+					Partition: 0,
+				},
+				Value: helpers.ReadTestFile("avro/msg_4.avro"),
+			}, kafkaDeliverChannel)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			e = <-kafkaDeliverChannel
+			m = e.(*kafka.Message)
+			Expect(m.TopicPartition.Error).ShouldNot(HaveOccurred())
+			kafkaProducer.Flush(1000)
+
+			time.Sleep(time.Millisecond * 1000)
+
+			Eventually(func(g Gomega) {
+				dirs, err := os.ReadDir(outputPath)
+				g.Expect(err).ShouldNot(HaveOccurred(), "cannot list output_path files")
+				g.Expect(dirs).Should(HaveLen(1))
+				firstFileName := dirs[0].Name()
+
+				results := helpers.GetCSV(outputPath + firstFileName)
+				g.Expect(results).Should(HaveLen(2)) //header row + 1 data row
+				g.Expect(results[0]).Should(ContainElements("_time", "_subsort", "_key_hash", "last_id", "last_time", "count"))
+				g.Expect(results[1]).Should(ContainElements("2023-06-20T23:30:01.000000000", "0", "2122274938272070218", "9", "9", "1687303801000000000", "1"))
+			}, "10s", "1s").Should(Succeed())
+		})
+	})
+})
diff --git a/tests/integration/api/mat_table_to_pulsar_test.go b/tests/integration/api/mat_table_to_pulsar_test.go
index e3346c409..9fcafd7fd 100644
--- a/tests/integration/api/mat_table_to_pulsar_test.go
+++ b/tests/integration/api/mat_table_to_pulsar_test.go
@@ -21,7 +21,7 @@ import (
 	. "github.com/kaskada-ai/kaskada/tests/integration/shared/matchers"
 )
 
-var _ = Describe("Materialization with Pulsar upload", Ordered, Label("pulsar"), func() {
+var _ = XDescribe("Materialization with Pulsar upload", Ordered, Label("pulsar"), func() {
 	var (
 		ctx context.Context
 		cancel context.CancelFunc
diff --git a/wren/compute/file_manager.go b/wren/compute/file_manager.go
index 3e1fd9e43..b363e30c3 100644
--- a/wren/compute/file_manager.go
+++ b/wren/compute/file_manager.go
@@ -18,6 +18,9 @@ type FileManager interface {
 
 	// GetPulsarSchema returns the schema of the pulsar topic
 	GetPulsarSchema(ctx context.Context, pulsarConfig *v1alpha.PulsarConfig) (*v1alpha.Schema, error)
+
+	// GetKafkaSchema returns the schema of the kafka topic
+	GetKafkaSchema(ctx context.Context, kafkaConfig *v1alpha.KafkaConfig) (*v1alpha.Schema, error)
 }
 
 type fileManager struct {
@@ -64,6 +67,16 @@ func (m *fileManager) GetPulsarSchema(ctx context.Context, pulsarConfig *v1alpha
 	return m.getSchema(ctx, metadataReq)
 }
 
+func (m *fileManager) GetKafkaSchema(ctx context.Context, kafkaConfig *v1alpha.KafkaConfig) (*v1alpha.Schema, error) {
+	metadataReq := &v1alpha.GetMetadataRequest{
+		Source:
&v1alpha.GetMetadataRequest_KafkaConfig{
+			KafkaConfig: kafkaConfig,
+		},
+	}
+
+	return m.getSchema(ctx, metadataReq)
+}
+
 func (m *fileManager) getSchema(ctx context.Context, metadataReq *v1alpha.GetMetadataRequest) (*v1alpha.Schema, error) {
 	subLogger := log.Ctx(ctx).With().Str("method", "fileManager.getSchema").Logger()
 
diff --git a/wren/service/materialization.go b/wren/service/materialization.go
index c3f62130b..c9d9603d0 100644
--- a/wren/service/materialization.go
+++ b/wren/service/materialization.go
@@ -210,6 +210,7 @@ func (s *materializationService) createMaterialization(ctx context.Context, owne
 		case *v1alpha.Source_Kaskada:
 			newSourceType = materialization.SourceTypeFiles
-		case *v1alpha.Source_Pulsar:
+		// Go switch cases never fall through, so both streaming sources share one case.
+		case *v1alpha.Source_Pulsar, *v1alpha.Source_Kafka:
 			newSourceType = materialization.SourceTypeStreams
 		default:
 			log.Error().Msgf("unknown source type %T", table.Source.Source)
diff --git a/wren/service/table.go b/wren/service/table.go
index 4a01b16f9..d2b07f589 100644
--- a/wren/service/table.go
+++ b/wren/service/table.go
@@ -184,7 +184,20 @@ func (t *tableService) createTable(ctx context.Context, owner *ent.Owner, reques
 	case *v1alpha.Source_Pulsar: // if the table source is pulsar, validate the schema
 		streamSchema, err := t.fileManager.GetPulsarSchema(ctx, s.Pulsar.Config)
 		if err != nil {
-			subLogger.Error().Err(err).Msg("issue getting schema for file")
+			subLogger.Error().Err(err).Msg("issue getting schema for pulsar")
+			return nil, reMapSparrowError(ctx, err)
+		}
+
+		err = t.validateSchema(ctx, *newTable, streamSchema)
+		if err != nil {
+			return nil, err
+		}
+		newTable.MergedSchema = streamSchema
+	case *v1alpha.Source_Kafka:
+		subLogger.Log().Msgf("Kafka Source: %v", s)
+		streamSchema, err := t.fileManager.GetKafkaSchema(ctx, s.Kafka.Config)
+		if err != nil {
+			subLogger.Error().Err(err).Msg("issue getting schema for kafka")
 			return nil, reMapSparrowError(ctx, err)
 		}