diff --git a/Sinks/CSVSink/CSVSink.cpp b/Sinks/CSVSink/CSVSink.cpp index 9488094..5b4b198 100644 --- a/Sinks/CSVSink/CSVSink.cpp +++ b/Sinks/CSVSink/CSVSink.cpp @@ -387,7 +387,7 @@ void CsvSink::write(int thr) runtimeEnv->freeBufferIds->push(bufferId); buffersWritten++; - runtimeEnv->tf_paras.bufProcessed.at(thr) = buffersWritten; + runtimeEnv->tf_paras.bufProcessed.at(thr)++; } outputFile.close(); diff --git a/Sinks/main.cpp b/Sinks/main.cpp index 270acd2..b4501ad 100644 --- a/Sinks/main.cpp +++ b/Sinks/main.cpp @@ -11,322 +11,320 @@ #include "../xdbc/metrics_calculator.h" // Utility functions for schema handling -static xdbc::SchemaAttribute createSchemaAttribute(std::string name, std::string tpe, int size) { - xdbc::SchemaAttribute att; - att.name = std::move(name); - att.tpe = std::move(tpe); - att.size = size; - return att; +static xdbc::SchemaAttribute createSchemaAttribute(std::string name, std::string tpe, int size) +{ + xdbc::SchemaAttribute att; + att.name = std::move(name); + att.tpe = std::move(tpe); + att.size = size; + return att; } -std::string formatSchema(const std::vector &schema) { - std::stringstream ss; - ss << std::setw(20) << std::left << "Name" - << std::setw(15) << std::left << "Type" - << std::setw(10) << std::left << "Size" << '\n'; - - for (const auto &tuple: schema) { - ss << std::setw(20) << std::left << tuple.name - << std::setw(15) << std::left << tuple.tpe - << std::setw(10) << std::left << tuple.size << '\n'; - } - return ss.str(); +std::string formatSchema(const std::vector &schema) +{ + std::stringstream ss; + ss << std::setw(20) << std::left << "Name" + << std::setw(15) << std::left << "Type" + << std::setw(10) << std::left << "Size" << '\n'; + + for (const auto &tuple : schema) + { + ss << std::setw(20) << std::left << tuple.name + << std::setw(15) << std::left << tuple.tpe + << std::setw(10) << std::left << tuple.size << '\n'; + } + return ss.str(); } -std::vector createSchemaFromConfig(const std::string &configFile) { - std::ifstream file(configFile); - if (!file.is_open()) { - spdlog::get("XDBC.SINK")->error("Failed to open schema file: {0}", configFile); - exit(EXIT_FAILURE); - } - - nlohmann::json schemaJson; - file >> schemaJson; - - std::vector schema; - for (const auto &item: schemaJson) { - schema.emplace_back(xdbc::SchemaAttribute{ - item["name"], item["type"], item["size"]}); - } - return schema; +std::vector createSchemaFromConfig(const std::string &configFile) +{ + std::ifstream file(configFile); + if (!file.is_open()) + { + spdlog::get("XDBC.SINK")->error("Failed to open schema file: {0}", configFile); + exit(EXIT_FAILURE); + } + + nlohmann::json schemaJson; + file >> schemaJson; + + std::vector schema; + for (const auto &item : schemaJson) + { + schema.emplace_back(xdbc::SchemaAttribute{ + item["name"], item["type"], item["size"]}); + } + return schema; } -std::string readJsonFileIntoString(const std::string &filePath) { - std::ifstream file(filePath); - if (!file.is_open()) { - spdlog::get("XDBC.SINK")->error("Failed to open schema file: {0}", filePath); - exit(EXIT_FAILURE); - } - - std::stringstream buffer; - buffer << file.rdbuf(); - return buffer.str(); +std::string readJsonFileIntoString(const std::string &filePath) +{ + std::ifstream file(filePath); + if (!file.is_open()) + { + spdlog::get("XDBC.SINK")->error("Failed to open schema file: {0}", filePath); + exit(EXIT_FAILURE); + } + + std::stringstream buffer; + buffer << file.rdbuf(); + return buffer.str(); } -void handleSinkCMDParams(int argc, char *argv[], xdbc::RuntimeEnv &env, std::string &outputBasePath) { - namespace po = boost::program_options; - - po::options_description desc("Usage: ./csvsink [options]\n\nAllowed options"); - desc.add_options()("help,h", "Produce help message.")("server-host,a", - po::value()->default_value("xdbcserver"), - "Server Host: \nDefault:\n xdbcserver")("server-port", - po::value()->default_value( - "1234"), - "Server port: \nDefault:\n 1234")( - "table,e", po::value()->default_value("lineitem_sf10"), "Input table name.")("output,o", - po::value()->default_value( - "/dev/shm/output"), - "Output CSV base file path.")( - "buffer-size,b", po::value()->default_value(64), "Buffer size in KiB.")("bufferpool-size,p", - po::value()->default_value( - 4096), - "Buffer pool size in KiB.")( - "net-parallelism,n", po::value()->default_value(1), "Set the network parallelism grade.\nDefault: 1")( - "decomp-parallelism,d", po::value()->default_value(1), "Decompression Parallelism.\nDefault: 1")( - "serialize-parallelism,s", po::value()->default_value(1), "Number of serializer threads.")( - "write-parallelism,w", po::value()->default_value(1), "Number of write threads.")( - "intermediate-format,f", po::value()->default_value(1), - "Intermediate format: 1 (row) or 2 (column).")("transfer-id,tid", po::value()->default_value(0), - "Set the transfer id.\nDefault: 0")("profiling-interval", - po::value()->default_value( - 1000), - "Set profiling interval.\nDefault: 1000")( - "skip-serializer", po::value()->default_value(0), - "Skip serialization (0/1).\nDefault: false")("target", po::value()->default_value("csv"), - "Target (csv, parquet).\nDefault: csv")("spawn-source", - po::value()->default_value( - 0), - "Set spawn source (0 means direct launch or 1 means spawned using controller).\nDefault: 0"); - - po::variables_map vm; - po::store(po::parse_command_line(argc, argv, desc), vm); - - if (vm.count("help")) { - std::cout << desc << std::endl; - exit(0); - } - - try { - po::notify(vm); - } - catch (po::required_option &e) { - spdlog::get("XDBC.SINK")->error("Missing required options: {0}", e.what()); - exit(EXIT_FAILURE); - } - - env.env_name = "Sink"; - env.server_host = vm["server-host"].as(); - env.server_port = vm["server-port"].as(); - env.transfer_id = vm["transfer-id"].as(); - env.table = vm["table"].as(); - env.buffer_size = vm["buffer-size"].as(); - env.buffers_in_bufferpool = vm["bufferpool-size"].as() / vm["buffer-size"].as(); - env.rcv_parallelism = vm["net-parallelism"].as(); - env.decomp_parallelism = vm["decomp-parallelism"].as(); - env.ser_parallelism = vm["serialize-parallelism"].as(); - env.write_parallelism = vm["write-parallelism"].as(); - env.iformat = vm["intermediate-format"].as(); - env.target = vm["target"].as(); - env.profilingInterval = vm["profiling-interval"].as(); - outputBasePath = vm["output"].as(); - - env.skip_serializer = vm["skip-serializer"].as(); - env.spawn_source = vm["spawn-source"].as(); - - std::string schemaFile = "/xdbc-client/tests/schemas/" + env.table + ".json"; - - env.schema = createSchemaFromConfig(schemaFile); - env.schemaJSON = readJsonFileIntoString(schemaFile); - env.tuple_size = std::accumulate(env.schema.begin(), env.schema.end(), 0, - [](int acc, const xdbc::SchemaAttribute &attr) { - return acc + attr.size; - }); - - env.tuples_per_buffer = (env.buffer_size * 1024) / env.tuple_size; - env.max_threads = env.buffers_in_bufferpool; - env.startTime = std::chrono::steady_clock::now(); - - spdlog::get("XDBC.SINK")->info("Table: {0}, Tuple size: {1}, Schema:\n{2}", env.table, env.tuple_size, - formatSchema(env.schema)); +void handleSinkCMDParams(int argc, char *argv[], xdbc::RuntimeEnv &env, std::string &outputBasePath) +{ + namespace po = boost::program_options; + + po::options_description desc("Usage: ./csvsink [options]\n\nAllowed options"); + desc.add_options()("help,h", "Produce help message.")("server-host,a", po::value()->default_value("xdbcserver"), + "Server Host: \nDefault:\n xdbcserver")("server-port", po::value()->default_value("1234"), + "Server port: \nDefault:\n 1234")("table,e", po::value()->default_value("lineitem_sf10"), "Input table name.")("output,o", po::value()->default_value("/dev/shm/output"), "Output CSV base file path.")("buffer-size,b", po::value()->default_value(64), "Buffer size in KiB.")("bufferpool-size,p", po::value()->default_value(4096), "Buffer pool size in KiB.")("net-parallelism,n", po::value()->default_value(1), "Set the network parallelism grade.\nDefault: 1")("decomp-parallelism,d", po::value()->default_value(1), "Decompression Parallelism.\nDefault: 1")("serialize-parallelism,s", po::value()->default_value(1), "Number of serializer threads.")("write-parallelism,w", po::value()->default_value(1), "Number of write threads.")("intermediate-format,f", po::value()->default_value(1), + "Intermediate format: 1 (row) or 2 (column).")("transfer-id,tid", po::value()->default_value(0), + "Set the transfer id.\nDefault: 0")("profiling-interval", po::value()->default_value(1000), + "Set profiling interval.\nDefault: 1000")("skip-serializer", po::value()->default_value(0), + "Skip serialization (0/1).\nDefault: false")("target", po::value()->default_value("csv"), + "Target (csv, parquet).\nDefault: csv")("spawn-source", po::value()->default_value(0), + "Set spawn source (0 means direct launch or 1 means spawned using controller).\nDefault: 0"); + + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + + if (vm.count("help")) + { + std::cout << desc << std::endl; + exit(0); + } + + try + { + po::notify(vm); + } + catch (po::required_option &e) + { + spdlog::get("XDBC.SINK")->error("Missing required options: {0}", e.what()); + exit(EXIT_FAILURE); + } + + env.env_name = "Sink"; + env.server_host = vm["server-host"].as(); + env.server_port = vm["server-port"].as(); + env.transfer_id = vm["transfer-id"].as(); + env.table = vm["table"].as(); + env.buffer_size = vm["buffer-size"].as(); + env.buffers_in_bufferpool = vm["bufferpool-size"].as() / vm["buffer-size"].as(); + env.rcv_parallelism = vm["net-parallelism"].as(); + env.decomp_parallelism = vm["decomp-parallelism"].as(); + env.ser_parallelism = vm["serialize-parallelism"].as(); + env.write_parallelism = vm["write-parallelism"].as(); + env.iformat = vm["intermediate-format"].as(); + env.target = vm["target"].as(); + env.profilingInterval = vm["profiling-interval"].as(); + outputBasePath = vm["output"].as(); + + env.skip_serializer = vm["skip-serializer"].as(); + env.spawn_source = vm["spawn-source"].as(); + + std::string schemaFile = "/xdbc-client/tests/schemas/" + env.table + ".json"; + + env.schema = createSchemaFromConfig(schemaFile); + env.schemaJSON = readJsonFileIntoString(schemaFile); + env.tuple_size = std::accumulate(env.schema.begin(), env.schema.end(), 0, + [](int acc, const xdbc::SchemaAttribute &attr) + { + return acc + attr.size; + }); + + env.tuples_per_buffer = (env.buffer_size * 1024) / env.tuple_size; + env.max_threads = env.buffers_in_bufferpool; + env.startTime = std::chrono::steady_clock::now(); + + spdlog::get("XDBC.SINK")->info("Table: {0}, Tuple size: {1}, Schema:\n{2}", env.table, env.tuple_size, formatSchema(env.schema)); } -nlohmann::json metrics_convert(xdbc::RuntimeEnv &env) { - nlohmann::json metrics_json = nlohmann::json::object(); // Use a JSON object - if ((env.pts) && (env.enable_updation == 1)) { - std::vector env_pts; - env_pts = env.pts->copy_newElements(); - auto component_metrics_ = calculate_metrics(env_pts, env.buffer_size); - for (const auto &pair: component_metrics_) { - nlohmann::json metric_object = nlohmann::json::object(); - const Metrics &metric = pair.second; - - metric_object["waitingTime_ms"] = metric.waiting_time_ms; - metric_object["processingTime_ms"] = metric.processing_time_ms; - metric_object["totalTime_ms"] = metric.overall_time_ms; - - metric_object["totalThroughput"] = metric.total_throughput; - metric_object["perBufferThroughput"] = metric.per_buffer_throughput; - - metrics_json[pair.first] = metric_object; - } - } - return metrics_json; +nlohmann::json metrics_convert(xdbc::RuntimeEnv &env) +{ + nlohmann::json metrics_json = nlohmann::json::object(); // Use a JSON object + if ((env.pts) && (env.enable_updation == 1)) + { + auto &env_pts = *(env.pts); + auto component_metrics_ = calculate_metrics(env_pts, env.buffer_size, true); + for (const auto &pair : component_metrics_) + { + nlohmann::json metric_object = nlohmann::json::object(); + const Metrics &metric = pair.second; + + metric_object["waitingTime_ms"] = metric.waiting_time_ms; + metric_object["processingTime_ms"] = metric.processing_time_ms; + metric_object["totalTime_ms"] = metric.overall_time_ms; + + metric_object["totalThroughput"] = metric.total_throughput; + metric_object["perBufferThroughput"] = metric.per_buffer_throughput; + + metrics_json[pair.first] = metric_object; + } + } + return metrics_json; } -nlohmann::json additional_msg(xdbc::RuntimeEnv &env) { - nlohmann::json metrics_json = nlohmann::json::object(); // Use a JSON object - metrics_json["totalTime_ms"] = env.tf_paras.elapsed_time; - metrics_json["bufTransferred"] = std::accumulate(env.tf_paras.bufProcessed.begin(), env.tf_paras.bufProcessed.end(), - 0); - - if (env.enable_updation == 1) { - metrics_json["freeBufferQ_load"] = std::get<0>(env.tf_paras.latest_queueSizes); - metrics_json["compressedBufferQ_load"] = std::get<1>(env.tf_paras.latest_queueSizes); - metrics_json["decompressedBufferQ_load"] = std::get<2>(env.tf_paras.latest_queueSizes); - metrics_json["serializedBufferQ_load"] = std::get<3>(env.tf_paras.latest_queueSizes); - } - return metrics_json; +nlohmann::json additional_msg(xdbc::RuntimeEnv &env) +{ + nlohmann::json metrics_json = nlohmann::json::object(); // Use a JSON object + metrics_json["totalTime_ms"] = env.tf_paras.elapsed_time; + metrics_json["bufTransferred"] = std::accumulate(env.tf_paras.bufProcessed.begin(), env.tf_paras.bufProcessed.end(), 0); + if (env.enable_updation == 1) + { + metrics_json["freeBufferQ_load"] = std::get<0>(env.tf_paras.latest_queueSizes); + metrics_json["compressedBufferQ_load"] = std::get<1>(env.tf_paras.latest_queueSizes); + metrics_json["decompressedBufferQ_load"] = std::get<2>(env.tf_paras.latest_queueSizes); + metrics_json["serializedBufferQ_load"] = std::get<3>(env.tf_paras.latest_queueSizes); + } + return metrics_json; } -void env_convert(xdbc::RuntimeEnv &env, const nlohmann::json &env_json) { - try { - // env.buffer_size = std::stoi(env_json.at("bufferSize").get()); - // env.buffers_in_bufferpool = std::stoi(env_json.at("bufferpoolSize").get()) / env_.buffer_size; - // env.rcv_parallelism = std::stoi(env_json.at("netParallelism").get()); - - // Update the actual environment object if updates are allowed - if (env.enable_updation == 1) { - env.write_parallelism = std::stoi(env_json.at("writeParallelism").get()); - env.decomp_parallelism = std::stoi(env_json.at("decompParallelism").get()); - env.ser_parallelism = std::stoi(env_json.at("serParallelism").get()); - env.env_manager.configureThreads("write", env.write_parallelism); - env.env_manager.configureThreads("serial", env.ser_parallelism); - env.env_manager.configureThreads("decompress", env.decomp_parallelism); - } - } - catch (const std::exception &e) { - std::cerr << "Error converting env JSON: " << e.what() << std::endl; - } +void env_convert(xdbc::RuntimeEnv &env, const nlohmann::json &env_json) +{ + try + { + // env.buffer_size = std::stoi(env_json.at("bufferSize").get()); + // env.buffers_in_bufferpool = std::stoi(env_json.at("bufferpoolSize").get()) / env_.buffer_size; + // env.rcv_parallelism = std::stoi(env_json.at("netParallelism").get()); + + // Update the actual environment object if updates are allowed + if (env.enable_updation == 1) + { + env.write_parallelism = std::stoi(env_json.at("writeParallelism").get()); + env.decomp_parallelism = std::stoi(env_json.at("decompParallelism").get()); + env.ser_parallelism = std::stoi(env_json.at("serParallelism").get()); + env.env_manager.configureThreads("write", env.write_parallelism); + env.env_manager.configureThreads("serial", env.ser_parallelism); + env.env_manager.configureThreads("decompress", env.decomp_parallelism); + } + } + catch (const std::exception &e) + { + std::cerr << "Error converting env JSON: " << e.what() << std::endl; + } } -int main(int argc, char *argv[]) { - auto console = spdlog::stdout_color_mt("XDBC.SINK"); - spdlog::set_level(spdlog::level::info); +int main(int argc, char *argv[]) +{ + auto console = spdlog::stdout_color_mt("XDBC.SINK"); + spdlog::set_level(spdlog::level::info); - xdbc::RuntimeEnv env; - std::string outputBasePath; + xdbc::RuntimeEnv env; + std::string outputBasePath; - handleSinkCMDParams(argc, argv, env, outputBasePath); + handleSinkCMDParams(argc, argv, env, outputBasePath); - //*** - // Initialize XClient - xdbc::XClient xclient(env); - xclient.startReceiving(env.table); + //*** + // Initialize XClient + xdbc::XClient xclient(env); + xclient.startReceiving(env.table); - if (env.target == "csv") { - CsvSink csvSink(outputBasePath, &env); + if (env.target == "csv") + { + CsvSink csvSink(outputBasePath, &env); - env.env_manager.registerOperation("serial", [&](int thr) { - try { - if (thr >= env.max_threads) { - spdlog::get("XCLIENT")->error("No of threads exceed limit"); - return; - } - csvSink.serialize(thr); - } catch (const std::exception &e) { - spdlog::get("XCLIENT")->error("Exception in thread {}: {}", thr, e.what()); - } catch (...) { - spdlog::get("XCLIENT")->error("Unknown exception in thread {}", thr); + env.env_manager.registerOperation("serial", [&](int thr) + { try { + if (thr >= env.max_threads) { + spdlog::get("XCLIENT")->error("No of threads exceed limit"); + return; } - }, env.decompressedBufferIds); - - env.env_manager.registerOperation("write", [&](int thr) { - try { - if (thr >= env.max_threads) { - spdlog::get("XCLIENT")->error("No of threads exceed limit"); - return; - } - csvSink.write(thr); - } catch (const std::exception &e) { - spdlog::get("XCLIENT")->error("Exception in thread {}: {}", thr, e.what()); + csvSink.serialize(thr); + } catch (const std::exception& e) { + spdlog::get("XCLIENT")->error("Exception in thread {}: {}", thr, e.what()); } catch (...) { - spdlog::get("XCLIENT")->error("Unknown exception in thread {}", thr); + spdlog::get("XCLIENT")->error("Unknown exception in thread {}", thr); + } }, env.decompressedBufferIds); + + env.env_manager.registerOperation("write", [&](int thr) + { try { + if (thr >= env.max_threads) { + spdlog::get("XCLIENT")->error("No of threads exceed limit"); + return; } - }, env.serializedBufferIds); - - env.env_manager.configureThreads("serial", env.ser_parallelism); - env.env_manager.configureThreads("write", env.write_parallelism); - } else if (env.target == "parquet") { - PQSink parquetSink(outputBasePath, &env); - - env.env_manager.registerOperation("serial", [&](int thr) { - try { - if (thr >= env.max_threads) { - spdlog::get("XCLIENT")->error("No of threads exceed limit"); - return; - } - parquetSink.serialize(thr); - } catch (const std::exception &e) { - spdlog::get("XCLIENT")->error("Exception in thread {}: {}", thr, e.what()); + csvSink.write(thr); + } catch (const std::exception& e) { + spdlog::get("XCLIENT")->error("Exception in thread {}: {}", thr, e.what()); } catch (...) { - spdlog::get("XCLIENT")->error("Unknown exception in thread {}", thr); + spdlog::get("XCLIENT")->error("Unknown exception in thread {}", thr); + } }, env.serializedBufferIds); + + env.env_manager.configureThreads("serial", env.ser_parallelism); + env.env_manager.configureThreads("write", env.write_parallelism); + } + else if (env.target == "parquet") + { + PQSink parquetSink(outputBasePath, &env); + + env.env_manager.registerOperation("serial", [&](int thr) + { try { + if (thr >= env.max_threads) { + spdlog::get("XCLIENT")->error("No of threads exceed limit"); + return; } - }, env.decompressedBufferIds); - - env.env_manager.registerOperation("write", [&](int thr) { - try { - if (thr >= env.max_threads) { - spdlog::get("XCLIENT")->error("No of threads exceed limit"); - return; - } - parquetSink.write(thr); - } catch (const std::exception &e) { - spdlog::get("XCLIENT")->error("Exception in thread {}: {}", thr, e.what()); + parquetSink.serialize(thr); + } catch (const std::exception& e) { + spdlog::get("XCLIENT")->error("Exception in thread {}: {}", thr, e.what()); } catch (...) { - spdlog::get("XCLIENT")->error("Unknown exception in thread {}", thr); + spdlog::get("XCLIENT")->error("Unknown exception in thread {}", thr); + } }, env.decompressedBufferIds); + + env.env_manager.registerOperation("write", [&](int thr) + { try { + if (thr >= env.max_threads) { + spdlog::get("XCLIENT")->error("No of threads exceed limit"); + return; } - }, env.serializedBufferIds); - - env.env_manager.configureThreads("serial", env.ser_parallelism); // start serial component threads - env.env_manager.configureThreads("write", env.write_parallelism); // start write component threads - } - - // *** Setup websocket interface for controller *** - std::thread io_thread; - WebSocketClient ws_client("xdbc-controller", "8002"); - if (env.spawn_source == 1) { - env.enable_updation = 1; - ws_client.start(); - io_thread = std::thread([&]() { - ws_client.run( - std::bind(&metrics_convert, std::ref(env)), - std::bind(&additional_msg, std::ref(env)), - std::bind(&env_convert, std::ref(env), std::placeholders::_1)); - }); - } - while (env.enable_updation == 1) // Reconfigure threads as long as it is allowed - { - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - } - // *** Finished Setup websocket interface for controller *** - - // Wait for receive threads to finish, then kill the remaining components in proper sequence : decompress-serial-write - xclient.finishReceiving(); - env.env_manager.configureThreads("serial", 0); - env.env_manager.joinThreads("serial"); - env.env_manager.configureThreads("write", 0); - env.env_manager.joinThreads("write"); - - xclient.finalize(); - spdlog::get("XDBC.CSVSINK")->info("{} serialization completed. Output files are available at: {}", env.target, - outputBasePath); - // *** Stop websocket client - if (env.spawn_source == 1) { - ws_client.stop(); - if (io_thread.joinable()) { - io_thread.join(); - } - } - - return 0; + parquetSink.write(thr); + } catch (const std::exception& e) { + spdlog::get("XCLIENT")->error("Exception in thread {}: {}", thr, e.what()); + } catch (...) { + spdlog::get("XCLIENT")->error("Unknown exception in thread {}", thr); + } }, env.serializedBufferIds); + + env.env_manager.configureThreads("serial", env.ser_parallelism); // start serial component threads + env.env_manager.configureThreads("write", env.write_parallelism); // start write component threads + } + + // *** Setup websocket interface for controller *** + std::thread io_thread; + WebSocketClient ws_client("xdbc-controller", "8002"); + if (env.spawn_source == 1) + { + env.enable_updation = 1; + ws_client.start(); + io_thread = std::thread([&]() + { ws_client.run( + std::bind(&metrics_convert, std::ref(env)), + std::bind(&additional_msg, std::ref(env)), + std::bind(&env_convert, std::ref(env), std::placeholders::_1)); }); + } + while (env.enable_updation == 1) // Reconfigure threads as long as it is allowed + { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + // *** Finished Setup websocket interface for controller *** + + // Wait for receive threads to finish, then kill the remaining components in proper sequence : decompress-serial-write + xclient.finishReceiving(); + env.env_manager.configureThreads("serial", 0); + env.env_manager.joinThreads("serial"); + env.env_manager.configureThreads("write", 0); + env.env_manager.joinThreads("write"); + + xclient.finalize(); + spdlog::get("XDBC.CSVSINK")->info("{} serialization completed. Output files are available at: {}", env.target, outputBasePath); + // *** Stop websocket client + if (env.spawn_source == 1) + { + ws_client.stop(); + if (io_thread.joinable()) + { + io_thread.join(); + } + } + + return 0; } diff --git a/xdbc/ControllerInterface/WebSocketClient.cpp b/xdbc/ControllerInterface/WebSocketClient.cpp index 8dcc538..16dcf7d 100644 --- a/xdbc/ControllerInterface/WebSocketClient.cpp +++ b/xdbc/ControllerInterface/WebSocketClient.cpp @@ -3,11 +3,13 @@ #include WebSocketClient::WebSocketClient(const std::string &host, const std::string &port) - : host_(host), port_(port), resolver_(ioc_), ws_(ioc_), timer_(ioc_), active_(false), stop_thread_(false), - operation_started_(false) {} + : host_(host), port_(port), resolver_(ioc_), ws_(ioc_), timer_(ioc_), active_(false), stop_thread_(false), + operation_started_(false) {} -void WebSocketClient::start() { - try { +void WebSocketClient::start() +{ + try + { // Resolve host and port auto results = resolver_.resolve(host_, port_); // Connect to the first resolved endpoint @@ -26,62 +28,59 @@ void WebSocketClient::start() { auto start_time = std::chrono::steady_clock::now(); const std::chrono::seconds timeout(10); // Set a timeout duration - while (!acknowledged) { + while (!acknowledged) + { // Check for timeout auto elapsed = std::chrono::steady_clock::now() - start_time; - if (elapsed > timeout) { + if (elapsed > timeout) + { spdlog::error("Timeout waiting for server acknowledgment."); throw std::runtime_error("Server acknowledgment timeout"); } // Attempt to read the acknowledgment - try { + try + { ws_.read(buffer); std::string ack_response = beast::buffers_to_string(buffer.data()); spdlog::info("Received acknowledgment: {}", ack_response); // Parse and check acknowledgment json ack_json = json::parse(ack_response); - if (ack_json["operation"] == "acknowledged") { + if (ack_json["operation"] == "acknowledged") + { acknowledged = true; operation_started_ = true; // Set flag indicating acknowledgment received spdlog::info("Server acknowledged the start request."); - } else { + } + else + { spdlog::warn("Server response does not acknowledge start: {}", ack_json.dump()); // throw std::runtime_error("Server rejected start request"); } } - catch (const std::exception &e) { + catch (const std::exception &e) + { spdlog::error("Error while waiting for acknowledgment: {}", e.what()); // Optional: Retry after a short delay std::this_thread::sleep_for(std::chrono::milliseconds(500)); } } } - catch (const std::exception &e) { + catch (const std::exception &e) + { spdlog::error("WebSocket Client Error during start: {}", e.what()); throw; // Rethrow the exception to notify the caller } } -void WebSocketClient::periodic_communication() { - try { - while (!stop_thread_) { - // Convert metrics to JSON and send it - json metrics_json = metrics_convert_(); - json addtnl_info = additional_msg_(); - json combined_payload = metrics_json; - for (auto &[key, value]: addtnl_info.items()) { - combined_payload[key] = value; - } - // json metrics_json = {{"waiting_time", "100ms"}}; - json request_json = { - {"operation", "get_environment"}, - {"payload", combined_payload} // Include metrics in the payload - }; - ws_.write(asio::buffer(request_json.dump())); - - // Read response from server +void WebSocketClient::periodic_communication() +{ + try + { + while (!stop_thread_) + { + // Read command from server beast::flat_buffer buffer; ws_.read(buffer); std::string env_response = beast::buffers_to_string(buffer.data()); @@ -89,28 +88,50 @@ void WebSocketClient::periodic_communication() { // Parse and process the response json env_json = json::parse(env_response); - if (env_json["operation"] == "set_environment") { + if (env_json["operation"] == "set_environment") + { json payload = env_json["payload"]; env_convert_(payload); // Process environment data from payload - } else { + } + else + { spdlog::warn("Unexpected operation received: {}", env_json["operation"]); } - // Wait for 1 second before next communication - std::this_thread::sleep_for(std::chrono::seconds(2)); + // Convert metrics to JSON and send it + json metrics_json = metrics_convert_(); + json addtnl_info = additional_msg_(); + json combined_payload = metrics_json; + for (auto &[key, value] : addtnl_info.items()) + { + combined_payload[key] = value; + } + // json metrics_json = {{"waiting_time", "100ms"}}; + json request_json = { + {"operation", "get_environment"}, + {"payload", combined_payload} // Include metrics in the payload + }; + ws_.write(asio::buffer(request_json.dump())); + + // // Wait for 1 second before next communication + // std::this_thread::sleep_for(std::chrono::seconds(2)); active_ = true; } } - catch (const std::exception &e) { + catch (const std::exception &e) + { std::cerr << "Error in periodic communication: " << e.what() << std::endl; } } void WebSocketClient::run(std::function metrics_convert, std::function additional_msg, - std::function env_convert) { - try { + std::function env_convert) +{ + try + { // Wait until the operation has started and acknowledgment is received - while (!operation_started_) { + while (!operation_started_) + { std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait briefly before checking again } @@ -124,21 +145,25 @@ void WebSocketClient::run(std::function metrics_convert, std::function #include -template -class customQueue { +template +class customQueue +{ private: std::mutex d_mutex; std::condition_variable d_condition; @@ -15,30 +16,36 @@ class customQueue { public: explicit customQueue(size_t max_capacity = 0) : capacity(max_capacity) {} - void push(T const &value) { + void push(T const &value) + { { std::unique_lock lock(this->d_mutex); - this->d_space_available.wait(lock, [=] { return capacity == 0 || d_queue.size() < capacity; }); + this->d_space_available.wait(lock, [=] + { return capacity == 0 || d_queue.size() < capacity; }); d_queue.push_front(value); } this->d_condition.notify_all(); } - T pop() { + T pop() + { std::unique_lock lock(this->d_mutex); - this->d_condition.wait(lock, [=] { return !this->d_queue.empty(); }); + this->d_condition.wait(lock, [=] + { return !this->d_queue.empty(); }); T rc(std::move(this->d_queue.back())); this->d_queue.pop_back(); this->d_space_available.notify_all(); // Notify threads waiting for space return rc; } - [[nodiscard]] size_t size() { + [[nodiscard]] size_t size() + { std::unique_lock lock(this->d_mutex); return d_queue.size(); } - void setCapacity(size_t new_capacity) { + void setCapacity(size_t new_capacity) + { { std::unique_lock lock(this->d_mutex); capacity = new_capacity; @@ -47,23 +54,23 @@ class customQueue { } // Get the current capacity - [[nodiscard]] size_t getCapacity() const { + [[nodiscard]] size_t getCapacity() const + { return capacity; } - std::vector copy_newElements() { - static size_t lastCopiedIndex = 0; // Tracks the last copied position - std::vector new_elements; // To store new elements - auto current_index = d_queue.size(); - { - // std::unique_lock lock(this->d_mutex); // Lock for thread safety - if (lastCopiedIndex < - current_index) { // Check if there are new elements - new_elements.assign(d_queue.rbegin(), d_queue.rbegin() + (d_queue.size() - - lastCopiedIndex)); // Reverse copy the new elements - lastCopiedIndex = current_index; // Update the index for the next call - } - } - return new_elements; // Return new elements in reverse order + auto begin() + { + return d_queue.rbegin(); + } + + auto beginFrom(size_t offset) + { + return d_queue.rbegin() + offset; + } + + auto end() + { + return d_queue.rend(); } }; diff --git a/xdbc/metrics_calculator.h b/xdbc/metrics_calculator.h index ee31f97..e1880ec 100644 --- a/xdbc/metrics_calculator.h +++ b/xdbc/metrics_calculator.h @@ -10,7 +10,8 @@ #include // Define the Metrics struct -struct Metrics { +struct Metrics +{ double waiting_time_ms; double processing_time_ms; double overall_time_ms; @@ -24,9 +25,11 @@ struct Metrics { }; // Helper function to calculate standard deviation -double calculate_stddev(const std::vector &values, double mean) { +double calculate_stddev(const std::vector &values, double mean) +{ double sum = 0.0; - for (const auto &value: values) { + for (const auto &value : values) + { sum += (value - mean) * (value - mean); } return std::sqrt(sum / values.size()); @@ -34,23 +37,42 @@ double calculate_stddev(const std::vector &values, double mean) { // Function to calculate metrics per component and then aggregate them std::unordered_map -calculate_metrics(const std::vector ×tamps, size_t buffer_size_kb) { +calculate_metrics(customQueue ×tamps, size_t buffer_size_kb, bool is_latest = false) +{ size_t buffer_size_bytes = buffer_size_kb * 1024; // Convert buffer size to bytes std::unordered_map>> events_per_component_thread; + static size_t lastCopiedIndex = 0; // Tracks the last copied position + if (is_latest == true) + { + // Create a map to keep track of the count of timestamps per component and thread + std::map, int> timestamp_counts; - // Group timestamps by component and thread - for (const auto &ts: timestamps) { - events_per_component_thread[ts.component][ts.thread].push_back(ts); + for (auto it = timestamps.beginFrom(lastCopiedIndex); it != timestamps.end(); ++it) + { + const auto &ts = *it; + events_per_component_thread[ts.component][ts.thread].push_back(ts); + } + lastCopiedIndex = timestamps.size(); + } + else + { + for (auto it = timestamps.begin(); it != timestamps.end(); ++it) + { + const auto &ts = *it; + events_per_component_thread[ts.component][ts.thread].push_back(ts); + } } std::unordered_map component_metrics; // Calculate metrics per component - for (const auto &[component, events_per_thread]: events_per_component_thread) { + for (const auto &[component, events_per_thread] : events_per_component_thread) + { std::vector thread_metrics; size_t total_buffers_processed = 0; - for (const auto &[thread_id, events]: events_per_thread) { + for (const auto &[thread_id, events] : events_per_thread) + { Metrics metrics = {0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0}; std::chrono::high_resolution_clock::time_point start_time, push_time, pop_time, end_time; @@ -65,29 +87,42 @@ calculate_metrics(const std::vector ×tamps, size const auto &first_element = events.front(); // Store the first event in the loop const auto &last_element = events.back(); // Store the last event in the loop - for (const auto &event: events) { - if (event.event == "start") { + for (const auto &event : events) + { + if (event.event == "start") + { start_time = event.timestamp; has_start_time = true; - } else if (event.event == "pop") { + } + else if (event.event == "pop") + { pop_time = event.timestamp; - if (has_push_time) { + if (has_push_time) + { waiting_time += pop_time - push_time; // Waiting time is pop_time - previous push_time - } else if (has_start_time) { + } + else if (has_start_time) + { waiting_time += pop_time - start_time; // Initial waiting time is pop_time - start_time } has_pop_time = true; - } else if (event.event == "push") { + } + else if (event.event == "push") + { push_time = event.timestamp; has_push_time = true; - if (has_pop_time) { + if (has_pop_time) + { processing_time += push_time - pop_time; // Processing time is push_time - pop_time thread_buffers_processed++; } - } else if (event.event == "end") { + } + else if (event.event == "end") + { end_time = event.timestamp; has_end_time = true; - if (has_pop_time) { + if (has_push_time) + { processing_time += end_time - push_time; // Finalize the processing time } } @@ -95,28 +130,37 @@ calculate_metrics(const std::vector ×tamps, size metrics.waiting_time_ms = std::chrono::duration_cast(waiting_time).count(); metrics.processing_time_ms = std::chrono::duration_cast(processing_time).count(); - if (has_end_time && has_start_time) { + if (has_end_time && has_start_time) + { metrics.overall_time_ms = std::chrono::duration_cast( - std::chrono::duration(end_time - start_time)) - .count(); - } else { + std::chrono::duration(end_time - start_time)) + .count(); + } + else + { metrics.overall_time_ms = std::chrono::duration_cast( - std::chrono::duration(last_element.timestamp - first_element.timestamp)) - .count(); + std::chrono::duration(last_element.timestamp - first_element.timestamp)) + .count(); + } + if (metrics.processing_time_ms == 0) + { + metrics.processing_time_ms = 1; } total_buffers_processed += thread_buffers_processed; // Calculate the total throughput in bytes per second for this thread - if (metrics.overall_time_ms > 0) { + if (metrics.overall_time_ms > 0) + { metrics.total_throughput = - (thread_buffers_processed * buffer_size_bytes) / (metrics.overall_time_ms / 1000.0); + (thread_buffers_processed * buffer_size_bytes) / (metrics.overall_time_ms / 1000.0); } // Calculate the per buffer throughput in bytes per second for this thread - if (metrics.processing_time_ms > 0) { + if (metrics.processing_time_ms > 0) + { metrics.per_buffer_throughput = - (thread_buffers_processed * buffer_size_bytes) / (metrics.processing_time_ms / 1000.0); + (thread_buffers_processed * buffer_size_bytes) / (metrics.processing_time_ms / 1000.0); } // Convert throughput to MB/s @@ -130,33 +174,39 @@ calculate_metrics(const std::vector ×tamps, size Metrics aggregated_metrics; size_t num_threads = thread_metrics.size(); aggregated_metrics.waiting_time_ms = std::accumulate(thread_metrics.begin(), thread_metrics.end(), 0.0, - [](const auto &sum, const auto &m) { + [](const auto &sum, const auto &m) + { return sum + m.waiting_time_ms; }) / num_threads; aggregated_metrics.processing_time_ms = std::accumulate(thread_metrics.begin(), thread_metrics.end(), 0.0, - [](const auto &sum, const auto &m) { + [](const auto &sum, const auto &m) + { return sum + m.processing_time_ms; }) / num_threads; aggregated_metrics.overall_time_ms = std::accumulate(thread_metrics.begin(), thread_metrics.end(), 0.0, - [](const auto &sum, const auto &m) { + [](const auto &sum, const auto &m) + { return sum + m.overall_time_ms; }) / num_threads; aggregated_metrics.total_throughput = std::accumulate(thread_metrics.begin(), thread_metrics.end(), 0.0, - [](const auto &sum, const auto &m) { + [](const auto &sum, const auto &m) + { return sum + m.total_throughput; }); aggregated_metrics.per_buffer_throughput = std::accumulate(thread_metrics.begin(), thread_metrics.end(), 0.0, - [](const auto &sum, const auto &m) { + [](const auto &sum, const auto &m) + { return sum + m.per_buffer_throughput; }) / num_threads; // Calculate standard deviations std::vector waiting_times, processing_times, overall_times, total_throughputs, per_buffer_throughputs; - for (const auto &m: thread_metrics) { + for (const auto &m : thread_metrics) + { waiting_times.push_back(m.waiting_time_ms); processing_times.push_back(m.processing_time_ms); overall_times.push_back(m.overall_time_ms); @@ -179,7 +229,8 @@ calculate_metrics(const std::vector ×tamps, size return component_metrics; } -std::tuple printAndReturnAverageLoad(xdbc::RuntimeEnv &_xdbcenv) { +std::tuple printAndReturnAverageLoad(xdbc::RuntimeEnv &_xdbcenv) +{ long long totalTimestamps = 0; size_t totalFreeBufferIdsSize = 0; size_t totalCompressedBufferIdsSize = 0; @@ -189,7 +240,8 @@ std::tuple printAndReturnAverageLoad(xdbc::Runti auto ret = std::tuple(0, 0, 0, 0); - for (const auto &record: _xdbcenv.queueSizes) { + for (const auto &record : _xdbcenv.queueSizes) + { totalTimestamps += std::get<0>(record); totalFreeBufferIdsSize += std::get<1>(record); totalCompressedBufferIdsSize += std::get<2>(record); @@ -197,7 +249,8 @@ std::tuple printAndReturnAverageLoad(xdbc::Runti totalSerializedBufferIdsSize += std::get<4>(record); } - if (recordCount > 0) { + if (recordCount > 0) + { double avgFreeBufferIdsSize = static_cast(totalFreeBufferIdsSize) / recordCount; double avgCompressedBufferIdsSize = static_cast(totalCompressedBufferIdsSize) / recordCount; double avgDecompressedBufferIdsSize = static_cast(totalDecompressedBufferIdsSize) / recordCount; @@ -206,9 +259,10 @@ std::tuple printAndReturnAverageLoad(xdbc::Runti ret = std::tuple(avgFreeBufferIdsSize, avgCompressedBufferIdsSize, avgDecompressedBufferIdsSize, avgSerializedBufferIdsSize); spdlog::get("XDBC.SINK")->info("Average Load of Queues: Free, Decompressor, Serializer, Writer"); - spdlog::get("XDBC.SINK")->info("{0}\t{1}\t{2}\t{3}", avgFreeBufferIdsSize, avgCompressedBufferIdsSize, - avgDecompressedBufferIdsSize, avgSerializedBufferIdsSize); - } else { + spdlog::get("XDBC.SINK")->info("{0}\t{1}\t{2}\t{3}", avgFreeBufferIdsSize, avgCompressedBufferIdsSize, avgDecompressedBufferIdsSize, avgSerializedBufferIdsSize); + } + else + { spdlog::get("XDBC.SINK")->info("No records available to calculate averages."); } diff --git a/xdbc/xclient.cpp b/xdbc/xclient.cpp index 8be3ae3..d5b400e 100644 --- a/xdbc/xclient.cpp +++ b/xdbc/xclient.cpp @@ -74,6 +74,7 @@ namespace xdbc // Unified receive queue _xdbcenv->freeBufferIds = std::make_shared>(); + _xdbcenv->freeBufferIds->setCapacity(_xdbcenv->buffers_in_bufferpool); // Unified decompression queue _xdbcenv->compressedBufferIds = std::make_shared>(); _xdbcenv->compressedBufferIds->setCapacity(queueCapacityPerComp); @@ -94,7 +95,7 @@ namespace xdbc _xdbcenv->freeBufferIds->push(i); } - _xdbcenv->tf_paras.bufProcessed.resize(_xdbcenv->max_threads); + _xdbcenv->tf_paras.bufProcessed.resize(_xdbcenv->max_threads, 0); spdlog::get("XDBC.CLIENT")->info("Initialized queues, " "freeBuffersQ: {0}, " @@ -126,11 +127,8 @@ namespace xdbc _xdbcenv->tf_paras.elapsed_time = static_cast(total_time); spdlog::get("XDBC.CLIENT")->info("Total elapsed time: {0} ms", total_time); - auto pts = std::vector(_xdbcenv->pts->size()); - while (_xdbcenv->pts->size() != 0) - pts.push_back(_xdbcenv->pts->pop()); - - auto component_metrics = calculate_metrics(pts, _xdbcenv->buffer_size); + auto &env_pts = *(_xdbcenv->pts); + auto component_metrics = calculate_metrics(env_pts, _xdbcenv->buffer_size); std::ostringstream totalTimes; std::ostringstream procTimes; std::ostringstream waitingTimes; @@ -281,26 +279,22 @@ namespace xdbc size_t decompressedBufferTotalSize = _xdbcenv->decompressedBufferIds->size(); size_t serializedBufferTotalSize = _xdbcenv->serializedBufferIds->size(); - // size_t freeBufferTotalSize = 0; - // for (auto &queue_ptr: _xdbcenv->freeBufferIds) { - // freeBufferTotalSize += queue_ptr->size(); - // } - - // size_t compressedBufferTotalSize = 0; - // for (auto &queue_ptr: _xdbcenv->compressedBufferIds) { - // compressedBufferTotalSize += queue_ptr->size(); - // } + float freeBufferLoadFloat = (freeBufferTotalSize * 100.0f) / _xdbcenv->freeBufferIds->getCapacity(); + float compressedBufferLoadFloat = (compressedBufferTotalSize * 100.0f) / _xdbcenv->compressedBufferIds->getCapacity(); + float decompressedBufferLoadFloat = (decompressedBufferTotalSize * 100.0f) / _xdbcenv->decompressedBufferIds->getCapacity(); + float serializedBufferLoadFloat = (serializedBufferTotalSize * 100.0f) / _xdbcenv->serializedBufferIds->getCapacity(); - // size_t decompressedBufferTotalSize = 0; - // for (auto &queue_ptr: _xdbcenv->decompressedBufferIds) { - // decompressedBufferTotalSize += queue_ptr->size(); - // } + size_t freeBufferLoad = static_cast(freeBufferLoadFloat); + size_t compressedBufferLoad = static_cast(compressedBufferLoadFloat); + size_t decompressedBufferLoad = static_cast(decompressedBufferLoadFloat); + size_t serializedBufferLoad = static_cast(serializedBufferLoadFloat); // Store the measurement as a tuple _xdbcenv->queueSizes.emplace_back(curTimeInterval, freeBufferTotalSize, compressedBufferTotalSize, decompressedBufferTotalSize, serializedBufferTotalSize); - _xdbcenv->tf_paras.latest_queueSizes = std::make_tuple(freeBufferTotalSize, compressedBufferTotalSize, decompressedBufferTotalSize, serializedBufferTotalSize); + _xdbcenv->tf_paras.latest_queueSizes = std::make_tuple(freeBufferLoad, compressedBufferLoad, + decompressedBufferLoad, serializedBufferLoad); std::this_thread::sleep_for(std::chrono::milliseconds(interval_ms)); curTimeInterval += interval_ms / 1000;