diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 09ba17e7..8f7ca82a 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -12,7 +12,23 @@ defmodule Ch.Connection do @impl true @spec connect([Ch.start_option()]) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()} def connect(opts) do - with {:ok, conn} <- do_connect(opts) do + scheme = String.to_existing_atom(opts[:scheme] || "http") + address = opts[:hostname] || "localhost" + port = opts[:port] || 8123 + mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts]) + + with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do + monitor_socket(conn.socket) + IO.inspect(conn.socket, label: "connect") + + conn = + conn + |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15)) + |> maybe_put_private(:database, opts[:database]) + |> maybe_put_private(:username, opts[:username]) + |> maybe_put_private(:password, opts[:password]) + |> maybe_put_private(:settings, opts[:settings]) + handshake = Query.build("select 1") params = DBConnection.Query.encode(handshake, _params = [], _opts = []) @@ -37,12 +53,13 @@ defmodule Ch.Connection do {:error, reason} end end + catch + _kind, reason -> {:error, reason} end @impl true @spec ping(conn) :: {:ok, conn} | {:disconnect, Mint.Types.error() | Error.t(), conn} def ping(conn) do - conn = maybe_reconnect(conn) headers = [{"user-agent", @user_agent}] case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do @@ -54,7 +71,10 @@ defmodule Ch.Connection do @impl true @spec checkout(conn) :: {:ok, conn} - def checkout(conn), do: {:ok, conn} + def checkout(conn) do + IO.inspect(conn.socket, label: "Ch.checkout") + {:ok, conn} + end # we "support" these four tx callbacks for Repo.checkout # even though ClickHouse doesn't support txs @@ -80,7 +100,6 @@ defmodule Ch.Connection do @impl true def handle_declare(query, params, opts, conn) do - conn = maybe_reconnect(conn) %Query{command: command} = query {query_params, extra_headers, body} = params @@ -180,7 +199,6 @@ defmodule Ch.Connection do @impl true def handle_execute(%Query{} = query, {:stream, params}, opts, conn) do - conn = maybe_reconnect(conn) {query_params, extra_headers, body} = params path = path(conn, query_params, opts) @@ -213,7 +231,6 @@ defmodule Ch.Connection do end def handle_execute(%Query{command: :insert} = query, params, opts, conn) do - conn = maybe_reconnect(conn) {query_params, extra_headers, body} = params path = path(conn, query_params, opts) @@ -232,7 +249,7 @@ defmodule Ch.Connection do end def handle_execute(query, params, opts, conn) do - conn = maybe_reconnect(conn) + IO.puts("query incoming #{inspect(query)} #{inspect(conn.socket)}") {query_params, extra_headers, body} = params path = path(conn, query_params, opts) @@ -243,8 +260,25 @@ defmodule Ch.Connection do end end + def handle_info({:DOWN, _ref, :port, socket, _reason}, conn) do + IO.puts("socket #{inspect(socket)} closed") + + IO.inspect( + if conn.socket == socket do + {:disconnect, Mint.TransportError.exception(reason: :closed)} + else + :ok + end + ) + end + + def handle_info(msg, _state) do + Logger.error(["Unhandled message in Ch.Connection: ", inspect(msg)]) + end + @impl true def disconnect(_error, conn) do + IO.puts("disconnect #{inspect(conn.socket)}") {:ok = ok, _conn} = HTTP.close(conn) ok end @@ -391,49 +425,6 @@ defmodule Ch.Connection do "/?" <> URI.encode_query(settings ++ query_params) end - # If the http connection was closed by the server, attempt to - # reconnect once. If the re-connect failed, return the old - # connection and let the error bubble up to the caller. - defp maybe_reconnect(conn) do - if HTTP.open?(conn) do - conn - else - opts = HTTP.get_private(conn, :connect_options) - - with {:ok, new_conn} <- do_connect(opts) do - Logger.warning( - "The connection was closed by the server; a new connection has been successfully reestablished." - ) - - new_conn - else - _ -> conn - end - end - end - - defp do_connect(opts) do - scheme = String.to_existing_atom(opts[:scheme] || "http") - address = opts[:hostname] || "localhost" - port = opts[:port] || 8123 - mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts]) - - with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do - conn = - conn - |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15)) - |> maybe_put_private(:database, opts[:database]) - |> maybe_put_private(:username, opts[:username]) - |> maybe_put_private(:password, opts[:password]) - |> maybe_put_private(:settings, opts[:settings]) - |> maybe_put_private(:connect_options, opts) - - {:ok, conn} - end - catch - _kind, reason -> {:error, reason} - end - @server_display_name_key :server_display_name @spec ensure_same_server(conn, Mint.Types.headers()) :: conn @@ -459,4 +450,15 @@ defmodule Ch.Connection do conn end end + + # TODO use ssl_record.hrl + defp monitor_socket({:sslsocket, tcp_socket, _}) do + # TODO support :socket + {:gen_tcp, socket, _, _} = tcp_socket + :inet.monitor(socket) + end + + defp monitor_socket(socket) do + :inet.monitor(socket) + end end diff --git a/mix.exs b/mix.exs index 18e3f419..a7145af5 100644 --- a/mix.exs +++ b/mix.exs @@ -34,7 +34,7 @@ defmodule Ch.MixProject do defp deps do [ {:mint, "~> 1.0"}, - {:db_connection, "~> 2.0"}, + {:db_connection, github: "elixir-ecto/db_connection"}, {:jason, "~> 1.0"}, {:decimal, "~> 2.0"}, {:ecto, "~> 3.12", optional: true}, diff --git a/mix.lock b/mix.lock index de987dc3..baf9f2b0 100644 --- a/mix.lock +++ b/mix.lock @@ -1,6 +1,6 @@ %{ "benchee": {:hex, :benchee, "1.3.1", "c786e6a76321121a44229dde3988fc772bca73ea75170a73fd5f4ddf1af95ccf", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "76224c58ea1d0391c8309a8ecbfe27d71062878f59bd41a390266bf4ac1cc56d"}, - "db_connection": {:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"}, + "db_connection": {:git, "https://github.com/elixir-ecto/db_connection.git", "625b89fbb33e71e41ba39086ea7f65cd22bf1a3b", []}, "decimal": {:hex, :decimal, "2.2.0", "df3d06bb9517e302b1bd265c1e7f16cda51547ad9d99892049340841f3e15836", [:mix], [], "hexpm", "af8daf87384b51b7e611fb1a1f2c4d4876b65ef968fa8bd3adf44cff401c7f21"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, "dialyxir": {:hex, :dialyxir, "1.4.5", "ca1571ac18e0f88d4ab245f0b60fa31ff1b12cbae2b11bd25d207f865e8ae78a", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b0fb08bb8107c750db5c0b324fa2df5ceaa0f9307690ee3c1f6ba5b9eb5d35c3"}, diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs index 921a1777..ee85e633 100644 --- a/test/ch/faults_test.exs +++ b/test/ch/faults_test.exs @@ -291,7 +291,7 @@ defmodule Ch.FaultsTest do assert_receive :done end) - assert log =~ "disconnected: ** (Mint.TransportError) timeout" + assert log =~ "disconnected: ** (Mint.TransportError) socket closed" end test "reconnects after closed on response", ctx do @@ -340,7 +340,7 @@ defmodule Ch.FaultsTest do assert log =~ "disconnected: ** (Mint.TransportError) socket closed" end - test "reconnects after Connection: close response from server", ctx do + test "reconnects after `connection: close` response from server", ctx do %{port: port, listen: listen, clickhouse: clickhouse} = ctx test = self() @@ -357,7 +357,6 @@ defmodule Ch.FaultsTest do spawn_link(fn -> assert {:ok, %{num_rows: 1, rows: [[2]]}} = Ch.query(conn, "select 1 + 1") - send(test, :done) end) # first select 1 + 1 @@ -372,7 +371,12 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(mint, response) :ok = :gen_tcp.close(mint) - assert_receive :done + + IO.puts(""" + ---------------------------- + story starts here ! + ---------------------------- + """) # reconnect {:ok, mint} = :gen_tcp.accept(listen) @@ -382,8 +386,10 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) spawn_link(fn -> + IO.puts("second spawn") + assert {:ok, %{num_rows: 1, rows: [[2]]}} = - Ch.query(conn, "select 1 + 1") + Ch.query(conn, "select 1 + 1") |> IO.inspect(label: "second fun") send(test, :done) end)