Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Sinks/CSVSink/CSVSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ void CsvSink::write(int thr)

runtimeEnv->freeBufferIds->push(bufferId);
buffersWritten++;
runtimeEnv->tf_paras.bufProcessed.at(thr) = buffersWritten;
runtimeEnv->tf_paras.bufProcessed.at(thr)++;
}

outputFile.close();
Expand Down
578 changes: 288 additions & 290 deletions Sinks/main.cpp

Large diffs are not rendered by default.

117 changes: 72 additions & 45 deletions xdbc/ControllerInterface/WebSocketClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
#include <thread>

WebSocketClient::WebSocketClient(const std::string &host, const std::string &port)
: host_(host), port_(port), resolver_(ioc_), ws_(ioc_), timer_(ioc_), active_(false), stop_thread_(false),
operation_started_(false) {}
: host_(host), port_(port), resolver_(ioc_), ws_(ioc_), timer_(ioc_), active_(false), stop_thread_(false),
operation_started_(false) {}

void WebSocketClient::start() {
try {
void WebSocketClient::start()
{
try
{
// Resolve host and port
auto results = resolver_.resolve(host_, port_);
// Connect to the first resolved endpoint
Expand All @@ -26,91 +28,110 @@ void WebSocketClient::start() {
auto start_time = std::chrono::steady_clock::now();
const std::chrono::seconds timeout(10); // Set a timeout duration

while (!acknowledged) {
while (!acknowledged)
{
// Check for timeout
auto elapsed = std::chrono::steady_clock::now() - start_time;
if (elapsed > timeout) {
if (elapsed > timeout)
{
spdlog::error("Timeout waiting for server acknowledgment.");
throw std::runtime_error("Server acknowledgment timeout");
}

// Attempt to read the acknowledgment
try {
try
{
ws_.read(buffer);
std::string ack_response = beast::buffers_to_string(buffer.data());
spdlog::info("Received acknowledgment: {}", ack_response);

// Parse and check acknowledgment
json ack_json = json::parse(ack_response);
if (ack_json["operation"] == "acknowledged") {
if (ack_json["operation"] == "acknowledged")
{
acknowledged = true;
operation_started_ = true; // Set flag indicating acknowledgment received
spdlog::info("Server acknowledged the start request.");
} else {
}
else
{
spdlog::warn("Server response does not acknowledge start: {}", ack_json.dump());
// throw std::runtime_error("Server rejected start request");
}
}
catch (const std::exception &e) {
catch (const std::exception &e)
{
spdlog::error("Error while waiting for acknowledgment: {}", e.what());
// Optional: Retry after a short delay
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
}
catch (const std::exception &e) {
catch (const std::exception &e)
{
spdlog::error("WebSocket Client Error during start: {}", e.what());
throw; // Rethrow the exception to notify the caller
}
}

void WebSocketClient::periodic_communication() {
try {
while (!stop_thread_) {
// Convert metrics to JSON and send it
json metrics_json = metrics_convert_();
json addtnl_info = additional_msg_();
json combined_payload = metrics_json;
for (auto &[key, value]: addtnl_info.items()) {
combined_payload[key] = value;
}
// json metrics_json = {{"waiting_time", "100ms"}};
json request_json = {
{"operation", "get_environment"},
{"payload", combined_payload} // Include metrics in the payload
};
ws_.write(asio::buffer(request_json.dump()));

// Read response from server
void WebSocketClient::periodic_communication()
{
try
{
while (!stop_thread_)
{
// Read command from server
beast::flat_buffer buffer;
ws_.read(buffer);
std::string env_response = beast::buffers_to_string(buffer.data());
////spdlog::info("Received JSON: {}", env_response); // Log the received JSON

// Parse and process the response
json env_json = json::parse(env_response);
if (env_json["operation"] == "set_environment") {
if (env_json["operation"] == "set_environment")
{
json payload = env_json["payload"];
env_convert_(payload); // Process environment data from payload
} else {
}
else
{
spdlog::warn("Unexpected operation received: {}", env_json["operation"]);
}

// Wait for 1 second before next communication
std::this_thread::sleep_for(std::chrono::seconds(2));
// Convert metrics to JSON and send it
json metrics_json = metrics_convert_();
json addtnl_info = additional_msg_();
json combined_payload = metrics_json;
for (auto &[key, value] : addtnl_info.items())
{
combined_payload[key] = value;
}
// json metrics_json = {{"waiting_time", "100ms"}};
json request_json = {
{"operation", "get_environment"},
{"payload", combined_payload} // Include metrics in the payload
};
ws_.write(asio::buffer(request_json.dump()));

// // Wait for 1 second before next communication
// std::this_thread::sleep_for(std::chrono::seconds(2));
active_ = true;
}
}
catch (const std::exception &e) {
catch (const std::exception &e)
{
std::cerr << "Error in periodic communication: " << e.what() << std::endl;
}
}

void WebSocketClient::run(std::function<json()> metrics_convert, std::function<json()> additional_msg,
std::function<void(const json &)> env_convert) {
try {
std::function<void(const json &)> env_convert)
{
try
{
// Wait until the operation has started and acknowledgment is received
while (!operation_started_) {
while (!operation_started_)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait briefly before checking again
}

Expand All @@ -124,21 +145,25 @@ void WebSocketClient::run(std::function<json()> metrics_convert, std::function<j

ioc_.run(); // Start processing asynchronous operations
}
catch (const std::exception &e) {
catch (const std::exception &e)
{
std::cerr << "Error in io_context run: " << e.what() << std::endl;
}
}

void WebSocketClient::stop() {
void WebSocketClient::stop()
{
stop_thread_ = true; // Signal thread to stop

try {
if (ws_.is_open()) {
try
{
if (ws_.is_open())
{
// Send the "finished" message to the server
json stop_payload = additional_msg_();
json stop_message = {
{"operation", "transfer_finished"},
{"payload", stop_payload} // Include metrics in the payload
{"operation", "transfer_finished"},
{"payload", stop_payload} // Include metrics in the payload
};
ws_.write(asio::buffer(stop_message.dump()));
spdlog::info("Sent stop message: {}", stop_message.dump());
Expand All @@ -148,12 +173,14 @@ void WebSocketClient::stop() {
}
ioc_.stop(); // Stop the io_context loop
}
catch (const std::exception &e) {
catch (const std::exception &e)
{
std::cerr << "Error during stop: " << e.what() << std::endl;
}
}

// Check if the WebSocket client is active
bool WebSocketClient::is_active() const {
bool WebSocketClient::is_active() const
{
return active_.load();
}
53 changes: 30 additions & 23 deletions xdbc/customQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
#include <condition_variable>
#include <deque>

template<typename T>
class customQueue {
template <typename T>
class customQueue
{
private:
std::mutex d_mutex;
std::condition_variable d_condition;
Expand All @@ -15,30 +16,36 @@ class customQueue {
public:
explicit customQueue(size_t max_capacity = 0) : capacity(max_capacity) {}

void push(T const &value) {
void push(T const &value)
{
{
std::unique_lock<std::mutex> lock(this->d_mutex);
this->d_space_available.wait(lock, [=] { return capacity == 0 || d_queue.size() < capacity; });
this->d_space_available.wait(lock, [=]
{ return capacity == 0 || d_queue.size() < capacity; });
d_queue.push_front(value);
}
this->d_condition.notify_all();
}

T pop() {
T pop()
{
std::unique_lock<std::mutex> lock(this->d_mutex);
this->d_condition.wait(lock, [=] { return !this->d_queue.empty(); });
this->d_condition.wait(lock, [=]
{ return !this->d_queue.empty(); });
T rc(std::move(this->d_queue.back()));
this->d_queue.pop_back();
this->d_space_available.notify_all(); // Notify threads waiting for space
return rc;
}

[[nodiscard]] size_t size() {
[[nodiscard]] size_t size()
{
std::unique_lock<std::mutex> lock(this->d_mutex);
return d_queue.size();
}

void setCapacity(size_t new_capacity) {
void setCapacity(size_t new_capacity)
{
{
std::unique_lock<std::mutex> lock(this->d_mutex);
capacity = new_capacity;
Expand All @@ -47,23 +54,23 @@ class customQueue {
}

// Get the current capacity
[[nodiscard]] size_t getCapacity() const {
[[nodiscard]] size_t getCapacity() const
{
return capacity;
}

std::vector<T> copy_newElements() {
static size_t lastCopiedIndex = 0; // Tracks the last copied position
std::vector<T> new_elements; // To store new elements
auto current_index = d_queue.size();
{
// std::unique_lock<std::mutex> lock(this->d_mutex); // Lock for thread safety
if (lastCopiedIndex <
current_index) { // Check if there are new elements
new_elements.assign(d_queue.rbegin(), d_queue.rbegin() + (d_queue.size() -
lastCopiedIndex)); // Reverse copy the new elements
lastCopiedIndex = current_index; // Update the index for the next call
}
}
return new_elements; // Return new elements in reverse order
auto begin()
{
return d_queue.rbegin();
}

auto beginFrom(size_t offset)
{
return d_queue.rbegin() + offset;
}

auto end()
{
return d_queue.rend();
}
};
Loading