From 7f52a858102473ad7b8f90df5947eb17e41a2fae Mon Sep 17 00:00:00 2001 From: Ononymous Date: Sun, 19 Jan 2025 17:23:18 -0800 Subject: [PATCH 01/20] moved self-written tests from old repo --- .gitignore | 1 + tests/CMakeLists.txt | 13 ++++++ tests/distributed_addition.cpp | 62 +++++++++++++++++++++++++ tests/distributed_vector.cpp | 85 ++++++++++++++++++++++++++++++++++ 4 files changed, 161 insertions(+) create mode 100644 .gitignore create mode 100644 tests/distributed_addition.cpp create mode 100644 tests/distributed_vector.cpp diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c795b05 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +build \ No newline at end of file diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b0bcbe4..b9eea30 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,5 +1,18 @@ add_subdirectory(type_system) +add_executable(distributed_addition distributed_addition.cpp) +target_link_libraries(distributed_addition + PRIVATE df_interface + PRIVATE ts_type + PRIVATE df_operation + PRIVATE ts_array + PRIVATE ts_matrix) + +add_executable(distributed_vector distributed_vector.cpp) +target_link_libraries(distributed_vector + PRIVATE df_interface + PRIVATE ts_type) + add_executable(multinode_regression multinode_regression.cpp) target_link_libraries(multinode_regression PRIVATE df_interface diff --git a/tests/distributed_addition.cpp b/tests/distributed_addition.cpp new file mode 100644 index 0000000..94c6208 --- /dev/null +++ b/tests/distributed_addition.cpp @@ -0,0 +1,62 @@ +#include "../df_interface.h" +#include "type_system/types/ts_primitive.h" + +#include +#include +#include + +int main() { + int ns = 1; // Namespace + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ + laminar_init(); + + // Set up single device on localhost + + set_host(2); + int curr_host_id = 2; + add_host(1, "172.31.31.200", "/cspot-device-namespace/"); + add_host(2, "172.31.26.11", "/cspot-device-namespace/"); + + // Nodes is on device 1: (namespace, host, node) + + add_node(ns, 1, 1, {.category = DF_ARITHMETIC, .operation = DF_ARITH_ADDITION}); // a + b + + // Inputs is shared on both devices + + add_operand(ns, 1, 2); // a + add_operand(ns, 2, 3); // b + + // Edges: "namespace:dest_node:operand_num", "namespace:source_node" + subscribe("1:1:0", "1:2"); // ADD[0] <-- a + subscribe("1:1:1", "1:3"); // ADD[1] <-- b + + // Optional: Autogenerate diagram + std::cout << graphviz_representation() << std::endl; + // exit(0); + + laminar_setup(); + if(curr_host_id == 1) { + struct ts_value value_a{}; + set_double(&value_a, 1); + operand op_a(&value_a); + fire_operand(ns, 2, &op_a); + std::cout << "Completed input " << std::endl; + } + else { + struct ts_value value_b{}; + set_double(&value_b, 2); + operand op_b(&value_b); + fire_operand(ns, 3, &op_b); + std::cout << "Completed input " << std::endl; + } + + operand result; + int err = get_result(ns, 1, &result, 1); + if (err < 0) { + std::cout << "Failed to read the result " << std::endl; + } + std::cout << "Result: " << result.operand_value.value.ts_double << std::endl; + + return 0; +} \ No newline at end of file diff --git a/tests/distributed_vector.cpp b/tests/distributed_vector.cpp new file mode 100644 index 0000000..4d21f26 --- /dev/null +++ b/tests/distributed_vector.cpp @@ -0,0 +1,85 @@ +#include "../df_interface.h" +#include "type_system/ts_type.h" +#include "type_system/types/ts_primitive.h" +#include "type_system/types/ts_array.h" + +#include +#include +#include +#include + +int main() { + int ns = 1; // Namespace + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ + set_host(1); + add_host(1, "localhost", "/cspot-device-namespace/"); + laminar_init(); + + // add a node that does nothing + add_node(ns, 1, 1, {DF_CUSTOM, VECTOR_DOT_PRODUCT}); + + // single input + add_operand(ns, 1, 2); // a + add_operand(ns, 1, 3); // b + + subscribe(ns, 1, 0, ns, 2); + subscribe(ns, 1, 1, ns, 3); + + laminar_setup(); + + std::cout << "Vinayak1" << std::endl; + + // firing array + int32_t a[] = {1, 2, 3}; + int32_t b[] = {2, 3, 4}; + struct ts_value* operand_value1 = value_from_integer_array(a, 3); + std::cout << "V1" << std::endl; + for(int i = 0; i < 3; i++){ + std::cout << ((int*)(operand_value1->value.ts_array.value))[i] << std::endl; + } + operand op1(operand_value1, 1); + for(int i = 0; i < 3; i++){ + std::cout << ((int*)(op1.operand_value.value.ts_array.value))[i] << std::endl; + } + if(!write_value(&op1.operand_value)){ + std::cout << "Issue with write value" << std::endl; + } + std::cout << "V2" << std::endl; + for(int i = 0; i < 3; i++){ + std::cout << ((int*)(op1.operand_value.value.ts_array.value))[i] << std::endl; + } + fire_operand(ns, 2, &op1); + value_deep_delete(operand_value1); + + + struct ts_value* operand_value2 = value_from_integer_array(b, 3); + std::cout << "V1" << std::endl; + for(int i = 0; i < 3; i++){ + std::cout << ((int*)(operand_value2->value.ts_array.value))[i] << std::endl; + } + operand op2(operand_value2, 1); + for(int i = 0; i < 3; i++){ + std::cout << ((int*)(op2.operand_value.value.ts_array.value))[i] << std::endl; + } + if(!write_value(&op2.operand_value)){ + std::cout << "Issue with write value" << std::endl; + } + std::cout << "V2" << std::endl; + for(int i = 0; i < 3; i++){ + std::cout << ((int*)(op2.operand_value.value.ts_array.value))[i] << std::endl; + } + fire_operand(ns, 3, &op2); + value_deep_delete(operand_value2); + + std::cout << "Animesh2" << std::endl; + + // get value from woof + operand result; + get_result(ns, 1, &result, 1); + std::cout << result.operand_value.type << std::endl; + std::cout << result.itr << std::endl; + std::cout << result.operand_value.value.ts_int << std::endl; + + return 0; +} \ No newline at end of file From e2953f7c48cfdc0235b57d4b45260a2257a2fea8 Mon Sep 17 00:00:00 2001 From: Ononymous Date: Sun, 19 Jan 2025 18:08:04 -0800 Subject: [PATCH 02/20] distributed vector work!! --- .vscode/settings.json | 74 ++++++++++++++++++++++++++++++++++ tests/CMakeLists.txt | 6 ++- tests/distributed_vector.cpp | 78 +++++++++--------------------------- 3 files changed, 99 insertions(+), 59 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..f8af820 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,74 @@ +{ + "files.associations": { + "*.hpp": "cpp", + "array": "cpp", + "atomic": "cpp", + "bit": "cpp", + "*.tcc": "cpp", + "bitset": "cpp", + "cctype": "cpp", + "chrono": "cpp", + "clocale": "cpp", + "cmath": "cpp", + "codecvt": "cpp", + "compare": "cpp", + "concepts": "cpp", + "condition_variable": "cpp", + "cstdarg": "cpp", + "cstddef": "cpp", + "cstdint": "cpp", + "cstdio": "cpp", + "cstdlib": "cpp", + "cstring": "cpp", + "ctime": "cpp", + "cwchar": "cpp", + "cwctype": "cpp", + "deque": "cpp", + "list": "cpp", + "map": "cpp", + "set": "cpp", + "string": "cpp", + "unordered_map": "cpp", + "unordered_set": "cpp", + "vector": "cpp", + "exception": "cpp", + "algorithm": "cpp", + "functional": "cpp", + "iterator": "cpp", + "memory": "cpp", + "memory_resource": "cpp", + "numeric": "cpp", + "optional": "cpp", + "random": "cpp", + "ratio": "cpp", + "regex": "cpp", + "string_view": "cpp", + "system_error": "cpp", + "tuple": "cpp", + "type_traits": "cpp", + "utility": "cpp", + "fstream": "cpp", + "future": "cpp", + "initializer_list": "cpp", + "iomanip": "cpp", + "iosfwd": "cpp", + "iostream": "cpp", + "istream": "cpp", + "limits": "cpp", + "mutex": "cpp", + "new": "cpp", + "numbers": "cpp", + "ostream": "cpp", + "semaphore": "cpp", + "shared_mutex": "cpp", + "sstream": "cpp", + "stdexcept": "cpp", + "stop_token": "cpp", + "streambuf": "cpp", + "thread": "cpp", + "cinttypes": "cpp", + "typeinfo": "cpp", + "valarray": "cpp", + "variant": "cpp" + } +} \ No newline at end of file diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b9eea30..e46b315 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -11,7 +11,11 @@ target_link_libraries(distributed_addition add_executable(distributed_vector distributed_vector.cpp) target_link_libraries(distributed_vector PRIVATE df_interface - PRIVATE ts_type) + PRIVATE ts_type + PRIVATE df_operation + PRIVATE ts_array + PRIVATE ts_matrix + PRIVATE ts_string) add_executable(multinode_regression multinode_regression.cpp) target_link_libraries(multinode_regression diff --git a/tests/distributed_vector.cpp b/tests/distributed_vector.cpp index 4d21f26..a9aeef4 100644 --- a/tests/distributed_vector.cpp +++ b/tests/distributed_vector.cpp @@ -1,7 +1,9 @@ #include "../df_interface.h" #include "type_system/ts_type.h" -#include "type_system/types/ts_primitive.h" #include "type_system/types/ts_array.h" +#include "type_system/types/ts_matrix.h" +#include "type_system/types/ts_primitive.h" +#include "type_system/types/ts_string.h" #include #include @@ -13,73 +15,33 @@ int main() { system("sudo find . -name \"lmr*\" -delete"); laminar_reset(); /* reset setup data structures */ set_host(1); - add_host(1, "localhost", "/cspot-device-namespace/"); + add_host(1, "localhost", "/home/ubuntu/laminar/build/bin"); laminar_init(); - // add a node that does nothing - add_node(ns, 1, 1, {DF_CUSTOM, VECTOR_DOT_PRODUCT}); - - // single input - add_operand(ns, 1, 2); // a - add_operand(ns, 1, 3); // b + const struct df_operation parse = {DF_INTERNAL, DF_INTERNAL_NOOP}; + add_node(ns, 1, 1, parse); + add_operand(ns, 1, 2); subscribe(ns, 1, 0, ns, 2); - subscribe(ns, 1, 1, ns, 3); - laminar_setup(); - std::cout << "Vinayak1" << std::endl; + uint8_t array[] = {1, 2, 3}; + struct ts_value* operand_value = value_from_unsigned_byte_array(array, 3); + operand op(operand_value, 1); + write_value(&op.operand_value); + fire_operand(ns, 2, &op); + value_deep_delete(operand_value); - // firing array - int32_t a[] = {1, 2, 3}; - int32_t b[] = {2, 3, 4}; - struct ts_value* operand_value1 = value_from_integer_array(a, 3); - std::cout << "V1" << std::endl; - for(int i = 0; i < 3; i++){ - std::cout << ((int*)(operand_value1->value.ts_array.value))[i] << std::endl; - } - operand op1(operand_value1, 1); - for(int i = 0; i < 3; i++){ - std::cout << ((int*)(op1.operand_value.value.ts_array.value))[i] << std::endl; - } - if(!write_value(&op1.operand_value)){ - std::cout << "Issue with write value" << std::endl; - } - std::cout << "V2" << std::endl; - for(int i = 0; i < 3; i++){ - std::cout << ((int*)(op1.operand_value.value.ts_array.value))[i] << std::endl; - } - fire_operand(ns, 2, &op1); - value_deep_delete(operand_value1); + operand result; + get_result(ns, 1, &result, 1); + ts_value* loaded_result = load_value(&result.operand_value); + uint8_t result_array[3]; + get_unsigned_byte_array(result_array, loaded_result); - struct ts_value* operand_value2 = value_from_integer_array(b, 3); - std::cout << "V1" << std::endl; - for(int i = 0; i < 3; i++){ - std::cout << ((int*)(operand_value2->value.ts_array.value))[i] << std::endl; - } - operand op2(operand_value2, 1); - for(int i = 0; i < 3; i++){ - std::cout << ((int*)(op2.operand_value.value.ts_array.value))[i] << std::endl; - } - if(!write_value(&op2.operand_value)){ - std::cout << "Issue with write value" << std::endl; - } - std::cout << "V2" << std::endl; - for(int i = 0; i < 3; i++){ - std::cout << ((int*)(op2.operand_value.value.ts_array.value))[i] << std::endl; + for(int i = 0; i < sizeof(result_array); i++){ + std::cout << (int)result_array[i] << std::endl; } - fire_operand(ns, 3, &op2); - value_deep_delete(operand_value2); - - std::cout << "Animesh2" << std::endl; - - // get value from woof - operand result; - get_result(ns, 1, &result, 1); - std::cout << result.operand_value.type << std::endl; - std::cout << result.itr << std::endl; - std::cout << result.operand_value.value.ts_int << std::endl; return 0; } \ No newline at end of file From b17eedf7dd5210fedd012bf12fd80cf14840e0eb Mon Sep 17 00:00:00 2001 From: Ononymous Date: Sun, 19 Jan 2025 18:38:28 -0800 Subject: [PATCH 03/20] made distributed vector distributed --- tests/distributed_vector.cpp | 46 +++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/tests/distributed_vector.cpp b/tests/distributed_vector.cpp index a9aeef4..52940d7 100644 --- a/tests/distributed_vector.cpp +++ b/tests/distributed_vector.cpp @@ -14,8 +14,11 @@ int main() { int ns = 1; // Namespace system("sudo find . -name \"lmr*\" -delete"); laminar_reset(); /* reset setup data structures */ + set_host(1); - add_host(1, "localhost", "/home/ubuntu/laminar/build/bin"); + int curr_host_id = 1; + add_host(1, "172.31.26.11", "/home/ubuntu/laminar/build/bin"); + add_host(2, "172.31.31.200", "/home/ubuntu/laminar/build/bin"); laminar_init(); const struct df_operation parse = {DF_INTERNAL, DF_INTERNAL_NOOP}; @@ -25,22 +28,37 @@ int main() { subscribe(ns, 1, 0, ns, 2); laminar_setup(); - uint8_t array[] = {1, 2, 3}; - struct ts_value* operand_value = value_from_unsigned_byte_array(array, 3); - operand op(operand_value, 1); - write_value(&op.operand_value); - fire_operand(ns, 2, &op); - value_deep_delete(operand_value); + if (curr_host_id == 1){ + uint8_t array[] = {1, 2, 3}; + struct ts_value* operand_value = value_from_unsigned_byte_array(array, 3); + operand op(operand_value, 1); + write_value(&op.operand_value); + fire_operand(ns, 2, &op); + value_deep_delete(operand_value); + + + operand result; + get_result(ns, 2, &result, 1); + ts_value* loaded_result = load_value(&result.operand_value); - operand result; - get_result(ns, 1, &result, 1); - ts_value* loaded_result = load_value(&result.operand_value); + uint8_t result_array[3]; + get_unsigned_byte_array(result_array, loaded_result); + + for(int i = 0; i < sizeof(result_array); i++){ + std::cout << (int)result_array[i] << std::endl; + } + } + else{ + operand result; + get_result(ns, 2, &result, 1); + ts_value* loaded_result = load_value(&result.operand_value); - uint8_t result_array[3]; - get_unsigned_byte_array(result_array, loaded_result); + uint8_t result_array[3]; + get_unsigned_byte_array(result_array, loaded_result); - for(int i = 0; i < sizeof(result_array); i++){ - std::cout << (int)result_array[i] << std::endl; + for(int i = 0; i < sizeof(result_array); i++){ + std::cout << (int)result_array[i] << std::endl; + } } return 0; From fc1a5bd3217a7a2631fefa09d3f5779fcff4a7ae Mon Sep 17 00:00:00 2001 From: Ononymous Date: Sun, 19 Jan 2025 18:46:03 -0800 Subject: [PATCH 04/20] git ignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c795b05 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +build \ No newline at end of file From c05493a55b4f11cc5c03e164936ba1cdfa5b997f Mon Sep 17 00:00:00 2001 From: Ononymous Date: Sun, 19 Jan 2025 18:58:42 -0800 Subject: [PATCH 05/20] changed cur host id --- tests/distributed_vector.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/distributed_vector.cpp b/tests/distributed_vector.cpp index 52940d7..96a6dab 100644 --- a/tests/distributed_vector.cpp +++ b/tests/distributed_vector.cpp @@ -15,8 +15,8 @@ int main() { system("sudo find . -name \"lmr*\" -delete"); laminar_reset(); /* reset setup data structures */ - set_host(1); - int curr_host_id = 1; + set_host(2); + int curr_host_id = 2; add_host(1, "172.31.26.11", "/home/ubuntu/laminar/build/bin"); add_host(2, "172.31.31.200", "/home/ubuntu/laminar/build/bin"); laminar_init(); From f231b6716cd0c7a80fa0c0554db8cda572128e1c Mon Sep 17 00:00:00 2001 From: Ononymous Date: Sun, 26 Jan 2025 14:56:06 -0800 Subject: [PATCH 06/20] fixed distributed --- tests/distributed_addition.cpp | 6 +++--- tests/distributed_simple_laminar_example.cpp | 7 ++++--- tests/distributed_vector.cpp | 12 +++++++++--- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/tests/distributed_addition.cpp b/tests/distributed_addition.cpp index 94c6208..a5c0903 100644 --- a/tests/distributed_addition.cpp +++ b/tests/distributed_addition.cpp @@ -13,10 +13,10 @@ int main() { // Set up single device on localhost - set_host(2); int curr_host_id = 2; - add_host(1, "172.31.31.200", "/cspot-device-namespace/"); - add_host(2, "172.31.26.11", "/cspot-device-namespace/"); + set_host(curr_host_id); + add_host(1, "169.231.230.190", "/home/ubuntu/laminar/build/bin/"); + add_host(2, "169.231.230.3", "/home/ubuntu/laminar/build/bin/"); // Nodes is on device 1: (namespace, host, node) diff --git a/tests/distributed_simple_laminar_example.cpp b/tests/distributed_simple_laminar_example.cpp index fd2ca23..e22421c 100644 --- a/tests/distributed_simple_laminar_example.cpp +++ b/tests/distributed_simple_laminar_example.cpp @@ -16,14 +16,15 @@ int main() { int ns = 1; // Laminar Namespace (not CSPOT's) - + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ laminar_init(); // Set up two devices (change IPs and/or cspot namespaces) int curr_host_id = 2; set_host(curr_host_id); - add_host(1, "169.231.230.183", "/cspot-device-namespace/"); - add_host(2, "169.231.230.225", "/cspot-device-namespace/"); + add_host(1, "169.231.230.190", "/home/ubuntu/laminar/build/bin/"); + add_host(2, "169.231.230.3", "/home/ubuntu/laminar/build/bin/"); // Nodes diff --git a/tests/distributed_vector.cpp b/tests/distributed_vector.cpp index 96a6dab..903fdba 100644 --- a/tests/distributed_vector.cpp +++ b/tests/distributed_vector.cpp @@ -15,10 +15,10 @@ int main() { system("sudo find . -name \"lmr*\" -delete"); laminar_reset(); /* reset setup data structures */ - set_host(2); int curr_host_id = 2; - add_host(1, "172.31.26.11", "/home/ubuntu/laminar/build/bin"); - add_host(2, "172.31.31.200", "/home/ubuntu/laminar/build/bin"); + set_host(curr_host_id); + add_host(1, "169.231.230.190", "/home/ubuntu/laminar/build/bin/"); + add_host(2, "169.231.230.3", "/home/ubuntu/laminar/build/bin/"); laminar_init(); const struct df_operation parse = {DF_INTERNAL, DF_INTERNAL_NOOP}; @@ -51,10 +51,16 @@ int main() { else{ operand result; get_result(ns, 2, &result, 1); + std::cout << "hi1" << std::endl; ts_value* loaded_result = load_value(&result.operand_value); + std::cout << "hi2" << std::endl; + if(loaded_result == NULL){ + std::cout << "null"< Date: Sat, 1 Feb 2025 18:18:35 -0800 Subject: [PATCH 07/20] hardcoded output for now --- df_interface.cpp | 10 ++++++++++ .../include/logger_system/df_logger_settings.h | 2 +- tests/simple_laminar_example.cpp | 3 ++- type_system/include/type_system/type_system/ts_types.h | 1 + type_system/src/ts_type.c | 9 +++++++-- 5 files changed, 21 insertions(+), 4 deletions(-) diff --git a/df_interface.cpp b/df_interface.cpp index 473c3df..6cc9c87 100644 --- a/df_interface.cpp +++ b/df_interface.cpp @@ -347,6 +347,16 @@ printf("fire_operand: output_woof: %s, value: %f\n", int get_result(const int ns, const int id, operand* const res, const unsigned long itr) { std::string woof_name = generate_woof_path(OUT_WF_TYPE, ns, id); + // size_t pos = woof_name.find_last_of('/'); + // std::string uri; + // if (pos != std::string::npos) { + // uri = woof_name.substr(0, pos + 1); // Keep up to and including the '/' + // } else { + // uri = woof_name; // No '/' found, keep the original string + // } + + // std::cout << "gengen11 " << uri << std::endl; + // wait till output log has atleast itr number of results while (woof_last_seq(woof_name) < itr) {} //while(WooFGetLatestSeqno(woof_name.c_str()) < itr){} diff --git a/logger_system/include/logger_system/df_logger_settings.h b/logger_system/include/logger_system/df_logger_settings.h index ff48285..799100c 100644 --- a/logger_system/include/logger_system/df_logger_settings.h +++ b/logger_system/include/logger_system/df_logger_settings.h @@ -9,6 +9,6 @@ #define DIRECT_FLUSH #include "df_logger.h" -enum LOG_LEVELS CURRENT_LOG_LEVEL = DF_WARN; +enum LOG_LEVELS CURRENT_LOG_LEVEL = DF_TRACE; #endif // CSPOT_APPS_DF_LOGGER_SETTINGS_H diff --git a/tests/simple_laminar_example.cpp b/tests/simple_laminar_example.cpp index 41277a9..44240e6 100644 --- a/tests/simple_laminar_example.cpp +++ b/tests/simple_laminar_example.cpp @@ -11,7 +11,8 @@ int main() { int ns = 1; // Namespace int hd = 1; // Host device - + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ laminar_init(); // Set up single device on localhost diff --git a/type_system/include/type_system/type_system/ts_types.h b/type_system/include/type_system/type_system/ts_types.h index 6947364..f47f3f4 100644 --- a/type_system/include/type_system/type_system/ts_types.h +++ b/type_system/include/type_system/type_system/ts_types.h @@ -78,6 +78,7 @@ struct ts_storage_system { uint8_t id[16]; // Unique identifier for composite items. Can but must not be used in combination with the // uuid.h library. size_t element_size; // Absolute element size of composite item in bytes. + // char uri[200]; }; diff --git a/type_system/src/ts_type.c b/type_system/src/ts_type.c index 982f18c..8399628 100644 --- a/type_system/src/ts_type.c +++ b/type_system/src/ts_type.c @@ -169,13 +169,18 @@ void* woof_get_array_value(const char* const woof_id, size_t value_size, const s } bool load_array_value(struct ts_value_array* const array) { // NOLINT(misc-no-recursion) - char woof_id[100]; - strcpy(woof_id, TS_STORAGE_PREFIX); + char woof_id[200]; + // STRATEGY + // strcpy(woof_id, array->storage_system.uri); + strcpy(woof_id, "woof://169.231.230.190/home/ubuntu/laminar/build/bin/"); + strcat(woof_id, TS_STORAGE_PREFIX); strcat(woof_id, "-array-"); char uuid_string[UUID_STR_LEN]; uuid_unparse_lower(array->storage_system.id, uuid_string); strcat(woof_id, uuid_string); + printf("gengen6 %s\n",woof_id); + const unsigned long index = WooFGetLatestSeqno(woof_id); // Stop loading if WooF does not exist or no entry is written From 2ac87b317cb549a8b39739483c5552c87e0e2d4e Mon Sep 17 00:00:00 2001 From: Ononymous Date: Thu, 20 Feb 2025 12:08:03 -0800 Subject: [PATCH 08/20] new test --- tests/distributed_vector2.cpp | 66 +++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 tests/distributed_vector2.cpp diff --git a/tests/distributed_vector2.cpp b/tests/distributed_vector2.cpp new file mode 100644 index 0000000..017ada4 --- /dev/null +++ b/tests/distributed_vector2.cpp @@ -0,0 +1,66 @@ +#include "../df_interface.h" +#include "type_system/ts_type.h" +#include "type_system/types/ts_array.h" +#include "type_system/types/ts_matrix.h" +#include "type_system/types/ts_primitive.h" +#include "type_system/types/ts_string.h" + +#include +#include +#include +#include + +int main() { + int ns = 1; // Namespace + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ + + int curr_host_id = 2; + set_host(curr_host_id); + add_host(1, "169.231.230.190", "/home/ubuntu/laminar/build/bin/"); + add_host(2, "169.231.230.3", "/home/ubuntu/laminar/build/bin/"); + laminar_init(); + + const struct df_operation parse = {DF_INTERNAL, DF_INTERNAL_NOOP}; + add_operand(ns, 1, 1); + add_node(ns, 1, 2, parse); + add_node(ns, 2, 3, parse); + + subscribe(ns, 2, 0, ns, 1); + subscribe(ns, 3, 0, ns, 2); + laminar_setup(); + + if (curr_host_id == 1){ + uint8_t array[] = {1, 2, 3}; + struct ts_value* operand_value = value_from_unsigned_byte_array(array, 3); + operand op(operand_value, 1); + write_value(&op.operand_value); + fire_operand(ns, 1, &op); + value_deep_delete(operand_value); + + operand result; + get_result(ns, 3, &result, 1); + ts_value* loaded_result = load_value(&result.operand_value); + + uint8_t result_array[3]; + get_unsigned_byte_array(result_array, loaded_result); + + for(int i = 0; i < sizeof(result_array); i++){ + std::cout << (int)result_array[i] << std::endl; + } + } + else{ + operand result; + get_result(ns, 2, &result, 1); + ts_value* loaded_result = load_value(&result.operand_value); + + uint8_t result_array[3]; + get_unsigned_byte_array(result_array, loaded_result); + + for(int i = 0; i < sizeof(result_array); i++){ + std::cout << (int)result_array[i] << std::endl; + } + } + + return 0; +} \ No newline at end of file From 7e71af9dbb2b1f047518e48eb5426ff0831fbcb2 Mon Sep 17 00:00:00 2001 From: Ononymous Date: Thu, 20 Feb 2025 14:01:05 -0800 Subject: [PATCH 09/20] all data pass works, need to implement wait --- df.h | 2 + df_interface.cpp | 95 +++++++++++++++++-- df_interface.h | 2 + subscription_event_handler.cpp | 16 +++- tests/CMakeLists.txt | 9 ++ tests/distributed_vector.cpp | 6 +- tests/distributed_vector2.cpp | 4 +- .../include/type_system/type_system/ts_type.h | 3 + type_system/src/ts_type.c | 26 ++--- 9 files changed, 134 insertions(+), 29 deletions(-) diff --git a/df.h b/df.h index b84c1ec..8c2ae83 100644 --- a/df.h +++ b/df.h @@ -5,9 +5,11 @@ #define CSPOTDEVICE #include "df_operations.h" #include "ts_types.h" +#include "ts_type.h" #else #include "operation_system/df_operations.h" #include "type_system/ts_types.h" +#include "type_system/ts_type.h" #endif #include diff --git a/df_interface.cpp b/df_interface.cpp index 6cc9c87..79772f0 100644 --- a/df_interface.cpp +++ b/df_interface.cpp @@ -347,16 +347,6 @@ printf("fire_operand: output_woof: %s, value: %f\n", int get_result(const int ns, const int id, operand* const res, const unsigned long itr) { std::string woof_name = generate_woof_path(OUT_WF_TYPE, ns, id); - // size_t pos = woof_name.find_last_of('/'); - // std::string uri; - // if (pos != std::string::npos) { - // uri = woof_name.substr(0, pos + 1); // Keep up to and including the '/' - // } else { - // uri = woof_name; // No '/' found, keep the original string - // } - - // std::cout << "gengen11 " << uri << std::endl; - // wait till output log has atleast itr number of results while (woof_last_seq(woof_name) < itr) {} //while(WooFGetLatestSeqno(woof_name.c_str()) < itr){} @@ -569,3 +559,88 @@ std::string graphviz_representation() { return g; } + +struct ts_value* load_value(const struct ts_value* const unloaded_value, int ns, int id){ + std::string woof_name = generate_woof_path(OUT_WF_TYPE, ns, id); + + size_t pos = woof_name.find_last_of('/'); + std::string uri; + if (pos != std::string::npos) { + uri = woof_name.substr(0, pos + 1); // Keep up to and including the '/' + } else { + uri = ""; + } + + struct ts_value* return_value = new ts_value; + + return_value->type = unloaded_value->type; + switch (unloaded_value->type) { + case TS_UNINITIALIZED: + case TS_UNKNOWN: { + free(return_value); + return NULL; + } + case TS_ERROR: + case TS_EXCEPTION: + case TS_BOOLEAN: + case TS_BYTE: + case TS_SHORT: + case TS_INTEGER: + case TS_LONG: + case TS_UNSIGNED_BYTE: + case TS_UNSIGNED_SHORT: + case TS_UNSIGNED_INTEGER: + case TS_UNSIGNED_LONG: + case TS_FLOAT: + case TS_DOUBLE: + case TS_TIMESTAMP: + case TS_PRIM_STRING: { + if (!value_deep_set(unloaded_value, return_value)) { + free(return_value); + return NULL; + } + } break; + case TS_PRIM_LARGE_STRING: { + if (!value_deep_set(unloaded_value, return_value)) { + free(return_value); + return NULL; + } + } break; + case TS_PRIM_2D_DOUBLE_ARRAY: { + for (size_t i = 0; i < TS_PRIM_2D_DOUBLE_ARRAY_ROWS; i++) { + for (size_t j = 0; j < TS_PRIM_2D_DOUBLE_ARRAY_COLS; j++) { + return_value->value.ts_prim_2d_double_array[i][j] = unloaded_value->value.ts_prim_2d_double_array[i][j]; + } + } + } break; + case TS_PRIM_DOUBLE_ARRAY: { + if (!value_deep_set(unloaded_value, return_value)) { + free(return_value); + return NULL; + } + } break; +#ifndef ESP8266 + case TS_STRING: { + // assume always local + return_value->value.ts_string = unloaded_value->value.ts_string; + if (!load_string_value(&return_value->value.ts_string, uri.c_str())) { + free(return_value); + return NULL; + } + } break; + case TS_ARRAY: { + // assume always local + return_value->value.ts_array = unloaded_value->value.ts_array; + if (!load_array_value(&return_value->value.ts_array, uri.c_str())) { + free(return_value); + return NULL; + } + } break; +#endif + default: { + free(return_value); + return NULL; + } + } + return return_value; +} \ No newline at end of file diff --git a/df_interface.h b/df_interface.h index 17a1bf7..7e6dbf3 100644 --- a/df_interface.h +++ b/df_interface.h @@ -45,4 +45,6 @@ std::string generate_woof_host_url(int host_id); std::string graphviz_representation(); +struct ts_value* load_value(const struct ts_value* const unloaded_value, int ns, int id); + #endif // DF_INTERFACE_H diff --git a/subscription_event_handler.cpp b/subscription_event_handler.cpp index 3a2e855..95a1820 100644 --- a/subscription_event_handler.cpp +++ b/subscription_event_handler.cpp @@ -20,12 +20,20 @@ operand perform_operation(const std::vector& operands, const struct df_operation operation, - struct df_operation_metadata* const operation_metadata) { + struct df_operation_metadata* const operation_metadata, + const std::vector& input_hosts) { + for(int i = 0; i < input_hosts.size(); i++){ + log_debug("[gengen] [namespace:%d][node_id:%d] %s", + input_hosts[i].ns, + input_hosts[i].id, + generate_woof_path(OUT_WF_TYPE, input_hosts[i].ns, input_hosts[i].id)); + std::cout << generate_woof_path(OUT_WF_TYPE, input_hosts[i].ns, input_hosts[i].id) << std::endl; + } const unsigned long operand_count = operands.size(); struct ts_value* operands_array[operand_count]; const struct ts_value* const_operands_array[operand_count]; for (size_t i = 0; i < operand_count; ++i) { - operands_array[i] = load_value(&operands[i].operand_value); + operands_array[i] = load_value(&operands[i].operand_value, input_hosts[i].ns, input_hosts[i].id); const_operands_array[i] = operands_array[i]; if (const_operands_array[i] == nullptr) { log_error("Could not load value [input:%lu]", i); @@ -157,6 +165,7 @@ extern "C" int subscription_event_handler(WOOF* wf, unsigned long seqno, void* p log_debug("[namespace:%d][node_id:%d] has %lu inputs", node_namespace, node_id, input_count); // Scan through subscription outputs and collect operands std::vector op_values(input_count); + std::vector input_hosts(input_count); for (unsigned long input_index = 0; input_index < input_count; input_index++) { log_debug("[input:%lu] START input processing", input_index); // Get last used output and seqno for this port @@ -203,6 +212,7 @@ extern "C" int subscription_event_handler(WOOF* wf, unsigned long seqno, void* p input_subscription.ns, input_subscription.id, input_subscription.port); + input_hosts[input_index] = input_subscription; // Get relevant operand from subscription output (if it exists) std::string subscription_output_woof = @@ -441,7 +451,7 @@ extern "C" int subscription_event_handler(WOOF* wf, unsigned long seqno, void* p .write_value = true, .output_woof_path = output_woof.c_str()}}; log_debug("Firing operation %d\n",n.id); - operand result = perform_operation(op_values, n.operation, &operation_metadata); + operand result = perform_operation(op_values, n.operation, &operation_metadata, input_hosts); log_debug("Fired operation %d\n",n.id); const unsigned long long last_output_sequence_number = (unsigned long long)woof_last_seq(output_woof); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e46b315..211ff19 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -17,6 +17,15 @@ target_link_libraries(distributed_vector PRIVATE ts_matrix PRIVATE ts_string) +add_executable(distributed_vector2 distributed_vector2.cpp) +target_link_libraries(distributed_vector2 + PRIVATE df_interface + PRIVATE ts_type + PRIVATE df_operation + PRIVATE ts_array + PRIVATE ts_matrix + PRIVATE ts_string) + add_executable(multinode_regression multinode_regression.cpp) target_link_libraries(multinode_regression PRIVATE df_interface diff --git a/tests/distributed_vector.cpp b/tests/distributed_vector.cpp index 903fdba..9011e16 100644 --- a/tests/distributed_vector.cpp +++ b/tests/distributed_vector.cpp @@ -38,7 +38,7 @@ int main() { operand result; - get_result(ns, 2, &result, 1); + get_result(ns, 1, &result, 1); ts_value* loaded_result = load_value(&result.operand_value); uint8_t result_array[3]; @@ -50,9 +50,9 @@ int main() { } else{ operand result; - get_result(ns, 2, &result, 1); + get_result(ns, 1, &result, 1); std::cout << "hi1" << std::endl; - ts_value* loaded_result = load_value(&result.operand_value); + ts_value* loaded_result = load_value(&result.operand_value, ns, 1); std::cout << "hi2" << std::endl; if(loaded_result == NULL){ std::cout << "null"<value.ts_string = unloaded_value->value.ts_string; - if (!load_string_value(&return_value->value.ts_string)) { + if (!load_string_value(&return_value->value.ts_string, NULL)) { free(return_value); return NULL; } } break; case TS_ARRAY: { + // assume always local return_value->value.ts_array = unloaded_value->value.ts_array; - if (!load_array_value(&return_value->value.ts_array)) { + if (!load_array_value(&return_value->value.ts_array, NULL)) { free(return_value); return NULL; } @@ -128,7 +128,7 @@ struct ts_value* load_value(const struct ts_value* const unloaded_value) { } #ifndef ESP8266 -bool load_string_value(struct ts_value_string* const string) { +bool load_string_value(struct ts_value_string* const string, const char* uri) { char woof_id[100]; strcpy(woof_id, TS_STORAGE_PREFIX); strcat(woof_id, "-string"); @@ -168,12 +168,16 @@ void* woof_get_array_value(const char* const woof_id, size_t value_size, const s return array_value; } -bool load_array_value(struct ts_value_array* const array) { // NOLINT(misc-no-recursion) +bool load_array_value(struct ts_value_array* const array, const char* uri) { // NOLINT(misc-no-recursion) char woof_id[200]; // STRATEGY - // strcpy(woof_id, array->storage_system.uri); - strcpy(woof_id, "woof://169.231.230.190/home/ubuntu/laminar/build/bin/"); - strcat(woof_id, TS_STORAGE_PREFIX); + if (uri){ + strcpy(woof_id, uri); + strcat(woof_id, TS_STORAGE_PREFIX); + } + else{ + strcpy(woof_id, TS_STORAGE_PREFIX); + } strcat(woof_id, "-array-"); char uuid_string[UUID_STR_LEN]; uuid_unparse_lower(array->storage_system.id, uuid_string); @@ -230,7 +234,7 @@ bool load_array_value(struct ts_value_array* const array) { // NOLINT(misc-no-re if (array->value != NULL) { for (size_t i = 0; i < array->size; i++) { struct ts_value_string* const array_element = (struct ts_value_string*)array->value; - const bool is_element_loaded = load_string_value(&array_element[i]); + const bool is_element_loaded = load_string_value(&array_element[i], uri); if (!is_element_loaded) { // delete all previous allocated elements for (size_t j = 0; j < i; j++) { @@ -246,7 +250,7 @@ bool load_array_value(struct ts_value_array* const array) { // NOLINT(misc-no-re if (array->value != NULL) { for (size_t i = 0; i < array->size; i++) { struct ts_value_array* const array_element = (struct ts_value_array*)array->value; - const bool is_element_loaded = load_array_value(&array_element[i]); + const bool is_element_loaded = load_array_value(&array_element[i], uri); if (!is_element_loaded) { // delete all previous allocated elements for (size_t j = 0; j < i; j++) { From 0117f9acc0751e3be28134e10f5d591600061295 Mon Sep 17 00:00:00 2001 From: Ononymous Date: Thu, 20 Feb 2025 14:56:18 -0800 Subject: [PATCH 10/20] fixed get_result wait, removed prints --- df_interface.cpp | 1 + subscription_event_handler.cpp | 14 +++++++------- type_system/src/ts_type.c | 12 +++++------- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/df_interface.cpp b/df_interface.cpp index 79772f0..803deae 100644 --- a/df_interface.cpp +++ b/df_interface.cpp @@ -348,6 +348,7 @@ int get_result(const int ns, const int id, operand* const res, const unsigned lo std::string woof_name = generate_woof_path(OUT_WF_TYPE, ns, id); // wait till output log has atleast itr number of results + while (WooFInvalid(woof_last_seq(woof_name))) {} while (woof_last_seq(woof_name) < itr) {} //while(WooFGetLatestSeqno(woof_name.c_str()) < itr){} diff --git a/subscription_event_handler.cpp b/subscription_event_handler.cpp index 95a1820..619e7da 100644 --- a/subscription_event_handler.cpp +++ b/subscription_event_handler.cpp @@ -22,13 +22,13 @@ operand perform_operation(const std::vector& operands, const struct df_operation operation, struct df_operation_metadata* const operation_metadata, const std::vector& input_hosts) { - for(int i = 0; i < input_hosts.size(); i++){ - log_debug("[gengen] [namespace:%d][node_id:%d] %s", - input_hosts[i].ns, - input_hosts[i].id, - generate_woof_path(OUT_WF_TYPE, input_hosts[i].ns, input_hosts[i].id)); - std::cout << generate_woof_path(OUT_WF_TYPE, input_hosts[i].ns, input_hosts[i].id) << std::endl; - } + // for(int i = 0; i < input_hosts.size(); i++){ + // log_debug("[gengen] [namespace:%d][node_id:%d] %s", + // input_hosts[i].ns, + // input_hosts[i].id, + // generate_woof_path(OUT_WF_TYPE, input_hosts[i].ns, input_hosts[i].id)); + // std::cout << generate_woof_path(OUT_WF_TYPE, input_hosts[i].ns, input_hosts[i].id) << std::endl; + // } const unsigned long operand_count = operands.size(); struct ts_value* operands_array[operand_count]; const struct ts_value* const_operands_array[operand_count]; diff --git a/type_system/src/ts_type.c b/type_system/src/ts_type.c index 64a46de..d1d3c5b 100644 --- a/type_system/src/ts_type.c +++ b/type_system/src/ts_type.c @@ -183,15 +183,13 @@ bool load_array_value(struct ts_value_array* const array, const char* uri) { // uuid_unparse_lower(array->storage_system.id, uuid_string); strcat(woof_id, uuid_string); - printf("gengen6 %s\n",woof_id); - + // Wait if WooF does not exist or no entry is written + while (!woof_exists(woof_id)) {} + + while (WooFGetLatestSeqno(woof_id) == 0) {} + const unsigned long index = WooFGetLatestSeqno(woof_id); - // Stop loading if WooF does not exist or no entry is written - if (WooFInvalid(index) || index == 0) { - return false; - } - switch (array->type) { case TS_UNINITIALIZED: array->value = NULL; From 561de46a823fa77885fec784646a94f1dc90d5c5 Mon Sep 17 00:00:00 2001 From: Ononymous Date: Tue, 25 Feb 2025 13:53:35 -0800 Subject: [PATCH 11/20] small change with distributed test --- benchmarks/CMakeLists.txt | 9 +++++++++ subscription_event_handler.cpp | 7 ------- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index b1d3c74..78dcea9 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -73,6 +73,15 @@ target_link_libraries(matmul-rich PRIVATE ts_array PRIVATE ts_matrix) +add_executable(matmul-distributed + matmul-distributed.cpp) +target_link_libraries(matmul-distributed + PRIVATE df_interface + PRIVATE ts_type + PRIVATE df_operation + PRIVATE ts_array + PRIVATE ts_matrix) + add_executable(k_means_mapreduce k_means_mapreduce.cpp) target_link_libraries(k_means_mapreduce PRIVATE df_interface diff --git a/subscription_event_handler.cpp b/subscription_event_handler.cpp index 619e7da..07cc24c 100644 --- a/subscription_event_handler.cpp +++ b/subscription_event_handler.cpp @@ -22,13 +22,6 @@ operand perform_operation(const std::vector& operands, const struct df_operation operation, struct df_operation_metadata* const operation_metadata, const std::vector& input_hosts) { - // for(int i = 0; i < input_hosts.size(); i++){ - // log_debug("[gengen] [namespace:%d][node_id:%d] %s", - // input_hosts[i].ns, - // input_hosts[i].id, - // generate_woof_path(OUT_WF_TYPE, input_hosts[i].ns, input_hosts[i].id)); - // std::cout << generate_woof_path(OUT_WF_TYPE, input_hosts[i].ns, input_hosts[i].id) << std::endl; - // } const unsigned long operand_count = operands.size(); struct ts_value* operands_array[operand_count]; const struct ts_value* const_operands_array[operand_count]; From 8281e694625e4fb14efc815cd2393cadfe5e86c1 Mon Sep 17 00:00:00 2001 From: Ononymous Date: Wed, 26 Feb 2025 18:31:55 -0800 Subject: [PATCH 12/20] distributed tests works with loops --- tests/distributed_vector.cpp | 58 +++++++++++++++++------------------ tests/distributed_vector2.cpp | 49 +++++++++++++++-------------- 2 files changed, 55 insertions(+), 52 deletions(-) diff --git a/tests/distributed_vector.cpp b/tests/distributed_vector.cpp index 9011e16..7a2b682 100644 --- a/tests/distributed_vector.cpp +++ b/tests/distributed_vector.cpp @@ -28,42 +28,42 @@ int main() { subscribe(ns, 1, 0, ns, 2); laminar_setup(); - if (curr_host_id == 1){ - uint8_t array[] = {1, 2, 3}; - struct ts_value* operand_value = value_from_unsigned_byte_array(array, 3); - operand op(operand_value, 1); - write_value(&op.operand_value); - fire_operand(ns, 2, &op); - value_deep_delete(operand_value); + for (unsigned long long itr = 1; itr < 30; itr++){ + std::cout << "itr: " << itr << std::endl; + if (curr_host_id == 1){ + uint8_t array[] = {1, 2, 3}; + struct ts_value* operand_value = value_from_unsigned_byte_array(array, 3); + operand op(operand_value, itr); + write_value(&op.operand_value); + fire_operand(ns, 2, &op); + value_deep_delete(operand_value); + sleep(1); - operand result; - get_result(ns, 1, &result, 1); - ts_value* loaded_result = load_value(&result.operand_value); + std::cout << "gets: " << (int)woof_last_seq(generate_woof_path(OUT_WF_TYPE, ns, 1)) << std::endl; + operand result; + get_result(ns, 1, &result, itr); + ts_value* loaded_result = load_value(&result.operand_value); - uint8_t result_array[3]; - get_unsigned_byte_array(result_array, loaded_result); + uint8_t result_array[3]; + get_unsigned_byte_array(result_array, loaded_result); - for(int i = 0; i < sizeof(result_array); i++){ - std::cout << (int)result_array[i] << std::endl; - } - } - else{ - operand result; - get_result(ns, 1, &result, 1); - std::cout << "hi1" << std::endl; - ts_value* loaded_result = load_value(&result.operand_value, ns, 1); - std::cout << "hi2" << std::endl; - if(loaded_result == NULL){ - std::cout << "null"< Date: Wed, 26 Feb 2025 19:10:50 -0800 Subject: [PATCH 13/20] matmul works with distributed --- benchmarks/matmul-distributed.cpp | 237 ++++++++++++++++++++++++++++++ benchmarks/matmul-rich.cpp | 4 +- df_interface.cpp | 18 ++- 3 files changed, 249 insertions(+), 10 deletions(-) create mode 100644 benchmarks/matmul-distributed.cpp diff --git a/benchmarks/matmul-distributed.cpp b/benchmarks/matmul-distributed.cpp new file mode 100644 index 0000000..46a9665 --- /dev/null +++ b/benchmarks/matmul-distributed.cpp @@ -0,0 +1,237 @@ +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "../df_interface.h" +#include "type_system/ts_type.h" +#include "type_system/types/ts_array.h" +#include "type_system/types/ts_matrix.h" +#include "type_system/types/ts_primitive.h" +#include "type_system/types/ts_string.h" + +int a[8192][8192]; +int b[8192][8192]; + +// int a[4][4] = { +// {2, 2, 3, 4}, +// {4, 3, 2, 1}, +// {2, 4, 1, 3}, +// {3, 1, 4, 2} +// }; + +// int b[4][4] = { +// {2, 2, 3, 4}, +// {4, 3, 2, 1}, +// {2, 4, 1, 3}, +// {3, 1, 4, 2} +// }; + +bool TIMING = true; + +int curr_host_id = 2; + +double matmul_partition(const int mat_size, int n_partitions, int t_partitions, unsigned long long itr) { + auto start_time = std::chrono::high_resolution_clock::now(); + + // Write matrix A to operands in `t_partitions` chunks + int min_chunk_size = mat_size / t_partitions; + int overflow = mat_size % t_partitions; + + if (curr_host_id == 1) { + // Write matrix B to operand + struct ts_value* operand_value = value_from_integer_matrix(b, mat_size, mat_size); + + operand op(operand_value, itr); + write_value(&op.operand_value); + fire_operand(2, 1, &op); + value_deep_delete(operand_value); + } + else{ + //std::cout << "t_partitions: " << t_partitions << "min_chunk_size: " << min_chunk_size << "mat_size: " << mat_size << std::endl; + int start = 0, end = 0; + for (int i = 0; i < t_partitions - 1; i++) { + end = start + min_chunk_size - 1; + if (overflow) { + end++; + overflow--; + } + + // Write submatrix to operand + // std::cout << '[' << start << ", " << end << ']' << std::endl; + struct ts_value* operand_value = value_from_integer_matrix(a[start], end - start + 1, mat_size); + + operand op(operand_value, itr); + write_value(&op.operand_value); + fire_operand(1, i + 1, &op); + value_deep_delete(operand_value); + + start = end + 1; + } + + // Write final submatrix to operand + // std::cout << '[' << start << ", " << mat_size - 1 << ']' << std::endl; + struct ts_value* operand_value = value_from_integer_matrix(a[start], mat_size - start, mat_size); + operand op(operand_value, itr); + write_value(&op.operand_value); + fire_operand(1, t_partitions, &op); + value_deep_delete(operand_value); + } + + // usleep(5e5); + sleep(1); + + if (!TIMING) { + // Show matrix multiplication result + overflow = mat_size % t_partitions; + operand result; + for (unsigned int id = 1; id <= t_partitions; id++) { + int err = get_result(3, id, &result, itr); + if (err < 0) { + std::cout << "Failed to read the result " << std::endl; + } + ts_value* const result_value = load_value(&result.operand_value, 3, id); + int result_matrix[min_chunk_size + !!overflow][mat_size]; + + get_integer_matrix(result_matrix, result_value); + for (int i = 0; i < min_chunk_size + !!overflow; i++) { + for (int j = 0; j < mat_size; j++) { + std::cout << result_matrix[i][j] << " "; + } + std::cout << std::endl; + } + + if (overflow) { + overflow--; + } + } + + return 0.0; + } + + else { + operand op; + std::vector timestamps_ns; + for (unsigned int id = 1; id <= t_partitions; id++) { + int err = get_result(3, id, &op, itr); + + long ts = op.operand_value.value.ts_long; + timestamps_ns.push_back(ts); + } + + long start_ns = std::chrono::duration_cast( + start_time.time_since_epoch()) + .count(); + + long end_ns = *std::max_element(timestamps_ns.begin(), timestamps_ns.end()); + + double latency_ms = (double)(end_ns - start_ns) / 1e6; + std::cout << "latency: " << latency_ms << "ms" << std::endl; + + return latency_ms; + } +} + +double avg(std::vector& v) { + double sum = 0.0; + + for (double d : v) + sum += d; + + return sum / v.size(); +} + +#define ARGS "p:d:" +char *Usage = "matmul-distributed -p partitions -d dimention\n"; + +int main(int argc, char **argv) { + // Program laminar set up + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ + + set_host(curr_host_id); + add_host(1, "169.231.230.190", "/home/ubuntu/laminar/build/bin/"); + add_host(2, "169.231.230.3", "/home/ubuntu/laminar/build/bin/"); + laminar_init(); + + // Matrix set up + int c; + int partitions; + int dim; + + partitions = 0; + dim = 0; + + while((c = getopt(argc,argv,ARGS)) != EOF) { + switch(c) { + case 'p': + partitions = atoi(optarg); + break; + case 'd': + dim = atoi(optarg); + break; + default: + fprintf(stderr, + "unrecognized command %c\n", + (char)c); + fprintf(stderr,"usage: %s",Usage); + break; + } + } + + if(partitions == 0) { + fprintf(stderr,"must specify partitions\n"); + fprintf(stderr,"usage: %s",Usage); + exit(1); + } + + if(dim == 0) { + fprintf(stderr,"must specify partitions\n"); + fprintf(stderr,"usage: %s",Usage); + exit(1); + } + + int t_partitions; + + if(dim < partitions) { + t_partitions = dim; + } else { + t_partitions = partitions; + } + + // Create operands + for (int i = 1; i <= t_partitions; i++) { + add_operand(1, 2, i); // A / t_partitions + } + + add_operand(2, 1, 1); // B + + // Create node for each partition output + for (int i = 1; i <= t_partitions; i++) { + add_node( + 3, (i % 2 + 1), i, + {DF_CUSTOM, (TIMING ? MATRIX_MULTIPLY_TIMING : MATRIX_MULTIPLY)}); + subscribe(3, i, 0, 1, i); // Rows of matrix A (A / t_partitions) + subscribe(3, i, 1, 2, 1); // Matrix B + } + + /* Run program */ + laminar_setup(); + + std::vector latencies_ms; + for (unsigned long long itr = 1; itr < 31; itr++) { + latencies_ms.push_back(matmul_partition(dim, partitions, t_partitions, itr)); + } + + for (double t : latencies_ms) + std::cout << t << ", "; + + std::cout << "\nAverage: " << avg(latencies_ms) << std::endl; + + return 0; +} diff --git a/benchmarks/matmul-rich.cpp b/benchmarks/matmul-rich.cpp index f7e1e10..19d15e4 100644 --- a/benchmarks/matmul-rich.cpp +++ b/benchmarks/matmul-rich.cpp @@ -26,7 +26,7 @@ void transpose(std::vector>& m) { } void matmul() { - system("sudo find . -name \"laminar*\" -delete"); + system("sudo find . -name \"lmr*\" -delete"); laminar_reset(); /* reset setup data structures */ set_host(1); add_host(1, "127.0.0.1", "/home/centos/cspot/build/bin/"); @@ -158,7 +158,7 @@ int a[8192][8192]; int b[8192][8192]; double matmul_partition(const int mat_size, int n_partitions, bool timing) { - system("sudo find . -name \"laminar*\" -delete"); + system("sudo find . -name \"lmr*\" -delete"); laminar_reset(); /* reset setup data structures */ set_host(1); add_host(1, "127.0.0.1", "/home/centos/cspot/build/bin/"); diff --git a/df_interface.cpp b/df_interface.cpp index 803deae..01eb501 100644 --- a/df_interface.cpp +++ b/df_interface.cpp @@ -329,18 +329,20 @@ void add_operand(const int ns, const int host_id, const int id) { void fire_operand(const int ns, const int id, const operand* const op, const bool trigger_handler) { std::string output_woof = generate_woof_path(OUT_WF_TYPE, ns, id); - unsigned long long curr_itr = (unsigned long long)woof_last_seq(output_woof); + // unsigned long long curr_itr = (unsigned long long)woof_last_seq(output_woof); + // fire only if the iteration number is 1 more then the previous iteration -printf("fire_operand: output_woof: %s, value: %f\n", + printf("fire_operand: output_woof: %s, value: %f\n", output_woof.c_str(), op->operand_value.value.ts_double); + + while (WooFInvalid(woof_last_seq(output_woof))) {} + while (woof_last_seq(output_woof) + 1 != op->itr) {} - if (curr_itr + 1 == op->itr) { - if (!trigger_handler) { - woof_put(output_woof, "", op); - } else { - woof_put(output_woof, OUTPUT_HANDLER, op); - } + if (!trigger_handler) { + woof_put(output_woof, "", op); + } else { + woof_put(output_woof, OUTPUT_HANDLER, op); } } From 574c8e293509aba26ef88d9e0782e55470ed33c7 Mon Sep 17 00:00:00 2001 From: Ononymous Date: Thu, 27 Feb 2025 12:54:27 -0800 Subject: [PATCH 14/20] added new namespace address --- benchmarks/matmul-distributed.cpp | 4 ++-- tests/distributed_addition.cpp | 4 ++-- tests/distributed_simple_laminar_example.cpp | 4 ++-- tests/distributed_vector.cpp | 4 ++-- tests/distributed_vector2.cpp | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/benchmarks/matmul-distributed.cpp b/benchmarks/matmul-distributed.cpp index 46a9665..fb0a587 100644 --- a/benchmarks/matmul-distributed.cpp +++ b/benchmarks/matmul-distributed.cpp @@ -155,8 +155,8 @@ int main(int argc, char **argv) { laminar_reset(); /* reset setup data structures */ set_host(curr_host_id); - add_host(1, "169.231.230.190", "/home/ubuntu/laminar/build/bin/"); - add_host(2, "169.231.230.3", "/home/ubuntu/laminar/build/bin/"); + add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.3", "/disk2/cspot-namespace-platform/"); laminar_init(); // Matrix set up diff --git a/tests/distributed_addition.cpp b/tests/distributed_addition.cpp index a5c0903..53a5338 100644 --- a/tests/distributed_addition.cpp +++ b/tests/distributed_addition.cpp @@ -15,8 +15,8 @@ int main() { int curr_host_id = 2; set_host(curr_host_id); - add_host(1, "169.231.230.190", "/home/ubuntu/laminar/build/bin/"); - add_host(2, "169.231.230.3", "/home/ubuntu/laminar/build/bin/"); + add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.3", "/disk2/cspot-namespace-platform/"); // Nodes is on device 1: (namespace, host, node) diff --git a/tests/distributed_simple_laminar_example.cpp b/tests/distributed_simple_laminar_example.cpp index e22421c..36b19a3 100644 --- a/tests/distributed_simple_laminar_example.cpp +++ b/tests/distributed_simple_laminar_example.cpp @@ -23,8 +23,8 @@ int main() { // Set up two devices (change IPs and/or cspot namespaces) int curr_host_id = 2; set_host(curr_host_id); - add_host(1, "169.231.230.190", "/home/ubuntu/laminar/build/bin/"); - add_host(2, "169.231.230.3", "/home/ubuntu/laminar/build/bin/"); + add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.3", "/disk2/cspot-namespace-platform/"); // Nodes diff --git a/tests/distributed_vector.cpp b/tests/distributed_vector.cpp index 7a2b682..b5d9297 100644 --- a/tests/distributed_vector.cpp +++ b/tests/distributed_vector.cpp @@ -17,8 +17,8 @@ int main() { int curr_host_id = 2; set_host(curr_host_id); - add_host(1, "169.231.230.190", "/home/ubuntu/laminar/build/bin/"); - add_host(2, "169.231.230.3", "/home/ubuntu/laminar/build/bin/"); + add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.3", "/disk2/cspot-namespace-platform/"); laminar_init(); const struct df_operation parse = {DF_INTERNAL, DF_INTERNAL_NOOP}; diff --git a/tests/distributed_vector2.cpp b/tests/distributed_vector2.cpp index cca156a..0f8093f 100644 --- a/tests/distributed_vector2.cpp +++ b/tests/distributed_vector2.cpp @@ -17,8 +17,8 @@ int main() { int curr_host_id = 2; set_host(curr_host_id); - add_host(1, "169.231.230.190", "/home/ubuntu/laminar/build/bin/"); - add_host(2, "169.231.230.3", "/home/ubuntu/laminar/build/bin/"); + add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.3", "/disk2/cspot-namespace-platform/"); laminar_init(); const struct df_operation parse = {DF_INTERNAL, DF_INTERNAL_NOOP}; From a2bf062a54b883fb19325879f480a89dc65aa05f Mon Sep 17 00:00:00 2001 From: Ononymous Date: Sat, 8 Mar 2025 19:59:24 -0800 Subject: [PATCH 15/20] new tests prove problem with distributed --- benchmarks/CMakeLists.txt | 9 ++++ benchmarks/matmul-dist-simple.cpp | 86 +++++++++++++++++++++++++++++++ benchmarks/matmul-distributed.cpp | 2 +- df_interface.cpp | 12 ++--- df_interface.h | 2 +- tests/CMakeLists.txt | 5 ++ tests/distributed_two.cpp | 52 +++++++++++++++++++ 7 files changed, 160 insertions(+), 8 deletions(-) create mode 100644 benchmarks/matmul-dist-simple.cpp create mode 100644 tests/distributed_two.cpp diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index 78dcea9..55cb00c 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -73,6 +73,15 @@ target_link_libraries(matmul-rich PRIVATE ts_array PRIVATE ts_matrix) +add_executable(matmul-dist-simple + matmul-dist-simple.cpp) +target_link_libraries(matmul-dist-simple + PRIVATE df_interface + PRIVATE ts_type + PRIVATE df_operation + PRIVATE ts_array + PRIVATE ts_matrix) + add_executable(matmul-distributed matmul-distributed.cpp) target_link_libraries(matmul-distributed diff --git a/benchmarks/matmul-dist-simple.cpp b/benchmarks/matmul-dist-simple.cpp new file mode 100644 index 0000000..21b8a8c --- /dev/null +++ b/benchmarks/matmul-dist-simple.cpp @@ -0,0 +1,86 @@ +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "../df_interface.h" +#include "type_system/ts_type.h" +#include "type_system/types/ts_array.h" +#include "type_system/types/ts_matrix.h" +#include "type_system/types/ts_primitive.h" +#include "type_system/types/ts_string.h" + + +const size_t MAT_SIZE = 2; + +int a[MAT_SIZE][MAT_SIZE] = { + {1, 1}, + {1, 1} +}; +const int curr_host_id = 2; + +const int ns = 1; + +void fire_matrix(int matrix[MAT_SIZE][MAT_SIZE], int id, unsigned long long itr); + +int main(int argc, char **argv) { + // Program laminar set up + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ + laminar_init(); + + set_host(curr_host_id); + add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.3", "/disk2/cspot-namespace-platform/"); + + add_node(ns, 1, 1, {DF_CUSTOM, MATRIX_MULTIPLY}); + add_node(ns, 2, 2, {DF_CUSTOM, MATRIX_MULTIPLY}); + + // Inputs + add_operand(ns, 1, 3); // a + add_operand(ns, 1, 4); // b + add_operand(ns, 1, 5); // c + + subscribe(ns, 1, 0, ns, 3); + subscribe(ns, 1, 1, ns, 4); + subscribe(ns, 2, 0, ns, 1); + subscribe(ns, 2, 1, ns, 5); + + /* Run program */ + laminar_setup(); + if(curr_host_id == 1){ + fire_matrix(a, 3, 1); + fire_matrix(a, 4, 1); + fire_matrix(a, 5, 1); + } + // get result + operand result; + int err = get_result(ns, 2, &result, 1); + if (err < 0) { + std::cout << "Failed to read the result " << std::endl; + } + ts_value* const result_value = load_value(&result.operand_value, ns, 2); + // print it out + int result_matrix[MAT_SIZE][MAT_SIZE]; + get_integer_matrix(result_matrix, result_value); + for (size_t i = 0; i < MAT_SIZE; i++) { + for (size_t j = 0; j < MAT_SIZE; j++) { + std::cout << result_matrix[i][j] << " "; + } + std::cout << std::endl; + } + return 0; +} + +void fire_matrix(int matrix[MAT_SIZE][MAT_SIZE], int id, unsigned long long itr){ + struct ts_value* operand_value = value_from_integer_matrix(matrix, MAT_SIZE, MAT_SIZE); + operand op(operand_value, itr); + write_value(&op.operand_value); + fire_operand(ns, id, &op); + value_deep_delete(operand_value); +} \ No newline at end of file diff --git a/benchmarks/matmul-distributed.cpp b/benchmarks/matmul-distributed.cpp index fb0a587..70ea46b 100644 --- a/benchmarks/matmul-distributed.cpp +++ b/benchmarks/matmul-distributed.cpp @@ -32,7 +32,7 @@ int b[8192][8192]; // {3, 1, 4, 2} // }; -bool TIMING = true; +bool TIMING = false; int curr_host_id = 2; diff --git a/df_interface.cpp b/df_interface.cpp index 01eb501..6c5be85 100644 --- a/df_interface.cpp +++ b/df_interface.cpp @@ -331,19 +331,19 @@ void fire_operand(const int ns, const int id, const operand* const op, const boo std::string output_woof = generate_woof_path(OUT_WF_TYPE, ns, id); // unsigned long long curr_itr = (unsigned long long)woof_last_seq(output_woof); - // fire only if the iteration number is 1 more then the previous iteration - printf("fire_operand: output_woof: %s, value: %f\n", - output_woof.c_str(), - op->operand_value.value.ts_double); - while (WooFInvalid(woof_last_seq(output_woof))) {} + // fire only if the iteration number is 1 more then the previous iteration while (woof_last_seq(output_woof) + 1 != op->itr) {} - + if (!trigger_handler) { woof_put(output_woof, "", op); } else { woof_put(output_woof, OUTPUT_HANDLER, op); } + + printf("fire_operand: output_woof: %s, value: %f\n", + output_woof.c_str(), + op->operand_value.value.ts_double); } int get_result(const int ns, const int id, operand* const res, const unsigned long itr) { diff --git a/df_interface.h b/df_interface.h index 7e6dbf3..d151a47 100644 --- a/df_interface.h +++ b/df_interface.h @@ -24,7 +24,7 @@ void set_host(int host_id); void add_host(int host_id, const std::string& host_ip, const std::string& woof_path, - enum RetryType retry_type = RETRY_EXPONENTIAL_BACKOFF); + enum RetryType retry_type = RETRY_LINEAR_BACKOFF); void add_node(int ns, int host_id, int id, struct df_operation operation); void add_operand(int ns, int host_id, int id); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 211ff19..cd5ca80 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -8,6 +8,11 @@ target_link_libraries(distributed_addition PRIVATE ts_array PRIVATE ts_matrix) +add_executable(distributed_two distributed_two.cpp) +target_link_libraries(distributed_two + PRIVATE df_interface + PRIVATE ts_type) + add_executable(distributed_vector distributed_vector.cpp) target_link_libraries(distributed_vector PRIVATE df_interface diff --git a/tests/distributed_two.cpp b/tests/distributed_two.cpp new file mode 100644 index 0000000..d2ea73c --- /dev/null +++ b/tests/distributed_two.cpp @@ -0,0 +1,52 @@ +#include "../df_interface.h" +#include "type_system/types/ts_primitive.h" + +#include +#include +#include + +int main() { + int ns = 1; // Laminar Namespace (not CSPOT's) + system("sudo find . -name \"lmr*\" -delete"); + laminar_reset(); /* reset setup data structures */ + laminar_init(); + + // Set up two devices (change IPs and/or cspot namespaces) + int curr_host_id = 2; + set_host(curr_host_id); + add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.3", "/disk2/cspot-namespace-platform/"); + + // Nodes + add_node(ns, 1, 3, {.category = DF_ARITHMETIC, .operation = DF_ARITH_MULTIPLICATION}); // a * b + + // Inputs + add_operand(ns, 1, 1); // a + add_operand(ns, 2, 2); // b + + // Edges + subscribe("1:3:0", "1:1"); // ADD[0] <-- a + subscribe("1:3:1", "1:2"); // ADD[1] <-- b + + laminar_setup(); + if(curr_host_id == 1) { + // Example: (1 + 2) * 3 + struct ts_value value_a{}; + set_double(&value_a, 1); + operand op_a(&value_a, 1); + fire_operand(ns, 1, &op_a); + } + else { + struct ts_value value_b{}; + set_double(&value_b, 2); + operand op_b(&value_b, 1); + fire_operand(ns, 2, &op_b); + } + operand result; + int err = get_result(ns, 3, &result, 1); + if (err < 0) { + std::cout << "Failed to read the result " << std::endl; + } + + std::cout << "Result: " << result.operand_value.value.ts_double << std::endl; +} From 43e69d886b0971918ed23f209cd53f43fa2ce92a Mon Sep 17 00:00:00 2001 From: Ononymous Date: Thu, 13 Mar 2025 15:51:06 -0700 Subject: [PATCH 16/20] fixed bug --- df.h | 8 ++++++-- subscription_event_handler.cpp | 3 ++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/df.h b/df.h index 8c2ae83..4bb1905 100644 --- a/df.h +++ b/df.h @@ -80,11 +80,15 @@ struct operand { struct cached_output { operand op; unsigned long long seq; // CSPOT seq in output woof + int input_ns; + int input_id; // Defaults execution iteration and seq to 0 so initial access is thrown out and updated - explicit cached_output(const operand& op = operand(nullptr, 0), unsigned long long seq = 0) + explicit cached_output(const operand& op = operand(nullptr, 0), unsigned long long seq = 0, int input_ns = -1, int input_id = -1) : op(op) - , seq(seq) { + , seq(seq) + , input_ns(input_ns) + , input_id(input_id) { } }; diff --git a/subscription_event_handler.cpp b/subscription_event_handler.cpp index 07cc24c..470f9a8 100644 --- a/subscription_event_handler.cpp +++ b/subscription_event_handler.cpp @@ -179,6 +179,7 @@ extern "C" int subscription_event_handler(WOOF* wf, unsigned long seqno, void* p if (cached_last_output.op.itr == curr_itr) { // Operand for this seq has already been found and cached. Retrieve from cache and proceed op_values[input_index] = cached_last_output.op; + input_hosts[input_index] = subscription(cached_last_output.input_ns, cached_last_output.input_id); log_debug("[input:%lu] Already retrieved, continue", input_index); log_debug("[input:%lu] END input processing", input_index); continue; @@ -337,7 +338,7 @@ extern "C" int subscription_event_handler(WOOF* wf, unsigned long seqno, void* p input_index, subscription_operand.itr, new_sequence_number); - const cached_output new_cached_output = cached_output(subscription_operand, new_sequence_number); + const cached_output new_cached_output = cached_output(subscription_operand, new_sequence_number, input_subscription.ns, input_subscription.id); woof_put(last_used_sub_pos_woof, "", &new_cached_output); } From bcea226284ee747b0fd7f7c63633a106e01554bc Mon Sep 17 00:00:00 2001 From: Ononymous Date: Sat, 15 Mar 2025 16:03:29 -0700 Subject: [PATCH 17/20] verified no bug --- benchmarks/matmul-dist-simple.cpp | 55 ++++++++++++++++--------------- benchmarks/matmul-distributed.cpp | 33 +++++++++---------- 2 files changed, 45 insertions(+), 43 deletions(-) diff --git a/benchmarks/matmul-dist-simple.cpp b/benchmarks/matmul-dist-simple.cpp index 21b8a8c..b9bd5c5 100644 --- a/benchmarks/matmul-dist-simple.cpp +++ b/benchmarks/matmul-dist-simple.cpp @@ -39,42 +39,45 @@ int main(int argc, char **argv) { add_host(2, "169.231.230.3", "/disk2/cspot-namespace-platform/"); add_node(ns, 1, 1, {DF_CUSTOM, MATRIX_MULTIPLY}); - add_node(ns, 2, 2, {DF_CUSTOM, MATRIX_MULTIPLY}); // Inputs - add_operand(ns, 1, 3); // a - add_operand(ns, 1, 4); // b - add_operand(ns, 1, 5); // c + add_operand(ns, 1, 2); // a + add_operand(ns, 2, 3); // b - subscribe(ns, 1, 0, ns, 3); - subscribe(ns, 1, 1, ns, 4); - subscribe(ns, 2, 0, ns, 1); - subscribe(ns, 2, 1, ns, 5); + subscribe(ns, 1, 0, ns, 2); + subscribe(ns, 1, 1, ns, 3); /* Run program */ laminar_setup(); - if(curr_host_id == 1){ - fire_matrix(a, 3, 1); - fire_matrix(a, 4, 1); - fire_matrix(a, 5, 1); + if (curr_host_id == 1) { + fire_matrix(a, 2, 1); } - // get result - operand result; - int err = get_result(ns, 2, &result, 1); - if (err < 0) { - std::cout << "Failed to read the result " << std::endl; + else{ + fire_matrix(a, 3, 1); } - ts_value* const result_value = load_value(&result.operand_value, ns, 2); - // print it out - int result_matrix[MAT_SIZE][MAT_SIZE]; - get_integer_matrix(result_matrix, result_value); - for (size_t i = 0; i < MAT_SIZE; i++) { - for (size_t j = 0; j < MAT_SIZE; j++) { - std::cout << result_matrix[i][j] << " "; + for (unsigned long long itr = 1; itr < 31; itr++) { + operand result; + int err = get_result(ns, 1, &result, itr); + if (err < 0) { + std::cout << "Failed to read the result " << std::endl; + } + ts_value* const result_value = load_value(&result.operand_value, ns, 1); + int result_matrix[MAT_SIZE][MAT_SIZE]; + + get_integer_matrix(result_matrix, result_value); + for (int i = 0; i < MAT_SIZE; i++) { + for (int j = 0; j < MAT_SIZE; j++) { + std::cout << result_matrix[i][j] << " "; + } + std::cout << std::endl; + } + if (curr_host_id == 1) { + fire_matrix(a, 2, itr+1); + } + else{ + fire_matrix(result_matrix, 3, itr+1); } - std::cout << std::endl; } - return 0; } void fire_matrix(int matrix[MAT_SIZE][MAT_SIZE], int id, unsigned long long itr){ diff --git a/benchmarks/matmul-distributed.cpp b/benchmarks/matmul-distributed.cpp index 70ea46b..dd8436b 100644 --- a/benchmarks/matmul-distributed.cpp +++ b/benchmarks/matmul-distributed.cpp @@ -15,22 +15,22 @@ #include "type_system/types/ts_primitive.h" #include "type_system/types/ts_string.h" -int a[8192][8192]; -int b[8192][8192]; - -// int a[4][4] = { -// {2, 2, 3, 4}, -// {4, 3, 2, 1}, -// {2, 4, 1, 3}, -// {3, 1, 4, 2} -// }; - -// int b[4][4] = { -// {2, 2, 3, 4}, -// {4, 3, 2, 1}, -// {2, 4, 1, 3}, -// {3, 1, 4, 2} -// }; +// int a[8192][8192]; +// int b[8192][8192]; + +int a[4][4] = { + {2, 2, 3, 4}, + {4, 3, 2, 1}, + {2, 4, 1, 3}, + {3, 1, 4, 2} +}; + +int b[4][4] = { + {2, 2, 3, 4}, + {4, 3, 2, 1}, + {2, 4, 1, 3}, + {3, 1, 4, 2} +}; bool TIMING = false; @@ -84,7 +84,6 @@ double matmul_partition(const int mat_size, int n_partitions, int t_partitions, } // usleep(5e5); - sleep(1); if (!TIMING) { // Show matrix multiplication result From 498f8a4ec746bd52eb3ccec5b7bc623d9a3fe77f Mon Sep 17 00:00:00 2001 From: Gen Tamada Date: Tue, 8 Apr 2025 10:12:35 -0700 Subject: [PATCH 18/20] changed ip addr to new machine --- benchmarks/matmul-dist-simple.cpp | 2 +- benchmarks/matmul-distributed.cpp | 2 +- tests/distributed_addition.cpp | 2 +- tests/distributed_simple_laminar_example.cpp | 2 +- tests/distributed_two.cpp | 2 +- tests/distributed_vector.cpp | 2 +- tests/distributed_vector2.cpp | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/benchmarks/matmul-dist-simple.cpp b/benchmarks/matmul-dist-simple.cpp index b9bd5c5..1abc61e 100644 --- a/benchmarks/matmul-dist-simple.cpp +++ b/benchmarks/matmul-dist-simple.cpp @@ -36,7 +36,7 @@ int main(int argc, char **argv) { set_host(curr_host_id); add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); - add_host(2, "169.231.230.3", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.247", "/disk2/cspot-namespace-platform/"); add_node(ns, 1, 1, {DF_CUSTOM, MATRIX_MULTIPLY}); diff --git a/benchmarks/matmul-distributed.cpp b/benchmarks/matmul-distributed.cpp index dd8436b..55ed0ff 100644 --- a/benchmarks/matmul-distributed.cpp +++ b/benchmarks/matmul-distributed.cpp @@ -155,7 +155,7 @@ int main(int argc, char **argv) { set_host(curr_host_id); add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); - add_host(2, "169.231.230.3", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.247", "/disk2/cspot-namespace-platform/"); laminar_init(); // Matrix set up diff --git a/tests/distributed_addition.cpp b/tests/distributed_addition.cpp index 53a5338..a98a1e5 100644 --- a/tests/distributed_addition.cpp +++ b/tests/distributed_addition.cpp @@ -16,7 +16,7 @@ int main() { int curr_host_id = 2; set_host(curr_host_id); add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); - add_host(2, "169.231.230.3", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.247", "/disk2/cspot-namespace-platform/"); // Nodes is on device 1: (namespace, host, node) diff --git a/tests/distributed_simple_laminar_example.cpp b/tests/distributed_simple_laminar_example.cpp index 36b19a3..f9b9af7 100644 --- a/tests/distributed_simple_laminar_example.cpp +++ b/tests/distributed_simple_laminar_example.cpp @@ -24,7 +24,7 @@ int main() { int curr_host_id = 2; set_host(curr_host_id); add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); - add_host(2, "169.231.230.3", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.247", "/disk2/cspot-namespace-platform/"); // Nodes diff --git a/tests/distributed_two.cpp b/tests/distributed_two.cpp index d2ea73c..ace2fc8 100644 --- a/tests/distributed_two.cpp +++ b/tests/distributed_two.cpp @@ -15,7 +15,7 @@ int main() { int curr_host_id = 2; set_host(curr_host_id); add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); - add_host(2, "169.231.230.3", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.247", "/disk2/cspot-namespace-platform/"); // Nodes add_node(ns, 1, 3, {.category = DF_ARITHMETIC, .operation = DF_ARITH_MULTIPLICATION}); // a * b diff --git a/tests/distributed_vector.cpp b/tests/distributed_vector.cpp index b5d9297..d175ba3 100644 --- a/tests/distributed_vector.cpp +++ b/tests/distributed_vector.cpp @@ -18,7 +18,7 @@ int main() { int curr_host_id = 2; set_host(curr_host_id); add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); - add_host(2, "169.231.230.3", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.247", "/disk2/cspot-namespace-platform/"); laminar_init(); const struct df_operation parse = {DF_INTERNAL, DF_INTERNAL_NOOP}; diff --git a/tests/distributed_vector2.cpp b/tests/distributed_vector2.cpp index 0f8093f..3456126 100644 --- a/tests/distributed_vector2.cpp +++ b/tests/distributed_vector2.cpp @@ -18,7 +18,7 @@ int main() { int curr_host_id = 2; set_host(curr_host_id); add_host(1, "169.231.230.190", "/disk2/cspot-namespace-platform/"); - add_host(2, "169.231.230.3", "/disk2/cspot-namespace-platform/"); + add_host(2, "169.231.230.247", "/disk2/cspot-namespace-platform/"); laminar_init(); const struct df_operation parse = {DF_INTERNAL, DF_INTERNAL_NOOP}; From 902f80e66146202dc5ce95e33b2ff2b728d1c04b Mon Sep 17 00:00:00 2001 From: Gen Tamada Date: Mon, 14 Apr 2025 21:32:13 -0700 Subject: [PATCH 19/20] removed spin wait --- type_system/src/ts_type.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/type_system/src/ts_type.c b/type_system/src/ts_type.c index d1d3c5b..1c09fca 100644 --- a/type_system/src/ts_type.c +++ b/type_system/src/ts_type.c @@ -183,13 +183,13 @@ bool load_array_value(struct ts_value_array* const array, const char* uri) { // uuid_unparse_lower(array->storage_system.id, uuid_string); strcat(woof_id, uuid_string); - // Wait if WooF does not exist or no entry is written - while (!woof_exists(woof_id)) {} - - while (WooFGetLatestSeqno(woof_id) == 0) {} - const unsigned long index = WooFGetLatestSeqno(woof_id); + // Stop loading if WooF does not exist or no entry is written + if (WooFInvalid(index) || index == 0) { + return false; + } + switch (array->type) { case TS_UNINITIALIZED: array->value = NULL; From c5d2cab6003231be486d6f982ff3531d730dc016 Mon Sep 17 00:00:00 2001 From: Gen Tamada Date: Mon, 14 Apr 2025 21:55:29 -0700 Subject: [PATCH 20/20] removed extra content and added comments Co-authored-by: Krish Chaudhary --- .vscode/settings.json | 74 ------------------- benchmarks/matmul-dist-simple.cpp | 2 + benchmarks/matmul-distributed.cpp | 3 + df.h | 1 + df_interface.cpp | 6 +- df_interface.h | 3 +- .../logger_system/df_logger_settings.h | 2 +- subscription_event_handler.cpp | 5 ++ tests/distributed_addition.cpp | 3 + tests/distributed_two.cpp | 4 + tests/distributed_vector.cpp | 4 + tests/distributed_vector2.cpp | 4 + .../include/type_system/type_system/ts_type.h | 1 + .../type_system/type_system/ts_types.h | 1 - type_system/src/ts_type.c | 3 +- 15 files changed, 34 insertions(+), 82 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index f8af820..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,74 +0,0 @@ -{ - "files.associations": { - "*.hpp": "cpp", - "array": "cpp", - "atomic": "cpp", - "bit": "cpp", - "*.tcc": "cpp", - "bitset": "cpp", - "cctype": "cpp", - "chrono": "cpp", - "clocale": "cpp", - "cmath": "cpp", - "codecvt": "cpp", - "compare": "cpp", - "concepts": "cpp", - "condition_variable": "cpp", - "cstdarg": "cpp", - "cstddef": "cpp", - "cstdint": "cpp", - "cstdio": "cpp", - "cstdlib": "cpp", - "cstring": "cpp", - "ctime": "cpp", - "cwchar": "cpp", - "cwctype": "cpp", - "deque": "cpp", - "list": "cpp", - "map": "cpp", - "set": "cpp", - "string": "cpp", - "unordered_map": "cpp", - "unordered_set": "cpp", - "vector": "cpp", - "exception": "cpp", - "algorithm": "cpp", - "functional": "cpp", - "iterator": "cpp", - "memory": "cpp", - "memory_resource": "cpp", - "numeric": "cpp", - "optional": "cpp", - "random": "cpp", - "ratio": "cpp", - "regex": "cpp", - "string_view": "cpp", - "system_error": "cpp", - "tuple": "cpp", - "type_traits": "cpp", - "utility": "cpp", - "fstream": "cpp", - "future": "cpp", - "initializer_list": "cpp", - "iomanip": "cpp", - "iosfwd": "cpp", - "iostream": "cpp", - "istream": "cpp", - "limits": "cpp", - "mutex": "cpp", - "new": "cpp", - "numbers": "cpp", - "ostream": "cpp", - "semaphore": "cpp", - "shared_mutex": "cpp", - "sstream": "cpp", - "stdexcept": "cpp", - "stop_token": "cpp", - "streambuf": "cpp", - "thread": "cpp", - "cinttypes": "cpp", - "typeinfo": "cpp", - "valarray": "cpp", - "variant": "cpp" - } -} \ No newline at end of file diff --git a/benchmarks/matmul-dist-simple.cpp b/benchmarks/matmul-dist-simple.cpp index 1abc61e..09070a7 100644 --- a/benchmarks/matmul-dist-simple.cpp +++ b/benchmarks/matmul-dist-simple.cpp @@ -1,3 +1,5 @@ +// Test file testing distributed array capabilities - multiplies a matrix by itself and the result of previous iteration - distributed across two hosts. +// Written by Krish & Gen #include #include diff --git a/benchmarks/matmul-distributed.cpp b/benchmarks/matmul-distributed.cpp index 55ed0ff..aa54af9 100644 --- a/benchmarks/matmul-distributed.cpp +++ b/benchmarks/matmul-distributed.cpp @@ -1,3 +1,6 @@ +// Test file testing distributed array capabilities - multiplies two matrix inputs fromtwo machines - 30 seperate times. +// Written by Krish & Gen + #include #include diff --git a/df.h b/df.h index 4bb1905..cdf8b8e 100644 --- a/df.h +++ b/df.h @@ -84,6 +84,7 @@ struct cached_output { int input_id; // Defaults execution iteration and seq to 0 so initial access is thrown out and updated + // We are now storing the source of the cached outputs so we can fetch later in case of aggregate data explicit cached_output(const operand& op = operand(nullptr, 0), unsigned long long seq = 0, int input_ns = -1, int input_id = -1) : op(op) , seq(seq) diff --git a/df_interface.cpp b/df_interface.cpp index 6c5be85..831a8e0 100644 --- a/df_interface.cpp +++ b/df_interface.cpp @@ -331,6 +331,7 @@ void fire_operand(const int ns, const int id, const operand* const op, const boo std::string output_woof = generate_woof_path(OUT_WF_TYPE, ns, id); // unsigned long long curr_itr = (unsigned long long)woof_last_seq(output_woof); + // Spin waits that check the validity of the newest iteration and whether iteration while (WooFInvalid(woof_last_seq(output_woof))) {} // fire only if the iteration number is 1 more then the previous iteration while (woof_last_seq(output_woof) + 1 != op->itr) {} @@ -340,10 +341,6 @@ void fire_operand(const int ns, const int id, const operand* const op, const boo } else { woof_put(output_woof, OUTPUT_HANDLER, op); } - - printf("fire_operand: output_woof: %s, value: %f\n", - output_woof.c_str(), - op->operand_value.value.ts_double); } int get_result(const int ns, const int id, operand* const res, const unsigned long itr) { @@ -563,6 +560,7 @@ std::string graphviz_representation() { return g; } +// overloaded load_value with ns and id for aggregate data types struct ts_value* load_value(const struct ts_value* const unloaded_value, int ns, int id){ std::string woof_name = generate_woof_path(OUT_WF_TYPE, ns, id); diff --git a/df_interface.h b/df_interface.h index d151a47..a9eef0d 100644 --- a/df_interface.h +++ b/df_interface.h @@ -24,7 +24,7 @@ void set_host(int host_id); void add_host(int host_id, const std::string& host_ip, const std::string& woof_path, - enum RetryType retry_type = RETRY_LINEAR_BACKOFF); + enum RetryType retry_type = RETRY_EXPONENTIAL_BACKOFF); void add_node(int ns, int host_id, int id, struct df_operation operation); void add_operand(int ns, int host_id, int id); @@ -45,6 +45,7 @@ std::string generate_woof_host_url(int host_id); std::string graphviz_representation(); +// overloaded load_value in c++ form, for aggregate data types struct ts_value* load_value(const struct ts_value* const unloaded_value, int ns, int id); #endif // DF_INTERFACE_H diff --git a/logger_system/include/logger_system/df_logger_settings.h b/logger_system/include/logger_system/df_logger_settings.h index 799100c..ff48285 100644 --- a/logger_system/include/logger_system/df_logger_settings.h +++ b/logger_system/include/logger_system/df_logger_settings.h @@ -9,6 +9,6 @@ #define DIRECT_FLUSH #include "df_logger.h" -enum LOG_LEVELS CURRENT_LOG_LEVEL = DF_TRACE; +enum LOG_LEVELS CURRENT_LOG_LEVEL = DF_WARN; #endif // CSPOT_APPS_DF_LOGGER_SETTINGS_H diff --git a/subscription_event_handler.cpp b/subscription_event_handler.cpp index 470f9a8..c2889a2 100644 --- a/subscription_event_handler.cpp +++ b/subscription_event_handler.cpp @@ -26,6 +26,7 @@ operand perform_operation(const std::vector& operands, struct ts_value* operands_array[operand_count]; const struct ts_value* const_operands_array[operand_count]; for (size_t i = 0; i < operand_count; ++i) { + // Load the operand value, in case if aggregate data operands_array[i] = load_value(&operands[i].operand_value, input_hosts[i].ns, input_hosts[i].id); const_operands_array[i] = operands_array[i]; if (const_operands_array[i] == nullptr) { @@ -158,6 +159,7 @@ extern "C" int subscription_event_handler(WOOF* wf, unsigned long seqno, void* p log_debug("[namespace:%d][node_id:%d] has %lu inputs", node_namespace, node_id, input_count); // Scan through subscription outputs and collect operands std::vector op_values(input_count); + // Create a vector of subscriptions to hold the source hosts of the operands std::vector input_hosts(input_count); for (unsigned long input_index = 0; input_index < input_count; input_index++) { log_debug("[input:%lu] START input processing", input_index); @@ -179,6 +181,7 @@ extern "C" int subscription_event_handler(WOOF* wf, unsigned long seqno, void* p if (cached_last_output.op.itr == curr_itr) { // Operand for this seq has already been found and cached. Retrieve from cache and proceed op_values[input_index] = cached_last_output.op; + // Save the source of the cached output in case if need to fetch aggregate data later when performing operation input_hosts[input_index] = subscription(cached_last_output.input_ns, cached_last_output.input_id); log_debug("[input:%lu] Already retrieved, continue", input_index); log_debug("[input:%lu] END input processing", input_index); @@ -206,6 +209,8 @@ extern "C" int subscription_event_handler(WOOF* wf, unsigned long seqno, void* p input_subscription.ns, input_subscription.id, input_subscription.port); + + // set the operand input host input_hosts[input_index] = input_subscription; // Get relevant operand from subscription output (if it exists) diff --git a/tests/distributed_addition.cpp b/tests/distributed_addition.cpp index a98a1e5..4db2562 100644 --- a/tests/distributed_addition.cpp +++ b/tests/distributed_addition.cpp @@ -1,3 +1,6 @@ +// run a distributed addition operation on two devices. +// Written by Krish & Gen + #include "../df_interface.h" #include "type_system/types/ts_primitive.h" diff --git a/tests/distributed_two.cpp b/tests/distributed_two.cpp index ace2fc8..da989d8 100644 --- a/tests/distributed_two.cpp +++ b/tests/distributed_two.cpp @@ -1,3 +1,7 @@ +// passing a double from one host to another +// testing simple distributed capabilities +// Written by Krish & Gen + #include "../df_interface.h" #include "type_system/types/ts_primitive.h" diff --git a/tests/distributed_vector.cpp b/tests/distributed_vector.cpp index d175ba3..206e731 100644 --- a/tests/distributed_vector.cpp +++ b/tests/distributed_vector.cpp @@ -1,3 +1,7 @@ +// passing a vector of unsigned bytes between two hosts +// with 1 node, has 1 sec wait between each iteration +// Written by Krish & Gen + #include "../df_interface.h" #include "type_system/ts_type.h" #include "type_system/types/ts_array.h" diff --git a/tests/distributed_vector2.cpp b/tests/distributed_vector2.cpp index 3456126..2a2e8c5 100644 --- a/tests/distributed_vector2.cpp +++ b/tests/distributed_vector2.cpp @@ -1,3 +1,7 @@ +// passing a vector of unsigned bytes between two hosts +// with 2 nodes so we can achieve some synchronization +// Written by Krish & Gen + #include "../df_interface.h" #include "type_system/ts_type.h" #include "type_system/types/ts_array.h" diff --git a/type_system/include/type_system/type_system/ts_type.h b/type_system/include/type_system/type_system/ts_type.h index 7b7199a..8cc3421 100644 --- a/type_system/include/type_system/type_system/ts_type.h +++ b/type_system/include/type_system/type_system/ts_type.h @@ -17,6 +17,7 @@ extern "C" { */ struct ts_value* load_value(const struct ts_value* unloaded_value); +// used in df_interface.cpp for overloaded load_value bool load_string_value(struct ts_value_string* string, const char* uri); bool load_array_value(struct ts_value_array* array, const char* uri); diff --git a/type_system/include/type_system/type_system/ts_types.h b/type_system/include/type_system/type_system/ts_types.h index f47f3f4..6947364 100644 --- a/type_system/include/type_system/type_system/ts_types.h +++ b/type_system/include/type_system/type_system/ts_types.h @@ -78,7 +78,6 @@ struct ts_storage_system { uint8_t id[16]; // Unique identifier for composite items. Can but must not be used in combination with the // uuid.h library. size_t element_size; // Absolute element size of composite item in bytes. - // char uri[200]; }; diff --git a/type_system/src/ts_type.c b/type_system/src/ts_type.c index 1c09fca..5ab3acb 100644 --- a/type_system/src/ts_type.c +++ b/type_system/src/ts_type.c @@ -170,7 +170,8 @@ void* woof_get_array_value(const char* const woof_id, size_t value_size, const s bool load_array_value(struct ts_value_array* const array, const char* uri) { // NOLINT(misc-no-recursion) char woof_id[200]; - // STRATEGY + // if url is not null, use it to create the woof_id + // else use the default woof_id if (uri){ strcpy(woof_id, uri); strcat(woof_id, TS_STORAGE_PREFIX);