Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
build/*
.vscode/*
sync.sh
logs/*
6 changes: 3 additions & 3 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "ext/memoRDMA"]
path = ext/memoRDMA
url = git@github.com:AndreasGeyerTUD/memoRDMA.git
[submodule "ext/MemConnect"]
path = ext/MemConnect
url = git@github.com:AndreasGeyerTUD/MemConnect.git
31 changes: 20 additions & 11 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
cmake_minimum_required(VERSION 3.10)
cmake_minimum_required(VERSION 3.16)

project(disaggDataProvider VERSION 0.1)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)
set(CMAKE_CXX_FLAGS "-Wall -pedantic -Wextra -fopenmp")
set(warnings "-Wall -Wextra -pedantic -Wcast-align -Wcast-qual -Wctor-dtor-privacy -Wdisabled-optimization -Wformat=2 -Winit-self -Wlogical-op -Wmissing-include-dirs -Wnoexcept -Wold-style-cast -Woverloaded-virtual -Wredundant-decls -Wshadow -Wsign-promo -Wstrict-null-sentinel -Wstrict-overflow=5 -Wundef -Wno-unused -Wno-variadic-macros -Wno-parentheses -fdiagnostics-show-option")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${warnings}")
set(OpenMP_CXX_FLAGS "${OpenMP_CXX_FLAGS} -fopenmp")
set(OPTIMIZE_OPTIONS "-O3")
set(warnings "-Wall -Wextra -pedantic")

FIND_PACKAGE(OpenMP)

Expand All @@ -19,13 +20,21 @@ endif()

include_directories(SYSTEM ${OpenMP_INCLUDE_PATH})

add_subdirectory("ext/memoRDMA")
include_directories(ext/memoRDMA)
include_directories(ext/memoRDMA/include)
include_directories(ext/memoRDMA/src)
# Define a helper target that creates the log directory tree inside the source
# directory (logs/, logs/bench/, logs/tests/). It is part of the default build
# (ALL), so the directories are created whenever the project is built.
# The NOT TARGET guard skips the definition when a target with this name
# already exists -- presumably because a subproject may define the same
# target; TODO confirm against ext/MemConnect.
if(NOT TARGET build-time-make-directory)
add_custom_target(build-time-make-directory ALL
COMMAND ${CMAKE_COMMAND} -E make_directory ${CMAKE_CURRENT_SOURCE_DIR}/logs
COMMAND ${CMAKE_COMMAND} -E make_directory ${CMAKE_CURRENT_SOURCE_DIR}/logs/bench
COMMAND ${CMAKE_COMMAND} -E make_directory ${CMAKE_CURRENT_SOURCE_DIR}/logs/tests)
endif()

add_subdirectory("ext/MemConnect")
include_directories(ext/MemConnect)
include_directories(ext/MemConnect/include)
include_directories(ext/MemConnect/src)

# add_compile_options("-fsanitize=thread")
add_compile_options("-fopenmp")
add_compile_options("${OPTIMIZE_OPTIONS}")

set(SRC_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src)
set(INC_DIR ${CMAKE_CURRENT_SOURCE_DIR}/include)
Expand All @@ -34,13 +43,13 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
file(GLOB_RECURSE SOURCES ${SRC_DIR}/*.cpp)
file(GLOB_RECURSE HEADERS ${INC_DIR}/*.h)

file(GLOB_RECURSE SOURCES_RDMA ext/memoRDMA/src/*.cpp)
list(REMOVE_ITEM SOURCES_RDMA ${CMAKE_CURRENT_SOURCE_DIR}/ext/memoRDMA/src/memoRDMA_server.cpp)
file(GLOB_RECURSE HEADERS_RDMA ext/memoRDMA/include/*.h)
file(GLOB_RECURSE SOURCES_RDMA ext/MemConnect/src/*.cpp)
list(REMOVE_ITEM SOURCES_RDMA ${CMAKE_CURRENT_SOURCE_DIR}/ext/MemConnect/src/memConnect_server.cpp)
file(GLOB_RECURSE HEADERS_RDMA ext/MemConnect/include/*.h)
add_library(memoLib ${SOURCES_RDMA} ${HEADERS_RDMA})
target_link_libraries(memoLib ibverbs)

include_directories(${CMAKE_SOURCE_DIR}/ext/memoRDMA ${CMAKE_SOURCE_DIR}/ext/memoRDMA/include)
include_directories(${CMAKE_SOURCE_DIR}/ext/MemConnect ${CMAKE_SOURCE_DIR}/ext/MemConnect/include)

add_executable(disaggDataProvider ${SOURCES} ${HEADERS})
target_link_libraries(disaggDataProvider "pthread" "memoLib" "numa" ${OpenMP_CXX_LIBRARIES})
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Data Provider

Initial commit. This will be a testing repository.
1 change: 0 additions & 1 deletion ext/memoRDMA
Submodule memoRDMA deleted from dd4621
7 changes: 5 additions & 2 deletions include/Benchmarks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@ class Benchmarks {

template <bool filter>
void execUPIBenchmark();

void execRDMABenchmark();
void execRDMAHashJoinBenchmark();
void execRDMAHashJoinPGBenchmark();
void execRDMAHashJoinStarBenchmark();

void execChunkVsChunkStreamBenchmark();
void execPaxVsPaxStreamBenchmark();

static const size_t WORKER_NUMBER = 8;
Worker workers[WORKER_NUMBER];
// Worker workers[WORKER_NUMBER];
};
39 changes: 26 additions & 13 deletions include/Column.h → include/Column.hpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
#pragma once

#include <numa.h>

#include <Logger.hpp>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <mutex>
#include <Logger.h>
#include <numa.h>
#include <unordered_map>

#include "DataCatalog.h"
#include "DataCatalog.hpp"

struct col_t {
template <typename T, bool chunk_iterator>
Expand Down Expand Up @@ -68,8 +70,8 @@ struct col_t {
};

private:
T* data;
col_t* col;
T* data;
};

void* data = nullptr;
Expand All @@ -87,6 +89,9 @@ struct col_t {
std::mutex appendLock;
std::condition_variable iterator_data_available;

size_t highestConsecPackArrived = 0;
std::unordered_map<uint64_t, uint64_t> arrived;

~col_t() {
// May be a problem when freeing memory not allocated with numa_alloc
numa_free(data, sizeInBytes);
Expand Down Expand Up @@ -182,16 +187,16 @@ struct col_t {
current_end = data;
}

void request_data(bool fetch_complete_column) {
void request_data(bool fetch_complete_column, bool asStream = false) {
std::unique_lock<std::mutex> _lk(iteratorLock);
if (is_complete || requested_chunks > received_chunks) {
LOG_DEBUG2("<data request ignored: " << (is_complete ? "is_complete" : "not_complete") << ">" << std::endl;)
LOG_DEBUG2("Data request ignored for: '" << ident << "' because: " << (is_complete ? "is_complete" : "not_complete") << std::endl;)
// Do Nothing, ignore.
return;
}
++requested_chunks;

DataCatalog::getInstance().fetchColStub(1, ident, fetch_complete_column);
DataCatalog::getInstance().fetchColStub(1, ident, fetch_complete_column, asStream);
}

void append_chunk(size_t offset, size_t chunkSize, char* remoteData) {
Expand All @@ -202,8 +207,8 @@ struct col_t {
memcpy(reinterpret_cast<char*>(data) + offset, remoteData, chunkSize);
}

void advance_end_pointer(size_t size) {
current_end = reinterpret_cast<void*>(reinterpret_cast<char*>(current_end) + size);
void advance_end_pointer(size_t _size) {
current_end = reinterpret_cast<void*>(reinterpret_cast<char*>(current_end) + _size);
iterator_data_available.notify_all();
}

Expand Down Expand Up @@ -252,7 +257,7 @@ struct col_t {
break;
}
default: {
using namespace memordma;
using namespace memConnect;
LOG_ERROR("Saw gen_void but its not handled." << std::endl;)
}
}
Expand All @@ -275,6 +280,10 @@ struct col_t {
case col_data_t::gen_double: {
return checksum<double>();
}
default: {
using namespace memConnect;
LOG_ERROR("Saw gen_void but its not handled." << std::endl;)
}
}
return 0;
}
Expand All @@ -301,6 +310,10 @@ struct col_t {
log_to_file_typed<double>(logfile);
break;
}
default: {
using namespace memConnect;
LOG_ERROR("Saw gen_void but its not handled." << std::endl;)
}
};
}

Expand All @@ -313,7 +326,7 @@ struct col_t {
auto tmp = static_cast<T>(data);
for (size_t i = 0; i < size && i < 10; ++i) {
if (datatype == col_data_t::gen_smallint) {
ss << " " << (uint64_t)tmp[i];
ss << " " << static_cast<uint64_t>(tmp[i]);
} else {
ss << " " << tmp[i];
}
Expand All @@ -337,7 +350,7 @@ struct col_t {
std::ofstream log(logname);
for (size_t i = 0; i < size; ++i) {
if (datatype == col_data_t::gen_smallint) {
log << " " << (uint64_t)tmp[i];
log << " " << static_cast<uint64_t>(tmp[i]);
} else {
log << " " << tmp[i];
}
Expand Down Expand Up @@ -366,7 +379,7 @@ struct table_t {

std::default_random_engine generator;

uint8_t colId = 0;
size_t colId = 0;

for (auto& col : columns) {
col->ident = ident + "_col_" + std::to_string(colId);
Expand Down
18 changes: 13 additions & 5 deletions include/DataCatalog.h → include/DataCatalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <string>
#include <unordered_map>

#include "ConnectionManager.h"
#include "ConnectionManager.hpp"

enum class catalog_communication_code : uint8_t {
send_column_info = 0xA0,
Expand All @@ -19,8 +19,12 @@ enum class catalog_communication_code : uint8_t {
receive_column_data,
fetch_column_chunk,
receive_column_chunk,
fetch_column_as_stream,
receive_column_as_stream,
fetch_pseudo_pax,
fetch_pseudo_pax_stream,
receive_pseudo_pax,
receive_pseudo_pax_stream,
receive_last_pseudo_pax,
reconfigure_chunk_size,
ack_reconfigure_chunk_size,
Expand Down Expand Up @@ -113,6 +117,10 @@ struct col_network_info {
<< " " << size_info * sizeof(double) << " Bytes";
break;
}
default: {
using namespace memConnect;
LOG_ERROR("Saw gen_void but its not handled." << std::endl;)
}
}
return std::move(ss.str());
}
Expand Down Expand Up @@ -192,8 +200,8 @@ class DataCatalog {
DataCatalog();

public:
uint64_t dataCatalog_chunkMaxSize = 1024 * 512 * 4;
uint64_t dataCatalog_chunkThreshold = 1024 * 512 * 4;
uint64_t dataCatalog_chunkMaxSize = 1024 * 1024 * 2;
uint64_t dataCatalog_chunkThreshold = 1024 * 1024 * 2;
std::map<std::string, table_t*> tables;

static DataCatalog& getInstance();
Expand Down Expand Up @@ -228,6 +236,6 @@ class DataCatalog {
void generateBenchmarkData(const uint64_t distinctLocalColumns, const uint64_t remoteColumnsForLocal, const uint64_t localColumnElements, const uint64_t percentageOfRemote, const uint64_t localNumaNode = 0, const uint64_t remoteNumaNode = 0, bool sendToRemote = false, bool createTables = false);

// Communication stubs
void fetchColStub(std::size_t conId, std::string& ident, bool whole_column = true) const;
void fetchPseudoPax(std::size_t conId, std::vector<std::string> idents) const;
void fetchColStub(const std::size_t conId, const std::string& ident, bool whole_column = false, bool asStream = false) const;
void fetchPseudoPax(const std::size_t conId, const std::vector<std::string>& idents, const bool asStream = false) const;
};
6 changes: 3 additions & 3 deletions include/Operators.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include <Column.h>
#include <Column.hpp>

#include <vector>

Expand All @@ -11,7 +11,7 @@ class Operators {
std::vector<size_t> out_vec;
out_vec.reserve(blockSize);
if (isFirst) {
for (auto e = 0; e < blockSize; ++e) {
for (size_t e = 0; e < blockSize; ++e) {
if (data[e] < predicate) {
out_vec.push_back(e);
}
Expand Down Expand Up @@ -116,7 +116,7 @@ class Operators {
std::vector<size_t> out_vec;
out_vec.reserve(blockSize);
if (isFirst) {
for (auto e = 0; e < blockSize; ++e) {
for (size_t e = 0; e < blockSize; ++e) {
if (predicate_1 <= data[e] && data[e] <= predicate_2) {
out_vec.push_back(e);
}
Expand Down
20 changes: 0 additions & 20 deletions include/Queries.h

This file was deleted.

Loading