Merged
40 changes: 25 additions & 15 deletions decent_bench/agents.py
@@ -11,15 +11,21 @@


class Agent:
"""Agent with unique id, local cost function, and activation scheme."""
"""Agent with unique id, local cost function, activation scheme and state snapshot period."""

def __init__(self, agent_id: int, cost: Cost, activation: AgentActivationScheme, state_snapshot_period: int):
if state_snapshot_period <= 0:
raise ValueError("state_snapshot_period must be a positive integer")

def __init__(self, agent_id: int, cost: Cost, activation: AgentActivationScheme):
self._id = agent_id
self._cost = cost
self._activation = activation
self._x_history: list[Array] = []
self._state_snapshot_period = state_snapshot_period
self._current_x: Array | None = None
self._x_history: dict[int, Array] = {}
self._auxiliary_variables: dict[str, Array] = {}
self._received_messages: dict[Agent, Array] = {}
self._n_x_updates = 0
self._n_sent_messages = 0
self._n_received_messages = 0
self._n_sent_messages_dropped = 0
@@ -55,24 +61,20 @@ def x(self) -> Array:
"""
Local optimization variable x.

Warning:
Do not use in-place operations (``+=``, ``-=``, ``*=``, etc.) on this property.
In-place operations will corrupt the optimization history by modifying all
historical values. Always use ``agent.x = agent.x + value`` instead of
``agent.x += value``. Does not affect the outcome of the optimization, but
will affect logging and metrics that depend on the optimization history.

Raises:
RuntimeError: if x is retrieved before being set or initialized

"""
if not self._x_history:
if self._current_x is None:
raise RuntimeError("x must be initialized before being accessed")
return self._x_history[-1]
return self._current_x

@x.setter
def x(self, x: Array) -> None:
self._x_history.append(x)
self._n_x_updates += 1
self._current_x = x
if self._n_x_updates % self._state_snapshot_period == 0:
self._x_history[self._n_x_updates] = iop.copy(x)

@property
def messages(self) -> Mapping[Agent, Array]:
@@ -106,7 +108,9 @@ def initialize(
if x is not None:
if iop.shape(x) != self.cost.shape:
raise ValueError(f"Initialized x has shape {iop.shape(x)}, expected {self.cost.shape}")
self._x_history = [iop.copy(x)]
self._x_history = {0: iop.copy(x)}
self._current_x = iop.copy(x)
self._n_x_updates = 0
if aux_vars:
self._auxiliary_variables = {k: iop.copy(v) for k, v in aux_vars.items()}
if received_msgs:
Expand Down Expand Up @@ -138,7 +142,8 @@ class AgentMetricsView:
"""Immutable view of agent that exposes useful properties for calculating metrics."""

cost: Cost
x_history: list[Array]
x_history: dict[int, Array]
n_x_updates: int
n_function_calls: int
n_gradient_calls: int
n_hessian_calls: int
@@ -150,9 +155,14 @@ class AgentMetricsView:
@staticmethod
def from_agent(agent: Agent) -> AgentMetricsView:
"""Create from agent."""
# Record the latest x in the history if it was not already snapshotted
if agent._current_x is not None and agent._n_x_updates not in agent._x_history: # noqa: SLF001
agent._x_history[agent._n_x_updates] = iop.copy(agent._current_x) # noqa: SLF001

return AgentMetricsView(
cost=agent.cost,
x_history=agent._x_history, # noqa: SLF001
n_x_updates=agent._n_x_updates, # noqa: SLF001
n_function_calls=agent._n_function_calls, # noqa: SLF001
n_gradient_calls=agent._n_gradient_calls, # noqa: SLF001
n_hessian_calls=agent._n_hessian_calls, # noqa: SLF001
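With this change, the ``x`` setter only copies ``x`` into ``_x_history`` on every ``state_snapshot_period``-th update, and ``AgentMetricsView.from_agent`` records the final state if it was not already snapshotted. Below is a minimal standalone sketch of that bookkeeping; it uses a plain helper class with NumPy arrays instead of ``Agent`` itself, since constructing a real ``Cost`` and ``AgentActivationScheme`` is outside this diff, and the names here are illustrative only.

```python
import numpy as np


class SnapshotRecorder:
    """Illustrative stand-in for the Agent snapshot bookkeeping (not the real class)."""

    def __init__(self, snapshot_period: int):
        if snapshot_period <= 0:
            raise ValueError("snapshot_period must be a positive integer")
        self.snapshot_period = snapshot_period
        self.n_updates = 0
        self.current_x: np.ndarray | None = None
        self.history: dict[int, np.ndarray] = {}  # update counter -> copied snapshot

    def set_x(self, x: np.ndarray) -> None:
        # Mirrors the x setter: count the update, keep the current value,
        # and copy it into the history only on every snapshot_period-th update.
        self.n_updates += 1
        self.current_x = x
        if self.n_updates % self.snapshot_period == 0:
            self.history[self.n_updates] = x.copy()

    def finalize(self) -> dict[int, np.ndarray]:
        # Mirrors from_agent: record the latest x if it was not already snapshotted.
        if self.current_x is not None and self.n_updates not in self.history:
            self.history[self.n_updates] = self.current_x.copy()
        return self.history


rec = SnapshotRecorder(snapshot_period=3)
for k in range(7):
    rec.set_x(np.full(2, float(k)))
print(sorted(rec.finalize()))  # [3, 6, 7]: periodic snapshots plus the final state
```

Keying the history by the update counter (rather than appending to a list) is what lets the metric utilities below look up states per iteration and intersect them across agents.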
41 changes: 36 additions & 5 deletions decent_bench/benchmark.py
@@ -31,13 +31,18 @@ def benchmark(
table_metrics: list[TableMetric] = DEFAULT_TABLE_METRICS,
table_fmt: Literal["grid", "latex"] = "grid",
*,
plot_grid: bool = True,
plot_path: str | None = None,
computational_cost: pm.ComputationalCost | None = None,
x_axis_scaling: float = 1e-4,
n_trials: int = 30,
confidence_level: float = 0.95,
log_level: int = logging.INFO,
max_processes: int | None = None,
progress_step: int | None = None,
show_speed: bool = False,
show_trial: bool = False,
compare_iterations_and_computational_cost: bool = False,
) -> None:
"""
Benchmark distributed algorithms.
@@ -51,6 +56,13 @@
table_metrics: metrics to tabulate as confidence intervals after the execution, defaults to
:const:`~decent_bench.metrics.table_metrics.DEFAULT_TABLE_METRICS`
table_fmt: table format, grid is suitable for the terminal while latex can be copy-pasted into a latex document
plot_grid: whether to show grid lines on the plots
plot_path: optional file path to save the generated plot as an image file (e.g., "plots.png"). If ``None``,
the plot will only be displayed
computational_cost: computational cost settings for plot metrics; if ``None``, the x-axis will show iterations
instead of computational cost
x_axis_scaling: scaling factor for computational cost x-axis, used to convert the cost units into more
manageable units for plotting. Only used if ``computational_cost`` is provided.
n_trials: number of times to run each algorithm on the benchmark problem; running more trials improves the
statistical results, and at least 30 trials are recommended for the central limit theorem to apply
confidence_level: confidence level of the confidence intervals
@@ -64,15 +76,27 @@
If `None`, the progress bar uses 1 unit per trial.
show_speed: whether to show speed (iterations/second) in the progress bar.
show_trial: whether to show which trials are currently running in the progress bar.
compare_iterations_and_computational_cost: whether to plot both metric vs computational cost and
metric vs iterations. Only used if ``computational_cost`` is provided.

Note:
If ``progress_step`` is too small, performance may degrade due to the
overhead of updating the progress bar too often.

Computational cost can be interpreted as the cost of running the algorithm on a specific hardware setup.
It can therefore be seen as the number of operations performed (similar to a FLOP count), weighted by the
time or energy it takes to perform them on that hardware.

.. include:: snippets/computational_cost.rst

If ``computational_cost`` is provided and ``compare_iterations_and_computational_cost`` is ``True``, each metric
will be plotted twice: once against computational cost and once against iterations.
Computational cost plots will be shown on the left and iteration plots on the right.

"""
manager = Manager()
log_listener = logger.start_log_listener(manager, log_level)
LOGGER.info("Starting benchmark execution, progress bar increments with each completed trial ")
LOGGER.info("Starting benchmark execution ")
with Status("Generating initial network state"):
nw_init_state = create_distributed_network(benchmark_problem)
LOGGER.debug(f"Nr of agents: {len(nw_init_state.agents())}")
@@ -82,10 +106,17 @@
resulting_agent_states: dict[Algorithm, list[list[AgentMetricsView]]] = {}
for alg, networks in resulting_nw_states.items():
resulting_agent_states[alg] = [[AgentMetricsView.from_agent(a) for a in nw.agents()] for nw in networks]
with Status("Creating table"):
tm.tabulate(resulting_agent_states, benchmark_problem, table_metrics, confidence_level, table_fmt)
with Status("Creating plot"):
pm.plot(resulting_agent_states, benchmark_problem, plot_metrics)
tm.tabulate(resulting_agent_states, benchmark_problem, table_metrics, confidence_level, table_fmt)
pm.plot(
resulting_agent_states,
benchmark_problem,
plot_metrics,
computational_cost,
x_axis_scaling,
compare_iterations_and_computational_cost,
plot_path,
plot_grid,
)
LOGGER.info("Benchmark execution complete, thanks for using decent-bench")
log_listener.stop()

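The note above treats computational cost as an operation count weighted by how expensive each operation is on the target hardware. A rough standalone sketch of that interpretation follows; the weights and parameter names are made up for illustration and are not the fields of ``pm.ComputationalCost``, whose constructor is not shown in this diff.

```python
# Illustration only: the weights and parameter names below are assumptions,
# not the actual pm.ComputationalCost API.
def weighted_computational_cost(
    n_function_calls: int,
    n_gradient_calls: int,
    n_hessian_calls: int,
    w_function: float = 1.0,   # hypothetical per-call weights for a given hardware setup
    w_gradient: float = 10.0,
    w_hessian: float = 100.0,
) -> float:
    return (
        n_function_calls * w_function
        + n_gradient_calls * w_gradient
        + n_hessian_calls * w_hessian
    )


cost = weighted_computational_cost(n_function_calls=500, n_gradient_calls=500, n_hessian_calls=50)
x_axis_scaling = 1e-4  # same role as the benchmark() parameter: shrink the units for plotting
print(cost, cost * x_axis_scaling)  # 10500.0 1.05
```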
5 changes: 5 additions & 0 deletions decent_bench/benchmark_problem.py
@@ -41,6 +41,7 @@ class BenchmarkProblem:
network_structure: graph defining how agents are connected
x_optimal: solution that minimizes the sum of the cost functions, used for calculating metrics
costs: local cost functions, each one is given to one agent
agent_state_snapshot_period: period for recording agent state snapshots, used for plot metrics
agent_activations: setting for agent activation/participation, each scheme is applied to one agent
message_compression: message compression setting
message_noise: message noise setting
@@ -51,6 +52,7 @@
network_structure: AnyGraph
x_optimal: Array
costs: Sequence[Cost]
agent_state_snapshot_period: int
agent_activations: Sequence[AgentActivationScheme]
message_compression: CompressionScheme
message_noise: NoiseScheme
@@ -61,6 +63,7 @@ def create_regression_problem(
cost_cls: type[LinearRegressionCost | LogisticRegressionCost],
*,
n_agents: int = 100,
agent_state_snapshot_period: int = 1,
n_neighbors_per_agent: int = 3,
asynchrony: bool = False,
compression: bool = False,
@@ -73,6 +76,7 @@
Args:
cost_cls: type of cost function
n_agents: number of agents
agent_state_snapshot_period: period for recording agent state snapshots, used for plot metrics
n_neighbors_per_agent: number of neighbors per agent
asynchrony: if true, agents only have a 50% probability of being active/participating at any given time
compression: if true, messages are rounded to 4 significant digits
@@ -100,6 +104,7 @@ def create_regression_problem(
return BenchmarkProblem(
network_structure=network_structure,
costs=costs,
agent_state_snapshot_period=agent_state_snapshot_period,
x_optimal=x_optimal,
agent_activations=agent_activations,
message_compression=message_compression,
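A hedged usage sketch of the new ``agent_state_snapshot_period`` argument follows. The import path of ``LinearRegressionCost`` is an assumption (its import is outside the hunks shown), and ``create_regression_problem`` is assumed to be importable as a module-level factory from ``decent_bench.benchmark_problem``.

```python
from decent_bench.benchmark_problem import create_regression_problem  # assumed module-level factory
from decent_bench.costs import LinearRegressionCost  # hypothetical import path, not shown in this diff

# Snapshot each agent's state only every 10th x update instead of every update,
# which keeps x_history small for long runs at the cost of coarser plot metrics.
problem = create_regression_problem(
    LinearRegressionCost,
    n_agents=100,
    agent_state_snapshot_period=10,
    n_neighbors_per_agent=3,
)
```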
48 changes: 45 additions & 3 deletions decent_bench/metrics/metric_utils.py
@@ -6,13 +6,32 @@
from numpy import linalg as la
from numpy.linalg import LinAlgError
from numpy.typing import NDArray
from rich.progress import BarColumn, Progress, TaskProgressColumn, TextColumn, TimeRemainingColumn

import decent_bench.utils.interoperability as iop
from decent_bench.agents import AgentMetricsView
from decent_bench.benchmark_problem import BenchmarkProblem
from decent_bench.utils.array import Array


class MetricProgressBar(Progress):
"""
Progress bar for metric calculations.

Make sure to set the field *status* in the task to show custom status messages.

"""

def __init__(self) -> None:
super().__init__(
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
TimeRemainingColumn(elapsed_when_finished=True),
TextColumn("{task.fields[status]}"),
)
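
A short usage sketch for the progress bar above: every task must be created with a ``status`` field, because the last column renders ``{task.fields[status]}``. The task description and totals below are made up.

```python
import time

from decent_bench.metrics.metric_utils import MetricProgressBar

with MetricProgressBar() as progress:
    # The extra `status` keyword becomes task.fields["status"], which the last column displays.
    task = progress.add_task("Computing metrics", total=5, status="")
    for i in range(5):
        time.sleep(0.1)  # stand-in for one metric computation
        progress.update(task, advance=1, status=f"metric {i + 1}/5")
```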


def single(values: Sequence[float]) -> float:
"""
Assert that *values* contain exactly one element and return it.
@@ -37,7 +56,11 @@ def x_mean(agents: tuple[AgentMetricsView, ...], iteration: int = -1) -> Array:
ValueError: if no agent reached *iteration*

"""
all_x_at_iter = [a.x_history[iteration] for a in agents if len(a.x_history) > iteration]
if iteration == -1:
all_x_at_iter = [a.x_history[max(a.x_history)] for a in agents if len(a.x_history) > 0]
else:
all_x_at_iter = [a.x_history[iteration] for a in agents if iteration in a.x_history]

if len(all_x_at_iter) == 0:
raise ValueError(f"No agent reached iteration {iteration}")

@@ -56,7 +79,7 @@ def regret(agents: list[AgentMetricsView], problem: BenchmarkProblem, iteration:
mean_x = x_mean(tuple(agents), iteration)
optimal_cost = sum(a.cost.function(x_opt) for a in agents)
actual_cost = sum(a.cost.function(mean_x) for a in agents)
return abs(optimal_cost - actual_cost)
return actual_cost - optimal_cost


def gradient_norm(agents: list[AgentMetricsView], iteration: int = -1) -> float:
Expand All @@ -83,7 +106,7 @@ def x_error(agent: AgentMetricsView, problem: BenchmarkProblem) -> NDArray[float
where :math:`\mathbf{x}_k` is the agent's local x at iteration k,
and :math:`\mathbf{x}^\star` is the optimal x defined in the *problem*.
"""
x_per_iteration = np.asarray([iop.to_numpy(x) for x in agent.x_history])
x_per_iteration = np.asarray([iop.to_numpy(x) for _, x in sorted(agent.x_history.items())])
opt_x = problem.x_optimal
errors: NDArray[float64] = la.norm(x_per_iteration - opt_x, axis=tuple(range(1, x_per_iteration.ndim)))
return errors
@@ -131,3 +154,22 @@ def iterative_convergence_rate_and_order(agent: AgentMetricsView, problem: Bench
except LinAlgError:
rate, order = np.nan, np.nan
return rate, order


def common_sorted_iterations(agents: Sequence[AgentMetricsView]) -> list[int]:
"""
Get a sorted list of all common iterations reached by agents in *agents*.
Review comment (Member): I would maybe add a bit more context, like: "since the agents can sample periodically, and potentially at different times, this function can be used to find the numbers of iterations where all agents have recorded their states, which can then be used to compute the metrics"

Since the agents can sample their states periodically, and may sample at different iterations,
this function returns only the iterations that are common to all agents. These iterations can then be used
to compute metrics that require synchronized iterations.

Args:
agents: sequence of agents to get the common iterations from

Returns:
sorted list of iterations reached by all agents

"""
common_iters = set.intersection(*(set(a.x_history.keys()) for a in agents)) if agents else set()
return sorted(common_iters)
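Because agents may record snapshots at different update counts (for example when some agents are inactive for part of the run), the intersection above can be smaller than any single history. A small standalone illustration of the same key-intersection logic on plain dicts:

```python
# Standalone illustration using plain dicts in place of AgentMetricsView.x_history.
histories = [
    {3: "a3", 6: "a6", 9: "a9"},  # agent snapshotting every 3rd update
    {3: "b3", 6: "b6", 7: "b7"},  # agent that stopped earlier; final state recorded at 7
]
common = set.intersection(*(set(h.keys()) for h in histories)) if histories else set()
print(sorted(common))  # [3, 6] -> iterations at which every agent has a recorded state
```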