diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 000000000..b27a4e9f3 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,78 @@ +name: Lint +on: + pull_request: + paths: + - "lib/**" + - "test/**" + - "config/**" + - "priv/**" + - "assets/**" + - "rel/**" + - "mix.exs" + - "Dockerfile" + - "run.sh" + + push: + branches: + - main + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + tests: + name: Lint + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Setup elixir + id: beam + uses: erlef/setup-beam@v1 + with: + otp-version: 27.x # Define the OTP version [required] + elixir-version: 1.17.x # Define the elixir version [required] + - name: Cache Mix + uses: actions/cache@v4 + with: + path: | + deps + _build + key: ${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}-${{ hashFiles('**/mix.lock') }} + restore-keys: | + ${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}- + + - name: Install dependencies + run: mix deps.get + - name: Set up Postgres + run: docker compose -f docker-compose.dbs.yml up -d + - name: Run main database migrations + run: mix ecto.migrate --log-migrator-sql + - name: Run database tenant migrations + run: mix ecto.migrate --migrations-path lib/realtime/tenants/repo/migrations + - name: Run format check + run: mix format --check-formatted + - name: Credo checks + run: mix credo + - name: Run hex audit + run: mix hex.audit + - name: Run mix_audit + run: mix deps.audit + - name: Run sobelow + run: mix sobelow --config .sobelow-conf + - name: Retrieve PLT Cache + uses: actions/cache@v4 + id: plt-cache + with: + path: priv/plts + key: ${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-plts-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} + - name: Create PLTs + if: steps.plt-cache.outputs.cache-hit != 'true' + run: | + mkdir -p priv/plts + mix dialyzer.build + - name: Run dialyzer + run: mix dialyzer + - name: Run dev seeds + run: DB_ENC_KEY="1234567890123456" mix ecto.setup diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index c9c2a73fa..45d27634a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -20,6 +20,9 @@ concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} cancel-in-progress: true +env: + MIX_ENV: test + jobs: tests: name: Tests @@ -36,44 +39,19 @@ jobs: - name: Cache Mix uses: actions/cache@v4 with: - path: deps - key: ${{ runner.os }}-mix-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} + path: | + deps + _build + key: ${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}-${{ hashFiles('**/mix.lock') }} restore-keys: | - ${{ runner.os }}-mix- + ${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}- + - name: Pull postgres image quietly in background (used by test/support/containers.ex) + run: docker pull supabase/postgres:15.8.1.040 > /dev/null 2>&1 & - name: Install dependencies run: mix deps.get - name: Set up Postgres run: docker compose -f docker-compose.dbs.yml up -d - - name: Run main database migrations - run: mix ecto.migrate --log-migrator-sql - - name: Run database tenant migrations - run: mix ecto.migrate --migrations-path lib/realtime/tenants/repo/migrations - - name: Run format check - run: mix format --check-formatted - - name: Credo checks - run: mix credo - - name: Run hex audit - run: mix hex.audit - - name: Run mix_audit - run: mix deps.audit - - name: Run sobelow - run: mix sobelow --config .sobelow-conf - - name: Retrieve PLT Cache - uses: actions/cache@v4 - id: plt-cache - with: - path: priv/plts - key: ${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-plts-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} - - name: Create PLTs - if: steps.plt-cache.outputs.cache-hit != 'true' - run: | - mkdir -p priv/plts - mix dialyzer.build - - name: Run dialyzer - run: mix dialyzer - - name: Run dev seeds - run: DB_ENC_KEY="1234567890123456" mix ecto.setup - name: Start epmd run: epmd -daemon - name: Run tests diff --git a/config/runtime.exs b/config/runtime.exs index 447934b65..f09d22846 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -70,6 +70,7 @@ platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10) pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "gen_rpc") |> String.to_atom() websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize)) +users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5) no_channel_timeout_in_ms = if config_env() == :test, @@ -126,7 +127,8 @@ config :realtime, no_channel_timeout_in_ms: no_channel_timeout_in_ms, platform: platform, pubsub_adapter: pubsub_adapter, - broadcast_pool_size: broadcast_pool_size + broadcast_pool_size: broadcast_pool_size, + users_scope_shards: users_scope_shards if config_env() != :test && run_janitor? do config :realtime, diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index 85466ebe9..34697572c 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -18,6 +18,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do alias Realtime.Adapters.Changes.NewRecord alias Realtime.Adapters.Changes.UpdatedRecord alias Realtime.Database + alias Realtime.RateCounter + alias Realtime.Tenants def start_link(opts), do: GenServer.start_link(__MODULE__, opts) @@ -26,6 +28,12 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do tenant_id = args["id"] Logger.metadata(external_id: tenant_id, project: tenant_id) + %Realtime.Api.Tenant{} = Tenants.Cache.get_tenant_by_external_id(tenant_id) + + rate_counter_args = Tenants.db_events_per_second_rate(tenant_id, 4000) + + RateCounter.new(rate_counter_args) + state = %{ backoff: Backoff.new(backoff_min: 100, backoff_max: 5_000, backoff_type: :rand_exp), db_host: args["db_host"], @@ -41,7 +49,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do retry_ref: nil, retry_count: 0, slot_name: args["slot_name"] <> slot_name_suffix(), - tenant_id: tenant_id + tenant_id: tenant_id, + rate_counter_args: rate_counter_args } {:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{}) @@ -74,7 +83,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do max_record_bytes: max_record_bytes, max_changes: max_changes, conn: conn, - tenant_id: tenant_id + tenant_id: tenant_id, + rate_counter_args: rate_counter_args } = state ) do cancel_timer(poll_ref) @@ -84,7 +94,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do {time, list_changes} = :timer.tc(Replications, :list_changes, args) record_list_changes_telemetry(time, tenant_id) - case handle_list_changes_result(list_changes, tenant_id) do + case handle_list_changes_result(list_changes, tenant_id, rate_counter_args) do {:ok, row_count} -> Backoff.reset(backoff) @@ -177,20 +187,29 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do rows: [_ | _] = rows, num_rows: rows_count }}, - tenant_id + tenant_id, + rate_counter_args ) do - for row <- rows, - change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do - topic = "realtime:postgres:" <> tenant_id + case RateCounter.get(rate_counter_args) do + {:ok, %{limit: %{triggered: true}}} -> + :ok - RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes) + _ -> + Realtime.GenCounter.add(rate_counter_args.id, rows_count) + + for row <- rows, + change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do + topic = "realtime:postgres:" <> tenant_id + + RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes) + end end {:ok, rows_count} end - defp handle_list_changes_result({:ok, _}, _), do: {:ok, 0} - defp handle_list_changes_result({:error, reason}, _), do: {:error, reason} + defp handle_list_changes_result({:ok, _}, _, _), do: {:ok, 0} + defp handle_list_changes_result({:error, reason}, _, _), do: {:error, reason} def generate_record([ {"wal", diff --git a/lib/realtime/application.ex b/lib/realtime/application.ex index 99096edfb..45cc0271e 100644 --- a/lib/realtime/application.ex +++ b/lib/realtime/application.ex @@ -46,8 +46,7 @@ defmodule Realtime.Application do Realtime.PromEx.set_metrics_tags() :ets.new(Realtime.Tenants.Connect, [:named_table, :set, :public]) :syn.set_event_handler(Realtime.SynHandler) - - :ok = :syn.add_node_to_scopes([:users, RegionNodes, Realtime.Tenants.Connect]) + :ok = :syn.add_node_to_scopes([RegionNodes, Realtime.Tenants.Connect | Realtime.UsersCounter.scopes()]) region = Application.get_env(:realtime, :region) :syn.join(RegionNodes, region, self(), node: node()) diff --git a/lib/realtime/nodes.ex b/lib/realtime/nodes.ex index ae237eb5f..34c9f3cfb 100644 --- a/lib/realtime/nodes.ex +++ b/lib/realtime/nodes.ex @@ -105,7 +105,7 @@ defmodule Realtime.Nodes do iex> node = :"pink@127.0.0.1" iex> Realtime.Helpers.short_node_id_from_name(node) - "127.0.0.1" + "pink@127.0.0.1" iex> node = :"pink@10.0.1.1" iex> Realtime.Helpers.short_node_id_from_name(node) @@ -124,6 +124,9 @@ defmodule Realtime.Nodes do [_, _, _, _, _, one, two, _] -> one <> two + ["127.0.0.1"] -> + Atom.to_string(name) + _other -> host end diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex index efd2397ac..9e53e18f1 100644 --- a/lib/realtime/tenants.ex +++ b/lib/realtime/tenants.ex @@ -21,7 +21,8 @@ defmodule Realtime.Tenants do """ @spec list_connected_tenants(atom()) :: [String.t()] def list_connected_tenants(node) do - :syn.group_names(:users, node) + UsersCounter.scopes() + |> Enum.flat_map(fn scope -> :syn.group_names(scope, node) end) end @doc """ @@ -247,6 +248,31 @@ defmodule Realtime.Tenants do %RateCounter.Args{id: db_events_per_second_key(tenant_id), opts: opts} end + @doc "RateCounter arguments for counting database events per second with a limit." + @spec db_events_per_second_rate(String.t(), non_neg_integer) :: RateCounter.Args.t() + def db_events_per_second_rate(tenant_id, max_events_per_second) when is_binary(tenant_id) do + opts = [ + telemetry: %{ + event_name: [:channel, :db_events], + measurements: %{}, + metadata: %{tenant: tenant_id} + }, + limit: [ + value: max_events_per_second, + measurement: :avg, + log: true, + log_fn: fn -> + Logger.error("MessagePerSecondRateLimitReached: Too many postgres changes messages per second", + external_id: tenant_id, + project: tenant_id + ) + end + ] + ] + + %RateCounter.Args{id: db_events_per_second_key(tenant_id), opts: opts} + end + @doc """ The GenCounter key to use when counting events for RealtimeChannel events. iex> Realtime.Tenants.db_events_per_second_key("tenant_id") diff --git a/lib/realtime/user_counter.ex b/lib/realtime/user_counter.ex index 6190030d9..9ea38c780 100644 --- a/lib/realtime/user_counter.ex +++ b/lib/realtime/user_counter.ex @@ -8,17 +8,32 @@ defmodule Realtime.UsersCounter do Adds a RealtimeChannel pid to the `:users` scope for a tenant so we can keep track of all connected clients for a tenant. """ @spec add(pid(), String.t()) :: :ok - def add(pid, tenant), do: :syn.join(:users, tenant, pid) + def add(pid, tenant_id), do: tenant_id |> scope() |> :syn.join(tenant_id, pid) @doc """ Returns the count of all connected clients for a tenant for the cluster. """ @spec tenant_users(String.t()) :: non_neg_integer() - def tenant_users(tenant), do: :syn.member_count(:users, tenant) + def tenant_users(tenant_id), do: tenant_id |> scope() |> :syn.member_count(tenant_id) @doc """ Returns the count of all connected clients for a tenant for a single node. """ @spec tenant_users(atom, String.t()) :: non_neg_integer() - def tenant_users(node_name, tenant), do: :syn.member_count(:users, tenant, node_name) + def tenant_users(node_name, tenant_id), do: tenant_id |> scope() |> :syn.member_count(tenant_id, node_name) + + @doc """ + Returns the scope for a given tenant id. + """ + @spec scope(String.t()) :: atom() + def scope(tenant_id) do + shards = Application.get_env(:realtime, :users_scope_shards) + shard = :erlang.phash2(tenant_id, shards) + :"users_#{shard}" + end + + def scopes() do + shards = Application.get_env(:realtime, :users_scope_shards) + Enum.map(0..(shards - 1), fn shard -> :"users_#{shard}" end) + end end diff --git a/lib/realtime_web/channels/payloads/config.ex b/lib/realtime_web/channels/payloads/config.ex index 923020174..029aa93b5 100644 --- a/lib/realtime_web/channels/payloads/config.ex +++ b/lib/realtime_web/channels/payloads/config.ex @@ -17,6 +17,14 @@ defmodule RealtimeWeb.Channels.Payloads.Config do end def changeset(config, attrs) do + attrs = + attrs + |> Enum.map(fn + {k, v} when is_list(v) -> {k, Enum.filter(v, fn v -> v != nil end)} + {k, v} -> {k, v} + end) + |> Map.new() + config |> cast(attrs, [:private], message: &Join.error_message/2) |> cast_embed(:broadcast, invalid_message: "unable to parse, expected a map") diff --git a/lib/realtime_web/channels/payloads/presence.ex b/lib/realtime_web/channels/payloads/presence.ex index 53e09047d..785df9222 100644 --- a/lib/realtime_web/channels/payloads/presence.ex +++ b/lib/realtime_web/channels/payloads/presence.ex @@ -8,7 +8,7 @@ defmodule RealtimeWeb.Channels.Payloads.Presence do embedded_schema do field :enabled, :boolean, default: true - field :key, :string, default: UUID.uuid1() + field :key, :any, default: UUID.uuid1(), virtual: true end def changeset(presence, attrs) do diff --git a/lib/realtime_web/live/status_live/index.ex b/lib/realtime_web/live/status_live/index.ex index 8a2d32054..f55eddfa5 100644 --- a/lib/realtime_web/live/status_live/index.ex +++ b/lib/realtime_web/live/status_live/index.ex @@ -3,11 +3,18 @@ defmodule RealtimeWeb.StatusLive.Index do alias Realtime.Latency.Payload alias Realtime.Nodes + alias RealtimeWeb.Endpoint @impl true def mount(_params, _session, socket) do - if connected?(socket), do: RealtimeWeb.Endpoint.subscribe("admin:cluster") - {:ok, assign(socket, pings: default_pings(), nodes: Enum.count(all_nodes()))} + if connected?(socket), do: Endpoint.subscribe("admin:cluster") + + socket = + socket + |> assign(nodes: Enum.count(all_nodes())) + |> stream(:pings, default_pings()) + + {:ok, socket} end @impl true @@ -17,17 +24,14 @@ defmodule RealtimeWeb.StatusLive.Index do @impl true def handle_info(%Phoenix.Socket.Broadcast{payload: %Payload{} = payload}, socket) do - pair = payload.from_node <> "_" <> payload.node - payload = %{pair => payload} - - pings = Map.merge(socket.assigns.pings, payload) + pair = pair_id(payload.from_node, payload.node) - {:noreply, assign(socket, pings: pings)} + {:noreply, stream(socket, :pings, [%{id: pair, payload: payload}])} end defp apply_action(socket, :index, _params) do socket - |> assign(:page_title, "Status - Supabase Realtime") + |> assign(:page_title, "Realtime Status") end defp all_nodes do @@ -35,9 +39,14 @@ defmodule RealtimeWeb.StatusLive.Index do end defp default_pings do - for n <- all_nodes(), f <- all_nodes(), into: %{} do - pair = n <> "_" <> f - {pair, %Payload{from_node: f, latency: "Loading...", node: n, timestamp: "Loading..."}} + for n <- all_nodes(), f <- all_nodes() do + pair = pair_id(f, n) + + %{id: pair, payload: %Payload{from_node: f, latency: "Loading...", node: n, timestamp: "Loading..."}} end end + + defp pair_id(from, to) do + from <> "_" <> to + end end diff --git a/lib/realtime_web/live/status_live/index.html.heex b/lib/realtime_web/live/status_live/index.html.heex index 645001714..63ea4fc0d 100644 --- a/lib/realtime_web/live/status_live/index.html.heex +++ b/lib/realtime_web/live/status_live/index.html.heex @@ -1,16 +1,16 @@ <.h1>Supabase Realtime: Multiplayer Edition + <.h2>Cluster Status +
Understand the latency between nodes across the Realtime cluster.