diff --git a/lib/bgp/message/update/attribute/origin.ex b/lib/bgp/message/update/attribute/origin.ex index a66bdcd..6383bb3 100644 --- a/lib/bgp/message/update/attribute/origin.ex +++ b/lib/bgp/message/update/attribute/origin.ex @@ -3,29 +3,25 @@ defmodule BGP.Message.UPDATE.Attribute.Origin do alias BGP.Message.{Encoder, NOTIFICATION} - @type origin :: :igp | :egp | :incomplete - @type t :: %__MODULE__{origin: origin()} + @type t :: %__MODULE__{value: non_neg_integer()} - @enforce_keys [:origin] - defstruct origin: nil + @enforce_keys [:value] + defstruct value: nil @behaviour Encoder @impl Encoder - def decode(data, session), do: {%__MODULE__{origin: decode_origin(data)}, session} + def decode(<>, session) when value in 0..2, do: {%__MODULE__{value: value}, session} - def decode_origin(<<0::8>>), do: :igp - def decode_origin(<<1::8>>), do: :egp - def decode_origin(<<2::8>>), do: :incomplete - - def decode_origin(_data) do + def decode(_data, _session) do raise NOTIFICATION, code: :update_message, subcode: :invalid_origin_attribute end @impl Encoder - def encode(%__MODULE__{origin: origin}, session), do: {encode_origin(origin), 1, session} + def encode(%__MODULE__{value: value}, session) when value in 0..2, + do: {<>, 1, session} - defp encode_origin(:igp), do: <<0::8>> - defp encode_origin(:egp), do: <<1::8>> - defp encode_origin(:incomplete), do: <<2::8>> + def encode(_data, _session) do + raise NOTIFICATION, code: :update_message, subcode: :invalid_origin_attribute + end end diff --git a/lib/bgp/server/rde.ex b/lib/bgp/server/rde.ex index 9ada298..207dac9 100644 --- a/lib/bgp/server/rde.ex +++ b/lib/bgp/server/rde.ex @@ -1,14 +1,32 @@ defmodule BGP.Server.RDE do - @moduledoc false + @moduledoc """ + BGP RDE + Implementation of BGP Route Decision Engine and + [BGP Decision Process](https://datatracker.ietf.org/doc/html/rfc4271#section-9.1). + + This is a simplified diagram of the state machine showing the most significant events + and state transitions: + + ```mermaid + stateDiagram-v2 + [*] --> Idle + Idle --> Processing : state_timeout + Processing --> Idle : Processing done + ``` + """ @behaviour :gen_statem - alias BGP.{Message.UPDATE, Server} + alias BGP.Message.UPDATE + alias BGP.Message.UPDATE.Attribute + alias BGP.Server + alias BGP.Server.RDE.RIB + alias BGP.Server.Session require Logger - @enforce_keys [:config] - defstruct [:config] + @enforce_keys [:config, :adj_ribs_in, :adj_ribs_out, :loc_rib, :update_queue] + defstruct [:config, :adj_ribs_in, :adj_ribs_out, :loc_rib, :update_queue] @spec start_link(Keyword.t()) :: GenServer.on_start() def start_link(args), @@ -21,24 +39,282 @@ defmodule BGP.Server.RDE do def child_spec(opts), do: %{id: make_ref(), start: {__MODULE__, :start_link, [opts]}} - @spec process_update(Server.t(), UPDATE.t()) :: :ok - def process_update(server, update), - do: GenServer.call(Server.rde_for(server), {:process_update, update}) + @spec queue_update(Server.t(), Session.data(), UPDATE.t()) :: :ok + def queue_update(server, session, update), + do: :gen_statem.call(Server.rde_for(server), {:queue_update, session, update}) @impl :gen_statem def callback_mode, do: [:handle_event_function, :state_enter] @impl :gen_statem - def init(args), do: {:ok, :ready, %__MODULE__{config: args}} + def init(args) do + { + :ok, + :idle, + %__MODULE__{ + config: args, + update_queue: :queue.new(), + adj_ribs_in: RIB.new(:adj_ribs_in), + adj_ribs_out: RIB.new(:adj_ribs_out), + loc_rib: RIB.new(:loc_rib) + }, + {:state_timeout, 10_000, nil} + } + end @impl :gen_statem - def handle_event(:enter, old_state, new_state, %__MODULE__{}) do - Logger.info("#{old_state} -> #{new_state}") + def handle_event(:enter, old_state, new_state, %__MODULE__{} = data) do + Logger.info("#{data.config[:server]}: #{old_state} -> #{new_state}") :keep_state_and_data end @impl :gen_statem - def handle_event({:call, from}, {:process_update, %UPDATE{}}, _state, _data) do - {:keep_state_and_data, {:reply, from, :ok}} + def handle_event( + {:call, from}, + {:queue_update, session, update}, + :idle, + %__MODULE__{} = data + ) do + { + :keep_state, + %__MODULE__{data | update_queue: :queue.in({session, update}, data.update_queue)}, + [{:reply, from, :ok}] + } + end + + @impl :gen_statem + def handle_event(:state_timeout, _event, :idle, %__MODULE__{} = data) do + Logger.info("#{data.config[:server]}: RDE idle timeout") + {:next_state, :processing, data, {:next_event, :internal, :degree_of_preference}} + end + + @impl :gen_statem + def handle_event(:internal, :degree_of_preference, :processing, %__MODULE__{} = data) do + if :queue.len(data.update_queue) > 0 do + {:keep_state, degree_of_preference(data), {:next_event, :internal, :route_selection}} + else + Logger.info("#{data.config[:server]}: skipping Decision process: No UPDATEs received ") + {:next_state, :idle, data, {:state_timeout, 10_000, nil}} + end + end + + def handle_event(:internal, :route_selection, :processing, %__MODULE__{} = data), + do: {:keep_state, route_selection(data), {:next_event, :internal, :route_dissemination}} + + def handle_event(:internal, :route_dissemination, :processing, data), + do: {:next_state, :idle, route_dissemination(data), {:state_timeout, 10_000, nil}} + + defp degree_of_preference(%__MODULE__{} = data) do + Logger.info("#{data.config[:server]}: entering Phase 1: Calculation of Degree of Preference") + + :queue.fold( + fn {session, update}, :ok -> + for prefix <- update.nlri do + with {:ok, preference} <- preference(session, update, prefix) do + RIB.upsert( + data.adj_ribs_in, + {{session.bgp_id, prefix}, preference, update.path_attributes} + ) + end + end + + for prefix <- update.withdrawn_routes do + RIB.delete(data.adj_ribs_in, {session.bgp_id, prefix}) + end + + :ok + end, + :ok, + data.update_queue + ) + + Logger.info("#{data.config[:server]}: exiting Phase 1: Calculation of Degree of Preference") + + %__MODULE__{data | update_queue: :queue.new()} + end + + defp preference(%Session{ibgp: true} = session, path_attributes, route) do + with {:ok, preference} <- pib_preference(session, path_attributes, route) do + { + :ok, + Enum.find_value(path_attributes, preference, fn + %Attribute{value: %Attribute.LocalPref{value: value}} -> value + _attribute -> nil + end) + } + end + end + + defp preference(%Session{ibgp: false} = session, path_attributes, route), + do: pib_preference(session, path_attributes, route) + + defp pib_preference(_session, _path_attributes, _route), do: {:ok, 0} + + defp route_selection(%__MODULE__{} = data) do + Logger.info("#{data.config[:server]}: entering Phase 2: Route Selection") + + entries = + RIB.reduce( + data.adj_ribs_in, + %{}, + fn {{_bgp_id, prefix}, _preference, path_attributes} = entry, routes -> + if prefix_feasible?(path_attributes) do + Map.update(routes, prefix, entry, &select_prefix(entry, &1)) + else + routes + end + end + ) + + for {prefix, {_key, _preference, path_attributes}} <- entries do + RIB.upsert( + data.loc_rib, + { + prefix, + Enum.find_value(path_attributes, fn + %Attribute{value: %Attribute.NextHop{value: value}} -> value + _attribute -> nil + end) + } + ) + end + + Logger.info("#{data.config[:server]}: exiting Phase 2: Route Selection") + + data + end + + defp prefix_feasible?(path_attributes) do + nexthop_reachable?(path_attributes) && not as_path_loops?(path_attributes) + end + + defp nexthop_reachable?(path_attributes) do + Enum.find_value(path_attributes, false, fn + %Attribute{value: %Attribute.NextHop{value: _value}} -> + true + + _attribute -> + false + end) + end + + defp as_path_loops?(path_attributes) do + Enum.find_value(path_attributes, false, fn + %Attribute{value: %Attribute.ASPath{value: {_type, _length, value}}} -> + Enum.find_value(value, false, fn _asn -> false end) + + _attribute -> + false + end) + end + + defp select_prefix(entry, current), do: highest_preference(entry, current) + + defp highest_preference( + {_key, preference, _path_attributes} = entry, + {_current_key, current_preference, _current_path_attributes} + ) + when preference > current_preference, + do: entry + + defp highest_preference(entry, current), do: tie_break(entry, current) + + defp tie_break(entry, current), do: lower_as_path_length(entry, current) + + defp lower_as_path_length( + {_key, _preference, path_attributes} = entry, + {_current_key, _current_preference, current_path_attributes} = current + ) do + as_path_length = as_path_length(path_attributes) + current_as_path_length = as_path_length(current_path_attributes) + + cond do + as_path_length < current_as_path_length -> entry + as_path_length > current_as_path_length -> current + as_path_length == current_as_path_length -> lowest_origin(entry, current) + end + end + + defp as_path_length(path_attributes) do + Enum.find_value(path_attributes, fn + %Attribute{value: %Attribute.ASPath{value: {_type, length, _value}}} -> length + _attribute -> nil + end) + end + + defp lowest_origin( + {_key, _preference, path_attributes} = entry, + {_current_key, _current_preference, current_path_attributes} = current + ) do + origin = origin(path_attributes) + current_origin = origin(current_path_attributes) + + cond do + origin < current_origin -> entry + origin > current_origin -> current + origin == current_origin -> highest_med(entry, current) + end + end + + defp origin(path_attributes) do + Enum.find_value(path_attributes, fn + %Attribute{value: %Attribute.Origin{value: value}} -> value + _attribute -> nil + end) + end + + defp highest_med( + {_key, _preference, path_attributes} = entry, + {_current_key, _current_preference, current_path_attributes} = current + ) do + med = med(path_attributes) + current_med = med(current_path_attributes) + + cond do + med > current_med -> entry + med < current_med -> current + med == current_med -> ebgp_over_ibgp(entry, current) + end + end + + defp med(path_attributes) do + Enum.find_value(path_attributes, 0, fn + %Attribute{value: %Attribute.MultiExitDisc{value: value}} -> value + _attribute -> nil + end) + end + + defp ebgp_over_ibgp(entry, current), do: lowest_igp_cost(entry, current) + + defp lowest_igp_cost(entry, current), do: lowest_bgp_id(entry, current) + + defp lowest_bgp_id( + {{bgp_id, _prefix}, _preference, _path_attributes} = entry, + {{current_bgp_id, _current_prefix}, _current_preference, _current_path_attributes} = + current + ) do + cond do + bgp_id < current_bgp_id -> entry + bgp_id > current_bgp_id -> current + bgp_id == current_bgp_id -> lowest_peer_ip_address(entry, current) + end + end + + defp lowest_peer_ip_address( + {{bgp_id, _prefix}, _preference, _path_attributes} = entry, + {{current_bgp_id, _current_prefix}, _current_preference, _current_path_attributes} = + current + ) do + cond do + bgp_id < current_bgp_id -> entry + bgp_id > current_bgp_id -> current + end + end + + defp route_dissemination(%__MODULE__{} = data) do + Logger.info("#{data.config[:server]}: entering Phase 3: Route Dissemination") + data = %__MODULE__{data | adj_ribs_out: data.loc_rib} + Logger.info("#{data.config[:server]}: exiting Phase 3: Route Dissemination") + data end end diff --git a/lib/bgp/server/rde/rib.ex b/lib/bgp/server/rde/rib.ex new file mode 100644 index 0000000..2a4a702 --- /dev/null +++ b/lib/bgp/server/rde/rib.ex @@ -0,0 +1,27 @@ +defmodule BGP.Server.RDE.RIB do + @moduledoc """ + BGP RIBs + + Implements operations needed to store and update RIB tables using ETS. + """ + + @type t :: :ets.table() + @type key :: term() + @type entry :: tuple() + @type name :: atom() + + @spec new(name()) :: t() + def new(table), do: :ets.new(table, [:set, :protected]) + + @spec delete(t(), key()) :: true + def delete(table, key), do: :ets.delete(table, key) + + @spec dump(t()) :: [entry()] + def dump(table), do: :ets.tab2list(table) + + @spec reduce(t(), term(), (entry(), term() -> term())) :: term() + def reduce(table, acc, fun), do: :ets.foldl(fun, acc, table) + + @spec upsert(t(), entry()) :: true + def upsert(table, entry), do: :ets.insert(table, entry) +end diff --git a/lib/bgp/server/session.ex b/lib/bgp/server/session.ex index 74a1015..226209f 100644 --- a/lib/bgp/server/session.ex +++ b/lib/bgp/server/session.ex @@ -1,6 +1,6 @@ defmodule BGP.Server.Session do @moduledoc """ - BGP Session + BGP Session Implementation of BGP peering session handling and the [BGP FSM](https://datatracker.ietf.org/doc/html/rfc4271#section-8.2). @@ -1272,12 +1272,12 @@ defmodule BGP.Server.Session do %UPDATE{} when hold_time > 0 -> Logger.info("#{data.server}: received update from #{data.host}") - RDE.process_update(data.server, msg) + RDE.queue_update(data.server, data, msg) {:keep_state_and_data, [{:next_event, :internal, {:restart_timer, :hold_time, nil}}]} %UPDATE{} -> Logger.info("#{data.server}: received update from #{data.host}") - RDE.process_update(data.server, msg) + RDE.queue_update(data.server, data, msg) :keep_state_and_data end end @@ -1314,7 +1314,7 @@ defmodule BGP.Server.Session do defp compose_as_update(%__MODULE__{} = data) do %UPDATE{ path_attributes: [ - %Attribute{value: %Origin{origin: :igp}}, + %Attribute{value: %Origin{value: 0}}, %Attribute{value: %ASPath{value: [{:as_sequence, 1, [data.asn]}]}}, %Attribute{value: %NextHop{value: data.bgp_id}} ], diff --git a/mix.exs b/mix.exs index f083b66..300bdb1 100644 --- a/mix.exs +++ b/mix.exs @@ -5,7 +5,7 @@ defmodule BGP.MixProject do [ app: :bgp, version: "0.1.0", - elixir: "~> 1.13", + elixir: "~> 1.14", start_permanent: Mix.env() == :prod, deps: deps(), dialyzer: dialyzer(), diff --git a/test/bgp/message_test.exs b/test/bgp/message_test.exs index 785d8cf..b2aad41 100644 --- a/test/bgp/message_test.exs +++ b/test/bgp/message_test.exs @@ -87,7 +87,7 @@ defmodule BGP.MessageTest do ] attributes = [ - %Attribute{transitive: 1, value: %Origin{origin: :igp}}, + %Attribute{transitive: 1, value: %Origin{value: 0}}, %Attribute{transitive: 1, value: %ASPath{value: [{:as_sequence, 1, [session.asn]}]}}, %Attribute{transitive: 1, value: %NextHop{value: session.bgp_id}} ]