Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ if config_env() == :prod do
http: [
port: String.to_integer(System.get_env("PORT") || "4000"),
transport_options: [
max_connections: String.to_integer(System.get_env("MAX_CONNECTIONS") || "16384"),
# max_connection is per connection supervisor
# num_conns_sups defaults to num_acceptors
# total conns accepted here is max_connections * num_acceptors
# ref: https://ninenines.eu/docs/en/ranch/2.0/manual/ranch/
max_connections: String.to_integer(System.get_env("MAX_CONNECTIONS") || "1000"),
num_acceptors: String.to_integer(System.get_env("NUM_ACCEPTORS") || "100"),
# IMPORTANT: support IPv6 addresses
socket_opts: [:inet6]
Expand Down
6 changes: 3 additions & 3 deletions deploy/fly/prod.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ processes = []
protocol = "tcp"
script_checks = []
[services.concurrency]
# should match ranch.info
hard_limit = 16384
soft_limit = 16384
# should match :ranch.info max_connections * num_acceptors
hard_limit = 100000
soft_limit = 100000
type = "connections"

[[services.ports]]
Expand Down
11 changes: 10 additions & 1 deletion lib/extensions/postgres_cdc_rls/message_dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do
"""

alias Phoenix.Socket.Broadcast
alias Realtime.Tenants
alias Realtime.GenCounter

def dispatch([_ | _] = topic_subscriptions, _from, payload) do
{sub_ids, payload} = Map.pop(payload, :subscription_ids)

_ =
Enum.reduce(topic_subscriptions, %{}, fn
{_pid, {:subscriber_fastlane, fastlane_pid, serializer, ids, join_topic, is_new_api}},
{_pid,
{:subscriber_fastlane, fastlane_pid, serializer, ids, join_topic, tenant, is_new_api}},
cache ->
for {bin_id, id} <- ids, reduce: [] do
acc ->
Expand All @@ -40,6 +43,7 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do
}
end

count(tenant)
broadcast_message(cache, fastlane_pid, new_payload, serializer)

_ ->
Expand All @@ -62,4 +66,9 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do
Map.put(cache, msg, encoded_msg)
end
end

defp count(tenant) do
Tenants.db_events_per_second_key(tenant)
|> GenCounter.add()
end
end
13 changes: 7 additions & 6 deletions lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
alias Extensions.PostgresCdcRls.{Replications, MessageDispatcher}
alias DBConnection.Backoff
alias Realtime.PubSub
alias Realtime.GenCounter
alias Realtime.Tenants

alias Realtime.Adapters.Changes.{
DeletedRecord,
Expand All @@ -40,6 +38,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
args["db_socket_opts"]
)

tenant = args["id"]

state = %{
backoff:
Backoff.new(
Expand All @@ -62,10 +62,10 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
retry_ref: nil,
retry_count: 0,
slot_name: args["slot_name"] <> slot_name_suffix(),
tenant: args["id"]
tenant: tenant
}

Logger.metadata(external_id: state.tenant, project: state.tenant)
Logger.metadata(external_id: tenant, project: tenant)

{:ok, state, {:continue, :prepare}}
end
Expand Down Expand Up @@ -105,7 +105,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
{:ok,
%Postgrex.Result{
columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"] = columns,
rows: [_ | _] = rows
rows: [_ | _] = rows,
num_rows: rows_count
}} ->
Enum.reduce(rows, [], fn row, acc ->
columns
Expand All @@ -130,7 +131,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
)
end)

{:ok, length(rows)}
{:ok, rows_count}

{:ok, _} ->
{:ok, 0}
Expand Down
3 changes: 2 additions & 1 deletion lib/realtime/telemetry/logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ defmodule Realtime.Telemetry.Logger do
@events [
[:realtime, :connections],
[:realtime, :rate_counter, :channel, :events],
[:realtime, :rate_counter, :channel, :joins]
[:realtime, :rate_counter, :channel, :joins],
[:realtime, :rate_counter, :channel, :db_events]
]

def start_link(args \\ []) do
Expand Down
13 changes: 13 additions & 0 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ defmodule Realtime.Tenants do
{:channel, :events, tenant.external_id}
end

@doc """
The GenCounter key to use when counting events for RealtimeChannel events.
"""

@spec db_events_per_second_key(Tenant.t() | String.t()) :: {:channel, :db_events, String.t()}
def db_events_per_second_key(tenant) when is_binary(tenant) do
{:channel, :db_events, tenant}
end

def db_events_per_second_key(%Tenant{} = tenant) do
{:channel, :db_events, tenant.external_id}
end

@spec get_tenant_limits(Realtime.Api.Tenant.t(), maybe_improper_list) :: list
def get_tenant_limits(%Tenant{} = tenant, keys) when is_list(keys) do
nodes = [Node.self() | Node.list()]
Expand Down
21 changes: 20 additions & 1 deletion lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ defmodule RealtimeWeb.RealtimeChannel do

socket = socket |> assign_access_token(params) |> assign_counter()

start_db_rate_counter(tenant)

with false <- SignalHandler.shutdown_in_progress?(),
:ok <- limit_joins(socket),
:ok <- limit_channels(socket),
Expand Down Expand Up @@ -142,8 +144,11 @@ defmodule RealtimeWeb.RealtimeChannel do
{UUID.string_to_binary!(id), :erlang.phash2(params)}
end

IO.inspect(topic, label: "CHANNEL TOPIC")

metadata = [
metadata: {:subscriber_fastlane, transport_pid, serializer, ids, topic, is_new_api}
metadata:
{:subscriber_fastlane, transport_pid, serializer, ids, topic, tenant, is_new_api}
]

# Endpoint.subscribe("realtime:postgres:" <> tenant, metadata)
Expand Down Expand Up @@ -660,4 +665,18 @@ defmodule RealtimeWeb.RealtimeChannel do
e -> {:error, Exception.message(e)}
end
end

defp start_db_rate_counter(tenant) do
key = Tenants.db_events_per_second_key(tenant)
GenCounter.new(key)

RateCounter.new(key,
idle_shutdown: :infinity,
telemetry: %{
event_name: [:channel, :db_events],
measurements: %{},
metadata: %{tenant: tenant}
}
)
end
end
Loading