Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions lightsaber_bench/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ RUN apt update && \

RUN cd && \
apt remove --purge --auto-remove cmake && \
version=3.16 && \
version=3.17 && \
build=2 && \
mkdir ~/temp && \
cd ~/temp && \
Expand All @@ -66,7 +66,7 @@ RUN cd /usr/src/gtest && \
ln -s /usr/lib/libgtest_main.a /usr/local/lib/gtest/libgtest_main.a

RUN cd && \
git clone --depth 1 https://github.com/wzhao18/benchmark.git && \
git clone --depth 1 https://github.com/google/benchmark.git && \
cd benchmark && \
mkdir build && \
cd build && \
Expand Down Expand Up @@ -98,6 +98,8 @@ ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIBRARY_PATH
ENV PATH=/usr/lib/ccache:$PATH

RUN cd && \
apt update && \
apt upgrade -y && \
apt install -y \
git \
gcc \
Expand Down Expand Up @@ -137,6 +139,8 @@ RUN cd && \
make install

RUN cd && \
apt update && \
apt upgrade -y && \
apt install -y \
autoconf \
automake \
Expand Down Expand Up @@ -180,4 +184,4 @@ RUN cd /root/lightsaber_bench && \
mkdir -p build && \
cd build && \
cmake .. -DCMAKE_BUILD_TYPE=Release && \
make -j$(nproc) 2>/dev/null
make -j$(nproc) 2>/dev/null
3 changes: 0 additions & 3 deletions lightsaber_bench/include/ls_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ class AggregateBench : public Benchmark
long window_size;
void createApplication() override
{
SystemConf::getInstance().SLOTS = 256;
SystemConf::getInstance().PARTIAL_WINDOWS = 64;

// Configure non-grouped aggregation. Check the application benchmarks for grouped aggreagations.
std::vector<AggregationType> aggregationTypes(1);
aggregationTypes[0] = AggregationTypes::fromString("sum");
Expand Down
34 changes: 23 additions & 11 deletions lightsaber_bench/include/ls_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class Benchmark
};
int m_sock = 0;
int m_server_fd;
std::vector<char> *InputBuffer = nullptr;
std::vector<std::vector<char>*> InputBuffers;
QueryApplication *application = nullptr;
virtual void createApplication() = 0;

Expand All @@ -45,27 +45,39 @@ class Benchmark
{
double range = 100;

InputBuffer = new std::vector<char> (size * sizeof(InputSchema));
auto ptr = (InputSchema *) InputBuffer->data();
for (unsigned long idx = 0; idx < size; idx++) {
ptr[idx].st = idx * period;
ptr[idx].dur = period;
ptr[idx].payload = static_cast<float>(rand() / static_cast<double>(RAND_MAX / range)) - (range / 2);
// Make 1000 copies of buffer. Each run will use a different one.
for (size_t i = 0; i < 1000; i++) {
auto buffer = new std::vector<char> (size * sizeof(InputSchema));
auto ptr = (InputSchema *) buffer->data();
for (unsigned long idx = 0; idx < size; idx++) {
ptr[idx].st = idx * period;
ptr[idx].dur = period;
ptr[idx].payload = static_cast<float>(rand() / static_cast<double>(RAND_MAX / range)) - (range / 2);
}
InputBuffers.push_back(buffer);
}
}

int64_t runBenchmark(int64_t size, int64_t period)
int64_t runBenchmark(int64_t size, int64_t period, int64_t runs)
{
PopulateBufferWithData(size, period);

auto start_time = std::chrono::high_resolution_clock::now();
application->processData(*InputBuffer, -1);

for (size_t i = 0; i < runs; i++) {
application->processData(*(InputBuffers[i % 1000]));
}

// Signal that no more tasks will be enqueued
application->closeTaskQueue();
// Wait for worker threads to finish executions
application->waitForCompletion();

auto end_time = std::chrono::high_resolution_clock::now();

int64_t time = duration_cast<std::chrono::microseconds>(end_time - start_time).count();

return time;
}

};

#endif // LIGHTSABER_BENCH_INCLUDE_BENCHMARK_H_
39 changes: 27 additions & 12 deletions lightsaber_bench/include/ls_yahoo.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,36 @@ class YahooBench : public Benchmark
long window_size;
TupleSchema *getSchema () override
{
auto schema = new TupleSchema(5, "YahooBenchmark");
/* For some reason, it only works when I add padding to the schema to make it 64 bytes */
auto schema = new TupleSchema(8, "YahooBenchmark");
auto longAttr = AttributeType(BasicType::Long);

schema->setAttributeType(0, longAttr); /* st: long */
schema->setAttributeType(1, longAttr); /* dur: long */
schema->setAttributeType(2, longAttr); /* user_id: long */
schema->setAttributeType(3, longAttr); /* campaign_id: long */
schema->setAttributeType(4, longAttr); /* event_type: long */
schema->setAttributeType(5, longAttr); /* dummy: long */
schema->setAttributeType(6, longAttr); /* dummy: long */
schema->setAttributeType(7, longAttr); /* dummy: long */

return schema;
}

