From cee760afb573007bcf0924368113df8160405369 Mon Sep 17 00:00:00 2001 From: Seppe Degryse <80254822+Griezn@users.noreply.github.com> Date: Sun, 23 Feb 2025 23:13:15 +0100 Subject: [PATCH 1/6] Removed free tracking --- benchmark/Query1.cpp | 1 - include/memory.h | 4 -- include/mutli_source.h | 26 ----------- src/memory.c | 18 ------- src/multi_source.c | 103 ----------------------------------------- src/query.c | 1 - 6 files changed, 153 deletions(-) delete mode 100644 include/mutli_source.h delete mode 100644 src/multi_source.c diff --git a/benchmark/Query1.cpp b/benchmark/Query1.cpp index a100f44..de4269e 100644 --- a/benchmark/Query1.cpp +++ b/benchmark/Query1.cpp @@ -157,7 +157,6 @@ namespace } state.counters["Allocs"] = (double) get_alloc_count(); - state.counters["Peak"] = (double) get_peak_allocated(); state.counters["Total"] = (double) get_total_allocated(); free_file_source(source1); diff --git a/include/memory.h b/include/memory.h index c31dc29..d7f2524 100644 --- a/include/memory.h +++ b/include/memory.h @@ -9,12 +9,8 @@ void* tracked_malloc(size_t size); -void tracked_free(void* ptr, size_t size); - size_t get_alloc_count(); -size_t get_peak_allocated(); - size_t get_total_allocated(); void reset_memory_counter(); diff --git a/include/mutli_source.h b/include/mutli_source.h deleted file mode 100644 index 6a9bd27..0000000 --- a/include/mutli_source.h +++ /dev/null @@ -1,26 +0,0 @@ -// -// Created by Seppe Degryse on 01/12/2024. -// - -#ifndef MUTLISOURCE_H -#define MUTLISOURCE_H -#include "source.h" - -typedef struct MultiSource { - source_t source; - uint8_t num_sources; - uint8_t capacity; - source_t* *sources; // array of source ptrs -} multi_source_t; - -source_t *create_multi_source(); - -sink_t *create_multi_sink(); - -void multi_source_add(source_t* multi_source, const source_t* source); - -void free_multi_source(source_t *source); - -void free_multi_sink(sink_t *sink); - -#endif //MUTLISOURCE_H diff --git a/src/memory.c b/src/memory.c index 5ad4e11..91ad740 100644 --- a/src/memory.c +++ b/src/memory.c @@ -5,8 +5,6 @@ #include static size_t total_allocated = 0; -static size_t current_allocated = 0; -static size_t peak_allocated = 0; static size_t allocation_count = 0; //static pthread_mutex_t mem_lock = PTHREAD_MUTEX_INITIALIZER; @@ -16,33 +14,17 @@ void* tracked_malloc(size_t size) { void* ptr = malloc(size); if (ptr) { total_allocated += size; - current_allocated += size; allocation_count++; - if (current_allocated > peak_allocated) { - peak_allocated = current_allocated; - } } //pthread_mutex_unlock(&mem_lock); return ptr; } - -void tracked_free(void* ptr, const size_t size) { - if (!ptr) return; - //pthread_mutex_lock(&mem_lock); - free(ptr); - current_allocated -= size; - //pthread_mutex_unlock(&mem_lock); -} - size_t get_alloc_count() {return allocation_count;} -size_t get_peak_allocated() {return peak_allocated;} size_t get_total_allocated() {return total_allocated;} void reset_memory_counter() { total_allocated = 0; - peak_allocated = 0; allocation_count = 0; - current_allocated = 0; } \ No newline at end of file diff --git a/src/multi_source.c b/src/multi_source.c deleted file mode 100644 index 8318161..0000000 --- a/src/multi_source.c +++ /dev/null @@ -1,103 +0,0 @@ -// -// Created by Seppe Degryse on 01/12/2024. -// -#include "mutli_source.h" - -#include -#include - - -data_t *get_next_multi_source(const source_t *source, const uint32_t size, const uint32_t step) -{ - multi_source_t *ms = (multi_source_t*) source; - data_t* datas[ms->num_sources]; - uint32_t sizes[ms->num_sources]; - data_t* out = malloc(sizeof(data_t)); - out->size = 0; - out->width = 1; - - // fetch data, set size - for (int i = 0; i < ms->num_sources; ++i) { - data_t *next = ms->sources[i]->get_next(ms->sources[i], size, step); - - if (next == NULL) - return NULL; - - datas[i] = next; - sizes[i] = 0; - out->size += datas[i]->size; - } - - // allocate the output array - out->data = malloc(sizeof(triple_t) * out->size); - - uint32_t i = 0; - while (i < out->size) { // loop until all elements are processed - for (int j = 0; j < ms->num_sources; ++j) { - if (sizes[j] < datas[j]->size) { // check if an array is fully processed - out->data[i++] = datas[j]->data[sizes[j]++]; - } - } - } - - return out; -} - - -source_t *create_multi_source() -{ - multi_source_t *ms = malloc(sizeof(multi_source_t)); - ms->num_sources = 0; - ms->capacity = 2; - ms->sources = malloc(ms->capacity * sizeof(source_t*)); - ms->source.get_next = get_next_multi_source; - - return (source_t*) ms; -} - - -void push_next_msink(sink_t *sink, const data_t *data) -{ - if (sink->buffer.data) - free(sink->buffer.data); - - sink->buffer = *data; -} - - -sink_t *create_multi_sink() -{ - sink_t *sink = malloc(sizeof(sink_t)); - sink->buffer = (data_t) {NULL, 0, 1}; - sink->push_next = push_next_msink; - return sink; -} - - -void multi_source_add(source_t* multi_source, const source_t* source) -{ - multi_source_t *ms = (multi_source_t*) multi_source; - if (ms->num_sources == ms->capacity) { - ms->capacity = ms->capacity * 2; - ms->sources = realloc(ms->sources, ms->capacity); - } - - ms->sources[ms->num_sources++] = (source_t*) source; -} - - -void free_multi_source(source_t *source) -{ - multi_source_t *ms = (multi_source_t*) source; - free(ms->sources); - ms->sources = NULL; - free(ms); - ms = NULL; -} - - -void free_multi_sink(sink_t *sink) -{ - free(sink); - sink = NULL; -} diff --git a/src/query.c b/src/query.c index c5094cb..72cc2c5 100644 --- a/src/query.c +++ b/src/query.c @@ -9,7 +9,6 @@ #include #define malloc(size) tracked_malloc(size) -#define free(ptr, size) tracked_free(ptr, size) /// The join operator From d9a217960bac051f17c6250dbd7f7a6b24634d28 Mon Sep 17 00:00:00 2001 From: Seppe Degryse <80254822+Griezn@users.noreply.github.com> Date: Sun, 23 Feb 2025 23:13:45 +0100 Subject: [PATCH 2/6] Re-added reset file source --- include/file_source.h | 2 ++ src/file_source.c | 10 ++++++++++ 2 files changed, 12 insertions(+) diff --git a/include/file_source.h b/include/file_source.h index dbeed4e..3b15538 100644 --- a/include/file_source.h +++ b/include/file_source.h @@ -26,4 +26,6 @@ void free_file_source(source_t *source); void free_file_sink(sink_t *sink); +void reset_file_source(source_t *source); //TODO: add for the source in general + #endif //FILE_SOURCE_H diff --git a/src/file_source.c b/src/file_source.c index 4dddf03..15adcdf 100644 --- a/src/file_source.c +++ b/src/file_source.c @@ -71,6 +71,16 @@ source_t *create_file_source(const char *filename, const uint8_t consumers) return (source_t*) fs; } + +void reset_file_source(source_t *source) +{ + file_source_t *fs = (file_source_t*) source; + + fs->source.index = 0; + fs->source.consumed = 0; +} + + void push_next_fsink(sink_t *sink, const data_t *data) { file_sink_t *fs = (file_sink_t*) sink; From 9522301f9845a175499517ec98d3758e376176bf Mon Sep 17 00:00:00 2001 From: Seppe Degryse <80254822+Griezn@users.noreply.github.com> Date: Sun, 23 Feb 2025 23:14:22 +0100 Subject: [PATCH 3/6] Moved to flat query plan --- .github/workflows/cmake-multi-platform.yml | 2 +- CMakeLists.txt | 2 - include/query.h | 15 ++ src/query.c | 165 +++++++++------------ tests/queryTests.cpp | 48 ------ 5 files changed, 89 insertions(+), 143 deletions(-) diff --git a/.github/workflows/cmake-multi-platform.yml b/.github/workflows/cmake-multi-platform.yml index f893c31..5ec0d6d 100644 --- a/.github/workflows/cmake-multi-platform.yml +++ b/.github/workflows/cmake-multi-platform.yml @@ -4,7 +4,7 @@ name: CMake on multiple platforms on: push: - branches: [ "main" ] + branches: [ "main" , "flat"] pull_request: branches: [ "main" ] diff --git a/CMakeLists.txt b/CMakeLists.txt index a280744..350ede0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,8 +34,6 @@ add_library(SR STATIC src/data.c include/file_source.h src/file_source.c - include/mutli_source.h - src/multi_source.c include/operator.h include/memory.h src/memory.c diff --git a/include/query.h b/include/query.h index 86b0944..fe6d8c7 100644 --- a/include/query.h +++ b/include/query.h @@ -10,6 +10,19 @@ #include +typedef struct ExecutionStep { + const operator_t *operator_; + data_t *left_input; + data_t *right_input; + data_t *output; +} step_t; + +#define MAX_OPERATOR_COUNT 256 //uint8_t + +typedef struct ExecutionPlan { + struct ExecutionStep *steps; + uint8_t num_steps; +} plan_t; typedef struct Query { struct Operator *root; @@ -24,6 +37,8 @@ typedef struct { void execute_query(const query_t *query, sink_t *sink); +void flatten_query(const operator_t* operator_, data_t *results, uint8_t index, plan_t *plan); + void join_triple_copy(const data_t *src1, uint32_t index1, const data_t *src2, uint32_t index2, data_t *dest); diff --git a/src/query.c b/src/query.c index 72cc2c5..d9c2a84 100644 --- a/src/query.c +++ b/src/query.c @@ -7,15 +7,11 @@ #include #include +#include #define malloc(size) tracked_malloc(size) -/// The join operator -/// @param in1 The first input stream to be joined -/// @param in2 The second input stream to be joined -/// @param out The output stream containing matching from the first and hte second stream -/// @param param Join parameters containing a function ptr specifying the join condition void join(const data_t *in1, const data_t *in2, data_t *out, const join_params_t param) { const uint32_t size = (in1->size * in2->size) * (in1->width + in2->width); @@ -52,11 +48,6 @@ void cart_join(const data_t *in1, const data_t *in2, data_t *out, const cart_joi } -/// The filter operator -/// @param in The input stream to be filtered -/// @param out The filtered output stream -/// @param param The filter parameters containing a function ptr specifying the filter condition -/// @note The out buffer will likely be larger than the out->size void filter(const data_t *in, data_t *out, const filter_params_t param) { const uint32_t size = in->size * in->width; @@ -73,9 +64,6 @@ void filter(const data_t *in, data_t *out, const filter_params_t param) } -/// The window operator creates a copy of the input stream in a newly specified size -/// @param out A selection of the input stream -/// @param params The window parameter bool window(data_t *out, const window_params_t params) { data_t* data = params.source->get_next(params.source, params.size, params.step); @@ -84,15 +72,14 @@ bool window(data_t *out, const window_params_t params) return false; *out = *data; - free(data, data->size * data->width); + const uint32_t size = out->size * out->width * sizeof(triple_t); + out->data = malloc(size); + memcpy(out->data, data->data, size); + free(data); return true; } -/// Performs a column selection on a stream -/// @param in The input stream -/// @param out The input stream with only the triples having one of the specified predicates -/// @param param The select parameter containing an array with the wanted predicates void select_query(const data_t *in, data_t *out, const select_params_t param) { // TODO: add extra test for double occurences in 1 row @@ -112,92 +99,74 @@ void select_query(const data_t *in, data_t *out, const select_params_t param) } -bool execute_operator(const operator_t *operator, const data_t *in, data_t *out); -void *execute_operator_thread(void *arg) { - const operator_thread_arg_t *targ = arg; - bool *return_value = malloc(sizeof(bool)); - *return_value = execute_operator(targ->operator_, targ->in, targ->out); - return return_value; -} - - -/// This function executed the right operator -/// @param operator The operator to be executed -/// @param in The input stream -/// @param out The output stream -bool execute_operator(const operator_t *operator, const data_t *in, data_t *out) +bool execute_plan(const plan_t *plan, data_t **out) { - data_t tmpo1 = *in; - data_t tmpo2 = {NULL, 0, 1}; - - switch (operator->type) { - case JOIN: - case CARTESIAN: - assert(operator->left); - assert(operator->right); + for (uint8_t i = 0; i < plan->num_steps; ++i) { + const step_t *step = &plan->steps[i]; + const operator_t *op = step->operator_; + const data_t *left_input = step->left_input; + const data_t *right_input = step->right_input; + data_t *output = step->output; + + switch (op->type) { + case JOIN: + join(left_input, right_input, output, op->params.join); + free(left_input->data); + free(right_input->data); + break; - bool left_bool = execute_operator(operator->left, in, &tmpo1); - bool right_bool = execute_operator(operator->right, in, &tmpo2); + case CARTESIAN: + cart_join(left_input, right_input, output, op->params.cart_join); + free(left_input->data); + free(right_input->data); + break; - if (!left_bool || !right_bool) { - if (tmpo1.data && tmpo1.data != in->data) - free(tmpo1.data, tmpo1.size*tmpo1.width); + case FILTER: + filter(left_input, output, op->params.filter); + free(left_input->data); + break; - if (tmpo2.data) - free(tmpo2.data, tmpo2.size*tmpo2.width); + case WINDOW: + if (!window(output, op->params.window)) return false; + break; - tmpo1.data = NULL; - tmpo2.data = NULL; - return false; - } + case SELECT: + select_query(left_input, output, op->params.select); + free(left_input->data); + break; + } + } - if (operator->type == JOIN) - join(&tmpo1, &tmpo2, out, operator->params.join); - else - cart_join(&tmpo1, &tmpo2, out, operator->params.cart_join); //TODO: test + *out = plan->steps[plan->num_steps - 1].output; + return true; +} - free(tmpo2.data, tmpo2.size * tmpo2.width); - tmpo2.data = NULL; - break; - case FILTER: - if (operator->left) { - if(!execute_operator(operator->left, in, &tmpo1)) - return false; - } - filter(&tmpo1, out, operator->params.filter); - break; - case WINDOW: - if (operator->left) { - if(!execute_operator(operator->left, in, &tmpo1)) - return false; - } +void flatten_query(const operator_t* operator_, data_t *results, const uint8_t index, plan_t *plan) +{ + assert(operator_); - if (!window(out, operator->params.window)) - return false; + data_t *output = &results[index]; - break; - case SELECT: - if (operator->left) { - if(!execute_operator(operator->left, in, &tmpo1)) - return false; - } + const uint8_t next_index_l = 2*index + 1; + const uint8_t next_index_r = 2*index + 2; + data_t *left_input = operator_->left ? &results[next_index_l] : NULL; + data_t *right_input = operator_->right ? &results[next_index_r] : NULL; - select_query(&tmpo1, out, operator->params.select); - break; - } + if (operator_->left) flatten_query(operator_->left, results, next_index_l, plan); + if (operator_->right) flatten_query(operator_->right, results, next_index_r, plan); + const step_t step = {operator_, left_input, right_input, output}; + plan->steps[plan->num_steps++] = step; - if (tmpo1.data != in->data) { - assert(operator->left); - if (operator->left->type == WINDOW) - return true; +} - free(tmpo1.data, tmpo1.size * tmpo1.width); - tmpo1.data = NULL; - } - return true; +void init_plan(plan_t *plan) +{ + plan->num_steps = 0; + plan->steps = malloc(MAX_OPERATOR_COUNT * sizeof(step_t)); + assert(plan->steps); } @@ -206,9 +175,21 @@ bool execute_operator(const operator_t *operator, const data_t *in, data_t *out) /// @param sink The sink consuming the output stream void execute_query(const query_t *query, sink_t *sink) { - data_t data = {NULL, 0, 1}; + plan_t plan; + init_plan(&plan); - while (execute_operator(query->root, &data, &data)) { - sink->push_next(sink, &data); + // Max number of operators (256) + // 3: left input, right input, output + data_t *results = malloc(MAX_OPERATOR_COUNT * 3 * sizeof(data_t)); + assert(results); + + flatten_query(query->root, results, 0, &plan); + + data_t *out = NULL; + while (execute_plan(&plan, &out)) { + sink->push_next(sink, out); } + + free(plan.steps); + free(results); } diff --git a/tests/queryTests.cpp b/tests/queryTests.cpp index 2d9e53a..580de81 100644 --- a/tests/queryTests.cpp +++ b/tests/queryTests.cpp @@ -44,54 +44,6 @@ bool check_join(const triple_t in1, const triple_t in2) return in1.object == in2.object; } -TEST_F(QueryTestFixture, test_query_join_death_no_children) -{ - skip_teardown = true; - join_check_t conditions[1] = {check_join}; - operator_t join_op = { - .type = JOIN, - .left = nullptr, - .right = nullptr, - .params = {.join = {.size = 1, .checks = conditions}} - }; - query_t query_join = {.root = &join_op}; - - // Program should abort because .left and .right are NULL - ASSERT_DEATH(execute_query(&query_join, gsink), ""); -} - -TEST_F(QueryTestFixture, test_query_join_death_no_left_child) -{ - skip_teardown = true; - join_check_t conditions[1] = {check_join}; - operator_t join_op = { - .type = JOIN, - .left = nullptr, - .right = &join_op, - .params = {.join = {.size = 1, .checks = conditions}} - }; - query_t query_join = {.root = &join_op}; - - // Program should abort because .left is NULL - ASSERT_DEATH(execute_query(&query_join, gsink), ""); -} - -TEST_F(QueryTestFixture, test_query_join_death_no_right_child) -{ - skip_teardown = true; - join_check_t conditions[1] = {check_join}; - operator_t join_op = { - .type = JOIN, - .left = &join_op, - .right = nullptr, - .params = {.join = {.size = 1, .checks = conditions}} - }; - query_t query_join = {.root = &join_op}; - - // Program should abort because .right is NULL - ASSERT_DEATH(execute_query(&query_join, gsink), ""); -} - bool check_filter(const triple_t in) { From cff7b48d4fc212112971a5c5e19b5fb0b1a931a7 Mon Sep 17 00:00:00 2001 From: Seppe Degryse <80254822+Griezn@users.noreply.github.com> Date: Sun, 23 Feb 2025 23:53:08 +0100 Subject: [PATCH 4/6] Changed order of query plan --- src/query.c | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/src/query.c b/src/query.c index d9c2a84..505d768 100644 --- a/src/query.c +++ b/src/query.c @@ -72,9 +72,6 @@ bool window(data_t *out, const window_params_t params) return false; *out = *data; - const uint32_t size = out->size * out->width * sizeof(triple_t); - out->data = malloc(size); - memcpy(out->data, data->data, size); free(data); return true; } @@ -101,29 +98,30 @@ void select_query(const data_t *in, data_t *out, const select_params_t param) bool execute_plan(const plan_t *plan, data_t **out) { - for (uint8_t i = 0; i < plan->num_steps; ++i) { + for (int8_t i = plan->num_steps - 1; i >= 0; --i) { const step_t *step = &plan->steps[i]; const operator_t *op = step->operator_; const data_t *left_input = step->left_input; const data_t *right_input = step->right_input; data_t *output = step->output; + switch (op->type) { case JOIN: join(left_input, right_input, output, op->params.join); - free(left_input->data); - free(right_input->data); + if (op->left->type != WINDOW) free(left_input->data); + if (op->right->type != WINDOW) free(right_input->data); break; case CARTESIAN: cart_join(left_input, right_input, output, op->params.cart_join); - free(left_input->data); - free(right_input->data); + if (op->left->type != WINDOW) free(left_input->data); + if (op->right->type != WINDOW) free(right_input->data); break; case FILTER: filter(left_input, output, op->params.filter); - free(left_input->data); + if (op->left->type != WINDOW) free(left_input->data); break; case WINDOW: @@ -132,12 +130,12 @@ bool execute_plan(const plan_t *plan, data_t **out) case SELECT: select_query(left_input, output, op->params.select); - free(left_input->data); + if (op->left->type != WINDOW) free(left_input->data); break; } } - *out = plan->steps[plan->num_steps - 1].output; + *out = plan->steps[0].output; return true; } @@ -153,12 +151,11 @@ void flatten_query(const operator_t* operator_, data_t *results, const uint8_t i data_t *left_input = operator_->left ? &results[next_index_l] : NULL; data_t *right_input = operator_->right ? &results[next_index_r] : NULL; - if (operator_->left) flatten_query(operator_->left, results, next_index_l, plan); - if (operator_->right) flatten_query(operator_->right, results, next_index_r, plan); - const step_t step = {operator_, left_input, right_input, output}; plan->steps[plan->num_steps++] = step; + if (operator_->left) flatten_query(operator_->left, results, next_index_l, plan); + if (operator_->right) flatten_query(operator_->right, results, next_index_r, plan); } From c80ea4334f4b697e099be35398926024006806ad Mon Sep 17 00:00:00 2001 From: Seppe Degryse <80254822+Griezn@users.noreply.github.com> Date: Mon, 24 Feb 2025 10:49:35 +0100 Subject: [PATCH 5/6] Added assertions --- src/query.c | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/query.c b/src/query.c index 505d768..5c2149d 100644 --- a/src/query.c +++ b/src/query.c @@ -15,8 +15,7 @@ void join(const data_t *in1, const data_t *in2, data_t *out, const join_params_t param) { const uint32_t size = (in1->size * in2->size) * (in1->width + in2->width); - out->data = malloc(size * sizeof(triple_t)); - assert(out->data); + out->data = malloc(size * sizeof(triple_t)); assert(out->data); out->size = 0; out->width = in1->width + in2->width; @@ -33,8 +32,7 @@ void join(const data_t *in1, const data_t *in2, data_t *out, const join_params_t void cart_join(const data_t *in1, const data_t *in2, data_t *out, const cart_join_params_t param) { const uint32_t size = (in1->size * in2->size) * (in1->width + in2->width); - out->data = malloc(size * sizeof(triple_t)); - assert(out->data); + out->data = malloc(size * sizeof(triple_t)); assert(out->data); out->size = 0; out->width = in1->width + in2->width; @@ -51,8 +49,7 @@ void cart_join(const data_t *in1, const data_t *in2, data_t *out, const cart_joi void filter(const data_t *in, data_t *out, const filter_params_t param) { const uint32_t size = in->size * in->width; - out->data = malloc(size * sizeof(triple_t)); - assert(out->data); + out->data = malloc(size * sizeof(triple_t)); assert(out->data); out->size = 0; out->width = in->width; @@ -81,7 +78,7 @@ void select_query(const data_t *in, data_t *out, const select_params_t param) { // TODO: add extra test for double occurences in 1 row const uint32_t size = in->size * param.width; - out->data = malloc(size * sizeof(triple_t)); + out->data = malloc(size * sizeof(triple_t)); assert(out->data); out->size = in->size; out->width = param.width; uint32_t out_idx = 0; @@ -99,27 +96,31 @@ void select_query(const data_t *in, data_t *out, const select_params_t param) bool execute_plan(const plan_t *plan, data_t **out) { for (int8_t i = plan->num_steps - 1; i >= 0; --i) { - const step_t *step = &plan->steps[i]; - const operator_t *op = step->operator_; + const step_t *step = &plan->steps[i]; assert(step); + const operator_t *op = step->operator_; assert(op); const data_t *left_input = step->left_input; const data_t *right_input = step->right_input; data_t *output = step->output; - switch (op->type) { case JOIN: + assert(left_input); + assert(right_input); join(left_input, right_input, output, op->params.join); if (op->left->type != WINDOW) free(left_input->data); if (op->right->type != WINDOW) free(right_input->data); break; case CARTESIAN: + assert(left_input); + assert(right_input); cart_join(left_input, right_input, output, op->params.cart_join); if (op->left->type != WINDOW) free(left_input->data); if (op->right->type != WINDOW) free(right_input->data); break; case FILTER: + assert(left_input); filter(left_input, output, op->params.filter); if (op->left->type != WINDOW) free(left_input->data); break; @@ -129,20 +130,21 @@ bool execute_plan(const plan_t *plan, data_t **out) break; case SELECT: + assert(left_input); select_query(left_input, output, op->params.select); if (op->left->type != WINDOW) free(left_input->data); break; } } - *out = plan->steps[0].output; + *out = plan->steps[0].output; assert(out); return true; } void flatten_query(const operator_t* operator_, data_t *results, const uint8_t index, plan_t *plan) { - assert(operator_); + assert(operator_); assert(results); assert(plan); data_t *output = &results[index]; @@ -162,8 +164,7 @@ void flatten_query(const operator_t* operator_, data_t *results, const uint8_t i void init_plan(plan_t *plan) { plan->num_steps = 0; - plan->steps = malloc(MAX_OPERATOR_COUNT * sizeof(step_t)); - assert(plan->steps); + plan->steps = malloc(MAX_OPERATOR_COUNT * sizeof(step_t)); assert(plan->steps); } @@ -177,8 +178,7 @@ void execute_query(const query_t *query, sink_t *sink) // Max number of operators (256) // 3: left input, right input, output - data_t *results = malloc(MAX_OPERATOR_COUNT * 3 * sizeof(data_t)); - assert(results); + data_t *results = malloc(MAX_OPERATOR_COUNT * 3 * sizeof(data_t)); assert(results); flatten_query(query->root, results, 0, &plan); From 040a4dcd06df5326168430b8d8b15b80afbb5c53 Mon Sep 17 00:00:00 2001 From: Seppe Degryse <80254822+Griezn@users.noreply.github.com> Date: Mon, 24 Feb 2025 15:42:10 +0100 Subject: [PATCH 6/6] Possible fix for memory leak --- include/utils.h | 11 ++--------- src/file_source.c | 2 +- src/query.c | 8 ++++++-- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/include/utils.h b/include/utils.h index bf179df..049847e 100644 --- a/include/utils.h +++ b/include/utils.h @@ -5,14 +5,7 @@ #ifndef UTILS_H #define UTILS_H -static int min(const int a, const int b) { - return a < b ? a : b; -} - -/* -static int max(const int a, const int b) { - return a > b ? a : b; -} -*/ +#define MIN(a, b) ((a) < (b) ? (a) : (b)) +#define MAX(a, b) ((a) > (b) ? (a) : (b)) #endif //UTILS_H diff --git a/src/file_source.c b/src/file_source.c index 15adcdf..62b1897 100644 --- a/src/file_source.c +++ b/src/file_source.c @@ -25,7 +25,7 @@ data_t *get_next_file(const source_t *source, const uint32_t size, const uint32_ data_t *data = malloc(sizeof(data_t)); assert(data); data->data = fs->source.buffer.data + (fs->source.index * fs->source.buffer.width); - data->size = min(size, fs->source.buffer.size - fs->source.index); + data->size = MIN(size, fs->source.buffer.size - fs->source.index); data->width = source->buffer.width; if (++fs->source.consumed == fs->source.consumers) { diff --git a/src/query.c b/src/query.c index 5c2149d..b012d9f 100644 --- a/src/query.c +++ b/src/query.c @@ -9,6 +9,8 @@ #include #include +#include "utils.h" + #define malloc(size) tracked_malloc(size) @@ -96,6 +98,7 @@ void select_query(const data_t *in, data_t *out, const select_params_t param) bool execute_plan(const plan_t *plan, data_t **out) { for (int8_t i = plan->num_steps - 1; i >= 0; --i) { + if(plan->steps[i].operator_ == NULL) continue; const step_t *step = &plan->steps[i]; assert(step); const operator_t *op = step->operator_; assert(op); const data_t *left_input = step->left_input; @@ -154,7 +157,8 @@ void flatten_query(const operator_t* operator_, data_t *results, const uint8_t i data_t *right_input = operator_->right ? &results[next_index_r] : NULL; const step_t step = {operator_, left_input, right_input, output}; - plan->steps[plan->num_steps++] = step; + plan->steps[index] = step; + plan->num_steps = MAX(plan->num_steps, index+1); if (operator_->left) flatten_query(operator_->left, results, next_index_l, plan); if (operator_->right) flatten_query(operator_->right, results, next_index_r, plan); @@ -164,7 +168,7 @@ void flatten_query(const operator_t* operator_, data_t *results, const uint8_t i void init_plan(plan_t *plan) { plan->num_steps = 0; - plan->steps = malloc(MAX_OPERATOR_COUNT * sizeof(step_t)); assert(plan->steps); + plan->steps = calloc(MAX_OPERATOR_COUNT, sizeof(step_t)); assert(plan->steps); }