From e4ea4bd27f2c4a32485fd6520c468c9cd737e235 Mon Sep 17 00:00:00 2001 From: Ekjot Singh Date: Mon, 21 Jul 2025 16:54:49 +0530 Subject: [PATCH] feat: v2.8.0 (db events metric) --- config/runtime.exs | 6 +++++- deploy/fly/prod.toml | 6 +++--- .../postgres_cdc_rls/message_dispatcher.ex | 11 +++++++++- .../postgres_cdc_rls/replication_poller.ex | 13 ++++++------ lib/realtime/telemetry/logger.ex | 3 ++- lib/realtime/tenants.ex | 13 ++++++++++++ lib/realtime_web/channels/realtime_channel.ex | 21 ++++++++++++++++++- 7 files changed, 60 insertions(+), 13 deletions(-) diff --git a/config/runtime.exs b/config/runtime.exs index 17b3fc3..c86d2cf 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -18,7 +18,11 @@ if config_env() == :prod do http: [ port: String.to_integer(System.get_env("PORT") || "4000"), transport_options: [ - max_connections: String.to_integer(System.get_env("MAX_CONNECTIONS") || "16384"), + # max_connections is per connection supervisor + # num_conns_sups defaults to num_acceptors + # total conns accepted here is max_connections * num_acceptors + # ref: https://ninenines.eu/docs/en/ranch/2.0/manual/ranch/ + max_connections: String.to_integer(System.get_env("MAX_CONNECTIONS") || "1000"), num_acceptors: String.to_integer(System.get_env("NUM_ACCEPTORS") || "100"), # IMPORTANT: support IPv6 addresses socket_opts: [:inet6] diff --git a/deploy/fly/prod.toml b/deploy/fly/prod.toml index f098ce6..df71a88 100644 --- a/deploy/fly/prod.toml +++ b/deploy/fly/prod.toml @@ -24,9 +24,9 @@ processes = [] protocol = "tcp" script_checks = [] [services.concurrency] - # should match ranch.info - hard_limit = 16384 - soft_limit = 16384 + # should match :ranch.info max_connections * num_acceptors + hard_limit = 100000 + soft_limit = 100000 type = "connections" [[services.ports]] diff --git a/lib/extensions/postgres_cdc_rls/message_dispatcher.ex b/lib/extensions/postgres_cdc_rls/message_dispatcher.ex index b7a942e..f9cba28 100644 --- 
a/lib/extensions/postgres_cdc_rls/message_dispatcher.ex +++ b/lib/extensions/postgres_cdc_rls/message_dispatcher.ex @@ -7,13 +7,16 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do """ alias Phoenix.Socket.Broadcast + alias Realtime.Tenants + alias Realtime.GenCounter def dispatch([_ | _] = topic_subscriptions, _from, payload) do {sub_ids, payload} = Map.pop(payload, :subscription_ids) _ = Enum.reduce(topic_subscriptions, %{}, fn - {_pid, {:subscriber_fastlane, fastlane_pid, serializer, ids, join_topic, is_new_api}}, + {_pid, + {:subscriber_fastlane, fastlane_pid, serializer, ids, join_topic, tenant, is_new_api}}, cache -> for {bin_id, id} <- ids, reduce: [] do acc -> @@ -40,6 +43,7 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do } end + count(tenant) broadcast_message(cache, fastlane_pid, new_payload, serializer) _ -> @@ -62,4 +66,9 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do Map.put(cache, msg, encoded_msg) end end + + defp count(tenant) do + Tenants.db_events_per_second_key(tenant) + |> GenCounter.add() + end end diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index 4b8734c..2bca753 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -13,8 +13,6 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do alias Extensions.PostgresCdcRls.{Replications, MessageDispatcher} alias DBConnection.Backoff alias Realtime.PubSub - alias Realtime.GenCounter - alias Realtime.Tenants alias Realtime.Adapters.Changes.{ DeletedRecord, @@ -40,6 +38,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do args["db_socket_opts"] ) + tenant = args["id"] + state = %{ backoff: Backoff.new( @@ -62,10 +62,10 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do retry_ref: nil, retry_count: 0, slot_name: args["slot_name"] <> slot_name_suffix(), - tenant: args["id"] + tenant: tenant } - 
Logger.metadata(external_id: state.tenant, project: state.tenant) + Logger.metadata(external_id: tenant, project: tenant) {:ok, state, {:continue, :prepare}} end @@ -105,7 +105,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do {:ok, %Postgrex.Result{ columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"] = columns, - rows: [_ | _] = rows + rows: [_ | _] = rows, + num_rows: rows_count }} -> Enum.reduce(rows, [], fn row, acc -> columns @@ -130,7 +131,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do ) end) - {:ok, length(rows)} + {:ok, rows_count} {:ok, _} -> {:ok, 0} diff --git a/lib/realtime/telemetry/logger.ex b/lib/realtime/telemetry/logger.ex index d5ac76d..4c292eb 100644 --- a/lib/realtime/telemetry/logger.ex +++ b/lib/realtime/telemetry/logger.ex @@ -10,7 +10,8 @@ defmodule Realtime.Telemetry.Logger do @events [ [:realtime, :connections], [:realtime, :rate_counter, :channel, :events], - [:realtime, :rate_counter, :channel, :joins] + [:realtime, :rate_counter, :channel, :joins], + [:realtime, :rate_counter, :channel, :db_events] ] def start_link(args \\ []) do diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex index 0e2e8f8..3d99582 100644 --- a/lib/realtime/tenants.ex +++ b/lib/realtime/tenants.ex @@ -84,6 +84,19 @@ defmodule Realtime.Tenants do {:channel, :events, tenant.external_id} end + @doc """ + The GenCounter key to use when counting database events for a tenant. 
+ """ + + @spec db_events_per_second_key(Tenant.t() | String.t()) :: {:channel, :db_events, String.t()} + def db_events_per_second_key(tenant) when is_binary(tenant) do + {:channel, :db_events, tenant} + end + + def db_events_per_second_key(%Tenant{} = tenant) do + {:channel, :db_events, tenant.external_id} + end + @spec get_tenant_limits(Realtime.Api.Tenant.t(), maybe_improper_list) :: list def get_tenant_limits(%Tenant{} = tenant, keys) when is_list(keys) do nodes = [Node.self() | Node.list()] diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 957b1db..f43eba0 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -77,6 +77,8 @@ defmodule RealtimeWeb.RealtimeChannel do socket = socket |> assign_access_token(params) |> assign_counter() + start_db_rate_counter(tenant) + with false <- SignalHandler.shutdown_in_progress?(), :ok <- limit_joins(socket), :ok <- limit_channels(socket), @@ -142,8 +144,11 @@ defmodule RealtimeWeb.RealtimeChannel do {UUID.string_to_binary!(id), :erlang.phash2(params)} end + IO.inspect(topic, label: "CHANNEL TOPIC") + metadata = [ - metadata: {:subscriber_fastlane, transport_pid, serializer, ids, topic, is_new_api} + metadata: + {:subscriber_fastlane, transport_pid, serializer, ids, topic, tenant, is_new_api} ] # Endpoint.subscribe("realtime:postgres:" <> tenant, metadata) @@ -660,4 +665,18 @@ defmodule RealtimeWeb.RealtimeChannel do e -> {:error, Exception.message(e)} end end + + defp start_db_rate_counter(tenant) do + key = Tenants.db_events_per_second_key(tenant) + GenCounter.new(key) + + RateCounter.new(key, + idle_shutdown: :infinity, + telemetry: %{ + event_name: [:channel, :db_events], + measurements: %{}, + metadata: %{tenant: tenant} + } + ) + end end