void PopulateBufferWithData(int64_t size, int64_t period) override
{
InputBuffer = new std::vector<char> (size * sizeof(YahooSchema));
auto ptr = (YahooSchema *) InputBuffer->data();
for (unsigned long idx = 0; idx < size; idx++) {
ptr[idx].st = idx * period;
ptr[idx].dur = period;
ptr[idx].user_id = static_cast<long>(rand() % 5 + 1);
ptr[idx].campaign_id = static_cast<long>(rand() % 5 + 1);
ptr[idx].event_type = static_cast<long>(rand() % 5 + 1);
// 10 input buffers for now
for (size_t i = 0; i < 1000; i++) {
auto buffer = new std::vector<char> (size * sizeof(YahooSchema));
auto ptr = (YahooSchema *) buffer->data();
for (unsigned long idx = 0; idx < size; idx++) {
ptr[idx].st = i * 1000 + idx * period;
ptr[idx].dur = period;
ptr[idx].user_id = static_cast<long>(rand() % 5 + 1);
ptr[idx].campaign_id = static_cast<long>(2);
ptr[idx].event_type = static_cast<long>(rand() % 5 + 1);
}
InputBuffers.push_back(buffer);
}
}

Expand All @@ -60,11 +68,15 @@ class YahooBench : public Benchmark
std::vector<ColumnReference *> aggregationAttributes(1);
aggregationAttributes[0] = new ColumnReference(2, BasicType::Long);

std::vector<Expression *> groupByAttributes(0);
// Must do a Groupby here, otherwise hit unhandled bug
std::vector<Expression *> groupByAttributes(1);
groupByAttributes[0] = new ColumnReference(3, BasicType::Long);

auto window = new WindowDefinition(ROW_BASED, window_size, window_size);
auto window = new WindowDefinition(RANGE_BASED, window_size, window_size);
Aggregation *aggregation = new Aggregation(*window, aggregationTypes, aggregationAttributes, groupByAttributes);

bool replayTimestamps = window->isRangeBased();

// Set up code-generated operator
OperatorKernel *genCode = new OperatorKernel(true);
genCode->setInputSchema(getSchema());
Expand All @@ -82,7 +94,7 @@ class YahooBench : public Benchmark
long timestampReference = std::chrono::system_clock::now().time_since_epoch().count();

std::vector<std::shared_ptr<Query>> queries(1);
queries[0] = std::make_shared<Query>(0, operators, *window, getSchema(), timestampReference, true, false, true);
queries[0] = std::make_shared<Query>(0, operators, *window, getSchema(), timestampReference, true, replayTimestamps, !replayTimestamps);

application = new QueryApplication(queries);
application->setup();
Expand All @@ -95,6 +107,9 @@ class YahooBench : public Benchmark
long user_id;
long campaign_id;
long event_type;
long dummy1;
long dummy2;
long dummy3;
};

YahooBench(long window_size)
Expand Down
28 changes: 18 additions & 10 deletions lightsaber_bench/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,42 @@

