Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,821 changes: 248 additions & 1,573 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ crate-type = ["lib"]
[[bin]]
name = "main"
path = "bin/main.rs"
required-features = ["testing"]

[features]
default = []
testing = ["async-channel"]

[dependencies]
rdkafka = { version = "0.28", features = ["cmake-build"] }
Expand All @@ -26,19 +30,16 @@ tracing-subscriber = { version = "0.3.11", features = [
async-stream = "0.3.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.79"
prost = "0.10.4"
anyhow = "1.0.56"
pin-project = "*"
sled = "0.34.7"
rocksdb = "0.18.0"
bigtable_rs = "0.1.5"
dyn-clone = "1.0.5"
chrono = "0.4.19"
async-trait = "0.1.53"
google-bigtableadmin2 = "3.0.0"
# bigtable depends on `yup-oauth2` ^6 and on hyper-rustls==0.22, but `yup-oauth2` 6.6.0 pulls in hyper-rustls==0.23 :(
yup-oauth2 = "=6.5.0"
thiserror = "1.0.31"
async-channel = { version = "*", optional = true }

[dev-dependencies]
rstest = "*"
tempfile = "*"

91 changes: 53 additions & 38 deletions bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use kast::{
context::Context, encoders::JsonEncoder, input::Input, output::Output, processor::Processor,
processor_helper::KafkaProcessorHelper,
context::Context, encoders::JsonDecoder, input::Input, output::Output, processor::Processor,
processor_helper::KafkaProcessorHelper, state_store::StateStore,
};
use rdkafka::ClientConfig;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;

/// Click event carrying no payload fields.
///
/// `handle_click` takes a `Click` as its message argument, and
/// `re_emit_clicks` emits `Click {}` values to the "c1" topic.
#[derive(Debug, Clone, Deserialize, Serialize)]
struct Click {}
Expand All @@ -20,11 +21,15 @@ struct ClicksPerUser {
clicks: u32,
}

/// Parses a raw JSON message payload into a value of type `T`.
///
/// Despite the name, this function decodes: it turns bytes into a `T`.
///
/// # Panics
///
/// Panics with "empty message" when `data` is `None`, and panics when the
/// bytes cannot be deserialized into `T`.
fn json_encoder<T>(data: Option<&[u8]>) -> T
where
    T: DeserializeOwned,
{
    let payload = data.expect("empty message");
    let parsed = serde_json::from_slice(payload);
    parsed.unwrap()
}

async fn handle_click(ctx: &mut Context<ClicksPerUser>, _click: Click) {
async fn handle_click<TStore>(
_settings: &mut (),
ctx: &mut Context<TStore, ClicksPerUser>,
_click: Click,
_headers: HashMap<String, String>,
) -> Option<ClicksPerUser>
where
TStore: StateStore<ClicksPerUser>,
{
let mut clicks_per_user = match ctx.get_state() {
Some(state) => state.clone(),
None => ClicksPerUser { clicks: 0 },
Expand All @@ -33,20 +38,30 @@ async fn handle_click(ctx: &mut Context<ClicksPerUser>, _click: Click) {
clicks_per_user.clicks += 1;
println!("{:?}, {:?}", ctx.key(), clicks_per_user);

ctx.set_state(Some(clicks_per_user))
Some(clicks_per_user)
}

async fn re_emit_clicks(ctx: &mut Context<ClicksPerUser>, click: Click2) {
async fn re_emit_clicks<TStore>(
_settings: &mut (),
ctx: &mut Context<TStore, ClicksPerUser>,
click: Click2,
_headers: HashMap<String, String>,
) -> Option<ClicksPerUser>
where
TStore: StateStore<ClicksPerUser>,
{
let key = ctx.key().to_string();
for _ in 0..click.clicks {
ctx.emit("c1", &key, &Click {})
ctx.emit("c1", &key, &Click {}, Default::default())
}

None
}

//TODO: Support outputs
//TODO: Support different state stores which aren't hashmaps
#[tokio::main]
async fn main() {
async fn main() -> Result<(), String> {
let settings = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("partitioner", "murmur2")
Expand All @@ -63,57 +78,55 @@ async fn main() {
.clone();

let mut p = Processor::new(
"clicks",
KafkaProcessorHelper::new(settings),
vec![
Input::new("c1".to_string(), json_encoder, handle_click),
Input::new("c2".to_string(), JsonEncoder::new(), re_emit_clicks),
Input::new("c1", JsonDecoder::new(), handle_click),
Input::new("c2", JsonDecoder::new(), re_emit_clicks),
],
vec![Output::new("c1".to_string())],
HashMap::<String, ClicksPerUser>::new,
vec![Output::new("c1")],
|| Arc::new(Mutex::new(HashMap::<String, ClicksPerUser>::new())),
|| (),
);

p.start().await;
p.join().await
p.run_forever().await
}

#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::Arc};
use super::*;
use std::collections::HashMap;

use crate::{handle_click, json_encoder, re_emit_clicks, Click, Click2, ClicksPerUser};
use crate::{handle_click, re_emit_clicks, Click, Click2, ClicksPerUser};
use kast::{
encoders::{JsonDecoder, JsonEncoder},
input::Input,
output::Output,
processor::Processor,
state_store::StateStore,
test_utils::TestsProcessorHelper,
};
use rstest::rstest;
use tokio::sync::RwLock;

#[rstest]
#[tokio::test]
async fn test_single_store() {
let mut t = TestsProcessorHelper::new(vec!["c1", "c2", "c3"]);
let state_store = Arc::new(RwLock::new(HashMap::new()));
let state_store = Arc::new(Mutex::new(HashMap::new()));
let state_store_clone = state_store.clone();
let mut in1 = t.input("c1".to_string(), JsonDecoder::new());
let mut in2 = t.input("c2".to_string(), JsonDecoder::new());
let mut in1 = t.input("c1", JsonEncoder::new());
let mut in2 = t.input("c2", JsonEncoder::new());

let mut p = Processor::new(
"clicks",
t,
vec![
Input::new("c1".to_string(), json_encoder, handle_click),
Input::new("c2".to_string(), JsonEncoder::new(), re_emit_clicks),
Input::new("c1", JsonDecoder::new(), handle_click),
Input::new("c2", JsonDecoder::new(), re_emit_clicks),
],
vec![Output::new("c1".to_string())],
move || state_store_clone.clone(),
vec![Output::new("c1")],
move || state_store_clone,
|| (),
);

p.start().await;
// We must run the processor before sending inputs, otherwise they will not reach the processor
p.run_forever().await.unwrap();

for _i in 0..100000 {
in1.send("a".to_string(), &Click {}).await.unwrap();
Expand All @@ -131,15 +144,17 @@ mod tests {
.await
.unwrap();

p.join().await;
p.join().await.unwrap();

let lock = state_store.lock().await;

let final_state: ClicksPerUser = state_store.get("a").await.unwrap();
let final_state: &ClicksPerUser = lock.get("clicks::a").unwrap();
assert_eq!(final_state.clicks, 110000);

let final_state: ClicksPerUser = state_store.get("b").await.unwrap();
let final_state: &ClicksPerUser = lock.get("clicks::b").unwrap();
assert_eq!(final_state.clicks, 10000);

let final_state: ClicksPerUser = state_store.get("c").await.unwrap();
let final_state: &ClicksPerUser = lock.get("clicks::c").unwrap();
assert_eq!(final_state.clicks, 1);
}
}
37 changes: 20 additions & 17 deletions src/context.rs
Original file line number Diff line number Diff line change
@@ -1,56 +1,59 @@
use std::{sync::Arc, collections::HashMap};

use serde::Serialize;
use tokio::sync::Mutex;

use crate::state_store::StateStore;

pub struct Context<T = ()> {
pub struct Context<TStore, T = ()> {
key: String,
origingal_state: Option<T>,
new_state: Option<T>,
original_state: Option<T>,
sends: Vec<FutureDeliverableMessage>,
state_store: Arc<Mutex<TStore>>,
}

/// A message buffered by `Context::emit` in the context's `sends` list.
///
/// NOTE(review): presumably delivered to the output topic after the handler
/// returns; the delivery code is not visible here, so confirm.
#[derive(Clone)]
pub struct FutureDeliverableMessage {
/// Name of the topic the message was emitted to.
pub topic: String,
/// Message key passed to `emit`.
pub key: String,
/// Message body, serialized to JSON bytes by `emit`.
pub payload: Vec<u8>,
/// Headers passed to `emit` alongside the message.
pub headers: HashMap<String, String>,
}

impl<T> Context<T>
impl<TStore, T> Context<TStore, T>
where
T: Clone,
TStore: StateStore<T>
{
pub fn new(key: &str, state: Option<T>) -> Self {
pub fn new(key: &str, state: Option<T>, state_store: Arc<Mutex<TStore>>) -> Self {
Self {
key: key.to_string(),
origingal_state: state,
new_state: None,
original_state: state,
sends: vec![],
// deserializers: HashMap::new(),
state_store,
}
}

/// Returns the key this context was created for.
pub fn key(&self) -> &str {
    self.key.as_str()
}

pub fn get_state(&self) -> &Option<T> {
&self.origingal_state
}

pub fn get_new_state(&self) -> &Option<T> {
&self.new_state
/// Returns another handle to the state store held by this context.
///
/// Only the `Arc` is cloned, so the returned handle points at the same
/// mutex-guarded store.
pub fn state_store(&self) -> Arc<Mutex<TStore>> {
    Arc::clone(&self.state_store)
}

pub fn set_state(&mut self, state: Option<T>) {
self.new_state = state
pub fn get_state(&self) -> &Option<T> {
&self.original_state
}

pub fn emit<M: Serialize>(&mut self, topic: &str, key: &str, msg: &M) {
/// Queues `msg` for `topic` under `key`, together with `headers`.
///
/// The message is serialized to JSON immediately and appended to this
/// context's pending sends; nothing is sent from this method itself.
///
/// # Panics
///
/// Panics if `msg` cannot be serialized to JSON.
pub fn emit<M: Serialize>(
    &mut self,
    topic: &str,
    key: &str,
    msg: &M,
    headers: HashMap<String, String>,
) {
    // Serialize first so a failing message never leaves a partial entry behind.
    let payload = serde_json::to_vec(msg).unwrap();
    let pending = FutureDeliverableMessage {
        topic: topic.to_owned(),
        key: key.to_owned(),
        payload,
        headers,
    };
    self.sends.push(pending);
}

Expand Down
Loading