Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
841625e
fix: runtime setup error (#1520)
filipecabaco Sep 2, 2025
1b63b4f
fix: use primary instead of replica on rename_settings_field (#1521)
edgurgel Sep 3, 2025
da3404a
feat: upgrade cowboy & ranch (#1523)
edgurgel Sep 4, 2025
bd2c141
fix: Fix GenRpc to not try to connect to nodes that are not alive (#1…
edgurgel Sep 8, 2025
6cfe6e1
fix: enable presence on track message (#1527)
filipecabaco Sep 8, 2025
b13bb21
fix: set cowboy active_n=100 as cowboy 2.12.0 (#1530)
edgurgel Sep 10, 2025
a17ce3e
fix: provide error_code metadata on RealtimeChannel.Logging (#1531)
edgurgel Sep 12, 2025
eeba306
feat: disable UTF8 validation on websocket frames (#1532)
edgurgel Sep 14, 2025
70339c7
fix: move DB setup to happen after Connect.init (#1533)
edgurgel Sep 15, 2025
50891cd
fix: handle wal bloat (#1528)
filipecabaco Sep 15, 2025
5ccea17
feat: replay realtime.messages (#1526)
edgurgel Sep 16, 2025
c4ba2aa
feat: gen_rpc pub sub adapter (#1529)
edgurgel Sep 16, 2025
e8a343a
fix: ensure message id doesn't raise on non-map payloads (#1534)
edgurgel Sep 17, 2025
380b882
fix: match error on Connect (#1536)
filipecabaco Sep 18, 2025
4ba956f
feat: websocket max heap size configuration (#1538)
edgurgel Sep 22, 2025
1df809e
fix: update gen_rpc to fix gen_rpc_dispatcher issues (#1537)
edgurgel Sep 22, 2025
9a21897
fix: improve ErlSysMon logging for processes (#1540)
edgurgel Sep 22, 2025
54cd3f7
fix: make pubsub adapter configurable (#1539)
edgurgel Sep 22, 2025
e4ee7c8
fix: specify that only private channels are allowed when replaying (#…
edgurgel Sep 25, 2025
d4565df
fix: rate limit connect module (#1541)
filipecabaco Sep 26, 2025
d309c55
build: automatically cancel old tests/build on new push (#1545)
kevcodez Sep 27, 2025
a72a835
fix: move message queue data to off-heap for gen_rpc pub sub workers …
edgurgel Sep 30, 2025
353c142
fix: rate limit Connect.lookup_or_start_connection on error only (#1549)
edgurgel Oct 1, 2025
748398c
fix: increase connect error rate window to 30 seconds (#1550)
edgurgel Oct 1, 2025
92e7b59
fix: set a lower fullsweep_after flag for GenRpcPubSub workers (#1551)
edgurgel Oct 1, 2025
6248e2b
fix: hardcode presence limit (#1552)
filipecabaco Oct 2, 2025
e84ac08
fix: further decrease limit on presence events (#1553)
filipecabaco Oct 2, 2025
13052aa
fix: bump up realtime (#1554)
filipecabaco Oct 2, 2025
6e650f0
fix: lower rate limit to 100 events per second (#1556)
filipecabaco Oct 2, 2025
05ac93e
fix: move connect rate limit to socket (#1555)
filipecabaco Oct 2, 2025
e9eaf9f
fix: collect global metrics without tenant tagging (#1557)
edgurgel Oct 2, 2025
16bd44d
feat: presence payload size (#1559)
edgurgel Oct 5, 2025
07de665
fix: use GenRpc for Realtime.Latency pings (#1560)
edgurgel Oct 6, 2025
ecac071
Fastlane for phoenix presence_diff (#1558)
edgurgel Oct 6, 2025
058be58
fix: limit db events (#1562)
edgurgel Oct 7, 2025
d5658ad
chore: split tests and lint workflows (#1564)
edgurgel Oct 8, 2025
8b621bd
fix: use LiveView stream for status page (#1565)
chasers Oct 8, 2025
aeafab6
fix: refine join payload checking (#1567)
filipecabaco Oct 10, 2025
cbcbbfd
fix: shard user scopes in syn (#1566)
filipecabaco Oct 10, 2025
e773008
Merge branch 'main' into upstream-main
Fudster Oct 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Lint workflow: static checks (formatting, Credo, dependency audits, Sobelow,
# Dialyzer) plus a dev-seed smoke run. Runs on pull requests touching code,
# config, assets or release files, and on every push to main.
name: Lint
on:
  pull_request:
    paths:
      - "lib/**"
      - "test/**"
      - "config/**"
      - "priv/**"
      - "assets/**"
      - "rel/**"
      - "mix.exs"
      - "Dockerfile"
      - "run.sh"

  push:
    branches:
      - main

# A new push to the same PR (or ref) cancels the run that is still in progress.
concurrency:
  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

jobs:
  tests:
    name: Lint
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4
      - name: Setup elixir
        id: beam
        uses: erlef/setup-beam@v1
        with:
          otp-version: 27.x # Define the OTP version [required]
          elixir-version: 1.17.x # Define the elixir version [required]
      - name: Cache Mix
        uses: actions/cache@v4
        with:
          path: |
            deps
            _build
          # The resolved toolchain versions come from the setup-beam step outputs.
          # This workflow defines no `env.elixir` / `env.otp`, so referencing those
          # would expand to empty strings and the key would not change when the
          # Elixir/OTP version does, restoring a stale `_build`.
          key: ${{ github.workflow }}-${{ runner.os }}-mix-${{ steps.beam.outputs.elixir-version }}-${{ steps.beam.outputs.otp-version }}-${{ hashFiles('**/mix.lock') }}
          restore-keys: |
            ${{ github.workflow }}-${{ runner.os }}-mix-${{ steps.beam.outputs.elixir-version }}-${{ steps.beam.outputs.otp-version }}-

      - name: Install dependencies
        run: mix deps.get
      - name: Set up Postgres
        run: docker compose -f docker-compose.dbs.yml up -d
      - name: Run main database migrations
        run: mix ecto.migrate --log-migrator-sql
      - name: Run database tenant migrations
        run: mix ecto.migrate --migrations-path lib/realtime/tenants/repo/migrations
      - name: Run format check
        run: mix format --check-formatted
      - name: Credo checks
        run: mix credo
      - name: Run hex audit
        run: mix hex.audit
      - name: Run mix_audit
        run: mix deps.audit
      - name: Run sobelow
        run: mix sobelow --config .sobelow-conf
      - name: Retrieve PLT Cache
        uses: actions/cache@v4
        id: plt-cache
        with:
          path: priv/plts
          key: ${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-plts-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
      # Only rebuild the PLTs when the cache above did not produce an exact hit.
      - name: Create PLTs
        if: steps.plt-cache.outputs.cache-hit != 'true'
        run: |
          mkdir -p priv/plts
          mix dialyzer.build
      - name: Run dialyzer
        run: mix dialyzer
      - name: Run dev seeds
        run: DB_ENC_KEY="1234567890123456" mix ecto.setup
42 changes: 10 additions & 32 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

env:
MIX_ENV: test

jobs:
tests:
name: Tests
Expand All @@ -36,44 +39,19 @@ jobs:
- name: Cache Mix
uses: actions/cache@v4
with:
path: deps
key: ${{ runner.os }}-mix-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
path: |
deps
_build
key: ${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}-${{ hashFiles('**/mix.lock') }}
restore-keys: |
${{ runner.os }}-mix-
${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}-

- name: Pull postgres image quietly in background (used by test/support/containers.ex)
run: docker pull supabase/postgres:15.8.1.040 > /dev/null 2>&1 &
- name: Install dependencies
run: mix deps.get
- name: Set up Postgres
run: docker compose -f docker-compose.dbs.yml up -d
- name: Run main database migrations
run: mix ecto.migrate --log-migrator-sql
- name: Run database tenant migrations
run: mix ecto.migrate --migrations-path lib/realtime/tenants/repo/migrations
- name: Run format check
run: mix format --check-formatted
- name: Credo checks
run: mix credo
- name: Run hex audit
run: mix hex.audit
- name: Run mix_audit
run: mix deps.audit
- name: Run sobelow
run: mix sobelow --config .sobelow-conf
- name: Retrieve PLT Cache
uses: actions/cache@v4
id: plt-cache
with:
path: priv/plts
key: ${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-plts-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
- name: Create PLTs
if: steps.plt-cache.outputs.cache-hit != 'true'
run: |
mkdir -p priv/plts
mix dialyzer.build
- name: Run dialyzer
run: mix dialyzer
- name: Run dev seeds
run: DB_ENC_KEY="1234567890123456" mix ecto.setup
- name: Start epmd
run: epmd -daemon
- name: Run tests
Expand Down
4 changes: 3 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws
broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "gen_rpc") |> String.to_atom()
websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))
users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5)

no_channel_timeout_in_ms =
if config_env() == :test,
Expand Down Expand Up @@ -126,7 +127,8 @@ config :realtime,
no_channel_timeout_in_ms: no_channel_timeout_in_ms,
platform: platform,
pubsub_adapter: pubsub_adapter,
broadcast_pool_size: broadcast_pool_size
broadcast_pool_size: broadcast_pool_size,
users_scope_shards: users_scope_shards

if config_env() != :test && run_janitor? do
config :realtime,
Expand Down
39 changes: 29 additions & 10 deletions lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
alias Realtime.Adapters.Changes.NewRecord
alias Realtime.Adapters.Changes.UpdatedRecord
alias Realtime.Database
alias Realtime.RateCounter
alias Realtime.Tenants

def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

Expand All @@ -26,6 +28,12 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
tenant_id = args["id"]
Logger.metadata(external_id: tenant_id, project: tenant_id)

%Realtime.Api.Tenant{} = Tenants.Cache.get_tenant_by_external_id(tenant_id)

rate_counter_args = Tenants.db_events_per_second_rate(tenant_id, 4000)

RateCounter.new(rate_counter_args)

state = %{
backoff: Backoff.new(backoff_min: 100, backoff_max: 5_000, backoff_type: :rand_exp),
db_host: args["db_host"],
Expand All @@ -41,7 +49,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
retry_ref: nil,
retry_count: 0,
slot_name: args["slot_name"] <> slot_name_suffix(),
tenant_id: tenant_id
tenant_id: tenant_id,
rate_counter_args: rate_counter_args
}

{:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{})
Expand Down Expand Up @@ -74,7 +83,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
max_record_bytes: max_record_bytes,
max_changes: max_changes,
conn: conn,
tenant_id: tenant_id
tenant_id: tenant_id,
rate_counter_args: rate_counter_args
} = state
) do
cancel_timer(poll_ref)
Expand All @@ -84,7 +94,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
{time, list_changes} = :timer.tc(Replications, :list_changes, args)
record_list_changes_telemetry(time, tenant_id)

case handle_list_changes_result(list_changes, tenant_id) do
case handle_list_changes_result(list_changes, tenant_id, rate_counter_args) do
{:ok, row_count} ->
Backoff.reset(backoff)

Expand Down Expand Up @@ -177,20 +187,29 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
rows: [_ | _] = rows,
num_rows: rows_count
}},
tenant_id
tenant_id,
rate_counter_args
) do
for row <- rows,
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
topic = "realtime:postgres:" <> tenant_id
case RateCounter.get(rate_counter_args) do
{:ok, %{limit: %{triggered: true}}} ->
:ok

RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
_ ->
Realtime.GenCounter.add(rate_counter_args.id, rows_count)

for row <- rows,
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
topic = "realtime:postgres:" <> tenant_id

RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
end
end

{:ok, rows_count}
end

defp handle_list_changes_result({:ok, _}, _), do: {:ok, 0}
defp handle_list_changes_result({:error, reason}, _), do: {:error, reason}
defp handle_list_changes_result({:ok, _}, _, _), do: {:ok, 0}
defp handle_list_changes_result({:error, reason}, _, _), do: {:error, reason}

def generate_record([
{"wal",
Expand Down
3 changes: 1 addition & 2 deletions lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ defmodule Realtime.Application do
Realtime.PromEx.set_metrics_tags()
:ets.new(Realtime.Tenants.Connect, [:named_table, :set, :public])
:syn.set_event_handler(Realtime.SynHandler)

:ok = :syn.add_node_to_scopes([:users, RegionNodes, Realtime.Tenants.Connect])
:ok = :syn.add_node_to_scopes([RegionNodes, Realtime.Tenants.Connect | Realtime.UsersCounter.scopes()])

region = Application.get_env(:realtime, :region)
:syn.join(RegionNodes, region, self(), node: node())
Expand Down
5 changes: 4 additions & 1 deletion lib/realtime/nodes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ defmodule Realtime.Nodes do

iex> node = :"pink@127.0.0.1"
iex> Realtime.Helpers.short_node_id_from_name(node)
"127.0.0.1"
"pink@127.0.0.1"

iex> node = :"pink@10.0.1.1"
iex> Realtime.Helpers.short_node_id_from_name(node)
Expand All @@ -124,6 +124,9 @@ defmodule Realtime.Nodes do
[_, _, _, _, _, one, two, _] ->
one <> two

["127.0.0.1"] ->
Atom.to_string(name)

_other ->
host
end
Expand Down
28 changes: 27 additions & 1 deletion lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ defmodule Realtime.Tenants do
"""
@spec list_connected_tenants(atom()) :: [String.t()]
def list_connected_tenants(node) do
:syn.group_names(:users, node)
UsersCounter.scopes()
|> Enum.flat_map(fn scope -> :syn.group_names(scope, node) end)
end

@doc """
Expand Down Expand Up @@ -247,6 +248,31 @@ defmodule Realtime.Tenants do
%RateCounter.Args{id: db_events_per_second_key(tenant_id), opts: opts}
end

@doc """
Builds the `RateCounter.Args` for a tenant's database events per second, with a limit attached.

The counter id comes from `db_events_per_second_key/1`. The options carry:

  * `:telemetry` - the `[:channel, :db_events]` event, tagged with the tenant id
  * `:limit` - `max_events_per_second` as the limit value, measured on `:avg`, with
    logging enabled and a log function that emits an error tagged with the tenant id
"""
@spec db_events_per_second_rate(String.t(), non_neg_integer) :: RateCounter.Args.t()
def db_events_per_second_rate(tenant_id, max_events_per_second) when is_binary(tenant_id) do
  telemetry = %{
    event_name: [:channel, :db_events],
    measurements: %{},
    metadata: %{tenant: tenant_id}
  }

  # Handed to the rate counter as `log_fn`; closes over the tenant id for log metadata.
  log_limit_reached = fn ->
    Logger.error("MessagePerSecondRateLimitReached: Too many postgres changes messages per second",
      external_id: tenant_id,
      project: tenant_id
    )
  end

  limit = [
    value: max_events_per_second,
    measurement: :avg,
    log: true,
    log_fn: log_limit_reached
  ]

  %RateCounter.Args{
    id: db_events_per_second_key(tenant_id),
    opts: [telemetry: telemetry, limit: limit]
  }
end

@doc """
The GenCounter key to use when counting events for RealtimeChannel events.
iex> Realtime.Tenants.db_events_per_second_key("tenant_id")
Expand Down
21 changes: 18 additions & 3 deletions lib/realtime/user_counter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,32 @@ defmodule Realtime.UsersCounter do
Adds a RealtimeChannel pid to the `:users` scope for a tenant so we can keep track of all connected clients for a tenant.
"""
@spec add(pid(), String.t()) :: :ok
def add(pid, tenant), do: :syn.join(:users, tenant, pid)
def add(pid, tenant_id), do: tenant_id |> scope() |> :syn.join(tenant_id, pid)

@doc """
Returns the count of all connected clients for a tenant for the cluster.
"""
@spec tenant_users(String.t()) :: non_neg_integer()
def tenant_users(tenant), do: :syn.member_count(:users, tenant)
def tenant_users(tenant_id), do: tenant_id |> scope() |> :syn.member_count(tenant_id)

@doc """
Returns the count of all connected clients for a tenant for a single node.
"""
@spec tenant_users(atom, String.t()) :: non_neg_integer()
def tenant_users(node_name, tenant), do: :syn.member_count(:users, tenant, node_name)
def tenant_users(node_name, tenant_id), do: tenant_id |> scope() |> :syn.member_count(tenant_id, node_name)

@doc """
Returns the scope for a given tenant id.

The tenant id is hashed with `:erlang.phash2/2` into one of the configured
shards, so a tenant id always maps to the same scope for a given shard count.
"""
@spec scope(String.t()) :: atom()
def scope(tenant_id), do: tenant_id |> :erlang.phash2(shard_count()) |> shard_scope()

@doc """
Returns every users scope, one per configured shard.
"""
@spec scopes() :: [atom()]
# The explicit `//1` step keeps the range ascending, so a shard count of 0
# yields an empty list instead of walking 0..-1 backwards.
def scopes, do: Enum.map(0..(shard_count() - 1)//1, &shard_scope/1)

# Number of shards read from the `:users_scope_shards` application env.
# Falls back to 5 (the default used in config/runtime.exs) so an unset key
# does not pass `nil` into `:erlang.phash2/2` or the range arithmetic.
defp shard_count, do: Application.get_env(:realtime, :users_scope_shards, 5)

# Single place that defines the scope naming scheme. The shard is a bounded
# integer, so the set of atoms created here is finite.
defp shard_scope(shard), do: :"users_#{shard}"
end
8 changes: 8 additions & 0 deletions lib/realtime_web/channels/payloads/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ defmodule RealtimeWeb.Channels.Payloads.Config do
end

def changeset(config, attrs) do
attrs =
attrs
|> Enum.map(fn
{k, v} when is_list(v) -> {k, Enum.filter(v, fn v -> v != nil end)}
{k, v} -> {k, v}
end)
|> Map.new()

config
|> cast(attrs, [:private], message: &Join.error_message/2)
|> cast_embed(:broadcast, invalid_message: "unable to parse, expected a map")
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime_web/channels/payloads/presence.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule RealtimeWeb.Channels.Payloads.Presence do

embedded_schema do
field :enabled, :boolean, default: true
field :key, :string, default: UUID.uuid1()
field :key, :any, default: UUID.uuid1(), virtual: true
end

def changeset(presence, attrs) do
Expand Down
Loading
Loading