diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c795b05 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +build \ No newline at end of file diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index b1d3c74..55cb00c 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -73,6 +73,24 @@ target_link_libraries(matmul-rich PRIVATE ts_array PRIVATE ts_matrix) +add_executable(matmul-dist-simple + matmul-dist-simple.cpp) +target_link_libraries(matmul-dist-simple + PRIVATE df_interface + PRIVATE ts_type + PRIVATE df_operation + PRIVATE ts_array + PRIVATE ts_matrix) + +add_executable(matmul-distributed + matmul-distributed.cpp) +target_link_libraries(matmul-distributed + PRIVATE df_interface + PRIVATE ts_type + PRIVATE df_operation + PRIVATE ts_array + PRIVATE ts_matrix) + add_executable(k_means_mapreduce k_means_mapreduce.cpp) target_link_libraries(k_means_mapreduce PRIVATE df_interface diff --git a/benchmarks/matmul-dist-simple.cpp b/benchmarks/matmul-dist-simple.cpp new file mode 100644 index 0000000..09070a7 --- /dev/null +++ b/benchmarks/matmul-dist-simple.cpp @@ -0,0 +1,91 @@ +// Test file testing distributed array capabilities - multiplies a matrix by itself and the result of previous iteration - distributed across two hosts. +// Written by Krish & Gen +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "../df_interface.h" +#include "type_system/ts_type.h" +#include "type_system/types/ts_array.h" +#include "type_system/types/ts_matrix.h" +#include "type_system/types/ts_primitive.h" +#include "type_system/types/ts_string.h" + + +const size_t MAT_SIZE = 2; + +int a[MAT_SIZE][MAT_SIZE] = { + {1, 1}, + {1, 1} +}; +const int curr_host_id = 2; + +const int ns = 1; + +void fire_matrix(int matrix[MAT_SIZE][MAT_SIZE], int id, unsigned long long itr); + +int main(int argc, char **argv) { + // Program laminar set up + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ + laminar_init(); + + set_host(curr_host_id); + add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.247", "/disk2/cspot-namespace-platform/"); + + add_node(ns, 1, 1, {DF_CUSTOM, MATRIX_MULTIPLY}); + + // Inputs + add_operand(ns, 1, 2); // a + add_operand(ns, 2, 3); // b + + subscribe(ns, 1, 0, ns, 2); + subscribe(ns, 1, 1, ns, 3); + + /* Run program */ + laminar_setup(); + if (curr_host_id == 1) { + fire_matrix(a, 2, 1); + } + else{ + fire_matrix(a, 3, 1); + } + for (unsigned long long itr = 1; itr < 31; itr++) { + operand result; + int err = get_result(ns, 1, &result, itr); + if (err < 0) { + std::cout << "Failed to read the result " << std::endl; + } + ts_value* const result_value = load_value(&result.operand_value, ns, 1); + int result_matrix[MAT_SIZE][MAT_SIZE]; + + get_integer_matrix(result_matrix, result_value); + for (int i = 0; i < MAT_SIZE; i++) { + for (int j = 0; j < MAT_SIZE; j++) { + std::cout << result_matrix[i][j] << " "; + } + std::cout << std::endl; + } + if (curr_host_id == 1) { + fire_matrix(a, 2, itr+1); + } + else{ + fire_matrix(result_matrix, 3, itr+1); + } + } +} + +void fire_matrix(int matrix[MAT_SIZE][MAT_SIZE], int id, unsigned long long itr){ + struct ts_value* operand_value = value_from_integer_matrix(matrix, MAT_SIZE, MAT_SIZE); + operand op(operand_value, itr); + write_value(&op.operand_value); + fire_operand(ns, id, &op); + value_deep_delete(operand_value); +} \ No newline at end of file diff --git a/benchmarks/matmul-distributed.cpp b/benchmarks/matmul-distributed.cpp new file mode 100644 index 0000000..aa54af9 --- /dev/null +++ b/benchmarks/matmul-distributed.cpp @@ -0,0 +1,239 @@ +// Test file testing distributed array capabilities - multiplies two matrix inputs fromtwo machines - 30 seperate times. +// Written by Krish & Gen + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "../df_interface.h" +#include "type_system/ts_type.h" +#include "type_system/types/ts_array.h" +#include "type_system/types/ts_matrix.h" +#include "type_system/types/ts_primitive.h" +#include "type_system/types/ts_string.h" + +// int a[8192][8192]; +// int b[8192][8192]; + +int a[4][4] = { + {2, 2, 3, 4}, + {4, 3, 2, 1}, + {2, 4, 1, 3}, + {3, 1, 4, 2} +}; + +int b[4][4] = { + {2, 2, 3, 4}, + {4, 3, 2, 1}, + {2, 4, 1, 3}, + {3, 1, 4, 2} +}; + +bool TIMING = false; + +int curr_host_id = 2; + +double matmul_partition(const int mat_size, int n_partitions, int t_partitions, unsigned long long itr) { + auto start_time = std::chrono::high_resolution_clock::now(); + + // Write matrix A to operands in `t_partitions` chunks + int min_chunk_size = mat_size / t_partitions; + int overflow = mat_size % t_partitions; + + if (curr_host_id == 1) { + // Write matrix B to operand + struct ts_value* operand_value = value_from_integer_matrix(b, mat_size, mat_size); + + operand op(operand_value, itr); + write_value(&op.operand_value); + fire_operand(2, 1, &op); + value_deep_delete(operand_value); + } + else{ + //std::cout << "t_partitions: " << t_partitions << "min_chunk_size: " << min_chunk_size << "mat_size: " << mat_size << std::endl; + int start = 0, end = 0; + for (int i = 0; i < t_partitions - 1; i++) { + end = start + min_chunk_size - 1; + if (overflow) { + end++; + overflow--; + } + + // Write submatrix to operand + // std::cout << '[' << start << ", " << end << ']' << std::endl; + struct ts_value* operand_value = value_from_integer_matrix(a[start], end - start + 1, mat_size); + + operand op(operand_value, itr); + write_value(&op.operand_value); + fire_operand(1, i + 1, &op); + value_deep_delete(operand_value); + + start = end + 1; + } + + // Write final submatrix to operand + // std::cout << '[' << start << ", " << mat_size - 1 << ']' << std::endl; + struct ts_value* operand_value = value_from_integer_matrix(a[start], mat_size - start, mat_size); + operand op(operand_value, itr); + write_value(&op.operand_value); + fire_operand(1, t_partitions, &op); + value_deep_delete(operand_value); + } + + // usleep(5e5); + + if (!TIMING) { + // Show matrix multiplication result + overflow = mat_size % t_partitions; + operand result; + for (unsigned int id = 1; id <= t_partitions; id++) { + int err = get_result(3, id, &result, itr); + if (err < 0) { + std::cout << "Failed to read the result " << std::endl; + } + ts_value* const result_value = load_value(&result.operand_value, 3, id); + int result_matrix[min_chunk_size + !!overflow][mat_size]; + + get_integer_matrix(result_matrix, result_value); + for (int i = 0; i < min_chunk_size + !!overflow; i++) { + for (int j = 0; j < mat_size; j++) { + std::cout << result_matrix[i][j] << " "; + } + std::cout << std::endl; + } + + if (overflow) { + overflow--; + } + } + + return 0.0; + } + + else { + operand op; + std::vector timestamps_ns; + for (unsigned int id = 1; id <= t_partitions; id++) { + int err = get_result(3, id, &op, itr); + + long ts = op.operand_value.value.ts_long; + timestamps_ns.push_back(ts); + } + + long start_ns = std::chrono::duration_cast( + start_time.time_since_epoch()) + .count(); + + long end_ns = *std::max_element(timestamps_ns.begin(), timestamps_ns.end()); + + double latency_ms = (double)(end_ns - start_ns) / 1e6; + std::cout << "latency: " << latency_ms << "ms" << std::endl; + + return latency_ms; + } +} + +double avg(std::vector& v) { + double sum = 0.0; + + for (double d : v) + sum += d; + + return sum / v.size(); +} + +#define ARGS "p:d:" +char *Usage = "matmul-distributed -p partitions -d dimention\n"; + +int main(int argc, char **argv) { + // Program laminar set up + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ + + set_host(curr_host_id); + add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.247", "/disk2/cspot-namespace-platform/"); + laminar_init(); + + // Matrix set up + int c; + int partitions; + int dim; + + partitions = 0; + dim = 0; + + while((c = getopt(argc,argv,ARGS)) != EOF) { + switch(c) { + case 'p': + partitions = atoi(optarg); + break; + case 'd': + dim = atoi(optarg); + break; + default: + fprintf(stderr, + "unrecognized command %c\n", + (char)c); + fprintf(stderr,"usage: %s",Usage); + break; + } + } + + if(partitions == 0) { + fprintf(stderr,"must specify partitions\n"); + fprintf(stderr,"usage: %s",Usage); + exit(1); + } + + if(dim == 0) { + fprintf(stderr,"must specify partitions\n"); + fprintf(stderr,"usage: %s",Usage); + exit(1); + } + + int t_partitions; + + if(dim < partitions) { + t_partitions = dim; + } else { + t_partitions = partitions; + } + + // Create operands + for (int i = 1; i <= t_partitions; i++) { + add_operand(1, 2, i); // A / t_partitions + } + + add_operand(2, 1, 1); // B + + // Create node for each partition output + for (int i = 1; i <= t_partitions; i++) { + add_node( + 3, (i % 2 + 1), i, + {DF_CUSTOM, (TIMING ? MATRIX_MULTIPLY_TIMING : MATRIX_MULTIPLY)}); + subscribe(3, i, 0, 1, i); // Rows of matrix A (A / t_partitions) + subscribe(3, i, 1, 2, 1); // Matrix B + } + + /* Run program */ + laminar_setup(); + + std::vector latencies_ms; + for (unsigned long long itr = 1; itr < 31; itr++) { + latencies_ms.push_back(matmul_partition(dim, partitions, t_partitions, itr)); + } + + for (double t : latencies_ms) + std::cout << t << ", "; + + std::cout << "\nAverage: " << avg(latencies_ms) << std::endl; + + return 0; +} diff --git a/benchmarks/matmul-rich.cpp b/benchmarks/matmul-rich.cpp index f7e1e10..19d15e4 100644 --- a/benchmarks/matmul-rich.cpp +++ b/benchmarks/matmul-rich.cpp @@ -26,7 +26,7 @@ void transpose(std::vector>& m) { } void matmul() { - system("sudo find . -name \"laminar*\" -delete"); + system("sudo find . -name \"lmr*\" -delete"); laminar_reset(); /* reset setup data structures */ set_host(1); add_host(1, "127.0.0.1", "/home/centos/cspot/build/bin/"); @@ -158,7 +158,7 @@ int a[8192][8192]; int b[8192][8192]; double matmul_partition(const int mat_size, int n_partitions, bool timing) { - system("sudo find . -name \"laminar*\" -delete"); + system("sudo find . -name \"lmr*\" -delete"); laminar_reset(); /* reset setup data structures */ set_host(1); add_host(1, "127.0.0.1", "/home/centos/cspot/build/bin/"); diff --git a/df.h b/df.h index b84c1ec..cdf8b8e 100644 --- a/df.h +++ b/df.h @@ -5,9 +5,11 @@ #define CSPOTDEVICE #include "df_operations.h" #include "ts_types.h" +#include "ts_type.h" #else #include "operation_system/df_operations.h" #include "type_system/ts_types.h" +#include "type_system/ts_type.h" #endif #include @@ -78,11 +80,16 @@ struct operand { struct cached_output { operand op; unsigned long long seq; // CSPOT seq in output woof + int input_ns; + int input_id; // Defaults execution iteration and seq to 0 so initial access is thrown out and updated - explicit cached_output(const operand& op = operand(nullptr, 0), unsigned long long seq = 0) + // We are now storing the source of the cached outputs so we can fetch later in case of aggregate data + explicit cached_output(const operand& op = operand(nullptr, 0), unsigned long long seq = 0, int input_ns = -1, int input_id = -1) : op(op) - , seq(seq) { + , seq(seq) + , input_ns(input_ns) + , input_id(input_id) { } }; diff --git a/df_interface.cpp b/df_interface.cpp index 473c3df..831a8e0 100644 --- a/df_interface.cpp +++ b/df_interface.cpp @@ -329,18 +329,17 @@ void add_operand(const int ns, const int host_id, const int id) { void fire_operand(const int ns, const int id, const operand* const op, const bool trigger_handler) { std::string output_woof = generate_woof_path(OUT_WF_TYPE, ns, id); - unsigned long long curr_itr = (unsigned long long)woof_last_seq(output_woof); + // unsigned long long curr_itr = (unsigned long long)woof_last_seq(output_woof); + + // Spin waits that check the validity of the newest iteration and whether iteration + while (WooFInvalid(woof_last_seq(output_woof))) {} // fire only if the iteration number is 1 more then the previous iteration -printf("fire_operand: output_woof: %s, value: %f\n", - output_woof.c_str(), - op->operand_value.value.ts_double); - - if (curr_itr + 1 == op->itr) { - if (!trigger_handler) { - woof_put(output_woof, "", op); - } else { - woof_put(output_woof, OUTPUT_HANDLER, op); - } + while (woof_last_seq(output_woof) + 1 != op->itr) {} + + if (!trigger_handler) { + woof_put(output_woof, "", op); + } else { + woof_put(output_woof, OUTPUT_HANDLER, op); } } @@ -348,6 +347,7 @@ int get_result(const int ns, const int id, operand* const res, const unsigned lo std::string woof_name = generate_woof_path(OUT_WF_TYPE, ns, id); // wait till output log has atleast itr number of results + while (WooFInvalid(woof_last_seq(woof_name))) {} while (woof_last_seq(woof_name) < itr) {} //while(WooFGetLatestSeqno(woof_name.c_str()) < itr){} @@ -559,3 +559,89 @@ std::string graphviz_representation() { return g; } + +// overloaded load_value with ns and id for aggregate data types +struct ts_value* load_value(const struct ts_value* const unloaded_value, int ns, int id){ + std::string woof_name = generate_woof_path(OUT_WF_TYPE, ns, id); + + size_t pos = woof_name.find_last_of('/'); + std::string uri; + if (pos != std::string::npos) { + uri = woof_name.substr(0, pos + 1); // Keep up to and including the '/' + } else { + uri = ""; + } + + struct ts_value* return_value = new ts_value; + + return_value->type = unloaded_value->type; + switch (unloaded_value->type) { + case TS_UNINITIALIZED: + case TS_UNKNOWN: { + free(return_value); + return NULL; + } + case TS_ERROR: + case TS_EXCEPTION: + case TS_BOOLEAN: + case TS_BYTE: + case TS_SHORT: + case TS_INTEGER: + case TS_LONG: + case TS_UNSIGNED_BYTE: + case TS_UNSIGNED_SHORT: + case TS_UNSIGNED_INTEGER: + case TS_UNSIGNED_LONG: + case TS_FLOAT: + case TS_DOUBLE: + case TS_TIMESTAMP: + case TS_PRIM_STRING: { + if (!value_deep_set(unloaded_value, return_value)) { + free(return_value); + return NULL; + } + } break; + case TS_PRIM_LARGE_STRING: { + if (!value_deep_set(unloaded_value, return_value)) { + free(return_value); + return NULL; + } + } break; + case TS_PRIM_2D_DOUBLE_ARRAY: { + for (size_t i = 0; i < TS_PRIM_2D_DOUBLE_ARRAY_ROWS; i++) { + for (size_t j = 0; j < TS_PRIM_2D_DOUBLE_ARRAY_COLS; j++) { + return_value->value.ts_prim_2d_double_array[i][j] = unloaded_value->value.ts_prim_2d_double_array[i][j]; + } + } + } break; + case TS_PRIM_DOUBLE_ARRAY: { + if (!value_deep_set(unloaded_value, return_value)) { + free(return_value); + return NULL; + } + } break; +#ifndef ESP8266 + case TS_STRING: { + // assume always local + return_value->value.ts_string = unloaded_value->value.ts_string; + if (!load_string_value(&return_value->value.ts_string, uri.c_str())) { + free(return_value); + return NULL; + } + } break; + case TS_ARRAY: { + // assume always local + return_value->value.ts_array = unloaded_value->value.ts_array; + if (!load_array_value(&return_value->value.ts_array, uri.c_str())) { + free(return_value); + return NULL; + } + } break; +#endif + default: { + free(return_value); + return NULL; + } + } + return return_value; +} \ No newline at end of file diff --git a/df_interface.h b/df_interface.h index 17a1bf7..a9eef0d 100644 --- a/df_interface.h +++ b/df_interface.h @@ -45,4 +45,7 @@ std::string generate_woof_host_url(int host_id); std::string graphviz_representation(); +// overloaded load_value in c++ form, for aggregate data types +struct ts_value* load_value(const struct ts_value* const unloaded_value, int ns, int id); + #endif // DF_INTERFACE_H diff --git a/subscription_event_handler.cpp b/subscription_event_handler.cpp index 3a2e855..c2889a2 100644 --- a/subscription_event_handler.cpp +++ b/subscription_event_handler.cpp @@ -20,12 +20,14 @@ operand perform_operation(const std::vector& operands, const struct df_operation operation, - struct df_operation_metadata* const operation_metadata) { + struct df_operation_metadata* const operation_metadata, + const std::vector& input_hosts) { const unsigned long operand_count = operands.size(); struct ts_value* operands_array[operand_count]; const struct ts_value* const_operands_array[operand_count]; for (size_t i = 0; i < operand_count; ++i) { - operands_array[i] = load_value(&operands[i].operand_value); + // Load the operand value, in case if aggregate data + operands_array[i] = load_value(&operands[i].operand_value, input_hosts[i].ns, input_hosts[i].id); const_operands_array[i] = operands_array[i]; if (const_operands_array[i] == nullptr) { log_error("Could not load value [input:%lu]", i); @@ -157,6 +159,8 @@ extern "C" int subscription_event_handler(WOOF* wf, unsigned long seqno, void* p log_debug("[namespace:%d][node_id:%d] has %lu inputs", node_namespace, node_id, input_count); // Scan through subscription outputs and collect operands std::vector op_values(input_count); + // Create a vector of subscriptions to hold the source hosts of the operands + std::vector input_hosts(input_count); for (unsigned long input_index = 0; input_index < input_count; input_index++) { log_debug("[input:%lu] START input processing", input_index); // Get last used output and seqno for this port @@ -177,6 +181,8 @@ extern "C" int subscription_event_handler(WOOF* wf, unsigned long seqno, void* p if (cached_last_output.op.itr == curr_itr) { // Operand for this seq has already been found and cached. Retrieve from cache and proceed op_values[input_index] = cached_last_output.op; + // Save the source of the cached output in case if need to fetch aggregate data later when performing operation + input_hosts[input_index] = subscription(cached_last_output.input_ns, cached_last_output.input_id); log_debug("[input:%lu] Already retrieved, continue", input_index); log_debug("[input:%lu] END input processing", input_index); continue; @@ -204,6 +210,9 @@ extern "C" int subscription_event_handler(WOOF* wf, unsigned long seqno, void* p input_subscription.id, input_subscription.port); + // set the operand input host + input_hosts[input_index] = input_subscription; + // Get relevant operand from subscription output (if it exists) std::string subscription_output_woof = generate_woof_path(OUT_WF_TYPE, input_subscription.ns, input_subscription.id); @@ -334,7 +343,7 @@ extern "C" int subscription_event_handler(WOOF* wf, unsigned long seqno, void* p input_index, subscription_operand.itr, new_sequence_number); - const cached_output new_cached_output = cached_output(subscription_operand, new_sequence_number); + const cached_output new_cached_output = cached_output(subscription_operand, new_sequence_number, input_subscription.ns, input_subscription.id); woof_put(last_used_sub_pos_woof, "", &new_cached_output); } @@ -441,7 +450,7 @@ extern "C" int subscription_event_handler(WOOF* wf, unsigned long seqno, void* p .write_value = true, .output_woof_path = output_woof.c_str()}}; log_debug("Firing operation %d\n",n.id); - operand result = perform_operation(op_values, n.operation, &operation_metadata); + operand result = perform_operation(op_values, n.operation, &operation_metadata, input_hosts); log_debug("Fired operation %d\n",n.id); const unsigned long long last_output_sequence_number = (unsigned long long)woof_last_seq(output_woof); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b0bcbe4..cd5ca80 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,5 +1,36 @@ add_subdirectory(type_system) +add_executable(distributed_addition distributed_addition.cpp) +target_link_libraries(distributed_addition + PRIVATE df_interface + PRIVATE ts_type + PRIVATE df_operation + PRIVATE ts_array + PRIVATE ts_matrix) + +add_executable(distributed_two distributed_two.cpp) +target_link_libraries(distributed_two + PRIVATE df_interface + PRIVATE ts_type) + +add_executable(distributed_vector distributed_vector.cpp) +target_link_libraries(distributed_vector + PRIVATE df_interface + PRIVATE ts_type + PRIVATE df_operation + PRIVATE ts_array + PRIVATE ts_matrix + PRIVATE ts_string) + +add_executable(distributed_vector2 distributed_vector2.cpp) +target_link_libraries(distributed_vector2 + PRIVATE df_interface + PRIVATE ts_type + PRIVATE df_operation + PRIVATE ts_array + PRIVATE ts_matrix + PRIVATE ts_string) + add_executable(multinode_regression multinode_regression.cpp) target_link_libraries(multinode_regression PRIVATE df_interface diff --git a/tests/distributed_addition.cpp b/tests/distributed_addition.cpp new file mode 100644 index 0000000..4db2562 --- /dev/null +++ b/tests/distributed_addition.cpp @@ -0,0 +1,65 @@ +// run a distributed addition operation on two devices. +// Written by Krish & Gen + +#include "../df_interface.h" +#include "type_system/types/ts_primitive.h" + +#include +#include +#include + +int main() { + int ns = 1; // Namespace + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ + laminar_init(); + + // Set up single device on localhost + + int curr_host_id = 2; + set_host(curr_host_id); + add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.247", "/disk2/cspot-namespace-platform/"); + + // Nodes is on device 1: (namespace, host, node) + + add_node(ns, 1, 1, {.category = DF_ARITHMETIC, .operation = DF_ARITH_ADDITION}); // a + b + + // Inputs is shared on both devices + + add_operand(ns, 1, 2); // a + add_operand(ns, 2, 3); // b + + // Edges: "namespace:dest_node:operand_num", "namespace:source_node" + subscribe("1:1:0", "1:2"); // ADD[0] <-- a + subscribe("1:1:1", "1:3"); // ADD[1] <-- b + + // Optional: Autogenerate diagram + std::cout << graphviz_representation() << std::endl; + // exit(0); + + laminar_setup(); + if(curr_host_id == 1) { + struct ts_value value_a{}; + set_double(&value_a, 1); + operand op_a(&value_a); + fire_operand(ns, 2, &op_a); + std::cout << "Completed input " << std::endl; + } + else { + struct ts_value value_b{}; + set_double(&value_b, 2); + operand op_b(&value_b); + fire_operand(ns, 3, &op_b); + std::cout << "Completed input " << std::endl; + } + + operand result; + int err = get_result(ns, 1, &result, 1); + if (err < 0) { + std::cout << "Failed to read the result " << std::endl; + } + std::cout << "Result: " << result.operand_value.value.ts_double << std::endl; + + return 0; +} \ No newline at end of file diff --git a/tests/distributed_simple_laminar_example.cpp b/tests/distributed_simple_laminar_example.cpp index fd2ca23..f9b9af7 100644 --- a/tests/distributed_simple_laminar_example.cpp +++ b/tests/distributed_simple_laminar_example.cpp @@ -16,14 +16,15 @@ int main() { int ns = 1; // Laminar Namespace (not CSPOT's) - + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ laminar_init(); // Set up two devices (change IPs and/or cspot namespaces) int curr_host_id = 2; set_host(curr_host_id); - add_host(1, "169.231.230.183", "/cspot-device-namespace/"); - add_host(2, "169.231.230.225", "/cspot-device-namespace/"); + add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.247", "/disk2/cspot-namespace-platform/"); // Nodes diff --git a/tests/distributed_two.cpp b/tests/distributed_two.cpp new file mode 100644 index 0000000..da989d8 --- /dev/null +++ b/tests/distributed_two.cpp @@ -0,0 +1,56 @@ +// passing a double from one host to another +// testing simple distributed capabilities +// Written by Krish & Gen + +#include "../df_interface.h" +#include "type_system/types/ts_primitive.h" + +#include +#include +#include + +int main() { + int ns = 1; // Laminar Namespace (not CSPOT's) + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ + laminar_init(); + + // Set up two devices (change IPs and/or cspot namespaces) + int curr_host_id = 2; + set_host(curr_host_id); + add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.247", "/disk2/cspot-namespace-platform/"); + + // Nodes + add_node(ns, 1, 3, {.category = DF_ARITHMETIC, .operation = DF_ARITH_MULTIPLICATION}); // a * b + + // Inputs + add_operand(ns, 1, 1); // a + add_operand(ns, 2, 2); // b + + // Edges + subscribe("1:3:0", "1:1"); // ADD[0] <-- a + subscribe("1:3:1", "1:2"); // ADD[1] <-- b + + laminar_setup(); + if(curr_host_id == 1) { + // Example: (1 + 2) * 3 + struct ts_value value_a{}; + set_double(&value_a, 1); + operand op_a(&value_a, 1); + fire_operand(ns, 1, &op_a); + } + else { + struct ts_value value_b{}; + set_double(&value_b, 2); + operand op_b(&value_b, 1); + fire_operand(ns, 2, &op_b); + } + operand result; + int err = get_result(ns, 3, &result, 1); + if (err < 0) { + std::cout << "Failed to read the result " << std::endl; + } + + std::cout << "Result: " << result.operand_value.value.ts_double << std::endl; +} diff --git a/tests/distributed_vector.cpp b/tests/distributed_vector.cpp new file mode 100644 index 0000000..206e731 --- /dev/null +++ b/tests/distributed_vector.cpp @@ -0,0 +1,75 @@ +// passing a vector of unsigned bytes between two hosts +// with 1 node, has 1 sec wait between each iteration +// Written by Krish & Gen + +#include "../df_interface.h" +#include "type_system/ts_type.h" +#include "type_system/types/ts_array.h" +#include "type_system/types/ts_matrix.h" +#include "type_system/types/ts_primitive.h" +#include "type_system/types/ts_string.h" + +#include +#include +#include +#include + +int main() { + int ns = 1; // Namespace + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ + + int curr_host_id = 2; + set_host(curr_host_id); + add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.247", "/disk2/cspot-namespace-platform/"); + laminar_init(); + + const struct df_operation parse = {DF_INTERNAL, DF_INTERNAL_NOOP}; + add_node(ns, 1, 1, parse); + add_operand(ns, 1, 2); + + subscribe(ns, 1, 0, ns, 2); + laminar_setup(); + + for (unsigned long long itr = 1; itr < 30; itr++){ + std::cout << "itr: " << itr << std::endl; + if (curr_host_id == 1){ + uint8_t array[] = {1, 2, 3}; + struct ts_value* operand_value = value_from_unsigned_byte_array(array, 3); + operand op(operand_value, itr); + write_value(&op.operand_value); + fire_operand(ns, 2, &op); + value_deep_delete(operand_value); + + sleep(1); + + std::cout << "gets: " << (int)woof_last_seq(generate_woof_path(OUT_WF_TYPE, ns, 1)) << std::endl; + operand result; + get_result(ns, 1, &result, itr); + ts_value* loaded_result = load_value(&result.operand_value); + + uint8_t result_array[3]; + get_unsigned_byte_array(result_array, loaded_result); + + for(int i = 0; i < sizeof(result_array); i++){ + std::cout << (int)result_array[i] << std::endl; + } + } + else{ + std::cout << "gets: " << (int)woof_last_seq(generate_woof_path(OUT_WF_TYPE, ns, 1)) << std::endl; + operand result; + get_result(ns, 1, &result, itr); + ts_value* loaded_result = load_value(&result.operand_value, ns, 1); + + uint8_t result_array[3]; + get_unsigned_byte_array(result_array, loaded_result); + + for(int i = 0; i < sizeof(result_array); i++){ + std::cout << (int)result_array[i] << std::endl; + } + } + } + + return 0; +} \ No newline at end of file diff --git a/tests/distributed_vector2.cpp b/tests/distributed_vector2.cpp new file mode 100644 index 0000000..2a2e8c5 --- /dev/null +++ b/tests/distributed_vector2.cpp @@ -0,0 +1,73 @@ +// passing a vector of unsigned bytes between two hosts +// with 2 nodes so we can achieve some synchronization +// Written by Krish & Gen + +#include "../df_interface.h" +#include "type_system/ts_type.h" +#include "type_system/types/ts_array.h" +#include "type_system/types/ts_matrix.h" +#include "type_system/types/ts_primitive.h" +#include "type_system/types/ts_string.h" + +#include +#include +#include +#include + +int main() { + int ns = 1; // Namespace + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ + + int curr_host_id = 2; + set_host(curr_host_id); + add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.247", "/disk2/cspot-namespace-platform/"); + laminar_init(); + + const struct df_operation parse = {DF_INTERNAL, DF_INTERNAL_NOOP}; + add_operand(ns, 1, 1); + add_node(ns, 1, 2, parse); + add_node(ns, 2, 3, parse); + + subscribe(ns, 2, 0, ns, 1); + subscribe(ns, 3, 0, ns, 2); + laminar_setup(); + + for (unsigned long long itr = 1; itr < 30; itr++){ + std::cout << "itr: " << itr << std::endl; + if (curr_host_id == 1){ + uint8_t array[] = {1, 2, 3}; + struct ts_value* operand_value = value_from_unsigned_byte_array(array, 3); + operand op(operand_value, itr); + write_value(&op.operand_value); + fire_operand(ns, 1, &op); + value_deep_delete(operand_value); + + operand result; + get_result(ns, 3, &result, itr); + ts_value* loaded_result = load_value(&result.operand_value, ns, 3); + + uint8_t result_array[3]; + get_unsigned_byte_array(result_array, loaded_result); + + for(int i = 0; i < sizeof(result_array); i++){ + std::cout << (int)result_array[i] << std::endl; + } + } + else{ + operand result; + get_result(ns, 2, &result, itr); + ts_value* loaded_result = load_value(&result.operand_value, ns, 2); + + uint8_t result_array[3]; + get_unsigned_byte_array(result_array, loaded_result); + + for(int i = 0; i < sizeof(result_array); i++){ + std::cout << (int)result_array[i] << std::endl; + } + } + } + + return 0; +} \ No newline at end of file diff --git a/tests/simple_laminar_example.cpp b/tests/simple_laminar_example.cpp index 41277a9..44240e6 100644 --- a/tests/simple_laminar_example.cpp +++ b/tests/simple_laminar_example.cpp @@ -11,7 +11,8 @@ int main() { int ns = 1; // Namespace int hd = 1; // Host device - + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ laminar_init(); // Set up single device on localhost diff --git a/type_system/include/type_system/type_system/ts_type.h b/type_system/include/type_system/type_system/ts_type.h index 2b561ad..8cc3421 100644 --- a/type_system/include/type_system/type_system/ts_type.h +++ b/type_system/include/type_system/type_system/ts_type.h @@ -17,6 +17,10 @@ extern "C" { */ struct ts_value* load_value(const struct ts_value* unloaded_value); +// used in df_interface.cpp for overloaded load_value +bool load_string_value(struct ts_value_string* string, const char* uri); +bool load_array_value(struct ts_value_array* array, const char* uri); + /** * Function to store value. Note the type system will modify information inside the storage system of the value. * These need to be written to the MAIN WooF otherwise information is lost. diff --git a/type_system/src/ts_type.c b/type_system/src/ts_type.c index 982f18c..5ab3acb 100644 --- a/type_system/src/ts_type.c +++ b/type_system/src/ts_type.c @@ -51,8 +51,6 @@ unsigned long byte_array_to_unsigned_long(const uint8_t* const array) { } -bool load_string_value(struct ts_value_string* string); -bool load_array_value(struct ts_value_array* array); struct ts_value* load_value(const struct ts_value* const unloaded_value) { struct ts_value* return_value = malloc(sizeof(struct ts_value)); @@ -105,15 +103,17 @@ struct ts_value* load_value(const struct ts_value* const unloaded_value) { } break; #ifndef ESP8266 case TS_STRING: { + // assume always local return_value->value.ts_string = unloaded_value->value.ts_string; - if (!load_string_value(&return_value->value.ts_string)) { + if (!load_string_value(&return_value->value.ts_string, NULL)) { free(return_value); return NULL; } } break; case TS_ARRAY: { + // assume always local return_value->value.ts_array = unloaded_value->value.ts_array; - if (!load_array_value(&return_value->value.ts_array)) { + if (!load_array_value(&return_value->value.ts_array, NULL)) { free(return_value); return NULL; } @@ -128,7 +128,7 @@ struct ts_value* load_value(const struct ts_value* const unloaded_value) { } #ifndef ESP8266 -bool load_string_value(struct ts_value_string* const string) { +bool load_string_value(struct ts_value_string* const string, const char* uri) { char woof_id[100]; strcpy(woof_id, TS_STORAGE_PREFIX); strcat(woof_id, "-string"); @@ -168,9 +168,17 @@ void* woof_get_array_value(const char* const woof_id, size_t value_size, const s return array_value; } -bool load_array_value(struct ts_value_array* const array) { // NOLINT(misc-no-recursion) - char woof_id[100]; - strcpy(woof_id, TS_STORAGE_PREFIX); +bool load_array_value(struct ts_value_array* const array, const char* uri) { // NOLINT(misc-no-recursion) + char woof_id[200]; + // if url is not null, use it to create the woof_id + // else use the default woof_id + if (uri){ + strcpy(woof_id, uri); + strcat(woof_id, TS_STORAGE_PREFIX); + } + else{ + strcpy(woof_id, TS_STORAGE_PREFIX); + } strcat(woof_id, "-array-"); char uuid_string[UUID_STR_LEN]; uuid_unparse_lower(array->storage_system.id, uuid_string); @@ -225,7 +233,7 @@ bool load_array_value(struct ts_value_array* const array) { // NOLINT(misc-no-re if (array->value != NULL) { for (size_t i = 0; i < array->size; i++) { struct ts_value_string* const array_element = (struct ts_value_string*)array->value; - const bool is_element_loaded = load_string_value(&array_element[i]); + const bool is_element_loaded = load_string_value(&array_element[i], uri); if (!is_element_loaded) { // delete all previous allocated elements for (size_t j = 0; j < i; j++) { @@ -241,7 +249,7 @@ bool load_array_value(struct ts_value_array* const array) { // NOLINT(misc-no-re if (array->value != NULL) { for (size_t i = 0; i < array->size; i++) { struct ts_value_array* const array_element = (struct ts_value_array*)array->value; - const bool is_element_loaded = load_array_value(&array_element[i]); + const bool is_element_loaded = load_array_value(&array_element[i], uri); if (!is_element_loaded) { // delete all previous allocated elements for (size_t j = 0; j < i; j++) {