int main(int argc, const char **argv) {
std::string testcase = (argc > 1) ? argv[1] : "select";
int64_t size = (argc > 2) ? atoi(argv[2]) : 10000000;
int64_t runs = (argc > 3) ? atoi(argv[3]) : 1;
// In # of events. This will multiply by 1000. So don't set it too much or memory will exhaust.
int64_t size = (argc > 2) ? atoi(argv[2]) : 100000;
int64_t runs = (argc > 3) ? atoi(argv[3]) : 10000;
int64_t cores = (argc > 4) ? atoi(argv[4]) : 1;
int64_t period = 1;
SystemConf::getInstance().BATCH_SIZE = 200000; // This means the input_size (size * sizeof(InputSchema)) must be multiple of 200,000
SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = size * sizeof(Benchmark::InputSchema);

SystemConf::getInstance().BATCH_SIZE = 131072; // In Bytes
SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = 8 * size * sizeof(Benchmark::InputSchema); // In Bytes, the coefficient is worth tuning
SystemConf::getInstance().WORKER_THREADS = cores;

double time = 0;
std::unique_ptr<Benchmark> benchmarkQuery {};
if (testcase == "select") {
benchmarkQuery = std::make_unique<SelectBench>(60);
} else if (testcase == "where") {
benchmarkQuery = std::make_unique<WhereBench>(60);
} else if (testcase == "aggregate") {
SystemConf::getInstance().SLOTS = 256;
SystemConf::getInstance().PARTIAL_WINDOWS = 64;
benchmarkQuery = std::make_unique<AggregateBench>(1000);
} else if (testcase == "alterdur") {
benchmarkQuery = std::make_unique<AlterDurBench>(1, 60);
} else if (testcase == "yahoo") {
SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = size * sizeof(YahooBench::YahooSchema);
SystemConf::getInstance().BATCH_SIZE = 1048576;
SystemConf::getInstance().SLOTS = 512;
SystemConf::getInstance().PARTIAL_WINDOWS = 512;
SystemConf::getInstance().HASH_TABLE_SIZE = 1024;
SystemConf::getInstance().BUNDLE_SIZE = size * sizeof(YahooBench::YahooSchema); // not used, simply to bypass assertions.
SystemConf::getInstance().CIRCULAR_BUFFER_SIZE = 8 * size * sizeof(YahooBench::YahooSchema);
benchmarkQuery = std::make_unique<YahooBench>(1000);
} else {
throw std::runtime_error("Invalid testcase");
}

for (int64_t i = 0; i < runs; i++) {
time += benchmarkQuery->runBenchmark(size, period);
}
double time = benchmarkQuery->runBenchmark(size, period, runs);

std::cout << "Testcase: " << testcase <<", Size: " << size * runs
std::cout << "Testcase: " << testcase << ", Size: " << size * runs
<< ", Time: " << std::setprecision(3) << time / 1000000 << std::endl;

return 0;
Expand Down
77 changes: 42 additions & 35 deletions lightsaber_bench/remote/remoteSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,52 +27,39 @@ struct alignas(16) AggregateOutputSchema {

struct alignas(16) YahooOutputSchema {
long timestamp;
long campaign_id;
int count;

void print_data() {
std::cout << "[" << timestamp << "]: " << count << std::endl;
std::cout << "[" << timestamp << "]: " << campaign_id << " " << count << std::endl;
}
};

template<typename T>
class RemoteSink {
protected:
private:
int64_t batch_size;
int64_t buffer_size;
int m_sock = 0;
int m_server_fd = 0;
std::vector<char> *buffer = nullptr;
std::vector<char> buffer;

public:
int run(int64_t output_size) {
InitializeBuffer(output_size * sizeof(T));

setupSocket();
readBytes(m_sock, output_size * sizeof(T), (void *)buffer->data());

print_buffer(buffer, output_size);
return 0;
}

private:
void InitializeBuffer(int64_t size) {
buffer = new std::vector<char>(size);
}

static void readBytes(int socket, unsigned int length, void *buffer) {
size_t bytesRead = 0;
while (bytesRead < length) {
auto valread = read(socket, (char *)buffer + bytesRead, length - bytesRead);
bytesRead += valread;
void read_one_batch() {
int64_t bytes_read = 0;
while (bytes_read < buffer_size) {
auto valread = read(m_sock, (char *)(buffer.data()) + bytes_read, buffer_size - bytes_read);
bytes_read += valread;
}
}

void print_buffer(std::vector<char> *buf, int64_t len) {
auto arr = (T *) buf->data();
for (unsigned long idx = 0; idx < len; idx++) {
void print_buffer() {
auto arr = (T *) buffer.data();
for (unsigned long idx = 0; idx < batch_size; idx++) {
arr[idx].print_data();
}
}

void setupSocket() {
void setup_socket() {
struct sockaddr_in address {};
int opt = 1;
int addrlen = sizeof(address);
Expand All @@ -99,20 +86,40 @@ class RemoteSink {
throw std::runtime_error("error: accept");
}
}

public:
RemoteSink(int64_t batch_size) :
batch_size(batch_size),
buffer_size(batch_size * sizeof(T)),
buffer(buffer_size)
{}

void run()
{
setup_socket();

while (true) {
read_one_batch();
std::cout << "Successfully read " << batch_size << " tuples." << std::endl;
// print_buffer();
}
}
};

int main(int argc, const char **argv) {
std::string testcase = (argc > 1) ? argv[1] : "select";
int64_t output_size = (argc > 2) ? atoi(argv[2]) : 10000000;
int64_t batch_size = (argc > 2) ? atoi(argv[2]) : 10000000;

if (testcase == "aggregate") {
auto remoteSink = std::make_unique<RemoteSink<AggregateOutputSchema>>();
remoteSink->run(output_size);
auto remoteSink = std::make_unique<RemoteSink<AggregateOutputSchema>>(batch_size);
remoteSink->run();
} else if (testcase == "yahoo") {
auto remoteSink = std::make_unique<RemoteSink<YahooOutputSchema>>();
remoteSink->run(output_size);
auto remoteSink = std::make_unique<RemoteSink<YahooOutputSchema>>(batch_size);
remoteSink->run();
} else {
auto remoteSink = std::make_unique<RemoteSink<OutputSchema>>();
remoteSink->run(output_size);
auto remoteSink = std::make_unique<RemoteSink<OutputSchema>>(batch_size);
remoteSink->run();
}

return 0;
}
14 changes: 14 additions & 0 deletions lightsaber_bench/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

#!/bin/bash

sb_bench_bin="./build/lightsaber"
log_file="./result.txt"

for bench in "select" "where" "aggregate" "yahoo"
do
for cores in 1 3 7 15 31
do
echo ${sb_bench_bin} ${bench} 100000 10000 ${cores} ${size}
${sb_bench_bin} ${bench} 100000 10000 ${cores} ${size}
done
done