diff --git a/.github/workflows/config.yml b/.github/workflows/config.yml index 8f3e066f..530f69a5 100644 --- a/.github/workflows/config.yml +++ b/.github/workflows/config.yml @@ -4,7 +4,7 @@ on: [push] jobs: build-mac: - runs-on: macos-14 + runs-on: macos-15 steps: - uses: actions/checkout@v1 - name: "build mac" diff --git a/symmetri/gui/draw_context_menu.cpp b/symmetri/gui/draw_context_menu.cpp index 8ffbad4b..2cd5bea2 100644 --- a/symmetri/gui/draw_context_menu.cpp +++ b/symmetri/gui/draw_context_menu.cpp @@ -12,60 +12,74 @@ void draw_context_menu(const model::ViewModel& vm) { ImGui::PushStyleVar(ImGuiStyleVar_WindowPadding, ImVec2(8, 8)); if (ImGui::BeginPopup("context_menu")) { if (vm.selected_node_idx.has_value()) { - const bool is_place = std::get<0>(vm.selected_node_idx.value()); + const model::Model::NodeType node_type = + std::get<0>(vm.selected_node_idx.value()); const size_t selected_idx = std::get<1>(vm.selected_node_idx.value()); ImGui::Text( - "%s '%s'", (is_place ? "Place" : "Transition"), - (is_place ? vm.net.place : vm.net.transition)[selected_idx].c_str()); + "%s '%s'", + (node_type == model::Model::NodeType::Place ? "Place" : "Transition"), + (node_type == model::Model::NodeType::Place + ? vm.net.place + : vm.net.transition)[selected_idx] + .c_str()); ImGui::Separator(); if (ImGui::BeginMenu("Add arc to...")) { - for (const auto& node_idx : is_place ? vm.t_view : vm.p_view) { - if (is_place) { - drawColorDropdownMenu(vm.net.transition[node_idx].c_str(), - vm.colors, [&](const char* c) { - addArc(is_place, selected_idx, node_idx, - symmetri::Token(c)); + for (const auto& node_idx : node_type == model::Model::NodeType::Place + ? vm.t_view + : vm.p_view) { + if (node_type == model::Model::NodeType::Place) { + drawColorDropdownMenu(vm.net.transition[node_idx], vm.colors, + [&](auto c) -> void { + addArc(node_type, selected_idx, node_idx, + symmetri::Token(c.data())); }); } else if (ImGui::MenuItem((vm.net.place[node_idx].c_str()))) { - addArc(is_place, selected_idx, node_idx, symmetri::Success); + addArc(node_type, selected_idx, node_idx, symmetri::Success); } if (ImGui::IsItemHovered()) { - bool is_target_place = not is_place; + const auto target_node_type = + node_type == model::Model::NodeType::Place + ? model::Model::NodeType::Transition + : model::Model::NodeType::Place; if (not vm.selected_target_node_idx.has_value() || vm.selected_target_node_idx.value() != - std::tuple{is_target_place, node_idx}) { - setSelectedTargetNode(is_target_place, node_idx); + std::tuple{target_node_type, + node_idx}) { + setSelectedTargetNode(target_node_type, node_idx); } } } ImGui::EndMenu(); } - if (is_place) { - drawColorDropdownMenu("Add marking", vm.colors, [=](const char* c) { - addTokenToPlace( - symmetri::AugmentedToken{selected_idx, symmetri::Token(c)}); + if (node_type == model::Model::NodeType::Place) { + drawColorDropdownMenu("Add marking", vm.colors, [=](auto c) -> void { + addTokenToPlace(symmetri::AugmentedToken{selected_idx, + symmetri::Token(c.data())}); }); } if (ImGui::MenuItem("Delete")) { - is_place ? removePlace(selected_idx) : removeTransition(selected_idx); + node_type == model::Model::NodeType::Place + ? removePlace(selected_idx) + : removeTransition(selected_idx); } } else if (vm.selected_arc_idxs.has_value()) { ImGui::Text("Arc"); ImGui::Separator(); if (ImGui::MenuItem("Delete")) { - const auto& [is_input, idx, sub_idx] = vm.selected_arc_idxs.value(); - removeArc(is_input, idx, sub_idx); + const auto& [source_node_type, idx, sub_idx] = + vm.selected_arc_idxs.value(); + removeArc(source_node_type, idx, sub_idx); } } else { // @todo make this offset relative to the menubars ImVec2 scene_pos = ImGui::GetIO().MousePos + ImVec2(-275, -40); if (ImGui::MenuItem("Add place")) { - addNode(true, scene_pos); + addNode(model::Model::NodeType::Place, scene_pos); } if (ImGui::MenuItem("Add transition")) { - addNode(false, scene_pos); + addNode(model::Model::NodeType::Transition, scene_pos); } } ImGui::EndPopup(); diff --git a/symmetri/gui/draw_graph.cpp b/symmetri/gui/draw_graph.cpp index e528aab4..0861a6ef 100644 --- a/symmetri/gui/draw_graph.cpp +++ b/symmetri/gui/draw_graph.cpp @@ -25,7 +25,7 @@ static ImVec2 size; static ImVec2 offset; -static const ImVec2 NODE_WINDOW_PADDING(8.0f, 8.0f); +static const ImVec2 NODE_WINDOW_PADDING(16.0f, 16.0f); static ImVec2 GetCenterPos(const model::Coordinate& pos, const ImVec2& size) { return ImVec2(pos.x + size.x * 0.5f, pos.y + size.y * 0.5f); @@ -46,7 +46,8 @@ void draw_grid(const model::ViewModel& vm) { }; void draw_arc(size_t t_idx, const model::ViewModel& vm) { - const auto draw = [&](const symmetri::AugmentedToken& t, bool is_input, + const auto draw = [&](const symmetri::AugmentedToken& t, + model::Model::NodeType source_node_type, size_t sub_idx) { if (std::find(vm.p_view.begin(), vm.p_view.end(), std::get(t)) == vm.p_view.end()) { @@ -54,12 +55,15 @@ void draw_arc(size_t t_idx, const model::ViewModel& vm) { } const ImVec2 i = - offset + GetCenterPos(!is_input ? vm.t_positions[t_idx] - : vm.p_positions[std::get(t)], - size); + offset + + GetCenterPos(source_node_type == model::Model::NodeType::Transition + ? vm.t_positions[t_idx] + : vm.p_positions[std::get(t)], + size); const ImVec2 o = - offset + GetCenterPos(is_input ? vm.t_positions[t_idx] - : vm.p_positions[std::get(t)], + offset + GetCenterPos(source_node_type == model::Model::NodeType::Place + ? vm.t_positions[t_idx] + : vm.p_positions[std::get(t)], size); const float max_distance = 2.f; @@ -70,12 +74,12 @@ void draw_arc(size_t t_idx, const model::ViewModel& vm) { const bool is_segment_hovered = (ImLengthSqr(mouse_pos_delta_to_segment) <= max_distance * max_distance); if (is_segment_hovered && ImGui::IsMouseClicked(0)) { - setSelectedArc(is_input, t_idx, sub_idx); + setSelectedArc(source_node_type, t_idx, sub_idx); } const auto is_selected_arc = [&]() { return vm.selected_arc_idxs.has_value() && - std::get<0>(*vm.selected_arc_idxs) == is_input && + std::get<0>(*vm.selected_arc_idxs) == source_node_type && std::get<1>(*vm.selected_arc_idxs) == t_idx && std::get<2>(*vm.selected_arc_idxs) == sub_idx; }; @@ -101,26 +105,33 @@ void draw_arc(size_t t_idx, const model::ViewModel& vm) { }; for (size_t sub_idx = 0; sub_idx < vm.net.input_n[t_idx].size(); sub_idx++) { - draw(vm.net.input_n[t_idx][sub_idx], true, sub_idx); + draw(vm.net.input_n[t_idx][sub_idx], model::Model::NodeType::Place, + sub_idx); } for (size_t sub_idx = 0; sub_idx < vm.net.output_n[t_idx].size(); sub_idx++) { - draw(vm.net.output_n[t_idx][sub_idx], false, sub_idx); + draw(vm.net.output_n[t_idx][sub_idx], model::Model::NodeType::Transition, + sub_idx); } }; -void draw_nodes(bool is_place, size_t idx, const std::string& name, - const model::Coordinate& position, bool highlight, +void draw_nodes(model::Model::NodeType node_type, size_t idx, + const std::string& name, const model::Coordinate& position, + bool highlight, const std::vector& tokens) { - ImGui::PushID(is_place ? idx + 10000 : idx); + ImGui::PushID(model::Model::NodeType::Place == node_type ? idx + 10000 : idx); ImVec2 node_rect_min = offset + ImVec2(position.x, position.y); + ImGui::PushFont(NULL, .85f * ImGui::GetFontSize()); // Display node contents first auto textWidth = ImGui::CalcTextSize(name.c_str()).x; - ImGui::SetCursorScreenPos(node_rect_min + NODE_WINDOW_PADDING + - ImVec2(8.0f - textWidth * 0.5f, -20.0f)); + ImGui::SetCursorScreenPos( + node_rect_min + NODE_WINDOW_PADDING + + ImVec2(8.0f - textWidth * 0.5f, -3.0f * NODE_WINDOW_PADDING.y)); ImGui::BeginGroup(); // Lock horizontal position ImGui::Text("%s", name.c_str()); // this crashed once.. + ImGui::EndGroup(); + ImGui::PopFont(); // Save the size of what we have emitted and whether any of the widgets are // being used @@ -135,10 +146,10 @@ void draw_nodes(bool is_place, size_t idx, const std::string& name, const bool node_moving_active = ImGui::IsItemActive(); const bool is_clicked = ImGui::IsMouseClicked(ImGuiMouseButton_Left); if (node_moving_active && is_clicked) { - setSelectedNode(is_place, idx); + setSelectedNode(node_type, idx); } else if (node_moving_active && ImGui::IsMouseDragging(ImGuiMouseButton_Left)) { - moveNode(is_place, idx, ImGui::GetIO().MouseDelta); + moveNode(node_type, idx, ImGui::GetIO().MouseDelta); } const int opacity = 255; @@ -146,7 +157,7 @@ void draw_nodes(bool is_place, size_t idx, const std::string& name, auto select_color = highlight ? IM_COL32(255, 255, 0, opacity) : IM_COL32(100, 100, 100, opacity); - if (is_place) { + if (node_type == model::Model::NodeType::Place) { draw_list->AddCircleFilled(offset + GetCenterPos(position, size), 0.5f * size.x, IM_COL32(200, 200, 200, opacity), -5); @@ -180,7 +191,8 @@ void draw_graph(const model::ViewModel& vm) { if (ImGui::IsMouseClicked(ImGuiMouseButton_Left) && ImGui::IsWindowHovered(ImGuiHoveredFlags_ChildWindows) && !ImGui::IsAnyItemHovered() && - (vm.selected_node_idx.has_value() || vm.selected_arc_idxs.has_value())) { + (vm.selected_node_idx.has_value() || vm.selected_arc_idxs.has_value() || + vm.selected_target_node_idx.has_value())) { resetSelection(); } @@ -233,25 +245,30 @@ void draw_graph(const model::ViewModel& vm) { draw_arc(idx, vm); const bool should_hightlight = (is_selected_node && - (!std::get<0>(vm.selected_node_idx.value()) && + (std::get<0>(vm.selected_node_idx.value()) == + model::Model::NodeType::Transition && idx == std::get<1>(vm.selected_node_idx.value()))) || (is_target_node && - (!std::get<0>(vm.selected_target_node_idx.value()) && + (std::get<0>(vm.selected_target_node_idx.value()) == + model::Model::NodeType::Transition && idx == std::get<1>(vm.selected_target_node_idx.value()))); - draw_nodes(false, idx, vm.net.transition[idx], vm.t_positions[idx], - should_hightlight, vm.tokens); + draw_nodes(model::Model::NodeType::Transition, idx, vm.net.transition[idx], + vm.t_positions[idx], should_hightlight, vm.tokens); } for (auto idx : vm.p_view) { const bool should_hightlight = (is_selected_node && - (std::get<0>(vm.selected_node_idx.value()) && + (std::get<0>(vm.selected_node_idx.value()) == + model::Model::NodeType::Place && idx == std::get<1>(vm.selected_node_idx.value()))) || (is_target_node && - (std::get<0>(vm.selected_target_node_idx.value()) && + (std::get<0>(vm.selected_target_node_idx.value()) == + model::Model::NodeType::Place && idx == std::get<1>(vm.selected_target_node_idx.value()))); - draw_nodes(true, idx, vm.net.place[idx], vm.p_positions[idx], - should_hightlight, vm.tokens); + + draw_nodes(model::Model::NodeType::Place, idx, vm.net.place[idx], + vm.p_positions[idx], should_hightlight, vm.tokens); } // Scrolling diff --git a/symmetri/gui/draw_menu.cpp b/symmetri/gui/draw_menu.cpp index 1fbd62c0..fa549854 100644 --- a/symmetri/gui/draw_menu.cpp +++ b/symmetri/gui/draw_menu.cpp @@ -28,45 +28,50 @@ void drawTokenLine(const symmetri::AugmentedToken& at) { ImGui::SameLine(); ImGui::TextColored( ImGui::ColorConvertU32ToFloat4(getColor(std::get(at))), - "%s", std::get(at).toString()); + "%s", std::get(at).toString().data()); } void draw_menu(const model::ViewModel& vm) { // is now also true if there's nothing selected. const bool is_a_node_selected = vm.selected_node_idx.has_value(); - const bool is_place = vm.selected_node_idx.has_value() && - std::get<0>(vm.selected_node_idx.value()); + const model::Model::NodeType node_type = + std::get<0>(vm.selected_node_idx.value_or( + std::make_tuple(model::Model::NodeType::Place, size_t(0)))); + const size_t selected_idx = is_a_node_selected ? std::get<1>(vm.selected_node_idx.value()) : 9999; draw_simulation_menu(vm); ImGui::Text("Selected"); ImGui::Separator(); - if (vm.selected_node_idx.has_value()) { + if (is_a_node_selected) { ImGui::Text("Name"); ImGui::SameLine(); static int i = 0; const auto id = std::string("##") + std::to_string(i++); ImGui::PushItemWidth(-1); - const auto& model_name = - (is_place ? vm.net.place : vm.net.transition)[selected_idx]; + const auto& model_name = (model::Model::NodeType::Place == node_type + ? vm.net.place + : vm.net.transition)[selected_idx]; ImGui::Text("%s", model_name.c_str()); static char view_name[128] = ""; strcpy(view_name, model_name.c_str()); ImGui::PushItemWidth(-1); - ImGui::InputText( - "##input tex", view_name, 128, ImGuiInputTextFlags_CallbackEdit, - (is_place ? updatePlaceName : updateTransitionName)(selected_idx)); + ImGui::InputText("##input tex", view_name, 128, + ImGuiInputTextFlags_CallbackEdit, + (model::Model::NodeType::Place == node_type + ? updatePlaceName + : updateTransitionName)(selected_idx)); ImGui::PopItemWidth(); label_id = 0; - if (is_place) { + if (model::Model::NodeType::Place == node_type) { ImGui::Text("Marking"); std::ranges::for_each(vm.tokens | std::views::filter([=](const auto& at) { return std::get(at) == selected_idx; }), &drawTokenLine); - } else if (not is_place) { + } else if (model::Model::NodeType::Transition == node_type) { ImGui::Text("Priority"); ImGui::SameLine(); static char view_priority[4] = ""; @@ -79,29 +84,34 @@ void draw_menu(const model::ViewModel& vm) { updatePriority(selected_idx)); ImGui::Text("Output"); ImGui::SameLine(); - if (ImGui::BeginCombo("##output", - fire(vm.net.store[selected_idx]).toString())) { + if (ImGui::BeginCombo( + "##output", fire(vm.net.store[selected_idx]).toString().data())) { for (const auto& color : vm.colors) { - if (ImGui::Selectable(color)) { - updateTransitionOutputColor(selected_idx, symmetri::Token(color)); + if (ImGui::Selectable(color.data())) { + updateTransitionOutputColor(selected_idx, + symmetri::Token(color.data())); } } ImGui::EndCombo(); } } } else if (vm.selected_arc_idxs.has_value()) { - const auto& [is_input, selected_idx, sub_idx] = + const auto [node_type, selected_idx, sub_idx] = vm.selected_arc_idxs.value(); const auto color = std::get( - (is_input ? vm.net.input_n : vm.net.output_n)[selected_idx][sub_idx]); + (node_type == model::Model::NodeType::Place + ? vm.net.input_n + : vm.net.output_n)[selected_idx][sub_idx]); - if (is_input) { - drawColorDropdownMenu(color.toString(), vm.colors, [=](const char* c) { - updateArcColor(is_input, selected_idx, sub_idx, symmetri::Token(c)); - }); + if (node_type == model::Model::NodeType::Place) { + drawColorDropdownMenu(std::string(color.toString()), vm.colors, + [=](auto c) { + updateArcColor(node_type, selected_idx, sub_idx, + symmetri::Token(c.data())); + }); } else { - ImGui::Text("%s", color.toString()); + ImGui::Text("%s", color.toString().data()); } } ImGui::Dummy(ImVec2(0.0f, 20.0f)); @@ -119,8 +129,10 @@ void draw_menu(const model::ViewModel& vm) { ImGui::Separator(); for (const auto& idx : vm.p_view) { - renderNodeEntry(true, vm.net.place[idx], idx, - is_a_node_selected && is_place && idx == selected_idx); + renderNodeEntry(model::Model::NodeType::Place, vm.net.place[idx], idx, + is_a_node_selected && + model::Model::NodeType::Place == node_type && + idx == selected_idx); } ImGui::EndTabItem(); } @@ -135,8 +147,10 @@ void draw_menu(const model::ViewModel& vm) { } ImGui::Separator(); for (const auto& idx : vm.t_view) { - renderNodeEntry(false, vm.net.transition[idx], idx, - is_a_node_selected && !is_place && idx == selected_idx); + renderNodeEntry( + model::Model::NodeType::Transition, vm.net.transition[idx], idx, + is_a_node_selected && model::Model::NodeType::Place != node_type && + idx == selected_idx); } ImGui::EndTabItem(); @@ -146,7 +160,7 @@ void draw_menu(const model::ViewModel& vm) { ImGui::Text("%s,", vm.net.place[place].c_str()); ImGui::SameLine(); ImGui::TextColored(ImGui::ColorConvertU32ToFloat4(getColor(color)), - "%s", color.toString()); + "%s", color.toString().data()); } ImGui::EndTabItem(); } @@ -162,9 +176,9 @@ void draw_menu(const model::ViewModel& vm) { } ImGui::Separator(); for (const auto& color : vm.colors) { - ImGui::TextColored( - ImGui::ColorConvertU32ToFloat4(getColor(symmetri::Token(color))), - "%s", color); + ImGui::TextColored(ImGui::ColorConvertU32ToFloat4( + getColor(symmetri::Token(color.data()))), + "%s", color.data()); } ImGui::EndTabItem(); } diff --git a/symmetri/gui/imgui b/symmetri/gui/imgui index bad5ee16..37b7a7a9 160000 --- a/symmetri/gui/imgui +++ b/symmetri/gui/imgui @@ -1 +1 @@ -Subproject commit bad5ee167b06484e06015bca34ebfb20f8aeed24 +Subproject commit 37b7a7a9dfc4a775c371bbd0ab8ceaf132c78fce diff --git a/symmetri/gui/load_file.cpp b/symmetri/gui/load_file.cpp index e8d7f7f3..ba057bf2 100644 --- a/symmetri/gui/load_file.cpp +++ b/symmetri/gui/load_file.cpp @@ -18,6 +18,7 @@ void loadPetriNet(const std::filesystem::path& file) { auto& m = *model.data; m.selected_arc_idxs.reset(); m.selected_node_idx.reset(); + m.selected_target_node_idx.reset(); m.active_file = file; if (not m.active_file.has_value()) { return model; diff --git a/symmetri/gui/metal_main.mm b/symmetri/gui/metal_main.mm index 261fcd2c..05f03ffa 100644 --- a/symmetri/gui/metal_main.mm +++ b/symmetri/gui/metal_main.mm @@ -12,6 +12,7 @@ #define GLFW_EXPOSE_NATIVE_COCOA #include #include +#include "load_file.h" #import #import @@ -32,8 +33,8 @@ int main(int, char **) { io.ConfigFlags |= ImGuiConfigFlags_NavEnableGamepad; // Enable Gamepad Controls // Setup style - // ImGui::StyleColorsDark(); - ImGui::StyleColorsLight(); + ImGui::StyleColorsDark(); + // ImGui::StyleColorsLight(); // Setup window glfwSetErrorCallback(glfw_error_callback); @@ -62,6 +63,7 @@ int main(int, char **) { MTLRenderPassDescriptor *renderPassDescriptor = [MTLRenderPassDescriptor new]; auto root_subscription = go(); + loadPetriNet("/Users/thomashorstink/Downloads/Inspection.grml"); // Main loop while (!glfwWindowShouldClose(window)) { diff --git a/symmetri/gui/model.cpp b/symmetri/gui/model.cpp index b0e00226..082f7379 100644 --- a/symmetri/gui/model.cpp +++ b/symmetri/gui/model.cpp @@ -43,7 +43,7 @@ ViewModel::ViewModel(Model m) t_fireable(std::accumulate( t_view.begin(), t_view.end(), std::vector{}, [this](std::vector&& t_fireable, size_t t_idx) { - if (canFire(net.input_n[t_idx], tokens)) { + if (canFire(net.input_n.at(t_idx), tokens)) { t_fireable.push_back(t_idx); } return std::move(t_fireable); diff --git a/symmetri/gui/model.h b/symmetri/gui/model.h index 571cb1b3..f68a548d 100644 --- a/symmetri/gui/model.h +++ b/symmetri/gui/model.h @@ -12,59 +12,64 @@ namespace model { struct ViewModel; -using Drawable = void (*)(const ViewModel &); +using Drawable = void (*)(const ViewModel&); struct Coordinate { float x, y; }; -Coordinate operator+(Coordinate &lhs, Coordinate &rhs); -Coordinate &operator+=(Coordinate &lhs, const Coordinate &rhs); +Coordinate operator+(Coordinate& lhs, Coordinate& rhs); +Coordinate& operator+=(Coordinate& lhs, const Coordinate& rhs); struct Model { + enum class NodeType { Place, Transition }; + // using SelectedNode = std::pair; + struct shared { - bool show_grid; + bool show_grid = true; Coordinate scrolling; - std::optional> selected_arc_idxs; - std::optional> selected_node_idx, + std::optional> selected_arc_idxs; + std::optional> selected_node_idx, selected_target_node_idx; std::optional active_file; std::map> blockers; std::vector t_positions, p_positions; std::vector t_view, p_view; - std::vector colors = symmetri::Token::getColors(); + std::vector colors = symmetri::Token::getColors(); std::vector tokens; std::vector drawables; symmetri::Petri::PTNet net; }; - std::shared_ptr data = std::make_shared(); + Model() : data(std::make_shared()) {} + std::shared_ptr data; }; struct ViewModel { - std::vector drawables; - bool show_grid; - Coordinate scrolling; - // is place, index, sub-index - std::optional> selected_arc_idxs; - // is place | index - std::optional> selected_node_idx, + const std::vector drawables; + const bool show_grid; + const Coordinate scrolling; + // source NodeType, source index, target sub-index + std::optional> + selected_arc_idxs; + // NodeType | index + std::optional> selected_node_idx, selected_target_node_idx; const std::string active_file; - std::vector t_view, p_view; - std::vector colors; - std::vector tokens; + const std::vector t_view, p_view; + const std::vector colors; + const std::vector tokens; - const symmetri::Petri::PTNet &net; - const std::vector &t_positions, &p_positions; - std::vector t_fireable; + const symmetri::Petri::PTNet& net; + const std::vector&t_positions, &p_positions; + const std::vector t_fireable; ViewModel() = delete; ViewModel(Model m); }; -using Reducer = std::function; +using Reducer = std::function; using Computer = std::function; Model initializeModel(); diff --git a/symmetri/gui/position_parsers.cpp b/symmetri/gui/position_parsers.cpp index b11aeeef..daed3131 100644 --- a/symmetri/gui/position_parsers.cpp +++ b/symmetri/gui/position_parsers.cpp @@ -57,8 +57,8 @@ std::map readGrmlPositions( std::string(attribute->Attribute("name")); if (child_attribute == "name") { auto place_id = std::string(attribute->GetText()); - positions[place_id] = {std::stof(child->Attribute("x")), - std::stof(child->Attribute("y"))}; + positions[place_id] = {2.0f * std::stof(child->Attribute("x")), + 2.0f * std::stof(child->Attribute("y"))}; } } } diff --git a/symmetri/gui/reducers.cpp b/symmetri/gui/reducers.cpp index 90983cf6..587b7765 100644 --- a/symmetri/gui/reducers.cpp +++ b/symmetri/gui/reducers.cpp @@ -18,9 +18,10 @@ void moveView(const ImVec2& d) { }); } -void moveNode(bool is_place, size_t idx, const ImVec2& d) { +void moveNode(model::Model::NodeType node_type, size_t idx, const ImVec2& d) { rxdispatch::push([=](model::Model&& m) mutable { - (is_place ? m.data->p_positions[idx] : m.data->t_positions[idx]) += + (model::Model::NodeType::Place == node_type ? m.data->p_positions[idx] + : m.data->t_positions[idx]) += model::Coordinate{d.x, d.y}; return m; }); @@ -45,9 +46,9 @@ std::string viewContainsNameAlready(const std::vector& view, return std::move(name); } -void addNode(bool is_place, ImVec2 pos) { +void addNode(model::Model::NodeType node_type, ImVec2 pos) { rxdispatch::push([=](model::Model&& m) mutable { - if (is_place) { + if (model::Model::NodeType::Place == node_type) { m.data->net.place.push_back( viewContainsNameAlready(m.data->p_view, m.data->net.place, "place")); m.data->net.p_to_ts_n.push_back({}); @@ -69,7 +70,8 @@ void addNode(bool is_place, ImVec2 pos) { }); } -void removeArc(bool is_input, size_t transition_idx, size_t sub_idx) { +void removeArc(model::Model::NodeType source_node_type, size_t transition_idx, + size_t sub_idx) { rxdispatch::push([=](model::Model&& m) mutable { // remove transition from view m.data->t_view.erase(std::remove(m.data->t_view.begin(), @@ -86,7 +88,7 @@ void removeArc(bool is_input, size_t transition_idx, size_t sub_idx) { m.data->t_view.push_back(new_idx); // now copy the arcs, except for the one we want to delete - if (is_input) { + if (source_node_type == model::Model::NodeType::Place) { m.data->net.output_n.push_back(m.data->net.output_n[transition_idx]); m.data->net.input_n.push_back({}); std::copy_if(m.data->net.input_n[transition_idx].begin(), @@ -107,10 +109,11 @@ void removeArc(bool is_input, size_t transition_idx, size_t sub_idx) { }); } -void addArc(bool is_place, size_t source, size_t target, +void addArc(model::Model::NodeType node_type, size_t source, size_t target, symmetri::Token color) { rxdispatch::push([=](model::Model&& m) mutable { - const size_t transition_idx = is_place ? target : source; + const size_t transition_idx = + model::Model::NodeType::Transition == node_type ? source : target; // remove transition from view m.data->t_view.erase(std::remove(m.data->t_view.begin(), m.data->t_view.end(), transition_idx), @@ -128,7 +131,7 @@ void addArc(bool is_place, size_t source, size_t target, m.data->t_view.push_back(new_transition_idx); // add the arc - if (is_place) { + if (model::Model::NodeType::Place == node_type) { auto& p_to_ts = m.data->net.p_to_ts_n[source]; m.data->net.input_n[new_transition_idx].push_back({source, color}); if (std::find(p_to_ts.begin(), p_to_ts.end(), transition_idx) == @@ -140,7 +143,6 @@ void addArc(bool is_place, size_t source, size_t target, } std::erase(m.data->drawables, &draw_context_menu); - return m; }); } @@ -171,17 +173,19 @@ void removeTransition(size_t idx) { std::remove(m.data->t_view.begin(), m.data->t_view.end(), idx), m.data->t_view.end()); m.data->selected_node_idx.reset(); + m.data->selected_target_node_idx.reset(); std::erase(m.data->drawables, &draw_context_menu); return m; }); } -void updateArcColor(bool is_input, size_t idx, size_t sub_idx, - const symmetri::Token color) { +void updateArcColor(model::Model::NodeType source_node_type, size_t idx, + size_t sub_idx, const symmetri::Token color) { rxdispatch::push([=](model::Model&& m) mutable { - std::get( - (is_input ? m.data->net.input_n : m.data->net.output_n)[idx][sub_idx]) = + std::get((source_node_type == model::Model::NodeType::Place + ? m.data->net.input_n + : m.data->net.output_n)[idx][sub_idx]) = color; return m; }); @@ -250,17 +254,17 @@ void setContextMenuInactive() { }); } -void setSelectedNode(bool is_place, size_t idx) { +void setSelectedNode(model::Model::NodeType node_type, size_t idx) { rxdispatch::push([=](model::Model&& m) { - m.data->selected_node_idx = {is_place, idx}; + m.data->selected_node_idx = {node_type, idx}; m.data->selected_arc_idxs.reset(); return m; }); }; -void setSelectedTargetNode(bool is_place, size_t idx) { +void setSelectedTargetNode(model::Model::NodeType node_type, size_t idx) { rxdispatch::push([=](model::Model&& m) { - m.data->selected_target_node_idx = {is_place, idx}; + m.data->selected_target_node_idx = {node_type, idx}; return m; }); }; @@ -269,23 +273,26 @@ void resetSelection() { rxdispatch::push([](model::Model&& m) { m.data->selected_node_idx.reset(); m.data->selected_arc_idxs.reset(); + m.data->selected_target_node_idx.reset(); return m; }); }; -void setSelectedArc(bool is_input, size_t idx, size_t sub_idx) { +void setSelectedArc(model::Model::NodeType source_node_type, size_t t_idx, + size_t p_idx) { rxdispatch::push([=](model::Model&& m) { m.data->selected_node_idx.reset(); - m.data->selected_arc_idxs = {is_input, idx, sub_idx}; + m.data->selected_target_node_idx.reset(); + m.data->selected_arc_idxs = {source_node_type, t_idx, p_idx}; return m; }); }; -void renderNodeEntry(bool is_place, const std::string& name, size_t idx, - bool selected) { +void renderNodeEntry(model::Model::NodeType node_type, const std::string& name, + size_t idx, bool selected) { ImGui::PushID(idx); if (ImGui::Selectable(name.c_str(), selected)) { - setSelectedNode(is_place, idx); + setSelectedNode(node_type, idx); } ImGui::PopID(); } diff --git a/symmetri/gui/reducers.h b/symmetri/gui/reducers.h index 38b48721..48caac39 100644 --- a/symmetri/gui/reducers.h +++ b/symmetri/gui/reducers.h @@ -9,13 +9,15 @@ void moveView(const ImVec2& d); -void moveNode(bool is_place, size_t idx, const ImVec2& d); +void moveNode(model::Model::NodeType node_type, size_t idx, const ImVec2& d); -void addNode(bool is_place, ImVec2 pos); +void addNode(model::Model::NodeType node_type, ImVec2 pos); -void removeArc(bool is_input, size_t transition_idx, size_t sub_idx); +void removeArc(model::Model::NodeType source_node_type, size_t transition_idx, + size_t sub_idx); -void addArc(bool is_place, size_t source, size_t target, symmetri::Token color); +void addArc(model::Model::NodeType node_type, size_t source, size_t target, + symmetri::Token color); void removePlace(size_t idx); @@ -23,8 +25,8 @@ void removeTransition(size_t idx); void showGrid(bool show_grid); -void updateArcColor(bool is_input, size_t idx, size_t sub_idx, - const symmetri::Token color); +void updateArcColor(model::Model::NodeType source_node_type, size_t idx, + size_t sub_idx, const symmetri::Token color); ImGuiInputTextCallback updatePlaceName(const size_t id); ImGuiInputTextCallback updateTransitionName(const size_t id); @@ -34,9 +36,9 @@ void setContextMenuActive(); void setContextMenuInactive(); -void setSelectedNode(bool is_place, size_t idx); +void setSelectedNode(model::Model::NodeType node_type, size_t idx); -void setSelectedTargetNode(bool is_place, size_t idx); +void setSelectedTargetNode(model::Model::NodeType node_type, size_t idx); void resetSelectedTargetNode(); @@ -44,10 +46,11 @@ void resetSelection(); void resetNetView(); -void setSelectedArc(bool is_input, size_t idx, size_t sub_idx); +void setSelectedArc(model::Model::NodeType source_node_type, size_t source_idx, + size_t target_idx); -void renderNodeEntry(bool is_place, const std::string& name, size_t idx, - bool selected); +void renderNodeEntry(model::Model::NodeType node_type, const std::string& name, + size_t idx, bool selected); int updateActiveFile(ImGuiInputTextCallbackData* data); diff --git a/symmetri/gui/rpp/rpp/disposables.hpp b/symmetri/gui/rpp/rpp/disposables.hpp index a25eed09..a4a146c9 100644 --- a/symmetri/gui/rpp/rpp/disposables.hpp +++ b/symmetri/gui/rpp/rpp/disposables.hpp @@ -12,11 +12,43 @@ /** * @defgroup disposables Disposables - * @brief Disposable owns some resource and provides ability to `dispose()` it: - * destroy/remove/disconnect and etc. - * @details In RPP it used as "inverted subscription": observable sets - * disposable to observer via `set_upstream(disposable)` with meaning "if you - * want to cancel me -> dispose this disposable" + * + * @brief Disposable is handle/resource passed from observable to observer via + * the `set_upstream` method. Observer disposes this disposable when it wants to + * unsubscribe from observable. + * + * @details In reactive programming, a **disposable** is an object that + * represents a resource that needs to be released or disposed of when it is no + * longer needed. This can include things like file handles, network + * connections, or any other resource that needs to be cleaned up after use. The + * purpose of a disposable is to provide a way to manage resources in a safe and + * efficient manner. By using disposables, you can ensure that resources are + * released in a timely manner, preventing memory leaks and other issues that + * can arise from resource leaks. + * + * There are 2 main purposes of disposables: + * 1. **Upstream disposable**
+ * This is a disposable that the observable puts into the observer. + * The upstream disposable keeps some state or callback that should be disposed + * of when the observer is disposed (== no longer wants to receive emissions, + * for example, was completed/errored or just unsubscribed) This ensures that + * any resources used by the observable are properly cleaned up when the + * observer obtains on_error/on_completed or disposed in any other way. + * + * 2. **External disposable**
+ * This is a disposable that allows the observer to be disposed of from outside + * the observer itself. This can be useful in situations where you need to + * cancel an ongoing operation or release resources before the observable has + * completed its work. To achieve this in rpp you can pass disposable to + * `subscribe` method or use `subscribe_with_disposable` overload instead. + * + * @note In rpp all disposables should be created via @link + * rpp::disposable_wrapper_impl @endlink instead of manually. + * + * @warning From user of rpp library it is not really expected to handle + * disposables manually somehow **except** of case where user want to control + * lifetime of observable-observer connection manually. + * * @ingroup rpp */ diff --git a/symmetri/gui/rpp/rpp/disposables/composite_disposable.hpp b/symmetri/gui/rpp/rpp/disposables/composite_disposable.hpp index 25b05d9a..82cf2850 100644 --- a/symmetri/gui/rpp/rpp/disposables/composite_disposable.hpp +++ b/symmetri/gui/rpp/rpp/disposables/composite_disposable.hpp @@ -10,168 +10,146 @@ #pragma once -#include - +#include #include #include +#include #include -#include - -namespace rpp -{ - /** - * @brief Disposable which can keep some other sub-disposables. When this root disposable is disposed, then all sub-disposables would be disposed too. - * @tparam Container is type of internal storage used to keep dependencies - * - * @ingroup disposables - */ - template - class composite_disposable_impl : public interface_composite_disposable - { - public: - composite_disposable_impl() = default; - composite_disposable_impl(const composite_disposable_impl&) = delete; - composite_disposable_impl(composite_disposable_impl&& other) noexcept = delete; - - bool is_disposed() const noexcept final - { - // just need atomicity, not guarding anything - return m_current_state.load(std::memory_order::seq_cst) == State::Disposed; +namespace rpp { +/** + * @brief Disposable which can keep some other sub-disposables. When this root + * disposable is disposed, then all sub-disposables would be disposed too. + * @tparam Container is type of internal storage used to keep dependencies + * + * @ingroup disposables + */ +template +class composite_disposable_impl : public interface_composite_disposable { + public: + composite_disposable_impl() = default; + composite_disposable_impl(const composite_disposable_impl&) = delete; + composite_disposable_impl(composite_disposable_impl&& other) noexcept = + delete; + + bool is_disposed() const noexcept final { + // just need atomicity, not guarding anything + return m_current_state.load(std::memory_order::seq_cst) == State::Disposed; + } + + void dispose_impl(interface_disposable::Mode mode) noexcept final { + while (true) { + State expected{State::None}; + // need to acquire possible state changing from `add` + if (m_current_state.compare_exchange_strong(expected, State::Disposed, + std::memory_order::seq_cst)) { + composite_dispose_impl(mode); + + m_disposables.dispose(); + m_disposables.clear(); + return; + } + + if (expected == State::Disposed) return; + } + } + + using interface_composite_disposable::add; + + void add(disposable_wrapper disposable) override { + if (disposable.is_disposed() || disposable.lock().get() == this) return; + + while (true) { + State expected{State::None}; + // need to acquire possible disposables state changing from other `add` + if (m_current_state.compare_exchange_strong(expected, State::Edit, + std::memory_order::seq_cst)) { + try { + m_disposables.push_back(std::move(disposable)); + } catch (...) { + m_current_state.store(State::None, std::memory_order::seq_cst); + throw; } - - void dispose_impl(interface_disposable::Mode mode) noexcept final - { - while (true) - { - State expected{State::None}; - // need to acquire possible state changing from `add` - if (m_current_state.compare_exchange_strong(expected, State::Disposed, std::memory_order::seq_cst)) - { - composite_dispose_impl(mode); - - m_disposables.dispose(); - m_disposables.clear(); - return; - } - - if (expected == State::Disposed) - return; - } + // need to propogate disposables state changing to others + m_current_state.store(State::None, std::memory_order::seq_cst); + return; + } + + if (expected == State::Disposed) { + disposable.dispose(); + return; + } + } + } + + void remove(const disposable_wrapper& disposable) override { + while (true) { + State expected{State::None}; + // need to acquire possible disposables state changing from other `add` or + // `remove` + if (m_current_state.compare_exchange_strong(expected, State::Edit, + std::memory_order::seq_cst)) { + try { + m_disposables.remove(disposable); + } catch (...) { + m_current_state.store(State::None, std::memory_order::seq_cst); + throw; } - - using interface_composite_disposable::add; - - void add(disposable_wrapper disposable) override - { - if (disposable.is_disposed() || disposable.lock().get() == this) - return; - - while (true) - { - State expected{State::None}; - // need to acquire possible disposables state changing from other `add` - if (m_current_state.compare_exchange_strong(expected, State::Edit, std::memory_order::seq_cst)) - { - try - { - m_disposables.push_back(std::move(disposable)); - } - catch (...) - { - m_current_state.store(State::None, std::memory_order::seq_cst); - throw; - } - // need to propogate disposables state changing to others - m_current_state.store(State::None, std::memory_order::seq_cst); - return; - } - - if (expected == State::Disposed) - { - disposable.dispose(); - return; - } - } - } - - void remove(const disposable_wrapper& disposable) override - { - while (true) - { - State expected{State::None}; - // need to acquire possible disposables state changing from other `add` or `remove` - if (m_current_state.compare_exchange_strong(expected, State::Edit, std::memory_order::seq_cst)) - { - try - { - m_disposables.remove(disposable); - } - catch (...) - { - m_current_state.store(State::None, std::memory_order::seq_cst); - throw; - } - // need to propogate disposables state changing to others - m_current_state.store(State::None, std::memory_order::seq_cst); - return; - } - - if (expected == State::Disposed) - return; - } + // need to propogate disposables state changing to others + m_current_state.store(State::None, std::memory_order::seq_cst); + return; + } + + if (expected == State::Disposed) return; + } + } + + void clear() override { + while (true) { + State expected{State::None}; + // need to acquire possible disposables state changing from other `add` or + // `remove` + if (m_current_state.compare_exchange_strong(expected, State::Edit, + std::memory_order::seq_cst)) { + try { + m_disposables.dispose(); + m_disposables.clear(); + } catch (...) { + m_current_state.store(State::None, std::memory_order::seq_cst); + throw; } - - void clear() override - { - while (true) - { - State expected{State::None}; - // need to acquire possible disposables state changing from other `add` or `remove` - if (m_current_state.compare_exchange_strong(expected, State::Edit, std::memory_order::seq_cst)) - { - try - { - m_disposables.dispose(); - m_disposables.clear(); - } - catch (...) - { - m_current_state.store(State::None, std::memory_order::seq_cst); - throw; - } - // need to propogate disposables state changing to others - m_current_state.store(State::None, std::memory_order::seq_cst); - return; - } - - if (expected == State::Disposed) - return; - } - } - - protected: - virtual void composite_dispose_impl(interface_disposable::Mode) noexcept {} - - private: - enum class State : uint8_t - { - None, // default state - Edit, // set it during adding new element into deps or removing. After success -> back to None - Disposed // permanent state after dispose - }; - - Container m_disposables{}; - std::atomic m_current_state{}; - }; - - /** - * @brief Disposable which can keep some other sub-disposables. When this root disposable is disposed, then all sub-disposables would be disposed too. - * @note By default uses vector as internal storage - * - * @ingroup disposables - */ - class composite_disposable : public composite_disposable_impl> - { - }; -} // namespace rpp + // need to propogate disposables state changing to others + m_current_state.store(State::None, std::memory_order::seq_cst); + return; + } + + if (expected == State::Disposed) return; + } + } + + protected: + virtual void composite_dispose_impl(interface_disposable::Mode) noexcept {} + + private: + enum class State : uint8_t { + None, // default state + Edit, // set it during adding new element into deps or removing. After + // success -> back to None + Disposed // permanent state after dispose + }; + + Container m_disposables{}; + std::atomic m_current_state{}; +}; + +/** + * @brief Disposable which can keep some other sub-disposables. When this root + * disposable is disposed, then all sub-disposables would be disposed too. + * @note By default uses vector as internal storage + * + * @ingroup disposables + */ +class composite_disposable + : public composite_disposable_impl< + rpp::details::disposables::default_disposables_container> {}; +} // namespace rpp diff --git a/symmetri/gui/rpp/rpp/disposables/details/container.hpp b/symmetri/gui/rpp/rpp/disposables/details/container.hpp index 048e97a3..7b978c2e 100644 --- a/symmetri/gui/rpp/rpp/disposables/details/container.hpp +++ b/symmetri/gui/rpp/rpp/disposables/details/container.hpp @@ -10,16 +10,24 @@ #pragma once #include +#include #include #include #include namespace rpp::details::disposables { -class dynamic_disposables_container_base { +class dynamic_disposables_container { public: - explicit dynamic_disposables_container_base(size_t count) { - m_data.reserve(count); - } + explicit dynamic_disposables_container() = default; + + dynamic_disposables_container(const dynamic_disposables_container&) = delete; + dynamic_disposables_container( + dynamic_disposables_container&& other) noexcept = default; + + dynamic_disposables_container& operator=( + const dynamic_disposables_container& other) = delete; + dynamic_disposables_container& operator=( + dynamic_disposables_container&& other) noexcept = default; void push_back(const rpp::disposable_wrapper& d) { m_data.push_back(d); } @@ -43,13 +51,6 @@ class dynamic_disposables_container_base { mutable std::vector m_data{}; }; -template -class dynamic_disposables_container - : public dynamic_disposables_container_base { - public: - dynamic_disposables_container() : dynamic_disposables_container_base{Count} {} -}; - template class static_disposables_container { public: @@ -132,11 +133,13 @@ class static_disposables_container { size_t m_size{}; }; -struct none_disposables_container { +template <> +class static_disposables_container<0> { + public: [[noreturn]] static void push_back(const rpp::disposable_wrapper&) { throw rpp::utils::more_disposables_than_expected{ - "none_disposables_container expected none disposables but obtained " - "one"}; + "static_disposables_container<0> expected no disposables but received " + "at least one"}; } static void remove(const rpp::disposable_wrapper&) {} diff --git a/symmetri/gui/rpp/rpp/disposables/disposable_wrapper.hpp b/symmetri/gui/rpp/rpp/disposables/disposable_wrapper.hpp index cfe9b384..05e7b317 100644 --- a/symmetri/gui/rpp/rpp/disposables/disposable_wrapper.hpp +++ b/symmetri/gui/rpp/rpp/disposables/disposable_wrapper.hpp @@ -95,12 +95,53 @@ class disposable_wrapper_base { namespace rpp { /** - * @brief Wrapper to keep disposable. Any disposable have to be created right - * from this wrapper with help of `make` function. - * @details Member functions is safe to call even if internal disposable is - * gone. Also it provides access to "raw" shared_ptr and it can be nullptr in - * case of disposable empty/ptr gone. - * @details Can keep weak_ptr in case of not owning disposable + * @brief Main RPP wrapper over @link disposables @endlink. + * @details This wrapper invented to provide safe and easy-to-use access to + disposables. It has next core points: + * - disposable_wrapper is kind of smart_pointer (like std::shared_ptr) but for + disposables. So, default constructed wrapper is empty wrapper. + * - disposable_wrapper shares ownership like std::shared_ptr + * - any disposable created via disposable_wrapper would have call `dispose()` + during it's destruction (during destruction of last disposable_wrapper owning + it) + * - disposable_wrapper's methods is safe to use over empty/gone/disposed/weak + disposables. + * - as soon as disposable can be actually "any internal state" it provides + access to "raw" shared_ptr and it can be nullptr in case of disposable + empty/ptr gone. + * - disposable_wrapper can be strong or weak (same as std::shared_ptr). weak + disposable is important, for example, when it keeps observer and this observer + should keep this disposable at the same time. + * - disposable_wrapper has popluar methods to work with disposable: + `dispose()`, `is_disposed()` and `add()`/`remove()`/`clear()` (for + `interface_composite_disposable`). + * + * To construct wrapper you have to use `make` method: + * @code{cpp} + * auto d = + rpp::disposable_wrapper::make(some_arguments, + to_construct_it); + * @endcode + * + * To achieve desired performance RPP is avoiding to returning disposable by + default. So, it is why `subscribe` method is not returning anything by default. + If you want to attach disposable to observer you can use overloading method + accepting disposable as first argument like this: + * @code{cpp} + * auto d = rpp::composite_disposable_wrapper::make(); + * observable.subscribe(d, [](int v){}); + * @endcode + + * or use `subscribe_with_disposable` method instead + * @code{cpp} + * auto d = observable.subscribe_with_disposable([](int){}); + * @endcode + * + * @note rpp has 2 predefined disposable_wrappers for most popular cases: + * - @link rpp::disposable_wrapper @endlink is wrapper for simple @link + rpp::interface_disposable @endlink + * - @link rpp::composite_disposable_wrapper @endlink is wrapper for @link + rpp::composite_disposable @endlink * * @ingroup disposables */ @@ -120,8 +161,13 @@ class disposable_wrapper_impl final : public details::disposable_wrapper_base { bool operator==(const disposable_wrapper_impl&) const = default; /** - * @brief Way to create disposable_wrapper. Passed `TTarget` type can be any - * type derived from `TDisposable`. + * @brief Main way to create disposable_wrapper. Passed `TTarget` type can be + * any type derived from `TDisposable`. + * @par Example: + * + * \code{cpp} + * rpp::disposable_wrapper::make(); + * \endcode */ template TTarget = TDefaultMake, typename... TArgs> diff --git a/symmetri/gui/rpp/rpp/disposables/fwd.hpp b/symmetri/gui/rpp/rpp/disposables/fwd.hpp index 6cfb7344..54516a89 100644 --- a/symmetri/gui/rpp/rpp/disposables/fwd.hpp +++ b/symmetri/gui/rpp/rpp/disposables/fwd.hpp @@ -12,65 +12,69 @@ #include -namespace rpp::details -{ - template - class auto_dispose_wrapper; -} // namespace rpp::details +namespace rpp::details { +template +class auto_dispose_wrapper; +} // namespace rpp::details -namespace rpp -{ - struct interface_disposable; - struct interface_composite_disposable; +namespace rpp { +struct interface_disposable; +struct interface_composite_disposable; - template - class disposable_wrapper_impl; +template +class disposable_wrapper_impl; - /** - * @brief Wrapper to keep "simple" disposable. Specialization of rpp::disposable_wrapper_impl - * - * @ingroup disposables - */ - using disposable_wrapper = disposable_wrapper_impl; +/** + * @brief Wrapper to keep "simple" disposable. Specialization of + * rpp::disposable_wrapper_impl + * + * @ingroup disposables + */ +using disposable_wrapper = disposable_wrapper_impl; - /** - * @brief Wrapper to keep "composite" disposable. Specialization of rpp::disposable_wrapper_impl - * - * @ingroup disposables - */ - using composite_disposable_wrapper = disposable_wrapper_impl; -} // namespace rpp +/** + * @brief Wrapper to keep "composite" disposable. Specialization of + * rpp::disposable_wrapper_impl + * + * @ingroup disposables + */ +using composite_disposable_wrapper = + disposable_wrapper_impl; +} // namespace rpp -namespace rpp::details::disposables -{ - template - class dynamic_disposables_container; +namespace rpp::details::disposables { +namespace constraint { +template +concept disposables_container = + requires(T& c, const T& const_c, const rpp::disposable_wrapper& d) { + c.push_back(d); + const_c.dispose(); + c.clear(); + }; +} // namespace constraint - template - class static_disposables_container; +/** + * @brief Container with std::vector as underlying storage. + */ +class dynamic_disposables_container; - struct none_disposables_container; +/** + * @brief Container with fixed std::array as underlying storage. + */ +template +class static_disposables_container; - namespace constraint - { - template - concept disposable_container = requires(T& c, const T& const_c, const rpp::disposable_wrapper& d) { - c.push_back(d); - const_c.dispose(); - c.clear(); - }; - } // namespace constraint -} // namespace rpp::details::disposables +using default_disposables_container = dynamic_disposables_container; +} // namespace rpp::details::disposables -namespace rpp -{ - class composite_disposable; +namespace rpp { +class composite_disposable; - template - class callback_disposable; +template +class callback_disposable; - class refcount_disposable; +class refcount_disposable; - template - disposable_wrapper make_callback_disposable(Fn&& invocable); -} // namespace rpp +template +disposable_wrapper make_callback_disposable(Fn&& invocable); +} // namespace rpp diff --git a/symmetri/gui/rpp/rpp/disposables/refcount_disposable.hpp b/symmetri/gui/rpp/rpp/disposables/refcount_disposable.hpp index 91889427..1c19e65b 100644 --- a/symmetri/gui/rpp/rpp/disposables/refcount_disposable.hpp +++ b/symmetri/gui/rpp/rpp/disposables/refcount_disposable.hpp @@ -47,8 +47,7 @@ class refcount_disposable friend class details::refocunt_disposable_inner; refcount_disposable() = default; - enum class Mode : bool { WeakRefStrongSource, StrongRefRefSource }; - composite_disposable_wrapper add_ref(Mode mode = Mode::WeakRefStrongSource); + composite_disposable_wrapper add_ref(); private: std::atomic m_refcount{0}; @@ -80,8 +79,7 @@ class refocunt_disposable_inner final } // namespace rpp::details namespace rpp { -inline composite_disposable_wrapper refcount_disposable::add_ref( - refcount_disposable::Mode mode) { +inline composite_disposable_wrapper refcount_disposable::add_ref() { auto current_value = m_refcount.load(std::memory_order::seq_cst); while (true) { if (current_value == s_disposed) @@ -91,10 +89,8 @@ inline composite_disposable_wrapper refcount_disposable::add_ref( if (m_refcount.compare_exchange_strong(current_value, current_value + 1, std::memory_order::seq_cst)) { auto inner = composite_disposable_wrapper::make< - details::refocunt_disposable_inner>( - mode == Mode::WeakRefStrongSource ? wrapper_from_this() - : wrapper_from_this().as_weak()); - add(mode == Mode::WeakRefStrongSource ? inner.as_weak() : inner); + details::refocunt_disposable_inner>(wrapper_from_this()); + add(inner.as_weak()); return inner; } } diff --git a/symmetri/gui/rpp/rpp/observables.hpp b/symmetri/gui/rpp/rpp/observables.hpp index a4e37078..1e524411 100644 --- a/symmetri/gui/rpp/rpp/observables.hpp +++ b/symmetri/gui/rpp/rpp/observables.hpp @@ -42,7 +42,7 @@ * instead if you can't guarantee serial emissions. * * For example: - * \code{.cpp} + * @code{.cpp} * auto s1 = rpp::source::just(1) | rpp::operators::repeat() | * rpp::operators::subscribe_on(rpp::schedulers::new_thread{}); auto s2 = * rpp::source::just(2) | rpp::operators::repeat() | @@ -56,18 +56,18 @@ * }) * | rpp::operators::as_blocking() * | rpp::operators::subscribe([](int){}); - * \endcode + * @endcode * * This will never produce: - * \code{.log} + * @code{.log} * enter 1 * enter 2 * exit 2 * exit 1 - * \endcode + * @endcode * * Only serially: - * \code{.log} + * @code{.log} * enter 1 * exit 1 * enter 1 @@ -76,13 +76,14 @@ * exit 2 * enter 2 * exit 2 - * \endcode + * @endcode * @see https://reactivex.io/documentation/observable.html * @ingroup rpp */ #include #include +#include #include #include #include diff --git a/symmetri/gui/rpp/rpp/observables/blocking_observable.hpp b/symmetri/gui/rpp/rpp/observables/blocking_observable.hpp index 04a45ec6..5762dadd 100644 --- a/symmetri/gui/rpp/rpp/observables/blocking_observable.hpp +++ b/symmetri/gui/rpp/rpp/observables/blocking_observable.hpp @@ -42,9 +42,8 @@ template ::template add<1>; + using optimal_disposables_strategy = + typename Strategy::optimal_disposables_strategy::template add<1>; blocking_strategy(observable&& observable) : m_original{std::move(observable)} {} diff --git a/symmetri/gui/rpp/rpp/observables/connectable_observable.hpp b/symmetri/gui/rpp/rpp/observables/connectable_observable.hpp index 0887914c..a74cee9c 100644 --- a/symmetri/gui/rpp/rpp/observables/connectable_observable.hpp +++ b/symmetri/gui/rpp/rpp/observables/connectable_observable.hpp @@ -33,10 +33,9 @@ struct ref_count_on_subscribe_t< std::shared_ptr m_state = std::make_shared(); using value_type = rpp::utils::extract_observable_type_t; - using expected_disposable_strategy = - typename rpp::details::observables::deduce_disposable_strategy_t< - rpp::connectable_observable>::template add<1>; + using optimal_disposables_strategy = + typename rpp::connectable_observable:: + optimal_disposables_strategy::template add<1>; template Strategy> void subscribe(observer&& obs) const { @@ -71,7 +70,7 @@ namespace rpp { * @ingroup observables */ template -class connectable_observable final +class connectable_observable : public decltype(std::declval().get_observable()) { using base = decltype(std::declval().get_observable()); @@ -79,13 +78,13 @@ class connectable_observable final static_assert(rpp::constraint::subject); connectable_observable(const OriginalObservable& original_observable, - const Subject& subject = Subject{}) + const Subject& subject) : base{subject.get_observable()}, m_original_observable{original_observable}, m_subject{subject} {} connectable_observable(OriginalObservable && original_observable, - const Subject& subject = Subject{}) + const Subject& subject) : base{subject.get_observable()}, m_original_observable{std::move(original_observable)}, m_subject{subject} {} @@ -166,6 +165,15 @@ class connectable_observable final return std::move(*this) | std::forward(op); } + auto as_dynamic_connectable() const& { + return rpp::dynamic_connectable_observable{ + m_original_observable.as_dynamic(), m_subject}; + } + auto as_dynamic_connectable()&& { + return rpp::dynamic_connectable_observable{ + std::move(m_original_observable).as_dynamic(), std::move(m_subject)}; + } + private: RPP_NO_UNIQUE_ADDRESS OriginalObservable m_original_observable; Subject m_subject; diff --git a/symmetri/gui/rpp/rpp/observables/details/chain_strategy.hpp b/symmetri/gui/rpp/rpp/observables/details/chain_strategy.hpp index c05d9009..f4386790 100644 --- a/symmetri/gui/rpp/rpp/observables/details/chain_strategy.hpp +++ b/symmetri/gui/rpp/rpp/observables/details/chain_strategy.hpp @@ -22,13 +22,14 @@ class chain { using operator_traits = typename TStrategy::template operator_traits; - static_assert(rpp::constraint::operator_chain< - TStrategy, typename base::value_type, - typename base::expected_disposable_strategy>); + static_assert( + rpp::constraint::operator_); public: - using expected_disposable_strategy = deduce_updated_disposable_strategy< - TStrategy, typename base::expected_disposable_strategy>; + using optimal_disposables_strategy = + typename TStrategy::template updated_optimal_disposables_strategy< + typename base::optimal_disposables_strategy>; using value_type = typename operator_traits::result_type; chain(const TStrategy& strategy, const TStrategies&... strategies) @@ -41,20 +42,24 @@ class chain { void subscribe(Observer&& observer) const { [[maybe_unused]] const auto drain_on_exit = own_current_thread_if_needed(); - if constexpr (rpp::constraint::operator_lift_with_disposable_strategy< + if constexpr (rpp::constraint::operator_lift_with_disposables_strategy< TStrategy, typename base::value_type, - typename base::expected_disposable_strategy>) - m_strategies.subscribe(m_strategy.template lift_with_disposable_strategy< + typename base::optimal_disposables_strategy>) + m_strategies.subscribe(m_strategy.template lift_with_disposables_strategy< typename base::value_type, - typename base::expected_disposable_strategy>( + typename base::optimal_disposables_strategy>( std::forward(observer))); else if constexpr (rpp::constraint::operator_lift< TStrategy, typename base::value_type>) m_strategies.subscribe( m_strategy.template lift( std::forward(observer))); - else + else { + static_assert( + rpp::constraint::operator_subscribe); m_strategy.subscribe(std::forward(observer), m_strategies); + } } private: @@ -74,8 +79,8 @@ class chain { template class chain { public: - using expected_disposable_strategy = - rpp::details::observables::deduce_disposable_strategy_t; + using optimal_disposables_strategy = + typename TStrategy::optimal_disposables_strategy; using value_type = typename TStrategy::value_type; chain(const TStrategy& strategy) : m_strategy(strategy) {} diff --git a/symmetri/gui/rpp/rpp/observables/details/disposable_strategy.hpp b/symmetri/gui/rpp/rpp/observables/details/disposable_strategy.hpp deleted file mode 100644 index 6d021582..00000000 --- a/symmetri/gui/rpp/rpp/observables/details/disposable_strategy.hpp +++ /dev/null @@ -1,126 +0,0 @@ -// ReactivePlusPlus library -// -// Copyright Aleksey Loginov 2023 - present. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// https://www.boost.org/LICENSE_1_0.txt) -// -// Project home: https://github.com/victimsnino/ReactivePlusPlus - -#pragma once - -#include -#include - -namespace rpp::details::observables { -enum class AtomicMode { NonAtomic = 0, Atomic = 1 }; - -template -using deduce_atomic_bool = - std::conditional_t; - -template -struct dynamic_disposable_strategy_selector { - template - using add = - dynamic_disposable_strategy_selector; - - using disposable_container = - disposables::dynamic_disposables_container; - using disposable_strategy = - observers::dynamic_local_disposable_strategy>; -}; - -template -using atomic_dynamic_disposable_strategy_selector = - dynamic_disposable_strategy_selector; - -struct default_disposable_strategy_selector { - template - using add = default_disposable_strategy_selector; - - using disposable_container = dynamic_disposable_strategy_selector< - 0, AtomicMode::Atomic>::disposable_container; - using disposable_strategy = dynamic_disposable_strategy_selector< - 0, AtomicMode::Atomic>::disposable_strategy; -}; - -template -struct fixed_disposable_strategy_selector { - template - using add = fixed_disposable_strategy_selector; - - using disposable_container = disposables::static_disposables_container; - using disposable_strategy = - observers::static_local_disposable_strategy>; -}; - -template -struct fixed_disposable_strategy_selector<0, Mode> { - template - using add = fixed_disposable_strategy_selector; - - using disposable_container = - default_disposable_strategy_selector::disposable_container; - using disposable_strategy = - observers::bool_local_disposable_strategy>; -}; - -template -using atomic_fixed_disposable_strategy_selector = - fixed_disposable_strategy_selector; - -using bool_disposable_strategy_selector = - fixed_disposable_strategy_selector<0, AtomicMode::NonAtomic>; -using atomic_bool_disposable_strategy_selector = - fixed_disposable_strategy_selector<0, AtomicMode::Atomic>; - -namespace details { -template -concept has_expected_disposable_strategy = - requires { typename T::expected_disposable_strategy; }; - -template -consteval auto* deduce_disposable_strategy() { - if constexpr (has_expected_disposable_strategy) - return static_cast(nullptr); - else - return static_cast(nullptr); -} - -template -concept has_updated_disposable_strategy = - requires { typename T::template updated_disposable_strategy; }; - -template -consteval auto* deduce_updated_disposable_strategy() { - if constexpr (has_updated_disposable_strategy) - return static_cast*>( - nullptr); - else - return static_cast(nullptr); -} -} // namespace details - -template -using deduce_disposable_strategy_t = - std::remove_pointer_t())>; - -template -using deduce_updated_disposable_strategy = std::remove_pointer_t< - decltype(details::deduce_updated_disposable_strategy())>; - -namespace constraint { -template -concept disposable_strategy = requires(const T&) { - typename T::template add; - typename T::disposable_strategy; - typename T::disposable_container; - requires observers::constraint::disposable_strategy< - typename T::disposable_strategy>; -}; -} // namespace constraint -} // namespace rpp::details::observables diff --git a/symmetri/gui/rpp/rpp/observables/details/disposables_strategy.hpp b/symmetri/gui/rpp/rpp/observables/details/disposables_strategy.hpp new file mode 100644 index 00000000..82c23b12 --- /dev/null +++ b/symmetri/gui/rpp/rpp/observables/details/disposables_strategy.hpp @@ -0,0 +1,47 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#pragma once + +#include +#include + +namespace rpp::details::observables { +struct dynamic_disposables_strategy { + template + using add = dynamic_disposables_strategy; + + using disposables_container = disposables::dynamic_disposables_container; + using observer_disposables_strategy = observers::dynamic_disposables_strategy; +}; + +template +struct fixed_disposables_strategy { + template + using add = fixed_disposables_strategy; + + using disposables_container = + disposables::static_disposables_container; + using observer_disposables_strategy = + observers::static_disposables_strategy; +}; + +using default_disposables_strategy = dynamic_disposables_strategy; + +namespace constraint { +template +concept disposables_strategy = requires(const T&) { + typename T::template add; + typename T::observer_disposables_strategy; + typename T::disposables_container; + requires observers::constraint::disposables_strategy< + typename T::observer_disposables_strategy>; +}; +} // namespace constraint +} // namespace rpp::details::observables diff --git a/symmetri/gui/rpp/rpp/observables/dynamic_connectable_observable.hpp b/symmetri/gui/rpp/rpp/observables/dynamic_connectable_observable.hpp new file mode 100644 index 00000000..b6d1f6b8 --- /dev/null +++ b/symmetri/gui/rpp/rpp/observables/dynamic_connectable_observable.hpp @@ -0,0 +1,55 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#pragma once + +#include +#include + +namespace rpp { +template +class dynamic_connectable_observable final + : public connectable_observable< + rpp::dynamic_observable< + rpp::subjects::utils::extract_subject_type_t>, + Subject> { + public: + static_assert(rpp::constraint::subject); + + using base = connectable_observable< + rpp::dynamic_observable< + rpp::subjects::utils::extract_subject_type_t>, + Subject>; + + using base::base; + + template > + Strategy> + requires(!rpp::constraint::decayed_same_as< + Strategy, + rpp::dynamic_observable< + rpp::subjects::utils::extract_subject_type_t>>) + dynamic_connectable_observable( + const rpp::connectable_observable& original) + : dynamic_connectable_observable{original.as_dynamic_connectable()} {} + + template > + Strategy> + requires(!rpp::constraint::decayed_same_as< + Strategy, + rpp::dynamic_observable< + rpp::subjects::utils::extract_subject_type_t>>) + dynamic_connectable_observable( + rpp::connectable_observable&& original) + : dynamic_connectable_observable{ + std::move(original).as_dynamic_connectable()} {} +}; +} // namespace rpp diff --git a/symmetri/gui/rpp/rpp/observables/dynamic_observable.hpp b/symmetri/gui/rpp/rpp/observables/dynamic_observable.hpp index 05e98245..d7a947cf 100644 --- a/symmetri/gui/rpp/rpp/observables/dynamic_observable.hpp +++ b/symmetri/gui/rpp/rpp/observables/dynamic_observable.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include namespace rpp::details::observables { @@ -25,6 +26,8 @@ template class dynamic_strategy final { public: using value_type = Type; + using optimal_disposables_strategy = + rpp::details::observables::default_disposables_strategy; template Strategy> requires(!rpp::constraint::decayed_same_as +#include #include #include #include namespace rpp::constraint { +/** + * @concept observable_strategy + * @brief A concept that defines the requirements for an observable strategy. + * + * This concept ensures that a type `S` meets the following criteria: + * - It has a `subscribe` method that accepts observer of type `T` and returns + * `void`. + * - It defines a nested type `value_type` to represent the type of values + * emitted by the observable. + * - It defines a nested type `optimal_disposables_strategy` to define the + * optimal disposables strategy observer could/should use to handle current + * observable properly. + * + * @tparam S The type to be checked against the concept. + * @tparam T The type of the values emitted by the observable. + * + * @ingroup observables + */ template concept observable_strategy = requires( const S& strategy, rpp::details::observers::fake_observer&& observer) { { strategy.subscribe(std::move(observer)) } -> std::same_as; + typename S::value_type; + typename S::optimal_disposables_strategy; + requires rpp::details::observables::constraint::disposables_strategy< + typename S::optimal_disposables_strategy>; }; } // namespace rpp::constraint @@ -34,8 +56,10 @@ class blocking_strategy; template struct fake_strategy { using value_type = Type; + using optimal_disposables_strategy = + rpp::details::observables::fixed_disposables_strategy<0>; - static void subscribe(const auto&) {} + consteval static void subscribe(const auto&) {} }; } // namespace rpp::details::observables @@ -81,6 +105,14 @@ concept observables_of_same_type = rpp::utils::extract_observable_type_t> && ...); +/** + * @concept operator_subscribe + * @brief Simple operator defining logic how to subscribe passed observer to + * passed observable. In most cases it means operator have some custom logic + * over observable too, so, you need to have access to observable, for example, + * subscribe to observable multiple times. + * @ingroup operators + */ template concept operator_subscribe = requires(const Op& op, @@ -91,6 +123,12 @@ concept operator_subscribe = { op.subscribe(std::move(observer), chain) }; }; +/** + * @concept operator_lift + * @brief Accept downstream observer and return new upstream (of type Type) + * observer. + * @ingroup operators + */ template concept operator_lift = requires(const Op& op, @@ -101,32 +139,68 @@ concept operator_lift = } -> rpp::constraint::observer_of_type; }; +/** + * @concept operator_lift_with_disposables_strategy + * @brief Same as @link rpp::constraint::operator_lift @endlink but with custom + * disposables logic. For example, if you are manually create storage for + * disposables and want to do it optimal. + * @ingroup operators + */ + template -concept operator_lift_with_disposable_strategy = +concept operator_lift_with_disposables_strategy = requires(const Op& op, rpp::details::observers::fake_observer::template operator_traits::result_type>&& observer) { { - op.template lift_with_disposable_strategy( + op.template lift_with_disposables_strategy( std::move(observer)) } -> rpp::constraint::observer_of_type; }; -template -concept operator_chain = +template +concept has_operator_traits = requires() { + typename std::decay_t::template operator_traits; + typename std::decay_t::template operator_traits::result_type; +}; + +template +concept has_operator_disposables_strategy = requires() { - typename std::decay_t::template operator_traits; - typename std::decay_t::template operator_traits::result_type; + typename std::decay_t::template updated_optimal_disposables_strategy< + typename details::observables::default_disposables_strategy>; } && - details::observables::constraint::disposable_strategy< - details::observables::deduce_updated_disposable_strategy< - std::decay_t, typename details::observables::chain< - details::observables::fake_strategy>:: - expected_disposable_strategy>> && + details::observables::constraint::disposables_strategy< + typename std::decay_t:: + template updated_optimal_disposables_strategy< + typename details::observables::default_disposables_strategy>>; + +/** + * @concept operator_ + * @details Concept for any RPP-related operator: + * - operator should have type-traits: template sub-struct `operator_traits` + * where template typename is type of upstream. + * - Such and sub-struct should have `result_type` using to type of final + * observable/downstream observer + * - You can place any static_asserts to this `operator_traits` if you have + * some specific requiremenets + * - operator should have template using `updated_optimal_disposables_strategy` + * accepting `rpp::details::observables::disposbles_strategy` and returning new + * (updated) strategy to provide optimal behavior. For example, your operator + * could add+1 disposables to the strategy + * - operator should satisfy `rpp::constraint::operator_subscribe`, + * `rpp::constraint::operator_lift` or + * `rpp::constraint::operator_lift_with_disposables_strategy` + * + * @ingroup operators + */ +template +concept operator_ = + has_operator_traits && has_operator_disposables_strategy && (operator_subscribe, Type> || operator_lift, Type> || - operator_lift_with_disposable_strategy, Type, - DisposableStrategy>); + operator_lift_with_disposables_strategy, Type, + DisposableStrategy>); } // namespace rpp::constraint @@ -134,6 +208,9 @@ namespace rpp { template class dynamic_observable; +template +class dynamic_connectable_observable; + template Strategy> class blocking_observable; diff --git a/symmetri/gui/rpp/rpp/observables/observable.hpp b/symmetri/gui/rpp/rpp/observables/observable.hpp index 1100bbd3..3b62d74a 100644 --- a/symmetri/gui/rpp/rpp/observables/observable.hpp +++ b/symmetri/gui/rpp/rpp/observables/observable.hpp @@ -47,8 +47,8 @@ class observable { using value_type = Type; using strategy_type = Strategy; - using expected_disposable_strategy = - rpp::details::observables::deduce_disposable_strategy_t; + using optimal_disposables_strategy = + typename Strategy::optimal_disposables_strategy; template requires(!constraint::variadic_decayed_same_as, @@ -83,13 +83,17 @@ class observable { template ObserverStrategy> requires(!constraint::observer) void subscribe(ObserverStrategy&& observer_strategy) const { - if constexpr (details::observers::has_disposable_strategy) - subscribe(rpp::observer>{ - std::forward(observer_strategy)}); + if constexpr (std::decay_t::preferred_disposables_mode == + rpp::details::observers::disposables_mode::Auto) + subscribe( + rpp::observer, + typename optimal_disposables_strategy:: + observer_disposables_strategy>>{ + std::forward(observer_strategy)}); else - subscribe(rpp::observer_with_disposable< - Type, std::decay_t, - typename expected_disposable_strategy::disposable_strategy>{ + subscribe(rpp::observer>{ std::forward(observer_strategy)}); } @@ -107,7 +111,7 @@ class observable { * observer when it needed * * @par Example - * \code{.cpp} + * @code{.cpp} * auto disposable = rpp::composite_disposable_wrapper::make(); * rpp::source::just(1) * | rpp::operators::repeat() @@ -118,7 +122,7 @@ class observable { * std::this_thread::sleep_for(std::chrono::seconds(1)); * disposable.dispose(); * std::this_thread::sleep_for(std::chrono::seconds(1)); - * \endcode + * @endcode * */ template ObserverStrategy> @@ -127,7 +131,8 @@ class observable { observer&& obs) const { if (!d.is_disposed()) m_strategy.subscribe( - observer_with_disposable>{ + observer_with_external_disposable>{ d, std::move(obs)}); return d; } @@ -152,8 +157,9 @@ class observable { composite_disposable_wrapper subscribe( const composite_disposable_wrapper& d, ObserverStrategy&& observer_strategy) const { - subscribe(observer_with_disposable>{ - d, std::forward(observer_strategy)}); + subscribe( + observer_with_external_disposable>{ + d, std::forward(observer_strategy)}); return d; } @@ -178,8 +184,8 @@ class observable { if (!observer.is_disposed()) return subscribe(rpp::composite_disposable_wrapper::make< rpp::composite_disposable_impl< - typename expected_disposable_strategy:: - disposable_container>>(), + typename optimal_disposables_strategy:: + disposables_container>>(), std::move(observer)); return composite_disposable_wrapper::empty(); } @@ -202,7 +208,7 @@ class observable { subscribe_with_disposable(ObserverStrategy&& observer_strategy) const { return subscribe( rpp::composite_disposable_wrapper::make>(), + typename optimal_disposables_strategy::disposables_container>>(), std::forward(observer_strategy)); } @@ -225,7 +231,7 @@ class observable { subscribe_with_disposable(dynamic_observer observer) const { return subscribe>( rpp::composite_disposable_wrapper::make>(), + typename optimal_disposables_strategy::disposables_container>>(), std::move(observer)); } @@ -244,11 +250,12 @@ class observable { std::decay_t, std::decay_t>; - subscribe(observer_with_disposable< - Type, strategy, - typename expected_disposable_strategy::disposable_strategy>{ - std::forward(on_next), std::forward(on_error), - std::forward(on_completed)}); + subscribe( + observer>{ + std::forward(on_next), std::forward(on_error), + std::forward(on_completed)}); } /** @@ -283,7 +290,7 @@ class observable { OnCompleted&& on_completed = {}) const { auto res = rpp::composite_disposable_wrapper::make>(); + typename optimal_disposables_strategy::disposables_container>>(); subscribe(make_lambda_observer( res, std::forward(on_next), std::forward(on_error), std::forward(on_completed))); @@ -329,7 +336,7 @@ class observable { * observer when it needed * * @par Example - * \code{.cpp} + * @code{.cpp} * auto disposable = rpp::composite_disposable_wrapper::make(); * rpp::source::just(1) * | rpp::operators::repeat() @@ -340,7 +347,7 @@ class observable { * std::this_thread::sleep_for(std::chrono::seconds(1)); * disposable.dispose(); * std::this_thread::sleep_for(std::chrono::seconds(1)); - * \endcode + * @endcode * */ template OnNext, @@ -374,7 +381,7 @@ class observable { * observer when it needed * * @par Example - * \code{.cpp} + * @code{.cpp} * auto disposable = rpp::composite_disposable_wrapper::make(); * rpp::source::just(1) * | rpp::operators::repeat() @@ -385,7 +392,7 @@ class observable { * std::this_thread::sleep_for(std::chrono::seconds(1)); * disposable.dispose(); * std::this_thread::sleep_for(std::chrono::seconds(1)); - * \endcode + * @endcode * */ template OnNext, std::invocable<> OnCompleted> @@ -425,9 +432,13 @@ class observable { }) { using result_type = typename std::decay_t::template operator_traits< Type>::result_type; - return observable, Strategy>>{ - std::forward(op), m_strategy}; + if constexpr (requires { + typename std::decay_t::template operator_traits< + Type>::result_type; + }) // narrow compilataion error a bit + return observable, Strategy>>{ + std::forward(op), m_strategy}; } else { return std::forward(op)(*this); } diff --git a/symmetri/gui/rpp/rpp/observables/variant_observable.hpp b/symmetri/gui/rpp/rpp/observables/variant_observable.hpp index 8069c000..a2cbb78b 100644 --- a/symmetri/gui/rpp/rpp/observables/variant_observable.hpp +++ b/symmetri/gui/rpp/rpp/observables/variant_observable.hpp @@ -17,6 +17,9 @@ namespace rpp::details { template ... Observables> struct variant_observable_strategy { + using optimal_disposables_strategy = + rpp::details::observables::default_disposables_strategy; + using value_type = Type; template TT> requires(!constraint::decayed_same_as) diff --git a/symmetri/gui/rpp/rpp/observers.hpp b/symmetri/gui/rpp/rpp/observers.hpp index aa6fb6d7..51c0758a 100644 --- a/symmetri/gui/rpp/rpp/observers.hpp +++ b/symmetri/gui/rpp/rpp/observers.hpp @@ -12,14 +12,48 @@ /** * @defgroup observers Observers - * @brief Observer subscribes on Observable and obtains values provided by + * + * @details Observer subscribes on Observable and obtains values provided by * Observable. - * @details Observer is kind of wrapper over 3 core functions: - * - on_next(T) - callback with new emission provided by observable - * - on_error(err) - failure termination callback with reason of failure of + * + * In fact observer is kind of wrapper over 3 core functions: + * - `on_next(T)` - callback with new emission provided by observable + * - `on_error(err)` - failure termination callback with reason of failure of * observable (why observable can't continue processing) - * - on_completed() - succeed termination callback - observable is done, no any - * future emissions from this + * - `on_completed()` - succeed termination callback - observable is done, no + * any future emissions from this + * + * Additionally in RPP observer handles @link disposables @endlink related + * logic: + * - `set_upstream(disposable)` - observable could pass to observer it's own + * disposable to provide ability for observer to terminate observable's internal + * actions/state. + * - `is_disposed()` - observable could check if observer is still interested in + * emissions (`false`) or done and no any futher calls would be success (`true`) + * + * @par Observer creation: + * - **Observer creation inside subscribe:**
+ * RPP expects user to create observers only inside `subscribe` function of + * observables. Something like this: + * @code{.cpp} + * rpp::source::just(1).subscribe([](int){}, [](const std::exception_ptr&){}, + * [](){}); rpp::source::just(1) | rpp::operators::subscribe([](int){}, [](const + * std::exception_ptr&){}, [](){}); + * @endcode + * Some of the callbacks (on_next/on_error/on_completed) can be omitted. Check + * @link rpp::operators::subscribe @endlink for more details. + * + * - **Advanced observer creation:**
+ * Technically it is possible to create custom observer via creating new + * class/struct which satisfies concept @link rpp::constraint::observer_strategy + * @endlink, but it is **highly not-recommended for most cases**
Also + * *technically* you could create your observer via `make_lambda_observer` + * function, but it is not recommended too: it could disable some built-in + * optimizations and cause worse performance.
Also it is **most probably** + * bad pattern and invalid usage of RX if you want to keep/store observers as + * member variables/fields. Most probably you are doing something wrong IF you + * are not implementing custom observable/operator. + * * @see https://reactivex.io/documentation/observable.html * @ingroup rpp */ diff --git a/symmetri/gui/rpp/rpp/observers/details/disposable_strategy.hpp b/symmetri/gui/rpp/rpp/observers/details/disposable_strategy.hpp deleted file mode 100644 index 30db40c1..00000000 --- a/symmetri/gui/rpp/rpp/observers/details/disposable_strategy.hpp +++ /dev/null @@ -1,106 +0,0 @@ -// ReactivePlusPlus library -// -// Copyright Aleksey Loginov 2022 - present. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// https://www.boost.org/LICENSE_1_0.txt) -// -// Project home: https://github.com/victimsnino/ReactivePlusPlus -// - -#pragma once - -#include - -#include -#include - -#include - -namespace rpp::details::observers -{ - class atomic_bool - { - public: - atomic_bool() = default; - atomic_bool(atomic_bool&& other) noexcept - // just need atomicity, not guarding anything - : m_value{other.m_value.load(std::memory_order::seq_cst)} - { - } - - bool test() const noexcept - { - // just need atomicity, not guarding anything - return m_value.load(std::memory_order::seq_cst); - } - - void set() noexcept - { - // just need atomicity, not guarding anything - m_value.store(true, std::memory_order::seq_cst); - } - - private: - std::atomic_bool m_value{}; - }; - - class non_atomic_bool - { - public: - non_atomic_bool() = default; - non_atomic_bool(non_atomic_bool&& other) noexcept = default; - - bool test() const noexcept - { - return m_value; - } - - void set() noexcept - { - m_value = true; - } - - private: - bool m_value{}; - }; - - template Bool> - class local_disposable_strategy - { - public: - local_disposable_strategy() = default; - local_disposable_strategy(local_disposable_strategy&& other) noexcept = default; - - void add(const disposable_wrapper& d) - { - m_upstreams.push_back(d); - } - - bool is_disposed() const noexcept - { - // just need atomicity, not guarding anything - return m_is_disposed.test(); - } - - void dispose() const - { - // just need atomicity, not guarding anything - m_is_disposed.set(); - m_upstreams.dispose(); - } - - private: - RPP_NO_UNIQUE_ADDRESS DisposableContainer m_upstreams{}; - mutable Bool m_is_disposed{}; - }; - - struct none_disposable_strategy - { - static void add(const rpp::disposable_wrapper&) {} - - static bool is_disposed() noexcept { return false; } - - static void dispose() {} - }; -} // namespace rpp::details::observers diff --git a/symmetri/gui/rpp/rpp/observers/details/disposables_strategy.hpp b/symmetri/gui/rpp/rpp/observers/details/disposables_strategy.hpp new file mode 100644 index 00000000..3aec8cde --- /dev/null +++ b/symmetri/gui/rpp/rpp/observers/details/disposables_strategy.hpp @@ -0,0 +1,59 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include +#include +#include + +namespace rpp::details::observers { +template +class local_disposables_strategy { + public: + local_disposables_strategy() = default; + local_disposables_strategy(local_disposables_strategy&& other) noexcept = + default; + + void add(const disposable_wrapper& d) { m_upstreams.push_back(d); } + + bool is_disposed() const noexcept { return m_is_disposed; } + + void dispose() const { + m_is_disposed = true; + m_upstreams.dispose(); + } + + private: + RPP_NO_UNIQUE_ADDRESS DisposableContainer m_upstreams{}; + mutable bool m_is_disposed{}; +}; + +struct none_disposables_strategy { + static constexpr void add(const rpp::disposable_wrapper&) {} + + static constexpr bool is_disposed() noexcept { return false; } + + static constexpr void dispose() {} +}; + +class boolean_disposables_strategy { + public: + static constexpr void add(const rpp::disposable_wrapper&) {} + + bool is_disposed() const noexcept { return m_is_disposed; } + + void dispose() const { m_is_disposed = true; } + + private: + mutable bool m_is_disposed{}; +}; +} // namespace rpp::details::observers diff --git a/symmetri/gui/rpp/rpp/observers/details/fwd.hpp b/symmetri/gui/rpp/rpp/observers/details/fwd.hpp index 379b1386..b5b8cafb 100644 --- a/symmetri/gui/rpp/rpp/observers/details/fwd.hpp +++ b/symmetri/gui/rpp/rpp/observers/details/fwd.hpp @@ -15,75 +15,89 @@ #include namespace rpp::details::observers { -class atomic_bool; -class non_atomic_bool; +enum class disposables_mode : uint8_t { + // Let observer deduce disposables mode + Auto = 0, + // No any disposables logic for observer expected + None = 1, + // Use external (passed to constructor) composite_disposable_wrapper as + // disposable + External = 2, + // Observer just controls is_disposed or not but upstreams handled via + // observer_strategy + Boolean = 3 +}; -template Bool> -class local_disposable_strategy; +namespace constraint { +template +concept disposables_strategy = + requires(T& v, const T& const_v, const disposable_wrapper& d) { + v.add(d); + { const_v.is_disposed() } -> std::same_as; + const_v.dispose(); + }; +} // namespace constraint + +template +class local_disposables_strategy; /** * @brief No any disposable logic at all. Used only inside proxy-forwarding * operators where extra disposable logic not requires */ -struct none_disposable_strategy; - +struct none_disposables_strategy; /** - * @brief Dynamic disposable logic based on pre-allocated vector + * @brief Just control is_disposed or not via boolean and ignore upstreams at + * all */ -template Bool> -using dynamic_local_disposable_strategy = - local_disposable_strategy, - Bool>; +class boolean_disposables_strategy; /** - * @brief Same as dynamic strategy, but based on array. + * @brief Keep disposables inside dynamic_disposables_container container (based + * on std::vector) */ -template Bool> -using static_local_disposable_strategy = - local_disposable_strategy, - Bool>; +using dynamic_disposables_strategy = + local_disposables_strategy; /** - * @brief Just an boolean with no any disposables + * @brief Keep disposables inside static_disposables_container container (based + * on std::array) */ -template Bool> -using bool_local_disposable_strategy = - local_disposable_strategy; +template +using static_disposables_strategy = local_disposables_strategy< + disposables::static_disposables_container>; -/** - * @brief External disposable used as strategy - */ -using external_disposable_strategy = composite_disposable_wrapper; - -namespace constraint { -template -concept disposable_strategy = - requires(T& v, const T& const_v, const disposable_wrapper& d) { - v.add(d); - { const_v.is_disposed() } -> std::same_as; - const_v.dispose(); - }; -} // namespace constraint - -template -concept has_disposable_strategy = - requires { typename T::preferred_disposable_strategy; }; +using default_disposables_strategy = dynamic_disposables_strategy; namespace details { -template -consteval auto* deduce_disposable_strategy() { - if constexpr (has_disposable_strategy) - return static_cast(nullptr); +template +consteval auto* deduce_optimal_disposables_strategy() { + static_assert( + mode == disposables_mode::Auto || mode == disposables_mode::None || + mode == disposables_mode::External || mode == disposables_mode::Boolean); + +#if defined(RPP_DISABLE_DISPOSABLES_OPTIMIZATION) and \ + RPP_DISABLE_DISPOSABLES_OPTIMIZATION + if constexpr (mode == disposables_mode::External) + return static_cast(nullptr); + else + return static_cast(nullptr); +#else + if constexpr (mode == disposables_mode::Auto) + return static_cast(nullptr); + else if constexpr (mode == disposables_mode::None) + return static_cast(nullptr); + else if constexpr (mode == disposables_mode::External) + return static_cast(nullptr); + else if constexpr (mode == disposables_mode::Boolean) + return static_cast(nullptr); else - return static_cast*>( - nullptr); + return static_cast(nullptr); +#endif } } // namespace details -template -using deduce_disposable_strategy_t = - std::remove_pointer_t())>; +template +using deduce_optimal_disposables_strategy_t = std::remove_pointer_t< + decltype(details::deduce_optimal_disposables_strategy())>; } // namespace rpp::details::observers diff --git a/symmetri/gui/rpp/rpp/observers/dynamic_observer.hpp b/symmetri/gui/rpp/rpp/observers/dynamic_observer.hpp index 5e835b7b..563c1c5b 100644 --- a/symmetri/gui/rpp/rpp/observers/dynamic_observer.hpp +++ b/symmetri/gui/rpp/rpp/observers/dynamic_observer.hpp @@ -95,6 +95,9 @@ class type_erased_observer template class dynamic_strategy final { public: + static constexpr auto preferred_disposables_mode = + rpp::details::observers::disposables_mode::None; + template Strategy> requires( !rpp::constraint::decayed_same_as>) diff --git a/symmetri/gui/rpp/rpp/observers/fwd.hpp b/symmetri/gui/rpp/rpp/observers/fwd.hpp index b7ecd2e0..b46f1add 100644 --- a/symmetri/gui/rpp/rpp/observers/fwd.hpp +++ b/symmetri/gui/rpp/rpp/observers/fwd.hpp @@ -33,22 +33,52 @@ concept observer_strategy_base = }; /** - * @brief Concept to define strategy to override observer behavior. Strategy - * must be able to handle all observer's callbacks: - * on_next/on_error/on_completed + * @concept observer_strategy + * @brief Concept defines requirements for an user-defined observer strategy. * * @tparam S is Strategy * @tparam Type is type of value observer would obtain * + * @details Strategy should be able to handle: + * - on_next for both: const lvalue ref and rvalue ref of Type + * - on_error(exception_ptr) for unsuccessful termination event + * - on_completed() for successful termination event + * - set_upstream(disposable) for custom disposables related logic. In most + * cases you should OR do nothing OR just forward disposable to downstream + * observer (and set preferred_disposables_mode to None) OR fully handle + * disposales related logic properly + * - is_disposed() for extending custom disposables related logic with + * indicating current status. + * - `static constexpr rpp::details::observers::disposables_mode + * preferred_disposables_mode` with preferred disposables logic for observer + * over this strategy + * * @ingroup observers */ template -concept observer_strategy = - observer_strategy_base && - requires(const S& const_strategy, const Type& v, Type& mv) { - const_strategy.on_next(v); - const_strategy.on_next(std::move(mv)); - }; +concept observer_strategy = observer_strategy_base && requires( + const S& + const_strategy, + const Type& v, + Type& mv) { + const_strategy.on_next(v); + const_strategy.on_next(std::move(mv)); + + // strategy has to provide it's preferred disposable mode: minimal level + // of disposable logic it could work with. if observer_strategy fully + // controls disposable logic or just forwards disposable to downstream + // observer: rpp::details::observers::disposables_mode::None if you not + // sure about this field - just use + // rpp::details::observers::disposables_mode::Auto + { + std::decay_t::preferred_disposables_mode + } + -> rpp::constraint::decayed_same_as< + rpp::details::observers:: + disposables_mode>; /* = + rpp::details::observers::disposables_mode::Auto + */ +}; } // namespace rpp::constraint namespace rpp::details::observers { @@ -59,36 +89,43 @@ template OnNext, std::invocable OnError, std::invocable<> OnCompleted> struct lambda_strategy; -} // namespace rpp::details::observers -namespace rpp::details { -template -struct with_disposable_strategy { - using preferred_disposable_strategy = Strategy; +template +struct override_disposables_strategy { + static constexpr auto preferred_disposables_mode = + rpp::details::observers::disposables_mode::Auto; - with_disposable_strategy() = delete; + override_disposables_strategy() = delete; - static void on_next(const auto&) noexcept; - static void on_error(const std::exception_ptr&) noexcept; - static void on_completed() noexcept; + consteval static void on_next(const auto&) noexcept {} + consteval static void on_error(const std::exception_ptr&) noexcept {} + consteval static void on_completed() noexcept {} - static void set_upstream(const disposable_wrapper&) noexcept; - static bool is_disposed() noexcept; + consteval static void set_upstream(const disposable_wrapper&) noexcept {} + consteval static bool is_disposed() noexcept { return false; } }; -} // namespace rpp::details - +} // namespace rpp::details::observers namespace rpp { template Strategy> class observer; +/* + * @brief Same as rpp::observer, but with passed + * rpp::composite_disposable_wrapper to constructor instead (as a result, it's + * possible to dispose observer early outside) + * @ingroup observers + */ template Strategy, - rpp::details::observers::constraint::disposable_strategy - DisposableStrategy = - rpp::details::observers::external_disposable_strategy> -using observer_with_disposable = observer< - Type, rpp::details::with_disposable_strategy>; + constraint::observer_strategy Strategy> +using observer_with_external_disposable = + observer>>; template class dynamic_observer; @@ -113,12 +150,19 @@ using lambda_observer = observer>; +/* + * @brief Same as rpp::lambda_observer, but with passed + * rpp::composite_disposable_wrapper to constructor instead (as a result, it's + * possible to dispose observer early outside) + * @ingroup observers + */ template OnNext, std::invocable OnError, std::invocable<> OnCompleted> -using lambda_observer_with_disposable = - observer_with_disposable>; +using lambda_observer_with_external_disposable = + observer_with_external_disposable< + Type, details::observers::lambda_strategy>; /** * @brief Constructs observer specialized with passed callbacks. Most easiesest @@ -159,9 +203,9 @@ template OnNext, auto make_lambda_observer(const rpp::composite_disposable_wrapper& d, OnNext&& on_next, OnError&& on_error = {}, OnCompleted&& on_completed = {}) - -> lambda_observer_with_disposable, - std::decay_t, - std::decay_t>; + -> lambda_observer_with_external_disposable, + std::decay_t, + std::decay_t>; /** * @brief Constructs observer specialized with passed callbacks. Most easiesest @@ -219,8 +263,8 @@ auto make_lambda_observer(const rpp::composite_disposable_wrapper& d, namespace rpp::details::observers { struct fake_strategy { - using preferred_disposable_strategy = - rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = + rpp::details::observers::disposables_mode::None; static void on_next(const auto&) noexcept {} diff --git a/symmetri/gui/rpp/rpp/observers/lambda_observer.hpp b/symmetri/gui/rpp/rpp/observers/lambda_observer.hpp index 411057d4..e5c0ae79 100644 --- a/symmetri/gui/rpp/rpp/observers/lambda_observer.hpp +++ b/symmetri/gui/rpp/rpp/observers/lambda_observer.hpp @@ -14,73 +14,60 @@ #include #include -namespace rpp::details::observers -{ - template OnNext, - std::invocable OnError, - std::invocable<> OnCompleted> - struct lambda_strategy - { - template TOnNext, rpp::constraint::decayed_same_as TOnError, rpp::constraint::decayed_same_as TOnCompleted> - explicit lambda_strategy(TOnNext&& on_next, TOnError&& on_error, TOnCompleted&& on_completed) - : on_next{std::forward(on_next)} - , on_error{std::forward(on_error)} - , on_completed{std::forward(on_completed)} - { - } +namespace rpp::details::observers { +template OnNext, + std::invocable OnError, + std::invocable<> OnCompleted> +struct lambda_strategy { + static constexpr auto preferred_disposables_mode = + rpp::details::observers::disposables_mode::Auto; - RPP_NO_UNIQUE_ADDRESS OnNext on_next{}; - RPP_NO_UNIQUE_ADDRESS OnError on_error{}; - RPP_NO_UNIQUE_ADDRESS OnCompleted on_completed{}; + template TOnNext, + rpp::constraint::decayed_same_as TOnError, + rpp::constraint::decayed_same_as TOnCompleted> + explicit lambda_strategy(TOnNext&& on_next, TOnError&& on_error, + TOnCompleted&& on_completed) + : on_next{std::forward(on_next)}, + on_error{std::forward(on_error)}, + on_completed{std::forward(on_completed)} {} - static void set_upstream(const disposable_wrapper&) noexcept {} + RPP_NO_UNIQUE_ADDRESS OnNext on_next{}; + RPP_NO_UNIQUE_ADDRESS OnError on_error{}; + RPP_NO_UNIQUE_ADDRESS OnCompleted on_completed{}; - static bool is_disposed() noexcept { return false; } - }; -} // namespace rpp::details::observers + static void set_upstream(const disposable_wrapper&) noexcept {} -namespace rpp -{ - template OnNext, - std::invocable OnError, - std::invocable<> OnCompleted> - auto make_lambda_observer(OnNext&& on_next, - OnError&& on_error, - OnCompleted&& on_completed) -> lambda_observer, - std::decay_t, - std::decay_t> - { - return lambda_observer, - std::decay_t, - std::decay_t>{ - std::forward(on_next), - std::forward(on_error), - std::forward(on_completed)}; - } + static bool is_disposed() noexcept { return false; } +}; +} // namespace rpp::details::observers - template OnNext, - std::invocable OnError, - std::invocable<> OnCompleted> - auto make_lambda_observer(const rpp::composite_disposable_wrapper& d, - OnNext&& on_next, - OnError&& on_error, - OnCompleted&& on_completed) -> lambda_observer_with_disposable, - std::decay_t, - std::decay_t> - { - return lambda_observer_with_disposable, - std::decay_t, - std::decay_t>{ - d, - std::forward(on_next), - std::forward(on_error), - std::forward(on_completed)}; - } -} // namespace rpp +namespace rpp { +template OnNext, + std::invocable OnError, + std::invocable<> OnCompleted> +auto make_lambda_observer(OnNext&& on_next, OnError&& on_error, + OnCompleted&& on_completed) + -> lambda_observer, std::decay_t, + std::decay_t> { + return lambda_observer, std::decay_t, + std::decay_t>{ + std::forward(on_next), std::forward(on_error), + std::forward(on_completed)}; +} + +template OnNext, + std::invocable OnError, + std::invocable<> OnCompleted> +auto make_lambda_observer(const rpp::composite_disposable_wrapper& d, + OnNext&& on_next, OnError&& on_error, + OnCompleted&& on_completed) + -> lambda_observer_with_external_disposable, + std::decay_t, + std::decay_t> { + return lambda_observer_with_external_disposable, + std::decay_t, + std::decay_t>{ + d, std::forward(on_next), std::forward(on_error), + std::forward(on_completed)}; +} +} // namespace rpp diff --git a/symmetri/gui/rpp/rpp/observers/mock_observer.hpp b/symmetri/gui/rpp/rpp/observers/mock_observer.hpp index 405d30dc..a83874ad 100644 --- a/symmetri/gui/rpp/rpp/observers/mock_observer.hpp +++ b/symmetri/gui/rpp/rpp/observers/mock_observer.hpp @@ -17,6 +17,9 @@ template class mock_observer_strategy final { public: + static constexpr auto preferred_disposables_mode = + rpp::details::observers::disposables_mode::Auto; + explicit mock_observer_strategy(bool copy_values = true) : m_state{std::make_shared(copy_values)} {} @@ -58,7 +61,8 @@ class mock_observer_strategy final { return rpp::observer>{*this}; } auto get_observer(rpp::composite_disposable_wrapper d) const { - return rpp::observer_with_disposable>{ + return rpp::observer_with_external_disposable>{ std::move(d), *this}; } diff --git a/symmetri/gui/rpp/rpp/observers/observer.hpp b/symmetri/gui/rpp/rpp/observers/observer.hpp index 70645e29..822fbd41 100644 --- a/symmetri/gui/rpp/rpp/observers/observer.hpp +++ b/symmetri/gui/rpp/rpp/observers/observer.hpp @@ -14,7 +14,7 @@ #include #include #include -#include +#include #include #include #include @@ -23,7 +23,7 @@ namespace rpp::details { template Strategy, - observers::constraint::disposable_strategy DisposablesStrategy> + observers::constraint::disposables_strategy DisposablesStrategy> class observer_impl { protected: template @@ -33,7 +33,8 @@ class observer_impl { m_disposable{std::move(strategy)} {} public: - using preferred_disposable_strategy = observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = + rpp::details::observers::disposables_mode::None; using on_next_lvalue = void (observer_impl::*)(const Type&) const noexcept; using on_next_rvalue = void (observer_impl::*)(Type&&) const noexcept; @@ -141,9 +142,9 @@ namespace rpp { * @warning If you are passing disposable to ctor, then state of this disposable * would be used used (if empty disposable or disposed -> observer is disposed * by default) - * @warning It is expected, that member of this observer would be called in - * SERIAL way. It means, no any parallel calls allowed, only serial ones from - * one observable. + * @warning It is expected that members of this observer are called in a SERIAL + * manner. This means no parallel or concurrent calls are allowed—only serial + * calls or those guarded under a lock. * * @tparam Type of value this observer can handle * @tparam Strategy used to provide logic over observer's callbacks @@ -159,13 +160,13 @@ template > { + details::observers::deduce_optimal_disposables_strategy_t< + Strategy::preferred_disposables_mode>> { public: using DisposableStrategy = - details::observers::deduce_disposable_strategy_t; - using Base = details::observer_impl< - Type, Strategy, - details::observers::deduce_disposable_strategy_t>; + details::observers::deduce_optimal_disposables_strategy_t< + Strategy::preferred_disposables_mode>; + using Base = details::observer_impl; template requires constraint::is_constructible_from @@ -190,11 +191,12 @@ class observer final } }; -template < - constraint::decayed_type Type, constraint::observer_strategy Strategy, - rpp::details::observers::constraint::disposable_strategy DisposableStrategy> -class observer> +template Strategy, + rpp::details::observers::constraint::disposables_strategy + DisposableStrategy> +class observer> final : public details::observer_impl { public: using Base = details::observer_impl; @@ -226,7 +228,7 @@ template class observer> : public details::observer_impl< Type, rpp::details::observers::dynamic_strategy, - details::observers::none_disposable_strategy> { + details::observers::none_disposables_strategy> { public: template TStrategy> requires(!std::same_as> observer(observer&& other) : details::observer_impl, - details::observers::none_disposable_strategy>{ - details::observers::none_disposable_strategy{}, std::move(other)} {} + details::observers::none_disposables_strategy>{ + details::observers::none_disposables_strategy{}, + std::move(other)} {} dynamic_observer as_dynamic() && { return dynamic_observer{std::move(*this)}; diff --git a/symmetri/gui/rpp/rpp/operators.hpp b/symmetri/gui/rpp/rpp/operators.hpp index 015f3e8c..a0450353 100644 --- a/symmetri/gui/rpp/rpp/operators.hpp +++ b/symmetri/gui/rpp/rpp/operators.hpp @@ -12,18 +12,15 @@ /** * @defgroup operators Operators - * @brief Operators provide a way to modify observables and extend them with - * custom logic. - * @details By default, an observable emits values based on some underlying - * logic. For example, it might iterate over a vector and emit values. Operators - * allow you to make such a stream more complex, for example, by emitting only - * certain values, transforming them to strings, etc. As a result, you get - * another stream of different values, but more suitable for a specific case. + * @brief Operators modify observables and extend them with custom logic. + * @details Observables emit values based on underlying logic, such as iterating + * over a vector and etc. Operators allow you to enhance this stream, for + * example, by filtering values, transforming them, etc., resulting in a more + * suitable stream for specific cases. * - * For example, you can create an observable to get characters from console - * input, continue until the '0' character is encountered, filter out non-letter - * characters, and send the remaining letters as uppercase to the observer. With - * operators, this is straightforward to implement correctly: + * Example: Create an observable to read characters from console input, continue + * until '0' is encountered, filter out non-letter characters, and send the + * remaining letters as uppercase to the observer: * * @code{.cpp} * #include @@ -45,9 +42,75 @@ * } * @endcode * - * Check the [API - * Reference](https://victimsnino.github.io/ReactivePlusPlus/v2/docs/html/group__operators.html) - * for more details about operators. + * @par How operators work and how to create your own? + * Example: + * + * @code{cpp} + * rpp::source::create([](const auto& observer){ + * observer.on_next(1); + * observer.on_completed(); + * }); + * @endcode + * + * This example creates an observable of `int` using the `create` operator, + * which emits the value `1` and then completes. The type of this observable is + * `rpp::observable`, where `...` is an implementation-defined type. + * To convert `int` to `std::string`, you can use the `map` operator: + * + * @code{cpp} + * rpp::source::create([](const auto& observer){ + * observer.on_next(1); + * observer.on_completed(); + * }) + * | rpp::operators::map([](int v){ return std::to_string(v); }); + * @endcode + * + * Now it is an `observable of strings` (`rpp::observable`). + * The `map` operator is a functor-adaptor that accepts an observable and + * returns another observable. It transforms the original observable's type to + * the "final type" by invoking the passed function. In this case, the final + * type is `std::string`. The `map` operator can be implemented in multiple + * ways: + * + * 1) call-based (function/functor or others) - operator accepts (old) + * observable and returns new (modified) observable + * @code{cpp} + * template + * struct map + * { + * Fn fn{}; + * + * template + * auto operator()(const rpp::observable& observable) const { + * using FinalType = std::invoke_result_t; + * return rpp::source::create([observable, fn](const + * rpp::dynamic_observer& observer) + * { + * observable.subscribe([observer, fn](const auto& v) { + * observer.on_next(fn(v)); }, [observer](const std::exception_ptr& err) { + * observer.on_error(err); }, [observer]() { observer.on_completed(); }); + * }); + * } + * } + * @endcode + * It is template for such an functor-adaptor. It is also fully valid example of + * call-based operator: + * @code{cpp} + * rpp::source::just(1) + * | [](const auto& observable) { return rpp::source::concat(observable, + * rpp::source::just(2)); }; + * @endcode + * This converts the observable to a concatenation of the original observable + * and `just(2)`. + * + * 2) type-traits based - should satisfy @link rpp::constraint::operator_ + * @endlink concept.
For example, you can implement such an operator like + * this: + * @snippet readme.cpp simple_custom_map + * But in this case you are missing disposables-related functionality. + * So, it is better to implement it via providing custom observer's strategy + * with correct handling of disposables. Check real @link rpp::operators::map + * @endlink implementation for it =) * * @see https://reactivex.io/documentation/operators.html * @ingroup rpp @@ -131,6 +194,7 @@ #include #include #include +#include #include #include #include diff --git a/symmetri/gui/rpp/rpp/operators/buffer.hpp b/symmetri/gui/rpp/rpp/operators/buffer.hpp index ef665c49..b79ea299 100644 --- a/symmetri/gui/rpp/rpp/operators/buffer.hpp +++ b/symmetri/gui/rpp/rpp/operators/buffer.hpp @@ -23,8 +23,8 @@ class buffer_observer_strategy { static_assert(std::same_as>); public: - using preferred_disposable_strategy = - rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = + rpp::details::observers::disposables_mode::None; buffer_observer_strategy(TObserver&& observer, size_t count) : m_observer{std::move(observer)} { @@ -72,8 +72,8 @@ struct buffer_t : lift_operator { using observer_strategy = buffer_observer_strategy; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = Prev; }; } // namespace rpp::operators::details diff --git a/symmetri/gui/rpp/rpp/operators/combine_latest.hpp b/symmetri/gui/rpp/rpp/operators/combine_latest.hpp index 05cfc448..4e73afed 100644 --- a/symmetri/gui/rpp/rpp/operators/combine_latest.hpp +++ b/symmetri/gui/rpp/rpp/operators/combine_latest.hpp @@ -18,10 +18,11 @@ namespace rpp::operators::details { template -class combine_latest_state final : public combining_state { +class combine_latest_disposable final : public combining_disposable { public: - explicit combine_latest_state(Observer&& observer, const TSelector& selector) - : combining_state(std::move(observer), sizeof...(Args)), + explicit combine_latest_disposable(Observer&& observer, + const TSelector& selector) + : combining_disposable(std::move(observer), sizeof...(Args)), m_selector(selector) {} const auto& get_selector() const { return m_selector; } @@ -38,25 +39,26 @@ template struct combine_latest_observer_strategy final : public combining_observer_strategy< - combine_latest_state> { + combine_latest_disposable> { using combining_observer_strategy< - combine_latest_state>::state; + combine_latest_disposable>::disposable; template void on_next(T&& v) const { // mutex need to be locked during changing of values, generating new values // and sending of new values due to we can't update value while we are // sending old one - const auto observer = state->get_observer_under_lock(); - state->get_values().template get().emplace(std::forward(v)); + const auto observer = disposable->get_observer_under_lock(); + disposable->get_values().template get().emplace(std::forward(v)); - state->get_values().apply(&apply_impl, state, observer); + disposable->get_values().apply(&apply_impl, + disposable, observer); } private: - template + template static void apply_impl( - const TState& disposable, + const TDisposable& disposable, const rpp::utils::pointer_under_lock& observer, const std::optional&... vals) { if ((vals.has_value() && ...)) @@ -66,7 +68,7 @@ struct combine_latest_observer_strategy final template struct combine_latest_t - : public combining_operator_t {}; } // namespace rpp::operators::details diff --git a/symmetri/gui/rpp/rpp/operators/concat.hpp b/symmetri/gui/rpp/rpp/operators/concat.hpp index a88e8370..6033e7ff 100644 --- a/symmetri/gui/rpp/rpp/operators/concat.hpp +++ b/symmetri/gui/rpp/rpp/operators/concat.hpp @@ -10,9 +10,10 @@ #pragma once +#include #include #include -#include +#include #include #include #include @@ -31,14 +32,11 @@ enum class ConcatStage : uint8_t { template -class concat_state_t final : public std::enable_shared_from_this< - concat_state_t> { +class concat_disposable final : public rpp::details::base_disposable, + public rpp::details::enable_wrapper_from_this< + concat_disposable> { public: - concat_state_t(TObserver&& observer) : m_observer{std::move(observer)} { - const auto d = disposable_wrapper_impl::make(); - m_disposable = d.lock(); - get_observer()->set_upstream(d); - } + concat_disposable(TObserver&& observer) : m_observer{std::move(observer)} {} rpp::utils::pointer_under_lock get_observer() { return m_observer; @@ -46,49 +44,54 @@ class concat_state_t final : public std::enable_shared_from_this< rpp::utils::pointer_under_lock> get_queue() { return m_queue; } - const std::shared_ptr& get_disposable() const { - return m_disposable; - } std::atomic& stage() { return m_stage; } - void drain(rpp::composite_disposable_wrapper refcounted) { - while (!m_disposable->is_disposed()) { + void drain() { + while (!is_disposed()) { const auto observable = get_observable(); if (!observable) { stage().store(ConcatStage::None, std::memory_order::relaxed); - refcounted.dispose(); - if (m_disposable->is_disposed()) get_observer()->on_completed(); + if (get_base_child_disposable().is_disposed()) + get_observer()->on_completed(); return; } - if (handle_observable_impl(observable.value(), refcounted)) return; + if (handle_observable_impl(observable.value())) return; } } void handle_observable( - const rpp::constraint::decayed_same_as auto& observable, - rpp::composite_disposable_wrapper refcounted) { - if (handle_observable_impl(observable, refcounted)) return; + const rpp::constraint::decayed_same_as auto& observable) { + if (handle_observable_impl(observable)) return; - drain(refcounted); + drain(); + } + + rpp::composite_disposable& get_base_child_disposable() { + return m_child_disposables[0]; + } + rpp::composite_disposable& get_inner_child_disposable() { + return m_child_disposables[1]; } private: bool handle_observable_impl( - const rpp::constraint::decayed_same_as auto& observable, - rpp::composite_disposable_wrapper refcounted) { + const rpp::constraint::decayed_same_as auto& observable) { stage().store(ConcatStage::Draining, std::memory_order::relaxed); - refcounted.clear(); observable.subscribe(concat_inner_observer_strategy{ - this->shared_from_this(), std::move(refcounted)}); + disposable_wrapper_impl{this->wrapper_from_this()} + .lock()}); ConcatStage current = ConcatStage::Draining; return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst); } - private: + void base_dispose_impl(interface_disposable::Mode) noexcept override { + for (auto& d : m_child_disposables) d.dispose(); + } + std::optional get_observable() { auto queue = get_queue(); if (queue->empty()) return std::nullopt; @@ -98,93 +101,100 @@ class concat_state_t final : public std::enable_shared_from_this< } private: - std::shared_ptr m_disposable{}; rpp::utils::value_with_mutex m_observer; rpp::utils::value_with_mutex> m_queue; std::atomic m_stage{}; -}; -template -struct concat_observer_strategy_base { - concat_observer_strategy_base( - std::shared_ptr> state, - rpp::composite_disposable_wrapper refcounted) - : state{std::move(state)}, refcounted{std::move(refcounted)} {} - - concat_observer_strategy_base( - std::shared_ptr> state) - : concat_observer_strategy_base{ - state, state->get_disposable()->add_ref( - refcount_disposable::Mode::StrongRefRefSource)} {} - - std::shared_ptr> state; - rpp::composite_disposable_wrapper refcounted; - - void on_error(const std::exception_ptr& err) const { - state->get_observer()->on_error(err); - } - - void set_upstream(const disposable_wrapper& d) const { refcounted.add(d); } - - bool is_disposed() const { return refcounted.is_disposed(); } + std::array m_child_disposables{}; }; template -struct concat_inner_observer_strategy - : public concat_observer_strategy_base { - using base = concat_observer_strategy_base; +struct concat_inner_observer_strategy { + static constexpr auto preferred_disposables_mode = + rpp::details::observers::disposables_mode::Boolean; - using base::concat_observer_strategy_base; + std::shared_ptr> disposable{}; template void on_next(T&& v) const { - base::state->get_observer()->on_next(std::forward(v)); + disposable->get_observer()->on_next(std::forward(v)); + } + + void on_error(const std::exception_ptr& err) const { + disposable->get_observer()->on_error(err); } void on_completed() const { + disposable->get_inner_child_disposable().clear(); + ConcatStage current{ConcatStage::Draining}; - if (base::state->stage().compare_exchange_strong( + if (disposable->stage().compare_exchange_strong( current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst)) return; assert(current == ConcatStage::Processing); - base::state->drain(base::refcounted); + disposable->drain(); + } + + void set_upstream(const disposable_wrapper& d) const { + disposable->get_inner_child_disposable().add(d); + } + + bool is_disposed() const { + return disposable->get_inner_child_disposable().is_disposed(); } }; template -struct concat_observer_strategy - : public concat_observer_strategy_base { - using base = concat_observer_strategy_base; - using preferred_disposable_strategy = - rpp::details::observers::none_disposable_strategy; +struct concat_observer_strategy { + static constexpr auto preferred_disposables_mode = + rpp::details::observers::disposables_mode::None; + + std::shared_ptr> disposable; concat_observer_strategy(TObserver&& observer) - : base{std::make_shared>( - std::move(observer))} {} + : disposable{init_state(std::move(observer))} {} template void on_next(T&& v) const { ConcatStage current = ConcatStage::None; - if (base::state->stage().compare_exchange_strong( + if (disposable->stage().compare_exchange_strong( current, ConcatStage::Draining, std::memory_order::seq_cst)) - base::state->handle_observable( - std::forward(v), - base::state->get_disposable()->add_ref( - refcount_disposable::Mode::StrongRefRefSource)); + disposable->handle_observable(std::forward(v)); else - base::state->get_queue()->push(std::forward(v)); + disposable->get_queue()->push(std::forward(v)); + } + + void on_error(const std::exception_ptr& err) const { + disposable->get_observer()->on_error(err); } void on_completed() const { - base::refcounted.dispose(); - if (base::state->get_disposable()->is_disposed()) - base::state->get_observer()->on_completed(); + disposable->get_base_child_disposable().dispose(); + if (disposable->stage() == ConcatStage::None) + disposable->get_observer()->on_completed(); + } + + void set_upstream(const disposable_wrapper& d) const { + disposable->get_base_child_disposable().add(d); + } + + bool is_disposed() const { + return disposable->get_base_child_disposable().is_disposed(); + } + + private: + static std::shared_ptr> init_state( + TObserver&& observer) { + const auto d = disposable_wrapper_impl< + concat_disposable>::make(std::move(observer)); + auto ptr = d.lock(); + ptr->get_observer()->set_upstream(d.as_weak()); + return ptr; } }; @@ -201,9 +211,9 @@ struct concat_t : lift_operator { using observer_strategy = concat_observer_strategy; }; - template - using updated_disposable_strategy = - rpp::details::observables::fixed_disposable_strategy_selector<1>; + template + using updated_optimal_disposables_strategy = + rpp::details::observables::fixed_disposables_strategy<1>; }; } // namespace rpp::operators::details diff --git a/symmetri/gui/rpp/rpp/operators/debounce.hpp b/symmetri/gui/rpp/rpp/operators/debounce.hpp index bec7a4c5..8633a26a 100644 --- a/symmetri/gui/rpp/rpp/operators/debounce.hpp +++ b/symmetri/gui/rpp/rpp/operators/debounce.hpp @@ -10,34 +10,43 @@ #pragma once +#include #include #include #include namespace rpp::operators::details { -template -class debounce_state; +template < + rpp::constraint::observer Observer, typename Worker, + rpp::details::disposables::constraint::disposables_container Container> +class debounce_disposable; -template -struct debounce_state_wrapper { - std::shared_ptr> state{}; +template < + rpp::constraint::observer Observer, typename Worker, + rpp::details::disposables::constraint::disposables_container Container> +struct debounce_disposable_wrapper { + std::shared_ptr> + disposable{}; - bool is_disposed() const { return state->is_disposed(); } + bool is_disposed() const { return disposable->is_disposed(); } void on_error(const std::exception_ptr& err) const { - state->get_observer_under_lock()->on_error(err); + disposable->get_observer_under_lock()->on_error(err); } }; -template -class debounce_state final : public rpp::details::enable_wrapper_from_this< - debounce_state>, - public rpp::details::base_disposable { +template < + rpp::constraint::observer Observer, typename Worker, + rpp::details::disposables::constraint::disposables_container Container> +class debounce_disposable final + : public rpp::composite_disposable_impl, + public rpp::details::enable_wrapper_from_this< + debounce_disposable> { using T = rpp::utils::extract_observer_type_t; public: - debounce_state(Observer&& in_observer, Worker&& in_worker, - rpp::schedulers::duration period) + debounce_disposable(Observer&& in_observer, Worker&& in_worker, + rpp::schedulers::duration period) : m_observer(std::move(in_observer)), m_worker{std::move(in_worker)}, m_period{period} {} @@ -67,20 +76,20 @@ class debounce_state final : public rpp::details::enable_wrapper_from_this< void schedule() { m_worker.schedule( m_time_when_value_should_be_emitted.value(), - [](const debounce_state_wrapper& handler) - -> schedulers::optional_delay_to { - auto value_or_duration = handler.state->extract_value_or_time(); + [](const debounce_disposable_wrapper& + handler) -> schedulers::optional_delay_to { + auto value_or_duration = handler.disposable->extract_value_or_time(); if (auto* timepoint = std::get_if(&value_or_duration)) return schedulers::optional_delay_to{*timepoint}; if (auto* value = std::get_if(&value_or_duration)) - handler.state->get_observer_under_lock()->on_next( + handler.disposable->get_observer_under_lock()->on_next( std::move(*value)); return std::nullopt; }, - debounce_state_wrapper{ + debounce_disposable_wrapper{ this->wrapper_from_this().lock()}); } @@ -109,31 +118,34 @@ class debounce_state final : public rpp::details::enable_wrapper_from_this< std::optional m_value_to_be_emitted{}; }; -template +template < + rpp::constraint::observer Observer, typename Worker, + rpp::details::disposables::constraint::disposables_container Container> struct debounce_observer_strategy { - using preferred_disposable_strategy = - rpp::details::observers::none_disposable_strategy; + static constexpr auto preferred_disposables_mode = + rpp::details::observers::disposables_mode::None; - std::shared_ptr> state{}; + std::shared_ptr> + disposable{}; void set_upstream(const rpp::disposable_wrapper& d) const { - state->get_observer_under_lock()->set_upstream(d); + disposable->add(d); } - bool is_disposed() const { return state->is_disposed(); } + bool is_disposed() const { return disposable->is_disposed(); } template void on_next(T&& v) const { - state->emplace_safe(std::forward(v)); + disposable->emplace_safe(std::forward(v)); } void on_error(const std::exception_ptr& err) const noexcept { - state->get_observer_under_lock()->on_error(err); + disposable->get_observer_under_lock()->on_error(err); } void on_completed() const noexcept { - const auto observer = state->get_observer_under_lock(); - if (const auto value = state->extract_value()) + const auto observer = disposable->get_observer_under_lock(); + if (const auto value = disposable->extract_value()) observer->on_next(std::move(value).value()); observer->on_completed(); } @@ -146,25 +158,30 @@ struct debounce_t { using result_type = T; }; - template - using updated_disposable_strategy = typename Prev::template add<1>; + template + using updated_optimal_disposables_strategy = + rpp::details::observables::fixed_disposables_strategy<1>; rpp::schedulers::duration duration; RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; template - auto lift(Observer&& observer) const { + auto lift_with_disposables_strategy(Observer&& observer) const { using worker_t = rpp::schedulers::utils::get_worker_t; - - auto d = rpp::disposable_wrapper_impl< - debounce_state, - worker_t>>::make(std::forward(observer), - scheduler.create_worker(), duration); - auto ptr = d.lock(); - ptr->get_observer_under_lock()->set_upstream(d.as_weak()); - return rpp::observer< - Type, debounce_observer_strategy, worker_t>>{ + using container = typename DisposableStrategy::disposables_container; + + const auto disposable = disposable_wrapper_impl< + debounce_disposable, worker_t, container>>:: + make(std::forward(observer), scheduler.create_worker(), + duration); + auto ptr = disposable.lock(); + ptr->get_observer_under_lock()->set_upstream(disposable.as_weak()); + return rpp::observer, + worker_t, container>>{ std::move(ptr)}; } }; diff --git a/symmetri/gui/rpp/rpp/operators/delay.hpp b/symmetri/gui/rpp/rpp/operators/delay.hpp index 6a83e355..0892a24f 100644 --- a/symmetri/gui/rpp/rpp/operators/delay.hpp +++ b/symmetri/gui/rpp/rpp/operators/delay.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -27,12 +28,15 @@ struct emission { rpp::schedulers::time_point time_point{}; }; -template -struct delay_state final { +template < + rpp::constraint::observer Observer, typename Worker, + rpp::details::disposables::constraint::disposables_container Container> +struct delay_disposable final + : public rpp::composite_disposable_impl { using T = rpp::utils::extract_observer_type_t; - delay_state(Observer&& in_observer, Worker&& in_worker, - rpp::schedulers::duration delay) + delay_disposable(Observer&& in_observer, Worker&& in_worker, + rpp::schedulers::duration delay) : observer(std::move(in_observer)), worker{std::move(in_worker)}, delay{delay} {} @@ -46,64 +50,75 @@ struct delay_state final { bool is_active{}; }; -template -struct delay_state_wrapper { - std::shared_ptr> state{}; +template < + rpp::constraint::observer Observer, typename Worker, + rpp::details::disposables::constraint::disposables_container Container> +struct delay_disposable_wrapper { + std::shared_ptr> disposable{}; - bool is_disposed() const { return state->observer.is_disposed(); } + bool is_disposed() const { return disposable->is_disposed(); } void on_error(const std::exception_ptr& err) const { - state->observer.on_error(err); + disposable->observer.on_error(err); } }; -template +template < + rpp::constraint::observer Observer, typename Worker, + rpp::details::disposables::constraint::disposables_container Container, + bool ClearOnError> struct delay_observer_strategy { - std::shared_ptr> state{}; + static constexpr auto preferred_disposables_mode = + rpp::details::observers::disposables_mode::Boolean; + std::shared_ptr> disposable{}; void set_upstream(const rpp::disposable_wrapper& d) const { - state->observer.set_upstream(d); + disposable->add(d); } - bool is_disposed() const { return state->observer.is_disposed(); } + bool is_disposed() const { return disposable->is_disposed(); } template void on_next(T&& v) const { emplace(std::forward(v)); } - void on_error(const std::exception_ptr& err) const noexcept { emplace(err); } + void on_error(const std::exception_ptr& err) const noexcept { + emplace(err); + disposable->clear(); + } - void on_completed() const noexcept { emplace(rpp::utils::none{}); } + void on_completed() const noexcept { + emplace(rpp::utils::none{}); + disposable->clear(); + } private: template void emplace(TT&& value) const { if (const auto tp = emplace_safe(std::forward(value))) { - state->worker.schedule( + disposable->worker.schedule( tp.value(), - [](const delay_state_wrapper& wrapper) { - return drain_queue(wrapper.state); - }, - delay_state_wrapper{state}); + [](const delay_disposable_wrapper& + wrapper) { return drain_queue(wrapper.disposable); }, + delay_disposable_wrapper{disposable}); } } template std::optional emplace_safe(TT&& item) const { - std::lock_guard lock{state->mutex}; + std::lock_guard lock{disposable->mutex}; if constexpr (ClearOnError && rpp::constraint::decayed_same_as) { - state->queue = + disposable->queue = std::queue>>{}; - state->observer.on_error(std::forward(item)); + disposable->observer.on_error(std::forward(item)); return std::nullopt; } else { - const auto tp = state->worker.now() + state->delay; - state->queue.emplace(std::forward(item), tp); - if (!state->is_active) { - state->is_active = true; + const auto tp = disposable->worker.now() + disposable->delay; + disposable->queue.emplace(std::forward(item), tp); + if (!disposable->is_active) { + disposable->is_active = true; return tp; } return std::nullopt; @@ -111,31 +126,33 @@ struct delay_observer_strategy { } static schedulers::optional_delay_to drain_queue( - const std::shared_ptr>& state) { - while (true) { - std::unique_lock lock{state->mutex}; - if (state->queue.empty()) { - state->is_active = false; + const std::shared_ptr>& + disposable) { + for (bool just_schedule = false;; just_schedule = true) { + std::unique_lock lock{disposable->mutex}; + if (disposable->queue.empty()) { + disposable->is_active = false; return std::nullopt; } - auto& top = state->queue.front(); - if (top.time_point > state->worker.now()) + auto& top = disposable->queue.front(); + if (just_schedule || top.time_point > disposable->worker.now()) return schedulers::optional_delay_to{top.time_point}; auto item = std::move(top.value); - state->queue.pop(); + disposable->queue.pop(); lock.unlock(); - std::visit(rpp::utils::overloaded{ - [&](rpp::utils::extract_observer_type_t&& v) { - state->observer.on_next(std::move(v)); - }, - [&](const std::exception_ptr& err) { - state->observer.on_error(err); - }, - [&](rpp::utils::none) { state->observer.on_completed(); }}, - std::move(item)); + std::visit( + rpp::utils::overloaded{ + [&](rpp::utils::extract_observer_type_t&& v) { + disposable->observer.on_next(std::move(v)); + }, + [&](const std::exception_ptr& err) { + disposable->observer.on_error(err); + }, + [&](rpp::utils::none) { disposable->observer.on_completed(); }}, + std::move(item)); } } }; @@ -147,24 +164,30 @@ struct delay_t { using result_type = T; }; - template - using updated_disposable_strategy = Prev; + template + using updated_optimal_disposables_strategy = + rpp::details::observables::fixed_disposables_strategy<1>; rpp::schedulers::duration duration; RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; template - auto lift(Observer&& observer) const { + auto lift_with_disposables_strategy(Observer&& observer) const { using worker_t = rpp::schedulers::utils::get_worker_t; - - auto state = - std::make_shared, worker_t>>( - std::forward(observer), scheduler.create_worker(), - duration); - return rpp::observer, - worker_t, ClearOnError>>{ - std::move(state)}; + using container = typename DisposableStrategy::disposables_container; + + const auto disposable = disposable_wrapper_impl< + delay_disposable, worker_t, + container>>::make(std::forward(observer), + scheduler.create_worker(), duration); + auto ptr = disposable.lock(); + ptr->observer.set_upstream(disposable.as_weak()); + return rpp::observer< + Type, delay_observer_strategy, worker_t, + container, ClearOnError>>{std::move(ptr)}; } }; } // namespace rpp::operators::details diff --git a/symmetri/gui/rpp/rpp/operators/details/combining_strategy.hpp b/symmetri/gui/rpp/rpp/operators/details/combining_strategy.hpp index cd9a07e6..975797cc 100644 --- a/symmetri/gui/rpp/rpp/operators/details/combining_strategy.hpp +++ b/symmetri/gui/rpp/rpp/operators/details/combining_strategy.hpp @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -19,11 +20,9 @@ namespace rpp::operators::details { template -class combining_state - : public rpp::details::enable_wrapper_from_this>, - public rpp::details::base_disposable { +class combining_disposable : public composite_disposable { public: - explicit combining_state(Observer&& observer, size_t on_completed_needed) + explicit combining_disposable(Observer&& observer, size_t on_completed_needed) : m_observer_with_mutex{std::move(observer)}, m_on_completed_needed{on_completed_needed} {} @@ -42,27 +41,31 @@ class combining_state std::atomic_size_t m_on_completed_needed; }; -template +template struct combining_observer_strategy { - std::shared_ptr state{}; + // `Auto` due to we have to dispose disposables during on_completed anyway + static constexpr auto preferred_disposables_mode = + rpp::details::observers::disposables_mode::Auto; + + std::shared_ptr disposable{}; void set_upstream(const rpp::disposable_wrapper& d) const { - state->get_observer_under_lock()->set_upstream(d); + disposable->add(d); } - bool is_disposed() const { return state->is_disposed(); } + bool is_disposed() const { return disposable->is_disposed(); } void on_error(const std::exception_ptr& err) const { - state->get_observer_under_lock()->on_error(err); + disposable->get_observer_under_lock()->on_error(err); } void on_completed() const { - if (state->decrement_on_completed()) - state->get_observer_under_lock()->on_completed(); + if (disposable->decrement_on_completed()) + disposable->get_observer_under_lock()->on_completed(); } }; -template