Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions src/vt-lb/algo/driver/driver.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,40 @@ void runLB(DriverAlgoEnum algo, CommT& comm, ConfigT config, std::unique_ptr<mod
break;
case DriverAlgoEnum::TemperedLB:
{
// Run TemperedLB algorithm
auto lb = std::make_unique<algo::temperedlb::TemperedLB<CommT>>(comm, config);
lb->inputData(std::move(phase_data));
lb->run();
// Run TemperedLB algorithm
auto lb = std::make_unique<algo::temperedlb::TemperedLB<CommT>>(comm, config);
lb->inputData(std::move(phase_data));
lb->run();
}
break;
default:
throw std::runtime_error("Invalid load balancer algorithm");
}
}

/**
 * @brief Run the selected load balancer and gather the resulting global
 *        task distribution.
 *
 * @param algo       The load-balancing algorithm to run
 * @param comm       Communicator handed to the algorithm
 * @param config     Algorithm configuration
 * @param phase_data Input phase data (ownership is transferred to the LB)
 *
 * @return Map from rank to the tasks assigned to that rank; empty when
 *         \c algo is \c DriverAlgoEnum::None
 *
 * @throws std::runtime_error if \c algo is not a recognized algorithm
 */
template <typename CommT, typename ConfigT>
std::unordered_map<model::RankType, std::vector<model::TaskType>>
runLBAllGather(DriverAlgoEnum algo, CommT& comm, ConfigT config, std::unique_ptr<model::PhaseData> phase_data) {
  switch (algo) {
    case DriverAlgoEnum::None:
      // No load balancing: nothing to redistribute or gather
      return {};
    case DriverAlgoEnum::TemperedLB:
    {
      // Run TemperedLB, then gather the per-rank task assignment globally
      auto lb = std::make_unique<algo::temperedlb::TemperedLB<CommT>>(comm, config);
      lb->inputData(std::move(phase_data));
      auto local_tasks = lb->run();
      return lb->getGlobalDistribution(local_tasks);
    }
    default:
      throw std::runtime_error("Invalid load balancer algorithm");
  }
}

} /* end namespace vt_lb */

#endif /*INCLUDED_VT_LB_ALGO_DRIVER_DRIVER_IMPL_H*/
33 changes: 27 additions & 6 deletions src/vt-lb/algo/temperedlb/temperedlb.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,24 @@ struct TemperedLB final : baselb::BaseLB {
}
}

/**
 * @brief Get the global distribution of tasks after load balancing
 *
 * Flattens this rank's task set and allgathers it so every rank obtains
 * the full rank -> tasks mapping.
 *
 * @param local_tasks The set of tasks on this rank
 * @return Map from rank to the list of tasks assigned to that rank
 */
std::unordered_map<model::RankType, std::vector<model::TaskType>>
getGlobalDistribution(std::unordered_set<model::TaskType> const& local_tasks) {
  // The allgather needs contiguous storage, so copy the set into a vector
  std::vector<model::TaskType> local_task_vec(
    local_tasks.begin(), local_tasks.end()
  );
  auto all_task_vecs = handle_.template allgather<model::TaskType>(
    local_task_vec.data(), static_cast<int>(local_task_vec.size())
  );

  return all_task_vecs;
}


Clusterer const* getClusterer() const { return clusterer_.get(); }

