diff --git a/lightsaber_bench/Dockerfile b/lightsaber_bench/Dockerfile index b9894f3..f2552e6 100644 --- a/lightsaber_bench/Dockerfile +++ b/lightsaber_bench/Dockerfile @@ -46,7 +46,7 @@ RUN apt update && \ RUN cd && \ apt remove --purge --auto-remove cmake && \ - version=3.16 && \ + version=3.17 && \ build=2 && \ mkdir ~/temp && \ cd ~/temp && \ @@ -66,7 +66,7 @@ RUN cd /usr/src/gtest && \ ln -s /usr/lib/libgtest_main.a /usr/local/lib/gtest/libgtest_main.a RUN cd && \ - git clone --depth 1 https://github.com/wzhao18/benchmark.git && \ + git clone --depth 1 https://github.com/google/benchmark.git && \ cd benchmark && \ mkdir build && \ cd build && \ @@ -98,6 +98,8 @@ ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIBRARY_PATH ENV PATH=/usr/lib/ccache:$PATH RUN cd && \ + apt update && \ + apt upgrade -y && \ apt install -y \ git \ gcc \ @@ -137,6 +139,8 @@ RUN cd && \ make install RUN cd && \ + apt update && \ + apt upgrade -y && \ apt install -y \ autoconf \ automake \ @@ -180,4 +184,4 @@ RUN cd /root/lightsaber_bench && \ mkdir -p build && \ cd build && \ cmake .. -DCMAKE_BUILD_TYPE=Release && \ - make -j$(nproc) 2>/dev/null \ No newline at end of file + make -j$(nproc) 2>/dev/null diff --git a/lightsaber_bench/LightSaber b/lightsaber_bench/LightSaber index 0df44b1..4d71056 160000 --- a/lightsaber_bench/LightSaber +++ b/lightsaber_bench/LightSaber @@ -1 +1 @@ -Subproject commit 0df44b16266d4c2c2465017bc2c51ce7c393e671 +Subproject commit 4d710565a9a79a3bf582685540258f641f12de76 diff --git a/lightsaber_bench/include/ls_aggregate.h b/lightsaber_bench/include/ls_aggregate.h index bbdb98d..450466d 100644 --- a/lightsaber_bench/include/ls_aggregate.h +++ b/lightsaber_bench/include/ls_aggregate.h @@ -12,9 +12,6 @@ class AggregateBench : public Benchmark long window_size; void createApplication() override { - SystemConf::getInstance().SLOTS = 256; - SystemConf::getInstance().PARTIAL_WINDOWS = 64; - // Configure non-grouped aggregation. Check the application benchmarks for grouped aggreagations. std::vector aggregationTypes(1); aggregationTypes[0] = AggregationTypes::fromString("sum"); diff --git a/lightsaber_bench/include/ls_base.h b/lightsaber_bench/include/ls_base.h index 7c36ce3..b082af6 100644 --- a/lightsaber_bench/include/ls_base.h +++ b/lightsaber_bench/include/ls_base.h @@ -24,7 +24,7 @@ class Benchmark }; int m_sock = 0; int m_server_fd; - std::vector *InputBuffer = nullptr; + std::vector*> InputBuffers; QueryApplication *application = nullptr; virtual void createApplication() = 0; @@ -45,27 +45,39 @@ class Benchmark { double range = 100; - InputBuffer = new std::vector (size * sizeof(InputSchema)); - auto ptr = (InputSchema *) InputBuffer->data(); - for (unsigned long idx = 0; idx < size; idx++) { - ptr[idx].st = idx * period; - ptr[idx].dur = period; - ptr[idx].payload = static_cast(rand() / static_cast(RAND_MAX / range)) - (range / 2); + // Make 1000 copies of buffer. Each run will use a different one. + for (size_t i = 0; i < 1000; i++) { + auto buffer = new std::vector (size * sizeof(InputSchema)); + auto ptr = (InputSchema *) buffer->data(); + for (unsigned long idx = 0; idx < size; idx++) { + ptr[idx].st = idx * period; + ptr[idx].dur = period; + ptr[idx].payload = static_cast(rand() / static_cast(RAND_MAX / range)) - (range / 2); + } + InputBuffers.push_back(buffer); } } - int64_t runBenchmark(int64_t size, int64_t period) + int64_t runBenchmark(int64_t size, int64_t period, int64_t runs) { PopulateBufferWithData(size, period); auto start_time = std::chrono::high_resolution_clock::now(); - application->processData(*InputBuffer, -1); + + for (size_t i = 0; i < runs; i++) { + application->processData(*(InputBuffers[i % 1000])); + } + + // Signal that no more tasks will be enqueued + application->closeTaskQueue(); + // Wait for worker threads to finish executions + application->waitForCompletion(); + auto end_time = std::chrono::high_resolution_clock::now(); - int64_t time = duration_cast(end_time - start_time).count(); + return time; } - }; #endif // LIGHTSABER_BENCH_INCLUDE_BENCHMARK_H_ \ No newline at end of file diff --git a/lightsaber_bench/include/ls_yahoo.h b/lightsaber_bench/include/ls_yahoo.h index 69b767c..3b00da4 100644 --- a/lightsaber_bench/include/ls_yahoo.h +++ b/lightsaber_bench/include/ls_yahoo.h @@ -14,7 +14,8 @@ class YahooBench : public Benchmark long window_size; TupleSchema *getSchema () override { - auto schema = new TupleSchema(5, "YahooBenchmark"); + /* For some reason, it only works when I add padding to the schema to make it 64 bytes */ + auto schema = new TupleSchema(8, "YahooBenchmark"); auto longAttr = AttributeType(BasicType::Long); schema->setAttributeType(0, longAttr); /* st: long */ @@ -22,20 +23,27 @@ class YahooBench : public Benchmark schema->setAttributeType(2, longAttr); /* user_id: long */ schema->setAttributeType(3, longAttr); /* campaign_id: long */ schema->setAttributeType(4, longAttr); /* event_type: long */ + schema->setAttributeType(5, longAttr); /* dummy: long */ + schema->setAttributeType(6, longAttr); /* dummy: long */ + schema->setAttributeType(7, longAttr); /* dummy: long */ return schema; } void PopulateBufferWithData(int64_t size, int64_t period) override { - InputBuffer = new std::vector (size * sizeof(YahooSchema)); - auto ptr = (YahooSchema *) InputBuffer->data(); - for (unsigned long idx = 0; idx < size; idx++) { - ptr[idx].st = idx * period; - ptr[idx].dur = period; - ptr[idx].user_id = static_cast(rand() % 5 + 1); - ptr[idx].campaign_id = static_cast(rand() % 5 + 1); - ptr[idx].event_type = static_cast(rand() % 5 + 1); + // 10 input buffers for now + for (size_t i = 0; i < 1000; i++) { + auto buffer = new std::vector (size * sizeof(YahooSchema)); + auto ptr = (YahooSchema *) buffer->data(); + for (unsigned long idx = 0; idx < size; idx++) { + ptr[idx].st = i * 1000 + idx * period; + ptr[idx].dur = period; + ptr[idx].user_id = static_cast(rand() % 5 + 1); + ptr[idx].campaign_id = static_cast(2); + ptr[idx].event_type = static_cast(rand() % 5 + 1); + } + InputBuffers.push_back(buffer); } } @@ -60,11 +68,15 @@ class YahooBench : public Benchmark std::vector aggregationAttributes(1); aggregationAttributes[0] = new ColumnReference(2, BasicType::Long); - std::vector groupByAttributes(0); + // Must do a Groupby here, otherwise hit unhandled bug + std::vector groupByAttributes(1); + groupByAttributes[0] = new ColumnReference(3, BasicType::Long); - auto window = new WindowDefinition(ROW_BASED, window_size, window_size); + auto window = new WindowDefinition(RANGE_BASED, window_size, window_size); Aggregation *aggregation = new Aggregation(*window, aggregationTypes, aggregationAttributes, groupByAttributes); + bool replayTimestamps = window->isRangeBased(); + // Set up code-generated operator OperatorKernel *genCode = new OperatorKernel(true); genCode->setInputSchema(getSchema()); @@ -82,7 +94,7 @@ class YahooBench : public Benchmark long timestampReference = std::chrono::system_clock::now().time_since_epoch().count(); std::vector> queries(1); - queries[0] = std::make_shared(0, operators, *window, getSchema(), timestampReference, true, false, true); + queries[0] = std::make_shared(0, operators, *window, getSchema(), timestampReference, true, replayTimestamps, !replayTimestamps); application = new QueryApplication(queries); application->setup(); @@ -95,6 +107,9 @@ class YahooBench : public Benchmark long user_id; long campaign_id; long event_type; + long dummy1; + long dummy2; + long dummy3; }; YahooBench(long window_size) diff --git a/lightsaber_bench/main.cpp b/lightsaber_bench/main.cpp index 37e9d2d..11ca350 100644 --- a/lightsaber_bench/main.cpp +++ b/lightsaber_bench/main.cpp @@ -6,34 +6,42 @@ int main(int argc, const char **argv) { std::string testcase = (argc > 1) ? argv[1] : "select"; - int64_t size = (argc > 2) ? atoi(argv[2]) : 10000000; - int64_t runs = (argc > 3) ? atoi(argv[3]) : 1; + // In # of events. This will multiply by 1000. So don't set it too much or memory will exhaust. + int64_t size = (argc > 2) ? atoi(argv[2]) : 100000; + int64_t runs = (argc > 3) ? atoi(argv[3]) : 10000; + int64_t cores = (argc > 4) ? atoi(argv[4]) : 1; int64_t period = 1; - SystemConf::getInstance().BATCH_SIZE = 200000; // This means the input_size (size * sizeof(InputSchema)) must be multiple of 200,000 - SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = size * sizeof(Benchmark::InputSchema); + + SystemConf::getInstance().BATCH_SIZE = 131072; // In Bytes + SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = 8 * size * sizeof(Benchmark::InputSchema); // In Bytes, the coefficient is worth tuning + SystemConf::getInstance().WORKER_THREADS = cores; - double time = 0; std::unique_ptr benchmarkQuery {}; if (testcase == "select") { benchmarkQuery = std::make_unique(60); } else if (testcase == "where") { benchmarkQuery = std::make_unique(60); } else if (testcase == "aggregate") { + SystemConf::getInstance().SLOTS = 256; + SystemConf::getInstance().PARTIAL_WINDOWS = 64; benchmarkQuery = std::make_unique(1000); } else if (testcase == "alterdur") { benchmarkQuery = std::make_unique(1, 60); } else if (testcase == "yahoo") { - SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = size * sizeof(YahooBench::YahooSchema); + SystemConf::getInstance().BATCH_SIZE = 1048576; + SystemConf::getInstance().SLOTS = 512; + SystemConf::getInstance().PARTIAL_WINDOWS = 512; + SystemConf::getInstance().HASH_TABLE_SIZE = 1024; + SystemConf::getInstance().BUNDLE_SIZE = size * sizeof(YahooBench::YahooSchema); // not used, simply to bypass assertions. + SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = 8 * size * sizeof(YahooBench::YahooSchema); benchmarkQuery = std::make_unique(1000); } else { throw std::runtime_error("Invalid testcase"); } - for (int64_t i = 0; i < runs; i++) { - time += benchmarkQuery->runBenchmark(size, period); - } + double time = benchmarkQuery->runBenchmark(size, period, runs); - std::cout << "Testcase: " << testcase <<", Size: " << size * runs + std::cout << "Testcase: " << testcase << ", Size: " << size * runs << ", Time: " << std::setprecision(3) << time / 1000000 << std::endl; return 0; diff --git a/lightsaber_bench/remote/remoteSink.cpp b/lightsaber_bench/remote/remoteSink.cpp index 53c3604..42d5199 100644 --- a/lightsaber_bench/remote/remoteSink.cpp +++ b/lightsaber_bench/remote/remoteSink.cpp @@ -27,52 +27,39 @@ struct alignas(16) AggregateOutputSchema { struct alignas(16) YahooOutputSchema { long timestamp; + long campaign_id; int count; void print_data() { - std::cout << "[" << timestamp << "]: " << count << std::endl; + std::cout << "[" << timestamp << "]: " << campaign_id << " " << count << std::endl; } }; template class RemoteSink { - protected: +private: + int64_t batch_size; + int64_t buffer_size; int m_sock = 0; int m_server_fd = 0; - std::vector *buffer = nullptr; + std::vector buffer; - public: - int run(int64_t output_size) { - InitializeBuffer(output_size * sizeof(T)); - - setupSocket(); - readBytes(m_sock, output_size * sizeof(T), (void *)buffer->data()); - - print_buffer(buffer, output_size); - return 0; - } - - private: - void InitializeBuffer(int64_t size) { - buffer = new std::vector(size); - } - - static void readBytes(int socket, unsigned int length, void *buffer) { - size_t bytesRead = 0; - while (bytesRead < length) { - auto valread = read(socket, (char *)buffer + bytesRead, length - bytesRead); - bytesRead += valread; + void read_one_batch() { + int64_t bytes_read = 0; + while (bytes_read < buffer_size) { + auto valread = read(m_sock, (char *)(buffer.data()) + bytes_read, buffer_size - bytes_read); + bytes_read += valread; } } - void print_buffer(std::vector *buf, int64_t len) { - auto arr = (T *) buf->data(); - for (unsigned long idx = 0; idx < len; idx++) { + void print_buffer() { + auto arr = (T *) buffer.data(); + for (unsigned long idx = 0; idx < batch_size; idx++) { arr[idx].print_data(); } } - void setupSocket() { + void setup_socket() { struct sockaddr_in address {}; int opt = 1; int addrlen = sizeof(address); @@ -99,20 +86,40 @@ class RemoteSink { throw std::runtime_error("error: accept"); } } + +public: + RemoteSink(int64_t batch_size) : + batch_size(batch_size), + buffer_size(batch_size * sizeof(T)), + buffer(buffer_size) + {} + + void run() + { + setup_socket(); + + while (true) { + read_one_batch(); + std::cout << "Successfully read " << batch_size << " tuples." << std::endl; + // print_buffer(); + } + } }; int main(int argc, const char **argv) { std::string testcase = (argc > 1) ? argv[1] : "select"; - int64_t output_size = (argc > 2) ? atoi(argv[2]) : 10000000; + int64_t batch_size = (argc > 2) ? atoi(argv[2]) : 10000000; + if (testcase == "aggregate") { - auto remoteSink = std::make_unique>(); - remoteSink->run(output_size); + auto remoteSink = std::make_unique>(batch_size); + remoteSink->run(); } else if (testcase == "yahoo") { - auto remoteSink = std::make_unique>(); - remoteSink->run(output_size); + auto remoteSink = std::make_unique>(batch_size); + remoteSink->run(); } else { - auto remoteSink = std::make_unique>(); - remoteSink->run(output_size); + auto remoteSink = std::make_unique>(batch_size); + remoteSink->run(); } + return 0; } \ No newline at end of file diff --git a/lightsaber_bench/run.sh b/lightsaber_bench/run.sh new file mode 100644 index 0000000..4938271 --- /dev/null +++ b/lightsaber_bench/run.sh @@ -0,0 +1,14 @@ + +#!/bin/bash + +sb_bench_bin="./build/lightsaber" +log_file="./result.txt" + +for bench in "select" "where" "aggregate" "yahoo" +do + for cores in 1 3 7 15 31 + do + echo ${sb_bench_bin} ${bench} 100000 10000 ${cores} ${size} + ${sb_bench_bin} ${bench} 100000 10000 ${cores} ${size} + done +done \ No newline at end of file