diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 5d3818814..c9c2a73fa 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -16,6 +16,10 @@ on: branches: - main +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: tests: name: Tests diff --git a/Makefile b/Makefile index fd7f0f7fd..1259a1335 100644 --- a/Makefile +++ b/Makefile @@ -9,10 +9,10 @@ PORT ?= 4000 # Common commands dev: ## Start a dev server - ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=$(PORT) MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=fra DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5369 GEN_RPC_TCP_CLIENT_PORT=5469 iex --name $(NODE_NAME)@127.0.0.1 --cookie cookie -S mix phx.server + ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=$(PORT) MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=us-east-1 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5369 GEN_RPC_TCP_CLIENT_PORT=5469 iex --name $(NODE_NAME)@127.0.0.1 --cookie cookie -S mix phx.server dev.orange: ## Start another dev server (orange) on port 4001 - ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=4001 MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5469 GEN_RPC_TCP_CLIENT_PORT=5369 iex --name orange@127.0.0.1 --cookie cookie -S mix phx.server + ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=4001 MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=eu-west-1 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5469 GEN_RPC_TCP_CLIENT_PORT=5369 iex --name orange@127.0.0.1 --cookie cookie -S mix phx.server seed: ## Seed the database DB_ENC_KEY="1234567890123456" FLY_ALLOC_ID=123e4567-e89b-12d3-a456-426614174000 mix run priv/repo/dev_seeds.exs diff --git a/config/dev.exs b/config/dev.exs index a438f8ea4..0eff300d8 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -97,6 +97,8 @@ config :phoenix, :plug_init_mode, :runtime # Disable caching to ensure the rendered spec is refreshed config :open_api_spex, :cache_adapter, OpenApiSpex.Plug.NoneCache -config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []} +# Disabled but can print to stdout with: +# config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []} +config :opentelemetry, traces_exporter: :none config :mix_test_watch, clear: true diff --git a/config/runtime.exs b/config/runtime.exs index 47961f98a..447934b65 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -68,7 +68,7 @@ janitor_children_timeout = Env.get_integer("JANITOR_CHILDREN_TIMEOUT", :timer.se janitor_schedule_timer = Env.get_integer("JANITOR_SCHEDULE_TIMER_IN_MS", :timer.hours(4)) platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10) -pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "pg2") |> String.to_atom() +pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "gen_rpc") |> String.to_atom() websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize)) no_channel_timeout_in_ms = diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index 65f4a33f1..85466ebe9 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -183,7 +183,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do topic = "realtime:postgres:" <> tenant_id - RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher) + RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes) end {:ok, rows_count} diff --git a/lib/realtime/gen_rpc/pub_sub.ex b/lib/realtime/gen_rpc/pub_sub.ex index b2a90b165..3ba9e053a 100644 --- a/lib/realtime/gen_rpc/pub_sub.ex +++ b/lib/realtime/gen_rpc/pub_sub.ex @@ -65,7 +65,11 @@ defmodule Realtime.GenRpcPubSub.Worker do def start_link({pubsub, worker}), do: GenServer.start_link(__MODULE__, pubsub, name: worker) @impl true - def init(pubsub), do: {:ok, pubsub} + def init(pubsub) do + Process.flag(:message_queue_data, :off_heap) + Process.flag(:fullsweep_after, 100) + {:ok, pubsub} + end @impl true def handle_info({:ftl, topic, message, dispatcher}, pubsub) do diff --git a/lib/realtime/monitoring/latency.ex b/lib/realtime/monitoring/latency.ex index 52c46adb4..d9ddd0d9a 100644 --- a/lib/realtime/monitoring/latency.ex +++ b/lib/realtime/monitoring/latency.ex @@ -7,7 +7,7 @@ defmodule Realtime.Latency do use Realtime.Logs alias Realtime.Nodes - alias Realtime.Rpc + alias Realtime.GenRpc defmodule Payload do @moduledoc false @@ -33,7 +33,7 @@ defmodule Realtime.Latency do } end - @every 5_000 + @every 15_000 def start_link(args) do GenServer.start_link(__MODULE__, args, name: __MODULE__) end @@ -76,7 +76,7 @@ defmodule Realtime.Latency do Task.Supervisor.async(Realtime.TaskSupervisor, fn -> {latency, response} = :timer.tc(fn -> - Rpc.call(n, __MODULE__, :pong, [pong_timeout], timeout: timer_timeout) + GenRpc.call(n, __MODULE__, :pong, [pong_timeout], timeout: timer_timeout) end) latency_ms = latency / 1_000 @@ -85,7 +85,7 @@ defmodule Realtime.Latency do from_node = Nodes.short_node_id_from_name(Node.self()) case response do - {:badrpc, reason} -> + {:error, :rpc_error, reason} -> log_error( "RealtimeNodeDisconnected", "Unable to connect to #{short_name} from #{region}: #{reason}" diff --git a/lib/realtime/monitoring/prom_ex/plugins/tenant.ex b/lib/realtime/monitoring/prom_ex/plugins/tenant.ex index 1bd324624..a3019a68a 100644 --- a/lib/realtime/monitoring/prom_ex/plugins/tenant.ex +++ b/lib/realtime/monitoring/prom_ex/plugins/tenant.ex @@ -36,10 +36,10 @@ defmodule Realtime.PromEx.Plugins.Tenant do event_name: [:realtime, :tenants, :payload, :size], measurement: :size, description: "Tenant payload size", - tags: [:tenant], + tags: [:tenant, :message_type], unit: :byte, reporter_options: [ - buckets: [100, 250, 500, 1000, 2000, 3000, 5000, 10_000, 25_000] + buckets: [250, 500, 1000, 3000, 5000, 10_000, 25_000, 100_000, 500_000, 1_000_000, 3_000_000] ] ), distribution( @@ -47,9 +47,10 @@ defmodule Realtime.PromEx.Plugins.Tenant do event_name: [:realtime, :tenants, :payload, :size], measurement: :size, description: "Payload size", + tags: [:message_type], unit: :byte, reporter_options: [ - buckets: [100, 250, 500, 1000, 2000, 3000, 5000, 10_000, 25_000] + buckets: [250, 500, 1000, 3000, 5000, 10_000, 25_000, 100_000, 500_000, 1_000_000, 3_000_000] ] ) ] @@ -157,6 +158,12 @@ defmodule Realtime.PromEx.Plugins.Tenant do description: "Sum of messages sent on a Realtime Channel.", tags: [:tenant] ), + sum( + [:realtime, :channel, :global, :events], + event_name: [:realtime, :rate_counter, :channel, :events], + measurement: :sum, + description: "Global sum of messages sent on a Realtime Channel." + ), sum( [:realtime, :channel, :presence_events], event_name: [:realtime, :rate_counter, :channel, :presence_events], @@ -164,6 +171,12 @@ defmodule Realtime.PromEx.Plugins.Tenant do description: "Sum of presence messages sent on a Realtime Channel.", tags: [:tenant] ), + sum( + [:realtime, :channel, :global, :presence_events], + event_name: [:realtime, :rate_counter, :channel, :presence_events], + measurement: :sum, + description: "Global sum of presence messages sent on a Realtime Channel." + ), sum( [:realtime, :channel, :db_events], event_name: [:realtime, :rate_counter, :channel, :db_events], @@ -171,6 +184,12 @@ defmodule Realtime.PromEx.Plugins.Tenant do description: "Sum of db messages sent on a Realtime Channel.", tags: [:tenant] ), + sum( + [:realtime, :channel, :global, :db_events], + event_name: [:realtime, :rate_counter, :channel, :db_events], + measurement: :sum, + description: "Global sum of db messages sent on a Realtime Channel." + ), sum( [:realtime, :channel, :joins], event_name: [:realtime, :rate_counter, :channel, :joins], diff --git a/lib/realtime/monitoring/prom_ex/plugins/tenants.ex b/lib/realtime/monitoring/prom_ex/plugins/tenants.ex index 0035e9594..e8106df58 100644 --- a/lib/realtime/monitoring/prom_ex/plugins/tenants.ex +++ b/lib/realtime/monitoring/prom_ex/plugins/tenants.ex @@ -21,6 +21,15 @@ defmodule Realtime.PromEx.Plugins.Tenants do unit: {:microsecond, :millisecond}, tags: [:success, :tenant, :mechanism], reporter_options: [buckets: [10, 250, 5000, 15_000]] + ), + distribution( + [:realtime, :global, :rpc], + event_name: [:realtime, :rpc], + description: "Global Latency of rpc calls", + measurement: :latency, + unit: {:microsecond, :millisecond}, + tags: [:success, :mechanism], + reporter_options: [buckets: [10, 250, 5000, 15_000]] ) ]) end diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex index db2a02cc4..efd2397ac 100644 --- a/lib/realtime/tenants.ex +++ b/lib/realtime/tenants.ex @@ -328,18 +328,18 @@ defmodule Realtime.Tenants do %RateCounter.Args{id: {:channel, :authorization_errors, external_id}, opts: opts} end - @connect_per_second_default 10 + @connect_errors_per_second_default 10 @doc "RateCounter arguments for counting connect per second." - @spec connect_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t() - def connect_per_second_rate(%Tenant{external_id: external_id}) do - connect_per_second_rate(external_id) + @spec connect_errors_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t() + def connect_errors_per_second_rate(%Tenant{external_id: external_id}) do + connect_errors_per_second_rate(external_id) end - def connect_per_second_rate(tenant_id) do + def connect_errors_per_second_rate(tenant_id) do opts = [ - max_bucket_len: 10, + max_bucket_len: 30, limit: [ - value: @connect_per_second_default, + value: @connect_errors_per_second_default, measurement: :sum, log_fn: fn -> Logger.critical( diff --git a/lib/realtime/tenants/batch_broadcast.ex b/lib/realtime/tenants/batch_broadcast.ex index 98427621b..9e4ed4c3c 100644 --- a/lib/realtime/tenants/batch_broadcast.ex +++ b/lib/realtime/tenants/batch_broadcast.ex @@ -129,7 +129,14 @@ defmodule Realtime.Tenants.BatchBroadcast do broadcast = %Phoenix.Socket.Broadcast{topic: message.topic, event: @event_type, payload: payload} GenCounter.add(events_per_second_rate.id) - TenantBroadcaster.pubsub_broadcast(tenant.external_id, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher) + + TenantBroadcaster.pubsub_broadcast( + tenant.external_id, + tenant_topic, + broadcast, + RealtimeChannel.MessageDispatcher, + :broadcast + ) end defp permissions_for_message(_, nil, _), do: nil diff --git a/lib/realtime/tenants/connect.ex b/lib/realtime/tenants/connect.ex index 0ee43f161..caf49cc57 100644 --- a/lib/realtime/tenants/connect.ex +++ b/lib/realtime/tenants/connect.ex @@ -57,7 +57,7 @@ defmodule Realtime.Tenants.Connect do | {:error, :connect_rate_limit_reached} | {:error, :rpc_error, term()} def lookup_or_start_connection(tenant_id, opts \\ []) when is_binary(tenant_id) do - rate_args = Tenants.connect_per_second_rate(tenant_id) + rate_args = Tenants.connect_errors_per_second_rate(tenant_id) RateCounter.new(rate_args) with {:ok, %{limit: %{triggered: false}}} <- RateCounter.get(rate_args), @@ -68,8 +68,14 @@ defmodule Realtime.Tenants.Connect do {:error, :connect_rate_limit_reached} {:error, :tenant_database_connection_initializing} -> - GenCounter.add(rate_args.id) - call_external_node(tenant_id, opts) + case call_external_node(tenant_id, opts) do + {:ok, pid} -> + {:ok, pid} + + error -> + GenCounter.add(rate_args.id) + error + end {:error, :initializing} -> {:error, :tenant_database_unavailable} diff --git a/lib/realtime_web/channels/presence.ex b/lib/realtime_web/channels/presence.ex index f4d378b92..9e173febe 100644 --- a/lib/realtime_web/channels/presence.ex +++ b/lib/realtime_web/channels/presence.ex @@ -8,5 +8,6 @@ defmodule RealtimeWeb.Presence do use Phoenix.Presence, otp_app: :realtime, pubsub_server: Realtime.PubSub, + dispatcher: RealtimeWeb.RealtimeChannel.MessageDispatcher, pool_size: 10 end diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 91a417c21..104d9a077 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -18,7 +18,6 @@ defmodule RealtimeWeb.RealtimeChannel do alias Realtime.Tenants.Authorization alias Realtime.Tenants.Authorization.Policies alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies - alias Realtime.Tenants.Authorization.Policies.PresencePolicies alias Realtime.Tenants.Connect alias RealtimeWeb.Channels.Payloads.Join @@ -259,27 +258,11 @@ defmodule RealtimeWeb.RealtimeChannel do {:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})} end - def handle_info( - %{event: "presence_diff"}, - %{assigns: %{policies: %Policies{presence: %PresencePolicies{read: false}}}} = socket - ) do - Logger.warning("Presence message ignored") - {:noreply, socket} - end - def handle_info(_msg, %{assigns: %{policies: %Policies{broadcast: %BroadcastPolicies{read: false}}}} = socket) do Logger.warning("Broadcast message ignored") {:noreply, socket} end - def handle_info(%{event: "presence_diff", payload: payload} = msg, socket) do - %{presence_rate_counter: presence_rate_counter} = socket.assigns - GenCounter.add(presence_rate_counter.id) - maybe_log_info(socket, msg) - push(socket, "presence_diff", payload) - {:noreply, socket} - end - def handle_info(%{event: type, payload: payload} = msg, socket) do count(socket) maybe_log_info(socket, msg) diff --git a/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex b/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex index f8e736c2e..036ad9159 100644 --- a/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex +++ b/lib/realtime_web/channels/realtime_channel/broadcast_handler.ex @@ -76,14 +76,21 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do broadcast = %Phoenix.Socket.Broadcast{topic: tenant_topic, event: @event_type, payload: payload} if self_broadcast do - TenantBroadcaster.pubsub_broadcast(tenant_id, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher) + TenantBroadcaster.pubsub_broadcast( + tenant_id, + tenant_topic, + broadcast, + RealtimeChannel.MessageDispatcher, + :broadcast + ) else TenantBroadcaster.pubsub_broadcast_from( tenant_id, self(), tenant_topic, broadcast, - RealtimeChannel.MessageDispatcher + RealtimeChannel.MessageDispatcher, + :broadcast ) end end diff --git a/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex b/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex index 32e1528f3..6604eb2bd 100644 --- a/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex +++ b/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex @@ -5,14 +5,8 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do require Logger - def fastlane_metadata(fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids \\ MapSet.new()) - - def fastlane_metadata(fastlane_pid, serializer, topic, :info, tenant_id, replayed_message_ids) do - {:rc_fastlane, fastlane_pid, serializer, topic, {:log, tenant_id}, replayed_message_ids} - end - - def fastlane_metadata(fastlane_pid, serializer, topic, _log_level, _tenant_id, replayed_message_ids) do - {:rc_fastlane, fastlane_pid, serializer, topic, replayed_message_ids} + def fastlane_metadata(fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids \\ MapSet.new()) do + {:rc_fastlane, fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids} end @doc """ @@ -20,48 +14,58 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do It also sends an :update_rate_counter to the subscriber and it can conditionally log """ @spec dispatch(list, pid, Phoenix.Socket.Broadcast.t()) :: :ok - def dispatch(subscribers, from, %Phoenix.Socket.Broadcast{} = msg) do + def dispatch(subscribers, from, %Phoenix.Socket.Broadcast{event: event} = msg) do # fastlane_pid is the actual socket transport pid # This reduce caches the serialization and bypasses the channel process going straight to the # transport process message_id = message_id(msg.payload) - # Credo doesn't like that we don't use the result aggregation - _ = - Enum.reduce(subscribers, %{}, fn - {pid, _}, cache when pid == from -> - cache + {_cache, count} = + Enum.reduce(subscribers, {%{}, 0}, fn + {pid, _}, {cache, count} when pid == from -> + {cache, count} - {pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, replayed_message_ids}}, cache -> + {pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, log_level, tenant_id, replayed_message_ids}}, + {cache, count} -> if already_replayed?(message_id, replayed_message_ids) do # skip already replayed message - cache + {cache, count} else - send(pid, :update_rate_counter) - do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) - end + if event != "presence_diff", do: send(pid, :update_rate_counter) - {pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, {:log, tenant_id}, replayed_message_ids}}, cache -> - if already_replayed?(message_id, replayed_message_ids) do - # skip already replayed message - cache - else - send(pid, :update_rate_counter) - log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}" - Logger.info(log, external_id: tenant_id, project: tenant_id) + maybe_log(log_level, join_topic, msg, tenant_id) - do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) + cache = do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) + {cache, count + 1} end - {pid, _}, cache -> + {pid, _}, {cache, count} -> send(pid, msg) - cache + {cache, count} end) + tenant_id = tenant_id(subscribers) + increment_presence_counter(tenant_id, event, count) + :ok end + defp increment_presence_counter(tenant_id, "presence_diff", count) when is_binary(tenant_id) do + tenant_id + |> Realtime.Tenants.presence_events_per_second_key() + |> Realtime.GenCounter.add(count) + end + + defp increment_presence_counter(_tenant_id, _event, _count), do: :ok + + defp maybe_log(:info, join_topic, msg, tenant_id) do + log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}" + Logger.info(log, external_id: tenant_id, project: tenant_id) + end + + defp maybe_log(_level, _join_topic, _msg, _tenant_id), do: :ok + defp message_id(%{"meta" => %{"id" => id}}), do: id defp message_id(_), do: nil @@ -82,4 +86,10 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do Map.put(cache, serializer, encoded_msg) end end + + defp tenant_id([{_pid, {:rc_fastlane, _, _, _, _, tenant_id, _}} | _]) do + tenant_id + end + + defp tenant_id(_), do: nil end diff --git a/lib/realtime_web/channels/realtime_channel/presence_handler.ex b/lib/realtime_web/channels/realtime_channel/presence_handler.ex index 9dc23d219..ec16c7b16 100644 --- a/lib/realtime_web/channels/realtime_channel/presence_handler.ex +++ b/lib/realtime_web/channels/realtime_channel/presence_handler.ex @@ -11,7 +11,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do alias Phoenix.Tracker.Shard alias Realtime.GenCounter alias Realtime.RateCounter - alias Realtime.Tenants + # alias Realtime.Tenants alias Realtime.Tenants.Authorization alias RealtimeWeb.Presence alias RealtimeWeb.RealtimeChannel.Logging @@ -109,6 +109,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do %{assigns: %{presence_key: presence_key, tenant_topic: tenant_topic}} = socket payload = Map.get(payload, "payload", %{}) + RealtimeWeb.TenantBroadcaster.collect_payload_size(socket.assigns.tenant, payload, :presence) with :ok <- limit_presence_event(socket), {:ok, _} <- Presence.track(self(), tenant_topic, presence_key, payload) do @@ -138,13 +139,14 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do |> Phoenix.Presence.group() end + @presence_limit 100 defp limit_presence_event(socket) do - %{assigns: %{presence_rate_counter: presence_counter, tenant: tenant_id}} = socket + %{assigns: %{presence_rate_counter: presence_counter, tenant: _tenant_id}} = socket {:ok, rate_counter} = RateCounter.get(presence_counter) - tenant = Tenants.Cache.get_tenant_by_external_id(tenant_id) + # tenant = Tenants.Cache.get_tenant_by_external_id(tenant_id) - if rate_counter.avg > tenant.max_presence_events_per_second do + if rate_counter.avg > @presence_limit do {:error, :rate_limit_exceeded} else GenCounter.add(presence_counter.id) diff --git a/lib/realtime_web/channels/tenant_rate_limiters.ex b/lib/realtime_web/channels/tenant_rate_limiters.ex new file mode 100644 index 000000000..2101ac945 --- /dev/null +++ b/lib/realtime_web/channels/tenant_rate_limiters.ex @@ -0,0 +1,43 @@ +defmodule RealtimeWeb.TenantRateLimiters do + @moduledoc """ + Rate limiters for tenants. + """ + require Logger + alias Realtime.UsersCounter + alias Realtime.Tenants + alias Realtime.RateCounter + alias Realtime.Api.Tenant + + @spec check_tenant(Realtime.Api.Tenant.t()) :: :ok | {:error, :too_many_connections | :too_many_joins} + def check_tenant(tenant) do + with :ok <- max_concurrent_users_check(tenant) do + max_joins_per_second_check(tenant) + end + end + + defp max_concurrent_users_check(%Tenant{max_concurrent_users: max_conn_users, external_id: external_id}) do + total_conn_users = UsersCounter.tenant_users(external_id) + + if total_conn_users < max_conn_users, + do: :ok, + else: {:error, :too_many_connections} + end + + defp max_joins_per_second_check(%Tenant{max_joins_per_second: max_joins_per_second} = tenant) do + rate_args = Tenants.joins_per_second_rate(tenant.external_id, max_joins_per_second) + + RateCounter.new(rate_args) + + case RateCounter.get(rate_args) do + {:ok, %{limit: %{triggered: false}}} -> + :ok + + {:ok, %{limit: %{triggered: true}}} -> + {:error, :too_many_joins} + + error -> + Logger.error("UnknownErrorOnCounter: #{inspect(error)}") + {:error, error} + end + end +end diff --git a/lib/realtime_web/channels/user_socket.ex b/lib/realtime_web/channels/user_socket.ex index 849aa052d..6d4bf9017 100644 --- a/lib/realtime_web/channels/user_socket.ex +++ b/lib/realtime_web/channels/user_socket.ex @@ -16,6 +16,7 @@ defmodule RealtimeWeb.UserSocket do alias Realtime.PostgresCdc alias Realtime.Tenants + alias RealtimeWeb.TenantRateLimiters alias RealtimeWeb.ChannelsAuthorization alias RealtimeWeb.RealtimeChannel alias RealtimeWeb.RealtimeChannel.Logging @@ -56,6 +57,7 @@ defmodule RealtimeWeb.UserSocket do token when is_binary(token) <- token, jwt_secret_dec <- Crypto.decrypt!(jwt_secret), {:ok, claims} <- ChannelsAuthorization.authorize_conn(token, jwt_secret_dec, jwt_jwks), + :ok <- TenantRateLimiters.check_tenant(tenant), {:ok, postgres_cdc_module} <- PostgresCdc.driver(postgres_cdc_default) do %Tenant{ extensions: extensions, @@ -111,6 +113,16 @@ defmodule RealtimeWeb.UserSocket do log_error("MalformedJWT", "The token provided is not a valid JWT") {:error, :token_malformed} + {:error, :too_many_connections} -> + msg = "Too many connected users" + Logging.log_error(socket, "ConnectionRateLimitReached", msg) + {:error, :too_many_connections} + + {:error, :too_many_joins} -> + msg = "Too many joins per second" + Logging.log_error(socket, "JoinsRateLimitReached", msg) + {:error, :too_many_joins} + error -> log_error("ErrorConnectingToWebsocket", error) error diff --git a/lib/realtime_web/endpoint.ex b/lib/realtime_web/endpoint.ex index 190e1a917..894911803 100644 --- a/lib/realtime_web/endpoint.ex +++ b/lib/realtime_web/endpoint.ex @@ -15,7 +15,7 @@ defmodule RealtimeWeb.Endpoint do websocket: [ connect_info: [:peer_data, :uri, :x_headers], fullsweep_after: 20, - max_frame_size: 8_000_000, + max_frame_size: 5_000_000, # https://github.com/ninenines/cowboy/blob/24d32de931a0c985ff7939077463fc8be939f0e9/doc/src/manual/cowboy_websocket.asciidoc#L228 # active_n: The number of packets Cowboy will request from the socket at once. # This can be used to tweak the performance of the server. Higher values reduce diff --git a/lib/realtime_web/tenant_broadcaster.ex b/lib/realtime_web/tenant_broadcaster.ex index da02df79e..f8b739a0b 100644 --- a/lib/realtime_web/tenant_broadcaster.ex +++ b/lib/realtime_web/tenant_broadcaster.ex @@ -5,9 +5,12 @@ defmodule RealtimeWeb.TenantBroadcaster do alias Phoenix.PubSub - @spec pubsub_broadcast(tenant_id :: String.t(), PubSub.topic(), PubSub.message(), PubSub.dispatcher()) :: :ok - def pubsub_broadcast(tenant_id, topic, message, dispatcher) do - collect_payload_size(tenant_id, message) + @type message_type :: :broadcast | :presence | :postgres_changes + + @spec pubsub_broadcast(tenant_id :: String.t(), PubSub.topic(), PubSub.message(), PubSub.dispatcher(), message_type) :: + :ok + def pubsub_broadcast(tenant_id, topic, message, dispatcher, message_type) do + collect_payload_size(tenant_id, message, message_type) if pubsub_adapter() == :gen_rpc do PubSub.broadcast(Realtime.PubSub, topic, message, dispatcher) @@ -23,11 +26,12 @@ defmodule RealtimeWeb.TenantBroadcaster do from :: pid, PubSub.topic(), PubSub.message(), - PubSub.dispatcher() + PubSub.dispatcher(), + message_type ) :: :ok - def pubsub_broadcast_from(tenant_id, from, topic, message, dispatcher) do - collect_payload_size(tenant_id, message) + def pubsub_broadcast_from(tenant_id, from, topic, message, dispatcher, message_type) do + collect_payload_size(tenant_id, message, message_type) if pubsub_adapter() == :gen_rpc do PubSub.broadcast_from(Realtime.PubSub, from, topic, message, dispatcher) @@ -45,16 +49,18 @@ defmodule RealtimeWeb.TenantBroadcaster do @payload_size_event [:realtime, :tenants, :payload, :size] - defp collect_payload_size(tenant_id, payload) when is_struct(payload) do + @spec collect_payload_size(tenant_id :: String.t(), payload :: term, message_type :: message_type) :: :ok + def collect_payload_size(tenant_id, payload, message_type) when is_struct(payload) do # Extracting from struct so the __struct__ bit is not calculated as part of the payload - collect_payload_size(tenant_id, Map.from_struct(payload)) + collect_payload_size(tenant_id, Map.from_struct(payload), message_type) end - defp collect_payload_size(tenant_id, payload) do - :telemetry.execute(@payload_size_event, %{size: :erlang.external_size(payload)}, %{tenant: tenant_id}) + def collect_payload_size(tenant_id, payload, message_type) do + :telemetry.execute(@payload_size_event, %{size: :erlang.external_size(payload)}, %{ + tenant: tenant_id, + message_type: message_type + }) end - defp pubsub_adapter do - Application.fetch_env!(:realtime, :pubsub_adapter) - end + defp pubsub_adapter, do: Application.fetch_env!(:realtime, :pubsub_adapter) end diff --git a/mix.exs b/mix.exs index 4b0b1f40c..d0e42bf11 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.51.5", + version: "2.53.0", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, @@ -53,7 +53,7 @@ defmodule Realtime.MixProject do # Type `mix help deps` for examples and options. defp deps do [ - {:phoenix, "~> 1.7.0"}, + {:phoenix, override: true, github: "supabase/phoenix", branch: "feat/presence-custom-dispatcher-1.7.19"}, {:phoenix_ecto, "~> 4.4.0"}, {:ecto_sql, "~> 3.11"}, {:ecto_psql_extras, "~> 0.8"}, diff --git a/mix.lock b/mix.lock index c5fce6022..ba6f47328 100644 --- a/mix.lock +++ b/mix.lock @@ -66,7 +66,7 @@ "opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "1.27.0", "acd0194a94a1e57d63da982ee9f4a9f88834ae0b31b0bd850815fe9be4bbb45f", [:mix, :rebar3], [], "hexpm", "9681ccaa24fd3d810b4461581717661fd85ff7019b082c2dff89c7d5b1fc2864"}, "opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.1.2", "410ab4d76b0921f42dbccbe5a7c831b8125282850be649ee1f70050d3961118a", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.3", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "641ab469deb181957ac6d59bce6e1321d5fe2a56df444fc9c19afcad623ab253"}, "otel_http": {:hex, :otel_http, "0.2.0", "b17385986c7f1b862f5d577f72614ecaa29de40392b7618869999326b9a61d8a", [:rebar3], [], "hexpm", "f2beadf922c8cfeb0965488dd736c95cc6ea8b9efce89466b3904d317d7cc717"}, - "phoenix": {:hex, :phoenix, "1.7.19", "36617efe5afbd821099a8b994ff4618a340a5bfb25531a1802c4d4c634017a57", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "ba4dc14458278773f905f8ae6c2ec743d52c3a35b6b353733f64f02dfe096cd6"}, + "phoenix": {:git, "https://github.com/supabase/phoenix.git", "7b884cc0cc1a49ad2bc272acda2e622b3e11c139", [branch: "feat/presence-custom-dispatcher-1.7.19"]}, "phoenix_ecto": {:hex, :phoenix_ecto, "4.4.3", "86e9878f833829c3f66da03d75254c155d91d72a201eb56ae83482328dc7ca93", [:mix], [{:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "d36c401206f3011fefd63d04e8ef626ec8791975d9d107f9a0817d426f61ac07"}, "phoenix_html": {:hex, :phoenix_html, "3.3.4", "42a09fc443bbc1da37e372a5c8e6755d046f22b9b11343bf885067357da21cb3", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "0249d3abec3714aff3415e7ee3d9786cb325be3151e6c4b3021502c585bf53fb"}, "phoenix_live_dashboard": {:hex, :phoenix_live_dashboard, "0.8.6", "7b1f0327f54c9eb69845fd09a77accf922f488c549a7e7b8618775eb603a62c7", [:mix], [{:ecto, "~> 3.6.2 or ~> 3.7", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_mysql_extras, "~> 0.5", [hex: :ecto_mysql_extras, repo: "hexpm", optional: true]}, {:ecto_psql_extras, "~> 0.7", [hex: :ecto_psql_extras, repo: "hexpm", optional: true]}, {:ecto_sqlite3_extras, "~> 1.1.7 or ~> 1.2.0", [hex: :ecto_sqlite3_extras, repo: "hexpm", optional: true]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:phoenix_live_view, "~> 0.19 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "1681ab813ec26ca6915beb3414aa138f298e17721dc6a2bde9e6eb8a62360ff6"}, diff --git a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs index 5f341c134..d12c0ba73 100644 --- a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs +++ b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs @@ -236,6 +236,15 @@ defmodule Realtime.Extensions.CdcRlsTest do RateCounter.stop(tenant.external_id) + on_exit(fn -> :telemetry.detach(__MODULE__) end) + + :telemetry.attach( + __MODULE__, + [:realtime, :tenants, :payload, :size], + &__MODULE__.handle_telemetry/4, + pid: self() + ) + %{tenant: tenant, conn: conn} end @@ -317,6 +326,13 @@ defmodule Realtime.Extensions.CdcRlsTest do assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate) assert 1 in bucket + + assert_receive { + :telemetry, + [:realtime, :tenants, :payload, :size], + %{size: 341}, + %{tenant: "dev_tenant", message_type: :postgres_changes} + } end @aux_mod (quote do @@ -414,4 +430,6 @@ defmodule Realtime.Extensions.CdcRlsTest do :erpc.call(node, PostgresCdcRls, :handle_stop, [tenant.external_id, 10_000]) end end + + def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {:telemetry, event, measures, metadata}) end diff --git a/test/realtime/gen_rpc_pub_sub_test.exs b/test/realtime/gen_rpc_pub_sub_test.exs index 0013c2e7b..517c6c369 100644 --- a/test/realtime/gen_rpc_pub_sub_test.exs +++ b/test/realtime/gen_rpc_pub_sub_test.exs @@ -1,2 +1,18 @@ Application.put_env(:phoenix_pubsub, :test_adapter, {Realtime.GenRpcPubSub, []}) Code.require_file("../../deps/phoenix_pubsub/test/shared/pubsub_test.exs", __DIR__) + +defmodule Realtime.GenRpcPubSubTest do + use ExUnit.Case, async: true + + test "it sets off_heap message_queue_data flag on the workers" do + assert Realtime.PubSubElixir.Realtime.PubSub.Adapter_1 + |> Process.whereis() + |> Process.info(:message_queue_data) == {:message_queue_data, :off_heap} + end + + test "it sets fullsweep_after flag on the workers" do + assert Realtime.PubSubElixir.Realtime.PubSub.Adapter_1 + |> Process.whereis() + |> Process.info(:fullsweep_after) == {:fullsweep_after, 100} + end +end diff --git a/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs b/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs index 164c8d2eb..77c1dc7cf 100644 --- a/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs +++ b/test/realtime/monitoring/prom_ex/plugins/tenant_test.exs @@ -129,6 +129,17 @@ defmodule Realtime.PromEx.Plugins.TenantTest do assert metric_value(pattern) == metric_value + 1 end + test "global event exists after counter added", %{tenant: %{external_id: external_id}} do + pattern = + ~r/realtime_channel_global_events\s(?\d+)/ + + metric_value = metric_value(pattern) + FakeUserCounter.fake_event(external_id) + + Process.sleep(200) + assert metric_value(pattern) == metric_value + 1 + end + test "db_event exists after counter added", %{tenant: %{external_id: external_id}} do pattern = ~r/realtime_channel_db_events{tenant="#{external_id}"}\s(?\d+)/ @@ -139,6 +150,16 @@ defmodule Realtime.PromEx.Plugins.TenantTest do assert metric_value(pattern) == metric_value + 1 end + test "global db_event exists after counter added", %{tenant: %{external_id: external_id}} do + pattern = + ~r/realtime_channel_global_db_events\s(?\d+)/ + + metric_value = metric_value(pattern) + FakeUserCounter.fake_db_event(external_id) + Process.sleep(200) + assert metric_value(pattern) == metric_value + 1 + end + test "presence_event exists after counter added", %{tenant: %{external_id: external_id}} do pattern = ~r/realtime_channel_presence_events{tenant="#{external_id}"}\s(?\d+)/ @@ -149,6 +170,16 @@ defmodule Realtime.PromEx.Plugins.TenantTest do assert metric_value(pattern) == metric_value + 1 end + test "global presence_event exists after counter added", %{tenant: %{external_id: external_id}} do + pattern = + ~r/realtime_channel_global_presence_events\s(?\d+)/ + + metric_value = metric_value(pattern) + FakeUserCounter.fake_presence_event(external_id) + Process.sleep(200) + assert metric_value(pattern) == metric_value + 1 + end + test "metric read_authorization_check exists after check", context do pattern = ~r/realtime_tenants_read_authorization_check_count{tenant="#{context.tenant.external_id}"}\s(?\d+)/ @@ -231,18 +262,18 @@ defmodule Realtime.PromEx.Plugins.TenantTest do external_id = context.tenant.external_id pattern = - ~r/realtime_tenants_payload_size_count{tenant="#{external_id}"}\s(?\d+)/ + ~r/realtime_tenants_payload_size_count{message_type=\"presence\",tenant="#{external_id}"}\s(?\d+)/ metric_value = metric_value(pattern) message = %{topic: "a topic", event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]} - RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub) + RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub, :presence) Process.sleep(200) assert metric_value(pattern) == metric_value + 1 bucket_pattern = - ~r/realtime_tenants_payload_size_bucket{tenant="#{external_id}",le="100"}\s(?\d+)/ + ~r/realtime_tenants_payload_size_bucket{message_type=\"presence\",tenant="#{external_id}",le="250"}\s(?\d+)/ assert metric_value(bucket_pattern) > 0 end @@ -250,17 +281,17 @@ defmodule Realtime.PromEx.Plugins.TenantTest do test "global metric payload size", context do external_id = context.tenant.external_id - pattern = ~r/realtime_payload_size_count\s(?\d+)/ + pattern = ~r/realtime_payload_size_count{message_type=\"broadcast\"}\s(?\d+)/ metric_value = metric_value(pattern) message = %{topic: "a topic", event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]} - RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub) + RealtimeWeb.TenantBroadcaster.pubsub_broadcast(external_id, "a topic", message, Phoenix.PubSub, :broadcast) Process.sleep(200) assert metric_value(pattern) == metric_value + 1 - bucket_pattern = ~r/realtime_payload_size_bucket{le="100"}\s(?\d+)/ + bucket_pattern = ~r/realtime_payload_size_bucket{message_type=\"broadcast\",le="250"}\s(?\d+)/ assert metric_value(bucket_pattern) > 0 end diff --git a/test/realtime/monitoring/prom_ex/plugins/tenants_test.exs b/test/realtime/monitoring/prom_ex/plugins/tenants_test.exs index 080fd3cfb..ded087c74 100644 --- a/test/realtime/monitoring/prom_ex/plugins/tenants_test.exs +++ b/test/realtime/monitoring/prom_ex/plugins/tenants_test.exs @@ -37,6 +37,16 @@ defmodule Realtime.PromEx.Plugins.TenantsTest do assert metric_value(pattern) == previous_value + 1 end + test "global success" do + pattern = ~r/realtime_global_rpc_count{mechanism=\"erpc\",success="true"}\s(?\d+)/ + # Enough time for the poll rate to be triggered at least once + Process.sleep(200) + previous_value = metric_value(pattern) + assert {:ok, "success"} = Rpc.enhanced_call(node(), Test, :success, [], tenant_id: "123") + Process.sleep(200) + assert metric_value(pattern) == previous_value + 1 + end + test "failure" do pattern = ~r/realtime_rpc_count{mechanism=\"erpc\",success="false",tenant="123"}\s(?\d+)/ # Enough time for the poll rate to be triggered at least once @@ -47,6 +57,16 @@ defmodule Realtime.PromEx.Plugins.TenantsTest do assert metric_value(pattern) == previous_value + 1 end + test "global failure" do + pattern = ~r/realtime_global_rpc_count{mechanism=\"erpc\",success="false"}\s(?\d+)/ + # Enough time for the poll rate to be triggered at least once + Process.sleep(200) + previous_value = metric_value(pattern) + assert {:error, "failure"} = Rpc.enhanced_call(node(), Test, :failure, [], tenant_id: "123") + Process.sleep(200) + assert metric_value(pattern) == previous_value + 1 + end + test "exception" do pattern = ~r/realtime_rpc_count{mechanism=\"erpc\",success="false",tenant="123"}\s(?\d+)/ # Enough time for the poll rate to be triggered at least once @@ -59,6 +79,19 @@ defmodule Realtime.PromEx.Plugins.TenantsTest do Process.sleep(200) assert metric_value(pattern) == previous_value + 1 end + + test "global exception" do + pattern = ~r/realtime_global_rpc_count{mechanism=\"erpc\",success="false"}\s(?\d+)/ + # Enough time for the poll rate to be triggered at least once + Process.sleep(200) + previous_value = metric_value(pattern) + + assert {:error, :rpc_error, %RuntimeError{message: "runtime error"}} = + Rpc.enhanced_call(node(), Test, :exception, [], tenant_id: "123") + + Process.sleep(200) + assert metric_value(pattern) == previous_value + 1 + end end test "event_metrics rpc" do diff --git a/test/realtime/tenants/connect_test.exs b/test/realtime/tenants/connect_test.exs index a52973d53..741f6ecf7 100644 --- a/test/realtime/tenants/connect_test.exs +++ b/test/realtime/tenants/connect_test.exs @@ -51,6 +51,36 @@ defmodule Realtime.Tenants.ConnectTest do end describe "handle cold start" do + test "multiple processes connecting calling Connect.connect", %{tenant: tenant} do + parent = self() + + # Let's slow down Connect.connect so that multiple RPC calls are executed + stub(Connect, :connect, fn x, y, z -> + :timer.sleep(1000) + call_original(Connect, :connect, [x, y, z]) + end) + + connect = fn -> send(parent, Connect.lookup_or_start_connection(tenant.external_id)) end + # Let's call enough times to potentially trigger the Connect RateCounter + + for _ <- 1..50, do: spawn(connect) + + assert_receive({:ok, pid}, 1100) + + for _ <- 1..49, do: assert_receive({:ok, ^pid}) + + # Does not trigger rate limit as connections eventually succeeded + + {:ok, rate_counter} = + tenant.external_id + |> Tenants.connect_errors_per_second_rate() + |> Realtime.RateCounter.get() + + assert rate_counter.sum == 0 + assert rate_counter.avg == 0.0 + assert rate_counter.limit.triggered == false + end + test "multiple proccesses succeed together", %{tenant: tenant} do parent = self() diff --git a/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs b/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs index 44ce83b99..53be2e51f 100644 --- a/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs +++ b/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs @@ -16,12 +16,24 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do describe "fastlane_metadata/5" do test "info level" do assert MessageDispatcher.fastlane_metadata(self(), Serializer, "realtime:topic", :info, "tenant_id") == - {:rc_fastlane, self(), Serializer, "realtime:topic", {:log, "tenant_id"}, MapSet.new()} + {:rc_fastlane, self(), Serializer, "realtime:topic", :info, "tenant_id", MapSet.new()} end test "non-info level" do assert MessageDispatcher.fastlane_metadata(self(), Serializer, "realtime:topic", :warning, "tenant_id") == - {:rc_fastlane, self(), Serializer, "realtime:topic", MapSet.new()} + {:rc_fastlane, self(), Serializer, "realtime:topic", :warning, "tenant_id", MapSet.new()} + end + + test "replayed message ids" do + assert MessageDispatcher.fastlane_metadata( + self(), + Serializer, + "realtime:topic", + :warning, + "tenant_id", + MapSet.new([1]) + ) == + {:rc_fastlane, self(), Serializer, "realtime:topic", :warning, "tenant_id", MapSet.new([1])} end end @@ -50,8 +62,8 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do from_pid = :erlang.list_to_pid(~c'<0.2.1>') subscribers = [ - {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, MapSet.new()}}, - {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", MapSet.new()}} + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant123", MapSet.new()}} ] msg = %Broadcast{topic: "some:other:topic", event: "event", payload: %{data: "test"}} @@ -74,6 +86,48 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do refute_receive _any end + test "dispatches 'presence_diff' messages to fastlane subscribers" do + parent = self() + + subscriber_pid = + spawn(fn -> + loop = fn loop -> + receive do + msg -> + send(parent, {:subscriber, msg}) + loop.(loop) + end + end + + loop.(loop) + end) + + from_pid = :erlang.list_to_pid(~c'<0.2.1>') + + subscribers = [ + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant456", MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant456", MapSet.new()}} + ] + + msg = %Broadcast{topic: "some:other:topic", event: "presence_diff", payload: %{data: "test"}} + + log = + capture_log(fn -> + assert MessageDispatcher.dispatch(subscribers, from_pid, msg) == :ok + end) + + assert log =~ "Received message on realtime:topic with payload: #{inspect(msg, pretty: true)}" + + assert_receive {:encoded, %Broadcast{event: "presence_diff", payload: %{data: "test"}, topic: "realtime:topic"}} + assert_receive {:encoded, %Broadcast{event: "presence_diff", payload: %{data: "test"}, topic: "realtime:topic"}} + + assert Agent.get(TestSerializer, & &1) == 1 + + assert Realtime.GenCounter.get(Realtime.Tenants.presence_events_per_second_key("tenant456")) == 2 + + refute_receive _any + end + test "does not dispatch messages to fastlane subscribers if they already replayed it" do parent = self() @@ -95,8 +149,9 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do subscribers = [ {subscriber_pid, - {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, replaeyd_message_ids}}, - {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", replaeyd_message_ids}} + {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant123", replaeyd_message_ids}}, + {subscriber_pid, + {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant123", replaeyd_message_ids}} ] msg = %Broadcast{ @@ -131,8 +186,8 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do from_pid = :erlang.list_to_pid(~c'<0.2.1>') subscribers = [ - {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, MapSet.new()}}, - {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", MapSet.new()}} + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant123", MapSet.new()}} ] msg = %Broadcast{topic: "some:other:topic", event: "event", payload: "not a map"} diff --git a/test/realtime_web/channels/realtime_channel/presence_handler_test.exs b/test/realtime_web/channels/realtime_channel/presence_handler_test.exs index 0cdf422e2..219f13e55 100644 --- a/test/realtime_web/channels/realtime_channel/presence_handler_test.exs +++ b/test/realtime_web/channels/realtime_channel/presence_handler_test.exs @@ -100,25 +100,41 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do end describe "handle/3" do + setup do + on_exit(fn -> :telemetry.detach(__MODULE__) end) + + :telemetry.attach( + __MODULE__, + [:realtime, :tenants, :payload, :size], + &__MODULE__.handle_telemetry/4, + pid: self() + ) + end + test "with true policy and is private, user can track their presence and changes", %{ tenant: tenant, topic: topic, db_conn: db_conn } do + external_id = tenant.external_id key = random_string() policies = %Policies{presence: %PresencePolicies{read: true, write: true}} socket = socket_fixture(tenant, topic, key, policies: policies) - PresenceHandler.handle(%{"event" => "track"}, db_conn, socket) + PresenceHandler.handle(%{"event" => "track", "payload" => %{"A" => "b", "c" => "b"}}, db_conn, socket) topic = socket.assigns.tenant_topic assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}} assert Map.has_key?(joins, key) + + assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 30}, + %{tenant: ^external_id, message_type: :presence}} end test "when tracking already existing user, metadata updated", %{tenant: tenant, topic: topic, db_conn: db_conn} do + external_id = tenant.external_id key = random_string() policies = %Policies{presence: %PresencePolicies{read: true, write: true}} socket = socket_fixture(tenant, topic, key, policies: policies) @@ -134,10 +150,18 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}} assert Map.has_key?(joins, key) + + assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 6}, + %{tenant: ^external_id, message_type: :presence}} + + assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 55}, + %{tenant: ^external_id, message_type: :presence}} + refute_receive :_ end test "with false policy and is public, user can track their presence and changes", %{tenant: tenant, topic: topic} do + external_id = tenant.external_id key = random_string() policies = %Policies{presence: %PresencePolicies{read: false, write: false}} socket = socket_fixture(tenant, topic, key, policies: policies, private?: false) @@ -147,6 +171,9 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do topic = socket.assigns.tenant_topic assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}} assert Map.has_key?(joins, key) + + assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 6}, + %{tenant: ^external_id, message_type: :presence}} end test "user can untrack when they want", %{tenant: tenant, topic: topic, db_conn: db_conn} do @@ -434,6 +461,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do assert log =~ "PresenceRateLimitReached" end + @tag :skip @tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence] test "respects rate limits on private channels", %{tenant: tenant, topic: topic, db_conn: db_conn} do key = random_string() @@ -517,4 +545,6 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do } } end + + def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {:telemetry, event, measures, metadata}) end diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs index ae6c1734a..8022d6ebd 100644 --- a/test/realtime_web/channels/realtime_channel_test.exs +++ b/test/realtime_web/channels/realtime_channel_test.exs @@ -239,23 +239,14 @@ defmodule RealtimeWeb.RealtimeChannelTest do end describe "presence" do - test "events are counted", %{tenant: tenant} do + test "presence state event is counted", %{tenant: tenant} do jwt = Generators.generate_jwt_token(tenant) {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) assert {:ok, _, %Socket{} = socket} = subscribe_and_join(socket, "realtime:test", %{}) - presence_diff = %Socket.Broadcast{event: "presence_diff", payload: %{joins: %{}, leaves: %{}}} - send(socket.channel_pid, presence_diff) - assert_receive %Socket.Message{topic: "realtime:test", event: "presence_state", payload: %{}} - assert_receive %Socket.Message{ - topic: "realtime:test", - event: "presence_diff", - payload: %{joins: %{}, leaves: %{}} - } - tenant_id = tenant.external_id # Wait for RateCounter to tick @@ -264,8 +255,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do assert {:ok, %RateCounter{id: {:channel, :presence_events, ^tenant_id}, bucket: bucket}} = RateCounter.get(socket.assigns.presence_rate_counter) - # presence_state + presence_diff - assert 2 in bucket + # presence_state + assert Enum.sum(bucket) == 1 end end diff --git a/test/realtime_web/channels/tenant_rate_limiters_test.exs b/test/realtime_web/channels/tenant_rate_limiters_test.exs new file mode 100644 index 000000000..05d56ec82 --- /dev/null +++ b/test/realtime_web/channels/tenant_rate_limiters_test.exs @@ -0,0 +1,31 @@ +defmodule RealtimeWeb.TenantRateLimitersTest do + use Realtime.DataCase, async: true + + use Mimic + alias RealtimeWeb.TenantRateLimiters + alias Realtime.Api.Tenant + + setup do + tenant = %Tenant{external_id: random_string(), max_concurrent_users: 1, max_joins_per_second: 1} + + %{tenant: tenant} + end + + describe "check_tenant/1" do + test "rate is not exceeded", %{tenant: tenant} do + assert TenantRateLimiters.check_tenant(tenant) == :ok + end + + test "max concurrent users is exceeded", %{tenant: tenant} do + Realtime.UsersCounter.add(self(), tenant.external_id) + + assert TenantRateLimiters.check_tenant(tenant) == {:error, :too_many_connections} + end + + test "max joins is exceeded", %{tenant: tenant} do + expect(Realtime.RateCounter, :get, fn _ -> {:ok, %{limit: %{triggered: true}}} end) + + assert TenantRateLimiters.check_tenant(tenant) == {:error, :too_many_joins} + end + end +end diff --git a/test/realtime_web/controllers/broadcast_controller_test.exs b/test/realtime_web/controllers/broadcast_controller_test.exs index 7bd426353..900eb7aa9 100644 --- a/test/realtime_web/controllers/broadcast_controller_test.exs +++ b/test/realtime_web/controllers/broadcast_controller_test.exs @@ -272,8 +272,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do } do request_events_key = Tenants.requests_per_second_key(tenant) broadcast_events_key = Tenants.events_per_second_key(tenant) - connect_events_key = Tenants.connect_per_second_rate(tenant).id - expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end) + expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _, _ -> :ok end) messages_to_send = Stream.repeatedly(fn -> generate_message_with_policies(db_conn, tenant) end) @@ -298,7 +297,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages}) - broadcast_calls = calls(&TenantBroadcaster.pubsub_broadcast/4) + broadcast_calls = calls(&TenantBroadcaster.pubsub_broadcast/5) Enum.each(messages_to_send, fn %{topic: topic} -> broadcast_topic = Tenants.tenant_topic(tenant, topic, false) @@ -314,7 +313,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do } assert Enum.any?(broadcast_calls, fn - [_, ^broadcast_topic, ^message, RealtimeChannel.MessageDispatcher] -> true + [_, ^broadcast_topic, ^message, RealtimeChannel.MessageDispatcher, :broadcast] -> true _ -> false end) end) @@ -330,8 +329,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do } do request_events_key = Tenants.requests_per_second_key(tenant) broadcast_events_key = Tenants.events_per_second_key(tenant) - connect_events_key = Tenants.connect_per_second_rate(tenant).id - expect(TenantBroadcaster, :pubsub_broadcast, 6, fn _, _, _, _ -> :ok end) + expect(TenantBroadcaster, :pubsub_broadcast, 6, fn _, _, _, _, _ -> :ok end) channels = Stream.repeatedly(fn -> generate_message_with_policies(db_conn, tenant) end) @@ -366,7 +364,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages}) - broadcast_calls = calls(&TenantBroadcaster.pubsub_broadcast/4) + broadcast_calls = calls(&TenantBroadcaster.pubsub_broadcast/5) Enum.each(channels, fn %{topic: topic} -> broadcast_topic = Tenants.tenant_topic(tenant, topic, false) @@ -382,7 +380,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do } assert Enum.count(broadcast_calls, fn - [_, ^broadcast_topic, ^message, RealtimeChannel.MessageDispatcher] -> true + [_, ^broadcast_topic, ^message, RealtimeChannel.MessageDispatcher, :broadcast] -> true _ -> false end) == 1 end) @@ -401,7 +399,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do open_channel_topic = Tenants.tenant_topic(tenant, "open_channel", true) assert Enum.count(broadcast_calls, fn - [_, ^open_channel_topic, ^message, RealtimeChannel.MessageDispatcher] -> true + [_, ^open_channel_topic, ^message, RealtimeChannel.MessageDispatcher, :broadcast] -> true _ -> false end) == 1 @@ -416,8 +414,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do } do request_events_key = Tenants.requests_per_second_key(tenant) broadcast_events_key = Tenants.events_per_second_key(tenant) - connect_events_key = Tenants.connect_per_second_rate(tenant).id - expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end) + expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _, _ -> :ok end) messages_to_send = Stream.repeatedly(fn -> generate_message_with_policies(db_conn, tenant) end) @@ -438,12 +435,11 @@ defmodule RealtimeWeb.BroadcastControllerTest do GenCounter |> expect(:add, fn ^request_events_key -> :ok end) # remove the one message that won't be broadcasted for this user - |> expect(:add, 1, fn ^connect_events_key -> :ok end) |> expect(:add, length(messages) - 1, fn ^broadcast_events_key -> :ok end) conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages}) - broadcast_calls = calls(&TenantBroadcaster.pubsub_broadcast/4) + broadcast_calls = calls(&TenantBroadcaster.pubsub_broadcast/5) Enum.each(messages_to_send, fn %{topic: topic} -> broadcast_topic = Tenants.tenant_topic(tenant, topic, false) @@ -459,7 +455,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do } assert Enum.count(broadcast_calls, fn - [_, ^broadcast_topic, ^message, RealtimeChannel.MessageDispatcher] -> true + [_, ^broadcast_topic, ^message, RealtimeChannel.MessageDispatcher, :broadcast] -> true _ -> false end) == 1 end) @@ -472,7 +468,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do @tag role: "anon" test "user without permission won't broadcast", %{conn: conn, db_conn: db_conn, tenant: tenant} do request_events_key = Tenants.requests_per_second_key(tenant) - reject(&TenantBroadcaster.pubsub_broadcast/4) + reject(&TenantBroadcaster.pubsub_broadcast/5) messages = Stream.repeatedly(fn -> generate_message_with_policies(db_conn, tenant) end) diff --git a/test/realtime_web/tenant_broadcaster_test.exs b/test/realtime_web/tenant_broadcaster_test.exs index ddda381a1..bc3b4f90a 100644 --- a/test/realtime_web/tenant_broadcaster_test.exs +++ b/test/realtime_web/tenant_broadcaster_test.exs @@ -60,7 +60,7 @@ defmodule RealtimeWeb.TenantBroadcasterTest do test "pubsub_broadcast", %{node: node} do message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}} - TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub) + TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub, :broadcast) assert_receive ^message @@ -71,13 +71,13 @@ defmodule RealtimeWeb.TenantBroadcasterTest do :telemetry, [:realtime, :tenants, :payload, :size], %{size: 114}, - %{tenant: "realtime-dev"} + %{tenant: "realtime-dev", message_type: :broadcast} } end test "pubsub_broadcast list payload", %{node: node} do message = %Broadcast{topic: @topic, event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]} - TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub) + TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub, :broadcast) assert_receive ^message @@ -88,13 +88,13 @@ defmodule RealtimeWeb.TenantBroadcasterTest do :telemetry, [:realtime, :tenants, :payload, :size], %{size: 130}, - %{tenant: "realtime-dev"} + %{tenant: "realtime-dev", message_type: :broadcast} } end test "pubsub_broadcast string payload", %{node: node} do message = %Broadcast{topic: @topic, event: "an event", payload: "some text payload"} - TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub) + TenantBroadcaster.pubsub_broadcast("realtime-dev", @topic, message, Phoenix.PubSub, :broadcast) assert_receive ^message @@ -105,7 +105,7 @@ defmodule RealtimeWeb.TenantBroadcasterTest do :telemetry, [:realtime, :tenants, :payload, :size], %{size: 119}, - %{tenant: "realtime-dev"} + %{tenant: "realtime-dev", message_type: :broadcast} } end end @@ -131,7 +131,7 @@ defmodule RealtimeWeb.TenantBroadcasterTest do message = %Broadcast{topic: @topic, event: "an event", payload: %{"a" => "b"}} - TenantBroadcaster.pubsub_broadcast_from("realtime-dev", self(), @topic, message, Phoenix.PubSub) + TenantBroadcaster.pubsub_broadcast_from("realtime-dev", self(), @topic, message, Phoenix.PubSub, :broadcast) assert_receive {:other_process, ^message} @@ -142,7 +142,7 @@ defmodule RealtimeWeb.TenantBroadcasterTest do :telemetry, [:realtime, :tenants, :payload, :size], %{size: 114}, - %{tenant: "realtime-dev"} + %{tenant: "realtime-dev", message_type: :broadcast} } # This process does not receive the message @@ -151,5 +151,38 @@ defmodule RealtimeWeb.TenantBroadcasterTest do end end + describe "collect_payload_size/3" do + @describetag pubsub_adapter: :gen_rpc + + test "emit telemetry for struct" do + TenantBroadcaster.collect_payload_size( + "realtime-dev", + %Phoenix.Socket.Broadcast{event: "broadcast", payload: %{"a" => "b"}}, + :broadcast + ) + + assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 65}, + %{tenant: "realtime-dev", message_type: :broadcast}} + end + + test "emit telemetry for map" do + TenantBroadcaster.collect_payload_size( + "realtime-dev", + %{event: "broadcast", payload: %{"a" => "b"}}, + :postgres_changes + ) + + assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 53}, + %{tenant: "realtime-dev", message_type: :postgres_changes}} + end + + test "emit telemetry for non-map" do + TenantBroadcaster.collect_payload_size("realtime-dev", "some blob", :presence) + + assert_receive {:telemetry, [:realtime, :tenants, :payload, :size], %{size: 15}, + %{tenant: "realtime-dev", message_type: :presence}} + end + end + def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {:telemetry, event, measures, metadata}) end