From b646492ba75d8655ba5111a18b92e2aab06374d5 Mon Sep 17 00:00:00 2001 From: gal salomon Date: Sun, 15 May 2022 17:42:46 +0300 Subject: [PATCH 1/7] adding capability to execute the same query across different input-streams (CSV in this case), and to merge the results of each of the streams and return them to the caller as a single one. The different executions can run in parallel to each other Signed-off-by: gal salomon --- example/CMakeLists.txt | 4 + example/s3select_example.cpp | 39 +++++--- include/s3select.h | 166 ++++++++++++++++++++++++++++++++--- test/s3select_test.cpp | 32 +++---- test/s3select_test.h | 54 ++++++------ 5 files changed, 230 insertions(+), 65 deletions(-) diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index 8b5c8c07..a2f80cb1 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -1,5 +1,7 @@ add_executable(s3select_example s3select_example.cpp) target_include_directories(s3select_example PUBLIC ../include ../rapidjson/include) +add_executable(s3select_scaleup s3select_scaleup.cpp) +target_include_directories(s3select_scaleup PUBLIC ../include) find_package(Arrow QUIET) @@ -8,9 +10,11 @@ if(Arrow_FOUND) add_executable(csv_to_parquet csv_to_parquet.cpp) target_include_directories(csv_to_parquet PUBLIC ../include) target_link_libraries(s3select_example boost_date_time boost_system boost_thread parquet arrow boost_filesystem) + target_link_libraries(s3select_scaleup boost_date_time boost_system boost_thread parquet arrow boost_filesystem) target_link_libraries(csv_to_parquet boost_date_time boost_system boost_thread parquet arrow) else() target_link_libraries(s3select_example boost_date_time boost_system boost_thread boost_filesystem) + target_link_libraries(s3select_scaleup boost_date_time boost_system boost_thread boost_filesystem) endif() add_executable(generate_rand_csv generate_rand_csv.c) diff --git a/example/s3select_example.cpp b/example/s3select_example.cpp index c663185e..657f4a74 100% --- 
a/example/s3select_example.cpp +++ b/example/s3select_example.cpp @@ -18,7 +18,7 @@ class awsCli_handler { private: std::unique_ptr s3select_syntax; std::string m_s3select_query; - std::string m_result; + s3select_result m_result; std::unique_ptr m_s3_csv_object; std::string m_column_delimiter;//TODO remove std::string m_quot;//TODO remove @@ -156,7 +156,7 @@ class awsCli_handler { std::string get_result() { - return m_result; + return m_result.str(); } int run_s3select(const char *query, const char *input, size_t input_length, size_t object_size) @@ -229,7 +229,7 @@ class awsCli_handler { if (m_result.size() > strlen(PAYLOAD_LINE)) { m_result.append(END_PAYLOAD_LINE); - create_message(m_result, m_result.size() - 12, header_size); + create_message(m_result.str(), m_result.size() - 12, header_size); //s->formatter->write_bin_data(m_result.data(), buff_len); //if (op_ret < 0) //{ @@ -304,12 +304,12 @@ int run_query_on_parquet_file(const char* input_query, const char* input_file) rgw.set_get_size_api(fp_get_size); rgw.set_range_req_api(fp_range_req); - std::function fp_s3select_result_format = [](std::string& result){std::cout << result;result.clear();return 0;}; - std::function fp_s3select_header_format = [](std::string& result){result="";return 0;}; + std::function fp_s3select_result_format = [](s3select_result& result){std::cout << result;result.clear();return 0;}; + std::function fp_s3select_header_format = [](s3select_result& result){result="";return 0;}; parquet_object parquet_processor(input_file,&s3select_syntax,&rgw); - std::string result; + s3select_result result; do { @@ -455,7 +455,7 @@ int run_on_localFile(char* input_query) lstat(object_name.c_str(), &statbuf); - std::string s3select_result; + s3select_result result; s3selectEngine::csv_object::csv_defintions csv; csv.use_header_info = false; //csv.column_delimiter='|'; @@ -477,7 +477,23 @@ int run_on_localFile(char* input_query) { size_t input_sz = fread(buff, 1, BUFF_SIZE, fp); char* in=buff; - status 
= s3_csv_object.run_s3select_on_stream(s3select_result, in, input_sz, statbuf.st_size); + //input_sz = strlen(buff); + //size_t input_sz = in == 0 ? 0 : strlen(in); + + if (!input_sz || feof(fp)) + { + do_aggregate = true; + } + + int status; + if(do_aggregate == true) + { + status = s3_csv_object.run_s3select_on_object(result, in, input_sz, false, false, do_aggregate); + } + else + { + status = s3_csv_object.run_s3select_on_stream(result, in, input_sz, statbuf.st_size); + } if(status<0) { @@ -485,12 +501,12 @@ int run_on_localFile(char* input_query) break; } - if(s3select_result.size()>0) + if(result.size()>0) { - std::cout << s3select_result; + std::cout << result; } - s3select_result = ""; + result = ""; if(!input_sz || feof(fp)) { break; @@ -521,7 +537,6 @@ int run_on_single_query(const char* fname, const char* query) if (is_parquet_file(fname)) { - std::string result; int status = run_query_on_parquet_file(query, fname); return status; } diff --git a/include/s3select.h b/include/s3select.h index 7b386003..9d4524f6 100644 --- a/include/s3select.h +++ b/include/s3select.h @@ -18,6 +18,8 @@ #include #include +#include //TODO where producer should be implemented + #define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;} namespace s3selectEngine @@ -1931,7 +1933,149 @@ struct s3select_csv_definitions //TODO s3select_csv_definitions():row_delimiter('\n'), column_delimiter(','), output_row_delimiter('\n'), output_column_delimiter(','), escape_char('\\'), output_escape_char('\\'), output_quot_char('"'), quot_char('"'), use_header_info(false), ignore_header_info(false), quote_fields_always(false), quote_fields_asneeded(false), redundant_column(false), comment_empty_lines(false) {} }; - + +/////// result handling +class result_storage { + + private: + + + public: + + char x[1000]; + result_storage(int &x){} + result_storage(){} +}; + +class shared_queue{ + + private: + boost::lockfree::queue s3select_result_queue{128}; + + bool done; + + 
public: + + shared_queue():done(false){} + + void producers_complete() + { + done = true; + } + + int push(std::string& result) + { + result_storage rs;//TODO append should copy to result_storage to skip the string-copy + strncpy(rs.x,result.data(),sizeof(rs.x)); + + while(!s3select_result_queue.push(rs)); + + return 0; + } + + int pop() + { + result_storage value; + std::string result; + + while (!done) { + while (s3select_result_queue.pop(value)) + { + result.assign(value.x); + std::cout << ">>" << result << std::endl;//TODO std::function per different type of clients + } + } + + while (s3select_result_queue.pop(value)) + { + result.assign(value.x); + std::cout << ">>" << result << std::endl; + } + + return 0; + } + +}; + +class s3select_result { +//handle different forms of results (arrow-format, producer/consumer(MT), simple string) ... more + +std::string result; +shared_queue* m_shared_queue; + +public: + + s3select_result():m_shared_queue(nullptr){} + + void set_shared_queue(shared_queue* sq) + { + m_shared_queue = sq; + } + + void clear() + { + result.clear(); + } + + const char* c_str() const + { + return result.c_str(); + } + + size_t size() + { + return result.size(); + } + + std::string& append(char* in,size_t n) + { + return result.append(in,n); + } + + std::string& append(const char* in) + { + return result.append(in); + } + + std::string& append(const std::string& in) + { + return result.append(in); + } + + friend std::ostream& operator<< (std::ostream& os, s3select_result& str) + { + return os << str.str(); + } + + std::string& operator= (const std::string& str) + { + result.assign(str); + return result; + } + + std::string& str() + { + return result; + } + + int push_producer() + {//copy into fix size. 
+ + if(m_shared_queue) + { + m_shared_queue->push(result); + result.clear(); + } + + return 0; + } + + int pop_consumer() + { + return 0; + } + +}; /////// handling different object types class base_s3object @@ -1979,7 +2123,7 @@ class base_s3object // for the case were the rows are not fetched, but "pushed" by the data-source parser (JSON) virtual bool multiple_row_processing(){return true;} - void result_values_to_string(multi_values& projections_resuls, std::string& result) + void result_values_to_string(multi_values& projections_resuls, s3select_result& result) { size_t i = 0; std::string output_delimiter(1,m_csv_defintion.output_column_delimiter); @@ -1990,7 +2134,7 @@ class base_s3object if (m_csv_defintion.quote_fields_always) { std::ostringstream quoted_result; quoted_result << std::quoted(res->to_string(),m_csv_defintion.output_quot_char, m_csv_defintion.escape_char); - result.append(quoted_result.str()); + result.append(quoted_result.str().data()); }//TODO to add asneeded else { @@ -2008,9 +2152,11 @@ class base_s3object } if(!m_aggr_flow) result.append(output_row_delimiter); + + result.push_producer();//only upon shared-queue exists } - int getMatchRow( std::string& result) + int getMatchRow( s3select_result& result) { multi_values projections_resuls; @@ -2241,7 +2387,7 @@ class csv_object : public base_s3object } - int run_s3select_on_stream(std::string& result, const char* csv_stream, size_t stream_length, size_t obj_size) + int run_s3select_on_stream(s3select_result& result, const char* csv_stream, size_t stream_length, size_t obj_size) { int status=0; try{ @@ -2266,7 +2412,7 @@ class csv_object : public base_s3object } private: - int run_s3select_on_stream_internal(std::string& result, const char* csv_stream, size_t stream_length, size_t obj_size) + int run_s3select_on_stream_internal(s3select_result& result, const char* csv_stream, size_t stream_length, size_t obj_size) { //purpose: the cv data is "streaming", it may "cut" rows in the middle, in 
that case the "broken-line" is stores //for later, upon next chunk of data is streaming, the stored-line is merge with current broken-line, and processed. @@ -2313,7 +2459,7 @@ class csv_object : public base_s3object } public: - int run_s3select_on_object(std::string& result, const char* csv_stream, size_t stream_length, bool skip_first_line, bool skip_last_line, bool do_aggregate) + int run_s3select_on_object(s3select_result& result, const char* csv_stream, size_t stream_length, bool skip_first_line, bool skip_last_line, bool do_aggregate) { if (do_aggregate && m_previous_line) { @@ -2460,9 +2606,9 @@ class parquet_object : public base_s3object } - int run_s3select_on_object(std::string &result, - std::function fp_s3select_result_format, - std::function fp_s3select_header_format) + int run_s3select_on_object(s3select_result &result, + std::function fp_s3select_result_format, + std::function fp_s3select_header_format) { int status = 0; diff --git a/test/s3select_test.cpp b/test/s3select_test.cpp index 2da4700c..b4f93204 100644 --- a/test/s3select_test.cpp +++ b/test/s3select_test.cpp @@ -787,8 +787,8 @@ void test_single_column_single_row(const char* input_query,const char* expected_ } s3selectEngine::csv_object s3_csv_object(&s3select_syntax); - std::string s3select_result; std::string json_result; + s3select_result csv_result; std::string input; size_t size = 1; generate_csv(input, size); @@ -800,12 +800,12 @@ void test_single_column_single_row(const char* input_query,const char* expected_ #ifdef _ARROW_EXIST csv_to_parquet(input); - std::string parquet_result; + s3select_result parquet_result; run_query_on_parquet_file(input_query,PARQUET_FILENAME,parquet_result); #endif s3_csv_object.m_csv_defintion.redundant_column = false; - status = s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), + status = s3_csv_object.run_s3select_on_object(csv_result, input.c_str(), input.size(), false, // dont skip first line false, // dont skip last 
line true // aggregate call @@ -813,7 +813,7 @@ void test_single_column_single_row(const char* input_query,const char* expected_ if(strcmp(expected_result,"#failure#") == 0) { - if (status==0 && s3select_result.compare("#failure#")==0) + if (status==0 && csv_result.str().compare("#failure#")==0) { ASSERT_TRUE(false); } @@ -823,10 +823,11 @@ void test_single_column_single_row(const char* input_query,const char* expected_ ASSERT_EQ(status, 0); #ifdef _ARROW_EXIST - parquet_csv_report_error(parquet_result,s3select_result); + parquet_csv_report_error(parquet_result.str(),csv_result.str()); #endif json_csv_report_error(json_result, s3select_result); ASSERT_EQ(s3select_result, std::string(expected_result)); + ASSERT_EQ(csv_result.str(), std::string(expected_result)); } TEST(TestS3selectFunctions, syntax_1) @@ -854,18 +855,18 @@ TEST(TestS3selectFunctions, binop_constant) auto status = s3select_syntax.parse_query(input_query.c_str()); ASSERT_EQ(status, 0); s3selectEngine::csv_object s3_csv_object(&s3select_syntax); - std::string s3select_result; + s3select_result result; std::string input; size_t size = 128; generate_csv(input, size); - status = s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), + status = s3_csv_object.run_s3select_on_object(result, input.c_str(), input.size(), false, // dont skip first line false, // dont skip last line true // aggregate call ); ASSERT_EQ(status, 0); - int count = count_string(s3select_result,"11,8,6,64,4,1024"); + int count = count_string(result.str(),"11,8,6,64,4,1024"); ASSERT_EQ(count,size); } @@ -918,12 +919,11 @@ TEST(TestS3SElect, from_stdin) auto status = s3select_syntax.parse_query(input_query.c_str()); ASSERT_EQ(status, 0); s3selectEngine::csv_object s3_csv_object(&s3select_syntax); - std::string s3select_result; + s3select_result result; std::string input; size_t size = 128; generate_csv(input, size); - std::string input_copy = input; - status = 
s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), + status = s3_csv_object.run_s3select_on_object(result, input.c_str(), input.size(), false, // dont skip first line false, // dont skip last line true // aggregate call @@ -938,11 +938,11 @@ TEST(TestS3SElect, from_valid_object) auto status = s3select_syntax.parse_query(input_query.c_str()); ASSERT_EQ(status, 0); s3selectEngine::csv_object s3_csv_object(&s3select_syntax); - std::string s3select_result; + s3select_result result; std::string input; size_t size = 128; generate_csv(input, size); - status = s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), + status = s3_csv_object.run_s3select_on_object(result, input.c_str(), input.size(), false, // dont skip first line false, // dont skip last line true // aggregate call @@ -979,17 +979,17 @@ TEST(TestS3selectFunctions, avgzero) auto status = s3select_syntax.parse_query(input_query.c_str()); ASSERT_EQ(status, 0); s3selectEngine::csv_object s3_csv_object(&s3select_syntax); - std::string s3select_result; + s3select_result result; std::string input; size_t size = 0; generate_csv(input, size); - status = s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), + status = s3_csv_object.run_s3select_on_object(result, input.c_str(), input.size(), false, // dont skip first line false, // dont skip last line true // aggregate call ); ASSERT_EQ(status, -1); - ASSERT_EQ(s3select_result, std::string("")); + ASSERT_EQ(result.str(), std::string("")); } TEST(TestS3selectFunctions, floatavg) diff --git a/test/s3select_test.h b/test/s3select_test.h index 3518e32e..e5b8f94d 100644 --- a/test/s3select_test.h +++ b/test/s3select_test.h @@ -203,7 +203,7 @@ int csv_to_parquet(std::string & csv_object) return 0; } -int run_query_on_parquet_file(const char* input_query, const char* input_file, std::string &result) +int run_query_on_parquet_file(const char* input_query, const char* input_file, 
s3select_result &result) { int status; s3select s3select_syntax; @@ -243,8 +243,8 @@ int run_query_on_parquet_file(const char* input_query, const char* input_file, s rgw.set_get_size_api(fp_get_size); rgw.set_range_req_api(fp_range_req); - std::function fp_s3select_result_format = [](std::string& result){return 0;};//append - std::function fp_s3select_header_format = [](std::string& result){return 0;};//append + std::function fp_s3select_result_format = [](s3select_result& result){return 0;};//append + std::function fp_s3select_header_format = [](s3select_result& result){return 0;};//append parquet_object parquet_processor(input_file,&s3select_syntax,&rgw); @@ -529,27 +529,27 @@ std::string run_s3select(std::string expression) if(status) return failure_sign; - std::string s3select_result; + s3select_result csv_result; s3selectEngine::csv_object s3_csv_object(&s3select_syntax); std::string in = "1,1,1,1\n"; std::string csv_obj = in; - std::string parquet_result; + s3select_result parquet_result; - s3_csv_object.run_s3select_on_object(s3select_result, in.c_str(), in.size(), false, false, true); + s3_csv_object.run_s3select_on_object(csv_result, in.c_str(), in.size(), false, false, true); - s3select_result = s3select_result.substr(0, s3select_result.find_first_of(",")); - s3select_result = s3select_result.substr(0, s3select_result.find_first_of("\n"));//remove last \n + csv_result = csv_result.str().substr(0, csv_result.str().find_first_of(",")); + csv_result = csv_result.str().substr(0, csv_result.str().find_first_of("\n"));//remove last \n #ifdef _ARROW_EXIST csv_to_parquet(csv_obj); run_query_on_parquet_file(expression.c_str(),PARQUET_FILENAME,parquet_result); - parquet_result = parquet_result.substr(0, parquet_result.find_first_of(",")); - parquet_result = parquet_result.substr(0, parquet_result.find_first_of("\n"));//remove last \n + parquet_result = parquet_result.str().substr(0, parquet_result.str().find_first_of(",")); + parquet_result = 
parquet_result.str().substr(0, parquet_result.str().find_first_of("\n"));//remove last \n - parquet_csv_report_error(parquet_result,s3select_result); + parquet_csv_report_error(parquet_result.str(),csv_result.str()); #endif - return s3select_result; + return csv_result.str(); } void run_s3select_test_opserialization(std::string expression,std::string input, char *row_delimiter, char *column_delimiter) @@ -561,7 +561,7 @@ void run_s3select_test_opserialization(std::string expression,std::string input, if(status) return; - std::string s3select_result; + s3select_result s3select_result0; csv_object::csv_defintions csv; csv.redundant_column = false; @@ -570,22 +570,22 @@ void run_s3select_test_opserialization(std::string expression,std::string input, s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv); - s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true); + s3_csv_object.run_s3select_on_object(s3select_result0, input.c_str(), input.size(), false, false, true); - std::string s3select_result1 = s3select_result; + s3select_result s3select_result1 = s3select_result0; csv.row_delimiter = *row_delimiter; csv.column_delimiter = *column_delimiter; csv.output_row_delimiter = *row_delimiter; csv.output_column_delimiter = *column_delimiter; csv.redundant_column = false; - std::string s3select_result_second_phase; + s3select_result s3select_result_second_phase; s3selectEngine::csv_object s3_csv_object_second(&s3select_syntax, csv); - s3_csv_object_second.run_s3select_on_object(s3select_result_second_phase, s3select_result.c_str(), s3select_result.size(), false, false, true); + s3_csv_object_second.run_s3select_on_object(s3select_result_second_phase, s3select_result0.c_str(), s3select_result0.size(), false, false, true); - ASSERT_EQ(s3select_result_second_phase, s3select_result1); + ASSERT_EQ(s3select_result_second_phase.str(), s3select_result1.str()); } std::string run_s3select_opserialization_quot(std::string 
expression,std::string input, bool quot_always = false, char quot_char = '"') @@ -597,7 +597,7 @@ std::string run_s3select_opserialization_quot(std::string expression,std::string if(status) return failure_sign; - std::string s3select_result; + s3select_result result; csv_object::csv_defintions csv; csv.redundant_column = false; @@ -606,9 +606,9 @@ std::string run_s3select_opserialization_quot(std::string expression,std::string s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv); - s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true); + s3_csv_object.run_s3select_on_object(result, input.c_str(), input.size(), false, false, true); - return s3select_result; + return result.str(); } // JSON tests API's @@ -645,20 +645,20 @@ std::string run_s3select(std::string expression,std::string input, const char* j if(status) return failure_sign; - std::string s3select_result; std::string json_result; + s3select_result csv_result; s3selectEngine::csv_object s3_csv_object(&s3select_syntax); s3_csv_object.m_csv_defintion.redundant_column = false; - s3_csv_object.run_s3select_on_object(s3select_result, input.c_str(), input.size(), false, false, true); + s3_csv_object.run_s3select_on_object(csv_result, input.c_str(), input.size(), false, false, true); #ifdef _ARROW_EXIST static int file_no = 1; csv_to_parquet(parquet_input); - std::string parquet_result; + s3select_result parquet_result; run_query_on_parquet_file(expression.c_str(),PARQUET_FILENAME,parquet_result); - if (strcmp(parquet_result.c_str(),s3select_result.c_str())) + if (strcmp(parquet_result.c_str(),csv_result.c_str())) { std::cout << "failed on query " << expression << std::endl; std::cout << "input for query reside on" << "./failed_test_input" << std::to_string(file_no) << ".[csv|parquet]" << std::endl; @@ -683,7 +683,7 @@ std::string run_s3select(std::string expression,std::string input, const char* j } } - 
parquet_csv_report_error(parquet_result,s3select_result); + parquet_csv_report_error(parquet_result.str(),csv_result.str()); #endif //_ARROW_EXIST if(strlen(json_query) == 0) { @@ -693,7 +693,7 @@ std::string run_s3select(std::string expression,std::string input, const char* j run_json_query(json_query, js, json_result); json_csv_report_error(json_result, s3select_result); - return s3select_result; + return csv_result.str(); } From 32ff7284587633a897d8be6b4e6d9b4e53244b7c Mon Sep 17 00:00:00 2001 From: gal salomon Date: Sun, 15 May 2022 17:50:22 +0300 Subject: [PATCH 2/7] oops Signed-off-by: gal salomon --- example/s3select_scaleup.cpp | 154 +++++++++++++++++++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 example/s3select_scaleup.cpp diff --git a/example/s3select_scaleup.cpp b/example/s3select_scaleup.cpp new file mode 100644 index 00000000..5c1374e6 --- /dev/null +++ b/example/s3select_scaleup.cpp @@ -0,0 +1,154 @@ +#include "s3select.h" +#include +#include +#include +#include +#include +#include +#include +#include + +/// implements multi-threaded execution for s3select query. +#include +#include +#include + +using namespace s3selectEngine; +using namespace BOOST_SPIRIT_CLASSIC_NS; + +int run_s3select(s3selectEngine::csv_object& s3_csv_object, + s3selectEngine::s3select& query_ast, + s3select_result& result, + const char *input, size_t input_length, size_t object_size) +{ + //purpose: execution of a single stream. 
the result should be aggregated by the caller + int status = 0; + + if (query_ast.get_error_description().empty() == false) + { + result.append(query_ast.get_error_description()); + status = -1; + } + else + { + status = s3_csv_object.run_s3select_on_stream(result, input, input_length, object_size); + if (status < 0) + { + result.append(s3_csv_object.get_error_description()); + } + } + + return status; +} + +int process_on_file(char* query, char* file, shared_queue* sq) +{ + //purpose: client side "aware" of all differnt streams + //each of the stream can execute the query independently + + s3selectEngine::s3select query_ast; + csv_object::csv_defintions csv;//default + std::ifstream input_file_stream; + s3select_result result;//result is private per thread + int status =0; + + //sq is shared-queue across all threads participates in execution + result.set_shared_queue(sq); + + query_ast.parse_query(query); + s3selectEngine::csv_object s3_csv_object(&query_ast, csv); + + //open-file + try { + input_file_stream = std::ifstream(file, std::ios::in | std::ios::binary); + } + catch( ... ) + { + std::cout << "failed to open file " << file << std::endl; + return(-1); + } + + //read-chunk + auto file_sz = boost::filesystem::file_size(file); +#define BUFFER_SIZE (4*1024*1024) + std::string buff(BUFFER_SIZE,0); + while (1) + { + size_t read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE); + result.clear(); + + //procesing a stream + status = run_s3select(s3_csv_object, + query_ast, + result, + buff.data(), + read_sz, + file_sz); + + if(status<0) + { + std::cout << "failure on execution " << std::endl; + break; + } + else + { + //std::cout << "chunk complete:" << std::endl; + } + + if(!read_sz || input_file_stream.eof()) + { + //std::cout << "res:" << result << std::endl; + break; + } + } + + return 0; +} + +int run_single_query_on_many_files(char* q, std::vector files) +{ + //the set of object defines one finite data-set. 
+ //for CSV stream it needs to know in advance the total size of data-set. + + // for(auto& f : files) { calculate total size } + // non-aggregate query : per each object open a stream run_s3select_on_stream(boost::lock_free::queue, ,total-size-of-all-objects) + + shared_queue sq; + boost::thread_group producer_threads, consumer_threads; + + //create one producer thread per input file; a single consumer drains the shared queue + for(auto& f : files) + { + auto thread_func = [&](){return process_on_file(q,f,&sq);}; + + producer_threads.create_thread( thread_func ); + } + + auto consumer_func = [&](){return sq.pop();}; + consumer_threads.create_thread(consumer_func); + + producer_threads.join_all(); + sq.producers_complete(); + + consumer_threads.join_all(); + + return 0; +} + +int main(int argc, char **argv) +{ + if(argc<2) return -1; + + char* query=argv[1]; + std::vector list_of_files; + + for(int i=2;i Date: Sun, 22 May 2022 20:33:59 +0300 Subject: [PATCH 3/7] adding the aggregation flow for scale-up processing. each aggregation node (sum,max ...) has 2 states to handle, first-phase (processing the query), second-phase (aggregating the results of all participants). 
Signed-off-by: gal salomon --- example/s3select_scaleup.cpp | 111 +++++++++++++++++++++++++----- include/s3select.h | 94 +++++++++++++++++++++++++- include/s3select_functions.h | 127 +++++++++++++++++++++++++++++------ include/s3select_oper.h | 90 ++++++++++++++++++++++++- 4 files changed, 382 insertions(+), 40 deletions(-) diff --git a/example/s3select_scaleup.cpp b/example/s3select_scaleup.cpp index 5c1374e6..1b75095a 100644 --- a/example/s3select_scaleup.cpp +++ b/example/s3select_scaleup.cpp @@ -27,6 +27,7 @@ int run_s3select(s3selectEngine::csv_object& s3_csv_object, if (query_ast.get_error_description().empty() == false) { result.append(query_ast.get_error_description()); + std::cout << "syntax error:" << result << std::endl; status = -1; } else @@ -35,18 +36,18 @@ int run_s3select(s3selectEngine::csv_object& s3_csv_object, if (status < 0) { result.append(s3_csv_object.get_error_description()); + std::cout << "runtime error:" << result << std::endl; } } return status; } -int process_on_file(char* query, char* file, shared_queue* sq) +int process_on_file(char* query, char* file,s3selectEngine::s3select* query_ast, shared_queue* sq) { //purpose: client side "aware" of all differnt streams //each of the stream can execute the query independently - s3selectEngine::s3select query_ast; csv_object::csv_defintions csv;//default std::ifstream input_file_stream; s3select_result result;//result is private per thread @@ -55,8 +56,10 @@ int process_on_file(char* query, char* file, shared_queue* sq) //sq is shared-queue across all threads participates in execution result.set_shared_queue(sq); - query_ast.parse_query(query); - s3selectEngine::csv_object s3_csv_object(&query_ast, csv); + query_ast->parse_query(query); + query_ast->set_execution_phase(base_statement::multiple_executions_en::FIRST_PHASE); + + s3selectEngine::csv_object s3_csv_object(query_ast, csv); //open-file try { @@ -76,10 +79,15 @@ int process_on_file(char* query, char* file, shared_queue* sq) { size_t 
read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE); result.clear(); + if(!read_sz || input_file_stream.eof()) + { + break; + } //procesing a stream + std::cout << "run s3select " << read_sz << std::endl; status = run_s3select(s3_csv_object, - query_ast, + *query_ast, result, buff.data(), read_sz, @@ -92,7 +100,14 @@ int process_on_file(char* query, char* file, shared_queue* sq) } else { - //std::cout << "chunk complete:" << std::endl; +#if 0 + std::cout << "chunk complete:" << read_sz << std::endl; + auto sa = query_ast->get_scratch_area()->get_aggregation_results(); + for(auto v : sa) + { + std::cout << "aggregate:" << v.to_string() << std::endl; + } +#endif } if(!read_sz || input_file_stream.eof()) @@ -105,23 +120,81 @@ int process_on_file(char* query, char* file, shared_queue* sq) return 0; } -int run_single_query_on_many_files(char* q, std::vector files) -{ - //the set of object defines one finite data-set. - //for CSV stream it need to know in advance the total size of data-set. +int run_single_query_on_many_files_aggregate_query(char* q, std::vector files) +{ //the set of object defines one finite data-set. 
+ + std::vector all_processing_object; + std::vector> vec_of_fp; + shared_queue sq; + int status=0; + boost::thread_group producer_threads, consumer_threads; + + s3selectEngine::s3select main_query; + status = main_query.parse_query(q); - // for(auto& f : files) { calculate total size } - // non-aggregate query : per each object open a stream run_s3select_on_stream(boost::lock_free::queue, ,total-size-of-all-objects) + if (status<0) + { + std::cout << "failed to parse query" << std::endl; + return -1; + } + + main_query.set_execution_phase(base_statement::multiple_executions_en::SECOND_PHASE); + merge_results main_query_process(&main_query); + + for(auto f : files) + { + s3selectEngine::s3select * ss = new (s3selectEngine::s3select); //TODO delete + all_processing_object.push_back(ss); + auto thread_func = [q,f,ss,&sq](){return process_on_file(q,f,ss,&sq);}; + vec_of_fp.push_back( thread_func ); + } + + for(auto& t : vec_of_fp) + { + producer_threads.create_thread( t ); + } + + auto consumer_func = [&](){return sq.pop();}; + consumer_threads.create_thread(consumer_func); + + producer_threads.join_all(); + sq.producers_complete(); + + consumer_threads.join_all(); + + for(auto ast : all_processing_object) + { + auto sa = ast->get_scratch_area()->get_aggregation_results(); + for(auto v : sa) + {//debug + std::cout << "aggregate:" << v->to_string() << std::endl; + } + } + + main_query_process.set_all_processing_objects(all_processing_object); + main_query_process.execute_query(); + std::cout << main_query_process.get_result() << std::endl; + + return 0; +} + +int run_single_query_on_many_files_non_aggregate(char* q, std::vector files) +{ //the set of object defines one finite data-set. for non-aggregation flow. 
shared_queue sq; boost::thread_group producer_threads, consumer_threads; + std::vector> vec_of_fp; - //call consumer - for(auto& f : files) + for(auto f : files) + { + s3selectEngine::s3select * ss = new (s3selectEngine::s3select); //TODO delete + auto thread_func = [q,f,ss,&sq](){return process_on_file(q,f,ss,&sq);}; + vec_of_fp.push_back( thread_func ); + } + + for(auto& t : vec_of_fp) { - auto thread_func = [&](){return process_on_file(q,f,&sq);}; - - producer_threads.create_thread( thread_func ); + producer_threads.create_thread( t ); } auto consumer_func = [&](){return sq.pop();}; @@ -141,13 +214,15 @@ int main(int argc, char **argv) char* query=argv[1]; std::vector list_of_files; + setvbuf(stdout, NULL, _IONBF, 0);//unbuffered stdout for(int i=2;i int semantic() { - for (const auto &e : get_projections_list()) + for (auto e : get_projections_list()) { e->resolve_node(); //upon validate there is no aggregation-function nested calls, it validates legit aggregation call. @@ -533,6 +533,14 @@ struct s3select : public bsc::grammar return 0; } + void set_execution_phase(base_statement::multiple_executions_en phase) + {//TODO only after parse_query + for (const auto &e : get_projections_list()) + { + e->set_phase_state(phase); + } + } + std::string get_error_description() { return error_description; @@ -2864,6 +2872,90 @@ class json_object : public base_s3object } ~json_object() = default; + +class merge_results : public base_s3object +{//purpose: upon processing several stream on a single aggregate query, this object should merge results. 
+ + private: + + bool m_end_of_stream; + s3select_result m_result; + std::function publish_results; + + public: + + std::vector m_all_processing_object; + + void set_all_processing_objects(std::vector objs) + { + m_all_processing_object = objs; + } + + virtual bool is_end_of_stream() + { + return m_end_of_stream; + } + + virtual void row_fetch_data() + { + if(m_all_processing_object.size() == 0) + {//it means all rows are processed, it's time to sum-up. + m_end_of_stream = true; + m_is_to_aggregate = true; + return; + } + + //each item of m_all_processing_object contain results of some data portion + //these results consider as a single row. + //the row is processed by *this object(with the rest of the rows), the AST node state is set to + s3selectEngine::s3select* q = m_all_processing_object.back(); + m_all_processing_object.pop_back(); + m_s3_select->get_scratch_area()->set_aggregation_results(q->get_scratch_area()->get_aggregation_results()); + } + + s3select_result& get_result() + {//TODO should use std::function + return m_result; + } + + int execute_query() + { + do + { + + int num = 0; + try + { + num = getMatchRow(m_result); + } + catch (base_s3select_exception& e) + { + std::cout << "error while processing::" << e.what() << std::endl; +#if 0 + m_error_description = e.what(); + m_error_count ++; + if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL || m_error_count>100 || (m_stream>=m_end_stream))//abort query execution + { + return -1; + } +#endif + + } + + if (num < 0) + { + break; + } + + } + while (true); + + return 0; + } + + merge_results(s3selectEngine::s3select* q):base_s3object(q),m_end_of_stream(false){} + + ~merge_results(){} }; }; // namespace s3selectEngine diff --git a/include/s3select_functions.h b/include/s3select_functions.h index dc74db32..271cb6bc 100644 --- a/include/s3select_functions.h +++ b/include/s3select_functions.h @@ -372,6 +372,8 @@ class __function : public base_statement public: + 
__function():m_func_impl(nullptr),m_s3select_functions(nullptr){} + base_function* impl() { return m_func_impl; @@ -382,6 +384,8 @@ class __function : public base_statement m_scratch = sa; m_aliases = pa; m_json_statement = json_statement; + if(m_func_impl) + m_func_impl->set_scratch_area(sa); for (base_statement* ba : arguments) { ba->traverse_and_apply(sa, pa, json_statement); @@ -406,6 +410,17 @@ class __function : public base_statement } } + virtual void set_phase_state(multiple_executions_en phase) override + { + execution_phase = phase; + if(m_func_impl) + m_func_impl->set_execution_phase(execution_phase); + for (auto& ba : arguments) + { + ba->set_phase_state(phase); + } + } + bool is_aggregate() const override { return m_is_aggregate_function; @@ -536,10 +551,20 @@ struct _fn_sum : public base_function { auto iter = args->begin(); base_statement* x = *iter; + value res; + + if(is_second_phase()) + { + m_scratch_area->pop_saved_result(res); + } + else + { + res = x->eval(); + } try { - sum = sum + x->eval(); + sum = sum + res; } catch (base_s3select_exception& e) { @@ -554,7 +579,12 @@ struct _fn_sum : public base_function void get_aggregate_result(variable* result) override { - *result = sum ; + *result = sum; + + if(is_first_phase()) + { + m_scratch_area->push_aggregation_result(&sum); + } } }; @@ -562,6 +592,7 @@ struct _fn_count : public base_function { int64_t count; + value saved_result; _fn_count():count(0) { @@ -570,27 +601,38 @@ struct _fn_count : public base_function bool operator()(bs_stmt_vec_t* args, variable* result) override { - if (args->size()) - {// in case argument exist, should count only non-null. 
- auto iter = args->begin(); - base_statement* x = *iter; - - if(!x->eval().is_null()) - { - count += 1; - } + if(is_second_phase()) + { + m_scratch_area->pop_saved_result(saved_result); + count = count + saved_result.i64(); } else - {//in case of non-arguments // count() - count += 1; + { + if (args->size()) + {// in case argument exist, should count only non-null. + auto iter = args->begin(); + base_statement* x = *iter; + + if(!x->eval().is_null()) + { + count += 1; + } + } + else + {//in case of non-arguments // count() + count += 1; + } } - return true; } void get_aggregate_result(variable* result) override { result->set_value(count); + if(is_first_phase()) + { + m_scratch_area->push_aggregation_result(&result->get_value()); + } } }; @@ -600,6 +642,8 @@ struct _fn_avg : public base_function value sum; value count{0.0}; + value sum_save; + value count_save{0.0}; _fn_avg() : sum(0) { aggregate = true; } @@ -608,6 +652,15 @@ struct _fn_avg : public base_function auto iter = args->begin(); base_statement *x = *iter; + if(is_second_phase()) + { + m_scratch_area->pop_saved_result(sum_save); + m_scratch_area->pop_saved_result(count_save); + sum = sum + sum_save; + count = count + count_save; + return true; + } + try { sum = sum + x->eval(); @@ -623,11 +676,19 @@ struct _fn_avg : public base_function void get_aggregate_result(variable *result) override { + if(is_first_phase()) + { + m_scratch_area->push_aggregation_result(&sum); + m_scratch_area->push_aggregation_result(&count); + } + if(count == static_cast(0)) { throw base_s3select_exception("count cannot be zero!"); } else { - *result = sum/count ; + value tmp = sum; + *result = tmp/count ; //TODO division operation change left-op (sum in this case) } + } }; @@ -645,10 +706,20 @@ struct _fn_min : public base_function { auto iter = args->begin(); base_statement* x = *iter; + value res; + + if(is_second_phase()) + { + m_scratch_area->pop_saved_result(res); + } + else + { + res = x->eval(); + } - if(min > x->eval()) + 
if(min > res) { - min=x->eval(); + min=res; } return true; @@ -657,6 +728,10 @@ struct _fn_min : public base_function void get_aggregate_result(variable* result) override { *result = min; + if(is_first_phase()) + { + m_scratch_area->push_aggregation_result(&min); + } } }; @@ -675,10 +750,20 @@ struct _fn_max : public base_function { auto iter = args->begin(); base_statement* x = *iter; + value res; - if(max < x->eval()) + if(is_second_phase()) { - max=x->eval(); + m_scratch_area->pop_saved_result(res); + } + else + { + res = x->eval(); + } + + if(max < res) + { + max=res; } return true; @@ -687,6 +772,10 @@ struct _fn_max : public base_function void get_aggregate_result(variable* result) override { *result = max; + if(is_first_phase()) + { + m_scratch_area->push_aggregation_result(&max); + } } }; diff --git a/include/s3select_oper.h b/include/s3select_oper.h index 9d6abbc2..62b4c1b0 100644 --- a/include/s3select_oper.h +++ b/include/s3select_oper.h @@ -1105,6 +1105,9 @@ class scratch_area uint16_t buff_loc; int max_json_idx; + /// for saving aggregation results between phases of multiple streams executions. 
+ std::vector m_aggregation_results; + public: typedef std::pair,value> json_key_value_t; @@ -1261,10 +1264,57 @@ class scratch_area } #endif // _ARROW_EXIST + void push_aggregation_result(value* v) + { + m_aggregation_results.push_back(v); + } + + void pop_saved_result(value& v) + {//TODO check size + v = *m_aggregation_results.front(); + m_aggregation_results.erase( m_aggregation_results.begin() ); + } + + std::vector& get_aggregation_results() + { + return m_aggregation_results; + } + + void set_aggregation_results(std::vector& aggregation_results) + { + m_aggregation_results = aggregation_results; + } + + void clear_aggregation_results() + { + m_aggregation_results.clear(); + } }; class base_statement { +public: + + enum class multiple_executions_en + {//upon aggregation flow across multiple executions, results are produced(first_phase) and later merged(second_phase). + //aggregate functions should be aware to this state. + FIRST_PHASE,SECOND_PHASE,NA + }; + + bool is_first_phase() const + { + return execution_phase == multiple_executions_en::FIRST_PHASE; + } + + bool is_second_phase() const + { + return execution_phase == multiple_executions_en::SECOND_PHASE; + } + + void set_phase(multiple_executions_en phase) + { + execution_phase = phase; + } protected: @@ -1282,7 +1332,8 @@ class base_statement public: base_statement():m_scratch(nullptr), is_last_call(false), m_is_cache_result(false), - m_projection_alias(nullptr), m_eval_stack_depth(0), m_skip_non_aggregate_op(false),m_json_statement(false) {} + m_projection_alias(nullptr), m_eval_stack_depth(0), m_skip_non_aggregate_op(false), + execution_phase(multiple_executions_en::NA){} virtual value& eval() { @@ -1351,6 +1402,20 @@ class base_statement } } + virtual void set_phase_state(multiple_executions_en phase) + { + execution_phase = phase; + + if (left()) + { + left()->set_phase_state(phase); + } + if (right()) + { + right()->set_phase_state(phase); + } + } + virtual bool is_aggregate() const { return false; 
@@ -2145,12 +2210,14 @@ class base_function protected: bool aggregate; + base_statement::multiple_executions_en execution_phase; + scratch_area* m_scratch_area; public: //TODO add semantic to base-function , it operate once on function creation // validate semantic on creation instead on run-time virtual bool operator()(bs_stmt_vec_t* args, variable* result) = 0; - base_function() : aggregate(false) {} + base_function() : aggregate(false),execution_phase(base_statement::multiple_executions_en::NA),m_scratch_area(nullptr) {} bool is_aggregate() const { return aggregate == true; @@ -2164,6 +2231,25 @@ class base_function this->~base_function(); } + void set_execution_phase(base_statement::multiple_executions_en phase) + { + execution_phase = phase; + } + + void set_scratch_area(scratch_area* sa) + { + m_scratch_area = sa; + } + + bool is_first_phase() + { + return execution_phase == base_statement::multiple_executions_en::FIRST_PHASE; + } + + bool is_second_phase() + { + return execution_phase == base_statement::multiple_executions_en::SECOND_PHASE; + } }; class base_date_extract : public base_function From 3c436b6370664c61949e927113b5144422177b17 Mon Sep 17 00:00:00 2001 From: gal salomon Date: Tue, 24 May 2022 17:25:15 +0300 Subject: [PATCH 4/7] adding csv_streamer to enable the processing of a single query on multiple execution flows for aggregation and non aggregation flow. bug fixes. 
Signed-off-by: gal salomon --- example/s3select_example.cpp | 4 +- example/s3select_scaleup.cpp | 262 ++++++++++++++++++----------------- include/s3select.h | 6 +- include/s3select_functions.h | 12 +- 4 files changed, 149 insertions(+), 135 deletions(-) diff --git a/example/s3select_example.cpp b/example/s3select_example.cpp index 657f4a74..3733eaa0 100644 --- a/example/s3select_example.cpp +++ b/example/s3select_example.cpp @@ -477,8 +477,6 @@ int run_on_localFile(char* input_query) { size_t input_sz = fread(buff, 1, BUFF_SIZE, fp); char* in=buff; - //input_sz = strlen(buff); - //size_t input_sz = in == 0 ? 0 : strlen(in); if (!input_sz || feof(fp)) { @@ -492,7 +490,7 @@ int run_on_localFile(char* input_query) } else { - status = s3_csv_object.run_s3select_on_stream(result, in, input_sz, statbuf.st_size); + status = s3_csv_object.run_s3select_on_stream(result, in, input_sz, __INT64_MAX__); } if(status<0) diff --git a/example/s3select_scaleup.cpp b/example/s3select_scaleup.cpp index 1b75095a..bbc2155c 100644 --- a/example/s3select_scaleup.cpp +++ b/example/s3select_scaleup.cpp @@ -16,50 +16,103 @@ using namespace s3selectEngine; using namespace BOOST_SPIRIT_CLASSIC_NS; -int run_s3select(s3selectEngine::csv_object& s3_csv_object, - s3selectEngine::s3select& query_ast, - s3select_result& result, - const char *input, size_t input_length, size_t object_size) -{ - //purpose: execution of a single stream. 
the result should be aggregated by the caller - int status = 0; +class csv_streamer { - if (query_ast.get_error_description().empty() == false) - { - result.append(query_ast.get_error_description()); - std::cout << "syntax error:" << result << std::endl; - status = -1; - } - else - { - status = s3_csv_object.run_s3select_on_stream(result, input, input_length, object_size); - if (status < 0) + //purpose: streamer object initiate it's own(isolated, not depended) execution flow, the caller keeps "pushing" data for processing + private: + + s3selectEngine::s3select m_query_ast; + std::string m_query; + s3select_result m_result; + shared_queue *m_sq;//it is used for non aggregate only + csv_object::csv_defintions csv;//default + s3selectEngine::csv_object* m_csv_object; + + public: + + csv_streamer(std::string query, shared_queue *sq):m_query(query),m_sq(sq),m_csv_object(nullptr) { - result.append(s3_csv_object.get_error_description()); - std::cout << "runtime error:" << result << std::endl; + int status = m_query_ast.parse_query(m_query.data()); + + if(status<0) + return;//TODO stop processing + + m_csv_object = new s3selectEngine::csv_object(&m_query_ast,csv); + + m_result.set_shared_queue(nullptr); + if(!m_query_ast.is_aggregate_query()) + { + m_result.set_shared_queue(sq); + } + else + { + //in case of aggregation query, results saved into temporary "table" + m_query_ast.set_execution_phase(base_statement::multiple_executions_en::FIRST_PHASE); + } + } + + s3selectEngine::s3select* getS3select() + { + return &m_query_ast; } - } - return status; -} + ~csv_streamer() + { + if(m_csv_object) + delete m_csv_object; + } -int process_on_file(char* query, char* file,s3selectEngine::s3select* query_ast, shared_queue* sq) -{ - //purpose: client side "aware" of all differnt streams - //each of the stream can execute the query independently - - csv_object::csv_defintions csv;//default - std::ifstream input_file_stream; - s3select_result result;//result is private per thread - 
int status =0; + int process_stream(char* stream,size_t stream_size,bool end_of_stream=false) + { + if(end_of_stream) + { + m_csv_object->run_s3select_on_object(m_result, 0, 0, false, false, true); + } + else + { + m_csv_object->run_s3select_on_stream(m_result, stream, stream_size, __INT64_MAX__); + } + return 0; + } - //sq is shared-queue across all threads participates in execution - result.set_shared_queue(sq); + int run_s3select(char* input,size_t input_length,size_t object_size) + { + //purpose: execution of a single stream. the result should be aggregated by the caller + int status = 0; - query_ast->parse_query(query); - query_ast->set_execution_phase(base_statement::multiple_executions_en::FIRST_PHASE); + if (m_query_ast.get_error_description().empty() == false) + { + m_result.append(m_query_ast.get_error_description()); + std::cout << "syntax error:" << m_result << std::endl; + status = -1; + } + else + { + status = m_csv_object->run_s3select_on_stream(m_result, input, input_length, object_size); + if (status < 0) + { + m_result.append(m_csv_object->get_error_description()); + std::cout << "runtime error:" << m_result << std::endl; + } + } - s3selectEngine::csv_object s3_csv_object(query_ast, csv); + return status; + } +}; + +int splitter() +{ + //get single object , split by size , search for \n bounderies + //thread per { split-data-portion --> {while(not-end-of-data-portion){read , process_stream()) } + return 0; +} + +//TODO stream_chunk() + +int stream_file(char* file, csv_streamer *cs) +{//each file processed on seperate thread + std::ifstream input_file_stream; + int status=0; //open-file try { @@ -72,139 +125,94 @@ int process_on_file(char* query, char* file,s3selectEngine::s3select* query_ast, } //read-chunk - auto file_sz = boost::filesystem::file_size(file); #define BUFFER_SIZE (4*1024*1024) std::string buff(BUFFER_SIZE,0); - while (1) + while (true) { size_t read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE); - result.clear(); 
if(!read_sz || input_file_stream.eof()) - { + {//signaling end of stream + cs->process_stream(0,0,true); break; } - //procesing a stream - std::cout << "run s3select " << read_sz << std::endl; - status = run_s3select(s3_csv_object, - *query_ast, - result, - buff.data(), - read_sz, - file_sz); - + status = cs->process_stream(buff.data(),read_sz,false); if(status<0) { std::cout << "failure on execution " << std::endl; break; } - else - { -#if 0 - std::cout << "chunk complete:" << read_sz << std::endl; - auto sa = query_ast->get_scratch_area()->get_aggregation_results(); - for(auto v : sa) - { - std::cout << "aggregate:" << v.to_string() << std::endl; - } -#endif - } if(!read_sz || input_file_stream.eof()) { - //std::cout << "res:" << result << std::endl; break; } } - return 0; } -int run_single_query_on_many_files_aggregate_query(char* q, std::vector files) -{ //the set of object defines one finite data-set. +int start_multiple_execution_flows(std::string q, std::vector files) +{ //the object-set defines one finite data-set for the query. 
- std::vector all_processing_object; - std::vector> vec_of_fp; shared_queue sq; - int status=0; boost::thread_group producer_threads, consumer_threads; - - s3selectEngine::s3select main_query; - status = main_query.parse_query(q); - - if (status<0) - { - std::cout << "failed to parse query" << std::endl; - return -1; - } - - main_query.set_execution_phase(base_statement::multiple_executions_en::SECOND_PHASE); - merge_results main_query_process(&main_query); + std::vector> vec_of_fp; + std::vector all_streamers; + std::vector s3select_processing_objects; for(auto f : files) { - s3selectEngine::s3select * ss = new (s3selectEngine::s3select); //TODO delete - all_processing_object.push_back(ss); - auto thread_func = [q,f,ss,&sq](){return process_on_file(q,f,ss,&sq);}; + csv_streamer *cs = new csv_streamer(q,&sq); + all_streamers.push_back(cs); + auto thread_func = [f,cs](){return stream_file(f,cs);}; vec_of_fp.push_back( thread_func ); } - + for(auto& t : vec_of_fp) { - producer_threads.create_thread( t ); + //start with query processing + producer_threads.create_thread( t ); } auto consumer_func = [&](){return sq.pop();}; + //start with merging results of all threads consumer_threads.create_thread(consumer_func); - producer_threads.join_all(); - sq.producers_complete(); + //signaling. producers complete query processing. + sq.producers_complete(); + //upon all producers had complete their work, waiting for consumer to complete. 
consumer_threads.join_all(); - for(auto ast : all_processing_object) - { - auto sa = ast->get_scratch_area()->get_aggregation_results(); - for(auto v : sa) - {//debug - std::cout << "aggregate:" << v->to_string() << std::endl; + if(all_streamers[0]->getS3select()->is_aggregate_query()) + {//aggregation flow + for(auto cs : all_streamers) + { + auto sa = cs->getS3select()->get_scratch_area()->get_aggregation_results(); + s3select_processing_objects.push_back(cs->getS3select()); + for(auto v : sa) + {//debug + std::cout << "aggregate:" << v->to_string() << std::endl; + } } - } - - main_query_process.set_all_processing_objects(all_processing_object); - main_query_process.execute_query(); - std::cout << main_query_process.get_result() << std::endl; - return 0; -} - -int run_single_query_on_many_files_non_aggregate(char* q, std::vector files) -{ //the set of object defines one finite data-set. for non-aggregation flow. - - shared_queue sq; - boost::thread_group producer_threads, consumer_threads; - std::vector> vec_of_fp; + s3selectEngine::s3select main_query; + int status = main_query.parse_query(q.data()); + if(status<0) + { + std::cout << "failed to parse query" << std::endl; + return -1; + } - for(auto f : files) - { - s3selectEngine::s3select * ss = new (s3selectEngine::s3select); //TODO delete - auto thread_func = [q,f,ss,&sq](){return process_on_file(q,f,ss,&sq);}; - vec_of_fp.push_back( thread_func ); + //start second phase processing(relevant only for aggregation queries) + main_query.set_execution_phase(base_statement::multiple_executions_en::SECOND_PHASE); + merge_results main_query_process(&main_query); + main_query_process.set_all_processing_objects(s3select_processing_objects); + //execution at this point, means to scan all partial results(all participants), and aggregate results reside on all AST nodes. 
+ main_query_process.execute_query(); + std::cout << main_query_process.get_result() << std::endl; } - - for(auto& t : vec_of_fp) - { - producer_threads.create_thread( t ); - } - - auto consumer_func = [&](){return sq.pop();}; - consumer_threads.create_thread(consumer_func); - producer_threads.join_all(); - sq.producers_complete(); - - consumer_threads.join_all(); - return 0; } @@ -213,6 +221,9 @@ int main(int argc, char **argv) if(argc<2) return -1; char* query=argv[1]; + std::string sql_query; + sql_query.assign(query); + std::vector list_of_files; setvbuf(stdout, NULL, _IONBF, 0);//unbuffered stdout @@ -221,8 +232,7 @@ int main(int argc, char **argv) list_of_files.push_back(argv[i]); } - //run_single_query_on_many_files_non_aggregate(query,list_of_files); - run_single_query_on_many_files_aggregate_query(query,list_of_files); + start_multiple_execution_flows(sql_query, list_of_files); return 0; } diff --git a/include/s3select.h b/include/s3select.h index e8f1dd57..41d454d3 100644 --- a/include/s3select.h +++ b/include/s3select.h @@ -19,6 +19,7 @@ #include #include //TODO where producer should be implemented +#define unlikely(x) __builtin_expect((x),0) #define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;} @@ -539,6 +540,8 @@ struct s3select : public bsc::grammar { e->set_phase_state(phase); } + + get_filter()->set_phase_state(phase); } std::string get_error_description() @@ -2202,7 +2205,8 @@ class base_s3object } row_update_data(); - if (!m_where_clause || m_where_clause->eval().is_true()) + //the second phase is relevant only for aggregation flows + if (!m_where_clause || unlikely(m_where_clause->is_second_phase()) || m_where_clause->eval().is_true()) { columnar_fetch_projection(); for (auto i : m_projections) diff --git a/include/s3select_functions.h b/include/s3select_functions.h index 271cb6bc..e0e5ca13 100644 --- a/include/s3select_functions.h +++ b/include/s3select_functions.h @@ -9,6 +9,8 @@ #include using namespace 
std::string_literals; +#define unlikely(x) __builtin_expect((x),0) +//#define unlikely(x) (x) #define BOOST_BIND_ACTION_PARAM( push_name ,param ) boost::bind( &push_name::operator(), g_ ## push_name , _1 ,_2, param) namespace s3selectEngine @@ -553,7 +555,7 @@ struct _fn_sum : public base_function base_statement* x = *iter; value res; - if(is_second_phase()) + if(unlikely(is_second_phase())) { m_scratch_area->pop_saved_result(res); } @@ -601,7 +603,7 @@ struct _fn_count : public base_function bool operator()(bs_stmt_vec_t* args, variable* result) override { - if(is_second_phase()) + if(unlikely(is_second_phase())) { m_scratch_area->pop_saved_result(saved_result); count = count + saved_result.i64(); @@ -652,7 +654,7 @@ struct _fn_avg : public base_function auto iter = args->begin(); base_statement *x = *iter; - if(is_second_phase()) + if(unlikely(is_second_phase())) { m_scratch_area->pop_saved_result(sum_save); m_scratch_area->pop_saved_result(count_save); @@ -708,7 +710,7 @@ struct _fn_min : public base_function base_statement* x = *iter; value res; - if(is_second_phase()) + if(unlikely(is_second_phase())) { m_scratch_area->pop_saved_result(res); } @@ -752,7 +754,7 @@ struct _fn_max : public base_function base_statement* x = *iter; value res; - if(is_second_phase()) + if(unlikely(is_second_phase())) { m_scratch_area->pop_saved_result(res); } From 0919c34fca6636acdc82f81e0e5c687160455965 Mon Sep 17 00:00:00 2001 From: gal salomon Date: Tue, 24 May 2022 19:48:26 +0300 Subject: [PATCH 5/7] bug fix. 
add pthread to scaleup app Signed-off-by: gal salomon --- example/CMakeLists.txt | 2 +- include/s3select.h | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index a2f80cb1..4cb204c5 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -10,7 +10,7 @@ if(Arrow_FOUND) add_executable(csv_to_parquet csv_to_parquet.cpp) target_include_directories(csv_to_parquet PUBLIC ../include) target_link_libraries(s3select_example boost_date_time boost_system boost_thread parquet arrow boost_filesystem) - target_link_libraries(s3select_scaleup boost_date_time boost_system boost_thread parquet arrow boost_filesystem) + target_link_libraries(s3select_scaleup boost_date_time boost_system boost_thread parquet arrow boost_filesystem pthread) target_link_libraries(csv_to_parquet boost_date_time boost_system boost_thread parquet arrow) else() target_link_libraries(s3select_example boost_date_time boost_system boost_thread boost_filesystem) diff --git a/include/s3select.h b/include/s3select.h index 41d454d3..fc1c3231 100644 --- a/include/s3select.h +++ b/include/s3select.h @@ -541,7 +541,8 @@ struct s3select : public bsc::grammar e->set_phase_state(phase); } - get_filter()->set_phase_state(phase); + if(get_filter()) + get_filter()->set_phase_state(phase); } std::string get_error_description() From 0fc3e730202e16ae287cca1f2e6e1f6e08155f9f Mon Sep 17 00:00:00 2001 From: gal salomon Date: Wed, 25 May 2022 17:24:22 +0300 Subject: [PATCH 6/7] adding spiliting functionality. i.e. 
spliting an input into equal parts to be processed simultaneously Signed-off-by: gal salomon --- example/s3select_scaleup.cpp | 173 ++++++++++++++++++++++++++++++----- 1 file changed, 152 insertions(+), 21 deletions(-) diff --git a/example/s3select_scaleup.cpp b/example/s3select_scaleup.cpp index bbc2155c..66b7bc1b 100644 --- a/example/s3select_scaleup.cpp +++ b/example/s3select_scaleup.cpp @@ -100,14 +100,116 @@ class csv_streamer { } }; -int splitter() +int csv_splitter(const char* fn,std::vector>& ranges) { - //get single object , split by size , search for \n bounderies - //thread per { split-data-portion --> {while(not-end-of-data-portion){read , process_stream()) } + + char row_delim=10; + std::ifstream is(fn, std::ifstream::binary); + //size of file + is.seekg (0, is.end); + uint64_t length = is.tellg(); + is.seekg (0, is.beg); + uint16_t num_of_split = getenv("NUM_OF_INST") ? atoi(getenv("NUM_OF_INST")) : 8;//number of cores + //calculate split size + uint64_t split_sz = length / num_of_split; + + const uint32_t max_row_size=(split_sz > (num_of_split*1024)) ? 1024 : split_sz/10 ;//should twice as bigger than row max size + char buff[max_row_size]; + + uint64_t mark=0; + uint64_t prev_mark=0; + int range_no=0; + + do + { + + is.seekg(mark+(split_sz-max_row_size));//jump to location of next "cut" + is.read(buff,max_row_size); //reading small buff + uint64_t current_pos = is.tellg(); + uint64_t actual_read=is.gcount(); + + char* p=&buff[actual_read-1]; + + while(*p != row_delim && p != &buff[0])p--; + + if(*p != row_delim) + { + printf("row delimiter not found. 
abort\n"); + break; + } + + prev_mark = mark; + + range_no++; + + if(range_no(prev_mark,mark)); + printf("%d: range[%ld %ld] %ld\n",range_no,prev_mark,mark,mark-prev_mark); + + }while(mark!=length); + return 0; } -//TODO stream_chunk() +int stream_partof_file(const char* file, csv_streamer *cs,size_t from,size_t to) +{//each part is processed on seperate thread + std::ifstream input_file_stream; + size_t length = to - from; + size_t bytes_been_read = 0; + int status=0; + + //open-file + try { + input_file_stream = std::ifstream(file, std::ios::in | std::ios::binary); + input_file_stream.seekg(from); + } + catch( ... ) + { + std::cout << "failed to open file " << file << std::endl; + return(-1); + } + + //read-chunk +#define BUFFER_SIZE (4*1024*1024) + std::string buff(BUFFER_SIZE,0); + size_t buffer_to_read = BUFFER_SIZE; + while (true) + { + if(buffer_to_read > (length - bytes_been_read) ) + { + buffer_to_read = length - bytes_been_read; + } + + size_t read_sz = input_file_stream.readsome(buff.data(),buffer_to_read); + bytes_been_read += read_sz; + if(!read_sz || input_file_stream.eof()) + {//signaling end of stream + cs->process_stream(0,0,true); + break; + } + + status = cs->process_stream(buff.data(),read_sz,false); + if(status<0) + { + std::cout << "failure on execution " << std::endl; + break; + } + + if(!read_sz || input_file_stream.eof()) + { + break; + } + } + return 0; +} int stream_file(char* file, csv_streamer *cs) {//each file processed on seperate thread @@ -151,23 +253,12 @@ int stream_file(char* file, csv_streamer *cs) return 0; } -int start_multiple_execution_flows(std::string q, std::vector files) +int start_multiple_execution_flows(std::string q, std::vector& all_streamers, std::vector>& vec_of_fp, shared_queue& sq) { //the object-set defines one finite data-set for the query. 
- shared_queue sq; boost::thread_group producer_threads, consumer_threads; - std::vector> vec_of_fp; - std::vector all_streamers; std::vector s3select_processing_objects; - for(auto f : files) - { - csv_streamer *cs = new csv_streamer(q,&sq); - all_streamers.push_back(cs); - auto thread_func = [f,cs](){return stream_file(f,cs);}; - vec_of_fp.push_back( thread_func ); - } - for(auto& t : vec_of_fp) { //start with query processing @@ -215,14 +306,15 @@ int start_multiple_execution_flows(std::string q, std::vector files) return 0; } - -int main(int argc, char **argv) +int main_for_many_files(int argc, char **argv) { if(argc<2) return -1; - char* query=argv[1]; std::string sql_query; - sql_query.assign(query); + sql_query.assign(argv[1]); + shared_queue sq; + std::vector> vec_of_fp; + std::vector all_streamers; std::vector list_of_files; setvbuf(stdout, NULL, _IONBF, 0);//unbuffered stdout @@ -232,8 +324,47 @@ int main(int argc, char **argv) list_of_files.push_back(argv[i]); } - start_multiple_execution_flows(sql_query, list_of_files); + for(auto f : list_of_files) + { + csv_streamer *cs = new csv_streamer(sql_query,&sq); + all_streamers.push_back(cs); + auto thread_func = [f,cs](){return stream_file(f,cs);}; + vec_of_fp.push_back( thread_func ); + } + + start_multiple_execution_flows(sql_query,all_streamers,vec_of_fp,sq); return 0; } +int main(int argc, char **argv) +{ + if(argc<2) return -1; + + std::string sql_query; + sql_query.assign(argv[1]); + shared_queue sq; + std::vector> vec_of_fp; + std::vector all_streamers; + + std::string fn; + fn.assign(argv[2]); + + setvbuf(stdout, NULL, _IONBF, 0);//unbuffered stdout + + //spiliting single input file into ranges + std::vector> ranges; + csv_splitter(fn.data(),ranges); + + for(auto r : ranges) + { + csv_streamer *cs = new csv_streamer(sql_query,&sq); + all_streamers.push_back(cs); + auto thread_func = [fn,r,cs](){return stream_partof_file(fn.data(), cs, r.first, r.second);}; + vec_of_fp.push_back( thread_func ); + } + 
+ start_multiple_execution_flows(sql_query,all_streamers,vec_of_fp,sq); + + return 0; +} From 974593d9693f18ca1025a00c212abe2822338d7c Mon Sep 17 00:00:00 2001 From: galsalomon66 Date: Mon, 5 Sep 2022 17:28:28 +0300 Subject: [PATCH 7/7] rebase. adding json flow Signed-off-by: galsalomon66 --- example/CMakeLists.txt | 2 +- example/s3select_example.cpp | 1 + include/s3select.h | 28 +++++++++++++++------------- include/s3select_oper.h | 2 ++ test/s3select_test.cpp | 14 +++++++------- test/s3select_test.h | 2 +- 6 files changed, 27 insertions(+), 22 deletions(-) diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index 4cb204c5..0a7e6c75 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -1,7 +1,7 @@ add_executable(s3select_example s3select_example.cpp) target_include_directories(s3select_example PUBLIC ../include ../rapidjson/include) add_executable(s3select_scaleup s3select_scaleup.cpp) -target_include_directories(s3select_scaleup PUBLIC ../include) +target_include_directories(s3select_scaleup PUBLIC ../include ../rapidjson/include) find_package(Arrow QUIET) diff --git a/example/s3select_example.cpp b/example/s3select_example.cpp index 3733eaa0..1e0c4811 100644 --- a/example/s3select_example.cpp +++ b/example/s3select_example.cpp @@ -458,6 +458,7 @@ int run_on_localFile(char* input_query) s3select_result result; s3selectEngine::csv_object::csv_defintions csv; csv.use_header_info = false; + bool do_aggregate = false; //csv.column_delimiter='|'; //csv.row_delimiter='\t'; diff --git a/include/s3select.h b/include/s3select.h index fc1c3231..0015c2e2 100644 --- a/include/s3select.h +++ b/include/s3select.h @@ -2723,7 +2723,7 @@ class json_object : public base_s3object JsonParserHandler JsonHandler; size_t m_processed_bytes; bool m_end_of_stream; - std::string s3select_result; + s3select_result m_result; size_t m_row_count; bool star_operation_ind; std::string m_error_description; @@ -2792,23 +2792,23 @@ class json_object : public base_s3object 
//execute statement on row //create response (TODO callback) - size_t result_len = s3select_result.size(); + size_t result_len = m_result.size(); int status=0; try{ - status = getMatchRow(s3select_result); + status = getMatchRow(m_result); } catch(s3selectEngine::base_s3select_exception& e) { - sql_error_handling(e,s3select_result); + sql_error_handling(e,m_result.str()); status = -1; } m_sa->clear_data(); - if(star_operation_ind && (s3select_result.size() != result_len)) + if(star_operation_ind && (m_result.size() != result_len)) {//as explained above the star-operation is displayed differently std::string end_of_row; end_of_row = "#=== " + std::to_string(m_row_count++) + " ===#\n"; - s3select_result.append(end_of_row); + m_result.append(end_of_row); } return status; } @@ -2833,10 +2833,10 @@ class json_object : public base_s3object //the error-handling takes care of the error flow. m_error_description = e.what(); m_error_count++; - s3select_result.append(std::to_string(m_error_count)); - s3select_result += " : "; - s3select_result.append(m_error_description); - s3select_result += m_csv_defintion.output_row_delimiter; + m_result.append(std::to_string(m_error_count)); + m_result.append(std::string(" : ")); + m_result.append(m_error_description); + m_result.append(&m_csv_defintion.output_row_delimiter,1); } public: @@ -2845,21 +2845,21 @@ class json_object : public base_s3object { int status=0; m_processed_bytes += stream_length; - s3select_result.clear(); + m_result.clear(); if(!stream_length || !json_stream)//TODO m_processed_bytes(?) 
{//last processing cycle JsonHandler.process_json_buffer(0, 0, true);//TODO end-of-stream = end-of-row m_end_of_stream = true; sql_execution_on_row_cb(); - result = s3select_result; + result = m_result.str(); return 0; } try{ //the handler is processing any buffer size and return results per each buffer status = JsonHandler.process_json_buffer((char*)json_stream, stream_length); - result = s3select_result;//TODO remove this result copy + result = m_result.str();//TODO remove this result copy } catch(std::exception &e) { @@ -2878,6 +2878,8 @@ class json_object : public base_s3object ~json_object() = default; +}; + class merge_results : public base_s3object {//purpose: upon processing several stream on a single aggregate query, this object should merge results. diff --git a/include/s3select_oper.h b/include/s3select_oper.h index 62b4c1b0..53a4d8ce 100644 --- a/include/s3select_oper.h +++ b/include/s3select_oper.h @@ -1335,6 +1335,8 @@ class base_statement m_projection_alias(nullptr), m_eval_stack_depth(0), m_skip_non_aggregate_op(false), execution_phase(multiple_executions_en::NA){} + multiple_executions_en execution_phase; + virtual value& eval() { //purpose: on aggregation flow to run only the correct subtree(aggregation subtree) diff --git a/test/s3select_test.cpp b/test/s3select_test.cpp index b4f93204..801cc2be 100644 --- a/test/s3select_test.cpp +++ b/test/s3select_test.cpp @@ -825,8 +825,7 @@ void test_single_column_single_row(const char* input_query,const char* expected_ #ifdef _ARROW_EXIST parquet_csv_report_error(parquet_result.str(),csv_result.str()); #endif - json_csv_report_error(json_result, s3select_result); - ASSERT_EQ(s3select_result, std::string(expected_result)); + json_csv_report_error(json_result, csv_result.str()); ASSERT_EQ(csv_result.str(), std::string(expected_result)); } @@ -2664,7 +2663,8 @@ void generate_csv_quote_and_escape(std::string& out, char quote = '"', char escp TEST(TestS3selectFunctions, csv_quote_string_and_escape_char) { - 
std::string input, s3select_result_1, s3select_result_2, s3select_result_3; + std::string input; + s3select_result s3select_result_1, s3select_result_2, s3select_result_3; csv_object::csv_defintions csv; generate_csv_quote_and_escape(input); s3select s3select_syntax1, s3select_syntax2, s3select_syntax3; @@ -2683,7 +2683,7 @@ TEST(TestS3selectFunctions, csv_quote_string_and_escape_char) s3selectEngine::csv_object s3_csv_object_second(&s3select_syntax2, csv); s3_csv_object_second.run_s3select_on_object(s3select_result_2, input.c_str(), input.size(), false, false, true); - EXPECT_EQ(s3select_result_1, s3select_result_2); + EXPECT_EQ(s3select_result_1.str(), s3select_result_2.str()); csv.escape_char = '\0'; csv.quot_char = '\0'; @@ -2695,13 +2695,13 @@ TEST(TestS3selectFunctions, csv_quote_string_and_escape_char) s3selectEngine::csv_object s3_csv_object_third(&s3select_syntax3, csv); s3_csv_object_third.run_s3select_on_object(s3select_result_3, input.c_str(), input.size(), false, false, true); - EXPECT_EQ(s3select_result_3, input); + EXPECT_EQ(s3select_result_3.str(), input); } TEST(TestS3selectFunctions, csv_comment_line_and_trim_char) { std::string input; - std::string s3select_result_1, s3select_result_2; + s3select_result s3select_result_1, s3select_result_2; generate_csv_quote_and_escape(input); s3select s3select_syntax; @@ -2725,7 +2725,7 @@ TEST(TestS3selectFunctions, csv_comment_line_and_trim_char) s3selectEngine::csv_object s3_csv_object_second(&s3select_syntax, csv); s3_csv_object_second.run_s3select_on_object(s3select_result_2, input.c_str(), input.size(), false, false, true); - EXPECT_EQ(s3select_result_1, s3select_result_2); + EXPECT_EQ(s3select_result_1.str(), s3select_result_2.str()); } TEST(TestS3selectFunctions, presto_syntax_alignments) diff --git a/test/s3select_test.h b/test/s3select_test.h index e5b8f94d..be0ad2f6 100644 --- a/test/s3select_test.h +++ b/test/s3select_test.h @@ -691,7 +691,7 @@ std::string run_s3select(std::string 
expression,std::string input, const char* j } run_json_query(json_query, js, json_result); - json_csv_report_error(json_result, s3select_result); + json_csv_report_error(json_result, csv_result.str()); return csv_result.str(); }