Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/mix/tasks/phoenix_sync.install.ex
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ if Code.ensure_loaded?(Igniter) do
defp required_electric_version do
Phoenix.Sync.MixProject.project()
|> Keyword.fetch!(:deps)
|> Enum.find(&match?({:electric, _, _}, &1))
|> Enum.find(&(elem(&1, 0) == :electric))
|> elem(1)
end

Expand Down
7 changes: 4 additions & 3 deletions lib/phoenix/sync/electric.ex
Original file line number Diff line number Diff line change
Expand Up @@ -640,8 +640,9 @@ defmodule Phoenix.Sync.Electric do
end
end

if Code.ensure_loaded?(Electric.Shapes.Api) &&
Code.ensure_loaded?(Phoenix.Sync.Electric.ApiAdapter) do
if Code.ensure_loaded?(Electric.Shapes.Api) do
Code.ensure_loaded(Phoenix.Sync.Electric.ApiAdapter)

defimpl Phoenix.Sync.Adapter.PlugApi, for: Electric.Shapes.Api do
alias Electric.Shapes

Expand Down Expand Up @@ -701,7 +702,7 @@ if Code.ensure_loaded?(Electric.Shapes.Api) &&
end
end

def send_response(%ApiAdapter{}, conn, {request, response}) do
def send_response(_api, conn, {request, response}) do
conn
|> content_type()
|> Plug.Conn.assign(:request, request)
Expand Down
3 changes: 2 additions & 1 deletion lib/phoenix/sync/plug/cors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ defmodule Phoenix.Sync.Plug.CORS do
"electric-offset",
"electric-schema",
"electric-up-to-date",
"electric-internal-known-error"
"electric-internal-known-error",
"retry-after"
]

@expose_headers ["transfer-encoding" | @electric_headers]
Expand Down
8 changes: 6 additions & 2 deletions lib/phoenix/sync/sandbox.ex
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,11 @@ if Phoenix.Sync.sandbox_enabled?() do
# mark the stack as ready
Electric.StatusMonitor.mark_pg_lock_acquired(stack_id, owner)
Electric.StatusMonitor.mark_replication_client_ready(stack_id, owner)
Electric.StatusMonitor.mark_connection_pool_ready(stack_id, owner)
Electric.StatusMonitor.mark_connection_pool_ready(stack_id, :admin, owner)
Electric.StatusMonitor.mark_connection_pool_ready(stack_id, :snapshot, owner)
Electric.StatusMonitor.mark_integrety_checks_passed(stack_id, owner)
Electric.StatusMonitor.mark_shape_log_collector_ready(stack_id, owner)
Electric.StatusMonitor.mark_supervisor_processes_ready(stack_id, owner)

api_config = Sandbox.Stack.config(stack_id, repo)
api = Electric.Application.api(api_config)
Expand Down Expand Up @@ -292,7 +296,7 @@ if Phoenix.Sync.sandbox_enabled?() do
defp generate_stack_id(opts) do
tags = Keyword.get(opts, :tags, %{})
# with parameterised tests the same file:line can be running simultaneously
uid = System.unique_integer() |> to_string()
uid = System.unique_integer([:monotonic]) |> to_string()

suffix =
case Map.fetch(tags, :line) do
Expand Down
11 changes: 11 additions & 0 deletions lib/phoenix/sync/sandbox/expiry_manager.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Phoenix.Sync.Sandbox.ExpiryManager do
use GenServer

def start_link(args) do
GenServer.start_link(__MODULE__, args, name: Electric.ShapeCache.ExpiryManager.name(args))
end

def init(_) do
{:ok, []}
end
end
26 changes: 25 additions & 1 deletion lib/phoenix/sync/sandbox/inspector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ if Phoenix.Sync.sandbox_enabled?() do
@impl Electric.Postgres.Inspector
def list_relations_with_stale_cache(_), do: {:ok, []}

@impl Electric.Postgres.Inspector
def load_supported_features(stack_id) do
with {:ok, pid} <- validate_stack_alive(stack_id) do
GenServer.call(pid, :load_supported_features)
end
end

def start_link(args) do
GenServer.start_link(__MODULE__, args, name: name(args[:stack_id]))
end
Expand All @@ -57,7 +64,15 @@ if Phoenix.Sync.sandbox_enabled?() do
{:ok, stack_id} = Keyword.fetch(args, :stack_id)
{:ok, repo} = Keyword.fetch(args, :repo)

{:ok, %{repo: repo, stack_id: stack_id, relations: %{}, columns: %{}, oids: %{}}}
{:ok,
%{
repo: repo,
stack_id: stack_id,
relations: %{},
columns: %{},
oids: %{},
supported_features: %{}
}}
end

