diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ee87bbc..3f4475e 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -34,6 +34,11 @@ jobs: uses: actions-rs/cargo@v1 with: command: test + - name: cargo test --examples + uses: actions-rs/cargo@v1 + with: + command: test + args: --examples publish-book: runs-on: ubuntu-22.04 diff --git a/chorus_lib/examples/kvs.rs b/chorus_lib/examples/kvs.rs new file mode 100644 index 0000000..cebfd4f --- /dev/null +++ b/chorus_lib/examples/kvs.rs @@ -0,0 +1,436 @@ +extern crate chorus_lib; + +use std::env; +use std::hash::{DefaultHasher, Hash, Hasher}; +use std::marker::PhantomData; +use std::{ + collections::HashMap, + fs::{self, OpenOptions}, + io::{Read, Write}, + path::Path, +}; +use std::{io, process, thread}; + +use chorus_lib::core::{ + ChoreoOp, Choreography, ChoreographyLocation, Deserialize, Faceted, FanInChoreography, HCons, + HNil, Located, LocationSet, LocationSetFoldable, Member, MultiplyLocated, Portable, Projector, + Serialize, Subset, +}; +use chorus_lib::transport::http::{HttpTransport, HttpTransportConfigBuilder}; + +type Response = i32; +type Value = i32; +type Key = String; + +#[derive(Serialize, Deserialize)] +enum Request { + Get(Key), + Put(Key, Value), +} + +#[derive(Serialize, Deserialize, Debug)] +struct KeyValueStore { + store: HashMap, +} + +impl KeyValueStore { + fn load_from_file(file_path: &str) -> Self { + if Path::new(file_path).exists() { + let mut file = OpenOptions::new().read(true).open(file_path).unwrap(); + let mut contents = String::new(); + file.read_to_string(&mut contents).unwrap(); + serde_json::from_str(&contents).unwrap_or(Self { + store: HashMap::new(), + }) + } else { + Self { + store: HashMap::new(), + } + } + } + + fn save_to_file(&self, file_path: &str) { + let json_data = serde_json::to_string(&self).unwrap(); + let mut file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(file_path) + .unwrap(); + file.write_all(json_data.as_bytes()).unwrap(); + } +} + +fn get_thread_id() -> String { + let pid = process::id(); + let thread_id = thread::current().id(); + + let mut hasher = DefaultHasher::new(); + pid.hash(&mut hasher); + thread_id.hash(&mut hasher); + format!("{:x}", hasher.finish()) // Convert to hexadecimal for compactness +} + +fn create_data_dir_if_necessary() { + if !Path::new(".data").exists() { + fs::create_dir(".data").unwrap(); + let gitignore_path = Path::new(".data/.gitignore"); + let mut file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(gitignore_path) + .unwrap(); + file.write_all(b"*").unwrap(); + } +} + +fn handle_get(key: String) -> i32 { + let thread_id = get_thread_id(); + let file_path = format!(".data/{}", thread_id); + + create_data_dir_if_necessary(); + + let kv_store = KeyValueStore::load_from_file(&file_path); + kv_store.store.get(&key).cloned().unwrap_or(-1) +} + +fn handle_put(key: String, value: i32) -> i32 { + let thread_id = get_thread_id(); + let file_path = format!(".data/{}", thread_id); + + create_data_dir_if_necessary(); + + let mut kv_store = KeyValueStore::load_from_file(&file_path); + kv_store.store.insert(key, value); + kv_store.save_to_file(&file_path); + 0 +} + +#[derive(ChoreographyLocation, Debug)] +struct Client; + +#[derive(ChoreographyLocation, Debug)] +struct Server; + +#[derive(ChoreographyLocation, Debug)] +struct Backup1; + +#[derive(ChoreographyLocation, Debug)] +struct Backup2; + +// This should perhaps be in core? +struct Gather< + 'a, + V, + Senders: LocationSet + Subset, + Recievers: LocationSet + Subset, + Census: LocationSet, + SendersPresent, + RecieversPresent, +> { + values: &'a Faceted, + phantom: PhantomData<(Census, SendersPresent, Recievers, RecieversPresent)>, +} +impl< + 'a, + V: Portable + Copy, + Senders: LocationSet + Subset, + Recievers: LocationSet + Subset, + Census: LocationSet, + SendersPresent, + RecieversPresent, + > FanInChoreography + for Gather<'a, V, Senders, Recievers, Census, SendersPresent, RecieversPresent> +{ + type L = Census; + type QS = Senders; + type RS = Recievers; + fn run< + Sender: ChoreographyLocation, + _SendersPresent, + _RecieversPresent, + SenderPresent, + SenderInSenders, + >( + &self, + op: &impl ChoreoOp, + ) -> MultiplyLocated + where + Self::QS: Subset, + Self::RS: Subset, + Sender: Member, + Sender: Member, + { + let x = op.locally(Sender::new(), |un| *un.unwrap(self.values)); + let x = op.multicast::( + Sender::new(), + ::new(), + &x, + ); + x + } +} + +struct HandleRequest { + request: Located, + _phantoms: PhantomData<(Backups, BackupsPresent, BSpine)>, +} +impl Choreography> + for HandleRequest +where + Backups: Subset, BackupsPresent>, + Backups: LocationSetFoldable, Backups, BSpine>, +{ + type L = HCons; + fn run(self, op: &impl ChoreoOp) -> Located { + match op.broadcast(Server, self.request) { + Request::Put(key, value) => { + let oks = op.parallel(Backups::new(), || handle_put(key.clone(), value)); + let gathered = op.fanin::, _, _, _, _>( + Backups::new(), + Gather { + values: &oks, + phantom: PhantomData, + }, + ); + op.locally(Server, |un| { + let ok = un + .unwrap(&gathered) + .get_map() + .into_values() + .all(|response| response == 0); + if ok { + return handle_put(key.clone(), value); + } else { + return -1; + } + }) + } + Request::Get(key) => op.locally(Server, |_| handle_get(key.clone())), + } + } +} + +struct KVS { + request: Located, + _phantoms: PhantomData<(Backups, BackupsPresent, BackupsAreServers, BSpine)>, +} +impl + Choreography> + for KVS +where + Backups: Subset>, BackupsPresent>, + Backups: Subset, BackupsAreServers>, + Backups: LocationSetFoldable, Backups, BSpine>, +{ + type L = HCons>; + fn run(self, op: &impl ChoreoOp) -> Located { + let request = op.comm(Client, Server, &self.request); + let response = op + .conclave(HandleRequest:: { + request: request, + _phantoms: PhantomData, + }) + .flatten(); + op.comm(Server, Client, &response) + } +} + +fn main() { + let args: Vec = env::args().collect(); + if args.len() != 2 || !["client", "server", "backup1", "backup2"].contains(&args[1].as_str()) { + eprintln!("Usage: {} [client|server|backup1|backup2]", args[0]); + process::exit(1); + } + let role = args[1].as_str(); + match role { + "client" => { + let config = HttpTransportConfigBuilder::for_target(Client, ("0.0.0.0", 9010)) + .with(Server, ("localhost", 9011)) + .with(Backup1, ("localhost", 9012)) + .with(Backup2, ("localhost", 9013)) + .build(); + let transport = HttpTransport::new(config); + let projector = Projector::new(Client, transport); + + println!("Enter a command in one of the following formats:"); + println!(" get "); + println!(" put "); + println!("Type 'exit' to quit."); + loop { + // Read input + print!("> "); + io::stdout().flush().unwrap(); + let mut input = String::new(); + io::stdin().read_line(&mut input).unwrap(); + let input = input.trim(); + + // Parse command + let parts: Vec<&str> = input.split_whitespace().collect(); + let request = match parts.as_slice() { + ["get", key] => Request::Get(key.to_string()), + ["put", key, value] if value.parse::().is_ok() => { + Request::Put(key.to_string(), value.parse::().unwrap()) + } + ["exit"] => break, + _ => { + eprintln!("Invalid command. Use 'get ' or 'put '."); + continue; + } + }; + let response = + projector.epp_and_run(KVS::>, _, _, _> { + request: projector.local(request), + _phantoms: PhantomData, + }); + println!("Response: {:?}", projector.unwrap(response)); + } + } + "server" => { + println!("Server process started."); + let config = HttpTransportConfigBuilder::for_target(Server, ("0.0.0.0", 9011)) + .with(Client, ("localhost", 9010)) + .with(Backup1, ("localhost", 9012)) + .with(Backup2, ("localhost", 9013)) + .build(); + let transport = HttpTransport::new(config); + let projector = Projector::new(Server, transport); + loop { + projector.epp_and_run(KVS::>, _, _, _> { + request: projector.remote(Client), + _phantoms: PhantomData, + }); + } + } + "backup1" => { + println!("Backup1 process started."); + let config = HttpTransportConfigBuilder::for_target(Backup1, ("0.0.0.0", 9012)) + .with(Client, ("localhost", 9010)) + .with(Server, ("localhost", 9011)) + .with(Backup2, ("localhost", 9013)) + .build(); + let transport = HttpTransport::new(config); + let projector = Projector::new(Backup1, transport); + loop { + projector.epp_and_run(KVS::>, _, _, _> { + request: projector.remote(Client), + _phantoms: PhantomData, + }); + } + } + "backup2" => { + println!("Backup2 process started."); + let config = HttpTransportConfigBuilder::for_target(Backup2, ("0.0.0.0", 9013)) + .with(Client, ("localhost", 9010)) + .with(Server, ("localhost", 9011)) + .with(Backup1, ("localhost", 9012)) + .build(); + let transport = HttpTransport::new(config); + let projector = Projector::new(Backup2, transport); + loop { + projector.epp_and_run(KVS::>, _, _, _> { + request: projector.remote(Client), + _phantoms: PhantomData, + }); + } + } + _ => unreachable!(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chorus_lib::transport::local::{LocalTransport, LocalTransportChannelBuilder}; + + fn clear_data() { + if Path::new(".data").exists() { + fs::remove_dir_all(".data").unwrap(); + } + } + + fn handle_requests(scenario: Vec<(Request, Response)>) { + let n = scenario.len(); + type Locations = LocationSet!(Backup1, Backup2); + + let transport_channel = LocalTransportChannelBuilder::new() + .with(Client) + .with(Server) + .with(Backup1) + .with(Backup2) + .build(); + let transport_client = LocalTransport::new(Client, transport_channel.clone()); + let transport_server = LocalTransport::new(Server, transport_channel.clone()); + let transport_backup1 = LocalTransport::new(Backup1, transport_channel.clone()); + let transport_backup2 = LocalTransport::new(Backup2, transport_channel.clone()); + + let client_projector = Projector::new(Client, transport_client); + let server_projector = Projector::new(Server, transport_server); + let backup1_projector = Projector::new(Backup1, transport_backup1); + let backup2_projector = Projector::new(Backup2, transport_backup2); + + let mut handles = Vec::new(); + handles.push( + thread::Builder::new() + .name("Server".to_string()) + .spawn(move || { + for _ in 0..n { + server_projector.epp_and_run(KVS:: { + request: server_projector.remote(Client), + _phantoms: PhantomData, + }); + } + }) + .unwrap(), + ); + handles.push( + thread::Builder::new() + .name("Backup1".to_string()) + .spawn(move || { + for _ in 0..n { + backup1_projector.epp_and_run(KVS:: { + request: backup1_projector.remote(Client), + _phantoms: PhantomData, + }); + } + }) + .unwrap(), + ); + handles.push( + thread::Builder::new() + .name("Backup2".to_string()) + .spawn(move || { + for _ in 0..n { + backup2_projector.epp_and_run(KVS:: { + request: backup2_projector.remote(Client), + _phantoms: PhantomData, + }); + } + }) + .unwrap(), + ); + for (req, expected_response) in scenario { + let response = client_projector.epp_and_run(KVS:: { + request: client_projector.local(req), + _phantoms: PhantomData, + }); + assert_eq!(client_projector.unwrap(response), expected_response); + } + for handle in handles { + handle.join().unwrap(); + } + } + + #[test] + fn test_kvs() { + clear_data(); + handle_requests(vec![ + (Request::Get("foo".to_string()), -1), + (Request::Put("foo".to_string(), 42), 0), + (Request::Get("foo".to_string()), 42), + (Request::Put("foo".to_string(), 43), 0), + (Request::Get("foo".to_string()), 43), + ]); + } +} diff --git a/chorus_lib/examples/playground.rs b/chorus_lib/examples/playground.rs index a4b5bb2..93f2448 100644 --- a/chorus_lib/examples/playground.rs +++ b/chorus_lib/examples/playground.rs @@ -1,3 +1,6 @@ +use std::env; +use std::io::{self, Write}; + /// # Testing Playground /// /// This is a place where you can write your own code to test ChoRus. @@ -8,64 +11,81 @@ /// /// You can run this program by using the following command: /// -/// cargo run --example playground +/// cargo run --example playground +/// where is either "alpha" or "beta". +/// use chorus_lib::{ - core::{Choreography, ChoreographyLocation, LocationSet}, - transport::local::{LocalTransport, LocalTransportChannelBuilder}, + core::{Choreography, ChoreographyLocation, LocationSet, Projector}, + transport::http::{HttpTransport, HttpTransportConfigBuilder}, }; -use rand; // STEP 1: Add locations #[derive(ChoreographyLocation)] -struct Alice; +struct Alpha; #[derive(ChoreographyLocation)] -struct Bob; +struct Beta; // STEP 2: Write a Choreography struct MainChoreography; impl Choreography for MainChoreography { - type L = LocationSet!(Alice, Bob); + type L = LocationSet!(Alpha, Beta); fn run(self, op: &impl chorus_lib::core::ChoreoOp) -> () { - let random_number_at_alice = op.locally(Alice, |_| { - let random_number = rand::random::(); - println!("Random number at Alice: {}", random_number); - random_number + let a = op.locally(Alpha, |_| loop { + print!("Enter a number: "); + io::stdout().flush().expect("Failed to flush stdout"); + let mut input = String::new(); + if io::stdin().read_line(&mut input).is_err() { + continue; + } + if let Ok(num) = input.trim().parse::() { + break num; + } else { + println!("Please enter a valid integer."); + } + }); + let a = op.comm(Alpha, Beta, &a); + let b = op.locally(Beta, |_| { + print!("Enter a word for Beta to send to Alpha: "); + io::stdout().flush().expect("Failed to flush stdout"); + let mut input = String::new(); + io::stdin() + .read_line(&mut input) + .expect("Failed to read line"); + input.trim().to_string() + }); + let b = op.comm(Beta, Alpha, &b); + op.locally(Alpha, |un| { + println!("Alpha received: {}", un.unwrap(&b)); }); - let random_number_at_bob = op.comm(Alice, Bob, &random_number_at_alice); - op.locally(Bob, |un| { - let random_number = un.unwrap(&random_number_at_bob); - println!("Random number at Bob: {}", random_number); + op.locally(Beta, |un| { + println!("Beta received: {}", un.unwrap(&a)); }); } } // STEP 3: Run the choreography fn main() { - // In this example, we use the local transport and run the choreography in two threads. - // Refer to the documentation for more information on how to use other transports. - let mut handles = Vec::new(); - let transport_channel = LocalTransportChannelBuilder::new() - .with(Alice) - .with(Bob) - .build(); - { - let transport = LocalTransport::new(Alice, transport_channel.clone()); - handles.push(std::thread::spawn(move || { - let projector = chorus_lib::core::Projector::new(Alice, transport); + let role = env::args().nth(1).expect("Usage: playground "); + match role.as_str() { + "alpha" => { + let config = HttpTransportConfigBuilder::for_target(Alpha, ("0.0.0.0", 8080)) + .with(Beta, ("127.0.0.1", 8081)) + .build(); + let transport = HttpTransport::new(config); + let projector = Projector::new(Alpha, transport); projector.epp_and_run(MainChoreography); - })); - } - { - let transport = LocalTransport::new(Bob, transport_channel.clone()); - handles.push(std::thread::spawn(move || { - let projector = chorus_lib::core::Projector::new(Bob, transport); + } + "beta" => { + let config = HttpTransportConfigBuilder::for_target(Beta, ("0.0.0.0", 8081)) + .with(Alpha, ("127.0.0.1", 8080)) + .build(); + let transport = HttpTransport::new(config); + let projector = Projector::new(Beta, transport); projector.epp_and_run(MainChoreography); - })); - } - for h in handles { - h.join().unwrap(); - } + } + _ => panic!("Invalid role"), + }; } diff --git a/chorus_lib/tests/kvs.rs b/chorus_lib/tests/kvs.rs deleted file mode 100644 index 87497cf..0000000 --- a/chorus_lib/tests/kvs.rs +++ /dev/null @@ -1,234 +0,0 @@ -extern crate chorus_lib; - -use std::marker::PhantomData; -use std::thread; - -use chorus_lib::core::{ - ChoreoOp, Choreography, ChoreographyLocation, Deserialize, Faceted, FanInChoreography, HCons, - HNil, Located, LocationSet, LocationSetFoldable, Member, MultiplyLocated, Portable, Projector, - Serialize, Subset, -}; -use chorus_lib::transport::local::{LocalTransport, LocalTransportChannelBuilder}; - -type Response = i32; -type Key = String; - -#[derive(Serialize, Deserialize)] -enum Request { - Get(Key), - Put(Key, i32), -} - -fn handle_get(key: Key) -> Response { - key.len().try_into().unwrap() -} - -fn handle_put(key: Key, val: i32) -> Response { - (val != handle_get(key)) as Response -} - -#[derive(ChoreographyLocation, Debug)] -struct Client; - -#[derive(ChoreographyLocation, Debug)] -struct Server; - -#[derive(ChoreographyLocation, Debug)] -struct Backup1; - -#[derive(ChoreographyLocation, Debug)] -struct Backup2; - -// This should perhaps be in core? -struct Gather< - 'a, - V, - Senders: LocationSet + Subset, - Recievers: LocationSet + Subset, - Census: LocationSet, - SendersPresent, - RecieversPresent, -> { - values: &'a Faceted, - phantom: PhantomData<(Census, SendersPresent, Recievers, RecieversPresent)>, -} -impl< - 'a, - V: Portable + Copy, - Senders: LocationSet + Subset, - Recievers: LocationSet + Subset, - Census: LocationSet, - SendersPresent, - RecieversPresent, - > FanInChoreography - for Gather<'a, V, Senders, Recievers, Census, SendersPresent, RecieversPresent> -{ - type L = Census; - type QS = Senders; - type RS = Recievers; - fn run< - Sender: ChoreographyLocation, - _SendersPresent, - _RecieversPresent, - SenderPresent, - SenderInSenders, - >( - &self, - op: &impl ChoreoOp, - ) -> MultiplyLocated - where - Self::QS: Subset, - Self::RS: Subset, - Sender: Member, - Sender: Member, - { - let x = op.locally(Sender::new(), |un| *un.unwrap(self.values)); - let x = op.multicast::( - Sender::new(), - ::new(), - &x, - ); - x - } -} - -struct HandleRequest { - request: Located, - _phantoms: PhantomData<(Backups, BackupsPresent, BSpine)>, -} -impl Choreography> - for HandleRequest -where - Backups: Subset, BackupsPresent>, - Backups: LocationSetFoldable, Backups, BSpine>, -{ - type L = HCons; - fn run(self, op: &impl ChoreoOp) -> Located { - match op.broadcast(Server, self.request) { - Request::Put(key, value) => { - let oks = op.parallel(Backups::new(), || handle_put(key.clone(), value)); - let gathered = op.fanin::, _, _, _, _>( - Backups::new(), - Gather { - values: &oks, - phantom: PhantomData, - }, - ); - op.locally(Server, |un| { - let ok = un - .unwrap(&gathered) - .get_map() - .into_values() - .all(|response| response == 0); - if ok { - return handle_put(key.clone(), value); - } else { - return -1; - } - }) - } - Request::Get(key) => op.locally(Server, |_| handle_get(key.clone())), - } - } -} - -struct KVS { - request: Located, - _phantoms: PhantomData<(Backups, BackupsPresent, BackupsAreServers, BSpine)>, -} -impl - Choreography> - for KVS -where - Backups: Subset>, BackupsPresent>, - Backups: Subset, BackupsAreServers>, - Backups: LocationSetFoldable, Backups, BSpine>, -{ - type L = HCons>; - fn run(self, op: &impl ChoreoOp) -> Located { - let request = op.comm(Client, Server, &self.request); - let response = op - .conclave(HandleRequest:: { - request: request, - _phantoms: PhantomData, - }) - .flatten(); - op.comm(Server, Client, &response) - } -} - -fn run_test(request: Request, answer: Response) { - let transport_channel = LocalTransportChannelBuilder::new() - .with(Client) - .with(Server) - .with(Backup1) - .with(Backup2) - .build(); - let transport_client = LocalTransport::new(Client, transport_channel.clone()); - let transport_server = LocalTransport::new(Server, transport_channel.clone()); - let transport_backup1 = LocalTransport::new(Backup1, transport_channel.clone()); - let transport_backup2 = LocalTransport::new(Backup2, transport_channel.clone()); - - let client_projector = Projector::new(Client, transport_client); - let server_projector = Projector::new(Server, transport_server); - let backup1_projector = Projector::new(Backup1, transport_backup1); - let backup2_projector = Projector::new(Backup2, transport_backup2); - - let mut handles: Vec>> = Vec::new(); - handles.push( - thread::Builder::new() - .name("Server".to_string()) - .spawn(move || { - server_projector.epp_and_run(KVS::>, _, _, _> { - request: server_projector.remote(Client), - _phantoms: PhantomData, - }) - }) - .unwrap(), - ); - handles.push( - thread::Builder::new() - .name("Backup1".to_string()) - .spawn(move || { - backup1_projector.epp_and_run( - KVS::>, _, _, _> { - request: backup1_projector.remote(Client), - _phantoms: PhantomData, - }, - ) - }) - .unwrap(), - ); - handles.push( - thread::Builder::new() - .name("Backup2".to_string()) - .spawn(move || { - backup2_projector.epp_and_run( - KVS::>, _, _, _> { - request: backup2_projector.remote(Client), - _phantoms: PhantomData, - }, - ) - }) - .unwrap(), - ); - let retval = - client_projector.epp_and_run(KVS::>, _, _, _> { - request: client_projector.local(request), - _phantoms: PhantomData, - }); - for handle in handles { - handle.join().unwrap(); - } - assert_eq!(client_projector.unwrap(retval), answer); -} - -#[test] -fn main() { - let two = "xx".to_string(); - let three = "xxx".to_string(); - run_test(Request::Get(two.clone()), 2); - run_test(Request::Get(three.clone()), 3); - run_test(Request::Put(two.clone(), 2), 0); - run_test(Request::Put(three.clone(), 2), -1); -}