private:
Expand Down Expand Up @@ -402,12 +420,15 @@ struct TemperedLB final : baselb::BaseLB {
double global_min = 0.0;
double global_max = 0.0;
double global_sum = 0.0;
// For now, do P reductions since we don't have broadcast yet
for (int p = 0; p < comm_.numRanks(); ++p) {
handle_.reduce(p, MPI_DOUBLE, MPI_MIN, &local_value, &global_min, 1);
handle_.reduce(p, MPI_DOUBLE, MPI_MAX, &local_value, &global_max, 1);
handle_.reduce(p, MPI_DOUBLE, MPI_SUM, &local_value, &global_sum, 1);
}

handle_.reduce(0, MPI_DOUBLE, MPI_MIN, &local_value, &global_min, 1);
handle_.reduce(0, MPI_DOUBLE, MPI_MAX, &local_value, &global_max, 1);
handle_.reduce(0, MPI_DOUBLE, MPI_SUM, &local_value, &global_sum, 1);

handle_.broadcast(0, MPI_DOUBLE, &global_min, 1);
handle_.broadcast(0, MPI_DOUBLE, &global_max, 1);
handle_.broadcast(0, MPI_DOUBLE, &global_sum, 1);

double global_avg = global_sum / static_cast<double>(comm_.numRanks());
double I = 0;
if (global_avg > 0.0) {
Expand Down
10 changes: 7 additions & 3 deletions src/vt-lb/model/Communication.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,23 @@ namespace vt_lb::model {
struct Edge {
Edge() = default;
Edge(TaskType from, TaskType to, BytesType volume)
: from_(from), to_(to), volume_(volume)
: from_(from), to_(to), volume_(volume), num_messages_(1)
{}
// New ctor with explicit endpoint ranks
Edge(TaskType from, TaskType to, BytesType volume, RankType from_rank, RankType to_rank)
: from_(from), to_(to), volume_(volume), from_rank_(from_rank), to_rank_(to_rank)
: from_(from), to_(to), volume_(volume), from_rank_(from_rank), to_rank_(to_rank), num_messages_(1)
{}

TaskType getFrom() const { return from_; }
TaskType getTo() const { return to_; }
BytesType getVolume() const { return volume_; }
// New rank accessors
void setVolume(BytesType volume) { volume_ = volume; }
RankType getFromRank() const { return from_rank_; }
RankType getToRank() const { return to_rank_; }
void setFromRank(RankType rank) { from_rank_ = rank; }
void setToRank(RankType rank) { to_rank_ = rank; }
int getNumMessages() const { return num_messages_; }
void setNumMessages(int num) { num_messages_ = num; }

template <typename Serializer>
void serialize(Serializer& s) {
Expand All @@ -79,6 +81,7 @@ struct Edge {
s | volume_;
s | from_rank_;
s | to_rank_;
s | num_messages_;
}

private:
Expand All @@ -87,6 +90,7 @@ struct Edge {
BytesType volume_ = 0.0;
RankType from_rank_ = invalid_rank;
RankType to_rank_ = invalid_rank;
int num_messages_ = 0;
};

/**
Expand Down
40 changes: 40 additions & 0 deletions src/vt-lb/model/PhaseData.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,21 @@ struct PhaseData {
}
communications_.push_back(e);
}
/**
 * @brief Record a communication edge, merging it with an existing edge
 *        that has the same (from, to) endpoints.
 *
 * If a matching edge exists, its volume and message count are
 * accumulated; otherwise the edge is appended. The per-task in/out
 * message counters are updated in either case.
 *
 * @param e The edge to aggregate
 */
void aggregateCommunication(Edge const& e) {
  // The per-task counters are bumped identically on both paths, so do it
  // once up front instead of duplicating the updates in each branch.
  task_messages_out_[e.getFrom()] += e.getNumMessages();
  task_messages_in_[e.getTo()] += e.getNumMessages();
  // NOTE(review): linear scan per insertion — presumably fine for modest
  // edge counts; consider an index map if this shows up in profiles.
  for (auto& comm : communications_) {
    if (comm.getFrom() == e.getFrom() && comm.getTo() == e.getTo()) {
      comm.setVolume(comm.getVolume() + e.getVolume());
      comm.setNumMessages(comm.getNumMessages() + e.getNumMessages());
      return;
    }
  }
  communications_.push_back(e);
}
/// @brief Register a shared block, keyed by its id (emplace keeps the first entry if the id already exists).
void addSharedBlock(SharedBlock const& b) { shared_blocks_.emplace(b.getId(), b); }

/// @brief Rank this phase data belongs to.
RankType getRank() const { return rank_; }
Expand Down Expand Up @@ -98,6 +113,25 @@ struct PhaseData {
/// @brief Read-only map of task id -> Task for this phase.
std::unordered_map<TaskType, Task> const& getTasksMap() const { return tasks_; }
/// @brief Read-only list of recorded communication edges.
std::vector<Edge> const& getCommunications() const { return communications_; }
/// @brief Mutable access to the communication edges for in-place edits.
std::vector<Edge>& getCommunicationsRef() { return communications_; }

/// @brief Total message count summed over all recorded communication edges.
std::size_t getCommunicationMessagesCount() const {
  std::size_t total = 0;
  for (auto const& edge : communications_) {
    total += static_cast<std::size_t>(edge.getNumMessages());
  }
  return total;
}

/// @brief Number of messages sent by task @p id; 0 if the task is unknown.
std::size_t getTaskMessagesOut(TaskType id) const {
  auto const it = task_messages_out_.find(id);
  if (it == task_messages_out_.end()) {
    return 0;
  }
  return it->second;
}

/// @brief Number of messages received by task @p id; 0 if the task is unknown.
std::size_t getTaskMessagesIn(TaskType id) const {
  auto const it = task_messages_in_.find(id);
  if (it == task_messages_in_.end()) {
    return 0;
  }
  return it->second;
}

std::unordered_map<SharedBlockType, SharedBlock> const& getSharedBlocksMap() const { return shared_blocks_; }
std::unordered_set<TaskType> getTaskIds() const {
std::unordered_set<TaskType> ids;
Expand Down Expand Up @@ -133,6 +167,8 @@ struct PhaseData {
tasks_.clear();
communications_.clear();
shared_blocks_.clear();
task_messages_out_.clear();
task_messages_in_.clear();
rank_footprint_bytes_ = 0.0;
rank_max_memory_available_ = 0.0;
}
Expand All @@ -157,6 +193,8 @@ struct PhaseData {
s | tasks_;
s | communications_;
s | shared_blocks_;
s | task_messages_out_;
s | task_messages_in_;
s | rank_footprint_bytes_;
s | rank_max_memory_available_;
}
Expand All @@ -170,6 +208,8 @@ struct PhaseData {
std::unordered_map<TaskType, Task> tasks_;
std::vector<Edge> communications_;
std::unordered_map<SharedBlockType, SharedBlock> shared_blocks_;
std::unordered_map<TaskType, std::size_t> task_messages_out_;
std::unordered_map<TaskType, std::size_t> task_messages_in_;
BytesType rank_footprint_bytes_ = 0.0;
BytesType rank_max_memory_available_ = 0.0;
};
Expand Down
10 changes: 5 additions & 5 deletions tests/unit/graph_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ void generateIntraRankComm(
}
}
double bytes = weight_per_edge_dist(gen);
pd.addCommunication(Edge{from, to, bytes, rank, rank});
pd.aggregateCommunication(Edge{from, to, bytes, rank, rank});
}
}

Expand Down Expand Up @@ -359,15 +359,15 @@ void generateInterRankComm(
while ((remote_rank = remote_rank_dist(gen)) == rank) {}
TaskType to = remote_task_dist(gen);
double bytes = weight_per_edge_dist(gen);
pd.addCommunication(Edge{from, to, bytes, rank, remote_rank});
pd.aggregateCommunication(Edge{from, to, bytes, rank, remote_rank});
}
for (std::size_t e = from_edge_count; e < local_endpoints.size(); ++e) {
TaskType to = local_endpoints[e];
int remote_rank = rank;
while ((remote_rank = remote_rank_dist(gen)) == rank) {}
TaskType from = remote_task_dist(gen);
double bytes = weight_per_edge_dist(gen);
pd.addCommunication(Edge{from, to, bytes, remote_rank, rank});
pd.aggregateCommunication(Edge{from, to, bytes, remote_rank, rank});
}
}

Expand Down Expand Up @@ -428,7 +428,7 @@ void generateRankComm(
while ((to = remote_task_dist(gen)) == from) {}
}
double bytes = weight_per_edge_dist(gen);
pd.addCommunication(Edge{from, to, bytes, rank, remote_rank});
pd.aggregateCommunication(Edge{from, to, bytes, rank, remote_rank});
}
for (std::size_t e = from_edge_count; e < local_endpoints.size(); ++e) {
TaskType to = local_endpoints[e];
Expand All @@ -441,7 +441,7 @@ void generateRankComm(
while ((from = remote_task_dist(gen)) == to) {}
}
double bytes = weight_per_edge_dist(gen);
pd.addCommunication(Edge{from, to, bytes, remote_rank, rank});
pd.aggregateCommunication(Edge{from, to, bytes, remote_rank, rank});
}
}

Expand Down
Loading
Loading