@impl GenServer
Expand Down Expand Up @@ -88,6 +103,15 @@ if Phoenix.Sync.sandbox_enabled?() do
{:reply, result, state}
end

def handle_call(:load_supported_features, _from, state) do
{result, state} =
fetch_lazy(state, :supported_features, nil, fn ->
Electric.Postgres.Inspector.DirectInspector.load_supported_features(pool(state))
end)

{:reply, result, state}
end

defp pool(state) do
%{pid: pool} = Ecto.Adapter.lookup_meta(state.repo.get_dynamic_repo())
pool
Expand Down
30 changes: 18 additions & 12 deletions lib/phoenix/sync/sandbox/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ if Phoenix.Sync.sandbox_enabled?() do
@moduledoc false

alias Electric.Replication.Changes.{
Transaction,
Begin,
Commit,
NewRecord,
UpdatedRecord,
DeletedRecord,
Expand Down Expand Up @@ -50,6 +51,12 @@ if Phoenix.Sync.sandbox_enabled?() do

def init(stack_id) do
state = %{txid: 10000, stack_id: stack_id}

Electric.LsnTracker.set_last_processed_lsn(
stack_id,
Electric.Postgres.Lsn.from_integer(0)
)

{:ok, state}
end

Expand All @@ -62,7 +69,7 @@ if Phoenix.Sync.sandbox_enabled?() do
:ok =
txid
|> transaction(msgs)
|> ShapeLogCollector.store_transaction(ShapeLogCollector.name(stack_id))
|> ShapeLogCollector.handle_operations(ShapeLogCollector.name(stack_id))

{:noreply, %{state | txid: next_txid}}
end
Expand All @@ -73,21 +80,20 @@ if Phoenix.Sync.sandbox_enabled?() do
:ok =
state.txid
|> transaction(changes)
|> ShapeLogCollector.store_transaction(ShapeLogCollector.name(state.stack_id))
|> ShapeLogCollector.handle_operations(ShapeLogCollector.name(state.stack_id))

{:noreply, %{state | txid: state.txid + 100}}
end

defp transaction(txid, changes) do
%Transaction{
xid: txid,
lsn: Electric.Postgres.Lsn.from_integer(txid),
last_log_offset: Enum.at(changes, -1) |> Map.fetch!(:log_offset),
changes: changes,
num_changes: length(changes),
commit_timestamp: DateTime.utc_now(),
affected_relations: Enum.into(changes, MapSet.new(), & &1.relation)
}
[%Begin{xid: txid} | changes] ++
[
%Commit{
lsn: Electric.Postgres.Lsn.from_integer(txid),
transaction_size: 100,
commit_timestamp: DateTime.utc_now()
}
]
end

defp msg_from_change({{:insert, schema_meta, values}, i}, lsn, txid) do
Expand Down
54 changes: 14 additions & 40 deletions lib/phoenix/sync/sandbox/publication_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,57 +4,31 @@ if Phoenix.Sync.sandbox_enabled?() do

use GenServer

@behaviour Electric.Replication.PublicationManager

def start_link(_) do
:ignore
end

def init(_arg) do
:ignore
end

def name(stack_id) when is_binary(stack_id) do
Phoenix.Sync.Sandbox.name({__MODULE__, stack_id})
end

def name(opts) when is_list(opts) do
opts
|> Keyword.fetch!(:stack_id)
|> name()
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: name(args))
end

def recover_shape(_shape_handle, _shape, _opts) do
:ok
def name(stack_ref) do
Electric.Replication.PublicationManager.name(stack_ref)
end

def recover_shape(_shape, _opts) do
:ok
def init(opts) do
{:ok, opts}
end

def add_shape(_shape_handle, _shape, opts) do
snapshotter = self()
{:ok, owner} = Keyword.fetch(opts, :owner)
{:ok, repo} = Keyword.fetch(opts, :repo)
# intercept the snapshotter process's add_shape call to add it to the allow
# list for the sandbox repo
def handle_call({:add_shape, _shape_handle, _pub_filter}, {snapshotter, _ref}, state) do
{:ok, owner} = Keyword.fetch(state, :owner)
{:ok, repo} = Keyword.fetch(state, :repo)

Ecto.Adapters.SQL.Sandbox.allow(repo, owner, snapshotter)
:ok
end

def add_shape(_shape, _opts) do
:ok
end

def remove_shape(_shape_handle, _shape, _opts) do
:ok
end

def remove_shape(_shape, _opts) do
:ok
{:reply, :ok, state}
end

def refresh_publication(_opts) do
:ok
def handle_call(_msg, _from, state) do
{:reply, :ok, state}
end
end
end
Loading
Loading