From b984a8d3a3333b0f28196d5b6011450dbc6353ba Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Wed, 14 Jan 2026 16:02:02 -0600 Subject: [PATCH 1/9] Separate flush messages from regular stores --- phlex/core/declared_provider.hpp | 1 - phlex/core/declared_transform.hpp | 2 -- phlex/core/declared_unfold.hpp | 1 - phlex/core/framework_graph.cpp | 20 ++++++++++++++++++++ phlex/core/framework_graph.hpp | 3 ++- phlex/core/message_sender.cpp | 6 +++--- phlex/core/message_sender.hpp | 9 ++++++--- phlex/core/multiplexer.cpp | 7 +------ phlex/core/store_counters.cpp | 6 ++++-- 9 files changed, 36 insertions(+), 19 deletions(-) diff --git a/phlex/core/declared_provider.hpp b/phlex/core/declared_provider.hpp index 2ef92c58..b81936eb 100644 --- a/phlex/core/declared_provider.hpp +++ b/phlex/core/declared_provider.hpp @@ -74,7 +74,6 @@ namespace phlex::experimental { if (msg.store->is_flush()) { flag_for(msg.store->id()->hash()).flush_received(msg.original_id); - stay_in_graph.try_put(msg); } else { // Check cache first auto index_hash = msg.store->id()->hash(); diff --git a/phlex/core/declared_transform.hpp b/phlex/core/declared_transform.hpp index 192ff468..ce241cfc 100644 --- a/phlex/core/declared_transform.hpp +++ b/phlex/core/declared_transform.hpp @@ -94,8 +94,6 @@ namespace phlex::experimental { auto& [stay_in_graph, to_output] = output; if (store->is_flush()) { flag_for(store->id()->hash()).flush_received(msg.original_id); - stay_in_graph.try_put(msg); - to_output.try_put(msg); } else { accessor a; if (stores_.insert(a, store->id()->hash())) { diff --git a/phlex/core/declared_unfold.hpp b/phlex/core/declared_unfold.hpp index 2c19d60a..86bfeab3 100644 --- a/phlex/core/declared_unfold.hpp +++ b/phlex/core/declared_unfold.hpp @@ -110,7 +110,6 @@ namespace phlex::experimental { auto const& store = msg.store; if (store->is_flush()) { flag_for(store->id()->hash()).flush_received(msg.id); - std::get<0>(output).try_put(msg); } else if (accessor a; stores_.insert(a, store->id()->hash())) { std::size_t const original_message_id{msg_counter_}; generator g{msg.store, this->full_name(), child_layer_name_}; diff --git a/phlex/core/framework_graph.cpp b/phlex/core/framework_graph.cpp index 17926921..d3b7797d 100644 --- a/phlex/core/framework_graph.cpp +++ b/phlex/core/framework_graph.cpp @@ -166,6 +166,26 @@ namespace phlex::experimental { nodes_.folds, nodes_.unfolds, nodes_.transforms); + + // Connect edges between all nodes and the flusher + auto connect_with_flusher = [this](auto& consumers) { + for (auto& n : consumers | std::views::values) { + if constexpr (requires { n->input_port(); }) { + make_edge(flusher_, *n->input_port()); + } else { + for (auto* p : n->ports()) { + make_edge(flusher_, *p); + } + } + } + }; + + connect_with_flusher(nodes_.folds); + connect_with_flusher(nodes_.observers); + connect_with_flusher(nodes_.predicates); + connect_with_flusher(nodes_.providers); + connect_with_flusher(nodes_.transforms); + connect_with_flusher(nodes_.unfolds); } product_store_ptr framework_graph::accept(product_store_ptr store) diff --git a/phlex/core/framework_graph.hpp b/phlex/core/framework_graph.hpp index ef824e32..dc064212 100644 --- a/phlex/core/framework_graph.hpp +++ b/phlex/core/framework_graph.hpp @@ -181,8 +181,9 @@ namespace phlex::experimental { std::vector registration_errors_{}; tbb::flow::input_node src_; multiplexer multiplexer_; + flusher_t flusher_{graph_}; std::stack eoms_; - message_sender sender_{hierarchy_, multiplexer_, eoms_}; + message_sender sender_{hierarchy_, flusher_, eoms_}; std::queue pending_stores_; flush_counters counters_; std::stack layers_; diff --git a/phlex/core/message_sender.cpp b/phlex/core/message_sender.cpp index 969ed1d7..4ab86db0 100644 --- a/phlex/core/message_sender.cpp +++ b/phlex/core/message_sender.cpp @@ -7,9 +7,9 @@ namespace phlex::experimental { message_sender::message_sender(data_layer_hierarchy& hierarchy, - multiplexer& mplexer, + flusher_t& flusher, std::stack& eoms) : - hierarchy_{hierarchy}, multiplexer_{mplexer}, eoms_{eoms} + hierarchy_{hierarchy}, flusher_{flusher}, eoms_{eoms} { } @@ -35,7 +35,7 @@ namespace phlex::experimental { assert(store->is_flush()); auto const message_id = ++calls_; message const msg{store, nullptr, message_id, original_message_id(store)}; - multiplexer_.try_put(std::move(msg)); + flusher_.try_put(std::move(msg)); } std::size_t message_sender::original_message_id(product_store_ptr const& store) diff --git a/phlex/core/message_sender.hpp b/phlex/core/message_sender.hpp index 8fb9abb5..5f583edc 100644 --- a/phlex/core/message_sender.hpp +++ b/phlex/core/message_sender.hpp @@ -3,18 +3,21 @@ #include "phlex/core/fwd.hpp" #include "phlex/core/message.hpp" -#include "phlex/core/multiplexer.hpp" #include "phlex/model/fwd.hpp" +#include "oneapi/tbb/flow_graph.h" + #include #include namespace phlex::experimental { + using flusher_t = tbb::flow::broadcast_node; + class message_sender { public: explicit message_sender(data_layer_hierarchy& hierarchy, - multiplexer& mplexer, + flusher_t& flusher, std::stack& eoms); void send_flush(product_store_ptr store); @@ -24,7 +27,7 @@ namespace phlex::experimental { std::size_t original_message_id(product_store_ptr const& store); data_layer_hierarchy& hierarchy_; - multiplexer& multiplexer_; + flusher_t& flusher_; std::stack& eoms_; std::map original_message_ids_; std::size_t calls_{}; diff --git a/phlex/core/multiplexer.cpp b/phlex/core/multiplexer.cpp index 9c60b7c8..f0da740a 100644 --- a/phlex/core/multiplexer.cpp +++ b/phlex/core/multiplexer.cpp @@ -56,12 +56,7 @@ namespace phlex::experimental { store->is_flush()); } - if (store->is_flush()) { - for (auto const& [_, port] : provider_input_ports_ | std::views::values) { - port->try_put(msg); - } - return {}; - } + assert(not store->is_flush()); auto start_time = steady_clock::now(); diff --git a/phlex/core/store_counters.cpp b/phlex/core/store_counters.cpp index d09cd4e8..98da8833 100644 --- a/phlex/core/store_counters.cpp +++ b/phlex/core/store_counters.cpp @@ -8,8 +8,8 @@ namespace phlex::experimental { void store_flag::flush_received(std::size_t const original_message_id) { - flush_received_ = true; original_message_id_ = original_message_id; + flush_received_ = true; } bool store_flag::is_complete() const noexcept { return processed_ and flush_received_; } @@ -21,7 +21,9 @@ namespace phlex::experimental { store_flag& detect_flush_flag::flag_for(data_cell_index::hash_type const hash) { flag_accessor fa; - flags_.emplace(fa, hash, std::make_unique()); + if (flags_.insert(fa, hash)) { + fa->second = std::make_unique(); + } return *fa->second; } From df3438b5602d79b6d49e2507c9797d1a0b608a62 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Wed, 14 Jan 2026 16:21:59 -0600 Subject: [PATCH 2/9] Consolidate flush reception --- phlex/core/declared_observer.hpp | 2 +- phlex/core/declared_predicate.hpp | 2 +- phlex/core/declared_provider.hpp | 2 +- phlex/core/declared_transform.hpp | 2 +- phlex/core/declared_unfold.hpp | 2 +- phlex/core/fwd.hpp | 1 + phlex/core/store_counters.cpp | 7 +++++++ phlex/core/store_counters.hpp | 1 + 8 files changed, 14 insertions(+), 5 deletions(-) diff --git a/phlex/core/declared_observer.hpp b/phlex/core/declared_observer.hpp index 4d701bd9..d994061c 100644 --- a/phlex/core/declared_observer.hpp +++ b/phlex/core/declared_observer.hpp @@ -74,7 +74,7 @@ namespace phlex::experimental { auto const& msg = most_derived(messages); auto const& [store, message_id] = std::tie(msg.store, msg.id); if (store->is_flush()) { - flag_for(store->id()->hash()).flush_received(message_id); + receive_flush(msg); } else if (accessor a; needs_new(store, a)) { call(ft, messages, std::make_index_sequence{}); a->second = true; diff --git a/phlex/core/declared_predicate.hpp b/phlex/core/declared_predicate.hpp index 7d9efe69..49f56fba 100644 --- a/phlex/core/declared_predicate.hpp +++ b/phlex/core/declared_predicate.hpp @@ -81,7 +81,7 @@ namespace phlex::experimental { auto const& [store, message_id] = std::tie(msg.store, msg.id); predicate_result result{}; if (store->is_flush()) { - flag_for(store->id()->hash()).flush_received(message_id); + receive_flush(msg); } else if (const_accessor a; results_.find(a, store->id()->hash())) { result = {msg.eom, message_id, a->second.result}; } else if (accessor a; results_.insert(a, store->id()->hash())) { diff --git a/phlex/core/declared_provider.hpp b/phlex/core/declared_provider.hpp index b81936eb..86ea2432 100644 --- a/phlex/core/declared_provider.hpp +++ b/phlex/core/declared_provider.hpp @@ -73,7 +73,7 @@ namespace phlex::experimental { auto& [stay_in_graph, to_output] = output; if (msg.store->is_flush()) { - flag_for(msg.store->id()->hash()).flush_received(msg.original_id); + receive_flush(msg); } else { // Check cache first auto index_hash = msg.store->id()->hash(); diff --git a/phlex/core/declared_transform.hpp b/phlex/core/declared_transform.hpp index ce241cfc..5effae33 100644 --- a/phlex/core/declared_transform.hpp +++ b/phlex/core/declared_transform.hpp @@ -93,7 +93,7 @@ namespace phlex::experimental { std::tie(msg.store, msg.eom, msg.id); auto& [stay_in_graph, to_output] = output; if (store->is_flush()) { - flag_for(store->id()->hash()).flush_received(msg.original_id); + receive_flush(msg); } else { accessor a; if (stores_.insert(a, store->id()->hash())) { diff --git a/phlex/core/declared_unfold.hpp b/phlex/core/declared_unfold.hpp index 86bfeab3..4503489a 100644 --- a/phlex/core/declared_unfold.hpp +++ b/phlex/core/declared_unfold.hpp @@ -109,7 +109,7 @@ namespace phlex::experimental { auto const& msg = most_derived(messages); auto const& store = msg.store; if (store->is_flush()) { - flag_for(store->id()->hash()).flush_received(msg.id); + receive_flush(msg); } else if (accessor a; stores_.insert(a, store->id()->hash())) { std::size_t const original_message_id{msg_counter_}; generator g{msg.store, this->full_name(), child_layer_name_}; diff --git a/phlex/core/fwd.hpp b/phlex/core/fwd.hpp index b4e191fd..16847462 100644 --- a/phlex/core/fwd.hpp +++ b/phlex/core/fwd.hpp @@ -12,6 +12,7 @@ namespace phlex::experimental { class end_of_message; class generator; class framework_graph; + struct message; class message_sender; class multiplexer; class products_consumer; diff --git a/phlex/core/store_counters.cpp b/phlex/core/store_counters.cpp index 98da8833..c31af87d 100644 --- a/phlex/core/store_counters.cpp +++ b/phlex/core/store_counters.cpp @@ -1,4 +1,5 @@ #include "phlex/core/store_counters.hpp" +#include "phlex/core/message.hpp" #include "phlex/model/data_cell_counter.hpp" #include "fmt/std.h" @@ -18,6 +19,12 @@ namespace phlex::experimental { unsigned int store_flag::original_message_id() const noexcept { return original_message_id_; } + void detect_flush_flag::receive_flush(message const& msg) + { + assert(msg.store->is_flush()); + flag_for(msg.store->id()->hash()).flush_received(msg.original_id); + } + store_flag& detect_flush_flag::flag_for(data_cell_index::hash_type const hash) { flag_accessor fa; diff --git a/phlex/core/store_counters.hpp b/phlex/core/store_counters.hpp index 99216c94..6f9ca961 100644 --- a/phlex/core/store_counters.hpp +++ b/phlex/core/store_counters.hpp @@ -29,6 +29,7 @@ namespace phlex::experimental { class detect_flush_flag { protected: + void receive_flush(message const& msg); store_flag& flag_for(data_cell_index::hash_type hash); bool done_with(product_store_const_ptr const& store); From 45b42a0bb6ecb0e0f8819ece75debe6a19822ae1 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Wed, 14 Jan 2026 17:09:33 -0600 Subject: [PATCH 3/9] Use flush ports instead of conditional logic --- phlex/core/declared_fold.hpp | 93 +++++++++++++++++-------------- phlex/core/declared_observer.hpp | 21 ++++++- phlex/core/declared_predicate.hpp | 19 ++++++- phlex/core/declared_provider.hpp | 64 ++++++++++++--------- phlex/core/declared_transform.hpp | 19 ++++++- phlex/core/declared_unfold.cpp | 6 +- phlex/core/declared_unfold.hpp | 50 ++++++++++++----- phlex/core/framework_graph.cpp | 35 ++++++++---- phlex/core/fwd.hpp | 3 + phlex/core/message_sender.hpp | 4 -- 10 files changed, 208 insertions(+), 106 deletions(-) diff --git a/phlex/core/declared_fold.hpp b/phlex/core/declared_fold.hpp index 9456f179..cbc35ea8 100644 --- a/phlex/core/declared_fold.hpp +++ b/phlex/core/declared_fold.hpp @@ -19,6 +19,7 @@ #include "oneapi/tbb/concurrent_unordered_map.h" #include "oneapi/tbb/flow_graph.h" +#include "spdlog/spdlog.h" #include #include @@ -42,6 +43,7 @@ namespace phlex::experimental { virtual tbb::flow::sender& sender() = 0; virtual tbb::flow::sender& to_output() = 0; + virtual tbb::flow::receiver& flush_port() = 0; virtual product_specifications const& output() const = 0; virtual std::size_t product_count() const = 0; }; @@ -75,53 +77,60 @@ namespace phlex::experimental { initializer_{std::move(initializer)}, output_{to_product_specifications(full_name(), std::move(output), make_type_ids())}, partition_{std::move(partition)}, + flush_receiver_{ + g, + tbb::flow::unlimited, + [this](message const& msg) -> tbb::flow::continue_msg { + auto const& [store, original_message_id] = std::tie(msg.store, msg.original_id); + if (store->id()->layer_name() != partition_) { + return {}; + } + + counter_for(store->id()->hash()).set_flush_value(store, original_message_id); + emit_and_evict_if_done(store->id(), msg.eom); + return {}; + }}, join_{make_join_or_none(g, std::make_index_sequence{})}, - fold_{g, - concurrency, - [this, ft = alg.release_algorithm()](messages_t const& messages, auto& outputs) { - // N.B. The assumption is that a fold will *never* need to cache - // the product store it creates. Any flush messages *do not* need - // to be propagated to downstream nodes. - auto const& msg = most_derived(messages); - auto const& [store, original_message_id] = std::tie(msg.store, msg.original_id); - - if (not store->is_flush() and not store->id()->parent(partition_)) { - return; - } - - if (store->is_flush()) { - // Downstream nodes always get the flush. - get<0>(outputs).try_put(msg); - if (store->id()->layer_name() != partition_) { - return; - } - } - - auto const& fold_index = - store->is_flush() ? store->id() : store->id()->parent(partition_); - assert(fold_index); - auto const& id_hash_for_counter = fold_index->hash(); - - if (store->is_flush()) { - counter_for(id_hash_for_counter).set_flush_value(store, original_message_id); - } else { - call(ft, messages, std::make_index_sequence{}); - counter_for(id_hash_for_counter).increment(store->id()->layer_hash()); - } - - if (auto counter = done_with(id_hash_for_counter)) { - auto parent = std::make_shared(fold_index, this->full_name()); - commit_(*parent); - ++product_count_; - // FIXME: This msg.eom value may be wrong! - get<0>(outputs).try_put({parent, msg.eom, counter->original_message_id()}); - } - }} + fold_{ + g, concurrency, [this, ft = alg.release_algorithm()](messages_t const& messages, auto&) { + // N.B. The assumption is that a fold will *never* need to cache + // the product store it creates. Any flush messages *do not* need + // to be propagated to downstream nodes. + auto const& msg = most_derived(messages); + auto const& [store, eom] = std::tie(msg.store, msg.eom); + + assert(not store->is_flush()); + + if (not store->id()->parent(partition_)) { + return; + } + + auto const& fold_index = store->id()->parent(partition_); + assert(fold_index); + auto const& id_hash_for_counter = fold_index->hash(); + + call(ft, messages, std::make_index_sequence{}); + counter_for(id_hash_for_counter).increment(store->id()->layer_hash()); + + emit_and_evict_if_done(fold_index, eom); + }} { make_edge(join_, fold_); } private: + void emit_and_evict_if_done(data_cell_index_ptr const& fold_index, + end_of_message_ptr const& eom) + { + if (auto counter = done_with(fold_index->hash())) { + auto parent = std::make_shared(fold_index, this->full_name()); + commit_(*parent); + ++product_count_; + // FIXME: This msg.eom value may be wrong! + output_port<0>(fold_).try_put({parent, eom, counter->original_message_id()}); + } + } + tbb::flow::receiver& port_for(product_query const& product_label) override { return receiver_for(join_, input(), product_label); @@ -129,6 +138,7 @@ namespace phlex::experimental { std::vector*> ports() override { return input_ports(join_); } + tbb::flow::receiver& flush_port() override { return flush_receiver_; } tbb::flow::sender& sender() override { return output_port<0ull>(fold_); } tbb::flow::sender& to_output() override { return sender(); } product_specifications const& output() const override { return output_; } @@ -178,6 +188,7 @@ namespace phlex::experimental { input_retriever_types input_{input_arguments()}; product_specifications output_; std::string partition_; + tbb::flow::function_node flush_receiver_; join_or_none_t join_; tbb::flow::multifunction_node, messages_t<1>> fold_; tbb::concurrent_unordered_map> results_; diff --git a/phlex/core/declared_observer.hpp b/phlex/core/declared_observer.hpp index d994061c..7049a325 100644 --- a/phlex/core/declared_observer.hpp +++ b/phlex/core/declared_observer.hpp @@ -37,6 +37,8 @@ namespace phlex::experimental { product_queries input_products); virtual ~declared_observer(); + virtual tbb::flow::receiver& flush_port() = 0; + protected: using hashes_t = tbb::concurrent_hash_map; using accessor = hashes_t::accessor; @@ -66,6 +68,15 @@ namespace phlex::experimental { AlgorithmBits alg, product_queries input_products) : declared_observer{std::move(name), std::move(predicates), std::move(input_products)}, + flush_receiver_{g, + tbb::flow::unlimited, + [this](message const& msg) -> tbb::flow::continue_msg { + receive_flush(msg); + if (done_with(msg.store)) { + cached_hashes_.erase(msg.store->id()->hash()); + } + return {}; + }}, join_{make_join_or_none(g, std::make_index_sequence{})}, observer_{g, concurrency, @@ -73,9 +84,10 @@ namespace phlex::experimental { messages_t const& messages) -> oneapi::tbb::flow::continue_msg { auto const& msg = most_derived(messages); auto const& [store, message_id] = std::tie(msg.store, msg.id); - if (store->is_flush()) { - receive_flush(msg); - } else if (accessor a; needs_new(store, a)) { + + assert(not store->is_flush()); + + if (accessor a; needs_new(store, a)) { call(ft, messages, std::make_index_sequence{}); a->second = true; flag_for(store->id()->hash()).mark_as_processed(); @@ -100,6 +112,8 @@ namespace phlex::experimental { std::vector*> ports() override { return input_ports(join_); } + tbb::flow::receiver& flush_port() override { return flush_receiver_; } + bool needs_new(product_store_const_ptr const& store, accessor& a) { if (cached_hashes_.count(store->id()->hash()) > 0ull) { @@ -118,6 +132,7 @@ namespace phlex::experimental { std::size_t num_calls() const final { return calls_.load(); } input_retriever_types input_{input_arguments()}; + tbb::flow::function_node flush_receiver_; join_or_none_t join_; tbb::flow::function_node> observer_; hashes_t cached_hashes_; diff --git a/phlex/core/declared_predicate.hpp b/phlex/core/declared_predicate.hpp index 49f56fba..d3ecaa41 100644 --- a/phlex/core/declared_predicate.hpp +++ b/phlex/core/declared_predicate.hpp @@ -40,6 +40,7 @@ namespace phlex::experimental { product_queries input_products); virtual ~declared_predicate(); + virtual tbb::flow::receiver& flush_port() = 0; virtual tbb::flow::sender& sender() = 0; protected: @@ -72,6 +73,15 @@ namespace phlex::experimental { AlgorithmBits alg, product_queries input_products) : declared_predicate{std::move(name), std::move(predicates), std::move(input_products)}, + flush_receiver_{g, + tbb::flow::unlimited, + [this](message const& msg) -> tbb::flow::continue_msg { + receive_flush(msg); + if (done_with(msg.store)) { + results_.erase(msg.store->id()->hash()); + } + return {}; + }}, join_{make_join_or_none(g, std::make_index_sequence{})}, predicate_{ g, @@ -79,10 +89,11 @@ namespace phlex::experimental { [this, ft = alg.release_algorithm()](messages_t const& messages) -> predicate_result { auto const& msg = most_derived(messages); auto const& [store, message_id] = std::tie(msg.store, msg.id); + + assert(not store->is_flush()); + predicate_result result{}; - if (store->is_flush()) { - receive_flush(msg); - } else if (const_accessor a; results_.find(a, store->id()->hash())) { + if (const_accessor a; results_.find(a, store->id()->hash())) { result = {msg.eom, message_id, a->second.result}; } else if (accessor a; results_.insert(a, store->id()->hash())) { bool const rc = call(ft, messages, std::make_index_sequence{}); @@ -109,6 +120,7 @@ namespace phlex::experimental { std::vector*> ports() override { return input_ports(join_); } + tbb::flow::receiver& flush_port() override { return flush_receiver_; } tbb::flow::sender& sender() override { return predicate_; } template @@ -121,6 +133,7 @@ namespace phlex::experimental { std::size_t num_calls() const final { return calls_.load(); } input_retriever_types input_{input_arguments()}; + tbb::flow::function_node flush_receiver_; join_or_none_t join_; tbb::flow::function_node, predicate_result> predicate_; results_t results_; diff --git a/phlex/core/declared_provider.hpp b/phlex/core/declared_provider.hpp index 86ea2432..43e3493e 100644 --- a/phlex/core/declared_provider.hpp +++ b/phlex/core/declared_provider.hpp @@ -35,6 +35,7 @@ namespace phlex::experimental { product_query const& output_product() const noexcept; virtual tbb::flow::receiver* input_port() = 0; + virtual tbb::flow::receiver& flush_port() = 0; virtual tbb::flow::sender& sender() = 0; virtual std::size_t num_calls() const = 0; @@ -68,41 +69,48 @@ namespace phlex::experimental { product_query output) : declared_provider{std::move(name), output}, output_{output.spec()}, + flush_receiver_{g, + tbb::flow::unlimited, + [this](message const& msg) -> tbb::flow::continue_msg { + receive_flush(msg); + if (done_with(msg.store)) { + cache_.erase(msg.store->id()->hash()); + } + return {}; + }}, provider_{ g, concurrency, [this, ft = alg.release_algorithm()](message const& msg, auto& output) { auto& [stay_in_graph, to_output] = output; - if (msg.store->is_flush()) { - receive_flush(msg); - } else { - // Check cache first - auto index_hash = msg.store->id()->hash(); - if (const_accessor ca; cache_.find(ca, index_hash)) { - // Cache hit - reuse the cached store - message const new_msg{ca->second, msg.eom, msg.id}; - stay_in_graph.try_put(new_msg); - to_output.try_put(new_msg); - return; - } - - // Cache miss - compute the result - auto result = std::invoke(ft, *msg.store->id()); - ++calls_; - - products new_products; - new_products.add(output_.name(), std::move(result)); - auto store = std::make_shared( - msg.store->id(), this->full_name(), std::move(new_products)); - - // Store in cache - cache_.emplace(index_hash, store); - - message const new_msg{store, msg.eom, msg.id}; + assert(not msg.store->is_flush()); + + // Check cache first + auto index_hash = msg.store->id()->hash(); + if (const_accessor ca; cache_.find(ca, index_hash)) { + // Cache hit - reuse the cached store + message const new_msg{ca->second, msg.eom, msg.id}; stay_in_graph.try_put(new_msg); to_output.try_put(new_msg); - flag_for(msg.store->id()->hash()).mark_as_processed(); + return; } + // Cache miss - compute the result + auto result = std::invoke(ft, *msg.store->id()); + ++calls_; + + products new_products; + new_products.add(output_.name(), std::move(result)); + auto store = std::make_shared( + msg.store->id(), this->full_name(), std::move(new_products)); + + // Store in cache + cache_.emplace(index_hash, store); + + message const new_msg{store, msg.eom, msg.id}; + stay_in_graph.try_put(new_msg); + to_output.try_put(new_msg); + flag_for(msg.store->id()->hash()).mark_as_processed(); + if (done_with(msg.store)) { cache_.erase(msg.store->id()->hash()); } @@ -116,11 +124,13 @@ namespace phlex::experimental { private: tbb::flow::receiver* input_port() override { return &provider_; } + tbb::flow::receiver& flush_port() override { return flush_receiver_; } tbb::flow::sender& sender() override { return output_port<0>(provider_); } std::size_t num_calls() const final { return calls_.load(); } product_specification output_; + tbb::flow::function_node flush_receiver_; tbb::flow::multifunction_node> provider_; std::atomic calls_; stores_t cache_; diff --git a/phlex/core/declared_transform.hpp b/phlex/core/declared_transform.hpp index 5effae33..4dc4c734 100644 --- a/phlex/core/declared_transform.hpp +++ b/phlex/core/declared_transform.hpp @@ -46,6 +46,7 @@ namespace phlex::experimental { virtual tbb::flow::sender& sender() = 0; virtual tbb::flow::sender& to_output() = 0; + virtual tbb::flow::receiver& flush_port() = 0; virtual product_specifications const& output() const = 0; virtual std::size_t product_count() const = 0; @@ -84,6 +85,15 @@ namespace phlex::experimental { declared_transform{std::move(name), std::move(predicates), std::move(input_products)}, output_{to_product_specifications( full_name(), std::move(output), make_output_type_ids())}, + flush_receiver_{g, + tbb::flow::unlimited, + [this](message const& msg) -> tbb::flow::continue_msg { + receive_flush(msg); + if (done_with(msg.store)) { + stores_.erase(msg.store->id()->hash()); + } + return {}; + }}, join_{make_join_or_none(g, std::make_index_sequence{})}, transform_{g, concurrency, @@ -92,9 +102,10 @@ namespace phlex::experimental { auto const& [store, message_eom, message_id] = std::tie(msg.store, msg.eom, msg.id); auto& [stay_in_graph, to_output] = output; - if (store->is_flush()) { - receive_flush(msg); - } else { + + assert(not store->is_flush()); + + { accessor a; if (stores_.insert(a, store->id()->hash())) { auto result = call(ft, messages, std::make_index_sequence{}); @@ -132,6 +143,7 @@ namespace phlex::experimental { std::vector*> ports() override { return input_ports(join_); } + tbb::flow::receiver& flush_port() override { return flush_receiver_; } tbb::flow::sender& sender() override { return output_port<0>(transform_); } tbb::flow::sender& to_output() override { return output_port<1>(transform_); } product_specifications const& output() const override { return output_; } @@ -154,6 +166,7 @@ namespace phlex::experimental { input_retriever_types input_{input_arguments()}; product_specifications output_; + tbb::flow::function_node flush_receiver_; join_or_none_t join_; tbb::flow::multifunction_node, messages_t<2u>> transform_; stores_t stores_; diff --git a/phlex/core/declared_unfold.cpp b/phlex/core/declared_unfold.cpp index b291b89e..5cb2cc1d 100644 --- a/phlex/core/declared_unfold.cpp +++ b/phlex/core/declared_unfold.cpp @@ -35,8 +35,10 @@ namespace phlex::experimental { declared_unfold::declared_unfold(algorithm_name name, std::vector predicates, - product_queries input_products) : - products_consumer{std::move(name), std::move(predicates), std::move(input_products)} + product_queries input_products, + std::string child_layer) : + products_consumer{std::move(name), std::move(predicates), std::move(input_products)}, + child_layer_{std::move(child_layer)} { } diff --git a/phlex/core/declared_unfold.hpp b/phlex/core/declared_unfold.hpp index 4503489a..428eabc7 100644 --- a/phlex/core/declared_unfold.hpp +++ b/phlex/core/declared_unfold.hpp @@ -58,13 +58,18 @@ namespace phlex::experimental { public: declared_unfold(algorithm_name name, std::vector predicates, - product_queries input_products); + product_queries input_products, + std::string child_layer); virtual ~declared_unfold(); + virtual tbb::flow::receiver& flush_port() = 0; virtual tbb::flow::sender& sender() = 0; virtual tbb::flow::sender& to_output() = 0; virtual product_specifications const& output() const = 0; virtual std::size_t product_count() const = 0; + virtual flusher_t& flusher() = 0; + + std::string const& child_layer() const noexcept { return child_layer_; } protected: using stores_t = tbb::concurrent_hash_map; @@ -72,6 +77,9 @@ namespace phlex::experimental { using const_accessor = stores_t::const_accessor; void report_cached_stores(stores_t const& stores) const; + + private: + std::string child_layer_; }; using declared_unfold_ptr = std::unique_ptr; @@ -95,35 +103,48 @@ namespace phlex::experimental { product_queries product_labels, std::vector output_products, std::string child_layer_name) : - declared_unfold{std::move(name), std::move(predicates), std::move(product_labels)}, + declared_unfold{std::move(name), + std::move(predicates), + std::move(product_labels), + std::move(child_layer_name)}, output_{to_product_specifications(full_name(), std::move(output_products), make_type_ids>>())}, - child_layer_name_{std::move(child_layer_name)}, + flush_receiver_{g, + tbb::flow::unlimited, + [this](message const& msg) -> tbb::flow::continue_msg { + receive_flush(msg); + if (done_with(msg.store)) { + stores_.erase(msg.store->id()->hash()); + } + return {}; + }}, join_{make_join_or_none(g, std::make_index_sequence{})}, unfold_{ g, concurrency, [this, p = std::move(predicate), ufold = std::move(unfold)](messages_t const& messages, - auto& output) { + auto&) { auto const& msg = most_derived(messages); auto const& store = msg.store; - if (store->is_flush()) { - receive_flush(msg); - } else if (accessor a; stores_.insert(a, store->id()->hash())) { + + assert(not store->is_flush()); + + if (accessor a; stores_.insert(a, store->id()->hash())) { std::size_t const original_message_id{msg_counter_}; - generator g{msg.store, this->full_name(), child_layer_name_}; + generator g{msg.store, this->full_name(), child_layer()}; call(p, ufold, msg.store->id(), g, msg.eom, messages, std::make_index_sequence{}); message const flush_msg{g.flush_store(), msg.eom, ++msg_counter_, original_message_id}; - std::get<0>(output).try_put(flush_msg); + flusher_.try_put(flush_msg); flag_for(store->id()->hash()).mark_as_processed(); } if (done_with(store)) { stores_.erase(store->id()->hash()); } - }} + }}, + flusher_{g} { make_edge(join_, unfold_); } @@ -137,9 +158,11 @@ namespace phlex::experimental { } std::vector*> ports() override { return input_ports(join_); } + tbb::flow::receiver& flush_port() override { return flush_receiver_; } tbb::flow::sender& sender() override { return output_port<0>(unfold_); } tbb::flow::sender& to_output() override { return sender(); } product_specifications const& output() const override { return output_; } + flusher_t& flusher() override { return flusher_; } template void call(Predicate const& predicate, @@ -156,7 +179,7 @@ namespace phlex::experimental { auto running_value = obj.initial_value(); while (std::invoke(predicate, obj, running_value)) { products new_products; - auto new_id = unfolded_id->make_child(counter, child_layer_name_); + auto new_id = unfolded_id->make_child(counter, child_layer()); if constexpr (requires { std::invoke(unfold, obj, running_value, *new_id); }) { auto [next_value, prods] = std::invoke(unfold, obj, running_value, *new_id); new_products.add_all(output_, std::move(prods)); @@ -173,7 +196,7 @@ namespace phlex::experimental { // Every data cell needs a flush (for now) message const child_flush_msg{child->make_flush(), nullptr, ++msg_counter_}; - output_port<0>(unfold_).try_put(child_flush_msg); + flusher_.try_put(child_flush_msg); } } @@ -182,9 +205,10 @@ namespace phlex::experimental { input_retriever_types input_{input_arguments()}; product_specifications output_; - std::string child_layer_name_; + tbb::flow::function_node flush_receiver_; join_or_none_t join_; tbb::flow::multifunction_node, messages_t<1u>> unfold_; + flusher_t flusher_; tbb::concurrent_hash_map stores_; std::atomic msg_counter_{}; // Is this sufficient? Probably not. std::atomic calls_{}; diff --git a/phlex/core/framework_graph.cpp b/phlex/core/framework_graph.cpp index d3b7797d..68af05d9 100644 --- a/phlex/core/framework_graph.cpp +++ b/phlex/core/framework_graph.cpp @@ -167,18 +167,33 @@ namespace phlex::experimental { nodes_.unfolds, nodes_.transforms); - // Connect edges between all nodes and the flusher - auto connect_with_flusher = [this](auto& consumers) { - for (auto& n : consumers | std::views::values) { - if constexpr (requires { n->input_port(); }) { - make_edge(flusher_, *n->input_port()); - } else { - for (auto* p : n->ports()) { - make_edge(flusher_, *p); + std::map flushers_from_unfolds; + for (auto const& n : nodes_.unfolds | std::views::values) { + flushers_from_unfolds.try_emplace(n->child_layer(), &n->flusher()); + } + + // Connect edges between all nodes, the graph-wide flusher, and the unfolds' flushers + auto connect_with_flusher = + [this, unfold_flushers = std::move(flushers_from_unfolds)](auto& consumers) { + for (auto& n : consumers | std::views::values) { + std::set flushers; + // For providers + if constexpr (requires { n->output_product(); }) { + make_edge(flusher_, n->flush_port()); + } else { + for (product_query const& pq : n->input()) { + if (auto it = unfold_flushers.find(pq.layer()); it != unfold_flushers.end()) { + flushers.insert(it->second); + } else { + flushers.insert(&flusher_); + } + } + for (flusher_t* flusher : flushers) { + make_edge(*flusher, n->flush_port()); + } } } - } - }; + }; connect_with_flusher(nodes_.folds); connect_with_flusher(nodes_.observers); diff --git a/phlex/core/fwd.hpp b/phlex/core/fwd.hpp index 16847462..294c09bb 100644 --- a/phlex/core/fwd.hpp +++ b/phlex/core/fwd.hpp @@ -3,6 +3,8 @@ #include "phlex/model/fwd.hpp" +#include "oneapi/tbb/flow_graph.h" + #include namespace phlex::experimental { @@ -18,6 +20,7 @@ namespace phlex::experimental { class products_consumer; using end_of_message_ptr = std::shared_ptr; + using flusher_t = tbb::flow::broadcast_node; } #endif // PHLEX_CORE_FWD_HPP diff --git a/phlex/core/message_sender.hpp b/phlex/core/message_sender.hpp index 5f583edc..277f9fcd 100644 --- a/phlex/core/message_sender.hpp +++ b/phlex/core/message_sender.hpp @@ -5,15 +5,11 @@ #include "phlex/core/message.hpp" #include "phlex/model/fwd.hpp" -#include "oneapi/tbb/flow_graph.h" - #include #include namespace phlex::experimental { - using flusher_t = tbb::flow::broadcast_node; - class message_sender { public: explicit message_sender(data_layer_hierarchy& hierarchy, From 5b6f7aa3e3c28c487f4c91644043a475ac3c827e Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Wed, 21 Jan 2026 16:55:35 -0600 Subject: [PATCH 4/9] Use strict equality comparisons --- test/allowed_families.cpp | 5 ++--- test/cached_execution.cpp | 11 +++++------ test/hierarchical_nodes.cpp | 2 +- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/test/allowed_families.cpp b/test/allowed_families.cpp index d0f3a54f..272c507e 100644 --- a/test/allowed_families.cpp +++ b/test/allowed_families.cpp @@ -59,8 +59,7 @@ TEST_CASE("Testing families", "[data model]") CHECK(g.execution_counts("rs") == 1ull); CHECK(g.execution_counts("rse") == 1ull); - // FIXME: Need to improve the synchronization to supply strict equality - CHECK(g.execution_counts("run_id_provider") >= 1ull); - CHECK(g.execution_counts("subrun_id_provider") >= 1ull); + CHECK(g.execution_counts("run_id_provider") == 1ull); + CHECK(g.execution_counts("subrun_id_provider") == 1ull); CHECK(g.execution_counts("event_id_provider") == 1ull); } diff --git a/test/cached_execution.cpp b/test/cached_execution.cpp index 6ce4f075..4d1ba2cc 100644 --- a/test/cached_execution.cpp +++ b/test/cached_execution.cpp @@ -90,13 +90,12 @@ TEST_CASE("Cached function calls", "[data model]") g.execute(); - // FIXME: Need to improve the synchronization to supply strict equality - CHECK(g.execution_counts("A1") >= n_runs); - CHECK(g.execution_counts("A2") >= n_runs); - CHECK(g.execution_counts("A3") >= n_runs); + CHECK(g.execution_counts("A1") == n_runs); + CHECK(g.execution_counts("A2") == n_runs); + CHECK(g.execution_counts("A3") == n_runs); - CHECK(g.execution_counts("B1") >= n_runs * n_subruns); - CHECK(g.execution_counts("B2") >= n_runs * n_subruns); + CHECK(g.execution_counts("B1") == n_runs * n_subruns); + CHECK(g.execution_counts("B2") == n_runs * n_subruns); CHECK(g.execution_counts("C") == n_runs * n_subruns * n_events); } diff --git a/test/hierarchical_nodes.cpp b/test/hierarchical_nodes.cpp index 457aa4e8..905f36cd 100644 --- a/test/hierarchical_nodes.cpp +++ b/test/hierarchical_nodes.cpp @@ -134,7 +134,7 @@ TEST_CASE("Hierarchical nodes", "[graph]") CHECK(g.execution_counts("square") == index_limit * number_limit); CHECK(g.execution_counts("add") == index_limit * number_limit); - CHECK(g.execution_counts("get_the_time") >= index_limit); + CHECK(g.execution_counts("get_the_time") == index_limit); CHECK(g.execution_counts("scale") == index_limit); CHECK(g.execution_counts("print_result") == index_limit); } From 8eba8b420bde1125eabb08c67160766a5d59f423 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Thu, 22 Jan 2026 09:36:55 -0600 Subject: [PATCH 5/9] Caching for nodes that require data products from different data layers. --- plugins/layer_generator.hpp | 1 + test/CMakeLists.txt | 1 + test/repeater/CMakeLists.txt | 13 ++ test/repeater/index_router.cpp | 201 ++++++++++++++++++++++++++++++ test/repeater/index_router.hpp | 69 +++++++++++ test/repeater/message_types.hpp | 57 +++++++++ test/repeater/nodes.hpp | 148 ++++++++++++++++++++++ test/repeater/repeater.cpp | 132 ++++++++++++++++++++ test/repeater/repeater_node.hpp | 209 ++++++++++++++++++++++++++++++++ 9 files changed, 831 insertions(+) create mode 100644 test/repeater/CMakeLists.txt create mode 100644 test/repeater/index_router.cpp create mode 100644 test/repeater/index_router.hpp create mode 100644 test/repeater/message_types.hpp create mode 100644 test/repeater/nodes.hpp create mode 100644 test/repeater/repeater.cpp create mode 100644 test/repeater/repeater_node.hpp diff --git a/plugins/layer_generator.hpp b/plugins/layer_generator.hpp index 0bf17f2d..255822cf 100644 --- a/plugins/layer_generator.hpp +++ b/plugins/layer_generator.hpp @@ -54,6 +54,7 @@ namespace phlex::experimental { void operator()(framework_driver& driver) { execute(driver, data_cell_index::base_ptr()); } std::size_t emitted_cells(std::string layer_path = {}) const; + std::vector const& layer_paths() const noexcept { return layer_paths_; } private: void execute(framework_driver& driver, data_cell_index_ptr index, bool recurse = true); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index ab51b55f..a53ce073 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -237,6 +237,7 @@ add_subdirectory(utilities) add_subdirectory(mock-workflow) add_subdirectory(demo-giantdata) add_subdirectory(python) +add_subdirectory(repeater) if(PHLEX_USE_FORM) add_subdirectory(form) diff --git a/test/repeater/CMakeLists.txt b/test/repeater/CMakeLists.txt new file mode 100644 index 00000000..07ccd723 --- /dev/null +++ b/test/repeater/CMakeLists.txt @@ -0,0 +1,13 @@ +add_library(index_router SHARED index_router.cpp) +target_link_libraries(index_router PRIVATE TBB::tbb phlex::model) + +cet_test( + repeater + USE_CATCH2_MAIN + SOURCE + repeater.cpp + LIBRARIES + phlex::core + index_router + layer_generator +) diff --git a/test/repeater/index_router.cpp b/test/repeater/index_router.cpp new file mode 100644 index 00000000..8e9160b6 --- /dev/null +++ b/test/repeater/index_router.cpp @@ -0,0 +1,201 @@ +#include "index_router.hpp" + +#include "fmt/format.h" +#include "fmt/ranges.h" +#include "spdlog/spdlog.h" + +#include +#include +#include + +using namespace tbb; +using namespace phlex; + +test::index_router::index_router(flow::graph& g, + std::vector layers, + std::map multilayers) +{ + for (auto const& layer : layers) { + broadcasters_.try_emplace(layer, g); + } + for (auto const& [node_name, multilayer] : multilayers) { + spdlog::trace("Making multilayer caster for {}", node_name); + multibroadcaster_entries casters; + casters.reserve(multilayer.size()); + for (auto const& [layer, flush_port, input_port] : multilayer) { + auto& entry = casters.emplace_back(layer, index_set_node{g}, flush_node{g}); + make_edge(entry.broadcaster, *input_port); // Connect with index ports of multi-algorithms + make_edge(entry.flusher, *flush_port); // Connect with flush ports of multi-algorithms + } + multibroadcasters_.try_emplace(node_name, std::move(casters)); + } +} + +void test::index_router::shutdown() +{ + backout_to(data_cell_index::base_ptr()); + last_index_ = nullptr; +} + +void test::index_router::route(data_cell_index_ptr const& index) +{ + backout_to(index); + auto msg_id = counter_.fetch_add(1); + send(index, msg_id); + multisend(index, msg_id); + last_index_ = index; +} + +void test::index_router::backout_to(data_cell_index_ptr const& index) +{ + assert(index); + + if (!last_index_) { + // This happens when we encounter the first index + return; + } + + if (index->parent() == last_index_->parent()) { + // At the same level in the hierarchy + return; + } + + if (index->parent(last_index_->layer_name())) { + // Descending further into the hierarchy + return; + } + + // What's left is situations where we need to go up the hierarchy chain. + + auto do_the_put = [this](data_cell_index_ptr const& index) { + // FIXME: This lookup should be fixed + for (auto& [_, senders] : cached_multicasters_) { + for (auto& sender : senders) { + if (sender.layer() == index->layer_name()) { + sender.put_end_token(index); + } + } + } + }; + + auto current = last_index_; + while (current and current->layer_hash() != index->layer_hash()) { + do_the_put(current); + current = current->parent(); + assert(current); // Cannot be non-null + } + do_the_put(current); +} + +auto test::index_router::index_node_for(std::string const& layer) -> index_set_node& +{ + std::vector candidates; + for (auto it = broadcasters_.begin(), e = broadcasters_.end(); it != e; ++it) { + if (it->first.ends_with("/" + layer)) { + candidates.push_back(it); + } + } + + if (candidates.size() == 1ull) { + return candidates[0]->second; + } + + if (candidates.empty()) { + throw std::runtime_error("No broadcaster found for layer specification" + layer); + } + + std::string msg{"Multiple layers match specification " + layer + ":\n"}; + for (auto const& it : candidates) { + msg += "\n- " + it->first; + } + throw std::runtime_error(msg); +} + +void test::index_router::send(data_cell_index_ptr const& index, std::size_t message_id) +{ + auto it = broadcasters_.find(index->layer_path()); + assert(it != broadcasters_.end()); + it->second.try_put({.msg_id = message_id, .index = index}); +} + +void test::index_router::multisend(data_cell_index_ptr const& index, std::size_t message_id) +{ + auto const layer_hash = index->layer_hash(); + // spdlog::trace("Multilayer send for layer hash {} {}", layer_hash, index->to_string()); + + auto do_the_put = [](data_cell_index_ptr const& index, + std::size_t message_id, + std::vector& nodes) { + for (auto& sender : nodes) { + sender.put_message(index, message_id); + } + }; + + if (auto it = cached_multicasters_.find(layer_hash); it != cached_multicasters_.end()) { + do_the_put(index, message_id, it->second); + return; + } + + auto [it, _] = cached_multicasters_.try_emplace(layer_hash); + + // spdlog::trace("Assigning new multi-caster for {} (path: {})", layer_hash, index->layer_path()); + for (auto& [multilayer_str, entries] : multibroadcasters_) { + // Now we need to check how to match "ports" and the multilayer + std::vector senders; + senders.reserve(entries.size()); + bool name_in_multilayer = false; + for (auto& [layer, caster, flusher] : entries) { + if (layer == index->layer_name()) { + senders.emplace_back(layer, &caster, &flusher); + name_in_multilayer = true; + } else if (index->parent(layer)) { + senders.emplace_back(layer, &caster, &flusher); + } + } + + if (name_in_multilayer and senders.size() == entries.size()) { + // spdlog::trace("Match for {}: {} (path: {})", multilayer_str, layer_hash, index->layer_path()); + it->second.insert(it->second.end(), + std::make_move_iterator(senders.begin()), + std::make_move_iterator(senders.end())); + } + } + // if (it->second.empty()) { + // spdlog::trace("No broadcasters for {}", layer_hash); + // } else { + // spdlog::trace("Number of broadcasters for {}: {}", layer_hash, it->second.size()); + // } + do_the_put(index, message_id, it->second); +} + +test::index_router::multilayer_sender::multilayer_sender(std::string const& layer, + index_set_node* broadcaster, + flush_node* flusher) : + layer_{layer}, broadcaster_{broadcaster}, flusher_{flusher} +{ +} + +void test::index_router::multilayer_sender::put_message(data_cell_index_ptr const& index, + std::size_t message_id) +{ + if (layer_ == index->layer_name()) { + broadcaster_->try_put({.msg_id = message_id, .index = index, .cache = false}); + return; + } + + // Flush values are needed only used for indices that are *not* the "lowest" in the branch + // of the hierarchy. + ++counter_; + broadcaster_->try_put({.msg_id = message_id, .index = index->parent(layer_)}); +} + +void test::index_router::multilayer_sender::put_end_token(data_cell_index_ptr const& index) +{ + auto count = std::exchange(counter_, 0); + if (count == 0) { + // See comment above about flush values + return; + } + + flusher_->try_put({.index = index, .count = count}); +} diff --git a/test/repeater/index_router.hpp b/test/repeater/index_router.hpp new file mode 100644 index 00000000..f4bc8cd1 --- /dev/null +++ b/test/repeater/index_router.hpp @@ -0,0 +1,69 @@ +#ifndef TEST_INDEX_ROUTER_HPP +#define TEST_INDEX_ROUTER_HPP + +#include "message_types.hpp" +#include "phlex/model/data_cell_index.hpp" + +#include "oneapi/tbb/flow_graph.h" + +#include +#include +#include + +namespace phlex::test { + using layers_t = std::vector; + + class index_router { + public: + using flush_node = tbb::flow::broadcast_node; + using index_set_node = tbb::flow::broadcast_node; + + explicit index_router(tbb::flow::graph& g, + std::vector layers, + std::map multilayers); + auto index_node_for(std::string const& layer) -> index_set_node&; + + void shutdown(); + void route(data_cell_index_ptr const& index); + + private: + void send(data_cell_index_ptr const& index, std::size_t message_id); + void multisend(data_cell_index_ptr const& index, std::size_t message_id); + void backout_to(data_cell_index_ptr const& index); + + using broadcasters_t = std::map; + broadcasters_t broadcasters_; + + struct multibroadcaster_entry { + std::string layer; + index_set_node broadcaster; + flush_node flusher; + }; + using multibroadcaster_entries = std::vector; + + using multibroadcasters_t = std::unordered_map; + multibroadcasters_t multibroadcasters_; + + class multilayer_sender { + public: + multilayer_sender(std::string const& layer, index_set_node* broadcaster, flush_node* flusher); + + auto const& layer() const noexcept { return layer_; } + void put_message(data_cell_index_ptr const& index, std::size_t message_id); + void put_end_token(data_cell_index_ptr const& index); + + private: + std::string layer_; + index_set_node* broadcaster_; + flush_node* flusher_; + int counter_ = 0; + }; + + using cached_casters_t = std::unordered_map>; + cached_casters_t cached_multicasters_; + std::atomic counter_; + data_cell_index_ptr last_index_; + }; +} + +#endif // TEST_INDEX_ROUTER_HPP diff --git a/test/repeater/message_types.hpp b/test/repeater/message_types.hpp new file mode 100644 index 00000000..178c068a --- /dev/null +++ b/test/repeater/message_types.hpp @@ -0,0 +1,57 @@ +#ifndef TEST_MESSAGE_TYPES_HPP +#define TEST_MESSAGE_TYPES_HPP + +#include "oneapi/tbb/flow_graph.h" +#include "phlex/model/data_cell_index.hpp" +#include "spdlog/spdlog.h" + +#include +#include +#include +#include +#include + +namespace phlex::test { + struct index_message { + std::size_t msg_id; + data_cell_index_ptr index; + bool cache{true}; + }; + + template + struct indexed_message { + std::size_t msg_id; + data_cell_index_ptr index; + T data; + }; + + struct indexed_end_token { + data_cell_index_ptr index; + int count; + }; + + struct named_index_port { + std::string layer; + tbb::flow::receiver* token_port; + tbb::flow::receiver* index_port; + }; + using named_index_ports = std::vector; + + template + using indexed_message_tuple = std::tuple...>; + + struct no_more_indices {}; + + using message_from_input = std::variant; + + struct index_message_matcher { + std::size_t operator()(index_message const& msg) const noexcept { return msg.msg_id; } + }; + + template + struct indexed_message_matcher { + std::size_t operator()(indexed_message const& msg) const noexcept { return msg.msg_id; } + }; +} + +#endif // TEST_MESSAGE_TYPES_HPP diff --git a/test/repeater/nodes.hpp b/test/repeater/nodes.hpp new file mode 100644 index 00000000..fc1cae10 --- /dev/null +++ b/test/repeater/nodes.hpp @@ -0,0 +1,148 @@ +#ifndef TEST_NODES_HPP +#define TEST_NODES_HPP + +#include "message_types.hpp" +#include "phlex/model/data_cell_index.hpp" +#include "repeater_node.hpp" + +#include "oneapi/tbb/concurrent_hash_map.h" +#include "oneapi/tbb/concurrent_queue.h" +#include "oneapi/tbb/flow_graph.h" +#include "spdlog/spdlog.h" + +#include +#include + +namespace phlex::test { + + template + class provider_node : public tbb::flow::function_node> { + public: + provider_node(tbb::flow::graph& g, std::string data_layer, std::string product_name, T value) : + tbb::flow::function_node>{ + g, + tbb::flow::unlimited, + [this, name = std::move(product_name)](index_message const& msg) -> indexed_message { + ++calls_; + spdlog::trace( + "Provider for '{}' (\"{}\") received {}", name, layer_, msg.index->to_string()); + return {.msg_id = msg.msg_id, .index = msg.index, .data = value_}; + }}, + layer_{std::move(data_layer)}, + value_{value} + { + } + + unsigned calls() const noexcept { return calls_; } + + std::string const& layer() const noexcept { return layer_; } + + private: + std::string const layer_; + std::atomic calls_; + T value_; + }; + + template + class consumer_node : public tbb::flow::function_node, indexed_message> { + public: + consumer_node(tbb::flow::graph& g, std::string consumer_name, std::string data_layer) : + tbb::flow::function_node, indexed_message>{ + g, + tbb::flow::unlimited, + [this, + consumer = std::move(consumer_name)](indexed_message const& msg) -> indexed_message { + ++calls_; + spdlog::trace("Consumer '{}' received '{}' (\"{}\")", consumer, msg.data, layer_); + return msg; + }}, + layer_{std::move(data_layer)} + { + } + + unsigned calls() const noexcept { return calls_; } + + std::string const& layer() const noexcept { return layer_; } + + private: + std::string const layer_; + std::atomic calls_; + }; + + template + using my_composite_node = + tbb::flow::composite_node, std::tuple>; + + template + auto& passthrough(auto& g) + { + return g; + } + + template + class multiarg_consumer_node : public my_composite_node { + using base = my_composite_node; + using input_t = typename base::input_ports_type; + using output_t = typename base::output_ports_type; + using args_t = indexed_message_tuple; + + public: + multiarg_consumer_node(tbb::flow::graph& g, + std::string node_name, + std::vector layer_names) : + my_composite_node{g}, + repeaters_{passthrough(g), passthrough(g), passthrough(g)...}, + join_{g, + indexed_message_matcher{}, + indexed_message_matcher{}, + indexed_message_matcher{}...}, + f_{g, + tbb::flow::unlimited, + [this](args_t const&) -> tbb::flow::continue_msg { + ++calls_; + // spdlog::trace("Consumer '{}' received", name_); + return {}; + }}, + name_{std::move(node_name)}, + layers_{std::move(layer_names)} + { + auto set_ports = [this](std::index_sequence) { + this->set_external_ports(input_t{std::get(repeaters_).data_port()...}, output_t{f_}); + // Connect repeaters to join + (make_edge(std::get(repeaters_), input_port(join_)), ...); + (std::get(repeaters_).set_metadata(name_, layers_[Is]), ...); + }; + + set_ports(std::make_index_sequence<2 + sizeof...(Ts)>{}); + make_edge(join_, f_); + } + + std::vector index_ports() + { + std::vector result; + [this](auto& result, std::index_sequence) { + (result.emplace_back(layers_[Is], + &std::get(repeaters_).flush_port(), + &std::get(repeaters_).index_port()), + ...); + }(result, std::make_index_sequence<2 + sizeof...(Ts)>{}); + return result; + } + + unsigned calls() const noexcept { return calls_; } + + std::string const& name() const noexcept { return name_; } + std::vector const& layers() const noexcept { return layers_; } + + private: + // This will be replaced by an std::vector + std::tuple, repeater_node, repeater_node...> repeaters_; + tbb::flow::join_node join_; + tbb::flow::function_node f_; + std::string const name_; + std::vector const layers_; + std::atomic calls_; + }; +} + +#endif // TEST_NODES_HPP diff --git a/test/repeater/repeater.cpp b/test/repeater/repeater.cpp new file mode 100644 index 00000000..3474889c --- /dev/null +++ b/test/repeater/repeater.cpp @@ -0,0 +1,132 @@ +#include "index_router.hpp" +#include "nodes.hpp" +#include "phlex/utilities/max_allowed_parallelism.hpp" +#include "phlex/utilities/resource_usage.hpp" +#include "plugins/layer_generator.hpp" + +#include "catch2/catch_test_macros.hpp" +#include "oneapi/tbb/flow_graph.h" +#include "spdlog/cfg/env.h" + +#include +#include + +using namespace oneapi::tbb; +using namespace phlex; + +namespace { + constexpr int n_runs = 1; + constexpr int n_subruns = 10; + constexpr int n_spills = 50; +} + +TEST_CASE("Test repeater", "[multithreading]") +{ + experimental::resource_usage usage; + + spdlog::cfg::load_env_levels(); + spdlog::set_pattern("[thread %t] [%^%-7l%$] %v"); + + flow::graph g; + + experimental::layer_generator gen; + gen.add_layer("run", {"job", n_runs}); + gen.add_layer("subrun", {"run", n_subruns}); + gen.add_layer("spill", {"subrun", n_spills}); + + framework_driver driver{driver_for_test(gen)}; + + // Providers + test::provider_node run_provider{g, "run", "geometry", 1}; + test::provider_node subrun_provider{g, "subrun", "calib", 2.5}; + test::provider_node spill_provider{g, "spill", "hits", 'a'}; + + // Single-argument consumers + test::consumer_node run_consumer{g, "consume_geometry", "run"}; + test::consumer_node subrun_consumer{g, "consume_calib", "subrun"}; + test::consumer_node spill_consumer{g, "consume_hits", "spill"}; + + // Multi-argument consumers + test::multiarg_consumer_node run_subrun_consumer{ + g, "consume_geometry_calib", {"run", "subrun"}}; + test::multiarg_consumer_node run_spill_consumer{ + g, "consume_geometry_hits", {"run", "spill"}}; + test::multiarg_consumer_node subrun_spill_consumer{ + g, "consume_calib_hits", {"subrun", "spill"}}; + test::multiarg_consumer_node all_consumer{ + g, "consume_all", {"run", "subrun", "spill"}}; + + // Get all multi-layer combinations + // FIXME: An optimization is to make sure these combinations are unique + std::map multilayers; + multilayers.try_emplace(run_subrun_consumer.name(), run_subrun_consumer.index_ports()); + multilayers.try_emplace(run_spill_consumer.name(), run_spill_consumer.index_ports()); + multilayers.try_emplace(subrun_spill_consumer.name(), subrun_spill_consumer.index_ports()); + multilayers.try_emplace(all_consumer.name(), all_consumer.index_ports()); + + test::index_router router{g, gen.layer_paths(), std::move(multilayers)}; + + flow::input_node src{ + g, [done = false, &driver, &router](flow_control& fc) mutable -> flow::continue_msg { + if (done) { + fc.stop(); + return {}; + } + + auto item = driver(); + if (not item) { + done = true; + router.shutdown(); + return {}; + } + router.route(*item); + return {}; + }}; + + // The input_node must be connected to *something* to ensure that the graph runs. + flow::function_node dummy{ + g, flow::unlimited, [](flow::continue_msg) -> flow::continue_msg { return {}; }}; + make_edge(src, dummy); + + // Connect index-set nodes to providers + make_edge(router.index_node_for("run"), run_provider); + make_edge(router.index_node_for("subrun"), subrun_provider); + make_edge(router.index_node_for("spill"), spill_provider); + + // Connect providers to one-argument consumers + make_edge(run_provider, run_consumer); + make_edge(subrun_provider, subrun_consumer); + make_edge(spill_provider, spill_consumer); + + // Connect providers to multi-argument consumers + make_edge(run_provider, input_port<0>(run_subrun_consumer)); + make_edge(subrun_provider, input_port<1>(run_subrun_consumer)); + + make_edge(run_provider, input_port<0>(run_spill_consumer)); + make_edge(spill_provider, input_port<1>(run_spill_consumer)); + + make_edge(subrun_provider, input_port<0>(subrun_spill_consumer)); + make_edge(spill_provider, input_port<1>(subrun_spill_consumer)); + + make_edge(run_provider, input_port<0>(all_consumer)); + make_edge(subrun_provider, input_port<1>(all_consumer)); + make_edge(spill_provider, input_port<2>(all_consumer)); + + src.activate(); + g.wait_for_all(); + + // Single-argument function-call checks + CHECK(run_provider.calls() == n_runs); + CHECK(subrun_provider.calls() == n_runs * n_subruns); + CHECK(spill_provider.calls() == n_runs * n_subruns * n_spills); + + CHECK(run_consumer.calls() == n_runs); + CHECK(subrun_consumer.calls() == n_runs * n_subruns); + CHECK(spill_consumer.calls() == n_runs * n_subruns * n_spills); + + // Multi-argument function-call checks + CHECK(run_subrun_consumer.calls() == n_runs * n_subruns); + CHECK(run_spill_consumer.calls() == n_runs * n_subruns * n_spills); + CHECK(subrun_spill_consumer.calls() == n_runs * n_subruns * n_spills); + CHECK(all_consumer.calls() == n_runs * n_subruns * n_spills); +} diff --git a/test/repeater/repeater_node.hpp b/test/repeater/repeater_node.hpp new file mode 100644 index 00000000..f71c35e3 --- /dev/null +++ b/test/repeater/repeater_node.hpp @@ -0,0 +1,209 @@ +#ifndef TEST_REPEATER_NODE_HPP +#define TEST_REPEATER_NODE_HPP + +#include "message_types.hpp" +#include "phlex/model/data_cell_index.hpp" + +#include "oneapi/tbb/concurrent_hash_map.h" +#include "oneapi/tbb/concurrent_queue.h" +#include "oneapi/tbb/flow_graph.h" +#include "spdlog/spdlog.h" + +#include +#include + +namespace phlex::test { + + template + using repeater_node_input = std::tuple, indexed_end_token, index_message>; + + template + class repeater_node : + public tbb::flow::composite_node, indexed_message_tuple> { + using base_t = tbb::flow::composite_node, indexed_message_tuple>; + using tagged_repeater_msg = + tbb::flow::tagged_msg, indexed_end_token, index_message>; + + struct cached_product { + std::shared_ptr> msg_with_product; + tbb::concurrent_queue msg_ids{}; + std::atomic counter; + std::atomic_flag flush_received{false}; + }; + + using cached_products_t = tbb::concurrent_hash_map; + using accessor = cached_products_t::accessor; + using const_accessor = cached_products_t::const_accessor; + + public: + repeater_node(tbb::flow::graph& g) : + base_t{g}, + indexer_{g}, + repeater_{ + g, tbb::flow::unlimited, [this](tagged_repeater_msg const& tagged, auto& outputs) { + // spdlog::warn("[{}/{}] Received {}", node_name_, layer_, tagged.tag()); + cached_product* entry{nullptr}; + data_cell_index_ptr current_index{}; + std::size_t key = -1ull; + auto& output = std::get<0>(outputs); + + auto drain_cache = [&output](data_cell_index_ptr const& index, + cached_product* entry, + std::string const& node_name [[maybe_unused]], + std::string const& layer [[maybe_unused]]) -> int { + assert(entry->msg_with_product); + int counter{}; + std::size_t new_msg_id{}; + while (entry->msg_ids.try_pop(new_msg_id)) { + // spdlog::info("[{}/{}] => Outputting {} ({})", + // node_name, + // layer, + // index->to_string(), + // index->hash()); + output.try_put( + {.msg_id = new_msg_id, .index = index, .data = entry->msg_with_product->data}); + ++counter; + } + return counter; + }; + + if (tagged.template is_a>()) { + auto const& msg = tagged.template cast_to>(); + current_index = msg.index; + key = msg.index->hash(); + if (!caching_required_) { + spdlog::debug( + "[{}/{}] -> Outputting {} ({})", node_name_, layer_, msg.index->to_string(), key); + output.try_put(msg); + } else { + assert(msg.data); + accessor a; + cached_products_.insert(a, key); + entry = &a->second; + spdlog::debug("[{}/{}] Caching product for {} ({})", + node_name_, + layer_, + msg.index->to_string(), + key); + entry->msg_with_product = std::make_shared>(msg); + entry->counter += drain_cache(msg.index, entry, node_name_, layer_); + } + } else if (tagged.template is_a()) { + auto const [index, count] = tagged.template cast_to(); + spdlog::debug( + "[{}/{}] Received flush {} ({})", node_name_, layer_, count, index->to_string()); + current_index = index; + key = index->hash(); + accessor a; + cached_products_.insert(a, key); + entry = &a->second; + entry->counter -= count; + std::ignore = entry->flush_received.test_and_set(); + } else { + + auto const [msg_id, index, cache] = tagged.template cast_to(); + key = index->hash(); + + if (caching_required_ and !cache) { + caching_required_ = false; + // No caching needed + accessor a; + if (cached_products_.find(a, key)) { + entry = &a->second; + if (entry->msg_with_product) { + spdlog::debug( + "[{}/{}] +> Outputting {} ({})", node_name_, layer_, index->to_string(), key); + output.try_put(*entry->msg_with_product); + ++entry->counter; + } + } + } else if (caching_required_) { + // Caching required + current_index = index; + key = index->hash(); + accessor a; + cached_products_.insert(a, key); + entry = &a->second; + if (entry->msg_with_product) { + spdlog::debug( + "[{}/{}] <> Outputting {} ({})", node_name_, layer_, index->to_string(), key); + output.try_put( + {.msg_id = msg_id, .index = index, .data = entry->msg_with_product->data}); + entry->counter += 1 + drain_cache(index, entry, node_name_, layer_); + } else { + entry->msg_ids.push(msg_id); + } + } + } + + // Cleanup + if (accessor a; cached_products_.find(a, key)) { + entry = &a->second; + if (!caching_required_) { + if (entry->counter == 0) { + spdlog::debug("[{}/{}] ++ Outputting {} ({})", + node_name_, + layer_, + entry->msg_with_product->index->to_string(), + key); + output.try_put(*entry->msg_with_product); + ++entry->counter; + } + assert(entry->counter == 1); + cached_products_.erase(a); + } else if (entry->flush_received.test() and entry->counter == 0) { + spdlog::trace("Erasing cached product {}", key); + cached_products_.erase(a); + } + entry = nullptr; // Do not attempt to use entry (stale!) + } + }} + { + base_t::set_external_ports(typename base_t::input_ports_type{input_port<0>(indexer_), + input_port<1>(indexer_), + input_port<2>(indexer_)}, + typename base_t::output_ports_type{output_port<0>(repeater_)}); + make_edge(indexer_, repeater_); + } + + tbb::flow::receiver>& data_port() { return input_port<0>(indexer_); } + tbb::flow::receiver& flush_port() { return input_port<1>(indexer_); } + tbb::flow::receiver& index_port() { return input_port<2>(indexer_); } + + ~repeater_node() + { + if (cached_products_.empty()) { + return; + } + spdlog::warn("[{}/{}] Cached products {}", node_name_, layer_, cached_products_.size()); + for (auto const& [_, cache] : cached_products_) { + if (cache.msg_with_product) { + spdlog::warn("[{}/{}] Product for {}", + node_name_, + layer_, + cache.msg_with_product->index->to_string()); + } else { + spdlog::warn("[{}/{}] Product for {}", node_name_, layer_, _); + } + } + } + + void set_metadata(std::string node_name, std::string layer) + { + node_name_ = std::move(node_name); + layer_ = std::move(layer); + } + + private: + tbb::flow::indexer_node, indexed_end_token, index_message> indexer_; + tbb::flow::multifunction_node> repeater_; + tbb::concurrent_hash_map cached_products_; // Key is the index hash + std::atomic n_cached_products_{}; + std::atomic caching_required_{true}; + std::atomic erase_cache_{false}; + std::string node_name_; + std::string layer_; + }; +} + +#endif // TEST_REPEATER_NODE_HPP From a0f7b16f88182f708ffa4df9f99e16b00240876f Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Tue, 3 Feb 2026 13:11:43 -0600 Subject: [PATCH 6/9] Add missing cassert header include --- phlex/core/store_counters.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/phlex/core/store_counters.cpp b/phlex/core/store_counters.cpp index c31af87d..a42cba01 100644 --- a/phlex/core/store_counters.cpp +++ b/phlex/core/store_counters.cpp @@ -5,6 +5,8 @@ #include "fmt/std.h" #include "spdlog/spdlog.h" +#include + namespace phlex::experimental { void store_flag::flush_received(std::size_t const original_message_id) From fe6f5129dadd40e1752e04dad1bd2af01953f512 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Tue, 3 Feb 2026 13:17:35 -0600 Subject: [PATCH 7/9] Apply suggestions from code review Minor adjustments Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- test/repeater/index_router.cpp | 2 +- test/repeater/repeater_node.hpp | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/test/repeater/index_router.cpp b/test/repeater/index_router.cpp index 8e9160b6..2b2215c4 100644 --- a/test/repeater/index_router.cpp +++ b/test/repeater/index_router.cpp @@ -101,7 +101,7 @@ auto test::index_router::index_node_for(std::string const& layer) -> index_set_n } if (candidates.empty()) { - throw std::runtime_error("No broadcaster found for layer specification" + layer); + throw std::runtime_error("No broadcaster found for layer specification \"" + layer + "\""); } std::string msg{"Multiple layers match specification " + layer + ":\n"}; diff --git a/test/repeater/repeater_node.hpp b/test/repeater/repeater_node.hpp index f71c35e3..840d2af3 100644 --- a/test/repeater/repeater_node.hpp +++ b/test/repeater/repeater_node.hpp @@ -10,6 +10,8 @@ #include "spdlog/spdlog.h" #include +#include +#include #include namespace phlex::test { From 6c3bf0f081a152977834737e0035b6b04b80fb62 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Tue, 3 Feb 2026 13:18:45 -0600 Subject: [PATCH 8/9] Correctly initialized std::atomic_flag per @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- test/repeater/repeater_node.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/repeater/repeater_node.hpp b/test/repeater/repeater_node.hpp index 840d2af3..6091e060 100644 --- a/test/repeater/repeater_node.hpp +++ b/test/repeater/repeater_node.hpp @@ -30,7 +30,7 @@ namespace phlex::test { std::shared_ptr> msg_with_product; tbb::concurrent_queue msg_ids{}; std::atomic counter; - std::atomic_flag flush_received{false}; + std::atomic_flag flush_received{}; }; using cached_products_t = tbb::concurrent_hash_map; From e62df3f79d69a5664764179c5d2d174c1c945c9d Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Tue, 3 Feb 2026 13:19:51 -0600 Subject: [PATCH 9/9] Remove unnecessary spdlog header per @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- phlex/core/declared_fold.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/phlex/core/declared_fold.hpp b/phlex/core/declared_fold.hpp index cbc35ea8..4db4de8c 100644 --- a/phlex/core/declared_fold.hpp +++ b/phlex/core/declared_fold.hpp @@ -19,7 +19,6 @@ #include "oneapi/tbb/concurrent_unordered_map.h" #include "oneapi/tbb/flow_graph.h" -#include "spdlog/spdlog.h" #include #include