diff --git a/.gitignore b/.gitignore index fc4d337..5082443 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ build/* .vscode/* +sync.sh +logs/* \ No newline at end of file diff --git a/.gitmodules b/.gitmodules index 45c343f..625304e 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ -[submodule "ext/memoRDMA"] - path = ext/memoRDMA - url = git@github.com:AndreasGeyerTUD/memoRDMA.git +[submodule "ext/MemConnect"] + path = ext/MemConnect + url = git@github.com:AndreasGeyerTUD/MemConnect.git diff --git a/CMakeLists.txt b/CMakeLists.txt index aa928fb..94d1dfd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,12 +1,13 @@ -cmake_minimum_required(VERSION 3.10) +cmake_minimum_required(VERSION 3.16) project(disaggDataProvider VERSION 0.1) set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED True) -set(CMAKE_CXX_FLAGS "-Wall -pedantic -Wextra -fopenmp") +set(warnings "-Wall -Wextra -pedantic -Wcast-align -Wcast-qual -Wctor-dtor-privacy -Wdisabled-optimization -Wformat=2 -Winit-self -Wlogical-op -Wmissing-include-dirs -Wnoexcept -Wold-style-cast -Woverloaded-virtual -Wredundant-decls -Wshadow -Wsign-promo -Wstrict-null-sentinel -Wstrict-overflow=5 -Wundef -Wno-unused -Wno-variadic-macros -Wno-parentheses -fdiagnostics-show-option") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${warnings}") +set(OpenMP_CXX_FLAGS "${OpenMP_CXX_FLAGS} -fopenmp") set(OPTIMIZE_OPTIONS "-O3") -set(warnings "-Wall -Wextra -pedantic") FIND_PACKAGE(OpenMP) @@ -19,13 +20,21 @@ endif() include_directories(SYSTEM ${OpenMP_INCLUDE_PATH}) -add_subdirectory("ext/memoRDMA") -include_directories(ext/memoRDMA) -include_directories(ext/memoRDMA/include) -include_directories(ext/memoRDMA/src) +if(NOT TARGET build-time-make-directory) + add_custom_target(build-time-make-directory ALL + COMMAND ${CMAKE_COMMAND} -E make_directory ${CMAKE_CURRENT_SOURCE_DIR}/logs + COMMAND ${CMAKE_COMMAND} -E make_directory ${CMAKE_CURRENT_SOURCE_DIR}/logs/bench + COMMAND ${CMAKE_COMMAND} -E make_directory ${CMAKE_CURRENT_SOURCE_DIR}/logs/tests) +endif() + +add_subdirectory("ext/MemConnect") +include_directories(ext/MemConnect) +include_directories(ext/MemConnect/include) +include_directories(ext/MemConnect/src) # add_compile_options("-fsanitize=thread") add_compile_options("-fopenmp") +add_compile_options("${OPTIMIZE_OPTIONS}") set(SRC_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src) set(INC_DIR ${CMAKE_CURRENT_SOURCE_DIR}/include) @@ -34,13 +43,13 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include) file(GLOB_RECURSE SOURCES ${SRC_DIR}/*.cpp) file(GLOB_RECURSE HEADERS ${INC_DIR}/*.h) -file(GLOB_RECURSE SOURCES_RDMA ext/memoRDMA/src/*.cpp) -list(REMOVE_ITEM SOURCES_RDMA ${CMAKE_CURRENT_SOURCE_DIR}/ext/memoRDMA/src/memoRDMA_server.cpp) -file(GLOB_RECURSE HEADERS_RDMA ext/memoRDMA/include/*.h) +file(GLOB_RECURSE SOURCES_RDMA ext/MemConnect/src/*.cpp) +list(REMOVE_ITEM SOURCES_RDMA ${CMAKE_CURRENT_SOURCE_DIR}/ext/MemConnect/src/memConnect_server.cpp) +file(GLOB_RECURSE HEADERS_RDMA ext/MemConnect/include/*.h) add_library(memoLib ${SOURCES_RDMA} ${HEADERS_RDMA}) target_link_libraries(memoLib ibverbs) -include_directories(${CMAKE_SOURCE_DIR}/ext/memoRDMA ${CMAKE_SOURCE_DIR}/ext/memoRDMA/include) +include_directories(${CMAKE_SOURCE_DIR}/ext/MemConnect ${CMAKE_SOURCE_DIR}/ext/MemConnect/include) add_executable(disaggDataProvider ${SOURCES} ${HEADERS}) target_link_libraries(disaggDataProvider "pthread" "memoLib" "numa" ${OpenMP_CXX_LIBRARIES}) \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 
0000000..1acda61 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# Data Provider + +Initial commit. This will be some testing repository diff --git a/ext/memoRDMA b/ext/memoRDMA deleted file mode 160000 index dd46216..0000000 --- a/ext/memoRDMA +++ /dev/null @@ -1 +0,0 @@ -Subproject commit dd462162e35f65ced4a4915516f7680ee1002da6 diff --git a/include/Benchmarks.hpp b/include/Benchmarks.hpp index bac0942..8dcb7a3 100644 --- a/include/Benchmarks.hpp +++ b/include/Benchmarks.hpp @@ -38,12 +38,15 @@ class Benchmarks { template void execUPIBenchmark(); - + void execRDMABenchmark(); void execRDMAHashJoinBenchmark(); void execRDMAHashJoinPGBenchmark(); void execRDMAHashJoinStarBenchmark(); + void execChunkVsChunkStreamBenchmark(); + void execPaxVsPaxStreamBenchmark(); + static const size_t WORKER_NUMBER = 8; - Worker workers[WORKER_NUMBER]; + // Worker workers[WORKER_NUMBER]; }; diff --git a/include/Column.h b/include/Column.hpp similarity index 93% rename from include/Column.h rename to include/Column.hpp index 32c30a6..c1d5952 100644 --- a/include/Column.h +++ b/include/Column.hpp @@ -1,14 +1,16 @@ #pragma once +#include + +#include #include #include #include #include #include -#include -#include +#include -#include "DataCatalog.h" +#include "DataCatalog.hpp" struct col_t { template @@ -68,8 +70,8 @@ struct col_t { }; private: - T* data; col_t* col; + T* data; }; void* data = nullptr; @@ -87,6 +89,9 @@ struct col_t { std::mutex appendLock; std::condition_variable iterator_data_available; + size_t highestConsecPackArrived = 0; + std::unordered_map arrived; + ~col_t() { // May be a problem when freeing memory not allocated with numa_alloc numa_free(data, sizeInBytes); @@ -182,16 +187,16 @@ struct col_t { current_end = data; } - void request_data(bool fetch_complete_column) { + void request_data(bool fetch_complete_column, bool asStream = false) { std::unique_lock _lk(iteratorLock); if (is_complete || requested_chunks > received_chunks) { - LOG_DEBUG2("" << std::endl;) + LOG_DEBUG2("Data request ignored for: '" << ident << "' because: " << (is_complete ? "is_complete" : "not_complete") << std::endl;) // Do Nothing, ignore. return; } ++requested_chunks; - DataCatalog::getInstance().fetchColStub(1, ident, fetch_complete_column); + DataCatalog::getInstance().fetchColStub(1, ident, fetch_complete_column, asStream); } void append_chunk(size_t offset, size_t chunkSize, char* remoteData) { @@ -202,8 +207,8 @@ struct col_t { memcpy(reinterpret_cast(data) + offset, remoteData, chunkSize); } - void advance_end_pointer(size_t size) { - current_end = reinterpret_cast(reinterpret_cast(current_end) + size); + void advance_end_pointer(size_t _size) { + current_end = reinterpret_cast(reinterpret_cast(current_end) + _size); iterator_data_available.notify_all(); } @@ -252,7 +257,7 @@ struct col_t { break; } default: { - using namespace memordma; + using namespace memConnect; LOG_ERROR("Saw gen_void but its not handled." << std::endl;) } } @@ -275,6 +280,10 @@ struct col_t { case col_data_t::gen_double: { return checksum(); } + default: { + using namespace memConnect; + LOG_ERROR("Saw gen_void but its not handled." << std::endl;) + } } return 0; } @@ -301,6 +310,10 @@ struct col_t { log_to_file_typed(logfile); break; } + default: { + using namespace memConnect; + LOG_ERROR("Saw gen_void but its not handled." 
<< std::endl;) + } }; } @@ -313,7 +326,7 @@ struct col_t { auto tmp = static_cast(data); for (size_t i = 0; i < size && i < 10; ++i) { if (datatype == col_data_t::gen_smallint) { - ss << " " << (uint64_t)tmp[i]; + ss << " " << static_cast(tmp[i]); } else { ss << " " << tmp[i]; } @@ -337,7 +350,7 @@ struct col_t { std::ofstream log(logname); for (size_t i = 0; i < size; ++i) { if (datatype == col_data_t::gen_smallint) { - log << " " << (uint64_t)tmp[i]; + log << " " << static_cast(tmp[i]); } else { log << " " << tmp[i]; } @@ -366,7 +379,7 @@ struct table_t { std::default_random_engine generator; - uint8_t colId = 0; + size_t colId = 0; for (auto& col : columns) { col->ident = ident + "_col_" + std::to_string(colId); diff --git a/include/DataCatalog.h b/include/DataCatalog.hpp similarity index 91% rename from include/DataCatalog.h rename to include/DataCatalog.hpp index 7176130..3d2fb5a 100644 --- a/include/DataCatalog.h +++ b/include/DataCatalog.hpp @@ -10,7 +10,7 @@ #include #include -#include "ConnectionManager.h" +#include "ConnectionManager.hpp" enum class catalog_communication_code : uint8_t { send_column_info = 0xA0, @@ -19,8 +19,12 @@ enum class catalog_communication_code : uint8_t { receive_column_data, fetch_column_chunk, receive_column_chunk, + fetch_column_as_stream, + receive_column_as_stream, fetch_pseudo_pax, + fetch_pseudo_pax_stream, receive_pseudo_pax, + receive_pseudo_pax_stream, receive_last_pseudo_pax, reconfigure_chunk_size, ack_reconfigure_chunk_size, @@ -113,6 +117,10 @@ struct col_network_info { << " " << size_info * sizeof(double) << " Bytes"; break; } + default: { + using namespace memConnect; + LOG_ERROR("Saw gen_void but its not handled." << std::endl;) + } } return std::move(ss.str()); } @@ -192,8 +200,8 @@ class DataCatalog { DataCatalog(); public: - uint64_t dataCatalog_chunkMaxSize = 1024 * 512 * 4; - uint64_t dataCatalog_chunkThreshold = 1024 * 512 * 4; + uint64_t dataCatalog_chunkMaxSize = 1024 * 1024 * 2; + uint64_t dataCatalog_chunkThreshold = 1024 * 1024 * 2; std::map tables; static DataCatalog& getInstance(); @@ -228,6 +236,6 @@ class DataCatalog { void generateBenchmarkData(const uint64_t distinctLocalColumns, const uint64_t remoteColumnsForLocal, const uint64_t localColumnElements, const uint64_t percentageOfRemote, const uint64_t localNumaNode = 0, const uint64_t remoteNumaNode = 0, bool sendToRemote = false, bool createTables = false); // Communication stubs - void fetchColStub(std::size_t conId, std::string& ident, bool whole_column = true) const; - void fetchPseudoPax(std::size_t conId, std::vector idents) const; + void fetchColStub(const std::size_t conId, const std::string& ident, bool whole_column = false, bool asStream = false) const; + void fetchPseudoPax(const std::size_t conId, const std::vector& idents, const bool asStream = false) const; }; \ No newline at end of file diff --git a/include/Operators.hpp b/include/Operators.hpp index 7ab32ba..612249e 100644 --- a/include/Operators.hpp +++ b/include/Operators.hpp @@ -1,6 +1,6 @@ #pragma once -#include +#include #include @@ -11,7 +11,7 @@ class Operators { std::vector out_vec; out_vec.reserve(blockSize); if (isFirst) { - for (auto e = 0; e < blockSize; ++e) { + for (size_t e = 0; e < blockSize; ++e) { if (data[e] < predicate) { out_vec.push_back(e); } @@ -116,7 +116,7 @@ class Operators { std::vector out_vec; out_vec.reserve(blockSize); if (isFirst) { - for (auto e = 0; e < blockSize; ++e) { + for (size_t e = 0; e < blockSize; ++e) { if (predicate_1 <= data[e] && data[e] <= predicate_2) { 
out_vec.push_back(e); } diff --git a/include/Queries.h b/include/Queries.h deleted file mode 100644 index 4306788..0000000 --- a/include/Queries.h +++ /dev/null @@ -1,20 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -void executeLocalBenchmarkingQueries(std::string& logName, std::string locality); -void executeRemoteBenchmarkingQueries(std::string& logName); - -void executeFrontPageBenchmarkingQueries(std::string& logName); - -void executeLocalMTBenchmarkingQueries(std::string& logName, std::string locality); -void executeRemoteMTBenchmarkingQueries(std::string& logName); diff --git a/src/Benchmarks.cpp b/src/Benchmarks.cpp index 8fccdec..493945a 100644 --- a/src/Benchmarks.cpp +++ b/src/Benchmarks.cpp @@ -33,6 +33,15 @@ inline void wait_col_data_ready(col_t* _col, char* _data) { waitingTime += (std::chrono::high_resolution_clock::now() - s_ts); } +inline void wait_col_data_ready2(col_t* _col, char* _data, const size_t _bytes) { + auto s_ts = std::chrono::high_resolution_clock::now(); + std::unique_lock lk(_col->iteratorLock); + if (!(_data + _bytes <= reinterpret_cast(_col->current_end))) { + _col->iterator_data_available.wait(lk, [_col, _data, _bytes] { return reinterpret_cast(_data) + _bytes <= reinterpret_cast(_col->current_end); }); + } + waitingTime += (std::chrono::high_resolution_clock::now() - s_ts); +} + template inline void fetch_data(col_t* column, uint64_t* data, const bool reload) { if (remote) { @@ -56,7 +65,7 @@ inline std::vector less_than(col_t* column, const uint64_t predicate, co std::vector out_vec; - fetch_data(column, data, reload); + // fetch_data(column, data, reload); if (timings) { auto s_ts = std::chrono::high_resolution_clock::now(); @@ -94,7 +103,7 @@ inline std::vector greater_than(col_t* column, const uint64_t predicate, std::vector out_vec; - fetch_data(column, data, reload); + // fetch_data(column, data, reload); if (timings) { auto s_ts = std::chrono::high_resolution_clock::now(); @@ -151,7 +160,7 @@ inline std::vector between_incl(col_t* column, const uint64_t predicate_ std::vector out_vec; - fetch_data(column, data, reload); + // fetch_data(column, data, reload); if (timings) { auto s_ts = std::chrono::high_resolution_clock::now(); @@ -183,8 +192,154 @@ inline std::vector between_excl(col_t* column, const uint64_t predicate_ return out_vec; }; +template +uint64_t chunk(const std::vector& idents) { + if (idents.size() != 3) { + LOG_ERROR("The size of 'idents' was not equal to 3" << std::endl;) + return 0; + } + + std::chrono::time_point s_ts; + + col_t* col_0 = DataCatalog::getInstance().find_remote(idents[0]); + col_0->request_data(false, stream); + col_t* col_1 = DataCatalog::getInstance().find_remote(idents[1]); + col_1->request_data(false, stream); + col_t* col_2 = DataCatalog::getInstance().find_remote(idents[2]); + col_2->request_data(false, stream); + + DataCatalog::getInstance().fetchPseudoPax(1, idents, stream); + + size_t columnSize = col_0->size; + size_t max_elems_per_chunk = DataCatalog::getInstance().dataCatalog_chunkMaxSize / sizeof(uint64_t); + size_t currentBlockSize = max_elems_per_chunk; + + if (max_elems_per_chunk > Benchmarks::OPTIMAL_BLOCK_SIZE / sizeof(uint64_t)) { + currentBlockSize = Benchmarks::OPTIMAL_BLOCK_SIZE / sizeof(uint64_t); + } + + uint64_t sum = 0; + size_t baseOffset = 0; + size_t currentChunkElementsProcessed = 0; + + uint64_t* data_col_0 = reinterpret_cast(col_0->data); + uint64_t* data_col_1 = reinterpret_cast(col_1->data); + uint64_t* 
data_col_2 = reinterpret_cast(col_2->data); + + while (baseOffset < columnSize) { + const size_t elem_diff = columnSize - baseOffset; + if (elem_diff < currentBlockSize) { + currentBlockSize = elem_diff; + } + + wait_col_data_ready2(col_0, reinterpret_cast(data_col_0), currentBlockSize * sizeof(uint64_t)); + wait_col_data_ready2(col_1, reinterpret_cast(data_col_1), currentBlockSize * sizeof(uint64_t)); + wait_col_data_ready2(col_2, reinterpret_cast(data_col_2), currentBlockSize * sizeof(uint64_t)); + if (!stream && currentChunkElementsProcessed == 0 && baseOffset + max_elems_per_chunk < columnSize) { + col_0->request_data(false, stream); + col_1->request_data(false, stream); + col_2->request_data(false, stream); + } + + std::vector le_idx = greater_than(col_2, 5, baseOffset, currentBlockSize, + less_than(col_1, 25, baseOffset, currentBlockSize, + between_incl(col_0, 10, 30, baseOffset, currentBlockSize, {}, false), false), + false); + + s_ts = std::chrono::high_resolution_clock::now(); + for (size_t idx : le_idx) { + sum += (data_col_0[idx] * data_col_2[idx]); + // ++sum; + } + workingTime += (std::chrono::high_resolution_clock::now() - s_ts); + + baseOffset += currentBlockSize; + data_col_0 += currentBlockSize; + data_col_1 += currentBlockSize; + data_col_2 += currentBlockSize; + currentChunkElementsProcessed = baseOffset % max_elems_per_chunk; + } + + return sum; +} + +template +uint64_t pax(const std::vector& idents) { + if (idents.size() != 3) { + LOG_ERROR("The size of 'idents' was not equal to 3" << std::endl;) + return 0; + } + + std::chrono::time_point s_ts; + + col_t* col_0 = DataCatalog::getInstance().find_remote(idents[0]); + col_t* col_1 = DataCatalog::getInstance().find_remote(idents[1]); + col_t* col_2 = DataCatalog::getInstance().find_remote(idents[2]); + + DataCatalog::getInstance().fetchPseudoPax(1, idents, stream); + + size_t columnSize = col_0->size; + + size_t total_id_len = 0; + for (auto& id : idents) { + total_id_len += id.size(); + } + + size_t appMetaSize; + + if (stream) { + appMetaSize = sizeof(size_t) + (sizeof(size_t) * (idents.size() * 3)) + total_id_len; + } else { + appMetaSize = 3 * sizeof(size_t) + (sizeof(size_t) * idents.size()) + total_id_len; + } + const size_t maximumPayloadSize = ConnectionManager::getInstance().getConnectionById(1)->maxBytesInPayload(appMetaSize); + + size_t max_elems_per_chunk = ((maximumPayloadSize / idents.size()) / (sizeof(uint64_t) * 4)) * 4; + size_t currentBlockSize = max_elems_per_chunk; + + uint64_t sum = 0; + size_t baseOffset = 0; + + uint64_t* data_col_0 = reinterpret_cast(col_0->data); + uint64_t* data_col_1 = reinterpret_cast(col_1->data); + uint64_t* data_col_2 = reinterpret_cast(col_2->data); + + while (baseOffset < columnSize) { + const size_t elem_diff = columnSize - baseOffset; + if (elem_diff < currentBlockSize) { + currentBlockSize = elem_diff; + } + + wait_col_data_ready2(col_0, reinterpret_cast(data_col_0), currentBlockSize); + wait_col_data_ready2(col_1, reinterpret_cast(data_col_1), currentBlockSize); + wait_col_data_ready2(col_2, reinterpret_cast(data_col_2), currentBlockSize); + if (!stream) { + DataCatalog::getInstance().fetchPseudoPax(1, idents); + } + + std::vector le_idx = greater_than(col_2, 5, baseOffset, currentBlockSize, + less_than(col_1, 25, baseOffset, currentBlockSize, + between_incl(col_0, 10, 30, baseOffset, currentBlockSize, {}, false), false), + false); + + s_ts = std::chrono::high_resolution_clock::now(); + for (size_t idx : le_idx) { + sum += (data_col_0[idx] * data_col_2[idx]); + // 
++sum; + } + workingTime += (std::chrono::high_resolution_clock::now() - s_ts); + + baseOffset += currentBlockSize; + data_col_0 += currentBlockSize; + data_col_1 += currentBlockSize; + data_col_2 += currentBlockSize; + } + + return sum; +} + template -uint64_t pipe_1(const uint64_t predicate, const std::vector idents) { +uint64_t pipe_1(const uint64_t, const std::vector idents) { col_t* col_0; col_t* col_1; col_t* col_2; @@ -1386,10 +1541,7 @@ void Benchmarks::execUPIBenchmark() { } } - { - using namespace std::chrono_literals; - std::this_thread::sleep_for(100ms); - } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); s_ts = std::chrono::high_resolution_clock::now(); sync_point_1.arrive_and_wait(); @@ -1410,10 +1562,8 @@ void Benchmarks::execUPIBenchmark() { std::for_each(localWorkers.begin(), localWorkers.end(), [](std::unique_ptr& t) { t->join(); }); std::for_each(remoteWorkers.begin(), remoteWorkers.end(), [](std::unique_ptr& t) { t->join(); }); - { - using namespace std::chrono_literals; - std::this_thread::sleep_for(100ms); - } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } } @@ -1517,10 +1667,7 @@ void Benchmarks::execRDMABenchmark() { } } - { - using namespace std::chrono_literals; - std::this_thread::sleep_for(100ms); - } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); s_ts = std::chrono::high_resolution_clock::now(); sync_point_1.arrive_and_wait(); @@ -1543,10 +1690,7 @@ void Benchmarks::execRDMABenchmark() { std::for_each(remoteWorkers.begin(), remoteWorkers.end(), [](std::unique_ptr& t) { t->join(); }); localWorkers.clear(); remoteWorkers.clear(); - { - using namespace std::chrono_literals; - std::this_thread::sleep_for(100ms); - } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } } @@ -1599,14 +1743,14 @@ void Benchmarks::execRDMAHashJoinBenchmark() { DataCatalog::getInstance().eraseAllRemoteColumns(); DataCatalog::getInstance().fetchRemoteInfo(); - auto do_work = [&](std::string ident, size_t rbc, size_t func_indicator) { + auto do_work = [&](std::string ident, size_t rbc, size_t func_id) { size_t res = 0; size_t (*func)(std::pair idents); - if (func_indicator == 1) { + if (func_id == 1) { func = hash_join_1; - } else if (func_indicator == 2) { + } else if (func_id == 2) { func = hash_join_2; - } else if (func_indicator == 3) { + } else if (func_id == 3) { func = hash_join_3; } else { return; @@ -1631,10 +1775,7 @@ void Benchmarks::execRDMAHashJoinBenchmark() { } } - { - using namespace std::chrono_literals; - std::this_thread::sleep_for(100ms); - } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); s_ts = std::chrono::high_resolution_clock::now(); sync_point_1.arrive_and_wait(); @@ -1654,10 +1795,7 @@ void Benchmarks::execRDMAHashJoinBenchmark() { std::for_each(workers.begin(), workers.end(), [](std::unique_ptr& t) { t->join(); }); workers.clear(); - { - using namespace std::chrono_literals; - std::this_thread::sleep_for(100ms); - } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } } @@ -1778,11 +1916,8 @@ void Benchmarks::execRDMAHashJoinPGBenchmark() { spawn_threads(kernel_pair.first, local_pin_list, worker_pool, &ready_future, local_buffer_cnt, join_cnt, &ready_workers, &complete_workers, &done_cv, &done_cv_lock, &all_done, result_out_ptr, time_out_ptr, "col_"); - { - using namespace std::chrono_literals; - while (ready_workers != local_buffer_cnt) { - std::this_thread::sleep_for(1ms); - } + while (ready_workers != local_buffer_cnt) { + 
std::this_thread::sleep_for(std::chrono::milliseconds(1)); } auto start = std::chrono::high_resolution_clock::now(); @@ -1902,11 +2037,8 @@ void Benchmarks::execRDMAHashJoinStarBenchmark() { spawn_threads(kernel_pair.first, local_pin_list, worker_pool, &ready_future, local_buffer_cnt, join_cnt, &ready_workers, &complete_workers, &done_cv, &done_cv_lock, &all_done, result_out_ptr, time_out_ptr, "tab_"); - { - using namespace std::chrono_literals; - while (ready_workers != local_buffer_cnt) { - std::this_thread::sleep_for(1ms); - } + while (ready_workers != local_buffer_cnt) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); } auto start = std::chrono::high_resolution_clock::now(); @@ -1942,10 +2074,264 @@ void Benchmarks::execRDMAHashJoinStarBenchmark() { } } +void Benchmarks::execChunkVsChunkStreamBenchmark() { + cpu_set_t cpuset; + constexpr size_t numWorkers = 8; + + auto in_time_t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + std::stringstream logNameStream; + logNameStream << "../logs/bench/" << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d-%H-%M-%S_") << "ChunkVsChunkStream.log"; + std::string logName = logNameStream.str(); + + LOG_INFO("[Task] Set name: " << logName << std::endl;) + + std::ofstream out; + out.open(logName, std::ios_base::app); + out << std::fixed << std::setprecision(7) << std::endl; + std::vector> idents(numWorkers); + + DataCatalog::getInstance().clear(true); + DataCatalog::getInstance().generateBenchmarkData(numWorkers * 3, 0, 20000000, 0, 0, 1, true, false); + + for (size_t i = 0; i < numWorkers * 3; ++i) { + idents[i % numWorkers].push_back("col_" + std::to_string(i)); + } + + std::chrono::_V2::system_clock::time_point s_ts; + std::chrono::_V2::system_clock::time_point e_ts; + + std::barrier sync_point(numWorkers + 1); + + std::vector> workers; + workers.reserve(numWorkers); + + for (size_t i = 0; i < 10; ++i) { + DataCatalog::getInstance().eraseAllRemoteColumns(); + reset_timer(); + DataCatalog::getInstance().fetchRemoteInfo(); + std::vector results(numWorkers, 0); + + auto doChunk = [&](const size_t tid) { + sync_point.arrive_and_wait(); + results[tid] = chunk(idents[tid]); + sync_point.arrive_and_wait(); + }; + + for (size_t tid = 0; tid < numWorkers; ++tid) { + workers.emplace_back(std::make_unique(doChunk, tid)); + CPU_ZERO(&cpuset); + CPU_SET(tid, &cpuset); + int rc = pthread_setaffinity_np(workers.back()->native_handle(), sizeof(cpu_set_t), &cpuset); + if (rc != 0) { + LOG_ERROR("Error calling pthread_setaffinity_np in copy_pool assignment: " << rc << std::endl;) + exit(-10); + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + sync_point.arrive_and_wait(); + s_ts = std::chrono::high_resolution_clock::now(); + sync_point.arrive_and_wait(); + e_ts = std::chrono::high_resolution_clock::now(); + + std::chrono::duration secs = e_ts - s_ts; + + uint64_t result = std::reduce(results.begin(), results.end()); + + out << "\tChunk\tStream\t" << OPTIMAL_BLOCK_SIZE << "\t" << result << "\t" << secs.count() + << "Waiting time total: " << std::chrono::duration_cast(waitingTime).count() << " (per thread " << std::chrono::duration_cast(waitingTime).count() / numWorkers << ")\t" + << "Working time total: " << std::chrono::duration_cast(workingTime).count() << " (per thread " << std::chrono::duration_cast(workingTime).count() / numWorkers <<")" << std::endl + << std::flush; + LOG_SUCCESS(std::fixed << std::setprecision(7) << "\tChunk\tStream\t" << OPTIMAL_BLOCK_SIZE << "\t" << result << "\t" << 
secs.count() << "\t" + << "Waiting time total: " << std::chrono::duration_cast(waitingTime).count() << " (per thread " << std::chrono::duration_cast(waitingTime).count() / numWorkers << ")\t" + << "Working time total: " << std::chrono::duration_cast(workingTime).count() << " (per thread " << std::chrono::duration_cast(workingTime).count() / numWorkers << ")" << std::endl;) + std::for_each(workers.begin(), workers.end(), [](std::unique_ptr& t) { t->join(); }); + workers.clear(); + } + + for (size_t i = 0; i < 10; ++i) { + DataCatalog::getInstance().eraseAllRemoteColumns(); + reset_timer(); + DataCatalog::getInstance().fetchRemoteInfo(); + std::vector results(numWorkers); + + auto doChunk = [&](const size_t tid) { + sync_point.arrive_and_wait(); + results[tid] = chunk(idents[tid]); + sync_point.arrive_and_wait(); + }; + + for (size_t tid = 0; tid < numWorkers; ++tid) { + workers.emplace_back(std::make_unique(doChunk, tid)); + CPU_ZERO(&cpuset); + CPU_SET(tid, &cpuset); + int rc = pthread_setaffinity_np(workers.back()->native_handle(), sizeof(cpu_set_t), &cpuset); + if (rc != 0) { + LOG_ERROR("Error calling pthread_setaffinity_np in copy_pool assignment: " << rc << std::endl;) + exit(-10); + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + sync_point.arrive_and_wait(); + s_ts = std::chrono::high_resolution_clock::now(); + sync_point.arrive_and_wait(); + e_ts = std::chrono::high_resolution_clock::now(); + + std::chrono::duration secs = e_ts - s_ts; + + auto result = std::reduce(results.begin(), results.end()); + + out << "\tChunk\tSingle\t" << OPTIMAL_BLOCK_SIZE << "\t" << result << "\t" << secs.count() + << "Waiting time total: " << std::chrono::duration_cast(waitingTime).count() << " (per thread " << std::chrono::duration_cast(waitingTime).count() / numWorkers << ")\t" + << "Working time total: " << std::chrono::duration_cast(workingTime).count() << " (per thread " << std::chrono::duration_cast(workingTime).count() / numWorkers <<")" << std::endl + << std::flush; + LOG_SUCCESS(std::fixed << std::setprecision(7) << "\tChunk\tSingle\t" << OPTIMAL_BLOCK_SIZE << "\t" << result << "\t" << secs.count() << "\t" + << "Waiting time total: " << std::chrono::duration_cast(waitingTime).count() << " (per thread " << std::chrono::duration_cast(waitingTime).count() / numWorkers << ")\t" + << "Working time total: " << std::chrono::duration_cast(workingTime).count() << " (per thread " << std::chrono::duration_cast(workingTime).count() / numWorkers << ")" << std::endl;) + std::for_each(workers.begin(), workers.end(), [](std::unique_ptr& t) { t->join(); }); + workers.clear(); + } + + out.close(); + + LOG_NOFORMAT(std::endl;) + LOG_INFO("Chunk vs ChunkStream Benchmark ended." 
<< std::endl;) +} + +void Benchmarks::execPaxVsPaxStreamBenchmark() { + cpu_set_t cpuset; + constexpr size_t numWorkers = 8; + + auto in_time_t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + std::stringstream logNameStream; + logNameStream << "../logs/bench/" << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d-%H-%M-%S_") << "PaxVsPaxStream.log"; + std::string logName = logNameStream.str(); + + LOG_INFO("[Task] Set name: " << logName << std::endl;) + + std::ofstream out; + out.open(logName, std::ios_base::app); + out << std::fixed << std::setprecision(7) << std::endl; + std::vector> idents(numWorkers); + + DataCatalog::getInstance().clear(true); + DataCatalog::getInstance().generateBenchmarkData(numWorkers * 3, 0, 20000000, 0, 0, 1, true, false); + + for (size_t i = 0; i < numWorkers * 3; ++i) { + idents[i % numWorkers].push_back("col_" + std::to_string(i)); + } + + std::chrono::_V2::system_clock::time_point s_ts; + std::chrono::_V2::system_clock::time_point e_ts; + + std::barrier sync_point(numWorkers + 1); + + std::vector> workers; + workers.reserve(numWorkers); + + for (size_t i = 0; i < 10; ++i) { + DataCatalog::getInstance().eraseAllRemoteColumns(); + reset_timer(); + DataCatalog::getInstance().fetchRemoteInfo(); + std::vector results(numWorkers, 0); + + auto doPax = [&](const size_t tid) { + sync_point.arrive_and_wait(); + results[tid] = pax(idents[tid]); + sync_point.arrive_and_wait(); + }; + + for (size_t tid = 0; tid < numWorkers; ++tid) { + workers.emplace_back(std::make_unique(doPax, tid)); + CPU_ZERO(&cpuset); + CPU_SET(tid, &cpuset); + int rc = pthread_setaffinity_np(workers.back()->native_handle(), sizeof(cpu_set_t), &cpuset); + if (rc != 0) { + LOG_ERROR("Error calling pthread_setaffinity_np in copy_pool assignment: " << rc << std::endl;) + exit(-10); + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + sync_point.arrive_and_wait(); + s_ts = std::chrono::high_resolution_clock::now(); + sync_point.arrive_and_wait(); + e_ts = std::chrono::high_resolution_clock::now(); + + std::chrono::duration secs = e_ts - s_ts; + + uint64_t result = std::reduce(results.begin(), results.end()); + + out << "\tPax\tStream\t" << OPTIMAL_BLOCK_SIZE << "\t" << result << "\t" << secs.count() + << "Waiting time total: " << std::chrono::duration_cast(waitingTime).count() << " (per thread " << std::chrono::duration_cast(waitingTime).count() / numWorkers << ")\t" + << "Working time total: " << std::chrono::duration_cast(workingTime).count() << " (per thread " << std::chrono::duration_cast(workingTime).count() / numWorkers <<")" << std::endl + << std::flush; + LOG_SUCCESS(std::fixed << std::setprecision(7) << "\tPax\tStream\t" << OPTIMAL_BLOCK_SIZE << "\t" << result << "\t" << secs.count() << "\t" + << "Waiting time total: " << std::chrono::duration_cast(waitingTime).count() << " (per thread " << std::chrono::duration_cast(waitingTime).count() / numWorkers << ")\t" + << "Working time total: " << std::chrono::duration_cast(workingTime).count() << " (per thread " << std::chrono::duration_cast(workingTime).count() / numWorkers << ")" << std::endl;) + std::for_each(workers.begin(), workers.end(), [](std::unique_ptr& t) { t->join(); }); + workers.clear(); + } + + for (size_t i = 0; i < 10; ++i) { + DataCatalog::getInstance().eraseAllRemoteColumns(); + reset_timer(); + DataCatalog::getInstance().fetchRemoteInfo(); + std::vector results(numWorkers); + + auto pax_single = [&](const size_t tid) { + sync_point.arrive_and_wait(); + results[tid] = 
pax(idents[tid]); + sync_point.arrive_and_wait(); + }; + + for (size_t tid = 0; tid < numWorkers; ++tid) { + workers.emplace_back(std::make_unique(pax_single, tid)); + CPU_ZERO(&cpuset); + CPU_SET(tid, &cpuset); + int rc = pthread_setaffinity_np(workers.back()->native_handle(), sizeof(cpu_set_t), &cpuset); + if (rc != 0) { + LOG_ERROR("Error calling pthread_setaffinity_np in copy_pool assignment: " << rc << std::endl;) + exit(-10); + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + sync_point.arrive_and_wait(); + s_ts = std::chrono::high_resolution_clock::now(); + sync_point.arrive_and_wait(); + e_ts = std::chrono::high_resolution_clock::now(); + + std::chrono::duration secs = e_ts - s_ts; + + auto result = std::reduce(results.begin(), results.end()); + + out << "\tPax\tSingle\t" << OPTIMAL_BLOCK_SIZE << "\t" << result << "\t" << secs.count() + << "Waiting time total: " << std::chrono::duration_cast(waitingTime).count() << " (per thread " << std::chrono::duration_cast(waitingTime).count() / numWorkers << ")\t" + << "Working time total: " << std::chrono::duration_cast(workingTime).count() << " (per thread " << std::chrono::duration_cast(workingTime).count() / numWorkers <<")" << std::endl + << std::flush; + LOG_SUCCESS(std::fixed << std::setprecision(7) << "\tPax\tSingle\t" << OPTIMAL_BLOCK_SIZE << "\t" << result << "\t" << secs.count() << "\t" + << "Waiting time total: " << std::chrono::duration_cast(waitingTime).count() << " (per thread " << std::chrono::duration_cast(waitingTime).count() / numWorkers << ")\t" + << "Working time total: " << std::chrono::duration_cast(workingTime).count() << " (per thread " << std::chrono::duration_cast(workingTime).count() / numWorkers << ")" << std::endl;) + std::for_each(workers.begin(), workers.end(), [](std::unique_ptr& t) { t->join(); }); + workers.clear(); + } + + out.close(); + + LOG_NOFORMAT(std::endl;) + LOG_INFO("Pax vs PaxStream Benchmark ended." 
<< std::endl;) +} + void Benchmarks::executeAllBenchmarks() { // auto in_time_t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); // std::stringstream logNameStreamSW; - // logNameStreamSW << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d-%H-%M-%S_") << "AllBenchmarks_SW.log"; + // logNameStreamSW << "../logs/bench/" << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d-%H-%M-%S_") << "AllBenchmarks_SW.log"; // std::string logNameSW = logNameStreamSW.str(); // LOG_INFO("[Task] Set name: " << logNameSW << std::endl;) @@ -1982,6 +2368,9 @@ void Benchmarks::executeAllBenchmarks() { // execRDMAHashJoinBenchmark(); - execRDMAHashJoinPGBenchmark(); + // execRDMAHashJoinPGBenchmark(); // execRDMAHashJoinStarBenchmark(); + + execChunkVsChunkStreamBenchmark(); + execPaxVsPaxStreamBenchmark(); } \ No newline at end of file diff --git a/src/DataCatalog.cpp b/src/DataCatalog.cpp index cf01323..a29206e 100644 --- a/src/DataCatalog.cpp +++ b/src/DataCatalog.cpp @@ -1,16 +1,15 @@ -#include -#include -#include -#include -#include -#include +#include "DataCatalog.hpp" +#include +#include +#include #include #include "Benchmarks.hpp" +#include "Column.hpp" #include "Worker.hpp" -using namespace memordma; +using namespace memConnect; DataCatalog::DataCatalog() { auto createColLambda = [this]() -> void { @@ -174,7 +173,7 @@ DataCatalog::DataCatalog() { }; auto retrieveRemoteColsLambda = [this]() -> void { - ConnectionManager::getInstance().sendOpCode(1, static_cast(catalog_communication_code::send_column_info), true); + ConnectionManager::getInstance().getConnectionById(1)->sendOpcode(static_cast(catalog_communication_code::send_column_info)); }; auto logLambda = [this]() -> void { @@ -217,200 +216,6 @@ DataCatalog::DataCatalog() { } }; - // auto pseudoPaxLambda = [this]() -> void { - // col_t* ld = find_remote("lo_orderdate"); - - // if (ld == nullptr) { - // fetchRemoteInfo(); - // } - - // ld = find_remote("lo_orderdate"); - // col_t* lq = find_remote("lo_quantity"); - // col_t* le = find_remote("lo_extendedprice"); - - // std::cout << "[DataCatalog] Fetching PseudoPAX for lo_orderdate, lo_quantity, lo_extendedprice" << std::endl; - // fetchPseudoPax(1, {"lo_orderdate", "lo_quantity", "lo_extendedprice"}); - // }; - - // auto benchQueriesRemote = [this]() -> void { - // using namespace std::chrono_literals; - - // for (uint8_t num_rb = 2; num_rb <= 2; ++num_rb) { - // for (uint64_t bytes = 1ull << 18; bytes <= 1ull << 20; bytes <<= 1) { - // auto in_time_t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); - // std::stringstream logNameStream; - // logNameStream << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d-%H-%M-%S_") << "QB_" << +num_rb << "_" << +bytes << "_Remote.log"; - // std::string logName = logNameStream.str(); - - // std::cout << "[Task] Set name: " << logName << std::endl; - - // buffer_config_t bufferConfig = {.num_own_send_threads = num_rb, - // .num_own_receive_threads = num_rb, - // .num_remote_send_threads = num_rb, - // .num_remote_receive_threads = num_rb, - // .num_own_receive = num_rb, - // .size_own_receive = bytes, - // .num_remote_receive = num_rb, - // .size_remote_receive = bytes, - // .num_own_send = num_rb, - // .size_own_send = bytes, - // .num_remote_send = num_rb, - // .size_remote_send = bytes, - // .meta_info_size = 16}; - - // ConnectionManager::getInstance().reconfigureBuffer(1, bufferConfig); - - // std::cout << "[main] Used connection with id '1' and " << +num_rb << " remote receive buffer (size for one remote 
receive: " << memordma::Utility::GetBytesReadable(bytes) << ")" << std::endl; - // std::cout << std::endl; - - // executeRemoteBenchmarkingQueries(logName); - // } - - // std::cout << std::endl; - // std::cout << "QueryBench ended." << std::endl; - // } - // }; - - // auto benchQueriesLocal = [this]() -> void { - // using namespace std::chrono_literals; - - // auto in_time_t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); - // std::stringstream logNameStream; - // logNameStream << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d-%H-%M-%S_") << "Local.log"; - // std::string logName = logNameStream.str(); - - // std::cout << "[Task] Set name: " << logName << std::endl; - - // executeLocalBenchmarkingQueries(logName, "Local"); - - // std::cout << std::endl; - // std::cout << "NUMAQueryBench ended." << std::endl; - // }; - - // auto benchQueriesNUMA = [this]() -> void { - // using namespace std::chrono_literals; - - // auto in_time_t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); - // std::stringstream logNameStream; - // logNameStream << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d-%H-%M-%S_") << "NUMA.log"; - // std::string logName = logNameStream.str(); - - // std::cout << "[Task] Set name: " << logName << std::endl; - - // executeLocalBenchmarkingQueries(logName, "NUMA"); - - // std::cout << std::endl; - // std::cout << "NUMAQueryBench ended." << std::endl; - // }; - - // auto benchQueriesFrontPage = [this]() -> void { - // using namespace std::chrono_literals; - // uint8_t num_rb = 2; - // uint64_t bytes = 1ull << 19; - - // auto in_time_t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); - // std::stringstream logNameStream; - // logNameStream << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d-%H-%M-%S_") << "FP_" << +num_rb << "_" << +bytes << "_Remote.log"; - // std::string logName = logNameStream.str(); - - // std::cout << "[Task] Set name: " << logName << std::endl; - - // buffer_config_t bufferConfig = {.num_own_send_threads = num_rb, - // .num_own_receive_threads = num_rb, - // .num_remote_send_threads = num_rb, - // .num_remote_receive_threads = num_rb, - // .num_own_receive = num_rb, - // .size_own_receive = bytes, - // .num_remote_receive = num_rb, - // .size_remote_receive = bytes, - // .num_own_send = num_rb, - // .size_own_send = bytes, - // .num_remote_send = num_rb, - // .size_remote_send = bytes, - // .meta_info_size = 16}; - - // ConnectionManager::getInstance().reconfigureBuffer(1, bufferConfig); - - // std::cout << "[main] Used connection with id '1' and " << +num_rb << " remote receive buffer (size for one remote receive: " << memordma::Utility::GetBytesReadable(bytes) << ")" << std::endl; - // std::cout << std::endl; - - // executeFrontPageBenchmarkingQueries(logName); - - // std::cout << std::endl; - // std::cout << "QueryBench ended." 
<< std::endl; - // }; - - // auto benchQueriesRemoteMT = [this]() -> void { - // using namespace std::chrono_literals; - - // for (uint8_t num_rb = 2; num_rb <= 2; ++num_rb) { - // for (uint64_t bytes = 1ull << 18; bytes <= 1ull << 20; bytes <<= 1) { - // auto in_time_t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); - // std::stringstream logNameStream; - // logNameStream << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d-%H-%M-%S_") << "QB-MT_" << +num_rb << "_" << +bytes << "_Remote.log"; - // std::string logName = logNameStream.str(); - - // std::cout << "[Task] Set name: " << logName << std::endl; - - // buffer_config_t bufferConfig = {.num_own_send_threads = num_rb, - // .num_own_receive_threads = num_rb, - // .num_remote_send_threads = num_rb, - // .num_remote_receive_threads = num_rb, - // .num_own_receive = num_rb, - // .size_own_receive = bytes, - // .num_remote_receive = num_rb, - // .size_remote_receive = bytes, - // .num_own_send = num_rb, - // .size_own_send = bytes, - // .num_remote_send = num_rb, - // .size_remote_send = bytes, - // .meta_info_size = 16}; - - // ConnectionManager::getInstance().reconfigureBuffer(1, bufferConfig); - - // std::cout << "[main] Used connection with id '1' and " << +num_rb << " remote receive buffer (size for one remote receive: " << memordma::Utility::GetBytesReadable(bytes) << ")" << std::endl; - // std::cout << std::endl; - - // executeRemoteMTBenchmarkingQueries(logName); - // } - - // std::cout << std::endl; - // std::cout << "QueryBench ended." << std::endl; - // } - // }; - - // auto benchQueriesLocalMT = [this]() -> void { - // using namespace std::chrono_literals; - - // auto in_time_t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); - // std::stringstream logNameStream; - // logNameStream << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d-%H-%M-%S_") << "QB-MT_Local.log"; - // std::string logName = logNameStream.str(); - - // std::cout << "[Task] Set name: " << logName << std::endl; - - // executeLocalMTBenchmarkingQueries(logName, "Local"); - - // std::cout << std::endl; - // std::cout << "NUMAQueryBench ended." << std::endl; - // }; - - // auto benchQueriesNUMAMT = [this]() -> void { - // using namespace std::chrono_literals; - - // auto in_time_t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); - // std::stringstream logNameStream; - // logNameStream << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d-%H-%M-%S_") << "QB-MT_NUMA.log"; - // std::string logName = logNameStream.str(); - - // std::cout << "[Task] Set name: " << logName << std::endl; - - // executeLocalMTBenchmarkingQueries(logName, "NUMA"); - - // std::cout << std::endl; - // std::cout << "NUMAQueryBench ended." 
<< std::endl; - // }; - auto benchmarksAllLambda = [this]() -> void { Benchmarks::getInstance().executeAllBenchmarks(); }; @@ -420,23 +225,15 @@ DataCatalog::DataCatalog() { TaskManager::getInstance().registerTask(std::make_shared("printColHead", "[DataCatalog] Print first 10 values of column", printColLambda)); TaskManager::getInstance().registerTask(std::make_shared("retrieveRemoteCols", "[DataCatalog] Ask for remote columns", retrieveRemoteColsLambda)); TaskManager::getInstance().registerTask(std::make_shared("logColumn", "[DataCatalog] Log a column to file", logLambda)); - // TaskManager::getInstance().registerTask(std::make_shared("benchmarkRemote", "[DataCatalog] Execute Single Pipeline Remote", benchQueriesRemote)); - // TaskManager::getInstance().registerTask(std::make_shared("benchmarkLocal", "[DataCatalog] Execute Single Pipeline Local", benchQueriesLocal)); - // TaskManager::getInstance().registerTask(std::make_shared("benchmarkNUMA", "[DataCatalog] Execute Single Pipeline NUMA", benchQueriesNUMA)); - // TaskManager::getInstance().registerTask(std::make_shared("benchmarkFrontPage", "[DataCatalog] Execute Pipeline FrontPage", benchQueriesFrontPage)); - // TaskManager::getInstance().registerTask(std::make_shared("benchmarkMTMP", "[DataCatalog] Execute Multi Pipeline Remote", benchQueriesRemoteMT)); - // TaskManager::getInstance().registerTask(std::make_shared("benchmarkLocalMTMP", "[DataCatalog] Execute Multi Pipeline MT Local", benchQueriesLocalMT)); - // TaskManager::getInstance().registerTask(std::make_shared("benchmarkNUMAMTMP", "[DataCatalog] Execute Multi Pipeline MT NUMA", benchQueriesNUMAMT)); TaskManager::getInstance().registerTask(std::make_shared("benchmarksAll", "[DataCatalog] Execute All Benchmarks", benchmarksAllLambda)); TaskManager::getInstance().registerTask(std::make_shared("itTest", "[DataCatalog] IteratorTest", iteratorTestLambda)); - // TaskManager::getInstance().registerTask(std::make_shared("pseudoPaxTest", "[DataCatalog] PseudoPaxTest", pseudoPaxLambda)); /* Message Layout * [ header_t | payload ] * Payload layout * [ columnInfoCount | [col_network_info, identLength, ident]* ] */ - CallbackFunction cb_sendInfo = [this](const size_t conId, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { + CallbackFunction cb_sendInfo = [this](const size_t conId, const ReceiveBuffer*, const std::_Bind reset_buffer) -> void { reset_buffer(); const uint8_t code = static_cast(catalog_communication_code::receive_column_info); const size_t columnCount = cols.size(); @@ -472,7 +269,7 @@ DataCatalog::DataCatalog() { ConnectionManager::getInstance().sendData(conId, data, totalPayloadSize, nullptr, 0, code); // Release temporary buffer - numa_free(reinterpret_cast(data), totalPayloadSize); + numa_free(data, totalPayloadSize); }; /* Message Layout @@ -481,7 +278,7 @@ DataCatalog::DataCatalog() { * [ columnInfoCount | [col_network_info, identLength, ident]* ] */ CallbackFunction cb_receiveInfo = [this](const size_t conId, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { - char* data = rcv_buffer->getPayloadBasePtr(); + char* data = rcv_buffer->getPayloadBasePtr(); size_t colCnt; memcpy(&colCnt, data, sizeof(size_t)); @@ -530,7 +327,7 @@ DataCatalog::DataCatalog() { std::cin.clear(); std::cin.ignore(10000, '\n'); - LOG_CONSOLE("Fetch mode [1] whole column [2] (next) chunk" << std::endl;) + LOG_CONSOLE("Fetch mode [1] whole column [2] (next) chunk [3] as stream" << std::endl;) size_t mode; std::cin >> mode; std::cin.clear(); @@ -540,12 
+337,13 @@ DataCatalog::DataCatalog() { switch (mode) { case 1: { fetchColStub(conId, ident, true); - break; - } + } break; case 2: { fetchColStub(conId, ident, false); - break; - } + } break; + case 3: { + fetchColStub(conId, ident, false, true); + } break; default: { LOG_WARNING("[DataCatalog] No valid value selected, aborting." << std::endl;) return; @@ -570,7 +368,7 @@ DataCatalog::DataCatalog() { * [ columnNameLength, columnName ] */ CallbackFunction cb_fetchCol = [this](const size_t conId, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { - char* data = rcv_buffer->getPayloadBasePtr(); + char* data = rcv_buffer->getPayloadBasePtr(); size_t identSz; memcpy(&identSz, data, sizeof(size_t)); @@ -589,7 +387,7 @@ DataCatalog::DataCatalog() { * [ header_t | ident_len, ident, col_data_type | col_data ] */ const size_t appMetaSize = sizeof(size_t) + identSz + sizeof(col_data_t); - char* appMetaData = (char*)malloc(appMetaSize); + char* appMetaData = reinterpret_cast(malloc(appMetaSize)); char* tmp = appMetaData; memcpy(tmp, &identSz, sizeof(size_t)); @@ -600,7 +398,7 @@ DataCatalog::DataCatalog() { memcpy(tmp, &col->second->datatype, sizeof(col_data_t)); - ConnectionManager::getInstance().sendData(conId, (char*)col->second->data, col->second->sizeInBytes, appMetaData, appMetaSize, static_cast(catalog_communication_code::receive_column_data)); + ConnectionManager::getInstance().sendData(conId, reinterpret_cast(col->second->data), col->second->sizeInBytes, appMetaData, appMetaSize, static_cast(catalog_communication_code::receive_column_data)); free(appMetaData); } @@ -609,13 +407,13 @@ DataCatalog::DataCatalog() { /* Message Layout * [ header_t | ident_len, ident, col_data_type | col_data ] */ - CallbackFunction cb_receiveCol = [this](const size_t conId, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { + CallbackFunction cb_receiveCol = [this](const size_t, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { // Package header - package_t::header_t* head = reinterpret_cast(rcv_buffer->getFooterPtr()); + package_t::header_t* head = rcv_buffer->getFooterPtr(); // Start of AppMetaData - char* data = rcv_buffer->getAppMetaPtr(); + char* data = rcv_buffer->getAppMetaPtr(); // Actual column data payload - char* column_data = rcv_buffer->getPayloadBasePtr(); + char* column_data = rcv_buffer->getPayloadBasePtr(); size_t identSz; memcpy(&identSz, data, sizeof(size_t)); @@ -695,7 +493,7 @@ DataCatalog::DataCatalog() { // Send a chunk of a column to the requester CallbackFunction cb_fetchColChunk = [this](const size_t conId, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { // package_t::header_t* head = reinterpret_cast(rcv_buffer->buf); - char* data = rcv_buffer->getPayloadBasePtr(); + char* data = rcv_buffer->getPayloadBasePtr(); // char* column_data = data + head->payload_start; size_t identSz; @@ -736,7 +534,7 @@ DataCatalog::DataCatalog() { * [ header_t | chunk_offset ident_len, ident, col_data_type | col_data ] */ const size_t appMetaSize = sizeof(size_t) + sizeof(size_t) + identSz + sizeof(col_data_t); - char* appMetaData = (char*)malloc(appMetaSize); + char* appMetaData = reinterpret_cast(malloc(appMetaSize)); char* tmp = appMetaData; // Write chunk offset relative to column start into meta data @@ -773,14 +571,14 @@ DataCatalog::DataCatalog() { /* Message Layout * [ header_t | chunk_offset ident_len, ident, col_data_type | col_data ] */ - CallbackFunction cb_receiveColChunk = [this](const size_t conId, 
const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { + CallbackFunction cb_receiveColChunk = [this](const size_t, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { // std::cout << "[DataCatalog] Received a message with a (part of a) column chnunk." << std::endl; // Package header - package_t::header_t* head = reinterpret_cast(rcv_buffer->getFooterPtr()); + package_t::header_t* head = reinterpret_cast(rcv_buffer->getFooterPtr()); // Start of AppMetaData - char* data = rcv_buffer->getAppMetaPtr(); + char* data = rcv_buffer->getAppMetaPtr(); // Actual column data payload - char* column_data = rcv_buffer->getPayloadBasePtr(); + char* column_data = rcv_buffer->getPayloadBasePtr(); size_t chunk_offset; memcpy(&chunk_offset, data, sizeof(size_t)); @@ -845,13 +643,121 @@ DataCatalog::DataCatalog() { reset_buffer(); }; + CallbackFunction cb_fetchColAsStream = [this](const size_t conId, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { + char* data = rcv_buffer->getPayloadBasePtr(); + + size_t identSz; + memcpy(&identSz, data, sizeof(size_t)); + data += sizeof(size_t); + + std::string ident(data, identSz); + + auto col = cols.find(ident); + + // LOG_DEBUG2("[DataCatalog] Remote requested data for column '" << ident << "' with ident len " << identSz << " and CS " << col->second->calc_checksum() << std::endl;) + + reset_buffer(); + + if (col != cols.end()) { + /* Message Layout + * [ header_t | ident_len, ident, col_data_type | col_data ] + */ + const size_t appMetaSize = sizeof(size_t) + identSz + sizeof(col_data_t); + char* appMetaData = reinterpret_cast(malloc(appMetaSize)); + char* tmp = appMetaData; + + memcpy(tmp, &identSz, sizeof(size_t)); + tmp += sizeof(size_t); + + memcpy(tmp, ident.c_str(), identSz); + tmp += identSz; + + memcpy(tmp, &col->second->datatype, sizeof(col_data_t)); + + ConnectionManager::getInstance().sendData(conId, reinterpret_cast(col->second->data), col->second->sizeInBytes, appMetaData, appMetaSize, static_cast(catalog_communication_code::receive_column_as_stream)); + + free(appMetaData); + } + }; + + /* Message Layout + * [ header_t | ident_len, ident, col_data_type | col_data ] + */ + CallbackFunction cb_receiveColAsStream = [this](const size_t, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { + // Package footer + package_t::header_t* footer = rcv_buffer->getFooterPtr(); + // Start of AppMetaData + char* data = rcv_buffer->getAppMetaPtr(); + // Actual column data payload + char* column_data = rcv_buffer->getPayloadBasePtr(); + + size_t identSz; + memcpy(&identSz, data, sizeof(size_t)); + data += sizeof(size_t); + + std::string ident(data, identSz); + data += identSz; + + col_data_t data_type; + memcpy(&data_type, data, sizeof(col_data_t)); + + std::unique_lock lk(remote_info_lock); + auto col = find_remote(ident); + auto col_network_info_iterator = remote_col_info.find(ident); + lk.unlock(); + + // Column object already created? + if (col == nullptr) { + // No Col object, did we even fetch remote info beforehand? 
+ if (col_network_info_iterator != remote_col_info.end()) { + col = add_remote_column(ident, col_network_info_iterator->second); + } else { + LOG_WARNING("[DataCatalog] No Network info for received column " << ident << ", fetch column info first -- discarding message" << std::endl;) + return; + } + } + + // Write currently received data to the column object + col->append_chunk(footer->payload_position_offset, footer->current_payload_size, column_data); + col->arrived.insert({reinterpret_cast(footer->package_number), reinterpret_cast(footer->current_payload_size)}); + // Update network info struct to check if we received all data + // lk.lock(); + std::lock_guard lg(col->iteratorLock); + col_network_info_iterator->second.received_bytes += footer->current_payload_size; + + if (col_network_info_iterator->second.check_complete()) { + // Arrival of last package + col->advance_end_pointer(col->sizeInBytes - (reinterpret_cast(col->current_end) - reinterpret_cast(col->data))); + col->is_complete = true; + col->arrived.clear(); + col->highestConsecPackArrived = 0; + ++col->received_chunks; + } else { + if (col->highestConsecPackArrived == footer->package_number - 1) { + uint64_t runningSum = footer->current_payload_size; + for (size_t k = footer->package_number + 1; k < col->sizeInBytes; ++k) { // col->sizeInBytes is a very loose upper bound for very worst case of 1 Byte messages -> necessary because messages can have different sizes + if (col->arrived.contains(k)) { + runningSum += col->arrived[k]; + } else { + col->highestConsecPackArrived = k - 1; + break; + } + } + col->advance_end_pointer(runningSum); + } + } + + // LOG_DEBUG2(ident << "\t" << footer->package_number << "\t" << footer->payload_position_offset << "\t" << footer->total_data_size << "\t" << col_network_info_iterator->second.received_bytes << std::endl;) + reset_buffer(); + }; + /* Extract column name and prepare sending its data * Message Layout * [ header_t | col_cnt | [col_ident_size]+ | [col_ident]+ ] */ CallbackFunction cb_fetchPseudoPax = [this](const size_t conId, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { // package_t::header_t* head = reinterpret_cast(rcv_buffer->buf); - char* data = rcv_buffer->getPayloadBasePtr(); + char* data = rcv_buffer->getPayloadBasePtr(); size_t* ident_lens = reinterpret_cast(data); // Advance data to the first ident character @@ -885,7 +791,6 @@ DataCatalog::DataCatalog() { std::string global_ident; global_ident.reserve(total_id_len + idents.size() - 1); { - size_t offset = 0; const char delim = '-'; for (size_t i = 0; i < idents.size(); ++i) { const auto& id = idents[i]; @@ -931,7 +836,7 @@ DataCatalog::DataCatalog() { const size_t ident_metainfo_size = (1 + idents.size()) * sizeof(size_t) + total_id_len; // std::cout << "Init info->metadata_buf " << appMetaSize << " Bytes" << std::endl; - info->metadata_buf = (char*)malloc(appMetaSize); + info->metadata_buf = reinterpret_cast(malloc(appMetaSize)); char* tmp = info->metadata_buf; tmp += sizeof(size_t); // Placeholder for offset, later. tmp += sizeof(size_t); // Placeholder for bytes_per_column, later. 
@@ -946,10 +851,10 @@ DataCatalog::DataCatalog() { } info->offset_lock.unlock(); - auto prepare_pax = [](pax_inflight_col_info_t* my_info, size_t conId) -> void { + auto prepare_pax = [](pax_inflight_col_info_t* my_info, size_t _conId) -> void { my_info->offset_lock.lock(); if (my_info->payload_buf == nullptr) { - my_info->payload_buf = (char*)malloc(my_info->cols[0]->sizeInBytes * my_info->cols.size()); + my_info->payload_buf = reinterpret_cast(malloc(my_info->cols[0]->sizeInBytes * my_info->cols.size())); } else { my_info->offset_lock.unlock(); return; @@ -963,7 +868,7 @@ DataCatalog::DataCatalog() { size_t curr_payload_offset = 0; using element_type = uint64_t; - const size_t maximumPayloadSize = ConnectionManager::getInstance().getConnectionById(conId)->maxBytesInPayload(my_info->metadata_size); + const size_t maximumPayloadSize = ConnectionManager::getInstance().getConnectionById(_conId)->maxBytesInPayload(my_info->metadata_size); const size_t max_bytes_per_column = ((maximumPayloadSize / my_info->cols.size()) / (sizeof(element_type) * 4)) * 4 * sizeof(element_type); // std::cout << "Preparing " << my_info->cols[0]->sizeInBytes * my_info->cols.size() << " Bytes of data" << std::endl; @@ -979,26 +884,27 @@ DataCatalog::DataCatalog() { // std::cout << curr_payload_offset << " " << data_left_to_write << " " << bytes_per_column << " " << bytes_in_payload << std::endl; for (auto cur_col : my_info->cols) { - // std::cout << "Writing " << bytes_per_column << " Bytes for " << cur_col->ident << std::endl; + // LOG_DEBUG2("Writing " << bytes_per_column << " Bytes for " << cur_col->ident << std::endl;) const char* col_data = reinterpret_cast(cur_col->data) + curr_col_offset; memcpy(tmp, col_data, bytes_per_column); tmp += bytes_per_column; written_bytes += bytes_per_column; } + data_left_to_write -= bytes_per_column; + my_info->offset_lock.lock(); + if (data_left_to_write == 0) { + my_info->prepare_complete = true; + } my_info->prepared_offsets.push({curr_payload_offset, bytes_in_payload}); my_info->offset_cv.notify_one(); my_info->offset_lock.unlock(); curr_col_offset += bytes_per_column; curr_payload_offset += bytes_per_column * my_info->cols.size(); - data_left_to_write -= bytes_per_column; } - my_info->offset_lock.lock(); - my_info->prepare_complete = true; - my_info->offset_lock.unlock(); - // std::cout << "Prepared all messages, written Bytes: " << written_bytes << std::endl; + // LOG_DEBUG2("Prepared all messages, written Bytes: " << written_bytes << std::endl;) }; info->offset_lock.lock(); @@ -1012,14 +918,11 @@ DataCatalog::DataCatalog() { std::unique_lock lk(info->offset_lock); info->offset_cv.wait(lk, [info] { return !info->prepared_offsets.empty(); }); - // Semantically we want to "unlock" after waiting, yet this is a performance gain to not do it. - // lk.unlock(); - // lk.lock(); auto offset_size_pair = info->prepared_offsets.front(); info->prepared_offsets.pop(); lk.unlock(); - char* tmp_meta = (char*)malloc(info->metadata_size); + char* tmp_meta = reinterpret_cast(malloc(info->metadata_size)); char* tmp = tmp_meta; memcpy(tmp, info->metadata_buf, info->metadata_size); @@ -1036,21 +939,19 @@ DataCatalog::DataCatalog() { info->reset(); } } else { + LOG_WARNING("Not all requested columns are present! Handling not implemented yet." 
<< std::endl;) reset_buffer(); } - paxInflightLock.unlock(); }; /* Message Layout * [ header_t | chunk_offset bytes_per_column col_cnt [ident_len]+, [ident] | [payload] ] */ - CallbackFunction cb_receivePseudoPax = [this](const size_t conId, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { - // Package header - package_t::header_t* head = reinterpret_cast(rcv_buffer->getFooterPtr()); + CallbackFunction cb_receivePseudoPax = [this](const size_t, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { // Start of AppMetaData - char* data = rcv_buffer->getAppMetaPtr(); + char* data = rcv_buffer->getAppMetaPtr(); // Start of actual payload - char* pax_ptr = rcv_buffer->getPayloadBasePtr(); + char* pax_ptr = rcv_buffer->getPayloadBasePtr(); size_t chunk_offset; memcpy(&chunk_offset, data, sizeof(size_t)); @@ -1068,32 +969,28 @@ DataCatalog::DataCatalog() { std::vector idents; idents.reserve(ident_infos[0]); - // std::cout << "[PseudoPax] message with " << bytes_per_column << " bytes per column for columns: " << std::flush; for (size_t i = 0; i < ident_infos[0]; ++i) { idents.emplace_back(data, ident_infos[i + 1]); data += ident_infos[i + 1]; - // std::cout << idents.back() << " "; } - // std::cout << std::endl; - std::vector remote_cols; - remote_cols.reserve(idents.size()); + std::vector remCols; + remCols.reserve(idents.size()); bool allPresent = true; for (auto& id : idents) { auto remote_col_it = find_remote(id); allPresent &= remote_col_it != nullptr; - remote_cols.push_back(remote_col_it); + remCols.push_back(remote_col_it); } - for (auto col : remote_cols) { + for (auto col : remCols) { auto col_network_info_iterator = remote_col_info.find(col->ident); if (col_network_info_iterator == remote_col_info.end()) { - // std::cout << "[PseudoPax] No Network info for received column " << col->ident << ", fetch column info first -- discarding message" << std::endl; + LOG_WARNING("[PseudoPax] No Network info for received column " << col->ident << ", fetch column info first -- discarding message" << std::endl;) return; } const size_t current_offset = col_network_info_iterator->second.received_bytes; - // std::cout << "Current offset for col (" << col << ") " << col->ident << ": " << current_offset << std::endl; col->append_chunk(current_offset, bytes_per_column, pax_ptr); pax_ptr += bytes_per_column; @@ -1104,7 +1001,6 @@ DataCatalog::DataCatalog() { col->advance_end_pointer(bytes_per_column); if (col_network_info_iterator->second.check_complete()) { col->is_complete = true; - // std::cout << "[PseudoPax] Received all data for column: " << col->ident << std::endl; } ++col->received_chunks; } @@ -1112,9 +1008,134 @@ DataCatalog::DataCatalog() { reset_buffer(); }; - CallbackFunction cb_reconfigureChunkSize = [this](const size_t conId, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { + /* Extract column name and prepare sending its data + * Message Layout + * [ col_cnt | [col_ident_size]+ | [col_ident]+ | footer | opcode] + */ + CallbackFunction cb_fetchPseudoPaxStream = [this](const size_t conId, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { // package_t::header_t* head = reinterpret_cast(rcv_buffer->buf); - char* data = rcv_buffer->getPayloadBasePtr(); + char* data = rcv_buffer->getPayloadBasePtr(); + size_t* ident_lens = reinterpret_cast(data); + + size_t colCount = ident_lens[0]; + + // Advance ident pointer behind colCount + ident_lens += 1; + + // Advance data to the first ident character + data += 
(colCount + 1) * sizeof(size_t); + + std::vector col_its(colCount); + size_t total_id_len = 0; + bool allPresent = true; // All columns present? + for (size_t i = 0; i < colCount; ++i) { + std::string ident(data, ident_lens[i]); + data += ident_lens[i]; + + auto col_info_it = cols.find(ident); + allPresent &= col_info_it != cols.end(); + col_its[i] = col_info_it; + total_id_len += ident.size(); + } + + const size_t metaSize = sizeof(size_t) * colCount + total_id_len; + char* meta = reinterpret_cast(std::malloc(metaSize)); + std::memcpy(meta, ident_lens, metaSize); + + reset_buffer(); + + // All columns are available + if (allPresent) { + std::vector dataPtrs(colCount); + std::vector dataSizes(colCount, 0); + + size_t i = 0; + for (auto col_it : col_its) { + dataPtrs[i] = reinterpret_cast(col_it->second->data); + dataSizes[i] = col_it->second->sizeInBytes; + + ++i; + } + + ConnectionManager::getInstance().sendData(conId, dataPtrs, dataSizes, meta, metaSize, static_cast(catalog_communication_code::receive_pseudo_pax_stream)); + free(meta); + } + // TODO: we need some kind of handling for allPresent==false! + }; + + /* Message Layout + * [ payload | col_cnt [chunk_offsets] [bytes_per_column] [col_ident_size]+ [col_ident]+ | header | opcode ] + */ + CallbackFunction cb_receivePseudoPaxStream = [this](const size_t, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { + // Start of AppMetaData + char* meta = rcv_buffer->getAppMetaPtr(); + size_t* tmpMeta = reinterpret_cast(meta); + // Start of actual payload + char* data = rcv_buffer->getPayloadBasePtr(); + // Start of the footer + package_t::header_t* footer = rcv_buffer->getFooterPtr(); + + const size_t colCount = tmpMeta[0]; + + char* identPtr = meta + (sizeof(size_t) * (1 + (3 * colCount))); + + for (size_t i = 0; i < colCount; ++i) { + const size_t offset = tmpMeta[i + 1]; + const size_t bytes = tmpMeta[i + colCount + 1]; + const std::string ident(identPtr, tmpMeta[i + (colCount * 2) + 1]); + identPtr += tmpMeta[i + (colCount * 2) + 1]; + + auto col = find_remote(ident); + if (!col) { + LOG_WARNING("[PseudoPaxStream] No info for received column " << ident << ", fetch column info first -- discarding message" << std::endl;) + return; + } + + auto col_network_info_iterator = remote_col_info.find(ident); + if (col_network_info_iterator == remote_col_info.end()) { + LOG_WARNING("[PseudoPaxStream] No Network info for received column " << ident << ", fetch column info first -- discarding message" << std::endl;) + return; + } + + col->append_chunk(offset, bytes, data); + data += bytes; + + std::lock_guard lk(col->iteratorLock); + + col->arrived.insert({reinterpret_cast(footer->package_number), bytes}); + + // Update network info struct to check if we received all data + col_network_info_iterator->second.received_bytes += bytes; + + if (col_network_info_iterator->second.check_complete()) { + // Arrival of last package + col->advance_end_pointer(col->sizeInBytes - (reinterpret_cast(col->current_end) - reinterpret_cast(col->data))); + col->is_complete = true; + col->arrived.clear(); + col->highestConsecPackArrived = 0; + } else { + if (col->highestConsecPackArrived == footer->package_number - 1) { + uint64_t runningSum = bytes; + for (size_t k = footer->package_number + 1; k < col->sizeInBytes; ++k) { // col->sizeInBytes is a very loose upper bound for very worst case of 1 Byte messages -> necessary because messages can have different sizes + if (col->arrived.contains(k)) { + runningSum += col->arrived[k]; + } else { + 
col->highestConsecPackArrived = k - 1; + break; + } + } + col->advance_end_pointer(runningSum); + } + } + ++col->received_chunks; + } + + reset_buffer(); + }; + + CallbackFunction cb_reconfigureChunkSize = [this](const size_t, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { + // package_t::header_t* head = reinterpret_cast(rcv_buffer->buf); + char* data = rcv_buffer->getPayloadBasePtr(); uint64_t newChunkSize; memcpy(&newChunkSize, data, sizeof(uint64_t)); @@ -1131,37 +1152,37 @@ DataCatalog::DataCatalog() { dataCatalog_chunkThreshold = newChunkThreshold > 0 ? newChunkThreshold : newChunkSize; } - ConnectionManager::getInstance().sendOpCode(1, static_cast(catalog_communication_code::ack_reconfigure_chunk_size), true); + ConnectionManager::getInstance().getConnectionById(1)->sendOpcode(static_cast(catalog_communication_code::ack_reconfigure_chunk_size)); }; - CallbackFunction cb_ackReconfigureChunkSize = [this](const size_t conId, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { + CallbackFunction cb_ackReconfigureChunkSize = [this](const size_t, const ReceiveBuffer*, const std::_Bind reset_buffer) -> void { reset_buffer(); std::lock_guard lk(reconfigure_lock); reconfigured = true; reconfigure_done.notify_all(); }; - CallbackFunction cb_generateBenchmarkData = [this](const size_t conId, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { - uint64_t* data = reinterpret_cast(rcv_buffer->getPayloadBasePtr()); + CallbackFunction cb_generateBenchmarkData = [this](const size_t, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { + uint64_t* data = rcv_buffer->getPayloadBasePtr(); bool createTables = *reinterpret_cast(reinterpret_cast(data) + (sizeof(uint64_t) * 6)); generateBenchmarkData(data[0], data[1], data[2], data[3], data[4], data[5], false, createTables); reset_buffer(); }; - CallbackFunction cb_ackGenerateBenchmarkData = [this](const size_t conId, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { + CallbackFunction cb_ackGenerateBenchmarkData = [this](const size_t, const ReceiveBuffer*, const std::_Bind reset_buffer) -> void { reset_buffer(); std::lock_guard lk(dataGenerationLock); dataGenerationDone = true; data_generation_done.notify_all(); }; - CallbackFunction cb_clearCatalog = [this](const size_t conId, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { + CallbackFunction cb_clearCatalog = [this](const size_t, const ReceiveBuffer*, const std::_Bind reset_buffer) -> void { reset_buffer(); DataCatalog::getInstance().clear(); }; - CallbackFunction cb_ackClearCatalog = [this](const size_t conId, const ReceiveBuffer* rcv_buffer, const std::_Bind reset_buffer) -> void { + CallbackFunction cb_ackClearCatalog = [this](const size_t, const ReceiveBuffer*, const std::_Bind reset_buffer) -> void { reset_buffer(); std::lock_guard lk(clearCatalogLock); clearCatalogDone = true; @@ -1174,8 +1195,12 @@ DataCatalog::DataCatalog() { registerCallback(static_cast(catalog_communication_code::receive_column_data), cb_receiveCol); registerCallback(static_cast(catalog_communication_code::fetch_column_chunk), cb_fetchColChunk); registerCallback(static_cast(catalog_communication_code::receive_column_chunk), cb_receiveColChunk); + registerCallback(static_cast(catalog_communication_code::fetch_column_as_stream), cb_fetchColAsStream); + registerCallback(static_cast(catalog_communication_code::receive_column_as_stream), cb_receiveColAsStream); 
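The AppMetaData parsed by cb_receivePseudoPaxStream above packs the column count followed by three size_t arrays (chunk offsets, bytes per column, ident lengths) and finally the ident characters. A small illustrative decoder for that layout; the helper type and function are assumptions for the sketch, not part of the code base:

#include <cstddef>
#include <string>
#include <vector>

struct PaxColumnSlice {
    std::string ident;
    size_t offset;   // byte offset of this slice inside the destination column
    size_t bytes;    // payload bytes belonging to this column in the current message
};

std::vector<PaxColumnSlice> decodePaxStreamMeta(const char* meta) {
    const size_t* words = reinterpret_cast<const size_t*>(meta);
    const size_t colCount = words[0];
    const char* identPtr = meta + sizeof(size_t) * (1 + 3 * colCount);

    std::vector<PaxColumnSlice> out;
    out.reserve(colCount);
    for (size_t i = 0; i < colCount; ++i) {
        const size_t identLen = words[1 + 2 * colCount + i];
        out.push_back({std::string(identPtr, identLen),
                       words[1 + i],                  // chunk offset for column i
                       words[1 + colCount + i]});     // bytes per column for column i
        identPtr += identLen;
    }
    return out;
}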
registerCallback(static_cast(catalog_communication_code::fetch_pseudo_pax), cb_fetchPseudoPax); + registerCallback(static_cast(catalog_communication_code::fetch_pseudo_pax_stream), cb_fetchPseudoPaxStream); registerCallback(static_cast(catalog_communication_code::receive_pseudo_pax), cb_receivePseudoPax); + registerCallback(static_cast(catalog_communication_code::receive_pseudo_pax_stream), cb_receivePseudoPaxStream); registerCallback(static_cast(catalog_communication_code::reconfigure_chunk_size), cb_reconfigureChunkSize); registerCallback(static_cast(catalog_communication_code::ack_reconfigure_chunk_size), cb_ackReconfigureChunkSize); registerCallback(static_cast(catalog_communication_code::generate_benchmark_data), cb_generateBenchmarkData); @@ -1196,7 +1221,7 @@ DataCatalog::~DataCatalog() { void DataCatalog::clear(bool sendRemote, bool destructor) { if (sendRemote) { - ConnectionManager::getInstance().sendOpCode(1, static_cast(catalog_communication_code::clear_catalog), true); + ConnectionManager::getInstance().getConnectionById(1)->sendOpcode(static_cast(catalog_communication_code::clear_catalog)); } for (auto it : cols) { @@ -1218,7 +1243,7 @@ void DataCatalog::clear(bool sendRemote, bool destructor) { clear_catalog_done.wait(lk, [this] { return clearCatalogDone; }); } } else if (!destructor) { - ConnectionManager::getInstance().sendOpCode(1, static_cast(catalog_communication_code::ack_clear_catalog), true); + ConnectionManager::getInstance().getConnectionById(1)->sendOpcode(static_cast(catalog_communication_code::ack_clear_catalog)); } } @@ -1288,6 +1313,10 @@ col_dict_t::iterator DataCatalog::generate(std::string ident, col_data_t type, s tmp->readableOffset = elemCount * sizeof(double); break; } + default: { + using namespace memConnect; + LOG_ERROR("Saw gen_void but its not handled." << std::endl;) + } } tmp->is_remote = false; tmp->is_complete = true; @@ -1324,12 +1353,12 @@ col_t* DataCatalog::add_remote_column(std::string name, col_network_info ni) { LOG_INFO("[DataCatalog] Column with same ident ('" << name << "') already present, cannot add remote column." << std::endl;) return it->second; } else { - LOG_DEBUG1("[DataCatalog] Creating new remote column: " << name << std::endl;) + // LOG_DEBUG1("[DataCatalog] Creating new remote column: " << name << std::endl;) col_t* col = new col_t(); col->ident = name; col->is_remote = true; - col->datatype = (col_data_t)ni.type_info; - col->allocate_on_numa((col_data_t)ni.type_info, ni.size_info, 0); + col->datatype = static_cast(ni.type_info); + col->allocate_on_numa(static_cast(ni.type_info), ni.size_info, 0); remote_cols.insert({name, col}); return col; } @@ -1405,18 +1434,19 @@ void DataCatalog::eraseAllRemoteColumns() { remote_col_info.clear(); } -void DataCatalog::fetchColStub(std::size_t conId, std::string& ident, bool wholeColumn) const { +void DataCatalog::fetchColStub(const std::size_t conId, const std::string& ident, bool wholeColumn, bool asStream) const { char* payload = reinterpret_cast(malloc(ident.size() + sizeof(size_t))); const size_t sz = ident.size(); memcpy(payload, &sz, sizeof(size_t)); memcpy(payload + sizeof(size_t), ident.c_str(), sz); catalog_communication_code code = wholeColumn ? catalog_communication_code::fetch_column_data : catalog_communication_code::fetch_column_chunk; + code = asStream ? 
catalog_communication_code::fetch_column_as_stream : code; ConnectionManager::getInstance().sendData(conId, payload, sz + sizeof(size_t), nullptr, 0, static_cast(code)); free(payload); } // Fetches a chunk of data sized CHUNK_MAX_SIZE containing information for all columns, equal amount of values -void DataCatalog::fetchPseudoPax(std::size_t conId, std::vector idents) const { +void DataCatalog::fetchPseudoPax(const std::size_t conId, const std::vector& idents, bool asStream) const { size_t string_sizes = 0; bool all_fetchable = true; @@ -1437,9 +1467,6 @@ void DataCatalog::fetchPseudoPax(std::size_t conId, std::vector ide } if (all_complete || !all_fetchable) { - // if (!all_complete) { - // std::cout << "Pending chunks of current pax request or complete, ignoring." << std::endl; - // } for (auto lk : locks) { lk->unlock(); } @@ -1454,7 +1481,7 @@ void DataCatalog::fetchPseudoPax(std::size_t conId, std::vector ide } /* col_cnt | [col_ident_size]+ | [col_ident]+ */ - char* payload = (char*)malloc(sizeof(size_t) + idents.size() * sizeof(size_t) + string_sizes); + char* payload = reinterpret_cast(malloc(sizeof(size_t) + idents.size() * sizeof(size_t) + string_sizes)); char* tmp = payload; size_t sz; @@ -1474,7 +1501,8 @@ void DataCatalog::fetchPseudoPax(std::size_t conId, std::vector ide tmp += id.size(); } const size_t total_payload_size = (sizeof(size_t) * (idents.size() + 1)) + string_sizes; - ConnectionManager::getInstance().sendData(conId, payload, total_payload_size, nullptr, 0, static_cast(catalog_communication_code::fetch_pseudo_pax)); + const catalog_communication_code code = asStream ? catalog_communication_code::fetch_pseudo_pax_stream : catalog_communication_code::fetch_pseudo_pax; + ConnectionManager::getInstance().sendData(conId, payload, total_payload_size, nullptr, 0, static_cast(code)); free(payload); } @@ -1487,19 +1515,11 @@ void DataCatalog::remoteInfoReady() { void DataCatalog::fetchRemoteInfo() { std::unique_lock lk(remote_info_lock); col_info_received = false; - ConnectionManager::getInstance().sendOpCode(1, static_cast(catalog_communication_code::send_column_info), true); - // while (!col_info_received) { - // using namespace std::chrono_literals; - // if (!remote_info_available.wait_for(lk, 1s, [this] { return col_info_received; })) { - // ConnectionManager::getInstance().sendOpCode(1, static_cast(catalog_communication_code::send_column_info)); - // } - // } + ConnectionManager::getInstance().getConnectionById(1)->sendOpcode(static_cast(catalog_communication_code::send_column_info)); remote_info_available.wait(lk, [this] { return col_info_received; }); } void DataCatalog::reconfigureChunkSize(const uint64_t newChunkSize, const uint64_t newChunkThreshold) { - using namespace std::chrono_literals; - if (newChunkSize == 0 || newChunkThreshold == 0) { LOG_WARNING("Either the new Chunk Size or the new Chunk Threshold was 0!" 
<< std::endl;) return; @@ -1577,7 +1597,7 @@ void DataCatalog::generateBenchmarkData(const uint64_t distinctLocalColumns, con data_generation_done.wait(lk, [this] { return dataGenerationDone; }); } } else { - ConnectionManager::getInstance().sendOpCode(1, static_cast(catalog_communication_code::ack_generate_benchmark_data), true); + ConnectionManager::getInstance().getConnectionById(1)->sendOpcode(static_cast(catalog_communication_code::ack_generate_benchmark_data)); } print_all(); diff --git a/src/Queries.cpp b/src/Queries.cpp deleted file mode 100644 index 607c729..0000000 --- a/src/Queries.cpp +++ /dev/null @@ -1,751 +0,0 @@ -// #include -// #include -// #include - -// #include - -// #include "Benchmarks.hpp" -// #include "Operators.hpp" - -// std::chrono::duration waitingTime = std::chrono::duration::zero(); -// std::chrono::duration workingTime = std::chrono::duration::zero(); - -// inline void reset_timer() { -// waitingTime = std::chrono::duration::zero(); -// workingTime = std::chrono::duration::zero(); -// } - -// inline void wait_col_data_ready(col_t* _col, char* _data) { -// auto s_ts = std::chrono::high_resolution_clock::now(); -// std::unique_lock lk(_col->iteratorLock); -// if (!(_data < static_cast(_col->current_end))) { -// _col->iterator_data_available.wait(lk, [_col, _data] { return reinterpret_cast(_data) < static_cast(_col->current_end); }); -// } -// waitingTime += (std::chrono::high_resolution_clock::now() - s_ts); -// }; - -// template -// inline void fetch_data(col_t* column, uint64_t* data, const bool reload) { -// if (remote) { -// if (reload) { -// if (!prefetching && !paxed) { -// column->request_data(!chunked); -// } -// } -// wait_col_data_ready(column, reinterpret_cast(data)); -// if (reload) { -// if (prefetching && chunked && !paxed) { -// column->request_data(!chunked); -// } -// } -// } -// } - -// template -// inline std::vector less_than(col_t* column, const uint64_t predicate, const uint64_t offset, const size_t blockSize, const std::vector in_pos, const bool reload) { -// auto data = reinterpret_cast(column->data) + offset; - -// std::vector out_vec; - -// fetch_data(column, data, reload); - -// if (timings) { -// auto s_ts = std::chrono::high_resolution_clock::now(); -// out_vec = Operators::less_than(data, predicate, blockSize, in_pos); -// workingTime += (std::chrono::high_resolution_clock::now() - s_ts); -// } else { -// out_vec = Operators::less_than(data, predicate, blockSize, in_pos); -// } - -// return out_vec; -// }; - -// template -// inline std::vector less_equal(col_t* column, const uint64_t predicate, const uint64_t offset, const size_t blockSize, const std::vector in_pos, const bool reload) { -// auto data = reinterpret_cast(column->data) + offset; - -// std::vector out_vec; - -// fetch_data(column, data, reload); - -// if (timings) { -// auto s_ts = std::chrono::high_resolution_clock::now(); -// out_vec = Operators::less_equal(data, predicate, blockSize, in_pos); -// workingTime += (std::chrono::high_resolution_clock::now() - s_ts); -// } else { -// out_vec = Operators::less_equal(data, predicate, blockSize, in_pos); -// } - -// return out_vec; -// }; - -// template -// inline std::vector greater_than(col_t* column, const uint64_t predicate, const uint64_t offset, const size_t blockSize, const std::vector in_pos, const bool reload) { -// auto data = reinterpret_cast(column->data) + offset; - -// std::vector out_vec; - -// fetch_data(column, data, reload); - -// if (timings) { -// auto s_ts = std::chrono::high_resolution_clock::now(); 
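For reference, the request payload assembled by DataCatalog::fetchPseudoPax (shown earlier in this diff) follows the layout col_cnt | [col_ident_size]+ | [col_ident]+. A minimal sketch of building that payload, using std::vector instead of the raw malloc in the actual code; purely illustrative:

#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

std::vector<char> buildFetchPaxPayload(const std::vector<std::string>& idents) {
    size_t stringBytes = 0;
    for (const auto& id : idents) stringBytes += id.size();

    std::vector<char> payload(sizeof(size_t) * (idents.size() + 1) + stringBytes);
    char* tmp = payload.data();

    const size_t cnt = idents.size();
    std::memcpy(tmp, &cnt, sizeof(size_t));        // col_cnt
    tmp += sizeof(size_t);
    for (const auto& id : idents) {                // one length entry per ident
        const size_t len = id.size();
        std::memcpy(tmp, &len, sizeof(size_t));
        tmp += sizeof(size_t);
    }
    for (const auto& id : idents) {                // ident characters, not null-terminated
        std::memcpy(tmp, id.data(), id.size());
        tmp += id.size();
    }
    return payload;
}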
-// out_vec = Operators::greater_than(data, predicate, blockSize, in_pos); -// workingTime += (std::chrono::high_resolution_clock::now() - s_ts); -// } else { -// out_vec = Operators::greater_than(data, predicate, blockSize, in_pos); -// } - -// return out_vec; -// }; - -// template -// inline std::vector greater_equal(col_t* column, const uint64_t predicate, const uint64_t offset, const size_t blockSize, const std::vector in_pos, const bool reload) { -// auto data = reinterpret_cast(column->data) + offset; - -// std::vector out_vec; - -// fetch_data(column, data, reload); - -// if (timings) { -// auto s_ts = std::chrono::high_resolution_clock::now(); -// out_vec = Operators::greater_equal(data, predicate, blockSize, in_pos); -// workingTime += (std::chrono::high_resolution_clock::now() - s_ts); -// } else { -// out_vec = Operators::greater_equal(data, predicate, blockSize, in_pos); -// } - -// return out_vec; -// }; - -// template -// inline std::vector equal(col_t* column, const uint64_t predicate, const uint64_t offset, const size_t blockSize, const std::vector in_pos, const bool reload) { -// auto data = reinterpret_cast(column->data) + offset; - -// std::vector out_vec; - -// fetch_data(column, data, reload); - -// if (timings) { -// auto s_ts = std::chrono::high_resolution_clock::now(); -// out_vec = Operators::equal(data, predicate, blockSize, in_pos); -// workingTime += (std::chrono::high_resolution_clock::now() - s_ts); -// } else { -// out_vec = Operators::equal(data, predicate, blockSize, in_pos); -// } - -// return out_vec; -// }; - -// template -// inline std::vector between_incl(col_t* column, const uint64_t predicate_1, const uint64_t predicate_2, const uint64_t offset, const size_t blockSize, const std::vector in_pos, const bool reload) { -// auto data = reinterpret_cast(column->data) + offset; - -// std::vector out_vec; - -// fetch_data(column, data, reload); - -// if (timings) { -// auto s_ts = std::chrono::high_resolution_clock::now(); -// out_vec = Operators::between_incl(data, predicate_1, predicate_2, blockSize, in_pos); -// workingTime += (std::chrono::high_resolution_clock::now() - s_ts); -// } else { -// out_vec = Operators::between_incl(data, predicate_1, predicate_2, blockSize, in_pos); -// } - -// return out_vec; -// }; - -// template -// inline std::vector between_excl(col_t* column, const uint64_t predicate_1, const uint64_t predicate_2, const uint64_t offset, const size_t blockSize, const std::vector in_pos, const bool reload) { -// auto data = reinterpret_cast(column->data) + offset; - -// std::vector out_vec; - -// fetch_data(column, data, reload); - -// if (timings) { -// auto s_ts = std::chrono::high_resolution_clock::now(); -// out_vec = Operators::between_excl(data, predicate_1, predicate_2, blockSize, in_pos); -// workingTime += (std::chrono::high_resolution_clock::now() - s_ts); -// } else { -// out_vec = Operators::between_excl(data, predicate_1, predicate_2, blockSize, in_pos); -// } - -// return out_vec; -// }; - -// template -// uint64_t pipe_1(const uint64_t predicate, const std::vector idents) { -// col_t* col_0; -// col_t* col_1; -// col_t* col_2; - -// if (idents.size() != 3) { -// LOG_ERROR("The size of 'idents' was not equal to 3" << std::endl;) -// return 0; -// } - -// std::chrono::time_point s_ts; - -// if (remote) { -// col_0 = DataCatalog::getInstance().find_remote(idents[0]); -// if (prefetching && !paxed) col_0->request_data(!chunked); -// col_1 = DataCatalog::getInstance().find_remote(idents[1]); -// if (prefetching && !paxed) 
col_1->request_data(!chunked); -// col_2 = DataCatalog::getInstance().find_remote(idents[2]); -// if (prefetching && !paxed) col_2->request_data(!chunked); - -// // if (prefetching && paxed) DataCatalog::getInstance().fetchPseudoPax(1, idents); -// } else { -// col_0 = DataCatalog::getInstance().find_local(idents[0]); -// col_1 = DataCatalog::getInstance().find_local(idents[1]); -// col_2 = DataCatalog::getInstance().find_local(idents[2]); -// } - -// size_t columnSize = col_0->size; - -// size_t max_elems_per_chunk = 0; -// size_t currentBlockSize = max_elems_per_chunk; -// // if (paxed) { -// // size_t total_id_len = 0; -// // for (auto& id : idents) { -// // total_id_len += id.size(); -// // } - -// // const size_t appMetaSize = 3 * sizeof(size_t) + (sizeof(size_t) * idents.size()) + total_id_len; -// // const size_t maximumPayloadSize = ConnectionManager::getInstance().getConnectionById(1)->maxBytesInPayload(appMetaSize); - -// // max_elems_per_chunk = ((maximumPayloadSize / idents.size()) / (sizeof(uint64_t) * 4)) * 4; -// // currentBlockSize = max_elems_per_chunk; -// // } else -// if (!(remote && (chunked || paxed))) { -// max_elems_per_chunk = columnSize; -// currentBlockSize = Benchmarks::OPTIMAL_BLOCK_SIZE / sizeof(uint64_t); -// } else { -// max_elems_per_chunk = DataCatalog::getInstance().dataCatalog_chunkMaxSize / sizeof(uint64_t); -// if (max_elems_per_chunk <= Benchmarks::OPTIMAL_BLOCK_SIZE / sizeof(uint64_t)) { -// currentBlockSize = max_elems_per_chunk; -// } else { -// currentBlockSize = Benchmarks::OPTIMAL_BLOCK_SIZE / sizeof(uint64_t); -// } -// } - -// uint64_t sum = 0; -// size_t baseOffset = 0; -// size_t currentChunkElementsProcessed = 0; - -// auto data_col_2 = reinterpret_cast(col_2->data); -// auto data_col_0 = reinterpret_cast(col_0->data); - -// while (baseOffset < columnSize) { -// // if (remote && paxed) { -// // if (!prefetching) DataCatalog::getInstance().fetchPseudoPax(1, idents); -// // wait_col_data_ready(col_2, reinterpret_cast(data_col_2)); -// // if (prefetching) DataCatalog::getInstance().fetchPseudoPax(1, idents); -// // } - -// const size_t elem_diff = columnSize - baseOffset; -// if (elem_diff < currentBlockSize) { -// currentBlockSize = elem_diff; -// } - -// auto le_idx = greater_than(col_2, 5, baseOffset, currentBlockSize, -// less_than(col_1, 25, baseOffset, currentBlockSize, -// between_incl(col_0, 10, 30, baseOffset, currentBlockSize, {}, currentChunkElementsProcessed == 0), currentChunkElementsProcessed == 0), -// currentChunkElementsProcessed == 0); - -// s_ts = std::chrono::high_resolution_clock::now(); -// for (auto idx : le_idx) { -// sum += (data_col_0[idx] * data_col_2[idx]); -// // ++sum; -// } -// workingTime += (std::chrono::high_resolution_clock::now() - s_ts); - -// baseOffset += currentBlockSize; -// data_col_0 += currentBlockSize; -// data_col_2 += currentBlockSize; -// currentChunkElementsProcessed = baseOffset % max_elems_per_chunk; -// } - -// return sum; -// } - -// template -// uint64_t pipe_2(const uint64_t predicate, const std::vector idents) { -// col_t* col_0; -// col_t* col_1; -// col_t* col_2; - -// if (idents.size() != 3) { -// LOG_ERROR("The size of 'idents' was not equal to 3" << std::endl;) -// return 0; -// } - -// std::chrono::time_point s_ts; - -// if (remote) { -// col_1 = DataCatalog::getInstance().find_remote(idents[1]); -// if (prefetching && !paxed) col_1->request_data(!chunked); -// col_0 = DataCatalog::getInstance().find_remote(idents[0]); -// if (prefetching && !paxed) col_0->request_data(!chunked); 
-// col_2 = DataCatalog::getInstance().find_remote(idents[2]); -// if (prefetching && !paxed) col_2->request_data(!chunked); - -// // if (prefetching && paxed) DataCatalog::getInstance().fetchPseudoPax(1, idents); -// } else { -// col_0 = DataCatalog::getInstance().find_local(idents[0]); -// col_1 = DataCatalog::getInstance().find_local(idents[1]); -// col_2 = DataCatalog::getInstance().find_local(idents[2]); -// } - -// size_t columnSize = col_1->size; - -// size_t max_elems_per_chunk = 0; -// size_t currentBlockSize = max_elems_per_chunk; -// // if (paxed) { -// // size_t total_id_len = 0; -// // for (auto& id : idents) { -// // total_id_len += id.size(); -// // } - -// // const size_t appMetaSize = 3 * sizeof(size_t) + (sizeof(size_t) * idents.size()) + total_id_len; -// // const size_t maximumPayloadSize = ConnectionManager::getInstance().getConnectionById(1)->maxBytesInPayload(appMetaSize); - -// // max_elems_per_chunk = ((maximumPayloadSize / idents.size()) / (sizeof(uint64_t) * 4)) * 4; -// // currentBlockSize = max_elems_per_chunk; -// // } else -// if (!(remote && (chunked || paxed))) { -// max_elems_per_chunk = columnSize; -// currentBlockSize = Benchmarks::OPTIMAL_BLOCK_SIZE / sizeof(uint64_t); -// } else { -// max_elems_per_chunk = DataCatalog::getInstance().dataCatalog_chunkMaxSize / sizeof(uint64_t); -// if (max_elems_per_chunk <= Benchmarks::OPTIMAL_BLOCK_SIZE / sizeof(uint64_t)) { -// currentBlockSize = max_elems_per_chunk; -// } else { -// currentBlockSize = Benchmarks::OPTIMAL_BLOCK_SIZE / sizeof(uint64_t); -// } -// } - -// uint64_t sum = 0; -// size_t baseOffset = 0; -// size_t currentChunkElementsProcessed = 0; - -// auto data_col_2 = reinterpret_cast(col_2->data); -// auto data_col_0 = reinterpret_cast(col_0->data); - -// while (baseOffset < columnSize) { -// // if (remote && paxed) { -// // if (!prefetching) DataCatalog::getInstance().fetchPseudoPax(1, idents); -// // wait_col_data_ready(col_2, reinterpret_cast(data_col_2)); -// // if (prefetching) DataCatalog::getInstance().fetchPseudoPax(1, idents); -// // } - -// const size_t elem_diff = columnSize - baseOffset; -// if (elem_diff < currentBlockSize) { -// currentBlockSize = elem_diff; -// } - -// auto le_idx = less_than(col_1, predicate, baseOffset, currentBlockSize, {}, currentChunkElementsProcessed == 0); - -// if (remote && !paxed) { -// if (currentChunkElementsProcessed == 0) { -// if (!prefetching && chunked) { -// col_2->request_data(!chunked); -// col_0->request_data(!chunked); -// } -// wait_col_data_ready(col_2, reinterpret_cast(data_col_2)); -// wait_col_data_ready(col_0, reinterpret_cast(data_col_0)); -// if (prefetching && chunked) { -// col_2->request_data(!chunked); -// col_0->request_data(!chunked); -// } -// } -// } - -// s_ts = std::chrono::high_resolution_clock::now(); -// for (auto idx : le_idx) { -// sum += (data_col_0[idx] * data_col_2[idx]); -// // ++sum; -// } -// workingTime += (std::chrono::high_resolution_clock::now() - s_ts); - -// baseOffset += currentBlockSize; -// data_col_0 += currentBlockSize; -// data_col_2 += currentBlockSize; -// currentChunkElementsProcessed = baseOffset % max_elems_per_chunk; -// } - -// return sum; -// } - -// template -// uint64_t pipe_3(const uint64_t predicate, const std::vector idents) { -// col_t* column_0; -// col_t* column_1; -// col_t* column_2; -// col_t* column_3; - -// if (idents.size() != 4) { -// LOG_ERROR("The size of 'idents' was not equal to 4" << std::endl;) -// return 0; -// } - -// std::chrono::time_point s_ts; - -// if (remote) { -// 
column_0 = DataCatalog::getInstance().find_remote(idents[0]); -// if (prefetching && !paxed) column_0->request_data(!chunked); -// column_1 = DataCatalog::getInstance().find_remote(idents[1]); -// if (prefetching && !paxed) column_1->request_data(!chunked); -// column_2 = DataCatalog::getInstance().find_remote(idents[2]); -// if (prefetching && !paxed) column_2->request_data(!chunked); -// column_3 = DataCatalog::getInstance().find_remote(idents[3]); -// if (prefetching && !paxed) column_3->request_data(!chunked); - -// // if (prefetching && paxed) DataCatalog::getInstance().fetchPseudoPax(1, idents); -// } else { -// column_0 = DataCatalog::getInstance().find_local(idents[0]); -// column_1 = DataCatalog::getInstance().find_local(idents[1]); -// column_2 = DataCatalog::getInstance().find_local(idents[2]); -// column_3 = DataCatalog::getInstance().find_local(idents[3]); -// } - -// size_t columnSize = column_0->size; - -// size_t max_elems_per_chunk = 0; -// size_t currentBlockSize = max_elems_per_chunk; -// // if (paxed) { -// // size_t total_id_len = 0; -// // for (auto& id : idents) { -// // total_id_len += id.size(); -// // } - -// // const size_t appMetaSize = 3 * sizeof(size_t) + (sizeof(size_t) * idents.size()) + total_id_len; -// // const size_t maximumPayloadSize = ConnectionManager::getInstance().getConnectionById(1)->maxBytesInPayload(appMetaSize); - -// // max_elems_per_chunk = ((maximumPayloadSize / idents.size()) / (sizeof(uint64_t) * 4)) * 4; -// // currentBlockSize = max_elems_per_chunk; -// // } else -// if (!(remote && (chunked || paxed))) { -// max_elems_per_chunk = columnSize; -// currentBlockSize = Benchmarks::OPTIMAL_BLOCK_SIZE / sizeof(uint64_t); -// } else { -// max_elems_per_chunk = DataCatalog::getInstance().dataCatalog_chunkMaxSize / sizeof(uint64_t); -// if (max_elems_per_chunk <= Benchmarks::OPTIMAL_BLOCK_SIZE / sizeof(uint64_t)) { -// currentBlockSize = max_elems_per_chunk; -// } else { -// currentBlockSize = Benchmarks::OPTIMAL_BLOCK_SIZE / sizeof(uint64_t); -// } -// } - -// uint64_t sum = 0; -// size_t baseOffset = 0; -// size_t currentChunkElementsProcessed = 0; - -// auto data_2 = reinterpret_cast(column_2->data); -// auto data_3 = reinterpret_cast(column_3->data); - -// while (baseOffset < columnSize) { -// // if (remote && paxed) { -// // if (!prefetching) DataCatalog::getInstance().fetchPseudoPax(1, idents); -// // wait_col_data_ready(column_3, reinterpret_cast(data_3)); -// // if (prefetching) DataCatalog::getInstance().fetchPseudoPax(1, idents); -// // } - -// const size_t elem_diff = columnSize - baseOffset; -// if (elem_diff < currentBlockSize) { -// currentBlockSize = elem_diff; -// } - -// auto le_idx = equal(column_3, 16, baseOffset, currentBlockSize, -// greater_than(column_2, 5, baseOffset, currentBlockSize, -// less_than(column_1, 25, baseOffset, currentBlockSize, -// between_incl(column_0, 10, 30, baseOffset, currentBlockSize, {}, currentChunkElementsProcessed == 0), currentChunkElementsProcessed == 0), -// currentChunkElementsProcessed == 0), -// currentChunkElementsProcessed == 0); - -// s_ts = std::chrono::high_resolution_clock::now(); -// for (auto idx : le_idx) { -// sum += (data_2[idx] * data_3[idx]); -// // ++sum; -// } -// workingTime += (std::chrono::high_resolution_clock::now() - s_ts); - -// baseOffset += currentBlockSize; -// data_2 += currentBlockSize; -// data_3 += currentBlockSize; -// currentChunkElementsProcessed = baseOffset % max_elems_per_chunk; -// } - -// return sum; -// } - -// // template -// // void 
doBenchmarkRemotes(Fn&& f1, Fn&& f2, Fn&& f3, Fn&& f4, Fn&& f5, std::ofstream& out, const uint64_t predicate) { -// // uint64_t sum = 0; -// // std::chrono::time_point s_ts; -// // std::chrono::time_point e_ts; -// // std::chrono::duration secs; - -// // for (size_t i = 0; i < 5; ++i) { -// // reset_timer(); -// // DataCatalog::getInstance().fetchRemoteInfo(); -// // s_ts = std::chrono::high_resolution_clock::now(); -// // sum = f1(predicate); -// // e_ts = std::chrono::high_resolution_clock::now(); - -// // secs = e_ts - s_ts; - -// // out << "Remote\tFull\tPipe\t" << Benchmarks::OPTIMAL_BLOCK_SIZE << "\t" << +predicate << "\t" << sum << "\t" << waitingTime.count() << "\t" << workingTime.count() << "\t" << secs.count() << std::endl -// // << std::flush; -// // std::cout << "Remote\tFull\tPipe\t" << Benchmarks::OPTIMAL_BLOCK_SIZE << "\t" << +predicate << "\t" << sum << "\t" << waitingTime.count() << "\t" << workingTime.count() << "\t" << secs.count() << std::endl; - -// // DataCatalog::getInstance().eraseAllRemoteColumns(); - -// // for (uint64_t chunkSize = 1ull << 18; chunkSize <= 1ull << 27; chunkSize <<= 1) { -// // DataCatalog::getInstance().reconfigureChunkSize(chunkSize, chunkSize); - -// // reset_timer(); -// // DataCatalog::getInstance().fetchRemoteInfo(); -// // s_ts = std::chrono::high_resolution_clock::now(); -// // sum = f2(predicate); -// // e_ts = std::chrono::high_resolution_clock::now(); - -// // secs = e_ts - s_ts; - -// // out << "Remote\tChunked\tOper\t" << +DataCatalog::getInstance().dataCatalog_chunkMaxSize << "\t" << +predicate << "\t" << sum << "\t" << waitingTime.count() << "\t" << workingTime.count() << "\t" << secs.count() << std::endl -// // << std::flush; -// // std::cout << "Remote\tChunked\tOper\t" << +DataCatalog::getInstance().dataCatalog_chunkMaxSize << "\t" << +predicate << "\t" << sum << "\t" << waitingTime.count() << "\t" << workingTime.count() << "\t" << secs.count() << std::endl; - -// // DataCatalog::getInstance().eraseAllRemoteColumns(); - -// // reset_timer(); -// // DataCatalog::getInstance().fetchRemoteInfo(); -// // s_ts = std::chrono::high_resolution_clock::now(); -// // sum = f3(predicate); -// // e_ts = std::chrono::high_resolution_clock::now(); - -// // secs = e_ts - s_ts; - -// // out << "Remote\tChunked\tPipe\t" << +DataCatalog::getInstance().dataCatalog_chunkMaxSize << "\t" << +predicate << "\t" << sum << "\t" << waitingTime.count() << "\t" << workingTime.count() << "\t" << secs.count() << std::endl -// // << std::flush; -// // std::cout << "Remote\tChunked\tPipe\t" << +DataCatalog::getInstance().dataCatalog_chunkMaxSize << "\t" << +predicate << "\t" << sum << "\t" << waitingTime.count() << "\t" << workingTime.count() << "\t" << secs.count() << std::endl; - -// // DataCatalog::getInstance().eraseAllRemoteColumns(); -// // } - -// // reset_timer(); -// // DataCatalog::getInstance().fetchRemoteInfo(); -// // s_ts = std::chrono::high_resolution_clock::now(); -// // sum = f4(predicate); -// // e_ts = std::chrono::high_resolution_clock::now(); - -// // secs = e_ts - s_ts; - -// // out << "Remote\tPaxed\tOper\t000000\t" << +predicate << "\t" << sum << "\t" << waitingTime.count() << "\t" << workingTime.count() << "\t" << secs.count() << std::endl -// // << std::flush; -// // std::cout << "Remote\tPaxed\tOper\t000000\t" << +predicate << "\t" << sum << "\t" << waitingTime.count() << "\t" << workingTime.count() << "\t" << secs.count() << std::endl; - -// // DataCatalog::getInstance().eraseAllRemoteColumns(); - -// // reset_timer(); -// // 
DataCatalog::getInstance().fetchRemoteInfo(); -// // s_ts = std::chrono::high_resolution_clock::now(); -// // sum = f5(predicate); -// // e_ts = std::chrono::high_resolution_clock::now(); - -// // secs = e_ts - s_ts; - -// // out << "Remote\tPaxed\tPipe\t000000\t" << +predicate << "\t" << sum << "\t" << waitingTime.count() << "\t" << workingTime.count() << "\t" << secs.count() << std::endl -// // << std::flush; -// // std::cout << "Remote\tPaxed\tPipe\t000000\t" << +predicate << "\t" << sum << "\t" << waitingTime.count() << "\t" << workingTime.count() << "\t" << secs.count() << std::endl; - -// // DataCatalog::getInstance().eraseAllRemoteColumns(); -// // } -// // } - -// // template -// // uint64_t frontPage_1() { -// // uint64_t sum_1 = 0; -// // uint64_t sum_2 = 0; - -// // DataCatalog::getInstance().fetchRemoteInfo(); -// // sum_1 = pipe_1(0); -// // DataCatalog::getInstance().eraseAllRemoteColumns(); -// // DataCatalog::getInstance().fetchRemoteInfo(); -// // sum_2 = pipe_3(0); - -// // return sum_1 + sum_2; -// // } - -// // template -// // uint64_t frontPage_2() { -// // uint64_t sum_1 = 0; -// // uint64_t sum_2 = 0; - -// // DataCatalog::getInstance().fetchRemoteInfo(); -// // sum_1 = pipe_1(0); -// // sum_2 = pipe_3(0); - -// // return sum_1 + sum_2; -// // } - -// // template -// // uint64_t frontPage_3() { -// // uint64_t sum_1 = 0; -// // uint64_t sum_2 = 0; - -// // DataCatalog::getInstance().fetchRemoteInfo(); -// // #pragma omp parallel for schedule(static, 1) num_threads(2) -// // for (size_t i = 0; i < 2; ++i) { -// // if (i == 0) { -// // sum_1 = pipe_1(0); -// // } else if (i == 1) { -// // sum_2 = pipe_3(0); -// // } -// // } - -// // return sum_1 + sum_2; -// // } - -// // void doBenchmarkFrontPage(std::ofstream& out) { -// // uint64_t sum = 0; -// // std::chrono::time_point s_ts; -// // std::chrono::time_point e_ts; -// // std::chrono::duration secs; -// // uint64_t chunkSize = 1ull << 22; - -// // DataCatalog::getInstance().reconfigureChunkSize(chunkSize, chunkSize); - -// // for (size_t i = 0; i < 5; ++i) { -// // reset_timer(); -// // s_ts = std::chrono::high_resolution_clock::now(); -// // sum = frontPage_1(); -// // e_ts = std::chrono::high_resolution_clock::now(); - -// // secs = e_ts - s_ts; - -// // out << +chunkSize << "\t1\t" << +sum << "\t" << waitingTime.count() << "\t" << secs.count() << std::endl -// // << std::flush; -// // std::cout << +chunkSize << "\t1\t" << +sum << "\t" << waitingTime.count() << "\t" << secs.count() << std::endl; - -// // DataCatalog::getInstance().eraseAllRemoteColumns(); -// // reset_timer(); - -// // s_ts = std::chrono::high_resolution_clock::now(); -// // sum = frontPage_2(); -// // e_ts = std::chrono::high_resolution_clock::now(); - -// // secs = e_ts - s_ts; - -// // out << +chunkSize << "\t2\t" << +sum << "\t" << waitingTime.count() << "\t" << secs.count() << std::endl -// // << std::flush; -// // std::cout << +chunkSize << "\t2\t" << +sum << "\t" << waitingTime.count() << "\t" << secs.count() << std::endl; - -// // DataCatalog::getInstance().eraseAllRemoteColumns(); -// // reset_timer(); - -// // s_ts = std::chrono::high_resolution_clock::now(); -// // sum = frontPage_2(); -// // e_ts = std::chrono::high_resolution_clock::now(); - -// // secs = e_ts - s_ts; - -// // out << +chunkSize << "\t3\t" << +sum << "\t" << waitingTime.count() << "\t" << secs.count() << std::endl -// // << std::flush; -// // std::cout << +chunkSize << "\t3\t" << +sum << "\t" << waitingTime.count() << "\t" << secs.count() << std::endl; - -// // 
DataCatalog::getInstance().eraseAllRemoteColumns(); -// // reset_timer(); - -// // s_ts = std::chrono::high_resolution_clock::now(); -// // sum = frontPage_3(); -// // e_ts = std::chrono::high_resolution_clock::now(); - -// // secs = e_ts - s_ts; - -// // out << +chunkSize << "\t4\t" << +sum << "\t" << waitingTime.count() / 2 << "\t" << secs.count() << std::endl -// // << std::flush; -// // std::cout << +chunkSize << "\t4\t" << +sum << "\t" << waitingTime.count() / 2 << "\t" << secs.count() << std::endl; - -// // DataCatalog::getInstance().eraseAllRemoteColumns(); -// // reset_timer(); -// // } -// // } - -// // void executeRemoteBenchmarkingQueries(std::string& logName) { -// // std::ofstream out; -// // out.open(logName, std::ios_base::app); -// // out << std::fixed << std::setprecision(7) << std::endl; -// // std::cout << std::fixed << std::setprecision(7) << std::endl; -// // const std::array predicates{1, 25, 50, 75, 100}; - -// // // - -// // doBenchmarkRemotes(pipe_1, // Remote Full Pipe/Oper (difference due to block-wise evaluation not significant) -// // pipe_1, // Remote Chunked Oper -// // pipe_1, // Remote Chunked Pipe -// // pipe_1, // Remote Paxed Oper -// // pipe_1, // Remote Paxed Pipe -// // out, 0); - -// // for (const auto predicate : predicates) { -// // doBenchmarkRemotes(pipe_2, // Remote Full Pipe/Oper (difference due to block-wise evaluation not significant) -// // pipe_2, // Remote Chunked Oper -// // pipe_2, // Remote Chunked Pipe -// // pipe_2, // Remote Paxed Oper -// // pipe_2, // Remote Paxed Pipe -// // out, predicate); -// // } - -// // out.close(); -// // } - -// // void executeFrontPageBenchmarkingQueries(std::string& logName) { -// // std::ofstream out; -// // out.open(logName, std::ios_base::app); -// // out << std::fixed << std::setprecision(7) << std::endl; -// // std::cout << std::fixed << std::setprecision(7) << std::endl; - -// // // - -// // doBenchmarkFrontPage(out); - -// // out.close(); -// // } - -// // void executeLocalBenchmarkingQueries(std::string& logName, std::string locality) { -// // std::ofstream out; -// // out.open(logName, std::ios_base::app); -// // out << std::fixed << std::setprecision(7) << std::endl; -// // std::cout << std::fixed << std::setprecision(7) << std::endl; -// // const std::array predicates{0, 1, 25, 50, 75, 100}; -// // uint64_t sum = 0; -// // std::chrono::_V2::system_clock::time_point s_ts; -// // std::chrono::_V2::system_clock::time_point e_ts; - -// // for (const auto predicate : predicates) { -// // for (size_t i = 0; i < 20; ++i) { -// // reset_timer(); - -// // if (predicate == 0) { -// // s_ts = std::chrono::high_resolution_clock::now(); -// // sum = pipe_1(predicate, {"col_0", "col_1", "col_2"}); -// // e_ts = std::chrono::high_resolution_clock::now(); -// // } else { -// // s_ts = std::chrono::high_resolution_clock::now(); -// // sum = pipe_2(predicate, {"col_0", "col_1", "col_2"}); -// // e_ts = std::chrono::high_resolution_clock::now(); -// // } - -// // std::chrono::duration secs = e_ts - s_ts; -// // auto additional_time = secs.count() - (workingTime.count() + waitingTime.count()); - -// // out << locality << "\tFull\tPipe\t" << Benchmarks::OPTIMAL_BLOCK_SIZE << "\t" << +predicate << "\t" << sum << "\t" << waitingTime.count() << "\t" << workingTime.count() << "\t" << secs.count() << std::endl -// // << std::flush; -// // std::cout << locality << "\tFull\tPipe\t" << Benchmarks::OPTIMAL_BLOCK_SIZE << "\t" << +predicate << "\t" << sum << "\t" << waitingTime.count() << "\t" << workingTime.count() << "\t" 
<< secs.count() << "\t" << additional_time << std::endl; -// // } -// // } - -// // out.close(); -// // } \ No newline at end of file diff --git a/src/QueriesMT.cpp b/src/QueriesMT.cpp deleted file mode 100644 index 477eb93..0000000 --- a/src/QueriesMT.cpp +++ /dev/null @@ -1,874 +0,0 @@ -#include -#include -#include -#include - -#include - -inline void wait_col_data_ready(col_t* _col, char* _data) { - std::unique_lock lk(_col->iteratorLock); - if (!(_data < static_cast(_col->current_end))) { - _col->iterator_data_available.wait(lk, [_col, _data] { return reinterpret_cast(_data) < static_cast(_col->current_end); }); - } -}; - -template -inline std::vector less_than(col_t* column, const uint64_t predicate, const uint64_t offset, const size_t blockSize, const std::vector in_pos, const bool reload) { - auto data = reinterpret_cast(column->data) + offset; - if (remote) { - wait_col_data_ready(column, reinterpret_cast(data)); - if (reload) { - if (chunked && !paxed) { - column->request_data(!chunked); - } - } - } - - std::vector out_vec; - out_vec.reserve(blockSize); - if (isFirst) { - for (auto e = 0; e < blockSize; ++e) { - if (data[e] < predicate) { - out_vec.push_back(e); - } - } - } else { - for (auto e : in_pos) { - if (data[e] < predicate) { - out_vec.push_back(e); - } - } - } - - return out_vec; -}; - -template -inline std::vector less_equal(col_t* column, const uint64_t predicate, const uint64_t offset, const size_t blockSize, const std::vector in_pos, const bool reload) { - auto data = reinterpret_cast(column->data) + offset; - if (remote) { - wait_col_data_ready(column, reinterpret_cast(data)); - if (reload) { - if (chunked && !paxed) { - column->request_data(!chunked); - } - } - } - - std::vector out_vec; - out_vec.reserve(blockSize); - if (isFirst) { - for (auto e = 0; e < blockSize; ++e) { - if (data[e] <= predicate) { - out_vec.push_back(e); - } - } - } else { - for (auto e : in_pos) { - if (data[e] <= predicate) { - out_vec.push_back(e); - } - } - } - - return out_vec; -}; - -template -inline std::vector greater_than(col_t* column, const uint64_t predicate, const uint64_t offset, const size_t blockSize, const std::vector in_pos, const bool reload) { - auto data = reinterpret_cast(column->data) + offset; - if (remote) { - wait_col_data_ready(column, reinterpret_cast(data)); - if (reload) { - if (chunked && !paxed) { - column->request_data(!chunked); - } - } - } - - std::vector out_vec; - out_vec.reserve(blockSize); - if (isFirst) { - for (auto e = 0; e < blockSize; ++e) { - if (data[e] > predicate) { - out_vec.push_back(e); - } - } - } else { - for (auto e : in_pos) { - if (data[e] > predicate) { - out_vec.push_back(e); - } - } - } - - return out_vec; -}; - -template -inline std::vector greater_equal(col_t* column, const uint64_t predicate, const uint64_t offset, const size_t blockSize, const std::vector in_pos, const bool reload) { - auto data = reinterpret_cast(column->data) + offset; - if (remote) { - wait_col_data_ready(column, reinterpret_cast(data)); - if (reload) { - if (chunked && !paxed) { - column->request_data(!chunked); - } - } - } - - std::vector out_vec; - out_vec.reserve(blockSize); - if (isFirst) { - for (auto e = 0; e < blockSize; ++e) { - if (data[e] >= predicate) { - out_vec.push_back(e); - } - } - } else { - for (auto e : in_pos) { - if (data[e] >= predicate) { - out_vec.push_back(e); - } - } - } - - return out_vec; -}; - -template -inline std::vector equal(col_t* column, const uint64_t predicate, const uint64_t offset, const size_t blockSize, const 
std::vector in_pos, const bool reload) { - auto data = reinterpret_cast(column->data) + offset; - if (remote) { - wait_col_data_ready(column, reinterpret_cast(data)); - if (reload) { - if (chunked && !paxed) { - column->request_data(!chunked); - } - } - } - - auto s_ts = std::chrono::high_resolution_clock::now(); - std::vector out_vec; - out_vec.reserve(blockSize); - if (isFirst) { - for (auto e = 0; e < blockSize; ++e) { - if (data[e] == predicate) { - out_vec.push_back(e); - } - } - } else { - for (auto e : in_pos) { - if (data[e] == predicate) { - out_vec.push_back(e); - } - } - } - - return out_vec; -}; - -template -inline std::vector between_incl(col_t* column, const uint64_t predicate_1, const uint64_t predicate_2, const uint64_t offset, const size_t blockSize, const std::vector in_pos, const bool reload) { - auto data = reinterpret_cast(column->data) + offset; - if (remote) { - wait_col_data_ready(column, reinterpret_cast(data)); - if (reload) { - if (chunked && !paxed) { - column->request_data(!chunked); - } - } - } - - std::vector out_vec; - out_vec.reserve(blockSize); - if (isFirst) { - for (auto e = 0; e < blockSize; ++e) { - if (predicate_1 <= data[e] && data[e] <= predicate_2) { - out_vec.push_back(e); - } - } - } else { - for (auto e : in_pos) { - if (predicate_1 <= data[e] && data[e] <= predicate_2) { - out_vec.push_back(e); - } - } - } - - return out_vec; -}; - -template -inline std::vector between_excl(col_t* column, const uint64_t predicate_1, const uint64_t predicate_2, const uint64_t offset, const size_t blockSize, const std::vector in_pos, const bool reload) { - auto data = reinterpret_cast(column->data) + offset; - if (remote) { - wait_col_data_ready(column, reinterpret_cast(data)); - if (reload) { - if (chunked && !paxed) { - column->request_data(!chunked); - } - } - } - - std::vector out_vec; - out_vec.reserve(blockSize); - if (isFirst) { - for (auto e = 0; e < blockSize; ++e) { - if (predicate_1 < data[e] && data[e] < predicate_2) { - out_vec.push_back(e); - } - } - } else { - for (auto e : in_pos) { - if (predicate_1 < data[e] && data[e] < predicate_2) { - out_vec.push_back(e); - } - } - } - - return out_vec; -}; - -template -uint64_t pipeTempOne(col_t* column1, col_t* column2, col_t* column3, const uint64_t predicate, const std::vector idents) { - size_t OPTIMAL_BLOCK_SIZE_MT = 262144; - - if (remote && !prefetching) { - if (paxed) { - DataCatalog::getInstance().fetchPseudoPax(1, idents); - } else { - column1->request_data(!chunked); - column2->request_data(!chunked); - column3->request_data(!chunked); - } - } - - const size_t columnSize = column1->size; - - size_t max_elems_per_chunk = 0; - size_t standard_block_elements = 0; - if (paxed) { - size_t total_id_len = 0; - for (auto& id : idents) { - total_id_len += id.size(); - } - - const size_t appMetaSize = 3 * sizeof(size_t) + (sizeof(size_t) * idents.size()) + total_id_len; - const size_t maximumPayloadSize = ConnectionManager::getInstance().getConnectionById(1)->maxBytesInPayload(appMetaSize); - - max_elems_per_chunk = ((maximumPayloadSize / idents.size()) / (sizeof(uint64_t) * 4)) * 4; - standard_block_elements = max_elems_per_chunk; - } else if (!(remote && (chunked || paxed))) { - max_elems_per_chunk = columnSize; - standard_block_elements = OPTIMAL_BLOCK_SIZE_MT / sizeof(uint64_t); - } else { - max_elems_per_chunk = DataCatalog::getInstance().dataCatalog_chunkMaxSize / sizeof(uint64_t); - if (max_elems_per_chunk <= OPTIMAL_BLOCK_SIZE_MT / sizeof(uint64_t)) { - standard_block_elements = 
max_elems_per_chunk; - } else { - standard_block_elements = OPTIMAL_BLOCK_SIZE_MT / sizeof(uint64_t); - } - } - - standard_block_elements /= 4; - - size_t num_blocks = (columnSize / standard_block_elements) + (columnSize % standard_block_elements == 0 ? 0 : 1); - std::array sums{0, 0, 0, 0}; - -#pragma omp parallel for schedule(static, 1) num_threads(4) - for (size_t i = 0; i < num_blocks; ++i) { - size_t baseOffset = i * standard_block_elements; - size_t currentBlockElems = standard_block_elements; - - auto data_2 = reinterpret_cast(column2->data) + baseOffset; - auto data_3 = reinterpret_cast(column3->data) + baseOffset; - - bool reloading = baseOffset % max_elems_per_chunk == 0; - - if (remote && paxed) { - wait_col_data_ready(column3, reinterpret_cast(data_3)); - if (reloading) DataCatalog::getInstance().fetchPseudoPax(1, idents); - } - - const size_t elem_diff = columnSize - baseOffset; - if (elem_diff < currentBlockElems) { - currentBlockElems = elem_diff; - } - - auto le_idx = less_than(column1, predicate, baseOffset, currentBlockElems, {}, reloading); - - if (remote && !paxed) { - wait_col_data_ready(column2, reinterpret_cast(data_2)); - wait_col_data_ready(column3, reinterpret_cast(data_3)); - if (reloading) { - if (chunked) { - column2->request_data(!chunked); - column3->request_data(!chunked); - } - } - } - - int tid = omp_get_thread_num(); - - for (auto idx : le_idx) { - sums[tid] += (data_2[idx] * data_3[idx]); - // ++sum; - } - } - - uint64_t sum = 0; - - for (auto s : sums) { - sum += s; - } - - return sum; -} - -template -uint64_t pipeTempTwo(col_t* column1, col_t* column2, col_t* column3, const uint64_t predicate, const std::vector idents) { - size_t OPTIMAL_BLOCK_SIZE_MT = 131072; - - if (remote && !prefetching) { - if (paxed) { - DataCatalog::getInstance().fetchPseudoPax(1, idents); - } else { - column1->request_data(!chunked); - column2->request_data(!chunked); - column3->request_data(!chunked); - } - } - - const size_t columnSize = column1->size; - - size_t max_elems_per_chunk = 0; - size_t standard_block_elements = 0; - if (paxed) { - size_t total_id_len = 0; - for (auto& id : idents) { - total_id_len += id.size(); - } - - const size_t appMetaSize = 3 * sizeof(size_t) + (sizeof(size_t) * idents.size()) + total_id_len; - const size_t maximumPayloadSize = ConnectionManager::getInstance().getConnectionById(1)->maxBytesInPayload(appMetaSize); - - max_elems_per_chunk = ((maximumPayloadSize / idents.size()) / (sizeof(uint64_t) * 4)) * 4; - standard_block_elements = max_elems_per_chunk; - } else if (!(remote && (chunked || paxed))) { - max_elems_per_chunk = columnSize; - standard_block_elements = OPTIMAL_BLOCK_SIZE_MT / sizeof(uint64_t); - } else { - max_elems_per_chunk = DataCatalog::getInstance().dataCatalog_chunkMaxSize / sizeof(uint64_t); - if (max_elems_per_chunk <= OPTIMAL_BLOCK_SIZE_MT / sizeof(uint64_t)) { - standard_block_elements = max_elems_per_chunk; - } else { - standard_block_elements = OPTIMAL_BLOCK_SIZE_MT / sizeof(uint64_t); - } - } - - standard_block_elements /= 2; - - size_t num_blocks = (columnSize / standard_block_elements) + (columnSize % standard_block_elements == 0 ? 
0 : 1); - std::array sums{0, 0}; - -#pragma omp parallel for schedule(static, 1) num_threads(2) - for (size_t i = 0; i < num_blocks; ++i) { - size_t baseOffset = i * standard_block_elements; - size_t currentBlockElems = standard_block_elements; - - auto data_2 = reinterpret_cast(column2->data) + baseOffset; - auto data_3 = reinterpret_cast(column3->data) + baseOffset; - - bool reloading = baseOffset % max_elems_per_chunk == 0; - - if (remote && paxed) { - wait_col_data_ready(column3, reinterpret_cast(data_3)); - if (reloading) DataCatalog::getInstance().fetchPseudoPax(1, idents); - } - - const size_t elem_diff = columnSize - baseOffset; - if (elem_diff < currentBlockElems) { - currentBlockElems = elem_diff; - } - - auto le_idx = less_than(column1, predicate, baseOffset, currentBlockElems, {}, reloading); - - if (remote && !paxed) { - wait_col_data_ready(column2, reinterpret_cast(data_2)); - wait_col_data_ready(column3, reinterpret_cast(data_3)); - if (reloading) { - if (chunked) { - column2->request_data(!chunked); - column3->request_data(!chunked); - } - } - } - - int tid = omp_get_thread_num(); - - for (auto idx : le_idx) { - sums[tid] += (data_2[idx] * data_3[idx]); - // ++sum; - } - } - - uint64_t sum = 0; - - for (auto s : sums) { - sum += s; - } - - return sum; -} - -template -uint64_t pipeTempThree(col_t* column1, col_t* column2, col_t* column3, const uint64_t predicate, const std::vector idents) { - size_t OPTIMAL_BLOCK_SIZE_MT = 65536; - - if (remote && !prefetching) { - if (paxed) { - DataCatalog::getInstance().fetchPseudoPax(1, idents); - } else { - column1->request_data(!chunked); - column2->request_data(!chunked); - column3->request_data(!chunked); - } - } - - const size_t columnSize = column1->size; - - size_t max_elems_per_chunk = 0; - size_t standard_block_elements = 0; - if (paxed) { - size_t total_id_len = 0; - for (auto& id : idents) { - total_id_len += id.size(); - } - - const size_t appMetaSize = 3 * sizeof(size_t) + (sizeof(size_t) * idents.size()) + total_id_len; - const size_t maximumPayloadSize = ConnectionManager::getInstance().getConnectionById(1)->maxBytesInPayload(appMetaSize); - - max_elems_per_chunk = ((maximumPayloadSize / idents.size()) / (sizeof(uint64_t) * 4)) * 4; - standard_block_elements = max_elems_per_chunk; - } else if (!(remote && (chunked || paxed))) { - max_elems_per_chunk = columnSize; - standard_block_elements = OPTIMAL_BLOCK_SIZE_MT / sizeof(uint64_t); - } else { - max_elems_per_chunk = DataCatalog::getInstance().dataCatalog_chunkMaxSize / sizeof(uint64_t); - if (max_elems_per_chunk <= OPTIMAL_BLOCK_SIZE_MT / sizeof(uint64_t)) { - standard_block_elements = max_elems_per_chunk; - } else { - standard_block_elements = OPTIMAL_BLOCK_SIZE_MT / sizeof(uint64_t); - } - } - - uint64_t sum = 0; - - size_t num_blocks = (columnSize / standard_block_elements) + (columnSize % standard_block_elements == 0 ? 
0 : 1); - - for (size_t i = 0; i < num_blocks; ++i) { - size_t baseOffset = i * standard_block_elements; - size_t currentBlockElems = standard_block_elements; - - auto data_2 = reinterpret_cast(column2->data) + baseOffset; - auto data_3 = reinterpret_cast(column3->data) + baseOffset; - - bool reloading = baseOffset % max_elems_per_chunk == 0; - - if (remote && paxed) { - wait_col_data_ready(column3, reinterpret_cast(data_3)); - if (reloading) DataCatalog::getInstance().fetchPseudoPax(1, idents); - } - - const size_t elem_diff = columnSize - baseOffset; - if (elem_diff < currentBlockElems) { - currentBlockElems = elem_diff; - } - - auto le_idx = less_than(column1, predicate, baseOffset, currentBlockElems, {}, reloading); - - if (remote && !paxed) { - wait_col_data_ready(column2, reinterpret_cast(data_2)); - wait_col_data_ready(column3, reinterpret_cast(data_3)); - if (reloading) { - if (chunked) { - column2->request_data(!chunked); - column3->request_data(!chunked); - } - } - } - - int tid = omp_get_thread_num(); - - for (auto idx : le_idx) { - sum += (data_2[idx] * data_3[idx]); - // ++sum; - } - } - - return sum; -} - -// 4 Pipelines with 4 Threads no parallel execution -> 1 Pipeline executed by 4 Threads -template -uint64_t orchBenchmark1(const std::vector idents, const std::array, 4> idx) { - std::vector columns; - const std::array predicates{50, 75, 25, 100}; - std::array sums; - uint64_t sum = 0; - - for (auto ident : idents) { - col_t* col; - if (remote) { - col = DataCatalog::getInstance().find_remote(ident); - if (prefetching && !paxed) col->request_data(!chunked); - } else { - col = DataCatalog::getInstance().find_local(ident); - } - - columns.push_back(col); - } - - if (paxed && prefetching) { - DataCatalog::getInstance().fetchPseudoPax(1, idents); - } - - for (size_t i = 0; i < 4; ++i) { - if (prefetching) { - sums[i] = pipeTempOne(columns[idx[i][0]], columns[idx[i][1]], columns[idx[i][2]], predicates[i], idents); - } else { - sums[i] = pipeTempOne(columns[idx[i][0]], columns[idx[i][1]], columns[idx[i][2]], predicates[i], {idents[idx[i][0]], idents[idx[i][1]], idents[idx[i][2]]}); - } - } - - for (auto s : sums) { - sum += s; - } - - return sum; -} - -// 4 Pipelines with 4 Threads half parallel execution -> 1 Pipeline executed by 2 Threads -template -uint64_t orchBenchmark2(const std::vector idents, const std::array, 4> idx) { - std::vector columns; - const std::array predicates{50, 25, 75, 100}; - std::array sums; - uint64_t sum = 0; - - for (auto ident : idents) { - col_t* col; - if (remote) { - col = DataCatalog::getInstance().find_remote(ident); - if (prefetching && !paxed) col->request_data(!chunked); - } else { - col = DataCatalog::getInstance().find_local(ident); - } - - columns.push_back(col); - } - - if (paxed && prefetching) { - DataCatalog::getInstance().fetchPseudoPax(1, idents); - } - -#pragma omp parallel for schedule(static, 2) num_threads(2) - for (size_t i = 0; i < 4; ++i) { - if (prefetching) { - sums[i] = pipeTempTwo(columns[idx[i][0]], columns[idx[i][1]], columns[idx[i][2]], predicates[i], idents); - } else { - sums[i] = pipeTempTwo(columns[idx[i][0]], columns[idx[i][1]], columns[idx[i][2]], predicates[i], {idents[idx[i][0]], idents[idx[i][1]], idents[idx[i][2]]}); - } - } - - for (auto s : sums) { - sum += s; - } - - return sum; -} - -// 4 Pipelines with 4 Threads full parallel execution -template -uint64_t orchBenchmark3(const std::vector idents, const std::array, 4> idx) { - std::vector columns; - const std::array predicates{50, 25, 75, 100}; - std::array 
sums; - uint64_t sum = 0; - - for (auto ident : idents) { - col_t* col; - if (remote) { - col = DataCatalog::getInstance().find_remote(ident); - if (prefetching && !paxed) col->request_data(!chunked); - } else { - col = DataCatalog::getInstance().find_local(ident); - } - - columns.push_back(col); - } - - if (paxed && prefetching) { - DataCatalog::getInstance().fetchPseudoPax(1, idents); - } - -#pragma omp parallel for schedule(static, 1) num_threads(4) - for (size_t i = 0; i < 4; ++i) { - if (prefetching) { - sums[i] = pipeTempThree(columns[idx[i][0]], columns[idx[i][1]], columns[idx[i][2]], predicates[i], idents); - } else { - sums[i] = pipeTempThree(columns[idx[i][0]], columns[idx[i][1]], columns[idx[i][2]], predicates[i], {idents[idx[i][0]], idents[idx[i][1]], idents[idx[i][2]]}); - } - } - - for (auto s : sums) { - sum += s; - } - - return sum; -} - -// 4 Pipelines with 1 Thread sequentially executed -> Baseline -template -uint64_t orchBenchmark4(const std::vector idents, const std::array, 4> idx) { - std::vector columns; - const std::array predicates{50, 25, 75, 100}; - std::array sums; - uint64_t sum = 0; - - for (auto ident : idents) { - col_t* col; - if (remote) { - col = DataCatalog::getInstance().find_remote(ident); - if (prefetching && !paxed) col->request_data(!chunked); - } else { - col = DataCatalog::getInstance().find_local(ident); - } - - columns.push_back(col); - } - - if (paxed && prefetching) { - DataCatalog::getInstance().fetchPseudoPax(1, idents); - } - - for (size_t i = 0; i < 4; ++i) { - if (prefetching) { - sums[i] = pipeTempThree(columns[idx[i][0]], columns[idx[i][1]], columns[idx[i][2]], predicates[i], idents); - } else { - sums[i] = pipeTempThree(columns[idx[i][0]], columns[idx[i][1]], columns[idx[i][2]], predicates[i], {idents[idx[i][0]], idents[idx[i][1]], idents[idx[i][2]]}); - } - } - - for (auto s : sums) { - sum += s; - } - - return sum; -} - -template -void doBenchmark(Fn&& f1, Fn&& f2, Fn&& f3, Fn&& f4, Fn&& f5, Fn&& f6, std::ofstream& out, const std::string benchIdent, const std::string overlapIdent) { - uint64_t sum = 0; - std::chrono::time_point s_ts; - std::chrono::time_point e_ts; - std::chrono::duration secs; - - for (size_t i = 0; i < 5; ++i) { - DataCatalog::getInstance().fetchRemoteInfo(); - s_ts = std::chrono::high_resolution_clock::now(); - sum = f1(); - e_ts = std::chrono::high_resolution_clock::now(); - - secs = e_ts - s_ts; - - out << "Remote\tFull\tPipeline\t65536\t" << benchIdent << "\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl - << std::flush; - std::cout << "Remote\tFull\tPipeline\t65536\t" << benchIdent << "\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl; - - DataCatalog::getInstance().eraseAllRemoteColumns(); - - DataCatalog::getInstance().fetchRemoteInfo(); - s_ts = std::chrono::high_resolution_clock::now(); - sum = f2(); - e_ts = std::chrono::high_resolution_clock::now(); - - secs = e_ts - s_ts; - - out << "Remote\tFull\tPrefetch\t65536\t" << benchIdent << "\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl - << std::flush; - std::cout << "Remote\tFull\tPrefetch\t65536\t" << benchIdent << "\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl; - - DataCatalog::getInstance().eraseAllRemoteColumns(); - - for (uint64_t chunkSize = 1ull << 19; chunkSize <= 1ull << 28; chunkSize <<= 1) { - DataCatalog::getInstance().reconfigureChunkSize(chunkSize, chunkSize); - - DataCatalog::getInstance().fetchRemoteInfo(); - s_ts = 
std::chrono::high_resolution_clock::now(); - sum = f3(); - e_ts = std::chrono::high_resolution_clock::now(); - - secs = e_ts - s_ts; - - out << "Remote\tChunked\tPipeline\t" << +DataCatalog::getInstance().dataCatalog_chunkMaxSize << "\t" << benchIdent << "\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl - << std::flush; - std::cout << "Remote\tChunked\tPipeline\t" << +DataCatalog::getInstance().dataCatalog_chunkMaxSize << "\t" << benchIdent << "\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl; - - DataCatalog::getInstance().eraseAllRemoteColumns(); - - DataCatalog::getInstance().fetchRemoteInfo(); - s_ts = std::chrono::high_resolution_clock::now(); - sum = f4(); - e_ts = std::chrono::high_resolution_clock::now(); - - secs = e_ts - s_ts; - - out << "Remote\tChunked\tPrefetch\t" << +DataCatalog::getInstance().dataCatalog_chunkMaxSize << "\t" << benchIdent << "\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl - << std::flush; - std::cout << "Remote\tChunked\tPrefetch\t" << +DataCatalog::getInstance().dataCatalog_chunkMaxSize << "\t" << benchIdent << "\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl; - - DataCatalog::getInstance().eraseAllRemoteColumns(); - } - - // DataCatalog::getInstance().fetchRemoteInfo(); - // s_ts = std::chrono::high_resolution_clock::now(); - // sum = f5(); - // e_ts = std::chrono::high_resolution_clock::now(); - - // secs = e_ts - s_ts; - - // out << "Remote\tPaxed\tPipeline\t000000\t" << benchIdent << "\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl - // << std::flush; - // std::cout << "Remote\tPaxed\tPipeline\t000000\t" << benchIdent << "\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl; - - // DataCatalog::getInstance().eraseAllRemoteColumns(); - - // DataCatalog::getInstance().fetchRemoteInfo(); - // s_ts = std::chrono::high_resolution_clock::now(); - // sum = f6(); - // e_ts = std::chrono::high_resolution_clock::now(); - - // secs = e_ts - s_ts; - - // out << "Remote\tPaxed\tPrefetch\t000000\t" << benchIdent << "\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl - // << std::flush; - // std::cout << "Remote\tPaxed\tPrefetch\t000000\t" << benchIdent << "\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl; - - // DataCatalog::getInstance().eraseAllRemoteColumns(); - } -} - -void benchmark(const std::vector idents, const std::array, 4> idx, std::ofstream& out, const std::string overlapIdent) { - // - - doBenchmark(std::bind(orchBenchmark1, idents, idx), // Remote Full Pipe - std::bind(orchBenchmark1, idents, idx), // Remote Full Prefetch - std::bind(orchBenchmark1, idents, idx), // Remote Chunked Pipe - std::bind(orchBenchmark1, idents, idx), // Remote Chunked Prefetch - std::bind(orchBenchmark1, idents, idx), // Remote Paxed Pipe - std::bind(orchBenchmark1, idents, idx), // Remote Paxed Prefetch - out, "1-4", overlapIdent); - - doBenchmark(std::bind(orchBenchmark2, idents, idx), // Remote Full Pipe - std::bind(orchBenchmark2, idents, idx), // Remote Full Prefetch - std::bind(orchBenchmark2, idents, idx), // Remote Chunked Pipe - std::bind(orchBenchmark2, idents, idx), // Remote Chunked Prefetch - std::bind(orchBenchmark2, idents, idx), // Remote Paxed Pipe - std::bind(orchBenchmark2, idents, idx), // Remote Paxed Prefetch - out, "2-4", overlapIdent); - - doBenchmark(std::bind(orchBenchmark3, idents, idx), // Remote Full Pipe - std::bind(orchBenchmark3, idents, idx), // Remote Full Prefetch 
- std::bind(orchBenchmark3, idents, idx), // Remote Chunked Pipe - std::bind(orchBenchmark3, idents, idx), // Remote Chunked Prefetch - std::bind(orchBenchmark3, idents, idx), // Remote Paxed Pipe - std::bind(orchBenchmark3, idents, idx), // Remote Paxed Prefetch - out, "4-4", overlapIdent); - - doBenchmark(std::bind(orchBenchmark4, idents, idx), // Remote Full Pipe - std::bind(orchBenchmark4, idents, idx), // Remote Full Prefetch - std::bind(orchBenchmark4, idents, idx), // Remote Chunked Pipe - std::bind(orchBenchmark4, idents, idx), // Remote Chunked Prefetch - std::bind(orchBenchmark4, idents, idx), // Remote Paxed Pipe - std::bind(orchBenchmark4, idents, idx), // Remote Paxed Prefetch - out, "4-1", overlapIdent); -} - -void executeRemoteMTBenchmarkingQueries(std::string& logName) { - std::ofstream out; - out.open(logName, std::ios_base::app); - out << std::fixed << std::setprecision(7) << std::endl; - std::cout << std::fixed << std::setprecision(7) << std::endl; - - benchmark({"col_0", "col_1", "col_2"}, {{{0, 1, 2}, {0, 1, 2}, {0, 1, 2}, {0, 1, 2}}}, out, "3-3"); - benchmark({"col_0", "col_1", "col_2", "col_3", "col_4", "col_5"}, {{{0, 1, 2}, {0, 1, 3}, {0, 1, 4}, {0, 1, 5}}}, out, "2-3"); - benchmark({"col_0", "col_1", "col_2", "col_3", "col_4", "col_5", "col_6", "col_7", "col_8"}, {{{0, 1, 2}, {0, 3, 4}, {0, 5, 6}, {0, 7, 8}}}, out, "1-3"); - benchmark({"col_0", "col_1", "col_2", "col_3", "col_4", "col_5", "col_6", "col_7", "col_8", "col_9", "col_10", "col_11"}, {{{0, 1, 2}, {3, 4, 5}, {6, 7, 8}, {9, 10, 11}}}, out, "0-3"); - - out.close(); -} - -void localBenchmark(const std::vector idents, const std::array, 4> idx, std::ofstream& out, const std::string overlapIdent, const std::string& locality) { - std::chrono::_V2::system_clock::time_point s_ts; - std::chrono::_V2::system_clock::time_point e_ts; - std::chrono::duration secs; - uint64_t sum = 0; - - for (size_t i = 0; i < 20; ++i) { - s_ts = std::chrono::high_resolution_clock::now(); - sum = orchBenchmark1(idents, idx); - e_ts = std::chrono::high_resolution_clock::now(); - - secs = e_ts - s_ts; - - out << locality << "\tFull\tPrefetch\t65536\t1-4\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl - << std::flush; - std::cout << locality << "\tFull\tPrefetch\t65536\t1-4\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl; - - s_ts = std::chrono::high_resolution_clock::now(); - sum = orchBenchmark2(idents, idx); - e_ts = std::chrono::high_resolution_clock::now(); - - secs = e_ts - s_ts; - - out << locality << "\tFull\tPrefetch\t65536\t2-4\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl - << std::flush; - std::cout << locality << "\tFull\tPrefetch\t65536\t2-4\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl; - - s_ts = std::chrono::high_resolution_clock::now(); - sum = orchBenchmark3(idents, idx); - e_ts = std::chrono::high_resolution_clock::now(); - - secs = e_ts - s_ts; - - out << locality << "\tFull\tPrefetch\t65536\t4-4\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl - << std::flush; - std::cout << locality << "\tFull\tPrefetch\t65536\t4-4\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl; - - s_ts = std::chrono::high_resolution_clock::now(); - sum = orchBenchmark4(idents, idx); - e_ts = std::chrono::high_resolution_clock::now(); - - secs = e_ts - s_ts; - - out << locality << "\tFull\tPrefetch\t65536\t4-1\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl - << std::flush; - std::cout 
<< locality << "\tFull\tPrefetch\t65536\t4-1\t" << overlapIdent << "\t" << sum << "\t" << secs.count() << std::endl; - } -} - -void executeLocalMTBenchmarkingQueries(std::string& logName, std::string locality) { - std::ofstream out; - out.open(logName, std::ios_base::app); - out << std::fixed << std::setprecision(7) << std::endl; - std::cout << std::fixed << std::setprecision(7) << std::endl; - - localBenchmark({"col_0", "col_1", "col_2"}, {{{0, 1, 2}, {0, 1, 2}, {0, 1, 2}, {0, 1, 2}}}, out, "3-3", locality); - localBenchmark({"col_0", "col_1", "col_2", "col_3", "col_4", "col_5"}, {{{0, 1, 2}, {0, 1, 3}, {0, 1, 4}, {0, 1, 5}}}, out, "2-3", locality); - localBenchmark({"col_0", "col_1", "col_2", "col_3", "col_4", "col_5", "col_6", "col_7", "col_8"}, {{{0, 1, 2}, {0, 3, 4}, {0, 5, 6}, {0, 7, 8}}}, out, "1-3", locality); - localBenchmark({"col_0", "col_1", "col_2", "col_3", "col_4", "col_5", "col_6", "col_7", "col_8", "col_9", "col_10", "col_11"}, {{{0, 1, 2}, {3, 4, 5}, {6, 7, 8}, {9, 10, 11}}}, out, "0-3", locality); - - out.close(); -} \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 31907fd..5bb7519 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,19 +1,18 @@ -#include -#include -#include -#include -#include #include #include #include +#include +#include +#include +#include #include +#include #include #include "Benchmarks.hpp" -#include "Column.h" -#include "DataCatalog.h" -#include "Logger.h" +#include "Column.hpp" +#include "DataCatalog.hpp" #include "Worker.hpp" void signal_handler(int signal) { @@ -46,7 +45,7 @@ bool checkLinkUp() { return (result.find("State: Active") != std::string::npos); } -using namespace memordma; +using namespace memConnect; int main(int argc, char *argv[]) { for (auto sig : {SIGINT, SIGUSR1}) { @@ -74,18 +73,30 @@ int main(int argc, char *argv[]) { bool abort = false; auto globalExit = [&]() -> void { - { - using namespace std::chrono_literals; - std::this_thread::sleep_for(500ms); - } + std::this_thread::sleep_for(std::chrono::milliseconds(500)); abort = true; ConnectionManager::getInstance().stop(true); }; +// LOG_DEBUG1("Creating Columns" << std::endl;) +// #pragma omp parallel for schedule(static, 2) num_threads(8) +// for (size_t i = 0; i < 24; ++i) { +// std::string name = "col_" + std::to_string(i); + +// DataCatalog::getInstance().generate(name, col_data_t::gen_bigint, 200000000, 0); +// } + +// #pragma omp parallel for schedule(static, 2) num_threads(8) +// for (size_t i = 24; i < 48; ++i) { +// std::string name = "col_" + std::to_string(i); + +// DataCatalog::getInstance().generate(name, col_data_t::gen_bigint, 200000000, 1); +// } + TaskManager::getInstance().setGlobalAbortFunction(globalExit); - if (ConnectionManager::getInstance().configuration->get(MEMO_DEFAULT_CONNECTION_AUTO_LISTEN)) { + if (ConnectionManager::getInstance().configuration->get(MEMCONNECT_DEFAULT_CONNECTION_AUTO_LISTEN)) { std::thread([]() -> void { TaskManager::getInstance().executeByIdent("listenConnection"); }).detach(); - } else if (ConnectionManager::getInstance().configuration->get(MEMO_DEFAULT_CONNECTION_AUTO_INITIATE)) { + } else if (ConnectionManager::getInstance().configuration->get(MEMCONNECT_DEFAULT_CONNECTION_AUTO_INITIATE)) { std::thread([]() -> void { TaskManager::getInstance().executeByIdent("openConnection"); }).detach(); }
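For reference, the three pipeTemp* templates in this diff all implement the same select-then-aggregate kernel and differ only in block size and thread count. Below is a minimal, local-only sketch of that kernel, assuming plain uint64_t arrays and an inlined filter instead of the less_than(...) call; filtered_sum_blocked is an illustrative name, not part of this repository, and the RDMA fetch, chunk-reload, and PAX paths are deliberately omitted.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

#include <omp.h>

// Illustrative only: scan col1 against the predicate block by block and
// accumulate col2[i] * col3[i] for qualifying rows into one partial sum per
// OpenMP thread, mirroring the sums[tid] pattern used in pipeTempOne/Two.
uint64_t filtered_sum_blocked(const std::vector<uint64_t>& col1,
                              const std::vector<uint64_t>& col2,
                              const std::vector<uint64_t>& col3,
                              uint64_t predicate,
                              size_t block_elems,
                              int threads) {
    const size_t n = col1.size();
    const size_t num_blocks = (n / block_elems) + (n % block_elems == 0 ? 0 : 1);
    std::vector<uint64_t> sums(static_cast<size_t>(threads), 0);

#pragma omp parallel for schedule(static, 1) num_threads(threads)
    for (size_t b = 0; b < num_blocks; ++b) {
        const size_t base = b * block_elems;
        const size_t elems = std::min(block_elems, n - base);
        const int tid = omp_get_thread_num();
        for (size_t i = base; i < base + elems; ++i) {
            if (col1[i] < predicate) {  // stand-in for the less_than(...) selection
                sums[static_cast<size_t>(tid)] += col2[i] * col3[i];
            }
        }
    }

    uint64_t sum = 0;
    for (const uint64_t s : sums) sum += s;
    return sum;
}

In the full-column case the templates only vary these two knobs: (131072 / sizeof(uint64_t)) / 4 elements with 4 threads corresponds to pipeTempOne, (131072 / sizeof(uint64_t)) / 2 with 2 threads to pipeTempTwo, and 65536 / sizeof(uint64_t) with a single thread to pipeTempThree.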
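In the paxed branch, the chunk size is derived from the connection's payload budget rather than from dataCatalog_chunkMaxSize: each column gets an equal share of maximumPayloadSize, expressed in uint64_t elements and rounded down to a multiple of four. A small worked example of that arithmetic; the 1 MiB payload figure is made up, and pax_elems_per_chunk is only an illustrative name.

#include <cstddef>
#include <cstdint>

// Illustrative arithmetic only: the real payload budget comes from
// maxBytesInPayload(appMetaSize) on the active connection.
constexpr size_t pax_elems_per_chunk(size_t maximum_payload_size, size_t num_columns) {
    // Per-column byte share, converted to uint64_t elements and rounded down
    // to a multiple of 4.
    return ((maximum_payload_size / num_columns) / (sizeof(uint64_t) * 4)) * 4;
}

// Example with a hypothetical 1 MiB payload shared by 3 columns:
// ((1048576 / 3) / 32) * 4 = 43688 elements per column per chunk (about 341 KiB each).
static_assert(pax_elems_per_chunk(1048576, 3) == 43688);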
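The four orchBenchmark variants split a fixed budget of four threads between concurrently running pipelines and threads per pipeline, which is what the "1-4", "2-4", "4-4", and "4-1" labels in the log rows encode. A hedged sketch of that split, reusing the illustrative filtered_sum_blocked kernel from above; per-pipeline column selection (the idx array) is omitted, and note that nested OpenMP parallelism has to be enabled (for example via omp_set_max_active_levels) for the inner num_threads clause to take effect, which is not visible from this diff.

#include <array>
#include <cstddef>
#include <cstdint>
#include <vector>

#include <omp.h>

// Illustrative only: run four filter/aggregate pipelines with a given degree of
// outer (pipeline-level) and inner (per-pipeline) parallelism, keeping
// parallel_pipelines * threads_per_pipeline at 4 as in the configurations above.
uint64_t run_four_pipelines(const std::vector<uint64_t>& c1,
                            const std::vector<uint64_t>& c2,
                            const std::vector<uint64_t>& c3,
                            const std::array<uint64_t, 4>& predicates,
                            int parallel_pipelines,
                            int threads_per_pipeline,
                            size_t block_elems) {
    omp_set_max_active_levels(2);  // allow the nested num_threads clause to apply
    std::array<uint64_t, 4> sums{};

#pragma omp parallel for schedule(static, 1) num_threads(parallel_pipelines)
    for (size_t i = 0; i < 4; ++i) {
        sums[i] = filtered_sum_blocked(c1, c2, c3, predicates[i],
                                       block_elems, threads_per_pipeline);
    }

    uint64_t sum = 0;
    for (const uint64_t s : sums) sum += s;
    return sum;
}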
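Finally, the measurement loop in doBenchmark (and its local counterpart) times each run with std::chrono::high_resolution_clock and writes one tab-separated row per configuration to both the log file and stdout, sweeping the chunk size from 2^19 to 2^28 bytes in powers of two for the chunked variants. A minimal sketch of that logging shape; time_and_log and reconfigure_chunk_size are illustrative stand-ins, not functions from this repository.

#include <chrono>
#include <cstdint>
#include <fstream>
#include <functional>
#include <iostream>
#include <string>

// Illustrative only: time one benchmark run and append a tab-separated result
// row in the column order used above (locality, layout, fetch mode, chunk size,
// thread split, overlap, checksum, seconds).
void time_and_log(const std::function<uint64_t()>& bench,
                  std::ofstream& out,
                  const std::string& locality, const std::string& layout,
                  const std::string& mode, uint64_t chunk_size,
                  const std::string& bench_ident, const std::string& overlap_ident) {
    const auto s_ts = std::chrono::high_resolution_clock::now();
    const uint64_t sum = bench();
    const auto e_ts = std::chrono::high_resolution_clock::now();
    const std::chrono::duration<double> secs = e_ts - s_ts;

    out << locality << "\t" << layout << "\t" << mode << "\t" << chunk_size << "\t"
        << bench_ident << "\t" << overlap_ident << "\t" << sum << "\t"
        << secs.count() << std::endl;
    std::cout << locality << "\t" << layout << "\t" << mode << "\t" << chunk_size << "\t"
              << bench_ident << "\t" << overlap_ident << "\t" << sum << "\t"
              << secs.count() << std::endl;
}

// The chunked runs sweep the chunk size in powers of two, e.g.:
// for (uint64_t chunkSize = 1ull << 19; chunkSize <= 1ull << 28; chunkSize <<= 1) {
//     reconfigure_chunk_size(chunkSize);  // stand-in for reconfigureChunkSize(chunkSize, chunkSize)
//     time_and_log(bench, out, "Remote", "Chunked", "Pipeline", chunkSize, "1-4", overlap);
// }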