93 changes: 52 additions & 41 deletions phlex/core/declared_fold.hpp
@@ -19,6 +19,7 @@

#include "oneapi/tbb/concurrent_unordered_map.h"
#include "oneapi/tbb/flow_graph.h"
#include "spdlog/spdlog.h"
Copilot AI commented on Feb 3, 2026:

declared_fold.hpp adds #include "spdlog/spdlog.h", but this header doesn’t appear to use spdlog. Removing the unused include would reduce compile-time coupling.

Suggested change:
- #include "spdlog/spdlog.h"


#include <atomic>
#include <cassert>
@@ -42,6 +43,7 @@ namespace phlex::experimental {

virtual tbb::flow::sender<message>& sender() = 0;
virtual tbb::flow::sender<message>& to_output() = 0;
virtual tbb::flow::receiver<message>& flush_port() = 0;
virtual product_specifications const& output() const = 0;
virtual std::size_t product_count() const = 0;
};
@@ -75,60 +77,68 @@
initializer_{std::move(initializer)},
output_{to_product_specifications(full_name(), std::move(output), make_type_ids<R>())},
partition_{std::move(partition)},
flush_receiver_{
g,
tbb::flow::unlimited,
[this](message const& msg) -> tbb::flow::continue_msg {
auto const& [store, original_message_id] = std::tie(msg.store, msg.original_id);
if (store->id()->layer_name() != partition_) {
return {};
}

counter_for(store->id()->hash()).set_flush_value(store, original_message_id);
emit_and_evict_if_done(store->id(), msg.eom);
return {};
}},
join_{make_join_or_none(g, std::make_index_sequence<N>{})},
fold_{g,
concurrency,
[this, ft = alg.release_algorithm()](messages_t<N> const& messages, auto& outputs) {
// N.B. The assumption is that a fold will *never* need to cache
// the product store it creates. Any flush messages *do not* need
// to be propagated to downstream nodes.
auto const& msg = most_derived(messages);
auto const& [store, original_message_id] = std::tie(msg.store, msg.original_id);

if (not store->is_flush() and not store->id()->parent(partition_)) {
return;
}

if (store->is_flush()) {
// Downstream nodes always get the flush.
get<0>(outputs).try_put(msg);
if (store->id()->layer_name() != partition_) {
return;
}
}

auto const& fold_index =
store->is_flush() ? store->id() : store->id()->parent(partition_);
assert(fold_index);
auto const& id_hash_for_counter = fold_index->hash();

if (store->is_flush()) {
counter_for(id_hash_for_counter).set_flush_value(store, original_message_id);
} else {
call(ft, messages, std::make_index_sequence<N>{});
counter_for(id_hash_for_counter).increment(store->id()->layer_hash());
}

if (auto counter = done_with(id_hash_for_counter)) {
auto parent = std::make_shared<product_store>(fold_index, this->full_name());
commit_(*parent);
++product_count_;
// FIXME: This msg.eom value may be wrong!
get<0>(outputs).try_put({parent, msg.eom, counter->original_message_id()});
}
}}
fold_{
g, concurrency, [this, ft = alg.release_algorithm()](messages_t<N> const& messages, auto&) {
// N.B. The assumption is that a fold will *never* need to cache
// the product store it creates. Any flush messages *do not* need
// to be propagated to downstream nodes.
auto const& msg = most_derived(messages);
auto const& [store, eom] = std::tie(msg.store, msg.eom);

assert(not store->is_flush());

if (not store->id()->parent(partition_)) {
return;
}

auto const& fold_index = store->id()->parent(partition_);
assert(fold_index);
auto const& id_hash_for_counter = fold_index->hash();

call(ft, messages, std::make_index_sequence<N>{});
counter_for(id_hash_for_counter).increment(store->id()->layer_hash());

emit_and_evict_if_done(fold_index, eom);
}}
{
make_edge(join_, fold_);
}

private:
void emit_and_evict_if_done(data_cell_index_ptr const& fold_index,
end_of_message_ptr const& eom)
{
if (auto counter = done_with(fold_index->hash())) {
auto parent = std::make_shared<product_store>(fold_index, this->full_name());
commit_(*parent);
++product_count_;
// FIXME: This msg.eom value may be wrong!
output_port<0>(fold_).try_put({parent, eom, counter->original_message_id()});
}
}

tbb::flow::receiver<message>& port_for(product_query const& product_label) override
{
return receiver_for<N>(join_, input(), product_label);
}

std::vector<tbb::flow::receiver<message>*> ports() override { return input_ports<N>(join_); }

tbb::flow::receiver<message>& flush_port() override { return flush_receiver_; }
tbb::flow::sender<message>& sender() override { return output_port<0ull>(fold_); }
tbb::flow::sender<message>& to_output() override { return sender(); }
product_specifications const& output() const override { return output_; }
@@ -178,6 +188,7 @@ namespace phlex::experimental {
input_retriever_types<input_parameter_types> input_{input_arguments<input_parameter_types>()};
product_specifications output_;
std::string partition_;
tbb::flow::function_node<message> flush_receiver_;
join_or_none_t<N> join_;
tbb::flow::multifunction_node<messages_t<N>, messages_t<1>> fold_;
tbb::concurrent_unordered_map<data_cell_index, std::unique_ptr<R>> results_;
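Note: the fold, observer, predicate, provider, and transform nodes in this change all follow the same pattern: flush messages no longer pass through the main algorithm body (which now asserts not store->is_flush()); they arrive on a dedicated flush_receiver_ exposed through the new flush_port() override. The sketch below illustrates that split with simplified stand-in types; message, node_with_flush_port, and the counters are hypothetical placeholders, not the phlex types.

#include "oneapi/tbb/flow_graph.h"

#include <atomic>
#include <cstddef>
#include <cstdio>

namespace sketch {
  struct message {
    std::size_t id;
    bool is_flush;
  };

  class node_with_flush_port {
  public:
    explicit node_with_flush_port(tbb::flow::graph& g) :
      flush_receiver_{g,
                      tbb::flow::unlimited,
                      [this](message const&) -> tbb::flow::continue_msg {
                        // Bookkeeping only: count the flush; in phlex this is where
                        // per-partition state would be updated and evicted.
                        ++flushes_seen_;
                        return {};
                      }},
      work_{g, tbb::flow::unlimited, [this](message const&) -> tbb::flow::continue_msg {
              // The main body can now assume it never receives a flush.
              ++data_seen_;
              return {};
            }}
    {
    }

    tbb::flow::receiver<message>& flush_port() { return flush_receiver_; }
    tbb::flow::receiver<message>& data_port() { return work_; }

    std::size_t flushes() const { return flushes_seen_.load(); }
    std::size_t data() const { return data_seen_.load(); }

  private:
    tbb::flow::function_node<message> flush_receiver_;
    tbb::flow::function_node<message> work_;
    std::atomic<std::size_t> flushes_seen_{0};
    std::atomic<std::size_t> data_seen_{0};
  };
}

int main()
{
  tbb::flow::graph g;
  sketch::node_with_flush_port node{g};

  // A framework would route data messages to the regular input port and
  // flush messages to flush_port(); emulate that split here.
  for (std::size_t i = 0; i != 4; ++i) {
    node.data_port().try_put({i, false});
  }
  node.flush_port().try_put({4, true});
  g.wait_for_all();

  std::printf("data messages: %zu, flush messages: %zu\n", node.data(), node.flushes());
}

Routing flushes to a separate unlimited-concurrency side node keeps the counting and eviction bookkeeping off the algorithm's critical path and lets each body assume it only ever sees data messages.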
21 changes: 18 additions & 3 deletions phlex/core/declared_observer.hpp
@@ -37,6 +37,8 @@ namespace phlex::experimental {
product_queries input_products);
virtual ~declared_observer();

virtual tbb::flow::receiver<message>& flush_port() = 0;

protected:
using hashes_t = tbb::concurrent_hash_map<data_cell_index::hash_type, bool>;
using accessor = hashes_t::accessor;
@@ -66,16 +68,26 @@
AlgorithmBits alg,
product_queries input_products) :
declared_observer{std::move(name), std::move(predicates), std::move(input_products)},
flush_receiver_{g,
tbb::flow::unlimited,
[this](message const& msg) -> tbb::flow::continue_msg {
receive_flush(msg);
if (done_with(msg.store)) {
cached_hashes_.erase(msg.store->id()->hash());
}
return {};
}},
join_{make_join_or_none(g, std::make_index_sequence<N>{})},
observer_{g,
concurrency,
[this, ft = alg.release_algorithm()](
messages_t<N> const& messages) -> oneapi::tbb::flow::continue_msg {
auto const& msg = most_derived(messages);
auto const& [store, message_id] = std::tie(msg.store, msg.id);
if (store->is_flush()) {
flag_for(store->id()->hash()).flush_received(message_id);
} else if (accessor a; needs_new(store, a)) {

assert(not store->is_flush());

if (accessor a; needs_new(store, a)) {
call(ft, messages, std::make_index_sequence<N>{});
a->second = true;
flag_for(store->id()->hash()).mark_as_processed();
@@ -100,6 +112,8 @@

std::vector<tbb::flow::receiver<message>*> ports() override { return input_ports<N>(join_); }

tbb::flow::receiver<message>& flush_port() override { return flush_receiver_; }

bool needs_new(product_store_const_ptr const& store, accessor& a)
{
if (cached_hashes_.count(store->id()->hash()) > 0ull) {
@@ -118,6 +132,7 @@
std::size_t num_calls() const final { return calls_.load(); }

input_retriever_types<InputArgs> input_{input_arguments<InputArgs>()};
tbb::flow::function_node<message> flush_receiver_;
join_or_none_t<N> join_;
tbb::flow::function_node<messages_t<N>> observer_;
hashes_t cached_hashes_;
19 changes: 16 additions & 3 deletions phlex/core/declared_predicate.hpp
@@ -40,6 +40,7 @@ namespace phlex::experimental {
product_queries input_products);
virtual ~declared_predicate();

virtual tbb::flow::receiver<message>& flush_port() = 0;
virtual tbb::flow::sender<predicate_result>& sender() = 0;

protected:
Expand Down Expand Up @@ -72,17 +73,27 @@ namespace phlex::experimental {
AlgorithmBits alg,
product_queries input_products) :
declared_predicate{std::move(name), std::move(predicates), std::move(input_products)},
flush_receiver_{g,
tbb::flow::unlimited,
[this](message const& msg) -> tbb::flow::continue_msg {
receive_flush(msg);
if (done_with(msg.store)) {
results_.erase(msg.store->id()->hash());
}
return {};
}},
join_{make_join_or_none(g, std::make_index_sequence<N>{})},
predicate_{
g,
concurrency,
[this, ft = alg.release_algorithm()](messages_t<N> const& messages) -> predicate_result {
auto const& msg = most_derived(messages);
auto const& [store, message_id] = std::tie(msg.store, msg.id);

assert(not store->is_flush());

predicate_result result{};
if (store->is_flush()) {
flag_for(store->id()->hash()).flush_received(message_id);
} else if (const_accessor a; results_.find(a, store->id()->hash())) {
if (const_accessor a; results_.find(a, store->id()->hash())) {
result = {msg.eom, message_id, a->second.result};
} else if (accessor a; results_.insert(a, store->id()->hash())) {
bool const rc = call(ft, messages, std::make_index_sequence<N>{});
@@ -109,6 +120,7 @@

std::vector<tbb::flow::receiver<message>*> ports() override { return input_ports<N>(join_); }

tbb::flow::receiver<message>& flush_port() override { return flush_receiver_; }
tbb::flow::sender<predicate_result>& sender() override { return predicate_; }

template <std::size_t... Is>
@@ -121,6 +133,7 @@
std::size_t num_calls() const final { return calls_.load(); }

input_retriever_types<InputArgs> input_{input_arguments<InputArgs>()};
tbb::flow::function_node<message> flush_receiver_;
join_or_none_t<N> join_;
tbb::flow::function_node<messages_t<N>, predicate_result> predicate_;
results_t results_;
65 changes: 37 additions & 28 deletions phlex/core/declared_provider.hpp
@@ -35,6 +35,7 @@ namespace phlex::experimental {
product_query const& output_product() const noexcept;

virtual tbb::flow::receiver<message>* input_port() = 0;
virtual tbb::flow::receiver<message>& flush_port() = 0;
virtual tbb::flow::sender<message>& sender() = 0;
virtual std::size_t num_calls() const = 0;

@@ -68,42 +69,48 @@
product_query output) :
declared_provider{std::move(name), output},
output_{output.spec()},
flush_receiver_{g,
tbb::flow::unlimited,
[this](message const& msg) -> tbb::flow::continue_msg {
receive_flush(msg);
if (done_with(msg.store)) {
cache_.erase(msg.store->id()->hash());
}
return {};
}},
provider_{
g, concurrency, [this, ft = alg.release_algorithm()](message const& msg, auto& output) {
auto& [stay_in_graph, to_output] = output;

if (msg.store->is_flush()) {
flag_for(msg.store->id()->hash()).flush_received(msg.original_id);
stay_in_graph.try_put(msg);
} else {
// Check cache first
auto index_hash = msg.store->id()->hash();
if (const_accessor ca; cache_.find(ca, index_hash)) {
// Cache hit - reuse the cached store
message const new_msg{ca->second, msg.eom, msg.id};
stay_in_graph.try_put(new_msg);
to_output.try_put(new_msg);
return;
}

// Cache miss - compute the result
auto result = std::invoke(ft, *msg.store->id());
++calls_;

products new_products;
new_products.add(output_.name(), std::move(result));
auto store = std::make_shared<product_store>(
msg.store->id(), this->full_name(), std::move(new_products));

// Store in cache
cache_.emplace(index_hash, store);

message const new_msg{store, msg.eom, msg.id};
assert(not msg.store->is_flush());

// Check cache first
auto index_hash = msg.store->id()->hash();
if (const_accessor ca; cache_.find(ca, index_hash)) {
// Cache hit - reuse the cached store
message const new_msg{ca->second, msg.eom, msg.id};
stay_in_graph.try_put(new_msg);
to_output.try_put(new_msg);
flag_for(msg.store->id()->hash()).mark_as_processed();
return;
}

// Cache miss - compute the result
auto result = std::invoke(ft, *msg.store->id());
++calls_;

products new_products;
new_products.add(output_.name(), std::move(result));
auto store = std::make_shared<product_store>(
msg.store->id(), this->full_name(), std::move(new_products));

// Store in cache
cache_.emplace(index_hash, store);

message const new_msg{store, msg.eom, msg.id};
stay_in_graph.try_put(new_msg);
to_output.try_put(new_msg);
flag_for(msg.store->id()->hash()).mark_as_processed();

if (done_with(msg.store)) {
cache_.erase(msg.store->id()->hash());
}
@@ -117,11 +124,13 @@

private:
tbb::flow::receiver<message>* input_port() override { return &provider_; }
tbb::flow::receiver<message>& flush_port() override { return flush_receiver_; }
tbb::flow::sender<message>& sender() override { return output_port<0>(provider_); }

std::size_t num_calls() const final { return calls_.load(); }

product_specification output_;
tbb::flow::function_node<message> flush_receiver_;
tbb::flow::multifunction_node<message, messages_t<2u>> provider_;
std::atomic<std::size_t> calls_;
stores_t cache_;
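Note: with flush handling moved to flush_receiver_, the provider's main path is now cache-first: a const_accessor lookup on the tbb::concurrent_hash_map for a hit, then compute-and-emplace on a miss. A minimal sketch of that lookup pattern follows; find_or_compute, compute_expensive, and the key/value types are hypothetical stand-ins rather than the phlex product-store types.

#include "oneapi/tbb/concurrent_hash_map.h"

#include <cstddef>
#include <cstdio>
#include <memory>
#include <string>

namespace sketch {
  using cache_t = tbb::concurrent_hash_map<std::size_t, std::shared_ptr<std::string>>;

  std::shared_ptr<std::string> compute_expensive(std::size_t key)
  {
    // Stand-in for invoking the provider algorithm.
    return std::make_shared<std::string>("result for " + std::to_string(key));
  }

  std::shared_ptr<std::string> find_or_compute(cache_t& cache, std::size_t key)
  {
    // Cache hit: a const_accessor takes a shared (read) lock on the entry.
    if (cache_t::const_accessor ca; cache.find(ca, key)) {
      return ca->second;
    }

    // Cache miss: insert() write-locks the entry and returns true only for
    // the thread that created it, so the computation runs at most once per key.
    cache_t::accessor a;
    if (cache.insert(a, key)) {
      a->second = compute_expensive(key);
    }
    return a->second;
  }
}

int main()
{
  sketch::cache_t cache;
  auto first = sketch::find_or_compute(cache, 42);   // computes and caches
  auto second = sketch::find_or_compute(cache, 42);  // reuses the cached value
  std::printf("%s (same object: %s)\n",
              second->c_str(),
              first == second ? "yes" : "no");
}

The diff above uses emplace() after computing; the insert()-with-accessor variant shown here is one alternative that also guarantees at-most-once computation per key under concurrent calls.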
21 changes: 16 additions & 5 deletions phlex/core/declared_transform.hpp
@@ -46,6 +46,7 @@ namespace phlex::experimental {

virtual tbb::flow::sender<message>& sender() = 0;
virtual tbb::flow::sender<message>& to_output() = 0;
virtual tbb::flow::receiver<message>& flush_port() = 0;
virtual product_specifications const& output() const = 0;
virtual std::size_t product_count() const = 0;

@@ -84,6 +85,15 @@
declared_transform{std::move(name), std::move(predicates), std::move(input_products)},
output_{to_product_specifications(
full_name(), std::move(output), make_output_type_ids<function_t>())},
flush_receiver_{g,
tbb::flow::unlimited,
[this](message const& msg) -> tbb::flow::continue_msg {
receive_flush(msg);
if (done_with(msg.store)) {
stores_.erase(msg.store->id()->hash());
}
return {};
}},
join_{make_join_or_none(g, std::make_index_sequence<N>{})},
transform_{g,
concurrency,
@@ -92,11 +102,10 @@
auto const& [store, message_eom, message_id] =
std::tie(msg.store, msg.eom, msg.id);
auto& [stay_in_graph, to_output] = output;
if (store->is_flush()) {
flag_for(store->id()->hash()).flush_received(msg.original_id);
stay_in_graph.try_put(msg);
to_output.try_put(msg);
} else {

assert(not store->is_flush());

{
accessor a;
if (stores_.insert(a, store->id()->hash())) {
auto result = call(ft, messages, std::make_index_sequence<N>{});
@@ -134,6 +143,7 @@

std::vector<tbb::flow::receiver<message>*> ports() override { return input_ports<N>(join_); }

tbb::flow::receiver<message>& flush_port() override { return flush_receiver_; }
tbb::flow::sender<message>& sender() override { return output_port<0>(transform_); }
tbb::flow::sender<message>& to_output() override { return output_port<1>(transform_); }
product_specifications const& output() const override { return output_; }
@@ -156,6 +166,7 @@

input_retriever_types<input_parameter_types> input_{input_arguments<input_parameter_types>()};
product_specifications output_;
tbb::flow::function_node<message> flush_receiver_;
join_or_none_t<N> join_;
tbb::flow::multifunction_node<messages_t<N>, messages_t<2u>> transform_;
stores_t stores_;