From e4ea4bd27f2c4a32485fd6520c468c9cd737e235 Mon Sep 17 00:00:00 2001 From: Ekjot Singh Date: Mon, 21 Jul 2025 16:54:49 +0530 Subject: [PATCH 1/2] feat: v2.8.0 (db events metric) --- config/runtime.exs | 6 +++++- deploy/fly/prod.toml | 6 +++--- .../postgres_cdc_rls/message_dispatcher.ex | 11 +++++++++- .../postgres_cdc_rls/replication_poller.ex | 13 ++++++------ lib/realtime/telemetry/logger.ex | 3 ++- lib/realtime/tenants.ex | 13 ++++++++++++ lib/realtime_web/channels/realtime_channel.ex | 21 ++++++++++++++++++- 7 files changed, 60 insertions(+), 13 deletions(-) diff --git a/config/runtime.exs b/config/runtime.exs index 17b3fc3..c86d2cf 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -18,7 +18,11 @@ if config_env() == :prod do http: [ port: String.to_integer(System.get_env("PORT") || "4000"), transport_options: [ - max_connections: String.to_integer(System.get_env("MAX_CONNECTIONS") || "16384"), + # max_connection is per connection supervisor + # num_conns_sups defaults to num_acceptors + # total conns accepted here is max_connections * num_acceptors + # ref: https://ninenines.eu/docs/en/ranch/2.0/manual/ranch/ + max_connections: String.to_integer(System.get_env("MAX_CONNECTIONS") || "1000"), num_acceptors: String.to_integer(System.get_env("NUM_ACCEPTORS") || "100"), # IMPORTANT: support IPv6 addresses socket_opts: [:inet6] diff --git a/deploy/fly/prod.toml b/deploy/fly/prod.toml index f098ce6..df71a88 100644 --- a/deploy/fly/prod.toml +++ b/deploy/fly/prod.toml @@ -24,9 +24,9 @@ processes = [] protocol = "tcp" script_checks = [] [services.concurrency] - # should match ranch.info - hard_limit = 16384 - soft_limit = 16384 + # should match :ranch.info max_connections * num_acceptors + hard_limit = 100000 + soft_limit = 100000 type = "connections" [[services.ports]] diff --git a/lib/extensions/postgres_cdc_rls/message_dispatcher.ex b/lib/extensions/postgres_cdc_rls/message_dispatcher.ex index b7a942e..f9cba28 100644 --- a/lib/extensions/postgres_cdc_rls/message_dispatcher.ex +++ b/lib/extensions/postgres_cdc_rls/message_dispatcher.ex @@ -7,13 +7,16 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do """ alias Phoenix.Socket.Broadcast + alias Realtime.Tenants + alias Realtime.GenCounter def dispatch([_ | _] = topic_subscriptions, _from, payload) do {sub_ids, payload} = Map.pop(payload, :subscription_ids) _ = Enum.reduce(topic_subscriptions, %{}, fn - {_pid, {:subscriber_fastlane, fastlane_pid, serializer, ids, join_topic, is_new_api}}, + {_pid, + {:subscriber_fastlane, fastlane_pid, serializer, ids, join_topic, tenant, is_new_api}}, cache -> for {bin_id, id} <- ids, reduce: [] do acc -> @@ -40,6 +43,7 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do } end + count(tenant) broadcast_message(cache, fastlane_pid, new_payload, serializer) _ -> @@ -62,4 +66,9 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do Map.put(cache, msg, encoded_msg) end end + + defp count(tenant) do + Tenants.db_events_per_second_key(tenant) + |> GenCounter.add() + end end diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index 4b8734c..2bca753 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -13,8 +13,6 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do alias Extensions.PostgresCdcRls.{Replications, MessageDispatcher} alias DBConnection.Backoff alias Realtime.PubSub - alias Realtime.GenCounter - alias Realtime.Tenants alias Realtime.Adapters.Changes.{ DeletedRecord, @@ -40,6 +38,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do args["db_socket_opts"] ) + tenant = args["id"] + state = %{ backoff: Backoff.new( @@ -62,10 +62,10 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do retry_ref: nil, retry_count: 0, slot_name: args["slot_name"] <> slot_name_suffix(), - tenant: args["id"] + tenant: tenant } - Logger.metadata(external_id: state.tenant, project: state.tenant) + Logger.metadata(external_id: tenant, project: tenant) {:ok, state, {:continue, :prepare}} end @@ -105,7 +105,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do {:ok, %Postgrex.Result{ columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"] = columns, - rows: [_ | _] = rows + rows: [_ | _] = rows, + num_rows: rows_count }} -> Enum.reduce(rows, [], fn row, acc -> columns @@ -130,7 +131,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do ) end) - {:ok, length(rows)} + {:ok, rows_count} {:ok, _} -> {:ok, 0} diff --git a/lib/realtime/telemetry/logger.ex b/lib/realtime/telemetry/logger.ex index d5ac76d..4c292eb 100644 --- a/lib/realtime/telemetry/logger.ex +++ b/lib/realtime/telemetry/logger.ex @@ -10,7 +10,8 @@ defmodule Realtime.Telemetry.Logger do @events [ [:realtime, :connections], [:realtime, :rate_counter, :channel, :events], - [:realtime, :rate_counter, :channel, :joins] + [:realtime, :rate_counter, :channel, :joins], + [:realtime, :rate_counter, :channel, :db_events] ] def start_link(args \\ []) do diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex index 0e2e8f8..3d99582 100644 --- a/lib/realtime/tenants.ex +++ b/lib/realtime/tenants.ex @@ -84,6 +84,19 @@ defmodule Realtime.Tenants do {:channel, :events, tenant.external_id} end + @doc """ + The GenCounter key to use when counting events for RealtimeChannel events. + """ + + @spec db_events_per_second_key(Tenant.t() | String.t()) :: {:channel, :db_events, String.t()} + def db_events_per_second_key(tenant) when is_binary(tenant) do + {:channel, :db_events, tenant} + end + + def db_events_per_second_key(%Tenant{} = tenant) do + {:channel, :db_events, tenant.external_id} + end + @spec get_tenant_limits(Realtime.Api.Tenant.t(), maybe_improper_list) :: list def get_tenant_limits(%Tenant{} = tenant, keys) when is_list(keys) do nodes = [Node.self() | Node.list()] diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 957b1db..f43eba0 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -77,6 +77,8 @@ defmodule RealtimeWeb.RealtimeChannel do socket = socket |> assign_access_token(params) |> assign_counter() + start_db_rate_counter(tenant) + with false <- SignalHandler.shutdown_in_progress?(), :ok <- limit_joins(socket), :ok <- limit_channels(socket), @@ -142,8 +144,11 @@ defmodule RealtimeWeb.RealtimeChannel do {UUID.string_to_binary!(id), :erlang.phash2(params)} end + IO.inspect(topic, label: "CHANNEL TOPIC") + metadata = [ - metadata: {:subscriber_fastlane, transport_pid, serializer, ids, topic, is_new_api} + metadata: + {:subscriber_fastlane, transport_pid, serializer, ids, topic, tenant, is_new_api} ] # Endpoint.subscribe("realtime:postgres:" <> tenant, metadata) @@ -660,4 +665,18 @@ defmodule RealtimeWeb.RealtimeChannel do e -> {:error, Exception.message(e)} end end + + defp start_db_rate_counter(tenant) do + key = Tenants.db_events_per_second_key(tenant) + GenCounter.new(key) + + RateCounter.new(key, + idle_shutdown: :infinity, + telemetry: %{ + event_name: [:channel, :db_events], + measurements: %{}, + metadata: %{tenant: tenant} + } + ) + end end From f53dba07d8484a0da043c507f563fd78c0cccb8a Mon Sep 17 00:00:00 2001 From: Ekjot Singh Date: Mon, 21 Jul 2025 17:00:18 +0530 Subject: [PATCH 2/2] feat: v2.9.0 --- .../postgres_cdc_rls/replication_poller.ex | 19 +++++++++++-- .../monitoring/prom_ex/plugins/tenant.ex | 28 ++++++++++++++++--- lib/realtime_web/channels/realtime_channel.ex | 2 -- mix.exs | 3 +- 4 files changed, 42 insertions(+), 10 deletions(-) diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index 2bca753..47321fd 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -96,9 +96,24 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do cancel_timer(retry_ref) try do - Replications.list_changes(conn, slot_name, publication, max_changes, max_record_bytes) + {time, response} = + :timer.tc(Replications, :list_changes, [ + conn, + slot_name, + publication, + max_changes, + max_record_bytes + ]) + + Realtime.Telemetry.execute( + [:realtime, :replication, :poller, :query, :stop], + %{duration: time}, + %{tenant: tenant} + ) + + response catch - :error, reason -> + {:error, reason} -> {:error, reason} end |> case do diff --git a/lib/realtime/monitoring/prom_ex/plugins/tenant.ex b/lib/realtime/monitoring/prom_ex/plugins/tenant.ex index 6669532..756954c 100644 --- a/lib/realtime/monitoring/prom_ex/plugins/tenant.ex +++ b/lib/realtime/monitoring/prom_ex/plugins/tenant.ex @@ -20,7 +20,8 @@ defmodule Realtime.PromEx.Plugins.Tenant do def event_metrics(_opts) do # Event metrics definitions [ - channel_events() + channel_events(), + replication_metrics() ] end @@ -71,9 +72,28 @@ defmodule Realtime.PromEx.Plugins.Tenant do end end + defp replication_metrics() do + Event.build( + :realtime_tenant_replication_event_metrics, + [ + distribution( + [:realtime, :replication, :poller, :query, :duration], + event_name: [:realtime, :replication, :poller, :query, :stop], + measurement: :duration, + description: "Duration of the logical replication slot polling query for Realtime RLS.", + tags: [:tenant], + unit: {:native, :millisecond}, + reporter_options: [ + buckets: [125, 250, 500, 1_000, 2_000, 4_000, 8_000, 16_000, 32_000, 64_000] + ] + ) + ] + ) + end + defp channel_events() do Event.build( - :realtime_tenant_events, + :realtime_tenant_channel_event_metrics, [ sum( [:realtime, :channel, :events], @@ -91,14 +111,14 @@ defmodule Realtime.PromEx.Plugins.Tenant do ), sum( [:realtime, :channel, :joins], - event_name: [:realtime, :rate_counter, :channel, :events], + event_name: [:realtime, :rate_counter, :channel, :joins], measurement: :sum, description: "Sum of Realtime Channel joins.", tags: [:tenant] ), last_value( [:realtime, :channel, :joins, :limit_per_second], - event_name: [:realtime, :rate_counter, :channel, :events], + event_name: [:realtime, :rate_counter, :channel, :joins], measurement: :limit, description: "Rate limit of joins per second on a Realtime Channel.", tags: [:tenant] diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index f43eba0..a9d97b1 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -144,8 +144,6 @@ defmodule RealtimeWeb.RealtimeChannel do {UUID.string_to_binary!(id), :erlang.phash2(params)} end - IO.inspect(topic, label: "CHANNEL TOPIC") - metadata = [ metadata: {:subscriber_fastlane, transport_pid, serializer, ids, topic, tenant, is_new_api} diff --git a/mix.exs b/mix.exs index a39f87b..cf4b403 100644 --- a/mix.exs +++ b/mix.exs @@ -4,8 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - # When bumping this version make sure to check - version: "2.7.1", + version: "2.9.0", elixir: "~> 1.14.0", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod,