From de6059b524b5f38c841081493df3a1ce76199443 Mon Sep 17 00:00:00 2001 From: WeiZhao <740286700@qq.com> Date: Sun, 5 Jun 2022 22:26:07 -0400 Subject: [PATCH 01/11] Fix measurements for lightsaber bench --- lightsaber_bench/LightSaber | 2 +- lightsaber_bench/include/ls_base.h | 11 +--- lightsaber_bench/main.cpp | 13 ++--- lightsaber_bench/remote/remoteSink.cpp | 73 ++++++++++++++------------ 4 files changed, 47 insertions(+), 52 deletions(-) diff --git a/lightsaber_bench/LightSaber b/lightsaber_bench/LightSaber index 0df44b1..1dd615e 160000 --- a/lightsaber_bench/LightSaber +++ b/lightsaber_bench/LightSaber @@ -1 +1 @@ -Subproject commit 0df44b16266d4c2c2465017bc2c51ce7c393e671 +Subproject commit 1dd615e8c6046e11d32fbad5521bd27eccb69002 diff --git a/lightsaber_bench/include/ls_base.h b/lightsaber_bench/include/ls_base.h index 7c36ce3..c691865 100644 --- a/lightsaber_bench/include/ls_base.h +++ b/lightsaber_bench/include/ls_base.h @@ -54,16 +54,9 @@ class Benchmark } } - int64_t runBenchmark(int64_t size, int64_t period) + void runBenchmark() { - PopulateBufferWithData(size, period); - - auto start_time = std::chrono::high_resolution_clock::now(); - application->processData(*InputBuffer, -1); - auto end_time = std::chrono::high_resolution_clock::now(); - - int64_t time = duration_cast(end_time - start_time).count(); - return time; + application->processData(*InputBuffer); } }; diff --git a/lightsaber_bench/main.cpp b/lightsaber_bench/main.cpp index 37e9d2d..a76f615 100644 --- a/lightsaber_bench/main.cpp +++ b/lightsaber_bench/main.cpp @@ -6,13 +6,12 @@ int main(int argc, const char **argv) { std::string testcase = (argc > 1) ? argv[1] : "select"; - int64_t size = (argc > 2) ? atoi(argv[2]) : 10000000; - int64_t runs = (argc > 3) ? atoi(argv[3]) : 1; + int64_t size = (argc > 2) ? atoi(argv[2]) : 50000000; + int64_t runs = (argc > 3) ? atoi(argv[3]) : 100; int64_t period = 1; SystemConf::getInstance().BATCH_SIZE = 200000; // This means the input_size (size * sizeof(InputSchema)) must be multiple of 200,000 SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = size * sizeof(Benchmark::InputSchema); - double time = 0; std::unique_ptr benchmarkQuery {}; if (testcase == "select") { benchmarkQuery = std::make_unique(60); @@ -29,12 +28,10 @@ int main(int argc, const char **argv) { throw std::runtime_error("Invalid testcase"); } - for (int64_t i = 0; i < runs; i++) { - time += benchmarkQuery->runBenchmark(size, period); + benchmarkQuery->PopulateBufferWithData(size, period); + for (size_t i = 0; i < runs; i++) { + benchmarkQuery->runBenchmark(); } - std::cout << "Testcase: " << testcase <<", Size: " << size * runs - << ", Time: " << std::setprecision(3) << time / 1000000 << std::endl; - return 0; } \ No newline at end of file diff --git a/lightsaber_bench/remote/remoteSink.cpp b/lightsaber_bench/remote/remoteSink.cpp index 53c3604..fa0721b 100644 --- a/lightsaber_bench/remote/remoteSink.cpp +++ b/lightsaber_bench/remote/remoteSink.cpp @@ -36,43 +36,29 @@ struct alignas(16) YahooOutputSchema { template class RemoteSink { - protected: +private: + int64_t batch_size; + int64_t buffer_size; int m_sock = 0; int m_server_fd = 0; - std::vector *buffer = nullptr; + std::vector buffer; - public: - int run(int64_t output_size) { - InitializeBuffer(output_size * sizeof(T)); - - setupSocket(); - readBytes(m_sock, output_size * sizeof(T), (void *)buffer->data()); - - print_buffer(buffer, output_size); - return 0; - } - - private: - void InitializeBuffer(int64_t size) { - buffer = new std::vector(size); - } - - static void readBytes(int socket, unsigned int length, void *buffer) { - size_t bytesRead = 0; - while (bytesRead < length) { - auto valread = read(socket, (char *)buffer + bytesRead, length - bytesRead); - bytesRead += valread; + void read_one_batch() { + int64_t bytes_read = 0; + while (bytes_read < buffer_size) { + auto valread = read(m_sock, (char *)(buffer.data()) + bytes_read, buffer_size - bytes_read); + bytes_read += valread; } } - void print_buffer(std::vector *buf, int64_t len) { - auto arr = (T *) buf->data(); - for (unsigned long idx = 0; idx < len; idx++) { + void print_buffer() { + auto arr = (T *) buffer.data(); + for (unsigned long idx = 0; idx < batch_size; idx++) { arr[idx].print_data(); } } - void setupSocket() { + void setup_socket() { struct sockaddr_in address {}; int opt = 1; int addrlen = sizeof(address); @@ -99,20 +85,39 @@ class RemoteSink { throw std::runtime_error("error: accept"); } } + +public: + RemoteSink(int64_t batch_size) : + batch_size(batch_size), + buffer_size(batch_size * sizeof(T)), + buffer(buffer_size) + {} + + void run() + { + setup_socket(); + + while (true) { + read_one_batch(); + print_buffer(); + } + } }; int main(int argc, const char **argv) { std::string testcase = (argc > 1) ? argv[1] : "select"; - int64_t output_size = (argc > 2) ? atoi(argv[2]) : 10000000; + int64_t batch_size = (argc > 2) ? atoi(argv[2]) : 100000; + if (testcase == "aggregate") { - auto remoteSink = std::make_unique>(); - remoteSink->run(output_size); + auto remoteSink = std::make_unique>(batch_size); + remoteSink->run(); } else if (testcase == "yahoo") { - auto remoteSink = std::make_unique>(); - remoteSink->run(output_size); + auto remoteSink = std::make_unique>(batch_size); + remoteSink->run(); } else { - auto remoteSink = std::make_unique>(); - remoteSink->run(output_size); + auto remoteSink = std::make_unique>(batch_size); + remoteSink->run(); } + return 0; } \ No newline at end of file From 6c550cd884414219b5e48f6b7dc0e244aa87ead3 Mon Sep 17 00:00:00 2001 From: WeiZhao <740286700@qq.com> Date: Sun, 5 Jun 2022 22:46:36 -0400 Subject: [PATCH 02/11] Change size --- lightsaber_bench/main.cpp | 2 +- lightsaber_bench/remote/remoteSink.cpp | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/lightsaber_bench/main.cpp b/lightsaber_bench/main.cpp index a76f615..cfa232c 100644 --- a/lightsaber_bench/main.cpp +++ b/lightsaber_bench/main.cpp @@ -6,7 +6,7 @@ int main(int argc, const char **argv) { std::string testcase = (argc > 1) ? argv[1] : "select"; - int64_t size = (argc > 2) ? atoi(argv[2]) : 50000000; + int64_t size = (argc > 2) ? atoi(argv[2]) : 40000000; int64_t runs = (argc > 3) ? atoi(argv[3]) : 100; int64_t period = 1; SystemConf::getInstance().BATCH_SIZE = 200000; // This means the input_size (size * sizeof(InputSchema)) must be multiple of 200,000 diff --git a/lightsaber_bench/remote/remoteSink.cpp b/lightsaber_bench/remote/remoteSink.cpp index fa0721b..e67af5f 100644 --- a/lightsaber_bench/remote/remoteSink.cpp +++ b/lightsaber_bench/remote/remoteSink.cpp @@ -99,14 +99,15 @@ class RemoteSink { while (true) { read_one_batch(); - print_buffer(); + std::cout << "Successfully read " << batch_size << " tuples." << std::endl; + // print_buffer(); } } }; int main(int argc, const char **argv) { std::string testcase = (argc > 1) ? argv[1] : "select"; - int64_t batch_size = (argc > 2) ? atoi(argv[2]) : 100000; + int64_t batch_size = (argc > 2) ? atoi(argv[2]) : 10000000; if (testcase == "aggregate") { auto remoteSink = std::make_unique>(batch_size); From 71d7cd0e0e927ec6aa6ae3c7169d98b1fe7d88c2 Mon Sep 17 00:00:00 2001 From: WeiZhao <740286700@qq.com> Date: Mon, 6 Jun 2022 21:23:52 -0400 Subject: [PATCH 03/11] Add flag to indicate task queue has received all tasks / Master Thread waits for threads to finish before return --- lightsaber_bench/LightSaber | 2 +- lightsaber_bench/include/ls_base.h | 21 ++++++++++++++++++--- lightsaber_bench/main.cpp | 11 ++++++----- 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/lightsaber_bench/LightSaber b/lightsaber_bench/LightSaber index 1dd615e..7e33899 160000 --- a/lightsaber_bench/LightSaber +++ b/lightsaber_bench/LightSaber @@ -1 +1 @@ -Subproject commit 1dd615e8c6046e11d32fbad5521bd27eccb69002 +Subproject commit 7e338996a515137d7f900b97fb8f5de32871873e diff --git a/lightsaber_bench/include/ls_base.h b/lightsaber_bench/include/ls_base.h index c691865..6727d25 100644 --- a/lightsaber_bench/include/ls_base.h +++ b/lightsaber_bench/include/ls_base.h @@ -54,11 +54,26 @@ class Benchmark } } - void runBenchmark() + int64_t runBenchmark(int64_t size, int64_t period, int64_t runs) { - application->processData(*InputBuffer); - } + PopulateBufferWithData(size, period); + + auto start_time = std::chrono::high_resolution_clock::now(); + + for (int64_t i = 0; i < runs; i++) { + application->processData(*InputBuffer); + } + + // Signal that no more tasks will be enqueued + application->closeTaskQueue(); + // Wait for worker threads to finish executions + application->waitForCompletion(); + auto end_time = std::chrono::high_resolution_clock::now(); + int64_t time = duration_cast(end_time - start_time).count(); + + return time; + } }; #endif // LIGHTSABER_BENCH_INCLUDE_BENCHMARK_H_ \ No newline at end of file diff --git a/lightsaber_bench/main.cpp b/lightsaber_bench/main.cpp index cfa232c..6f38948 100644 --- a/lightsaber_bench/main.cpp +++ b/lightsaber_bench/main.cpp @@ -7,10 +7,11 @@ int main(int argc, const char **argv) { std::string testcase = (argc > 1) ? argv[1] : "select"; int64_t size = (argc > 2) ? atoi(argv[2]) : 40000000; - int64_t runs = (argc > 3) ? atoi(argv[3]) : 100; + int64_t runs = (argc > 3) ? atoi(argv[3]) : 1; int64_t period = 1; SystemConf::getInstance().BATCH_SIZE = 200000; // This means the input_size (size * sizeof(InputSchema)) must be multiple of 200,000 SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = size * sizeof(Benchmark::InputSchema); + SystemConf::getInstance().WORKER_THREADS = 1; std::unique_ptr benchmarkQuery {}; if (testcase == "select") { @@ -28,10 +29,10 @@ int main(int argc, const char **argv) { throw std::runtime_error("Invalid testcase"); } - benchmarkQuery->PopulateBufferWithData(size, period); - for (size_t i = 0; i < runs; i++) { - benchmarkQuery->runBenchmark(); - } + double time = benchmarkQuery->runBenchmark(size, period, runs); + + std::cout << "Testcase: " << testcase << ", Size: " << size * runs + << ", Time: " << std::setprecision(3) << time / 1000000 << std::endl; return 0; } \ No newline at end of file From f590d77972a872e24b253375ba8ef73e7d1bbe13 Mon Sep 17 00:00:00 2001 From: WeiZhao <740286700@qq.com> Date: Mon, 6 Jun 2022 22:04:32 -0400 Subject: [PATCH 04/11] Add num_cores as command line arguments --- lightsaber_bench/main.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lightsaber_bench/main.cpp b/lightsaber_bench/main.cpp index 6f38948..553f0d5 100644 --- a/lightsaber_bench/main.cpp +++ b/lightsaber_bench/main.cpp @@ -8,10 +8,11 @@ int main(int argc, const char **argv) { std::string testcase = (argc > 1) ? argv[1] : "select"; int64_t size = (argc > 2) ? atoi(argv[2]) : 40000000; int64_t runs = (argc > 3) ? atoi(argv[3]) : 1; + int64_t cores = (argc > 4) ? atoi(argv[4]) : 1; int64_t period = 1; SystemConf::getInstance().BATCH_SIZE = 200000; // This means the input_size (size * sizeof(InputSchema)) must be multiple of 200,000 SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = size * sizeof(Benchmark::InputSchema); - SystemConf::getInstance().WORKER_THREADS = 1; + SystemConf::getInstance().WORKER_THREADS = cores; std::unique_ptr benchmarkQuery {}; if (testcase == "select") { From 7533caf9f5b9264bee059f31e5b87e4f2ca094f2 Mon Sep 17 00:00:00 2001 From: WeiZhao <740286700@qq.com> Date: Mon, 13 Jun 2022 00:36:57 -0400 Subject: [PATCH 05/11] Tune parameters to fix error in multi-threaded aggregate --- lightsaber_bench/LightSaber | 2 +- lightsaber_bench/include/ls_yahoo.h | 3 +++ lightsaber_bench/main.cpp | 8 ++++---- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/lightsaber_bench/LightSaber b/lightsaber_bench/LightSaber index 7e33899..4d71056 160000 --- a/lightsaber_bench/LightSaber +++ b/lightsaber_bench/LightSaber @@ -1 +1 @@ -Subproject commit 7e338996a515137d7f900b97fb8f5de32871873e +Subproject commit 4d710565a9a79a3bf582685540258f641f12de76 diff --git a/lightsaber_bench/include/ls_yahoo.h b/lightsaber_bench/include/ls_yahoo.h index 69b767c..a5ec02f 100644 --- a/lightsaber_bench/include/ls_yahoo.h +++ b/lightsaber_bench/include/ls_yahoo.h @@ -41,6 +41,9 @@ class YahooBench : public Benchmark void createApplication() override { + SystemConf::getInstance().SLOTS = 256; + SystemConf::getInstance().PARTIAL_WINDOWS = 64; + // Filter out event that has event_type == 1 auto predicate = new ComparisonPredicate(EQUAL_OP, new ColumnReference(4, BasicType::Long), new LongConstant(1)); Selection *selection = new Selection(predicate); diff --git a/lightsaber_bench/main.cpp b/lightsaber_bench/main.cpp index 553f0d5..2e90cd2 100644 --- a/lightsaber_bench/main.cpp +++ b/lightsaber_bench/main.cpp @@ -6,12 +6,12 @@ int main(int argc, const char **argv) { std::string testcase = (argc > 1) ? argv[1] : "select"; - int64_t size = (argc > 2) ? atoi(argv[2]) : 40000000; + int64_t size = (argc > 2) ? atoi(argv[2]) : 10000000; // In # of events int64_t runs = (argc > 3) ? atoi(argv[3]) : 1; int64_t cores = (argc > 4) ? atoi(argv[4]) : 1; int64_t period = 1; - SystemConf::getInstance().BATCH_SIZE = 200000; // This means the input_size (size * sizeof(InputSchema)) must be multiple of 200,000 - SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = size * sizeof(Benchmark::InputSchema); + SystemConf::getInstance().BATCH_SIZE = 131072; // In Bytes + SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = 2 * size * sizeof(Benchmark::InputSchema); // In Bytes, the coefficient is worth tuning SystemConf::getInstance().WORKER_THREADS = cores; std::unique_ptr benchmarkQuery {}; @@ -24,7 +24,7 @@ int main(int argc, const char **argv) { } else if (testcase == "alterdur") { benchmarkQuery = std::make_unique(1, 60); } else if (testcase == "yahoo") { - SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = size * sizeof(YahooBench::YahooSchema); + SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = 2 * size * sizeof(YahooBench::YahooSchema); benchmarkQuery = std::make_unique(1000); } else { throw std::runtime_error("Invalid testcase"); From 9784f7cd2c3e1648ef7269ed41c072c8905188cf Mon Sep 17 00:00:00 2001 From: WeiZhao <740286700@qq.com> Date: Sun, 9 Oct 2022 04:40:32 -0400 Subject: [PATCH 06/11] Fix yahoo benchmark --- lightsaber_bench/include/ls_yahoo.h | 9 ++++++++- lightsaber_bench/main.cpp | 9 +++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/lightsaber_bench/include/ls_yahoo.h b/lightsaber_bench/include/ls_yahoo.h index a5ec02f..349a503 100644 --- a/lightsaber_bench/include/ls_yahoo.h +++ b/lightsaber_bench/include/ls_yahoo.h @@ -14,7 +14,8 @@ class YahooBench : public Benchmark long window_size; TupleSchema *getSchema () override { - auto schema = new TupleSchema(5, "YahooBenchmark"); + /* For some reason, it only works when I add padding to the schema to make it 64 bytes */ + auto schema = new TupleSchema(8, "YahooBenchmark"); auto longAttr = AttributeType(BasicType::Long); schema->setAttributeType(0, longAttr); /* st: long */ @@ -22,6 +23,9 @@ class YahooBench : public Benchmark schema->setAttributeType(2, longAttr); /* user_id: long */ schema->setAttributeType(3, longAttr); /* campaign_id: long */ schema->setAttributeType(4, longAttr); /* event_type: long */ + schema->setAttributeType(5, longAttr); /* dummy: long */ + schema->setAttributeType(6, longAttr); /* dummy: long */ + schema->setAttributeType(7, longAttr); /* dummy: long */ return schema; } @@ -98,6 +102,9 @@ class YahooBench : public Benchmark long user_id; long campaign_id; long event_type; + long dummy1; + long dummy2; + long dummy3; }; YahooBench(long window_size) diff --git a/lightsaber_bench/main.cpp b/lightsaber_bench/main.cpp index 2e90cd2..fa91996 100644 --- a/lightsaber_bench/main.cpp +++ b/lightsaber_bench/main.cpp @@ -6,12 +6,12 @@ int main(int argc, const char **argv) { std::string testcase = (argc > 1) ? argv[1] : "select"; - int64_t size = (argc > 2) ? atoi(argv[2]) : 10000000; // In # of events - int64_t runs = (argc > 3) ? atoi(argv[3]) : 1; + int64_t size = (argc > 2) ? atoi(argv[2]) : 1000000; // In # of events + int64_t runs = (argc > 3) ? atoi(argv[3]) : 10000; int64_t cores = (argc > 4) ? atoi(argv[4]) : 1; int64_t period = 1; SystemConf::getInstance().BATCH_SIZE = 131072; // In Bytes - SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = 2 * size * sizeof(Benchmark::InputSchema); // In Bytes, the coefficient is worth tuning + SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = 8 * size * sizeof(Benchmark::InputSchema); // In Bytes, the coefficient is worth tuning SystemConf::getInstance().WORKER_THREADS = cores; std::unique_ptr benchmarkQuery {}; @@ -24,7 +24,8 @@ int main(int argc, const char **argv) { } else if (testcase == "alterdur") { benchmarkQuery = std::make_unique(1, 60); } else if (testcase == "yahoo") { - SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = 2 * size * sizeof(YahooBench::YahooSchema); + SystemConf::getInstance().BATCH_SIZE = 1048576; + SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = 8 * size * sizeof(YahooBench::YahooSchema); benchmarkQuery = std::make_unique(1000); } else { throw std::runtime_error("Invalid testcase"); From 22140e0d8f5aa1b62afab2a5247b602923c2418b Mon Sep 17 00:00:00 2001 From: WeiZhao <740286700@qq.com> Date: Tue, 11 Oct 2022 23:14:35 -0400 Subject: [PATCH 07/11] Fix Dockerfile --- lightsaber_bench/Dockerfile | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lightsaber_bench/Dockerfile b/lightsaber_bench/Dockerfile index b9894f3..87691a1 100644 --- a/lightsaber_bench/Dockerfile +++ b/lightsaber_bench/Dockerfile @@ -46,7 +46,7 @@ RUN apt update && \ RUN cd && \ apt remove --purge --auto-remove cmake && \ - version=3.16 && \ + version=3.17 && \ build=2 && \ mkdir ~/temp && \ cd ~/temp && \ @@ -66,7 +66,7 @@ RUN cd /usr/src/gtest && \ ln -s /usr/lib/libgtest_main.a /usr/local/lib/gtest/libgtest_main.a RUN cd && \ - git clone --depth 1 https://github.com/wzhao18/benchmark.git && \ + git clone --depth 1 https://github.com/google/benchmark.git && \ cd benchmark && \ mkdir build && \ cd build && \ @@ -98,6 +98,7 @@ ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIBRARY_PATH ENV PATH=/usr/lib/ccache:$PATH RUN cd && \ + apt upgrade -y && \ apt install -y \ git \ gcc \ @@ -137,6 +138,7 @@ RUN cd && \ make install RUN cd && \ + apt upgrade -y && \ apt install -y \ autoconf \ automake \ From ca254e772bc97c9f03b93e9756d333c2faddf0b0 Mon Sep 17 00:00:00 2001 From: WeiZhao <740286700@qq.com> Date: Wed, 12 Oct 2022 01:34:07 -0400 Subject: [PATCH 08/11] Fix yahoo benchmark scaling issue --- lightsaber_bench/include/ls_aggregate.h | 3 --- lightsaber_bench/include/ls_base.h | 20 +++++++++------ lightsaber_bench/include/ls_yahoo.h | 33 ++++++++++++++----------- lightsaber_bench/main.cpp | 11 ++++++--- lightsaber_bench/remote/remoteSink.cpp | 3 ++- lightsaber_bench/run.sh | 14 +++++++++++ 6 files changed, 55 insertions(+), 29 deletions(-) create mode 100644 lightsaber_bench/run.sh diff --git a/lightsaber_bench/include/ls_aggregate.h b/lightsaber_bench/include/ls_aggregate.h index bbdb98d..450466d 100644 --- a/lightsaber_bench/include/ls_aggregate.h +++ b/lightsaber_bench/include/ls_aggregate.h @@ -12,9 +12,6 @@ class AggregateBench : public Benchmark long window_size; void createApplication() override { - SystemConf::getInstance().SLOTS = 256; - SystemConf::getInstance().PARTIAL_WINDOWS = 64; - // Configure non-grouped aggregation. Check the application benchmarks for grouped aggreagations. std::vector aggregationTypes(1); aggregationTypes[0] = AggregationTypes::fromString("sum"); diff --git a/lightsaber_bench/include/ls_base.h b/lightsaber_bench/include/ls_base.h index 6727d25..b673470 100644 --- a/lightsaber_bench/include/ls_base.h +++ b/lightsaber_bench/include/ls_base.h @@ -24,7 +24,7 @@ class Benchmark }; int m_sock = 0; int m_server_fd; - std::vector *InputBuffer = nullptr; + std::vector*> InputBuffers; QueryApplication *application = nullptr; virtual void createApplication() = 0; @@ -45,12 +45,16 @@ class Benchmark { double range = 100; - InputBuffer = new std::vector (size * sizeof(InputSchema)); - auto ptr = (InputSchema *) InputBuffer->data(); - for (unsigned long idx = 0; idx < size; idx++) { - ptr[idx].st = idx * period; - ptr[idx].dur = period; - ptr[idx].payload = static_cast(rand() / static_cast(RAND_MAX / range)) - (range / 2); + // 10000 input buffers, each of size 'size' + for (size_t i = 0; i < 10000; i++) { + auto buffer = new std::vector (size * sizeof(InputSchema)); + auto ptr = (InputSchema *) buffer->data(); + for (unsigned long idx = 0; idx < size; idx++) { + ptr[idx].st = idx * period; + ptr[idx].dur = period; + ptr[idx].payload = static_cast(rand() / static_cast(RAND_MAX / range)) - (range / 2); + } + InputBuffers.push_back(buffer); } } @@ -61,7 +65,7 @@ class Benchmark auto start_time = std::chrono::high_resolution_clock::now(); for (int64_t i = 0; i < runs; i++) { - application->processData(*InputBuffer); + application->processData(*(InputBuffers[runs % 10000])); } // Signal that no more tasks will be enqueued diff --git a/lightsaber_bench/include/ls_yahoo.h b/lightsaber_bench/include/ls_yahoo.h index 349a503..536b3a0 100644 --- a/lightsaber_bench/include/ls_yahoo.h +++ b/lightsaber_bench/include/ls_yahoo.h @@ -32,22 +32,23 @@ class YahooBench : public Benchmark void PopulateBufferWithData(int64_t size, int64_t period) override { - InputBuffer = new std::vector (size * sizeof(YahooSchema)); - auto ptr = (YahooSchema *) InputBuffer->data(); - for (unsigned long idx = 0; idx < size; idx++) { - ptr[idx].st = idx * period; - ptr[idx].dur = period; - ptr[idx].user_id = static_cast(rand() % 5 + 1); - ptr[idx].campaign_id = static_cast(rand() % 5 + 1); - ptr[idx].event_type = static_cast(rand() % 5 + 1); + // 10 input buffers for now + for (size_t i = 0; i < 10000; i++) { + auto buffer = new std::vector (size * sizeof(YahooSchema)); + auto ptr = (YahooSchema *) buffer->data(); + for (unsigned long idx = 0; idx < size; idx++) { + ptr[idx].st = idx * period; + ptr[idx].dur = period; + ptr[idx].user_id = static_cast(rand() % 5 + 1); + ptr[idx].campaign_id = static_cast(2); + ptr[idx].event_type = static_cast(rand() % 5 + 1); + } + InputBuffers.push_back(buffer); } } void createApplication() override { - SystemConf::getInstance().SLOTS = 256; - SystemConf::getInstance().PARTIAL_WINDOWS = 64; - // Filter out event that has event_type == 1 auto predicate = new ComparisonPredicate(EQUAL_OP, new ColumnReference(4, BasicType::Long), new LongConstant(1)); Selection *selection = new Selection(predicate); @@ -67,11 +68,15 @@ class YahooBench : public Benchmark std::vector aggregationAttributes(1); aggregationAttributes[0] = new ColumnReference(2, BasicType::Long); - std::vector groupByAttributes(0); + // Must do a Groupby here, otherwise hit unhandled bug + std::vector groupByAttributes(1); + groupByAttributes[0] = new ColumnReference(3, BasicType::Long); - auto window = new WindowDefinition(ROW_BASED, window_size, window_size); + auto window = new WindowDefinition(RANGE_BASED, window_size, window_size); Aggregation *aggregation = new Aggregation(*window, aggregationTypes, aggregationAttributes, groupByAttributes); + bool replayTimestamps = window->isRangeBased(); + // Set up code-generated operator OperatorKernel *genCode = new OperatorKernel(true); genCode->setInputSchema(getSchema()); @@ -89,7 +94,7 @@ class YahooBench : public Benchmark long timestampReference = std::chrono::system_clock::now().time_since_epoch().count(); std::vector> queries(1); - queries[0] = std::make_shared(0, operators, *window, getSchema(), timestampReference, true, false, true); + queries[0] = std::make_shared(0, operators, *window, getSchema(), timestampReference, true, replayTimestamps, !replayTimestamps); application = new QueryApplication(queries); application->setup(); diff --git a/lightsaber_bench/main.cpp b/lightsaber_bench/main.cpp index fa91996..89a01a4 100644 --- a/lightsaber_bench/main.cpp +++ b/lightsaber_bench/main.cpp @@ -6,8 +6,8 @@ int main(int argc, const char **argv) { std::string testcase = (argc > 1) ? argv[1] : "select"; - int64_t size = (argc > 2) ? atoi(argv[2]) : 1000000; // In # of events - int64_t runs = (argc > 3) ? atoi(argv[3]) : 10000; + int64_t size = (argc > 2) ? atoi(argv[2]) : 10000; // In # of events + int64_t runs = (argc > 3) ? atoi(argv[3]) : 1000000; int64_t cores = (argc > 4) ? atoi(argv[4]) : 1; int64_t period = 1; SystemConf::getInstance().BATCH_SIZE = 131072; // In Bytes @@ -20,11 +20,16 @@ int main(int argc, const char **argv) { } else if (testcase == "where") { benchmarkQuery = std::make_unique(60); } else if (testcase == "aggregate") { + SystemConf::getInstance().SLOTS = 256; + SystemConf::getInstance().PARTIAL_WINDOWS = 64; benchmarkQuery = std::make_unique(1000); } else if (testcase == "alterdur") { benchmarkQuery = std::make_unique(1, 60); } else if (testcase == "yahoo") { - SystemConf::getInstance().BATCH_SIZE = 1048576; + SystemConf::getInstance().SLOTS = 128; + SystemConf::getInstance().PARTIAL_WINDOWS = 32; + SystemConf::getInstance().HASH_TABLE_SIZE = 128; + SystemConf::getInstance().BUNDLE_SIZE = size * sizeof(YahooBench::YahooSchema); // not used, simply to bypass assertions. SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = 8 * size * sizeof(YahooBench::YahooSchema); benchmarkQuery = std::make_unique(1000); } else { diff --git a/lightsaber_bench/remote/remoteSink.cpp b/lightsaber_bench/remote/remoteSink.cpp index e67af5f..42d5199 100644 --- a/lightsaber_bench/remote/remoteSink.cpp +++ b/lightsaber_bench/remote/remoteSink.cpp @@ -27,10 +27,11 @@ struct alignas(16) AggregateOutputSchema { struct alignas(16) YahooOutputSchema { long timestamp; + long campaign_id; int count; void print_data() { - std::cout << "[" << timestamp << "]: " << count << std::endl; + std::cout << "[" << timestamp << "]: " << campaign_id << " " << count << std::endl; } }; diff --git a/lightsaber_bench/run.sh b/lightsaber_bench/run.sh new file mode 100644 index 0000000..1ee62c9 --- /dev/null +++ b/lightsaber_bench/run.sh @@ -0,0 +1,14 @@ + +#!/bin/bash + +sb_bench_bin="./build/lightsaber" +log_file="./result.txt" + +for bench in "select" "where" "aggregate" "yahoo" +do + for cores in 1 3 7 15 31 + do + echo ${sb_bench_bin} ${bench} 10000 1000000 ${cores} ${size} + ${sb_bench_bin} ${bench} 10000 1000000 ${cores} ${size} + done +done \ No newline at end of file From 525a179aa8b950ea7348a0b5b55953c646c6e492 Mon Sep 17 00:00:00 2001 From: WeiZhao <740286700@qq.com> Date: Wed, 12 Oct 2022 16:23:20 -0400 Subject: [PATCH 09/11] Fix yahoo --- lightsaber_bench/include/ls_base.h | 8 ++++---- lightsaber_bench/include/ls_yahoo.h | 4 ++-- lightsaber_bench/main.cpp | 13 ++++++++----- lightsaber_bench/run.sh | 15 ++++++++++++++- 4 files changed, 28 insertions(+), 12 deletions(-) diff --git a/lightsaber_bench/include/ls_base.h b/lightsaber_bench/include/ls_base.h index b673470..b082af6 100644 --- a/lightsaber_bench/include/ls_base.h +++ b/lightsaber_bench/include/ls_base.h @@ -45,8 +45,8 @@ class Benchmark { double range = 100; - // 10000 input buffers, each of size 'size' - for (size_t i = 0; i < 10000; i++) { + // Make 1000 copies of buffer. Each run will use a different one. + for (size_t i = 0; i < 1000; i++) { auto buffer = new std::vector (size * sizeof(InputSchema)); auto ptr = (InputSchema *) buffer->data(); for (unsigned long idx = 0; idx < size; idx++) { @@ -64,8 +64,8 @@ class Benchmark auto start_time = std::chrono::high_resolution_clock::now(); - for (int64_t i = 0; i < runs; i++) { - application->processData(*(InputBuffers[runs % 10000])); + for (size_t i = 0; i < runs; i++) { + application->processData(*(InputBuffers[i % 1000])); } // Signal that no more tasks will be enqueued diff --git a/lightsaber_bench/include/ls_yahoo.h b/lightsaber_bench/include/ls_yahoo.h index 536b3a0..3b00da4 100644 --- a/lightsaber_bench/include/ls_yahoo.h +++ b/lightsaber_bench/include/ls_yahoo.h @@ -33,11 +33,11 @@ class YahooBench : public Benchmark void PopulateBufferWithData(int64_t size, int64_t period) override { // 10 input buffers for now - for (size_t i = 0; i < 10000; i++) { + for (size_t i = 0; i < 1000; i++) { auto buffer = new std::vector (size * sizeof(YahooSchema)); auto ptr = (YahooSchema *) buffer->data(); for (unsigned long idx = 0; idx < size; idx++) { - ptr[idx].st = idx * period; + ptr[idx].st = i * 1000 + idx * period; ptr[idx].dur = period; ptr[idx].user_id = static_cast(rand() % 5 + 1); ptr[idx].campaign_id = static_cast(2); diff --git a/lightsaber_bench/main.cpp b/lightsaber_bench/main.cpp index 89a01a4..11ca350 100644 --- a/lightsaber_bench/main.cpp +++ b/lightsaber_bench/main.cpp @@ -6,10 +6,12 @@ int main(int argc, const char **argv) { std::string testcase = (argc > 1) ? argv[1] : "select"; - int64_t size = (argc > 2) ? atoi(argv[2]) : 10000; // In # of events - int64_t runs = (argc > 3) ? atoi(argv[3]) : 1000000; + // In # of events. This will multiply by 1000. So don't set it too much or memory will exhaust. + int64_t size = (argc > 2) ? atoi(argv[2]) : 100000; + int64_t runs = (argc > 3) ? atoi(argv[3]) : 10000; int64_t cores = (argc > 4) ? atoi(argv[4]) : 1; int64_t period = 1; + SystemConf::getInstance().BATCH_SIZE = 131072; // In Bytes SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = 8 * size * sizeof(Benchmark::InputSchema); // In Bytes, the coefficient is worth tuning SystemConf::getInstance().WORKER_THREADS = cores; @@ -26,9 +28,10 @@ int main(int argc, const char **argv) { } else if (testcase == "alterdur") { benchmarkQuery = std::make_unique(1, 60); } else if (testcase == "yahoo") { - SystemConf::getInstance().SLOTS = 128; - SystemConf::getInstance().PARTIAL_WINDOWS = 32; - SystemConf::getInstance().HASH_TABLE_SIZE = 128; + SystemConf::getInstance().BATCH_SIZE = 1048576; + SystemConf::getInstance().SLOTS = 512; + SystemConf::getInstance().PARTIAL_WINDOWS = 512; + SystemConf::getInstance().HASH_TABLE_SIZE = 1024; SystemConf::getInstance().BUNDLE_SIZE = size * sizeof(YahooBench::YahooSchema); // not used, simply to bypass assertions. SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = 8 * size * sizeof(YahooBench::YahooSchema); benchmarkQuery = std::make_unique(1000); diff --git a/lightsaber_bench/run.sh b/lightsaber_bench/run.sh index 1ee62c9..0a02ce0 100644 --- a/lightsaber_bench/run.sh +++ b/lightsaber_bench/run.sh @@ -11,4 +11,17 @@ do echo ${sb_bench_bin} ${bench} 10000 1000000 ${cores} ${size} ${sb_bench_bin} ${bench} 10000 1000000 ${cores} ${size} done -done \ No newline at end of file +done + + +std::string testcase = (argc > 1) ? argv[1] : "select"; +int64_t size = (argc > 2) ? atoi(argv[2]) : 10000; // In # of events +int64_t runs = (argc > 3) ? atoi(argv[3]) : 1000000; +int64_t batch_size = (argc > 4) ? atoi(argv[4]) : 1; +int64_t slots = (argc > 5) ? atoi(argv[5]) : 1; +int64_t partial_windows = (argc > 6) ? atoi(argv[6]) : 1; +int64_t hash_table_size = (argc > 7) ? atoi(argv[7]) : 1; +int64_t cores = (argc > 8) ? atoi(argv[8]) : 1; + + +./lightsaber yahoo 100000 10000 1048576 512 512 1024 15 \ No newline at end of file From 6daf88f4429f8038bd81bd05c5f6d9939adf0325 Mon Sep 17 00:00:00 2001 From: Wei Zhao <51183510+wzhao18@users.noreply.github.com> Date: Wed, 12 Oct 2022 19:57:57 -0400 Subject: [PATCH 10/11] Update Dockerfile --- lightsaber_bench/Dockerfile | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lightsaber_bench/Dockerfile b/lightsaber_bench/Dockerfile index 87691a1..f2552e6 100644 --- a/lightsaber_bench/Dockerfile +++ b/lightsaber_bench/Dockerfile @@ -98,6 +98,7 @@ ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIBRARY_PATH ENV PATH=/usr/lib/ccache:$PATH RUN cd && \ + apt update && \ apt upgrade -y && \ apt install -y \ git \ @@ -138,6 +139,7 @@ RUN cd && \ make install RUN cd && \ + apt update && \ apt upgrade -y && \ apt install -y \ autoconf \ @@ -182,4 +184,4 @@ RUN cd /root/lightsaber_bench && \ mkdir -p build && \ cd build && \ cmake .. -DCMAKE_BUILD_TYPE=Release && \ - make -j$(nproc) 2>/dev/null \ No newline at end of file + make -j$(nproc) 2>/dev/null From 33348c5eb94c7e08cb014d90eba6d264edb54251 Mon Sep 17 00:00:00 2001 From: WeiZhao <740286700@qq.com> Date: Sat, 15 Oct 2022 21:17:13 -0400 Subject: [PATCH 11/11] fix run script --- lightsaber_bench/run.sh | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/lightsaber_bench/run.sh b/lightsaber_bench/run.sh index 0a02ce0..4938271 100644 --- a/lightsaber_bench/run.sh +++ b/lightsaber_bench/run.sh @@ -8,20 +8,7 @@ for bench in "select" "where" "aggregate" "yahoo" do for cores in 1 3 7 15 31 do - echo ${sb_bench_bin} ${bench} 10000 1000000 ${cores} ${size} - ${sb_bench_bin} ${bench} 10000 1000000 ${cores} ${size} + echo ${sb_bench_bin} ${bench} 100000 10000 ${cores} ${size} + ${sb_bench_bin} ${bench} 100000 10000 ${cores} ${size} done -done - - -std::string testcase = (argc > 1) ? argv[1] : "select"; -int64_t size = (argc > 2) ? atoi(argv[2]) : 10000; // In # of events -int64_t runs = (argc > 3) ? atoi(argv[3]) : 1000000; -int64_t batch_size = (argc > 4) ? atoi(argv[4]) : 1; -int64_t slots = (argc > 5) ? atoi(argv[5]) : 1; -int64_t partial_windows = (argc > 6) ? atoi(argv[6]) : 1; -int64_t hash_table_size = (argc > 7) ? atoi(argv[7]) : 1; -int64_t cores = (argc > 8) ? atoi(argv[8]) : 1; - - -./lightsaber yahoo 100000 10000 1048576 512 512 1024 15 \ No newline at end of file +done \ No newline at end of file