diff --git a/ControllerInterface/WebSocketClient.cpp b/ControllerInterface/WebSocketClient.cpp index 8fd7e2e..9b71390 100644 --- a/ControllerInterface/WebSocketClient.cpp +++ b/ControllerInterface/WebSocketClient.cpp @@ -3,11 +3,13 @@ #include WebSocketClient::WebSocketClient(const std::string &host, const std::string &port) - : host_(host), port_(port), resolver_(ioc_), ws_(ioc_), timer_(ioc_), active_(false), stop_thread_(false), - operation_started_(false) {} + : host_(host), port_(port), resolver_(ioc_), ws_(ioc_), timer_(ioc_), active_(false), stop_thread_(false), + operation_started_(false) {} -void WebSocketClient::start() { - try { +void WebSocketClient::start() +{ + try + { // Resolve host and port auto results = resolver_.resolve(host_, port_); // Connect to the first resolved endpoint @@ -26,62 +28,59 @@ void WebSocketClient::start() { auto start_time = std::chrono::steady_clock::now(); const std::chrono::seconds timeout(10); // Set a timeout duration - while (!acknowledged) { + while (!acknowledged) + { // Check for timeout auto elapsed = std::chrono::steady_clock::now() - start_time; - if (elapsed > timeout) { + if (elapsed > timeout) + { spdlog::error("Timeout waiting for server acknowledgment."); throw std::runtime_error("Server acknowledgment timeout"); } // Attempt to read the acknowledgment - try { + try + { ws_.read(buffer); std::string ack_response = beast::buffers_to_string(buffer.data()); spdlog::info("Received acknowledgment: {}", ack_response); // Parse and check acknowledgment json ack_json = json::parse(ack_response); - if (ack_json["operation"] == "acknowledged") { + if (ack_json["operation"] == "acknowledged") + { acknowledged = true; operation_started_ = true; // Set flag indicating acknowledgment received spdlog::info("Server acknowledged the start request."); - } else { + } + else + { spdlog::warn("Server response does not acknowledge start: {}", ack_json.dump()); // throw std::runtime_error("Server rejected start request"); } } - catch (const std::exception &e) { + catch (const std::exception &e) + { spdlog::error("Error while waiting for acknowledgment: {}", e.what()); // Optional: Retry after a short delay std::this_thread::sleep_for(std::chrono::milliseconds(500)); } } } - catch (const std::exception &e) { + catch (const std::exception &e) + { spdlog::error("WebSocket Client Error during start: {}", e.what()); throw; // Rethrow the exception to notify the caller } } -void WebSocketClient::periodic_communication() { - try { - while (!stop_thread_) { - // Convert metrics to JSON and send it - json metrics_json = metrics_convert_(); - json addtnl_info = additional_msg_(); - json combined_payload = metrics_json; - for (auto &[key, value]: addtnl_info.items()) { - combined_payload[key] = value; - } - // json metrics_json = {{"waiting_time", "100ms"}}; - json request_json = { - {"operation", "get_environment"}, - {"payload", combined_payload} // Include metrics in the payload - }; - ws_.write(asio::buffer(request_json.dump())); - - // Read response from server +void WebSocketClient::periodic_communication() +{ + try + { + while (!stop_thread_) + { + // Read command from server beast::flat_buffer buffer; ws_.read(buffer); std::string env_response = beast::buffers_to_string(buffer.data()); @@ -89,28 +88,49 @@ void WebSocketClient::periodic_communication() { // Parse and process the response json env_json = json::parse(env_response); - if (env_json["operation"] == "set_environment") { + if (env_json["operation"] == "set_environment") + { json payload = env_json["payload"]; env_convert_(payload); // Process environment data from payload - } else { + } + else + { spdlog::warn("Unexpected operation received: {}", env_json["operation"]); } + // Convert metrics to JSON and send it + json metrics_json = metrics_convert_(); + json addtnl_info = additional_msg_(); + json combined_payload = metrics_json; + for (auto &[key, value] : addtnl_info.items()) + { + combined_payload[key] = value; + } + // json metrics_json = {{"waiting_time", "100ms"}}; + json request_json = { + {"operation", "get_environment"}, + {"payload", combined_payload} // Include metrics in the payload + }; + ws_.write(asio::buffer(request_json.dump())); // Wait for 1 second before next communication - std::this_thread::sleep_for(std::chrono::seconds(2)); + // std::this_thread::sleep_for(std::chrono::seconds(2)); active_ = true; } } - catch (const std::exception &e) { + catch (const std::exception &e) + { std::cerr << "Error in periodic communication: " << e.what() << std::endl; } } void WebSocketClient::run(std::function metrics_convert, std::function additional_msg, - std::function env_convert) { - try { + std::function env_convert) +{ + try + { // Wait until the operation has started and acknowledgment is received - while (!operation_started_) { + while (!operation_started_) + { std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait briefly before checking again } @@ -124,21 +144,25 @@ void WebSocketClient::run(std::function metrics_convert, std::function #include -template -class customQueue { +template +class customQueue +{ private: std::mutex d_mutex; std::condition_variable d_condition; @@ -15,30 +16,36 @@ class customQueue { public: explicit customQueue(size_t max_capacity = 0) : capacity(max_capacity) {} - void push(T const &value) { + void push(T const &value) + { { std::unique_lock lock(this->d_mutex); - this->d_space_available.wait(lock, [=] { return capacity == 0 || d_queue.size() < capacity; }); + this->d_space_available.wait(lock, [=] + { return capacity == 0 || d_queue.size() < capacity; }); d_queue.push_front(value); } this->d_condition.notify_all(); } - T pop() { + T pop() + { std::unique_lock lock(this->d_mutex); - this->d_condition.wait(lock, [=] { return !this->d_queue.empty(); }); + this->d_condition.wait(lock, [=] + { return !this->d_queue.empty(); }); T rc(std::move(this->d_queue.back())); this->d_queue.pop_back(); this->d_space_available.notify_all(); // Notify threads waiting for space return rc; } - [[nodiscard]] size_t size() { + [[nodiscard]] size_t size() + { std::unique_lock lock(this->d_mutex); return d_queue.size(); } - void setCapacity(size_t new_capacity) { + void setCapacity(size_t new_capacity) + { { std::unique_lock lock(this->d_mutex); capacity = new_capacity; @@ -47,23 +54,23 @@ class customQueue { } // Get the current capacity - [[nodiscard]] size_t getCapacity() const { + [[nodiscard]] size_t getCapacity() const + { return capacity; } - std::vector copy_newElements() { - static size_t lastCopiedIndex = 0; // Tracks the last copied position - std::vector new_elements; // To store new elements - auto current_index = d_queue.size(); - { - // std::unique_lock lock(this->d_mutex); // Lock for thread safety - if (lastCopiedIndex < - current_index) { // Check if there are new elements - new_elements.assign(d_queue.rbegin(), d_queue.rbegin() + (d_queue.size() - - lastCopiedIndex)); // Reverse copy the new elements - lastCopiedIndex = current_index; // Update the index for the next call - } - } - return new_elements; // Return new elements in reverse order + auto begin() + { + return d_queue.rbegin(); + } + + auto beginFrom(size_t offset) + { + return d_queue.rbegin() + offset; + } + + auto end() + { + return d_queue.rend(); } -}; \ No newline at end of file +}; diff --git a/main.cpp b/main.cpp index 27048d7..d3b477b 100755 --- a/main.cpp +++ b/main.cpp @@ -13,21 +13,22 @@ using namespace std; namespace po = boost::program_options; -void handleCMDParams(int ac, char *av[], RuntimeEnv &env) { +void handleCMDParams(int ac, char *av[], RuntimeEnv &env) +{ // Declare the supported options. po::options_description desc("Usage: ./xdbc-server [options]\n\nAllowed options"); desc.add_options()("help,h", "Produce this help message.")("system,y", po::value()->default_value("csv"), "Set system: \nDefault:\n csv\nOther:\n postgres, clickhouse")( - "compression-type,c", po::value()->default_value("nocomp"), - "Set Compression algorithm: \nDefault:\n nocomp\nOther:\n zstd\n snappy\n lzo\n lz4\n zlib\n cols")( - "intermediate-format,f", po::value()->default_value(1), - "Set intermediate-format: \nDefault:\n 1 (row)\nOther:\n 2 (col)")("buffer-size,b", - po::value()->default_value(64), - "Set buffer-size of buffers (in KiB).\nDefault: 64")( - "bufferpool-size,p", po::value()->default_value(4096), - "Set bufferpool memory size (in KiB).\nDefault: 4096") - //("tuple-size,t", po::value()->default_value(48), "Set the tuple size.\nDefault: 48") - ("sleep-time,s", po::value()->default_value(5), "Set a sleep-time in milli seconds.\nDefault: 5ms")( + "compression-type,c", po::value()->default_value("nocomp"), + "Set Compression algorithm: \nDefault:\n nocomp\nOther:\n zstd\n snappy\n lzo\n lz4\n zlib\n cols")( + "intermediate-format,f", po::value()->default_value(1), + "Set intermediate-format: \nDefault:\n 1 (row)\nOther:\n 2 (col)")("buffer-size,b", + po::value()->default_value(64), + "Set buffer-size of buffers (in KiB).\nDefault: 64")( + "bufferpool-size,p", po::value()->default_value(4096), + "Set bufferpool memory size (in KiB).\nDefault: 4096") + //("tuple-size,t", po::value()->default_value(48), "Set the tuple size.\nDefault: 48") + ("sleep-time,s", po::value()->default_value(5), "Set a sleep-time in milli seconds.\nDefault: 5ms")( "read-parallelism,rp", po::value()->default_value(4), "Set the read parallelism grade.\nDefault: 4")( "read-partitions,rpp", po::value()->default_value(1), "Set the number of read partitions.\nDefault: 1")("deser-parallelism,dp", @@ -41,7 +42,7 @@ void handleCMDParams(int ac, char *av[], RuntimeEnv &env) { "Set the transfer id.\nDefault: 0")("profiling-interval", po::value()->default_value(1000), "Set profiling interval.\nDefault: 1000")("skip-deserializer", po::value()->default_value( - false), + false), "Skip deserialization (0/1).\nDefault: false")( "spawn-source", po::value()->default_value(0), "Set spawn source (0 or 1).\nDefault: 0"); @@ -53,30 +54,36 @@ void handleCMDParams(int ac, char *av[], RuntimeEnv &env) { po::store(po::command_line_parser(ac, av).options(desc).positional(p).run(), vm); po::notify(vm); - if (vm.count("help")) { + if (vm.count("help")) + { cout << desc << "\n"; exit(0); } - if (vm.count("system")) { + if (vm.count("system")) + { spdlog::get("XDBC.SERVER")->info("system: {0}", vm["system"].as()); env.system = vm["system"].as(); } - if (vm.count("intermediate-format")) { + if (vm.count("intermediate-format")) + { spdlog::get("XDBC.SERVER")->info("Intermediate format: {0}", vm["intermediate-format"].as()); env.iformat = vm["intermediate-format"].as(); } - if (vm.count("compression-type")) { + if (vm.count("compression-type")) + { spdlog::get("XDBC.SERVER")->info("Compression algorithm: {0}", vm["compression-type"].as()); env.compression_algorithm = vm["compression-type"].as(); } - if (vm.count("buffer-size")) { + if (vm.count("buffer-size")) + { spdlog::get("XDBC.SERVER")->info("Buffer-size: {0} KiB", vm["buffer-size"].as()); env.buffer_size = vm["buffer-size"].as(); } - if (vm.count("bufferpool-size")) { + if (vm.count("bufferpool-size")) + { spdlog::get("XDBC.SERVER")->info("Bufferpool-size: {0} KiB", vm["bufferpool-size"].as()); env.buffers_in_bufferpool = vm["bufferpool-size"].as() / vm["buffer-size"].as(); spdlog::get("XDBC.SERVER")->info("Buffers in Bufferpool: {0}", env.buffers_in_bufferpool); @@ -85,43 +92,53 @@ void handleCMDParams(int ac, char *av[], RuntimeEnv &env) { spdlog::get("XDBC.SERVER")->info("Tuple size: {0}", vm["tuple-size"].as()); env.tuple_size = vm["tuple-size"].as(); }*/ - if (vm.count("sleep-time")) { + if (vm.count("sleep-time")) + { spdlog::get("XDBC.SERVER")->info("Sleep time: {0}ms", vm["sleep-time"].as()); env.sleep_time = std::chrono::milliseconds(vm["sleep-time"].as()); } - if (vm.count("read-parallelism")) { + if (vm.count("read-parallelism")) + { spdlog::get("XDBC.SERVER")->info("Read parallelism: {0}", vm["read-parallelism"].as()); env.read_parallelism = vm["read-parallelism"].as(); } - if (vm.count("read-partitions")) { + if (vm.count("read-partitions")) + { spdlog::get("XDBC.SERVER")->info("Read partitions: {0}", vm["read-partitions"].as()); env.read_partitions = vm["read-partitions"].as(); } - if (vm.count("network-parallelism")) { + if (vm.count("network-parallelism")) + { spdlog::get("XDBC.SERVER")->info("Network parallelism: {0}", vm["network-parallelism"].as()); env.network_parallelism = vm["network-parallelism"].as(); } - if (vm.count("deser-parallelism")) { + if (vm.count("deser-parallelism")) + { spdlog::get("XDBC.SERVER")->info("Deserialization parallelism: {0}", vm["deser-parallelism"].as()); env.deser_parallelism = vm["deser-parallelism"].as(); } - if (vm.count("compression-parallelism")) { + if (vm.count("compression-parallelism")) + { spdlog::get("XDBC.SERVER")->info("Compression parallelism: {0}", vm["compression-parallelism"].as()); env.compression_parallelism = vm["compression-parallelism"].as(); } - if (vm.count("transfer-id")) { + if (vm.count("transfer-id")) + { spdlog::get("XDBC.SERVER")->info("Transfer id: {0}", vm["transfer-id"].as()); env.transfer_id = vm["transfer-id"].as(); } - if (vm.count("profiling-interval")) { + if (vm.count("profiling-interval")) + { spdlog::get("XDBC.SERVER")->info("Profiling interval: {0}", vm["profiling-interval"].as()); env.profilingInterval = vm["profiling-interval"].as(); } - if (vm.count("skip-deserializer")) { + if (vm.count("skip-deserializer")) + { spdlog::get("XDBC.SERVER")->info("Skip serializer: {0}", vm["skip-deserializer"].as()); env.skip_deserializer = vm["skip-deserializer"].as(); } - if (vm.count("spawn-source")) { + if (vm.count("spawn-source")) + { spdlog::get("XDBC.SERVER")->info("Spawn source: {0}", vm["spawn-source"].as()); env.spawn_source = vm["spawn-source"].as(); } @@ -131,16 +148,18 @@ void handleCMDParams(int ac, char *av[], RuntimeEnv &env) { env.max_threads = env.buffers_in_bufferpool; } -nlohmann::json metrics_convert(RuntimeEnv &env) { +nlohmann::json metrics_convert(RuntimeEnv &env) +{ nlohmann::json metrics_json = nlohmann::json::object(); // Use a JSON object // auto env_pts = env->pts->copyAll(); - if ((env.pts) && (env.enable_updation_DS == 1) && (env.enable_updation_xServe == 1)) { - std::vector env_pts; - env_pts = env.pts->copy_newElements(); - auto component_metrics_ = calculate_metrics(env_pts, env.buffer_size); + if ((env.pts) && (env.enable_updation_DS == 1) && (env.enable_updation_xServe == 1)) + { + auto &env_pts = *(env.pts); + auto component_metrics_ = calculate_metrics(env_pts, env.buffer_size, true); - for (const auto &pair: component_metrics_) { + for (const auto &pair : component_metrics_) + { nlohmann::json metric_object = nlohmann::json::object(); const Metrics &metric = pair.second; @@ -157,10 +176,12 @@ nlohmann::json metrics_convert(RuntimeEnv &env) { return metrics_json; } -nlohmann::json additional_msg(RuntimeEnv &env) { +nlohmann::json additional_msg(RuntimeEnv &env) +{ nlohmann::json metrics_json = nlohmann::json::object(); // Use a JSON object metrics_json["totalTime_ms"] = env.tf_paras.elapsed_time; - if ((env.enable_updation_DS == 1) && (env.enable_updation_xServe == 1)) { + if (env.enable_updation_DS == 1) + { metrics_json["readBufferQ_load"] = std::get<0>(env.tf_paras.latest_queueSizes); metrics_json["deserializedBufferQ_load"] = std::get<1>(env.tf_paras.latest_queueSizes); metrics_json["compressedBufferQ_load"] = std::get<2>(env.tf_paras.latest_queueSizes); @@ -169,30 +190,36 @@ nlohmann::json additional_msg(RuntimeEnv &env) { return metrics_json; } -void env_convert(RuntimeEnv &env, const nlohmann::json &env_json) { - try { +void env_convert(RuntimeEnv &env, const nlohmann::json &env_json) +{ + try + { // env.buffer_size = std::stoi(env_json.at("bufferSize").get()); // env.buffers_in_bufferpool = std::stoi(env_json.at("bufferpoolSize").get()) / env_.buffer_size; // env.read_parallelism = std::stoi(env_json.at("readParallelism").get()); // env.network_parallelism = std::stoi(env_json.at("netParallelism").get()); - if (env.enable_updation_DS == 1) { + if (env.enable_updation_DS == 1) + { env.read_parallelism = std::stoi(env_json.at("readParallelism").get()); env.deser_parallelism = std::stoi(env_json.at("deserParallelism").get()); env.env_manager_DS.configureThreads("deserialize", env.deser_parallelism); env.env_manager_DS.configureThreads("read", env.read_parallelism); } - if (env.enable_updation_xServe == 1) { + if (env.enable_updation_xServe == 1) + { env.compression_parallelism = std::stoi(env_json.at("compParallelism").get()); env.env_manager_xServer.configureThreads("compress", env.compression_parallelism); } } - catch (const std::exception &e) { + catch (const std::exception &e) + { std::cerr << "Error converting env JSON: " << e.what() << std::endl; } } -int main(int argc, char *argv[]) { +int main(int argc, char *argv[]) +{ auto console = spdlog::stdout_color_mt("XDBC.SERVER"); @@ -202,14 +229,15 @@ int main(int argc, char *argv[]) { // ***Setup websocket interface for controller*** std::thread io_thread; WebSocketClient ws_client("xdbc-controller", "8003"); - if (xdbcEnv.spawn_source == 1) { + if (xdbcEnv.spawn_source == 1) + { ws_client.start(); - io_thread = std::thread([&]() { - ws_client.run( - std::bind(&metrics_convert, std::ref(xdbcEnv)), std::bind(&additional_msg, std::ref(xdbcEnv)), - std::bind(&env_convert, std::ref(xdbcEnv), std::placeholders::_1)); - }); - while (!ws_client.is_active()) { + io_thread = std::thread([&]() + { ws_client.run( + std::bind(&metrics_convert, std::ref(xdbcEnv)), std::bind(&additional_msg, std::ref(xdbcEnv)), + std::bind(&env_convert, std::ref(xdbcEnv), std::placeholders::_1)); }); + while (!ws_client.is_active()) + { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } @@ -225,20 +253,20 @@ int main(int argc, char *argv[]) { xdbcEnv.tf_paras.elapsed_time = static_cast(total_time); spdlog::get("XDBC.SERVER")->info("Total elapsed time: {} ms", total_time); - auto pts = std::vector(xdbcEnv.pts->size()); - while (xdbcEnv.pts->size() != 0) - pts.push_back(xdbcEnv.pts->pop()); + auto &env_pts = *(xdbcEnv.pts); + auto component_metrics = calculate_metrics(env_pts, xdbcEnv.buffer_size); - auto component_metrics = calculate_metrics(pts, xdbcEnv.buffer_size); std::ostringstream totalTimes; std::ostringstream procTimes; std::ostringstream waitingTimes; std::ostringstream totalThroughput; std::ostringstream perBufferThroughput; - for (const auto &[component, metrics]: component_metrics) { + for (const auto &[component, metrics] : component_metrics) + { - if (!component.empty()) { + if (!component.empty()) + { totalTimes << component << ":\t" << metrics.overall_time_ms << "ms, "; procTimes << component << ":\t" << metrics.processing_time_ms << "ms, "; waitingTimes << component << ":\t" << metrics.waiting_time_ms << "ms, "; @@ -247,9 +275,7 @@ int main(int argc, char *argv[]) { } } - spdlog::get("XDBC.SERVER")->info("xdbc server | \n all:\t {} \n proc:\t{} \n wait:\t{} \n thr:\t {} \n thr/b:\t {}", - totalTimes.str(), procTimes.str(), waitingTimes.str(), totalThroughput.str(), - perBufferThroughput.str()); + spdlog::get("XDBC.SERVER")->info("xdbc server | \n all:\t {} \n proc:\t{} \n wait:\t{} \n thr:\t {} \n thr/b:\t {}", totalTimes.str(), procTimes.str(), waitingTimes.str(), totalThroughput.str(), perBufferThroughput.str()); auto loads = printAndReturnAverageLoad(xdbcEnv); @@ -296,9 +322,11 @@ int main(int argc, char *argv[]) { << std::get<3>(loads) << "\n"; csv_file.close(); - if (xdbcEnv.spawn_source == 1) { + if (xdbcEnv.spawn_source == 1) + { ws_client.stop(); - if (io_thread.joinable()) { + if (io_thread.joinable()) + { io_thread.join(); } } diff --git a/metrics_calculator.h b/metrics_calculator.h index 9d37854..654dbb4 100644 --- a/metrics_calculator.h +++ b/metrics_calculator.h @@ -10,7 +10,8 @@ #include // Define the Metrics struct -struct Metrics { +struct Metrics +{ double waiting_time_ms; double processing_time_ms; double overall_time_ms; @@ -24,33 +25,53 @@ struct Metrics { }; // Helper function to calculate standard deviation -double calculate_stddev(const std::vector &values, double mean) { +double calculate_stddev(const std::vector &values, double mean) +{ double sum = 0.0; - for (const auto &value: values) { + for (const auto &value : values) + { sum += (value - mean) * (value - mean); } return std::sqrt(sum / values.size()); } -// Function to calculate metrics per component and then aggregate them std::unordered_map -calculate_metrics(const std::vector ×tamps, size_t buffer_size_kb) { +calculate_metrics(customQueue ×tamps, size_t buffer_size_kb, bool is_latest = false) +{ size_t buffer_size_bytes = buffer_size_kb * 1024; // Convert buffer size to bytes std::unordered_map>> events_per_component_thread; + static size_t lastCopiedIndex = 0; // Tracks the last copied position + if (is_latest == true) + { + // Create a map to keep track of the count of timestamps per component and thread + std::map, int> timestamp_counts; - // Group timestamps by component and thread - for (const auto &ts: timestamps) { - events_per_component_thread[ts.component][ts.thread].push_back(ts); + for (auto it = timestamps.beginFrom(lastCopiedIndex); it != timestamps.end(); ++it) + { + const auto &ts = *it; + events_per_component_thread[ts.component][ts.thread].push_back(ts); + } + lastCopiedIndex = timestamps.size(); + } + else + { + for (auto it = timestamps.begin(); it != timestamps.end(); ++it) + { + const auto &ts = *it; + events_per_component_thread[ts.component][ts.thread].push_back(ts); + } } std::unordered_map component_metrics; // Calculate metrics per component - for (const auto &[component, events_per_thread]: events_per_component_thread) { + for (const auto &[component, events_per_thread] : events_per_component_thread) + { std::vector thread_metrics; size_t total_buffers_processed = 0; - for (const auto &[thread_id, events]: events_per_thread) { + for (const auto &[thread_id, events] : events_per_thread) + { Metrics metrics = {0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0}; std::chrono::high_resolution_clock::time_point start_time, push_time, pop_time, end_time; @@ -65,29 +86,42 @@ calculate_metrics(const std::vector ×tamps, size_t buf const auto &first_element = events.front(); // Store the first event in the loop const auto &last_element = events.back(); // Store the last event in the loop - for (const auto &event: events) { - if (event.event == "start") { + for (const auto &event : events) + { + if (event.event == "start") + { start_time = event.timestamp; has_start_time = true; - } else if (event.event == "pop") { + } + else if (event.event == "pop") + { pop_time = event.timestamp; - if (has_push_time) { + if (has_push_time) + { waiting_time += pop_time - push_time; // Waiting time is pop_time - previous push_time - } else if (has_start_time) { + } + else if (has_start_time) + { waiting_time += pop_time - start_time; // Initial waiting time is pop_time - start_time } has_pop_time = true; - } else if (event.event == "push") { + } + else if (event.event == "push") + { push_time = event.timestamp; has_push_time = true; - if (has_pop_time) { + if (has_pop_time) + { processing_time += push_time - pop_time; // Processing time is push_time - pop_time thread_buffers_processed++; } - } else if (event.event == "end") { + } + else if (event.event == "end") + { end_time = event.timestamp; has_end_time = true; - if (has_pop_time) { + if (has_push_time) + { processing_time += end_time - push_time; // Finalize the processing time } } @@ -95,27 +129,37 @@ calculate_metrics(const std::vector ×tamps, size_t buf metrics.waiting_time_ms = std::chrono::duration_cast(waiting_time).count(); metrics.processing_time_ms = std::chrono::duration_cast(processing_time).count(); - if (has_end_time && has_start_time) { + if (has_end_time && has_start_time) + { metrics.overall_time_ms = std::chrono::duration_cast( - std::chrono::duration(end_time - start_time)) - .count(); - } else { + std::chrono::duration(end_time - start_time)) + .count(); + } + else + { metrics.overall_time_ms = std::chrono::duration_cast( - std::chrono::duration(last_element.timestamp - first_element.timestamp)) - .count(); + std::chrono::duration(last_element.timestamp - first_element.timestamp)) + .count(); + } + if (metrics.processing_time_ms == 0) + { + metrics.processing_time_ms = 1; } + total_buffers_processed += thread_buffers_processed; // Calculate the total throughput in bytes per second for this thread - if (metrics.overall_time_ms > 0) { + if (metrics.overall_time_ms > 0) + { metrics.total_throughput = - (thread_buffers_processed * buffer_size_bytes) / (metrics.overall_time_ms / 1000.0); + (thread_buffers_processed * buffer_size_bytes) / (metrics.overall_time_ms / 1000.0); } // Calculate the per buffer throughput in bytes per second for this thread - if (metrics.processing_time_ms > 0) { + if (metrics.processing_time_ms > 0) + { metrics.per_buffer_throughput = - (thread_buffers_processed * buffer_size_bytes) / (metrics.processing_time_ms / 1000.0); + (thread_buffers_processed * buffer_size_bytes) / (metrics.processing_time_ms / 1000.0); } // Convert throughput to MB/s @@ -129,33 +173,39 @@ calculate_metrics(const std::vector ×tamps, size_t buf Metrics aggregated_metrics; size_t num_threads = thread_metrics.size(); aggregated_metrics.waiting_time_ms = std::accumulate(thread_metrics.begin(), thread_metrics.end(), 0.0, - [](const auto &sum, const auto &m) { + [](const auto &sum, const auto &m) + { return sum + m.waiting_time_ms; }) / num_threads; aggregated_metrics.processing_time_ms = std::accumulate(thread_metrics.begin(), thread_metrics.end(), 0.0, - [](const auto &sum, const auto &m) { + [](const auto &sum, const auto &m) + { return sum + m.processing_time_ms; }) / num_threads; aggregated_metrics.overall_time_ms = std::accumulate(thread_metrics.begin(), thread_metrics.end(), 0.0, - [](const auto &sum, const auto &m) { + [](const auto &sum, const auto &m) + { return sum + m.overall_time_ms; }) / num_threads; aggregated_metrics.total_throughput = std::accumulate(thread_metrics.begin(), thread_metrics.end(), 0.0, - [](const auto &sum, const auto &m) { + [](const auto &sum, const auto &m) + { return sum + m.total_throughput; }); aggregated_metrics.per_buffer_throughput = std::accumulate(thread_metrics.begin(), thread_metrics.end(), 0.0, - [](const auto &sum, const auto &m) { + [](const auto &sum, const auto &m) + { return sum + m.per_buffer_throughput; }) / num_threads; // Calculate standard deviations std::vector waiting_times, processing_times, overall_times, total_throughputs, per_buffer_throughputs; - for (const auto &m: thread_metrics) { + for (const auto &m : thread_metrics) + { waiting_times.push_back(m.waiting_time_ms); processing_times.push_back(m.processing_time_ms); overall_times.push_back(m.overall_time_ms); @@ -178,7 +228,8 @@ calculate_metrics(const std::vector ×tamps, size_t buf return component_metrics; } -std::tuple printAndReturnAverageLoad(RuntimeEnv &_xdbcenv) { +std::tuple printAndReturnAverageLoad(RuntimeEnv &_xdbcenv) +{ long long totalTimestamps = 0; size_t totalReadBufferIdsSize = 0; size_t totalDeserBufferIdsSize = 0; @@ -187,7 +238,8 @@ std::tuple printAndReturnAverageLoad(RuntimeEnv size_t recordCount = _xdbcenv.queueSizes.size(); std::tuple ret(0, 0, 0, 0); - for (const auto &record: _xdbcenv.queueSizes) { + for (const auto &record : _xdbcenv.queueSizes) + { totalTimestamps += std::get<0>(record); totalReadBufferIdsSize += std::get<1>(record); totalDeserBufferIdsSize += std::get<2>(record); @@ -195,7 +247,8 @@ std::tuple printAndReturnAverageLoad(RuntimeEnv totalNetworkBufferIdsSize += std::get<4>(record); } - if (recordCount > 0) { + if (recordCount > 0) + { double avgReadBufferIdsSize = static_cast(totalReadBufferIdsSize) / recordCount; double avgDeserBufferIdsSize = static_cast(totalDeserBufferIdsSize) / recordCount; double avgCompressedBufferIdsSize = static_cast(totalDeserBufferIdsSize) / recordCount; @@ -205,9 +258,10 @@ std::tuple printAndReturnAverageLoad(RuntimeEnv avgNetworkBufferIdsSize); spdlog::get("XDBC.SERVER")->info("Average Load of Queues: Reader, Deserializer, Compressor, Sender"); - spdlog::get("XDBC.SERVER")->info("{0}\t{1}\t{2}\t{3}", avgReadBufferIdsSize, avgDeserBufferIdsSize, - avgCompressedBufferIdsSize, avgNetworkBufferIdsSize); - } else { + spdlog::get("XDBC.SERVER")->info("{0}\t{1}\t{2}\t{3}", avgReadBufferIdsSize, avgDeserBufferIdsSize, avgCompressedBufferIdsSize, avgNetworkBufferIdsSize); + } + else + { spdlog::get("XDBC.SERVER")->info("No records available to calculate averages."); } diff --git a/xdbcserver.cpp b/xdbcserver.cpp index 4707a3f..a39bc4b 100755 --- a/xdbcserver.cpp +++ b/xdbcserver.cpp @@ -70,7 +70,7 @@ XDBCServer::XDBCServer(RuntimeEnv &xdbcEnv) // initialize free queue and partition queue xdbcEnv.freeBufferPtr = std::make_shared>(); xdbcEnv.readPartPtr = std::make_shared>(); - + xdbcEnv.freeBufferPtr->setCapacity(xdbcEnv.buffers_in_bufferpool); // initially all buffers are put in the free buffer queue for (int i = 0; i < xdbcEnv.buffers_in_bufferpool; i++) xdbcEnv.freeBufferPtr->push(i); @@ -150,10 +150,20 @@ void XDBCServer::monitorQueues() size_t compressedBufferTotalSize = xdbcEnv->compBufferPtr->size(); size_t sendBufferTotalSize = xdbcEnv->sendBufferPtr->size(); + float readBufferLoadFloat = (readBufferTotalSize * 100.0f) / xdbcEnv->freeBufferPtr->getCapacity(); + float deserBufferLoadFloat = (deserBufferTotalSize * 100.0f) / xdbcEnv->deserBufferPtr->getCapacity(); + float compressedBufferLoadFloat = (compressedBufferTotalSize * 100.0f) / xdbcEnv->compBufferPtr->getCapacity(); + float sendBufferLoadFLoat = (sendBufferTotalSize * 100.0f) / xdbcEnv->sendBufferPtr->getCapacity(); + + size_t readBufferLoad = static_cast(readBufferLoadFloat); + size_t deserBufferLoad = static_cast(deserBufferLoadFloat); + size_t compressedBufferLoad = static_cast(compressedBufferLoadFloat); + size_t sendBufferLoad = static_cast(sendBufferLoadFLoat); + // Store the measurement as a tuple xdbcEnv->queueSizes.emplace_back(curTimeInterval, readBufferTotalSize, deserBufferTotalSize, compressedBufferTotalSize, sendBufferTotalSize); - xdbcEnv->tf_paras.latest_queueSizes = std::make_tuple(readBufferTotalSize, deserBufferTotalSize, compressedBufferTotalSize, sendBufferTotalSize); + xdbcEnv->tf_paras.latest_queueSizes = std::make_tuple(readBufferLoad, deserBufferLoad, compressedBufferLoad, sendBufferLoad); std::this_thread::sleep_for(std::chrono::milliseconds(xdbcEnv->profilingInterval)); curTimeInterval += xdbcEnv->profilingInterval / 1000; diff --git a/xdbcserver.h b/xdbcserver.h index fe26b61..11f15b7 100755 --- a/xdbcserver.h +++ b/xdbcserver.h @@ -1,7 +1,6 @@ #ifndef XDBCSERVER_H #define XDBCSERVER_H - #include #include #include @@ -15,7 +14,8 @@ using namespace boost::asio; using ip::tcp; constexpr size_t MAX_ATTRIBUTES = 230; -struct Header { +struct Header +{ size_t compressionType; size_t totalSize; @@ -25,10 +25,10 @@ struct Header { size_t crc; size_t attributeSize[MAX_ATTRIBUTES]; size_t attributeComp[MAX_ATTRIBUTES]; - }; -class XDBCServer { +class XDBCServer +{ public: explicit XDBCServer(RuntimeEnv &env); @@ -46,5 +46,4 @@ class XDBCServer { void monitorQueues(); }; - -#endif //XDBCSERVER_H +#endif // XDBCSERVER_H