From cb9d256ac4abf57774327eb1f8830ac961d74739 Mon Sep 17 00:00:00 2001 From: Harun Zengin Date: Fri, 13 Jun 2025 17:21:16 +0200 Subject: [PATCH 1/6] Catch :noproc errors in Connection and make sure that client doesn't get sent any :DOWN messages --- lib/xandra/cluster.ex | 14 +- lib/xandra/connection.ex | 258 ++++++++++++++++--------------- test/integration/errors_test.exs | 2 +- test/xandra_test.exs | 24 +++ 4 files changed, 164 insertions(+), 134 deletions(-) diff --git a/lib/xandra/cluster.ex b/lib/xandra/cluster.ex index ac025ee9..5916301b 100644 --- a/lib/xandra/cluster.ex +++ b/lib/xandra/cluster.ex @@ -434,12 +434,7 @@ defmodule Xandra.Cluster do cluster, options, fn conn -> - try do - Xandra.execute(conn, batch, options_without_retry_strategy) - catch - :exit, {:noproc, _} -> - {:error, ConnectionError.new("execute", {:cluster, :pool_closed})} - end + Xandra.execute(conn, batch, options_without_retry_strategy) end ) end @@ -463,12 +458,7 @@ defmodule Xandra.Cluster do cluster, options, fn conn -> - try do - Xandra.execute(conn, query, params, options_without_retry_strategy) - catch - :exit, {:noproc, _} -> - {:error, ConnectionError.new("execute", {:cluster, :pool_closed})} - end + Xandra.execute(conn, query, params, options_without_retry_strategy) end ) end diff --git a/lib/xandra/connection.ex b/lib/xandra/connection.ex index 5f15b6f9..047fbef3 100644 --- a/lib/xandra/connection.ex +++ b/lib/xandra/connection.ex @@ -64,72 +64,77 @@ defmodule Xandra.Connection do req_alias = Process.monitor(conn_pid, alias: :reply_demonitor) telemetry_metadata = Keyword.fetch!(options, :telemetry_metadata) + try do + case :gen_statem.call(conn_pid, {:checkout_state_for_next_request, req_alias}) do + {:ok, checked_out_state() = state} -> + checked_out_state( + protocol_module: protocol_module, + stream_id: stream_id, + prepared_cache: prepared_cache + ) = state + + metadata = + telemetry_meta(state, conn_pid, %{ + query: prepared, + extra_metadata: telemetry_metadata + }) + + options = Keyword.put(options, :stream_id, stream_id) + prepared = hydrate_query(prepared, state, options) + timeout = Keyword.fetch!(options, :timeout) - case :gen_statem.call(conn_pid, {:checkout_state_for_next_request, req_alias}) do - {:ok, checked_out_state() = state} -> - checked_out_state( - protocol_module: protocol_module, - stream_id: stream_id, - prepared_cache: prepared_cache - ) = state - - metadata = - telemetry_meta(state, conn_pid, %{ - query: prepared, - extra_metadata: telemetry_metadata - }) - - options = Keyword.put(options, :stream_id, stream_id) - prepared = hydrate_query(prepared, state, options) - timeout = Keyword.fetch!(options, :timeout) - - case prepared_cache_lookup(prepared_cache, prepared, Keyword.fetch!(options, :force)) do - # If the prepared query was in the cache, we emit a Telemetry event and we must - # make sure to put the stream ID we checked out back into the pool. - {:ok, prepared} -> - Process.demonitor(req_alias, [:flush]) - :telemetry.execute([:xandra, :prepared_cache, :hit], %{}, metadata) - :gen_statem.cast(conn_pid, {:release_stream_id, stream_id}) - {:ok, prepared} - - {:error, cache_status} -> - # If the prepared query is not in the cache, we need to prepare it and then - # cache it. - :telemetry.execute([:xandra, :prepared_cache, cache_status], %{}, metadata) - - :telemetry.span([:xandra, :prepare_query], metadata, fn -> - with :ok <- send_prepare_frame(state, prepared, options), - {:ok, %Frame{} = frame} <- - receive_response_frame(conn_pid, req_alias, state, timeout) do - case protocol_module.decode_response(frame, prepared, options) do - {%Prepared{} = prepared, warnings} -> - Prepared.Cache.insert(prepared_cache, prepared) - - maybe_execute_telemetry_for_warnings( - state, - conn_pid, - prepared, - warnings - ) - - reprepared = cache_status == :hit - {{:ok, prepared}, Map.put(metadata, :reprepared, reprepared)} - - %Xandra.Error{} = error -> - {{:error, error}, Map.put(metadata, :reason, error)} + case prepared_cache_lookup(prepared_cache, prepared, Keyword.fetch!(options, :force)) do + # If the prepared query was in the cache, we emit a Telemetry event and we must + # make sure to put the stream ID we checked out back into the pool. + {:ok, prepared} -> + Process.demonitor(req_alias, [:flush]) + :telemetry.execute([:xandra, :prepared_cache, :hit], %{}, metadata) + :gen_statem.cast(conn_pid, {:release_stream_id, stream_id}) + {:ok, prepared} + + {:error, cache_status} -> + # If the prepared query is not in the cache, we need to prepare it and then + # cache it. + :telemetry.execute([:xandra, :prepared_cache, cache_status], %{}, metadata) + + :telemetry.span([:xandra, :prepare_query], metadata, fn -> + with :ok <- send_prepare_frame(state, prepared, options), + {:ok, %Frame{} = frame} <- + receive_response_frame(conn_pid, req_alias, state, timeout) do + case protocol_module.decode_response(frame, prepared, options) do + {%Prepared{} = prepared, warnings} -> + Prepared.Cache.insert(prepared_cache, prepared) + + maybe_execute_telemetry_for_warnings( + state, + conn_pid, + prepared, + warnings + ) + + reprepared = cache_status == :hit + {{:ok, prepared}, Map.put(metadata, :reprepared, reprepared)} + + %Xandra.Error{} = error -> + {{:error, error}, Map.put(metadata, :reason, error)} + end + else + {:error, reason} -> + Process.demonitor(req_alias, [:flush]) + reason = ConnectionError.new("prepare", reason) + {{:error, reason}, Map.put(metadata, :reason, reason)} end - else - {:error, reason} -> - Process.demonitor(req_alias, [:flush]) - reason = ConnectionError.new("prepare", reason) - {{:error, reason}, Map.put(metadata, :reason, reason)} - end - end) - end + end) + end - {:error, error} -> + {:error, error} -> + Process.demonitor(req_alias, [:flush]) + {:error, ConnectionError.new("check out connection", error)} + end + catch + :exit, {:noproc, _} -> Process.demonitor(req_alias, [:flush]) - {:error, ConnectionError.new("check out connection", error)} + {:error, ConnectionError.new("check out connection", :no_connection_process)} end end @@ -159,70 +164,81 @@ defmodule Xandra.Connection do conn_pid = GenServer.whereis(conn) req_alias = Process.monitor(conn_pid, alias: :reply_demonitor) - case :gen_statem.call(conn_pid, {:checkout_state_for_next_request, req_alias}) do - {:ok, checked_out_state() = checked_out_state} -> - checked_out_state( - transport: %Transport{} = transport, - protocol_module: protocol_module, - stream_id: stream_id - ) = checked_out_state - - options = Keyword.put(options, :stream_id, stream_id) - query = hydrate_query(query, checked_out_state, options) - timeout = Keyword.fetch!(options, :timeout) - - telemetry_meta = - checked_out_state - |> telemetry_meta(conn_pid, %{query: query}) - |> Map.put(:extra_metadata, options[:telemetry_metadata]) - - # This is in an anonymous function so that we can use it in a Telemetry span. - fun = fn -> - with {:ok, payload} <- encode_query(query_mod, query, params, options), - :ok <- Transport.send(transport, payload), - {:ok, %Frame{} = frame} <- - receive_response_frame(conn_pid, req_alias, checked_out_state, timeout) do - case protocol_module.decode_response(frame, query, options) do - {%_{} = response, warnings} -> - maybe_execute_telemetry_for_warnings(checked_out_state, conn_pid, query, warnings) - - # If the query was a "USE keyspace" query, we need to update the current - # keyspace for the connection. This is race conditioney, but it's probably ok. - case response do - %SetKeyspace{keyspace: keyspace} -> - :gen_statem.cast(conn_pid, {:set_keyspace, keyspace}) - - _other -> - :ok - end - - {:ok, response} - - %Xandra.Error{} = error -> - {:error, error} + try do + case :gen_statem.call(conn_pid, {:checkout_state_for_next_request, req_alias}) do + {:ok, checked_out_state() = checked_out_state} -> + checked_out_state( + transport: %Transport{} = transport, + protocol_module: protocol_module, + stream_id: stream_id + ) = checked_out_state + + options = Keyword.put(options, :stream_id, stream_id) + query = hydrate_query(query, checked_out_state, options) + timeout = Keyword.fetch!(options, :timeout) + + telemetry_meta = + checked_out_state + |> telemetry_meta(conn_pid, %{query: query}) + |> Map.put(:extra_metadata, options[:telemetry_metadata]) + + # This is in an anonymous function so that we can use it in a Telemetry span. + fun = fn -> + with {:ok, payload} <- encode_query(query_mod, query, params, options), + :ok <- Transport.send(transport, payload), + {:ok, %Frame{} = frame} <- + receive_response_frame(conn_pid, req_alias, checked_out_state, timeout) do + case protocol_module.decode_response(frame, query, options) do + {%_{} = response, warnings} -> + maybe_execute_telemetry_for_warnings( + checked_out_state, + conn_pid, + query, + warnings + ) + + # If the query was a "USE keyspace" query, we need to update the current + # keyspace for the connection. This is race conditioney, but it's probably ok. + case response do + %SetKeyspace{keyspace: keyspace} -> + :gen_statem.cast(conn_pid, {:set_keyspace, keyspace}) + + _other -> + :ok + end + + {:ok, response} + + %Xandra.Error{} = error -> + {:error, error} + end + else + {:error, {:encoding_failed, error, stacktrace}} -> + Process.demonitor(req_alias, [:flush]) + :gen_statem.cast(conn_pid, {:release_stream_id, stream_id}) + reraise error, stacktrace + + {:error, reason} -> + Process.demonitor(req_alias, [:flush]) + {:error, ConnectionError.new("execute", reason)} end - else - {:error, {:encoding_failed, error, stacktrace}} -> - Process.demonitor(req_alias, [:flush]) - :gen_statem.cast(conn_pid, {:release_stream_id, stream_id}) - reraise error, stacktrace - - {:error, reason} -> - Process.demonitor(req_alias, [:flush]) - {:error, ConnectionError.new("execute", reason)} end - end - :telemetry.span([:xandra, :execute_query], telemetry_meta, fn -> - case fun.() do - {:ok, response} -> {{:ok, response}, telemetry_meta} - {:error, error} -> {{:error, error}, Map.put(telemetry_meta, :reason, error)} - end - end) + :telemetry.span([:xandra, :execute_query], telemetry_meta, fn -> + case fun.() do + {:ok, response} -> {{:ok, response}, telemetry_meta} + {:error, error} -> {{:error, error}, Map.put(telemetry_meta, :reason, error)} + end + end) - {:error, error} -> + {:error, error} -> + Process.demonitor(req_alias, [:flush]) + {:error, ConnectionError.new("check out connection", error)} + end + catch + :exit, {:noproc, _details} -> Process.demonitor(req_alias, [:flush]) - {:error, ConnectionError.new("check out connection", error)} + {:error, ConnectionError.new("check out connection", :no_connection_process)} end end diff --git a/test/integration/errors_test.exs b/test/integration/errors_test.exs index e2bc224e..8aac76d8 100644 --- a/test/integration/errors_test.exs +++ b/test/integration/errors_test.exs @@ -94,7 +94,7 @@ defmodule ErrorsTest do test "noproc errors are caught" do {:ok, cluster} = start_supervised(Xandra.Cluster.PoolMock) - assert {:error, %ConnectionError{action: "execute", reason: {:cluster, :pool_closed}}} = + assert {:error, %ConnectionError{action: "check out connection", reason: :no_connection_process}} = Cluster.execute(cluster, "select * from system.peers") end end diff --git a/test/xandra_test.exs b/test/xandra_test.exs index ba8dd9ec..977f0dc5 100644 --- a/test/xandra_test.exs +++ b/test/xandra_test.exs @@ -307,6 +307,20 @@ defmodule XandraTest do assert {:ok, prepared} = Xandra.prepare(conn, "SELECT * FROM system.local") assert {:ok, %Xandra.Page{}} = Xandra.execute(conn, prepared, []) end + + test "returns an error when the Connection process doesn't exist" do + dead_pid = dead_pid() + + assert {:error, %ConnectionError{action: "check out connection", reason: :no_connection_process}} = + Xandra.prepare(dead_pid, "SELECT * FROM system.local") + + refute_receive({:DOWN, _ref, :process, _pid, :noproc}, 5) + + assert {:error, %ConnectionError{action: "check out connection", reason: :no_connection_process}} = + Xandra.execute(dead_pid, "SELECT * FROM system.local") + + refute_receive({:DOWN, _ref, :process, _pid, :noproc}, 5) + end end describe "failure handling" do @@ -390,4 +404,14 @@ defmodule XandraTest do send(pid, {ref, options}) Keyword.replace!(options, :nodes, original_start_options[:nodes]) end + + def dead_pid do + pid = spawn(fn -> :ok end) + ref = Process.monitor(pid) + + receive do + {:DOWN, ^ref, :process, ^pid, _reason} -> + pid + end + end end From b54be6bd3cfbc826646ce80c1b4aae2dcfb01f6c Mon Sep 17 00:00:00 2001 From: Harun Zengin Date: Fri, 13 Jun 2025 17:28:33 +0200 Subject: [PATCH 2/6] format no_connection_process reason --- lib/xandra/connection_error.ex | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/xandra/connection_error.ex b/lib/xandra/connection_error.ex index 89ea959d..98df71d8 100644 --- a/lib/xandra/connection_error.ex +++ b/lib/xandra/connection_error.ex @@ -67,6 +67,10 @@ defmodule Xandra.ConnectionError do "connection dropped in the middle of a request" end + defp format_reason(:no_connection_process) do + "the connection process doesn't exist" + end + defp format_reason({:connection_process_crashed, reason}) do "connection process crashed before sending a response with reason: #{inspect(reason)}" end From 6136237741235182739d0f690131926622adc0d8 Mon Sep 17 00:00:00 2001 From: Harun Zengin Date: Fri, 13 Jun 2025 17:30:38 +0200 Subject: [PATCH 3/6] format --- lib/xandra/connection.ex | 5 +++-- test/integration/errors_test.exs | 3 ++- test/xandra_test.exs | 6 ++++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/lib/xandra/connection.ex b/lib/xandra/connection.ex index 047fbef3..1d6166d1 100644 --- a/lib/xandra/connection.ex +++ b/lib/xandra/connection.ex @@ -64,6 +64,7 @@ defmodule Xandra.Connection do req_alias = Process.monitor(conn_pid, alias: :reply_demonitor) telemetry_metadata = Keyword.fetch!(options, :telemetry_metadata) + try do case :gen_statem.call(conn_pid, {:checkout_state_for_next_request, req_alias}) do {:ok, checked_out_state() = state} -> @@ -99,8 +100,8 @@ defmodule Xandra.Connection do :telemetry.span([:xandra, :prepare_query], metadata, fn -> with :ok <- send_prepare_frame(state, prepared, options), - {:ok, %Frame{} = frame} <- - receive_response_frame(conn_pid, req_alias, state, timeout) do + {:ok, %Frame{} = frame} <- + receive_response_frame(conn_pid, req_alias, state, timeout) do case protocol_module.decode_response(frame, prepared, options) do {%Prepared{} = prepared, warnings} -> Prepared.Cache.insert(prepared_cache, prepared) diff --git a/test/integration/errors_test.exs b/test/integration/errors_test.exs index 8aac76d8..e1314707 100644 --- a/test/integration/errors_test.exs +++ b/test/integration/errors_test.exs @@ -94,7 +94,8 @@ defmodule ErrorsTest do test "noproc errors are caught" do {:ok, cluster} = start_supervised(Xandra.Cluster.PoolMock) - assert {:error, %ConnectionError{action: "check out connection", reason: :no_connection_process}} = + assert {:error, + %ConnectionError{action: "check out connection", reason: :no_connection_process}} = Cluster.execute(cluster, "select * from system.peers") end end diff --git a/test/xandra_test.exs b/test/xandra_test.exs index 977f0dc5..d990a141 100644 --- a/test/xandra_test.exs +++ b/test/xandra_test.exs @@ -311,12 +311,14 @@ defmodule XandraTest do test "returns an error when the Connection process doesn't exist" do dead_pid = dead_pid() - assert {:error, %ConnectionError{action: "check out connection", reason: :no_connection_process}} = + assert {:error, + %ConnectionError{action: "check out connection", reason: :no_connection_process}} = Xandra.prepare(dead_pid, "SELECT * FROM system.local") refute_receive({:DOWN, _ref, :process, _pid, :noproc}, 5) - assert {:error, %ConnectionError{action: "check out connection", reason: :no_connection_process}} = + assert {:error, + %ConnectionError{action: "check out connection", reason: :no_connection_process}} = Xandra.execute(dead_pid, "SELECT * FROM system.local") refute_receive({:DOWN, _ref, :process, _pid, :noproc}, 5) From 87a62b5c85e28ecee74b83150b3a8132a7802ac4 Mon Sep 17 00:00:00 2001 From: Harun Zengin Date: Mon, 23 Jun 2025 16:32:19 +0200 Subject: [PATCH 4/6] Apply suggested PR review nit-picks --- test/xandra_test.exs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/test/xandra_test.exs b/test/xandra_test.exs index d990a141..32062851 100644 --- a/test/xandra_test.exs +++ b/test/xandra_test.exs @@ -315,13 +315,13 @@ defmodule XandraTest do %ConnectionError{action: "check out connection", reason: :no_connection_process}} = Xandra.prepare(dead_pid, "SELECT * FROM system.local") - refute_receive({:DOWN, _ref, :process, _pid, :noproc}, 5) + refute_receive {:DOWN, _ref, :process, _pid, :noproc}, 5 assert {:error, %ConnectionError{action: "check out connection", reason: :no_connection_process}} = Xandra.execute(dead_pid, "SELECT * FROM system.local") - refute_receive({:DOWN, _ref, :process, _pid, :noproc}, 5) + refute_receive {:DOWN, _ref, :process, _pid, :noproc}, 5 end end @@ -407,13 +407,11 @@ defmodule XandraTest do Keyword.replace!(options, :nodes, original_start_options[:nodes]) end - def dead_pid do - pid = spawn(fn -> :ok end) - ref = Process.monitor(pid) + defp dead_pid do + {pid, ref} = spawn_monitor(fn -> :ok end) receive do - {:DOWN, ^ref, :process, ^pid, _reason} -> - pid + {:DOWN, ^ref, _, _, _} -> pid end end end From bfb81daf9b44f72365c4fc8f0b849f926979e142 Mon Sep 17 00:00:00 2001 From: Harun Zengin Date: Mon, 23 Jun 2025 16:52:12 +0200 Subject: [PATCH 5/6] Implemented suggested gen_statem_call_trapping_noproc function --- lib/xandra/connection.ex | 270 +++++++++++++++++++-------------------- 1 file changed, 132 insertions(+), 138 deletions(-) diff --git a/lib/xandra/connection.ex b/lib/xandra/connection.ex index 1d6166d1..406a7ed5 100644 --- a/lib/xandra/connection.ex +++ b/lib/xandra/connection.ex @@ -65,77 +65,71 @@ defmodule Xandra.Connection do telemetry_metadata = Keyword.fetch!(options, :telemetry_metadata) - try do - case :gen_statem.call(conn_pid, {:checkout_state_for_next_request, req_alias}) do - {:ok, checked_out_state() = state} -> - checked_out_state( - protocol_module: protocol_module, - stream_id: stream_id, - prepared_cache: prepared_cache - ) = state - - metadata = - telemetry_meta(state, conn_pid, %{ - query: prepared, - extra_metadata: telemetry_metadata - }) - - options = Keyword.put(options, :stream_id, stream_id) - prepared = hydrate_query(prepared, state, options) - timeout = Keyword.fetch!(options, :timeout) - - case prepared_cache_lookup(prepared_cache, prepared, Keyword.fetch!(options, :force)) do - # If the prepared query was in the cache, we emit a Telemetry event and we must - # make sure to put the stream ID we checked out back into the pool. - {:ok, prepared} -> - Process.demonitor(req_alias, [:flush]) - :telemetry.execute([:xandra, :prepared_cache, :hit], %{}, metadata) - :gen_statem.cast(conn_pid, {:release_stream_id, stream_id}) - {:ok, prepared} - - {:error, cache_status} -> - # If the prepared query is not in the cache, we need to prepare it and then - # cache it. - :telemetry.execute([:xandra, :prepared_cache, cache_status], %{}, metadata) - - :telemetry.span([:xandra, :prepare_query], metadata, fn -> - with :ok <- send_prepare_frame(state, prepared, options), - {:ok, %Frame{} = frame} <- - receive_response_frame(conn_pid, req_alias, state, timeout) do - case protocol_module.decode_response(frame, prepared, options) do - {%Prepared{} = prepared, warnings} -> - Prepared.Cache.insert(prepared_cache, prepared) - - maybe_execute_telemetry_for_warnings( - state, - conn_pid, - prepared, - warnings - ) - - reprepared = cache_status == :hit - {{:ok, prepared}, Map.put(metadata, :reprepared, reprepared)} - - %Xandra.Error{} = error -> - {{:error, error}, Map.put(metadata, :reason, error)} - end - else - {:error, reason} -> - Process.demonitor(req_alias, [:flush]) - reason = ConnectionError.new("prepare", reason) - {{:error, reason}, Map.put(metadata, :reason, reason)} + case gen_statem_call_trapping_noproc(conn_pid, {:checkout_state_for_next_request, req_alias}) do + {:ok, checked_out_state() = state} -> + checked_out_state( + protocol_module: protocol_module, + stream_id: stream_id, + prepared_cache: prepared_cache + ) = state + + metadata = + telemetry_meta(state, conn_pid, %{ + query: prepared, + extra_metadata: telemetry_metadata + }) + + options = Keyword.put(options, :stream_id, stream_id) + prepared = hydrate_query(prepared, state, options) + timeout = Keyword.fetch!(options, :timeout) + + case prepared_cache_lookup(prepared_cache, prepared, Keyword.fetch!(options, :force)) do + # If the prepared query was in the cache, we emit a Telemetry event and we must + # make sure to put the stream ID we checked out back into the pool. + {:ok, prepared} -> + Process.demonitor(req_alias, [:flush]) + :telemetry.execute([:xandra, :prepared_cache, :hit], %{}, metadata) + :gen_statem.cast(conn_pid, {:release_stream_id, stream_id}) + {:ok, prepared} + + {:error, cache_status} -> + # If the prepared query is not in the cache, we need to prepare it and then + # cache it. + :telemetry.execute([:xandra, :prepared_cache, cache_status], %{}, metadata) + + :telemetry.span([:xandra, :prepare_query], metadata, fn -> + with :ok <- send_prepare_frame(state, prepared, options), + {:ok, %Frame{} = frame} <- + receive_response_frame(conn_pid, req_alias, state, timeout) do + case protocol_module.decode_response(frame, prepared, options) do + {%Prepared{} = prepared, warnings} -> + Prepared.Cache.insert(prepared_cache, prepared) + + maybe_execute_telemetry_for_warnings( + state, + conn_pid, + prepared, + warnings + ) + + reprepared = cache_status == :hit + {{:ok, prepared}, Map.put(metadata, :reprepared, reprepared)} + + %Xandra.Error{} = error -> + {{:error, error}, Map.put(metadata, :reason, error)} end - end) - end + else + {:error, reason} -> + Process.demonitor(req_alias, [:flush]) + reason = ConnectionError.new("prepare", reason) + {{:error, reason}, Map.put(metadata, :reason, reason)} + end + end) + end - {:error, error} -> - Process.demonitor(req_alias, [:flush]) - {:error, ConnectionError.new("check out connection", error)} - end - catch - :exit, {:noproc, _} -> + {:error, error} -> Process.demonitor(req_alias, [:flush]) - {:error, ConnectionError.new("check out connection", :no_connection_process)} + {:error, ConnectionError.new("check out connection", error)} end end @@ -165,84 +159,84 @@ defmodule Xandra.Connection do conn_pid = GenServer.whereis(conn) req_alias = Process.monitor(conn_pid, alias: :reply_demonitor) - try do - case :gen_statem.call(conn_pid, {:checkout_state_for_next_request, req_alias}) do - {:ok, checked_out_state() = checked_out_state} -> - checked_out_state( - transport: %Transport{} = transport, - protocol_module: protocol_module, - stream_id: stream_id - ) = checked_out_state - - options = Keyword.put(options, :stream_id, stream_id) - query = hydrate_query(query, checked_out_state, options) - timeout = Keyword.fetch!(options, :timeout) - - telemetry_meta = - checked_out_state - |> telemetry_meta(conn_pid, %{query: query}) - |> Map.put(:extra_metadata, options[:telemetry_metadata]) - - # This is in an anonymous function so that we can use it in a Telemetry span. - fun = fn -> - with {:ok, payload} <- encode_query(query_mod, query, params, options), - :ok <- Transport.send(transport, payload), - {:ok, %Frame{} = frame} <- - receive_response_frame(conn_pid, req_alias, checked_out_state, timeout) do - case protocol_module.decode_response(frame, query, options) do - {%_{} = response, warnings} -> - maybe_execute_telemetry_for_warnings( - checked_out_state, - conn_pid, - query, - warnings - ) - - # If the query was a "USE keyspace" query, we need to update the current - # keyspace for the connection. This is race conditioney, but it's probably ok. - case response do - %SetKeyspace{keyspace: keyspace} -> - :gen_statem.cast(conn_pid, {:set_keyspace, keyspace}) - - _other -> - :ok - end - - {:ok, response} - - %Xandra.Error{} = error -> - {:error, error} - end - else - {:error, {:encoding_failed, error, stacktrace}} -> - Process.demonitor(req_alias, [:flush]) - :gen_statem.cast(conn_pid, {:release_stream_id, stream_id}) - reraise error, stacktrace - - {:error, reason} -> - Process.demonitor(req_alias, [:flush]) - {:error, ConnectionError.new("execute", reason)} + case gen_statem_call_trapping_noproc(conn_pid, {:checkout_state_for_next_request, req_alias}) do + {:ok, checked_out_state() = checked_out_state} -> + checked_out_state( + transport: %Transport{} = transport, + protocol_module: protocol_module, + stream_id: stream_id + ) = checked_out_state + + options = Keyword.put(options, :stream_id, stream_id) + query = hydrate_query(query, checked_out_state, options) + timeout = Keyword.fetch!(options, :timeout) + + telemetry_meta = + checked_out_state + |> telemetry_meta(conn_pid, %{query: query}) + |> Map.put(:extra_metadata, options[:telemetry_metadata]) + + # This is in an anonymous function so that we can use it in a Telemetry span. + fun = fn -> + with {:ok, payload} <- encode_query(query_mod, query, params, options), + :ok <- Transport.send(transport, payload), + {:ok, %Frame{} = frame} <- + receive_response_frame(conn_pid, req_alias, checked_out_state, timeout) do + case protocol_module.decode_response(frame, query, options) do + {%_{} = response, warnings} -> + maybe_execute_telemetry_for_warnings( + checked_out_state, + conn_pid, + query, + warnings + ) + + # If the query was a "USE keyspace" query, we need to update the current + # keyspace for the connection. This is race conditioney, but it's probably ok. + case response do + %SetKeyspace{keyspace: keyspace} -> + :gen_statem.cast(conn_pid, {:set_keyspace, keyspace}) + + _other -> + :ok + end + + {:ok, response} + + %Xandra.Error{} = error -> + {:error, error} end + else + {:error, {:encoding_failed, error, stacktrace}} -> + Process.demonitor(req_alias, [:flush]) + :gen_statem.cast(conn_pid, {:release_stream_id, stream_id}) + reraise error, stacktrace + + {:error, reason} -> + Process.demonitor(req_alias, [:flush]) + {:error, ConnectionError.new("execute", reason)} end + end - :telemetry.span([:xandra, :execute_query], telemetry_meta, fn -> - case fun.() do - {:ok, response} -> {{:ok, response}, telemetry_meta} - {:error, error} -> {{:error, error}, Map.put(telemetry_meta, :reason, error)} - end - end) + :telemetry.span([:xandra, :execute_query], telemetry_meta, fn -> + case fun.() do + {:ok, response} -> {{:ok, response}, telemetry_meta} + {:error, error} -> {{:error, error}, Map.put(telemetry_meta, :reason, error)} + end + end) - {:error, error} -> - Process.demonitor(req_alias, [:flush]) - {:error, ConnectionError.new("check out connection", error)} - end - catch - :exit, {:noproc, _details} -> + {:error, error} -> Process.demonitor(req_alias, [:flush]) - {:error, ConnectionError.new("check out connection", :no_connection_process)} + {:error, ConnectionError.new("check out connection", error)} end end + defp gen_statem_call_trapping_noproc(pid, call) do + :gen_statem.call(pid, call) + catch + :exit, {:noproc, _} -> {:error, :no_connection_process} + end + defp hydrate_query(%Simple{} = simple, checked_out_state() = response, options) do %Simple{ simple From d3b6d1c2b422109714b628d74d4f1516d893ef8e Mon Sep 17 00:00:00 2001 From: Harun Zengin Date: Mon, 23 Jun 2025 16:54:07 +0200 Subject: [PATCH 6/6] Format --- lib/xandra/connection.ex | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/lib/xandra/connection.ex b/lib/xandra/connection.ex index 406a7ed5..cc4e7dda 100644 --- a/lib/xandra/connection.ex +++ b/lib/xandra/connection.ex @@ -184,12 +184,7 @@ defmodule Xandra.Connection do receive_response_frame(conn_pid, req_alias, checked_out_state, timeout) do case protocol_module.decode_response(frame, query, options) do {%_{} = response, warnings} -> - maybe_execute_telemetry_for_warnings( - checked_out_state, - conn_pid, - query, - warnings - ) + maybe_execute_telemetry_for_warnings(checked_out_state, conn_pid, query, warnings) # If the query was a "USE keyspace" query, we need to update the current # keyspace for the connection. This is race conditioney, but it's probably ok.