Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"hidden": false,
"cacheVariables": {
"CMAKE_EXPORT_COMPILE_COMMANDS": "YES",
"CMAKE_CXX_STANDARD": "20",
"CMAKE_CXX_STANDARD": "23",
"CMAKE_CXX_STANDARD_REQUIRED": "YES",
"CMAKE_CXX_EXTENSIONS": "NO",
"CMAKE_PROJECT_TOP_LEVEL_INCLUDES": "CetProvideDependency"
Expand Down
59 changes: 56 additions & 3 deletions phlex/configuration.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,55 @@
#include "phlex/configuration.hpp"
#include "phlex/core/product_query.hpp"

#include <ranges>
#include <algorithm>
#include <string>
#include <string_view>

namespace {
std::optional<std::string> value_if_exists(boost::json::object const& obj,
std::string_view parameter)
{
if (!obj.contains(parameter)) {
return std::nullopt;
}
auto const& val = obj.at(parameter);
if (!val.is_string()) {
std::string kind;
switch (val.kind()) {
case boost::json::kind::null:
// seems reasonable to interpret this as if the value were not provided
return std::nullopt;
break;
case boost::json::kind::bool_:
kind = "bool"s;
break;
case boost::json::kind::int64:
kind = "std::int64_t"s;
break;
case boost::json::kind::uint64:
kind = "std::uint64_t"s;
break;
case boost::json::kind::double_:
kind = "double"s;
break;
case boost::json::kind::array:
kind = "array"s;
break;
case boost::json::kind::object:
kind = "object"s;
break;
default:
kind = "HOW??"s;
break;
}
throw std::runtime_error(
fmt::format("Error retrieving parameter '{}'. Should be a string but is instead a {}",
parameter,
kind));
}
return std::string(val.get_string());
}
}

namespace phlex {
std::vector<std::string> configuration::keys() const
Expand All @@ -25,7 +72,13 @@ namespace phlex {
{
using detail::value_decorate_exception;
auto query_object = jv.as_object();
return product_query{{value_decorate_exception<std::string>(query_object, "product")},
value_decorate_exception<std::string>(query_object, "layer")};
auto creator = value_decorate_exception<std::string>(query_object, "creator");
auto layer = value_decorate_exception<std::string>(query_object, "layer");
auto suffix = value_if_exists(query_object, "suffix");
auto stage = value_if_exists(query_object, "stage");
return product_tag{.creator = std::move(creator),
.layer = std::move(layer),
.suffix = std::move(suffix),
.stage = std::move(stage)};
}
}
8 changes: 2 additions & 6 deletions phlex/core/detail/filter_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@
#include <string>

namespace {
phlex::product_query const output_dummy{
phlex::experimental::product_specification{
phlex::experimental::algorithm_name{"for_output_only", ""},
"for_output_only",
phlex::experimental::type_id{}},
"dummy_layer"};
phlex::product_query const output_dummy = phlex::product_tag{
.creator = "for_output_only"s, .layer = "dummy_layer"s, .suffix = "for_output_only"s};
phlex::product_queries const for_output_only{output_dummy};
}

Expand Down
93 changes: 50 additions & 43 deletions phlex/core/edge_creation_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,64 +3,71 @@
#include "fmt/format.h"
#include "fmt/ranges.h"
#include "spdlog/spdlog.h"
#include <algorithm>
#include <ranges>
#include <set>

namespace phlex::experimental {
edge_creation_policy::named_output_port const* edge_creation_policy::find_producer(
product_query const& query) const
{
auto const& spec = query.spec();
auto [b, e] = producers_.equal_range(spec.name());
if (b == e) {
spdlog::debug(
"Failed to find an algorithm that creates {} products. Assuming it comes from a provider",
spec.name());
auto [creator_b, creator_e] = creator_db_.equal_range(query.creator_hash());
if (creator_b == creator_e) {
spdlog::debug("Found no matching creators for {}. Assuming it comes from a provider.",
query.to_string());
return nullptr;
}
std::map<std::string, named_output_port const*> candidates;
for (auto const& [key, producer] : std::ranges::subrange{b, e}) {
if (producer.node.match(spec.qualifier())) {
if (spec.type() != producer.type) {
spdlog::debug("Matched {} ({}) from {} but types don't match (`{}` vs `{}`). Excluding "
"from candidate list.",
spec.full(),
query.to_string(),
producer.node.full(),
spec.type(),
producer.type);
} else {
if (spec.type().exact_compare(producer.type)) {
spdlog::debug("Matched {} ({}) from {} and types match. Keeping in candidate list.",
spec.full(),
query.to_string(),
producer.node.full());
} else {
spdlog::warn("Matched {} ({}) from {} and types match, but not exactly (produce {} and "
"consume {}). Keeping in candidate list!",
spec.full(),
query.to_string(),
producer.node.full(),
spec.type().exact_name(),
producer.type.exact_name());
}
candidates.emplace(producer.node.full(), &producer);
}
}
std::set matching_creator = std::ranges::subrange(creator_b, creator_e) | std::views::values |
std::ranges::to<std::set>();

auto [type_b, type_e] = type_db_.equal_range(query.type_hash());
if (type_b == type_e) {
throw std::runtime_error(fmt::format(
"Found no matching types for {} (require {})", query.to_string(), query.type()));
}
std::set matching_type =
std::ranges::subrange(type_b, type_e) | std::views::values | std::ranges::to<std::set>();

if (candidates.empty()) {
throw std::runtime_error("Cannot identify product matching the specified label " +
spec.full());
std::set<named_output_port const*> creator_and_type{};
std::ranges::set_intersection(
matching_creator, matching_type, std::inserter(creator_and_type, creator_and_type.begin()));
if (creator_and_type.empty()) {
throw std::runtime_error(
fmt::format("Found no creator + type match for {} (required type {})",
query.to_string(),
query.type()));
}

if (candidates.size() > 1ull) {
std::set<named_output_port const*> all_matched;
if (query.suffix()) {
for (auto const* port : creator_and_type) {
if (port->suffix == query.suffix()) {
all_matched.insert(port);
}
}
if (all_matched.empty()) {
throw std::runtime_error(
fmt::format("Of {} creator + type matches for {}, found 0 suffix matches",
creator_and_type.size(),
query.to_string()));
}
} else {
all_matched = std::move(creator_and_type);
}
if (all_matched.size() > 1) {
std::string msg =
fmt::format("More than one candidate matches the specification {}: \n - {}\n",
spec.full(),
fmt::join(std::views::keys(candidates), "\n - "));
fmt::format("Found {} duplicate matches for {}", all_matched.size(), query.to_string());
throw std::runtime_error(msg);
}

return candidates.begin()->second;
return *all_matched.begin();
}

std::uint64_t edge_creation_policy::hash_string(std::string const& str)
{
using namespace boost::hash2;
xxhash_64 h;
hash_append(h, {}, str);
return h.result();
}
}
46 changes: 30 additions & 16 deletions phlex/core/edge_creation_policy.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef PHLEX_CORE_EDGE_CREATION_POLICY_HPP
#define PHLEX_CORE_EDGE_CREATION_POLICY_HPP

#include "phlex/core/concepts.hpp"
#include "phlex/core/message.hpp"
#include "phlex/model/product_specification.hpp"
#include "phlex/model/type_id.hpp"
Expand All @@ -9,6 +10,7 @@

#include <map>
#include <ranges>
#include <spdlog/spdlog.h>
#include <string>

namespace phlex::experimental {
Expand All @@ -20,45 +22,57 @@ namespace phlex::experimental {
edge_creation_policy(Args&... producers);

struct named_output_port {
algorithm_name node;
std::string suffix;
algorithm_name creator;
tbb::flow::sender<message>* port;
tbb::flow::sender<message>* to_output;
type_id type;
};

named_output_port const* find_producer(product_query const& query) const;
auto values() const { return producers_ | std::views::values; }
auto values() const { return producers_; }

private:
template <typename T>
static std::multimap<product_name_t, named_output_port> producing_nodes(T& nodes);
// Store a stack of all named_output_ports
std::forward_list<named_output_port> producers_;

// And maps indexing by (hash of) each field
std::multimap<std::uint64_t, named_output_port const*> creator_db_;
std::multimap<std::uint64_t, named_output_port const*> suffix_db_;
std::multimap<std::uint64_t, named_output_port const*> type_db_;

std::multimap<product_name_t, named_output_port> producers_;
// Utility to add producers
template <typename T>
void add_nodes(T& nodes);
static std::uint64_t hash_string(std::string const& str);
};

// =============================================================================
// Implementation

template <typename T>
std::multimap<product_name_t, edge_creation_policy::named_output_port>
edge_creation_policy::producing_nodes(T& nodes)
void edge_creation_policy::add_nodes(T& nodes)
{
std::multimap<product_name_t, named_output_port> result;
for (auto const& [node_name, node] : nodes) {
for (auto const& product_name : node->output()) {
if (empty(product_name.name()))
continue;
result.emplace(
product_name.name(),
named_output_port{node_name, &node->sender(), &node->to_output(), product_name.type()});
for (auto const& spec : node->output()) {
spdlog::debug("Adding product {} with type {}", spec.full(), spec.type());
auto& port = producers_.emplace_front(
spec.name(), spec.qualifier(), &node->sender(), &node->to_output(), spec.type());

// creator_db_ contains entries for plugin name and algorithm name
creator_db_.emplace(hash_string(spec.plugin()), &port);
creator_db_.emplace(hash_string(spec.algorithm()), &port);

suffix_db_.emplace(hash_string(spec.name()), &port);
type_db_.emplace(spec.type().hash(), &port);
}
}
return result;
}

template <typename... Args>
edge_creation_policy::edge_creation_policy(Args&... producers)
{
(producers_.merge(producing_nodes(producers)), ...);
(add_nodes(producers), ...);
}
}

Expand Down
32 changes: 11 additions & 21 deletions phlex/core/input_arguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,22 @@

#include "fmt/format.h"

#include <algorithm>
#include <set>
#include <stdexcept>
#include <vector>

namespace phlex::experimental::detail {
void verify_no_duplicate_input_products(std::string const& algorithm_name,
product_queries to_sort)
product_queries input_queries)
{
std::sort(begin(to_sort), end(to_sort));
std::set unique_and_sorted(begin(to_sort), end(to_sort));
product_queries duplicates;
std::set_difference(begin(to_sort),
end(to_sort),
begin(unique_and_sorted),
end(unique_and_sorted),
std::back_inserter(duplicates));
if (empty(duplicates)) {
return;
for (std::size_t i = 0; i < input_queries.size(); ++i) {
for (std::size_t j = i + 1; j < input_queries.size(); ++j) {
if (input_queries[i].match(input_queries[j]) || input_queries[j].match(input_queries[i])) {
throw std::runtime_error(
fmt::format("The algorithm {} has at least one duplicate input:\n- {}\n- {}\n",
algorithm_name,
input_queries[i].to_string(),
input_queries[j].to_string()));
}
}
}

std::string error =
fmt::format("Algorithm '{}' uses the following products more than once:\n", algorithm_name);
for (auto const& label : duplicates) {
error += fmt::format(" - '{}'\n", label.to_string());
}
throw std::runtime_error(error);
}
}
Loading
Loading