From 50729076a92dcb4fc46492e56aa8561a50b35958 Mon Sep 17 00:00:00 2001 From: Simon Escobar Benitez Date: Mon, 22 Dec 2025 09:56:36 -0500 Subject: [PATCH 1/3] try to research and fix socket closed errors --- lib/ch/connection.ex | 133 +++++++++++++++++++++++-------------------- mix.exs | 2 +- 2 files changed, 71 insertions(+), 64 deletions(-) diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 9ff5074..ca87c69 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -65,13 +65,14 @@ defmodule Ch.Connection do @impl true @spec ping(conn) :: {:ok, conn} | {:disconnect, Mint.Types.error() | Error.t(), conn} def ping(conn) do - conn = maybe_reconnect(conn) - headers = [{"user-agent", @user_agent}] + with {:ok, conn} <- maybe_reconnect(conn) do + headers = [{"user-agent", @user_agent}] - case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do - {:ok, conn, _response} -> {:ok, conn} - {:error, error, conn} -> {:disconnect, error, conn} - {:disconnect, _error, _conn} = disconnect -> disconnect + case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do + {:ok, conn, _response} -> {:ok, conn} + {:error, error, conn} -> {:disconnect, error, conn} + {:disconnect, _error, _conn} = disconnect -> disconnect + end end end @@ -103,26 +104,27 @@ defmodule Ch.Connection do @impl true def handle_declare(query, params, opts, conn) do - conn = maybe_reconnect(conn) - %Query{command: command, decode: decode} = query - {query_params, extra_headers, body} = params - - path = path(conn, query_params, opts) - headers = headers(conn, extra_headers, opts) - timeout = timeout(conn, opts) - - with {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, body), - {:ok, conn, columns, headers, reader} <- recv_declare(conn, decode, timeout) do - result = %Result{ - command: command, - columns: columns, - rows: [], - num_rows: 0, - headers: headers, - data: [] - } - - {:ok, query, result, {conn, reader}} + with {:ok, conn} <- maybe_reconnect(conn) do + %Query{command: command, decode: decode} = query + {query_params, extra_headers, body} = params + + path = path(conn, query_params, opts) + headers = headers(conn, extra_headers, opts) + timeout = timeout(conn, opts) + + with {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, body), + {:ok, conn, columns, headers, reader} <- recv_declare(conn, decode, timeout) do + result = %Result{ + command: command, + columns: columns, + rows: [], + num_rows: 0, + headers: headers, + data: [] + } + + {:ok, query, result, {conn, reader}} + end end end @@ -267,16 +269,17 @@ defmodule Ch.Connection do @impl true def handle_execute(%Query{} = query, {:stream, params}, opts, conn) do - conn = maybe_reconnect(conn) - {query_params, extra_headers, body} = params + with {:ok, conn} <- maybe_reconnect(conn) do + {query_params, extra_headers, body} = params - path = path(conn, query_params, opts) - headers = headers(conn, extra_headers, opts) + path = path(conn, query_params, opts) + headers = headers(conn, extra_headers, opts) - with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, :stream) do - case HTTP.stream_request_body(conn, ref, body) do - {:ok, conn} -> {:ok, query, ref, conn} - {:error, conn, reason} -> {:disconnect, reason, conn} + with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, :stream) do + case HTTP.stream_request_body(conn, ref, body) do + {:ok, conn} -> {:ok, query, ref, conn} + {:error, conn, reason} -> {:disconnect, reason, conn} + end end end end @@ -300,33 +303,35 @@ defmodule Ch.Connection do end def handle_execute(%Query{command: :insert} = query, params, opts, conn) do - conn = maybe_reconnect(conn) - {query_params, extra_headers, body} = params + with {:ok, conn} <- maybe_reconnect(conn) do + {query_params, extra_headers, body} = params - path = path(conn, query_params, opts) - headers = headers(conn, extra_headers, opts) + path = path(conn, query_params, opts) + headers = headers(conn, extra_headers, opts) - result = - if is_function(body, 2) do - request_chunked(conn, "POST", path, headers, body, opts) - else - request(conn, "POST", path, headers, body, opts) - end + result = + if is_function(body, 2) do + request_chunked(conn, "POST", path, headers, body, opts) + else + request(conn, "POST", path, headers, body, opts) + end - with {:ok, conn, responses} <- result do - {:ok, query, responses, conn} + with {:ok, conn, responses} <- result do + {:ok, query, responses, conn} + end end end def handle_execute(query, params, opts, conn) do - conn = maybe_reconnect(conn) - {query_params, extra_headers, body} = params + with {:ok, conn} <- maybe_reconnect(conn) do + {query_params, extra_headers, body} = params - path = path(conn, query_params, opts) - headers = headers(conn, extra_headers, opts) + path = path(conn, query_params, opts) + headers = headers(conn, extra_headers, opts) - with {:ok, conn, responses} <- request(conn, "POST", path, headers, body, opts) do - {:ok, query, responses, conn} + with {:ok, conn, responses} <- request(conn, "POST", path, headers, body, opts) do + {:ok, query, responses, conn} + end end end @@ -483,23 +488,25 @@ defmodule Ch.Connection do end # If the http connection was closed by the server, attempt to - # reconnect once. If the re-connect failed, return the old - # connection and let the error bubble up to the caller. + # reconnect once. Returns {:ok, conn} or {:disconnect, reason, old_conn} defp maybe_reconnect(conn) do if HTTP.open?(conn) do - conn + {:ok, conn} else opts = HTTP.get_private(conn, :connect_options) - with {:ok, new_conn} <- do_connect(opts) do - Logger.warning( - "The connection was closed by the server; a new connection has been successfully reestablished." - ) + case do_connect(opts) do + {:ok, new_conn} -> + Logger.error( + "The connection was closed by the server; a new connection has been successfully reestablished." + ) + + # copy settings that are set dynamically (e.g. json as text) over to the new connection + {:ok, maybe_put_private(new_conn, :settings, HTTP.get_private(conn, :settings))} - # copy settings that are set dynamically (e.g. json as text) over to the new connection - maybe_put_private(new_conn, :settings, HTTP.get_private(conn, :settings)) - else - _ -> conn + {:error, reason} -> + Logger.error("Failed to reconnect to the server reason=#{inspect(reason)}") + {:disconnect, reason, conn} end end end diff --git a/mix.exs b/mix.exs index 9392e3d..bdd25e1 100644 --- a/mix.exs +++ b/mix.exs @@ -2,7 +2,7 @@ defmodule Ch.MixProject do use Mix.Project @source_url "https://github.com/plausible/ch" - @version "0.6.1" + @version "0.6.2-alpha.1" def project do [ From 1689d9d26a8b1d78ee2896bc4884161249faa04e Mon Sep 17 00:00:00 2001 From: Simon Escobar Benitez Date: Mon, 22 Dec 2025 11:18:36 -0500 Subject: [PATCH 2/3] apply review comments --- lib/ch/connection.ex | 54 ++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 30 deletions(-) diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index ca87c69..f4e5994 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -104,27 +104,24 @@ defmodule Ch.Connection do @impl true def handle_declare(query, params, opts, conn) do - with {:ok, conn} <- maybe_reconnect(conn) do - %Query{command: command, decode: decode} = query - {query_params, extra_headers, body} = params - - path = path(conn, query_params, opts) - headers = headers(conn, extra_headers, opts) - timeout = timeout(conn, opts) - - with {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, body), - {:ok, conn, columns, headers, reader} <- recv_declare(conn, decode, timeout) do - result = %Result{ - command: command, - columns: columns, - rows: [], - num_rows: 0, - headers: headers, - data: [] - } - - {:ok, query, result, {conn, reader}} - end + with {:ok, conn} <- maybe_reconnect(conn), + %Query{command: command, decode: decode} = query, + {query_params, extra_headers, body} = params, + path = path(conn, query_params, opts), + headers = headers(conn, extra_headers, opts), + timeout = timeout(conn, opts), + {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, body), + {:ok, conn, columns, headers, reader} <- recv_declare(conn, decode, timeout) do + result = %Result{ + command: command, + columns: columns, + rows: [], + num_rows: 0, + headers: headers, + data: [] + } + + {:ok, query, result, {conn, reader}} end end @@ -323,15 +320,12 @@ defmodule Ch.Connection do end def handle_execute(query, params, opts, conn) do - with {:ok, conn} <- maybe_reconnect(conn) do - {query_params, extra_headers, body} = params - - path = path(conn, query_params, opts) - headers = headers(conn, extra_headers, opts) - - with {:ok, conn, responses} <- request(conn, "POST", path, headers, body, opts) do - {:ok, query, responses, conn} - end + with {:ok, conn} <- maybe_reconnect(conn), + {query_params, extra_headers, body} = params, + path = path(conn, query_params, opts), + headers = headers(conn, extra_headers, opts), + {:ok, conn, responses} <- request(conn, "POST", path, headers, body, opts) do + {:ok, query, responses, conn} end end From 83994aff6bd6c88a7915912360119b8632b23f91 Mon Sep 17 00:00:00 2001 From: Simon Escobar Benitez Date: Mon, 22 Dec 2025 13:23:56 -0500 Subject: [PATCH 3/3] do no attempt to reconnect --- lib/ch/connection.ex | 100 +++++++++++++++---------------------------- 1 file changed, 35 insertions(+), 65 deletions(-) diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index f4e5994..431ee14 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -65,14 +65,12 @@ defmodule Ch.Connection do @impl true @spec ping(conn) :: {:ok, conn} | {:disconnect, Mint.Types.error() | Error.t(), conn} def ping(conn) do - with {:ok, conn} <- maybe_reconnect(conn) do - headers = [{"user-agent", @user_agent}] + headers = [{"user-agent", @user_agent}] - case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do - {:ok, conn, _response} -> {:ok, conn} - {:error, error, conn} -> {:disconnect, error, conn} - {:disconnect, _error, _conn} = disconnect -> disconnect - end + case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do + {:ok, conn, _response} -> {:ok, conn} + {:error, error, conn} -> {:disconnect, error, conn} + {:disconnect, _error, _conn} = disconnect -> disconnect end end @@ -104,13 +102,13 @@ defmodule Ch.Connection do @impl true def handle_declare(query, params, opts, conn) do - with {:ok, conn} <- maybe_reconnect(conn), - %Query{command: command, decode: decode} = query, - {query_params, extra_headers, body} = params, - path = path(conn, query_params, opts), - headers = headers(conn, extra_headers, opts), - timeout = timeout(conn, opts), - {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, body), + %Query{command: command, decode: decode} = query + {query_params, extra_headers, body} = params + path = path(conn, query_params, opts) + headers = headers(conn, extra_headers, opts) + timeout = timeout(conn, opts) + + with {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, body), {:ok, conn, columns, headers, reader} <- recv_declare(conn, decode, timeout) do result = %Result{ command: command, @@ -266,17 +264,15 @@ defmodule Ch.Connection do @impl true def handle_execute(%Query{} = query, {:stream, params}, opts, conn) do - with {:ok, conn} <- maybe_reconnect(conn) do - {query_params, extra_headers, body} = params + {query_params, extra_headers, body} = params - path = path(conn, query_params, opts) - headers = headers(conn, extra_headers, opts) + path = path(conn, query_params, opts) + headers = headers(conn, extra_headers, opts) - with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, :stream) do - case HTTP.stream_request_body(conn, ref, body) do - {:ok, conn} -> {:ok, query, ref, conn} - {:error, conn, reason} -> {:disconnect, reason, conn} - end + with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, :stream) do + case HTTP.stream_request_body(conn, ref, body) do + {:ok, conn} -> {:ok, query, ref, conn} + {:error, conn, reason} -> {:disconnect, reason, conn} end end end @@ -300,31 +296,29 @@ defmodule Ch.Connection do end def handle_execute(%Query{command: :insert} = query, params, opts, conn) do - with {:ok, conn} <- maybe_reconnect(conn) do - {query_params, extra_headers, body} = params - - path = path(conn, query_params, opts) - headers = headers(conn, extra_headers, opts) + {query_params, extra_headers, body} = params - result = - if is_function(body, 2) do - request_chunked(conn, "POST", path, headers, body, opts) - else - request(conn, "POST", path, headers, body, opts) - end + path = path(conn, query_params, opts) + headers = headers(conn, extra_headers, opts) - with {:ok, conn, responses} <- result do - {:ok, query, responses, conn} + result = + if is_function(body, 2) do + request_chunked(conn, "POST", path, headers, body, opts) + else + request(conn, "POST", path, headers, body, opts) end + + with {:ok, conn, responses} <- result do + {:ok, query, responses, conn} end end def handle_execute(query, params, opts, conn) do - with {:ok, conn} <- maybe_reconnect(conn), - {query_params, extra_headers, body} = params, - path = path(conn, query_params, opts), - headers = headers(conn, extra_headers, opts), - {:ok, conn, responses} <- request(conn, "POST", path, headers, body, opts) do + {query_params, extra_headers, body} = params + path = path(conn, query_params, opts) + headers = headers(conn, extra_headers, opts) + + with {:ok, conn, responses} <- request(conn, "POST", path, headers, body, opts) do {:ok, query, responses, conn} end end @@ -481,30 +475,6 @@ defmodule Ch.Connection do "/?" <> URI.encode_query(settings ++ query_params) end - # If the http connection was closed by the server, attempt to - # reconnect once. Returns {:ok, conn} or {:disconnect, reason, old_conn} - defp maybe_reconnect(conn) do - if HTTP.open?(conn) do - {:ok, conn} - else - opts = HTTP.get_private(conn, :connect_options) - - case do_connect(opts) do - {:ok, new_conn} -> - Logger.error( - "The connection was closed by the server; a new connection has been successfully reestablished." - ) - - # copy settings that are set dynamically (e.g. json as text) over to the new connection - {:ok, maybe_put_private(new_conn, :settings, HTTP.get_private(conn, :settings))} - - {:error, reason} -> - Logger.error("Failed to reconnect to the server reason=#{inspect(reason)}") - {:disconnect, reason, conn} - end - end - end - defp do_connect(opts) do scheme = String.to_existing_atom(opts[:scheme] || "http") address = opts[:hostname] || "localhost"