From f8e5aa5ab4369b018c686d6474626ba889b11a6d Mon Sep 17 00:00:00 2001 From: Shun Kashiwa Date: Fri, 14 Mar 2025 23:20:07 -0700 Subject: [PATCH 1/6] move kvs to examples --- chorus_lib/{tests => examples}/kvs.rs | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename chorus_lib/{tests => examples}/kvs.rs (100%) diff --git a/chorus_lib/tests/kvs.rs b/chorus_lib/examples/kvs.rs similarity index 100% rename from chorus_lib/tests/kvs.rs rename to chorus_lib/examples/kvs.rs From 4f36a8c24afd6ce0ef11bd372de255921f6a3904 Mon Sep 17 00:00:00 2001 From: Shun Kashiwa Date: Sat, 15 Mar 2025 21:04:32 -0700 Subject: [PATCH 2/6] make kvs work --- chorus_lib/examples/kvs.rs | 355 +++++++++++++++++++++++++++++-------- 1 file changed, 278 insertions(+), 77 deletions(-) diff --git a/chorus_lib/examples/kvs.rs b/chorus_lib/examples/kvs.rs index 87497cf..a39d29d 100644 --- a/chorus_lib/examples/kvs.rs +++ b/chorus_lib/examples/kvs.rs @@ -1,30 +1,110 @@ extern crate chorus_lib; +use std::env; +use std::hash::{DefaultHasher, Hash, Hasher}; use std::marker::PhantomData; -use std::thread; +use std::{ + collections::HashMap, + fs::{self, OpenOptions}, + io::{Read, Write}, + path::Path, +}; +use std::{io, process, thread}; use chorus_lib::core::{ ChoreoOp, Choreography, ChoreographyLocation, Deserialize, Faceted, FanInChoreography, HCons, HNil, Located, LocationSet, LocationSetFoldable, Member, MultiplyLocated, Portable, Projector, Serialize, Subset, }; -use chorus_lib::transport::local::{LocalTransport, LocalTransportChannelBuilder}; +use chorus_lib::transport::http::{HttpTransport, HttpTransportConfigBuilder}; type Response = i32; +type Value = i32; type Key = String; #[derive(Serialize, Deserialize)] enum Request { Get(Key), - Put(Key, i32), + Put(Key, Value), +} + +#[derive(Serialize, Deserialize, Debug)] +struct KeyValueStore { + store: HashMap, +} + +impl KeyValueStore { + fn load_from_file(file_path: &str) -> Self { + if Path::new(file_path).exists() { + let mut file = OpenOptions::new().read(true).open(file_path).unwrap(); + let mut contents = String::new(); + file.read_to_string(&mut contents).unwrap(); + serde_json::from_str(&contents).unwrap_or(Self { + store: HashMap::new(), + }) + } else { + Self { + store: HashMap::new(), + } + } + } + + fn save_to_file(&self, file_path: &str) { + let json_data = serde_json::to_string(&self).unwrap(); + let mut file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(file_path) + .unwrap(); + file.write_all(json_data.as_bytes()).unwrap(); + } } -fn handle_get(key: Key) -> Response { - key.len().try_into().unwrap() +fn get_thread_id() -> String { + let pid = process::id(); + let thread_id = thread::current().id(); + + let mut hasher = DefaultHasher::new(); + pid.hash(&mut hasher); + thread_id.hash(&mut hasher); + format!("{:x}", hasher.finish()) // Convert to hexadecimal for compactness } -fn handle_put(key: Key, val: i32) -> Response { - (val != handle_get(key)) as Response +fn create_data_dir_if_necessary() { + if !Path::new(".data").exists() { + fs::create_dir(".data").unwrap(); + let gitignore_path = Path::new(".data/.gitignore"); + let mut file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(gitignore_path) + .unwrap(); + file.write_all(b"*").unwrap(); + } +} + +fn handle_get(key: String) -> i32 { + let thread_id = get_thread_id(); + let file_path = format!(".data/{}", thread_id); + + create_data_dir_if_necessary(); + + let kv_store = KeyValueStore::load_from_file(&file_path); + kv_store.store.get(&key).cloned().unwrap_or(-1) +} + +fn handle_put(key: String, value: i32) -> i32 { + let thread_id = get_thread_id(); + let file_path = format!(".data/{}", thread_id); + + create_data_dir_if_necessary(); + + let mut kv_store = KeyValueStore::load_from_file(&file_path); + kv_store.store.insert(key, value); + kv_store.save_to_file(&file_path); + 0 } #[derive(ChoreographyLocation, Debug)] @@ -157,78 +237,199 @@ where } } -fn run_test(request: Request, answer: Response) { - let transport_channel = LocalTransportChannelBuilder::new() - .with(Client) - .with(Server) - .with(Backup1) - .with(Backup2) - .build(); - let transport_client = LocalTransport::new(Client, transport_channel.clone()); - let transport_server = LocalTransport::new(Server, transport_channel.clone()); - let transport_backup1 = LocalTransport::new(Backup1, transport_channel.clone()); - let transport_backup2 = LocalTransport::new(Backup2, transport_channel.clone()); - - let client_projector = Projector::new(Client, transport_client); - let server_projector = Projector::new(Server, transport_server); - let backup1_projector = Projector::new(Backup1, transport_backup1); - let backup2_projector = Projector::new(Backup2, transport_backup2); - - let mut handles: Vec>> = Vec::new(); - handles.push( - thread::Builder::new() - .name("Server".to_string()) - .spawn(move || { - server_projector.epp_and_run(KVS::>, _, _, _> { - request: server_projector.remote(Client), - _phantoms: PhantomData, - }) - }) - .unwrap(), - ); - handles.push( - thread::Builder::new() - .name("Backup1".to_string()) - .spawn(move || { - backup1_projector.epp_and_run( - KVS::>, _, _, _> { - request: backup1_projector.remote(Client), - _phantoms: PhantomData, - }, - ) - }) - .unwrap(), - ); - handles.push( - thread::Builder::new() - .name("Backup2".to_string()) - .spawn(move || { - backup2_projector.epp_and_run( - KVS::>, _, _, _> { - request: backup2_projector.remote(Client), +fn main() { + let args: Vec = env::args().collect(); + if args.len() != 2 || !["client", "server", "backup1", "backup2"].contains(&args[1].as_str()) { + eprintln!("Usage: {} [client|server|backup1|backup2]", args[0]); + process::exit(1); + } + let role = args[1].as_str(); + match role { + "client" => { + let config = HttpTransportConfigBuilder::for_target(Client, ("0.0.0.0", 9010)) + .with(Server, ("localhost", 9011)) + .with(Backup1, ("localhost", 9012)) + .with(Backup2, ("localhost", 9013)) + .build(); + let transport = HttpTransport::new(config); + let projector = Projector::new(Client, transport); + + println!("Enter a command in one of the following formats:"); + println!(" get "); + println!(" put "); + println!("Type 'exit' to quit."); + loop { + // Read input + print!("> "); + io::stdout().flush().unwrap(); + let mut input = String::new(); + io::stdin().read_line(&mut input).unwrap(); + let input = input.trim(); + + // Parse command + let parts: Vec<&str> = input.split_whitespace().collect(); + let request = match parts.as_slice() { + ["get", key] => Request::Get(key.to_string()), + ["put", key, value] if value.parse::().is_ok() => { + Request::Put(key.to_string(), value.parse::().unwrap()) + } + ["exit"] => break, + _ => { + eprintln!("Invalid command. Use 'get ' or 'put '."); + continue; + } + }; + let response = + projector.epp_and_run(KVS::>, _, _, _> { + request: projector.local(request), _phantoms: PhantomData, - }, - ) - }) - .unwrap(), - ); - let retval = - client_projector.epp_and_run(KVS::>, _, _, _> { - request: client_projector.local(request), - _phantoms: PhantomData, - }); - for handle in handles { - handle.join().unwrap(); + }); + println!("Response: {:?}", projector.unwrap(response)); + } + } + "server" => { + println!("Server process started."); + let config = HttpTransportConfigBuilder::for_target(Server, ("0.0.0.0", 9011)) + .with(Client, ("localhost", 9010)) + .with(Backup1, ("localhost", 9012)) + .with(Backup2, ("localhost", 9013)) + .build(); + let transport = HttpTransport::new(config); + let projector = Projector::new(Server, transport); + loop { + projector.epp_and_run(KVS::>, _, _, _> { + request: projector.remote(Client), + _phantoms: PhantomData, + }); + } + } + "backup1" => { + println!("Backup1 process started."); + let config = HttpTransportConfigBuilder::for_target(Backup1, ("0.0.0.0", 9012)) + .with(Client, ("localhost", 9010)) + .with(Server, ("localhost", 9011)) + .with(Backup2, ("localhost", 9013)) + .build(); + let transport = HttpTransport::new(config); + let projector = Projector::new(Backup1, transport); + loop { + projector.epp_and_run(KVS::>, _, _, _> { + request: projector.remote(Client), + _phantoms: PhantomData, + }); + } + } + "backup2" => { + println!("Backup2 process started."); + let config = HttpTransportConfigBuilder::for_target(Backup2, ("0.0.0.0", 9013)) + .with(Client, ("localhost", 9010)) + .with(Server, ("localhost", 9011)) + .with(Backup1, ("localhost", 9012)) + .build(); + let transport = HttpTransport::new(config); + let projector = Projector::new(Backup2, transport); + loop { + projector.epp_and_run(KVS::>, _, _, _> { + request: projector.remote(Client), + _phantoms: PhantomData, + }); + } + } + _ => unreachable!(), } - assert_eq!(client_projector.unwrap(retval), answer); } -#[test] -fn main() { - let two = "xx".to_string(); - let three = "xxx".to_string(); - run_test(Request::Get(two.clone()), 2); - run_test(Request::Get(three.clone()), 3); - run_test(Request::Put(two.clone(), 2), 0); - run_test(Request::Put(three.clone(), 2), -1); +#[cfg(test)] +mod tests { + use super::*; + + fn clear_data() { + if Path::new(".data").exists() { + fs::remove_dir_all(".data").unwrap(); + } + } + + fn handle_requests(scenario: Vec<(Request, Response)>) { + let n = scenario.len(); + type Locations = LocationSet!(Backup1, Backup2); + + let transport_channel = LocalTransportChannelBuilder::new() + .with(Client) + .with(Server) + .with(Backup1) + .with(Backup2) + .build(); + let transport_client = LocalTransport::new(Client, transport_channel.clone()); + let transport_server = LocalTransport::new(Server, transport_channel.clone()); + let transport_backup1 = LocalTransport::new(Backup1, transport_channel.clone()); + let transport_backup2 = LocalTransport::new(Backup2, transport_channel.clone()); + + let client_projector = Projector::new(Client, transport_client); + let server_projector = Projector::new(Server, transport_server); + let backup1_projector = Projector::new(Backup1, transport_backup1); + let backup2_projector = Projector::new(Backup2, transport_backup2); + + let mut handles = Vec::new(); + handles.push( + thread::Builder::new() + .name("Server".to_string()) + .spawn(move || { + for _ in 0..n { + server_projector.epp_and_run(KVS:: { + request: server_projector.remote(Client), + _phantoms: PhantomData, + }); + } + }) + .unwrap(), + ); + handles.push( + thread::Builder::new() + .name("Backup1".to_string()) + .spawn(move || { + for _ in 0..n { + backup1_projector.epp_and_run(KVS:: { + request: backup1_projector.remote(Client), + _phantoms: PhantomData, + }); + } + }) + .unwrap(), + ); + handles.push( + thread::Builder::new() + .name("Backup2".to_string()) + .spawn(move || { + for _ in 0..n { + backup2_projector.epp_and_run(KVS:: { + request: backup2_projector.remote(Client), + _phantoms: PhantomData, + }); + } + }) + .unwrap(), + ); + for (req, expected_response) in scenario { + let response = client_projector.epp_and_run(KVS:: { + request: client_projector.local(req), + _phantoms: PhantomData, + }); + assert_eq!(client_projector.unwrap(response), expected_response); + } + for handle in handles { + handle.join().unwrap(); + } + } + + #[test] + fn test_kvs() { + clear_data(); + handle_requests(vec![ + (Request::Get("foo".to_string()), -1), + (Request::Put("foo".to_string(), 42), 0), + (Request::Get("foo".to_string()), 42), + (Request::Put("foo".to_string(), 43), 0), + (Request::Get("foo".to_string()), 43), + ]); + } } From 6a20d4a319297df5cf3789c2289ce18d8f9879d1 Mon Sep 17 00:00:00 2001 From: Shun Kashiwa Date: Sat, 15 Mar 2025 21:07:36 -0700 Subject: [PATCH 3/6] fix missing import --- chorus_lib/examples/kvs.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/chorus_lib/examples/kvs.rs b/chorus_lib/examples/kvs.rs index a39d29d..cebfd4f 100644 --- a/chorus_lib/examples/kvs.rs +++ b/chorus_lib/examples/kvs.rs @@ -342,6 +342,7 @@ fn main() { #[cfg(test)] mod tests { use super::*; + use chorus_lib::transport::local::{LocalTransport, LocalTransportChannelBuilder}; fn clear_data() { if Path::new(".data").exists() { From 4f7d4b41a27232b0e3296162563670adc9c71c92 Mon Sep 17 00:00:00 2001 From: Shun Kashiwa Date: Sat, 15 Mar 2025 21:07:55 -0700 Subject: [PATCH 4/6] run examples test on ci --- .github/workflows/rust.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ee87bbc..8c25656 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -34,6 +34,10 @@ jobs: uses: actions-rs/cargo@v1 with: command: test + - name: cargo test --examples + uses: actions-rs/cargo@v1 + with: + command: test --examples publish-book: runs-on: ubuntu-22.04 From eb8c3fb8ea0eba353829989d88f6f47231292e86 Mon Sep 17 00:00:00 2001 From: Shun Kashiwa Date: Sun, 16 Mar 2025 16:07:27 -0700 Subject: [PATCH 5/6] tweak playground --- chorus_lib/examples/playground.rs | 94 +++++++++++++++++++------------ 1 file changed, 57 insertions(+), 37 deletions(-) diff --git a/chorus_lib/examples/playground.rs b/chorus_lib/examples/playground.rs index a4b5bb2..93f2448 100644 --- a/chorus_lib/examples/playground.rs +++ b/chorus_lib/examples/playground.rs @@ -1,3 +1,6 @@ +use std::env; +use std::io::{self, Write}; + /// # Testing Playground /// /// This is a place where you can write your own code to test ChoRus. @@ -8,64 +11,81 @@ /// /// You can run this program by using the following command: /// -/// cargo run --example playground +/// cargo run --example playground +/// where is either "alpha" or "beta". +/// use chorus_lib::{ - core::{Choreography, ChoreographyLocation, LocationSet}, - transport::local::{LocalTransport, LocalTransportChannelBuilder}, + core::{Choreography, ChoreographyLocation, LocationSet, Projector}, + transport::http::{HttpTransport, HttpTransportConfigBuilder}, }; -use rand; // STEP 1: Add locations #[derive(ChoreographyLocation)] -struct Alice; +struct Alpha; #[derive(ChoreographyLocation)] -struct Bob; +struct Beta; // STEP 2: Write a Choreography struct MainChoreography; impl Choreography for MainChoreography { - type L = LocationSet!(Alice, Bob); + type L = LocationSet!(Alpha, Beta); fn run(self, op: &impl chorus_lib::core::ChoreoOp) -> () { - let random_number_at_alice = op.locally(Alice, |_| { - let random_number = rand::random::(); - println!("Random number at Alice: {}", random_number); - random_number + let a = op.locally(Alpha, |_| loop { + print!("Enter a number: "); + io::stdout().flush().expect("Failed to flush stdout"); + let mut input = String::new(); + if io::stdin().read_line(&mut input).is_err() { + continue; + } + if let Ok(num) = input.trim().parse::() { + break num; + } else { + println!("Please enter a valid integer."); + } + }); + let a = op.comm(Alpha, Beta, &a); + let b = op.locally(Beta, |_| { + print!("Enter a word for Beta to send to Alpha: "); + io::stdout().flush().expect("Failed to flush stdout"); + let mut input = String::new(); + io::stdin() + .read_line(&mut input) + .expect("Failed to read line"); + input.trim().to_string() + }); + let b = op.comm(Beta, Alpha, &b); + op.locally(Alpha, |un| { + println!("Alpha received: {}", un.unwrap(&b)); }); - let random_number_at_bob = op.comm(Alice, Bob, &random_number_at_alice); - op.locally(Bob, |un| { - let random_number = un.unwrap(&random_number_at_bob); - println!("Random number at Bob: {}", random_number); + op.locally(Beta, |un| { + println!("Beta received: {}", un.unwrap(&a)); }); } } // STEP 3: Run the choreography fn main() { - // In this example, we use the local transport and run the choreography in two threads. - // Refer to the documentation for more information on how to use other transports. - let mut handles = Vec::new(); - let transport_channel = LocalTransportChannelBuilder::new() - .with(Alice) - .with(Bob) - .build(); - { - let transport = LocalTransport::new(Alice, transport_channel.clone()); - handles.push(std::thread::spawn(move || { - let projector = chorus_lib::core::Projector::new(Alice, transport); + let role = env::args().nth(1).expect("Usage: playground "); + match role.as_str() { + "alpha" => { + let config = HttpTransportConfigBuilder::for_target(Alpha, ("0.0.0.0", 8080)) + .with(Beta, ("127.0.0.1", 8081)) + .build(); + let transport = HttpTransport::new(config); + let projector = Projector::new(Alpha, transport); projector.epp_and_run(MainChoreography); - })); - } - { - let transport = LocalTransport::new(Bob, transport_channel.clone()); - handles.push(std::thread::spawn(move || { - let projector = chorus_lib::core::Projector::new(Bob, transport); + } + "beta" => { + let config = HttpTransportConfigBuilder::for_target(Beta, ("0.0.0.0", 8081)) + .with(Alpha, ("127.0.0.1", 8080)) + .build(); + let transport = HttpTransport::new(config); + let projector = Projector::new(Beta, transport); projector.epp_and_run(MainChoreography); - })); - } - for h in handles { - h.join().unwrap(); - } + } + _ => panic!("Invalid role"), + }; } From ccdea93cd9492c58de5da46f5a0f989731b8e73e Mon Sep 17 00:00:00 2001 From: Shun Kashiwa Date: Sun, 16 Mar 2025 16:21:21 -0700 Subject: [PATCH 6/6] update CI workflow to use args for example tests --- .github/workflows/rust.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 8c25656..3f4475e 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -37,7 +37,8 @@ jobs: - name: cargo test --examples uses: actions-rs/cargo@v1 with: - command: test --examples + command: test + args: --examples publish-book: runs-on: ubuntu-22.04