From a0c1d701500ea1c92444184a275f3c887b13b83e Mon Sep 17 00:00:00 2001 From: Luke Strickland Date: Sun, 17 Aug 2025 06:18:08 -0400 Subject: [PATCH 1/4] Using a unique GenServer per customer server --- lib/query_canary/application.ex | 8 +- lib/query_canary/connections/adapter.ex | 17 ++ .../connections/{ => adapters}/clickhouse.ex | 20 ++ .../connections/{ => adapters}/mysql.ex | 21 ++ .../connections/{ => adapters}/postgresql.ex | 157 ++++------- .../connections/{ => adapters}/sqlite.ex | 2 + .../connections/connection_manager.ex | 194 ++------------ .../connections/connection_server.ex | 253 ++++++++++++++++++ lib/query_canary/jobs/check_runner.ex | 4 +- lib/query_canary_web/live/check_live/index.ex | 20 +- lib/query_canary_web/router.ex | 3 + mix.exs | 1 + mix.lock | 2 + postgres.exs | 108 ++++++++ 14 files changed, 518 insertions(+), 292 deletions(-) create mode 100644 lib/query_canary/connections/adapter.ex rename lib/query_canary/connections/{ => adapters}/clickhouse.ex (89%) rename lib/query_canary/connections/{ => adapters}/mysql.ex (92%) rename lib/query_canary/connections/{ => adapters}/postgresql.ex (66%) rename lib/query_canary/connections/{ => adapters}/sqlite.ex (98%) create mode 100644 lib/query_canary/connections/connection_server.ex create mode 100644 postgres.exs diff --git a/lib/query_canary/application.ex b/lib/query_canary/application.ex index d5d0ab6..44a77b8 100644 --- a/lib/query_canary/application.ex +++ b/lib/query_canary/application.ex @@ -13,10 +13,10 @@ defmodule QueryCanary.Application do {DNSCluster, query: Application.get_env(:query_canary, :dns_cluster_query) || :ignore}, {Phoenix.PubSub, name: QueryCanary.PubSub}, {Oban, Application.fetch_env!(:query_canary, Oban)}, - # Moved to Oban Cron to prevent multiple nodes from running this - # QueryCanary.CheckScheduler, - # Start a worker by calling: QueryCanary.Worker.start_link(arg) - # {QueryCanary.Worker, arg}, + # Registry for connection servers + {Registry, keys: :unique, name: QueryCanary.ConnectionRegistry}, + # Dynamic supervisor for per-server connection GenServers + {DynamicSupervisor, strategy: :one_for_one, name: QueryCanary.ConnectionSupervisor}, # Start to serve requests, typically the last entry QueryCanaryWeb.Endpoint ] diff --git a/lib/query_canary/connections/adapter.ex b/lib/query_canary/connections/adapter.ex new file mode 100644 index 0000000..9774ece --- /dev/null +++ b/lib/query_canary/connections/adapter.ex @@ -0,0 +1,17 @@ +defmodule QueryCanary.Connections.Adapter do + @moduledoc """ + Behaviour for database connection adapters. + + Adapters wrap a single logical connection (or pooled abstraction) and expose + a small uniform API used by ConnectionServer. + """ + + @callback connect(map) :: {:ok, term} | {:error, term} + @callback query(term, String.t(), list) :: {:ok, term} | {:error, term} + @callback list_tables(term) :: {:ok, list} | {:error, term} + @callback get_table_schema(term, String.t()) :: {:ok, term} | {:error, term} + @callback get_database_schema(term, String.t()) :: {:ok, term} | {:error, term} + @callback disconnect(term) :: :ok | {:error, term} + + @optional_callbacks disconnect: 1 +end diff --git a/lib/query_canary/connections/clickhouse.ex b/lib/query_canary/connections/adapters/clickhouse.ex similarity index 89% rename from lib/query_canary/connections/clickhouse.ex rename to lib/query_canary/connections/adapters/clickhouse.ex index 3808915..15c0d26 100644 --- a/lib/query_canary/connections/clickhouse.ex +++ b/lib/query_canary/connections/adapters/clickhouse.ex @@ -3,6 +3,8 @@ defmodule QueryCanary.Connections.Adapters.ClickHouse do ClickHouse adapter for database connections using the `ch` library. """ + @behaviour QueryCanary.Connections.Adapter + @doc """ Connects to a ClickHouse database using the `ch` library. @@ -106,4 +108,22 @@ defmodule QueryCanary.Connections.Adapters.ClickHouse do {:error, reason} end end + + @doc """ + Disconnects from a ClickHouse database. + + ## Parameters + * pid - The process ID of the connection + + ## Returns + * :ok - Disconnection successful + """ + def disconnect(pid) when is_pid(pid) do + try do + GenServer.stop(pid, :normal) + :ok + catch + _, _ -> :ok + end + end end diff --git a/lib/query_canary/connections/mysql.ex b/lib/query_canary/connections/adapters/mysql.ex similarity index 92% rename from lib/query_canary/connections/mysql.ex rename to lib/query_canary/connections/adapters/mysql.ex index a2abd2e..81578a0 100644 --- a/lib/query_canary/connections/mysql.ex +++ b/lib/query_canary/connections/adapters/mysql.ex @@ -7,6 +7,8 @@ defmodule QueryCanary.Connections.Adapters.MySQL do require Logger + @behaviour QueryCanary.Connections.Adapter + @doc """ Connects to a MySQL database. @@ -167,6 +169,25 @@ defmodule QueryCanary.Connections.Adapters.MySQL do end end + @doc """ + Disconnects from a MySQL database. + + ## Parameters + * pid - Process ID of the connection + + ## Returns + * :ok - Disconnection successful + * {:error, reason} - Disconnection failed + """ + def disconnect(pid) when is_pid(pid) do + try do + GenServer.stop(pid, :normal) + :ok + catch + _, _ -> :ok + end + end + # Formats query results into a more usable structure defp format_results(%MyXQL.Result{} = result) do columns = Enum.map(result.columns || [], &String.to_atom/1) diff --git a/lib/query_canary/connections/postgresql.ex b/lib/query_canary/connections/adapters/postgresql.ex similarity index 66% rename from lib/query_canary/connections/postgresql.ex rename to lib/query_canary/connections/adapters/postgresql.ex index 8906353..c3c8d8e 100644 --- a/lib/query_canary/connections/postgresql.ex +++ b/lib/query_canary/connections/adapters/postgresql.ex @@ -7,6 +7,8 @@ defmodule QueryCanary.Connections.Adapters.PostgreSQL do require Logger + @behaviour QueryCanary.Connections.Adapter + @doc """ Connects to a PostgreSQL database. @@ -18,112 +20,46 @@ defmodule QueryCanary.Connections.Adapters.PostgreSQL do * {:error, reason} - Connection failed """ def connect(conn_details) do + Logger.metadata(db_hostname: conn_details.hostname) Logger.info("QueryCanary.Connections: Connecting to #{conn_details.hostname}") - try do - opts = [ - hostname: conn_details.hostname, - port: conn_details.port, - username: conn_details.username, - password: conn_details.password, - database: conn_details.database, - socket_options: Map.get(conn_details, :socket_options, []), - # Connection pool settings - pool_size: 1, - - # Short timeouts - connect_timeout: 5000, - timeout: 5000, - - # Queue settings to fail fast - queue_target: 50, - queue_interval: 1000, - - # Misc settings - auto_reconnect: false, - max_restarts: 1, - show_sensitive_data_on_connection_error: true, - name: :"db_conn_#{System.unique_integer([:positive])}" - ] - - # Build advanced SSL options if present - ssl_mode = Map.get(conn_details, :ssl_mode, "allow") - - ssl_opts = - [ - # Map ssl_mode to verify options - verify: - case ssl_mode do - "verify-full" -> :verify_peer - "verify-ca" -> :verify_peer - "require" -> :verify_none - "prefer" -> :verify_none - "allow" -> :verify_none - _ -> :verify_none - end - ] - |> maybe_add_ssl_cert(conn_details) - |> maybe_add_ssl_key(conn_details) - |> maybe_add_ssl_ca_cert(conn_details) - |> Enum.reject(&is_nil/1) - - opts = opts ++ [ssl: ssl_opts] - - {:ok, pid} = Postgrex.start_link(opts) - - case query(pid, "SELECT 1;") do - {:ok, _res} -> - Logger.info( - "QueryCanary.Connections: Successfully connected to #{conn_details.hostname}" - ) - - {:ok, pid} - - {:error, _message} when ssl_mode in ["allow", "prefer"] -> - GenServer.stop(pid) - - Logger.info( - "PostgreSQL connection with SSL failed to #{conn_details.hostname}, retrying without SSL" - ) - - # Retry without SSL - opts_no_ssl = Keyword.delete(opts, :ssl) - - opts_no_ssl = - Keyword.put(opts_no_ssl, :name, :"db_conn_#{System.unique_integer([:positive])}") - - # opts_no_ssl = Keyword.delete(opts_no_ssl, :ssl_opts) - {:ok, no_ssl_pid} = Postgrex.start_link(opts_no_ssl) - - case query(no_ssl_pid, "SELECT 1;") do - {:ok, _res} -> - Logger.info( - "QueryCanary.Connections: Successfully connected to #{conn_details.hostname}, with non-SSL fallback" - ) - - {:ok, no_ssl_pid} - - error -> - Logger.warning( - "QueryCanary.Connections: Failed to connect to #{conn_details.hostname}, even without SSL" - ) - - GenServer.stop(no_ssl_pid) - {:error, "Failed to connect, even without SSL: #{inspect(error)}"} + # try do + Process.flag(:trap_exit, true) + + opts = [ + hostname: conn_details.hostname, + port: conn_details.port, + username: conn_details.username, + password: conn_details.password, + database: conn_details.database, + socket_options: Map.get(conn_details, :socket_options, []), + name: :"db_conn_#{System.unique_integer([:positive])}" + ] + + # Build advanced SSL options if present + ssl_mode = Map.get(conn_details, :ssl_mode, "allow") + + ssl_opts = + [ + # Map ssl_mode to verify options + verify: + case ssl_mode do + "verify-full" -> :verify_peer + "verify-ca" -> :verify_peer + "require" -> :verify_none + "prefer" -> :verify_none + "allow" -> :verify_none + _ -> :verify_none end + ] + |> maybe_add_ssl_cert(conn_details) + |> maybe_add_ssl_key(conn_details) + |> maybe_add_ssl_ca_cert(conn_details) + |> Enum.reject(&is_nil/1) - error -> - Logger.warning( - "QueryCanary.Connections: Failed to connect to #{conn_details.hostname}, no SSL attempted" - ) + opts = opts ++ [ssl: ssl_opts] - GenServer.stop(pid) - {:error, "Failed to connect: #{inspect(error)}"} - end - rescue - e -> - {:error, "PostgreSQL connection error: #{inspect(e)}"} - end + Postgrex.start_link(opts) end defp maybe_add_ssl_cert(opts, conn_details) do @@ -338,4 +274,23 @@ defmodule QueryCanary.Connections.Adapters.PostgreSQL do raw: result } end + + @doc """ + Disconnects from a PostgreSQL database. + + ## Parameters + * pid - Process ID of the connection + + ## Returns + * :ok - Disconnection successful + * :error - Disconnection failed + """ + def disconnect(pid) when is_pid(pid) do + try do + GenServer.stop(pid, :normal) + :ok + catch + _, _ -> :ok + end + end end diff --git a/lib/query_canary/connections/sqlite.ex b/lib/query_canary/connections/adapters/sqlite.ex similarity index 98% rename from lib/query_canary/connections/sqlite.ex rename to lib/query_canary/connections/adapters/sqlite.ex index 89adee2..88d824b 100644 --- a/lib/query_canary/connections/sqlite.ex +++ b/lib/query_canary/connections/adapters/sqlite.ex @@ -8,6 +8,8 @@ defmodule QueryCanary.Connections.Adapters.SQLite do require Logger + @behaviour QueryCanary.Connections.Adapter + @doc """ Connects to an SQLite database. diff --git a/lib/query_canary/connections/connection_manager.ex b/lib/query_canary/connections/connection_manager.ex index 4ceb626..a0a98ad 100644 --- a/lib/query_canary/connections/connection_manager.ex +++ b/lib/query_canary/connections/connection_manager.ex @@ -2,14 +2,13 @@ defmodule QueryCanary.Connections.ConnectionManager do @moduledoc """ Manages database connections with support for different engines and SSH tunneling. - This module provides an adapter pattern for interacting with various database engines - while abstracting away the complexity of SSH tunneling and connection pooling. + Refactored to use persistent per-server ConnectionServer processes. """ require Logger alias QueryCanary.Servers.Server - alias QueryCanary.Connections.SSHTunnel + alias QueryCanary.Connections.ConnectionServer @doc """ Tests a database connection with optional SSH tunneling. @@ -22,7 +21,13 @@ defmodule QueryCanary.Connections.ConnectionManager do * {:error, reason} - Connection failed with reason """ def test_connection(%Server{} = server) do - run_query(server, "SELECT NOW();") + with {:ok, _pid} <- ConnectionServer.ensure_started(server), + {:ok, _} <- ConnectionServer.query(server.id, "SELECT 1") do + :ok + else + {:error, reason} -> {:error, reason} + other -> other + end end @doc """ @@ -38,18 +43,10 @@ defmodule QueryCanary.Connections.ConnectionManager do * {:error, reason} - Query failed with reason """ def run_query(%Server{} = server, query, params \\ []) do - Logger.info("run_query #{server.name}") - - with {:ok, conn_details} <- prepare_connection(server), - {:ok, conn} <- get_adapter(server).connect(conn_details), - {:ok, results} <- get_adapter(server).query(conn, query, params) do - {:ok, results} - else - {:error, reason} -> {:error, reason} + with {:ok, _pid} <- ConnectionServer.ensure_started(server), + reply <- ConnectionServer.query(server.id, query, params) do + reply end - after - # Always close any SSH tunnels that might have been opened - cleanup_resources(server) end @doc """ @@ -63,17 +60,9 @@ defmodule QueryCanary.Connections.ConnectionManager do * {:error, reason} - Operation failed with reason """ def list_tables(%Server{} = server) do - Logger.info("list_tables #{server.name}") - - with {:ok, conn_details} <- prepare_connection(server), - {:ok, conn} <- get_adapter(server).connect(conn_details), - {:ok, tables} <- get_adapter(server).list_tables(conn) do - {:ok, tables} - else - {:error, reason} -> {:error, reason} + with {:ok, _pid} <- ConnectionServer.ensure_started(server) do + ConnectionServer.list_tables(server.id) end - after - cleanup_resources(server) end @doc """ @@ -88,159 +77,8 @@ defmodule QueryCanary.Connections.ConnectionManager do * {:error, reason} - Operation failed with reason """ def get_database_schema(%Server{} = server) do - Logger.info("get_database_schema #{server.name}") - - with {:ok, conn_details} <- prepare_connection(server), - {:ok, conn} <- get_adapter(server).connect(conn_details), - {:ok, schema} <- get_adapter(server).get_database_schema(conn, server.db_name) do - {:ok, schema} - else - {:error, reason} -> {:error, reason} - end - after - cleanup_resources(server) - end - - # Private functions - - # Sets up an SSH tunnel if enabled, and returns modified connection details - defp prepare_connection(%Server{ssh_tunnel: true} = server) do - # Decrypt any encrypted credentials from the server - server = decrypt_credentials(server) - - ssh_opts = %{ - host: server.ssh_hostname, - port: server.ssh_port, - user: server.ssh_username, - private_key: server.ssh_private_key - } - - target_opts = %{ - host: server.db_hostname, - port: server.db_port - } - - case SSHTunnel.start_tunnel(ssh_opts, target_opts) do - {:ok, {_conn, port} = tunnel_ref} -> - # Store the tunnel reference in the process dictionary - # so we can clean it up later - Process.put(:ssh_tunnel_ref, tunnel_ref) - - # Return connection details that point to the local tunnel endpoint - {:ok, - %{ - hostname: "127.0.0.1", - port: port, - username: server.db_username, - password: server.db_password, - database: server.db_name, - ssl_mode: server.db_ssl_mode, - ssl_cert: server.db_ssl_cert, - ssl_key: server.db_ssl_key, - ssl_ca_cert: server.db_ssl_ca_cert, - socket_options: [] - }} - - {:error, reason} -> - {:error, "SSH tunnel failed: #{inspect(reason)}"} - end - end - - # Returns standard connection details for direct connections - defp prepare_connection(%Server{} = server) do - # Decrypt any encrypted credentials from the server - server = decrypt_credentials(server) - - {:ok, - %{ - hostname: server.db_hostname, - port: server.db_port, - username: server.db_username, - password: server.db_password, - database: server.db_name, - ssl: true, - socket_options: socket_options(server) - }} - end - - defp socket_options(%Server{} = server) do - case :inet_res.gethostbyname(String.to_charlist(server.db_hostname), :inet6) do - {:ok, {:hostent, _host, [], _, _, _}} -> - [:inet6] - - _ -> - [] - end - end - - # Returns the appropriate database adapter module based on db_engine - defp get_adapter(%Server{db_engine: "postgresql"}), - do: QueryCanary.Connections.Adapters.PostgreSQL - - defp get_adapter(%Server{db_engine: "mysql"}), - do: QueryCanary.Connections.Adapters.MySQL - - defp get_adapter(%Server{db_engine: "clickhouse"}), - do: QueryCanary.Connections.Adapters.ClickHouse - - defp get_adapter(%Server{db_engine: engine}), - do: raise("Unsupported database engine: #{engine}") - - # Clean up any resources like SSH tunnels - # Clean up any resources like SSH tunnels and database connections - defp cleanup_resources(_server) do - # Clean up SSH tunnel if exists - case Process.get(:ssh_tunnel_ref) do - nil -> - :ok - - tunnel_ref -> - SSHTunnel.stop_tunnel(tunnel_ref) - Process.delete(:ssh_tunnel_ref) - end - - # Clean up database connection if exists - case Process.get(:db_connection_pid) do - nil -> - :ok - - db_conn -> - # Safely disconnect based on adapter type - cond do - is_pid(db_conn) and Process.alive?(db_conn) -> - try do - GenServer.stop(db_conn, :normal, 5000) - catch - _, _ -> :ok - end - - true -> - :ok - end - - Process.delete(:db_connection_pid) - end - end - - # Decrypt sensitive credentials from the server - defp decrypt_credentials(server) do - %{ - server - | db_password: decrypt_if_needed(server.db_password, "db_password"), - ssh_private_key: decrypt_if_needed(server.ssh_private_key, "ssh_private_key") - } - end - - defp decrypt_if_needed(nil, _), do: nil - - defp decrypt_if_needed(encrypted, salt) do - case Phoenix.Token.decrypt(QueryCanaryWeb.Endpoint, salt, encrypted, max_age: :infinity) do - {:ok, decrypted} -> - decrypted - - # Return as-is if we can't decrypt - {:error, _} -> - encrypted + with {:ok, _pid} <- ConnectionServer.ensure_started(server) do + ConnectionServer.get_database_schema(server.id) end end end diff --git a/lib/query_canary/connections/connection_server.ex b/lib/query_canary/connections/connection_server.ex new file mode 100644 index 0000000..50afa19 --- /dev/null +++ b/lib/query_canary/connections/connection_server.ex @@ -0,0 +1,253 @@ +defmodule QueryCanary.Connections.ConnectionServer do + @moduledoc """ + Persistent per-customer database connection process. + + Responsibilities: + * Establish and maintain a single adapter connection + * Optional auto-reconnect with backoff + * Execute queries via adapter + * Manage SSH tunnels lifecycle + * Provide status/metadata + """ + use GenServer + require Logger + + alias QueryCanary.Servers.Server + alias QueryCanary.Connections.SSHTunnel + + @type server_id :: any + + # Public API + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: via(opts[:server_id])) + end + + def ensure_started(%Server{id: id} = server), do: ensure_started(id, server) + + def ensure_started(id, %Server{} = server) do + case GenServer.whereis(via(id)) do + nil -> DynamicSupervisor.start_child(QueryCanary.ConnectionSupervisor, child_spec(server)) + pid -> {:ok, pid} + end + end + + def query(server_id, sql, params \\ []) do + GenServer.call(via(server_id), {:query, sql, params}) + end + + def list_tables(server_id), do: GenServer.call(via(server_id), :list_tables) + def get_database_schema(server_id), do: GenServer.call(via(server_id), :get_database_schema) + def status(server_id), do: GenServer.call(via(server_id), :status) + def disconnect(server_id), do: GenServer.call(via(server_id), :disconnect) + def refresh(server_id), do: GenServer.cast(via(server_id), :refresh) + + def child_spec(%Server{} = server) do + %{ + id: {:connection_server, server.id}, + start: {__MODULE__, :start_link, [[server: server, server_id: server.id]]}, + restart: :transient, + shutdown: 15_000, + type: :worker + } + end + + # GenServer callbacks + def init(opts) do + server = Keyword.fetch!(opts, :server) + + state = %{ + server: server, + server_id: server.id, + adapter: adapter_for(server), + adapter_conn: nil, + tunnel_ref: nil, + status: :starting, + last_error: nil, + auto_reconnect: Application.get_env(:query_canary, :connection_auto_reconnect, true), + backoff: 500, + max_backoff: 10_000 + } + + send(self(), :connect) + {:ok, state} + end + + def handle_info(:connect, state) do + case establish(state.server, state.adapter) do + {:ok, adapter_conn, tunnel_ref} -> + Logger.metadata(server_id: state.server_id) + Logger.info("Connection established to #{state.server_id}") + + {:noreply, + %{ + state + | adapter_conn: adapter_conn, + tunnel_ref: tunnel_ref, + status: :connected, + last_error: nil, + backoff: 500 + }} + + {:error, reason} -> + Logger.warning("Connection failed: #{inspect(reason)} to #{state.server_id}") + + if state.auto_reconnect do + Process.send_after(self(), :connect, state.backoff) + next_backoff = min(state.backoff * 2, state.max_backoff) + + {:noreply, + %{state | status: {:error, reason}, last_error: reason, backoff: next_backoff}} + else + {:noreply, %{state | status: {:error, reason}, last_error: reason}} + end + end + end + + def handle_cast(:refresh, state) do + cleanup(state) + send(self(), :connect) + {:noreply, %{state | status: :refreshing, adapter_conn: nil, tunnel_ref: nil}} + end + + def handle_call(:status, _from, state), do: {:reply, state.status, state} + + def handle_call(:disconnect, _from, state) do + cleanup(state) + {:reply, :ok, %{state | status: :disconnected, adapter_conn: nil, tunnel_ref: nil}} + end + + def handle_call({:query, _sql, _params}, _from, %{status: status} = state) + when status != :connected do + {:reply, {:error, :not_connected}, state} + end + + def handle_call({:query, sql, params}, _from, state) do + reply = state.adapter.query(state.adapter_conn, sql, params) + {:reply, reply, state} + end + + def handle_call(:list_tables, _from, %{status: :connected} = state) do + {:reply, state.adapter.list_tables(state.adapter_conn), state} + end + + def handle_call(:list_tables, _from, state), do: {:reply, {:error, :not_connected}, state} + + def handle_call(:get_database_schema, _from, %{status: :connected, server: server} = state) do + {:reply, state.adapter.get_database_schema(state.adapter_conn, server.db_name), state} + end + + def handle_call(:get_database_schema, _from, state), + do: {:reply, {:error, :not_connected}, state} + + def terminate(_reason, state) do + cleanup(state) + :ok + end + + # Helpers + defp establish(server, adapter) do + with {:ok, conn_details, tunnel_ref} <- prepare(server), + {:ok, adapter_conn} <- adapter.connect(conn_details) do + {:ok, adapter_conn, tunnel_ref} + end + end + + defp prepare(%Server{ssh_tunnel: true} = server) do + server = decrypt(server) + + ssh_opts = %{ + host: server.ssh_hostname, + port: server.ssh_port, + user: server.ssh_username, + private_key: server.ssh_private_key + } + + target_opts = %{host: server.db_hostname, port: server.db_port} + + case SSHTunnel.start_tunnel(ssh_opts, target_opts) do + {:ok, {_conn, port} = ref} -> + {:ok, + base_conn_details(server) |> Map.put(:hostname, "127.0.0.1") |> Map.put(:port, port), + ref} + + {:error, reason} -> + {:error, {:ssh_tunnel_failed, reason}} + end + end + + defp prepare(%Server{} = server), do: {:ok, base_conn_details(decrypt(server)), nil} + + defp base_conn_details(server) do + %{ + hostname: server.db_hostname, + port: server.db_port, + username: server.db_username, + password: server.db_password, + database: server.db_name, + ssl_mode: server.db_ssl_mode, + ssl_cert: server.db_ssl_cert, + ssl_key: server.db_ssl_key, + ssl_ca_cert: server.db_ssl_ca_cert, + socket_options: socket_options(server) + } + end + + defp decrypt(server) do + %{ + server + | db_password: decrypt_if_needed(server.db_password, "db_password"), + ssh_private_key: decrypt_if_needed(server.ssh_private_key, "ssh_private_key") + } + end + + defp decrypt_if_needed(nil, _), do: nil + + defp decrypt_if_needed(encrypted, salt) do + case Phoenix.Token.decrypt(QueryCanaryWeb.Endpoint, salt, encrypted, max_age: :infinity) do + {:ok, decrypted} -> decrypted + {:error, _} -> encrypted + end + end + + defp socket_options(server) do + case :inet_res.gethostbyname(String.to_charlist(server.db_hostname), :inet6) do + {:ok, {:hostent, _host, [], _, _, _}} -> [:inet6] + _ -> [] + end + end + + defp cleanup(state) do + if ref = state.tunnel_ref do + SSHTunnel.stop_tunnel(ref) + end + + if conn = state.adapter_conn do + safe_disconnect(state.adapter, conn) + end + end + + defp safe_disconnect(adapter, conn) do + if function_exported?(adapter, :disconnect, 1) do + try do + adapter.disconnect(conn) + catch + _, _ -> :ok + end + else + if is_pid(conn) and Process.alive?(conn), do: GenServer.stop(conn, :normal) + end + end + + defp adapter_for(%Server{db_engine: "postgresql"}), + do: QueryCanary.Connections.Adapters.PostgreSQL + + defp adapter_for(%Server{db_engine: "mysql"}), do: QueryCanary.Connections.Adapters.MySQL + + defp adapter_for(%Server{db_engine: "clickhouse"}), + do: QueryCanary.Connections.Adapters.ClickHouse + + defp adapter_for(%Server{db_engine: other}), do: raise("Unsupported database engine: #{other}") + + defp via(server_id), + do: {:via, Registry, {QueryCanary.ConnectionRegistry, {:server, server_id}}} +end diff --git a/lib/query_canary/jobs/check_runner.ex b/lib/query_canary/jobs/check_runner.ex index b7508e3..a9a534b 100644 --- a/lib/query_canary/jobs/check_runner.ex +++ b/lib/query_canary/jobs/check_runner.ex @@ -1,5 +1,7 @@ defmodule QueryCanary.Jobs.CheckRunner do - use Oban.Worker, queue: :checks + use Oban.Worker, + queue: :checks, + max_attempts: 1 alias QueryCanary.Checks diff --git a/lib/query_canary_web/live/check_live/index.ex b/lib/query_canary_web/live/check_live/index.ex index 8e76f60..cb8999a 100644 --- a/lib/query_canary_web/live/check_live/index.ex +++ b/lib/query_canary_web/live/check_live/index.ex @@ -64,17 +64,21 @@ defmodule QueryCanaryWeb.CheckLive.Index do
{check.query}
<:col :let={check} label="Status"> - <%= if check.last_result do %> - <%= if check.last_result.success do %> - Success + <%= if check.enabled do %> + <%= if check.last_result do %> + <%= if check.last_result.success do %> + Success + <% else %> + Failed + <% end %> +
+ {format_time_ago(check.last_run_at)} +
<% else %> - Failed + Pending <% end %> -
- {format_time_ago(check.last_run_at)} -
<% else %> - Pending + Disabled <% end %> <:col :let={check} label="Alert Status"> diff --git a/lib/query_canary_web/router.ex b/lib/query_canary_web/router.ex index 7a4e8a6..6345775 100644 --- a/lib/query_canary_web/router.ex +++ b/lib/query_canary_web/router.ex @@ -2,6 +2,7 @@ defmodule QueryCanaryWeb.Router do use QueryCanaryWeb, :router import QueryCanaryWeb.UserAuth + import Oban.Web.Router pipeline :browser do plug :accepts, ["html"] @@ -43,6 +44,8 @@ defmodule QueryCanaryWeb.Router do live_dashboard "/dashboard", metrics: QueryCanaryWeb.Telemetry forward "/mailbox", Plug.Swoosh.MailboxPreview + + oban_dashboard("/oban") end end diff --git a/mix.exs b/mix.exs index 6208b2b..c322fa2 100644 --- a/mix.exs +++ b/mix.exs @@ -69,6 +69,7 @@ defmodule QueryCanary.MixProject do # Scheduling {:crontab, "~> 1.1"}, {:oban, "~> 2.19"}, + {:oban_web, "~> 2.11"}, # Billing {:stripity_stripe, "~> 3.2"}, # Emails diff --git a/mix.lock b/mix.lock index 9e5ddc5..4f17174 100644 --- a/mix.lock +++ b/mix.lock @@ -38,6 +38,8 @@ "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "oban": {:hex, :oban, "2.19.4", "045adb10db1161dceb75c254782f97cdc6596e7044af456a59decb6d06da73c1", [:mix], [{:ecto_sql, "~> 3.10", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:ecto_sqlite3, "~> 0.9", [hex: :ecto_sqlite3, repo: "hexpm", optional: true]}, {:igniter, "~> 0.5", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "5fcc6219e6464525b808d97add17896e724131f498444a292071bf8991c99f97"}, + "oban_met": {:hex, :oban_met, "1.0.3", "ea8f7a4cef3c8a7aef3b900b4458df46e83508dcbba9374c75dd590efda7a32a", [:mix], [{:oban, "~> 2.19", [hex: :oban, repo: "hexpm", optional: false]}], "hexpm", "23db1a0ee58b93afe324b221530594bdf3647a9bd4e803af762c3e00ad74b9cf"}, + "oban_web": {:hex, :oban_web, "2.11.4", "49e92e131a1d5946b6c2669e24fcc094d3c36fe431c776969b7c3a1f2e258ccd", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:oban, "~> 2.19", [hex: :oban, repo: "hexpm", optional: false]}, {:oban_met, "~> 1.0", [hex: :oban_met, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.7", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 3.3 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:phoenix_live_view, "~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}], "hexpm", "deb38825311f53cee5fc89c3ea78e0a2a60095b63643517649f76fb5563031db"}, "parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"}, "phoenix": {:hex, :phoenix, "1.8.0-rc.3", "6ae19e57b9c109556f1b8abdb992d96d443b0ae28e03b604f3dc6c75d9f7d35f", [:mix], [{:bandit, "~> 1.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "419422afc33e965c0dbf181cbedc77b4cfd024dac0db7d9d2287656043d48e24"}, "phoenix_ecto": {:hex, :phoenix_ecto, "4.6.4", "dcf3483ab45bab4c15e3a47c34451392f64e433846b08469f5d16c2a4cd70052", [:mix], [{:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.1", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.16 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}], "hexpm", "f5b8584c36ccc9b903948a696fc9b8b81102c79c7c0c751a9f00cdec55d5f2d7"}, diff --git a/postgres.exs b/postgres.exs new file mode 100644 index 0000000..f7dde3e --- /dev/null +++ b/postgres.exs @@ -0,0 +1,108 @@ +Mix.install([ + {:postgrex, "~> 0.21.1"} +]) + +defmodule Main do + require Logger + + def test do + {:ok, pid} = + Postgrex.start_link( + database: "not_real", + + # Short timeouts + connect_timeout: 5000, + timeout: 5000, + + # Queue settings to fail fast + queue_target: 50, + queue_interval: 1000, + max_restarts: 1, + show_sensitive_data_on_connection_error: true, + name: :"test_conn_#{System.unique_integer([:positive])}" + ) + + case Postgrex.query(pid, "SELECT 1;") do + {:error, unhelpful_error} -> + GenServer.stop(pid) |> dbg() + + {:ok, pid} = + Postgrex.start_link( + database: "query_canary_dev", + + # Short timeouts + connect_timeout: 5000, + timeout: 5000, + + # Queue settings to fail fast + queue_target: 50, + queue_interval: 1000, + max_restarts: 1, + show_sensitive_data_on_connection_error: true, + name: :"test_conn_#{System.unique_integer([:positive])}" + ) + + Postgrex.query(pid, "SELECT 1") |> dbg() + end + end + + def test_simple do + # Start the connection + + # Execute a literal query + try do + Process.flag(:trap_exit, true) + + {:ok, pid} = + Postgrex.SimpleConnection.start_link(MyConnection, [], + database: "query_canary_dev" + # username: "foobar" + ) + |> dbg() + + Postgrex.SimpleConnection.call(pid, {:query, "SELECT 1"}) |> dbg() + + # Postgrex.query(pid, "SELECT 1") |> dbg() + rescue + e -> + dbg(e) + end + + # => %Postgrex.Result{rows: [["1"]]} + end +end + +defmodule MyConnection do + @behaviour Postgrex.SimpleConnection + + @impl true + def init(_args) do + {:ok, %{from: nil}} + end + + @impl true + def handle_call({:query, query}, from, state) do + {:query, query, %{state | from: from}} + end + + @impl true + def handle_result(results, state) when is_list(results) do + Postgrex.SimpleConnection.reply(state.from, results) + + {:noreply, state} + end + + @impl true + def handle_result(%Postgrex.Error{} = error, state) do + Postgrex.SimpleConnection.reply(state.from, error) + + {:noreply, state} + end + + @impl true + def notify(_binary, _binary, _state) do + :ok + end +end + +Main.test_simple() From 86cef68566e8e4453d45513261b4e3a08c03837c Mon Sep 17 00:00:00 2001 From: Luke Strickland Date: Sun, 17 Aug 2025 06:19:35 -0400 Subject: [PATCH 2/4] Remove testing file --- postgres.exs | 108 --------------------------------------------------- 1 file changed, 108 deletions(-) delete mode 100644 postgres.exs diff --git a/postgres.exs b/postgres.exs deleted file mode 100644 index f7dde3e..0000000 --- a/postgres.exs +++ /dev/null @@ -1,108 +0,0 @@ -Mix.install([ - {:postgrex, "~> 0.21.1"} -]) - -defmodule Main do - require Logger - - def test do - {:ok, pid} = - Postgrex.start_link( - database: "not_real", - - # Short timeouts - connect_timeout: 5000, - timeout: 5000, - - # Queue settings to fail fast - queue_target: 50, - queue_interval: 1000, - max_restarts: 1, - show_sensitive_data_on_connection_error: true, - name: :"test_conn_#{System.unique_integer([:positive])}" - ) - - case Postgrex.query(pid, "SELECT 1;") do - {:error, unhelpful_error} -> - GenServer.stop(pid) |> dbg() - - {:ok, pid} = - Postgrex.start_link( - database: "query_canary_dev", - - # Short timeouts - connect_timeout: 5000, - timeout: 5000, - - # Queue settings to fail fast - queue_target: 50, - queue_interval: 1000, - max_restarts: 1, - show_sensitive_data_on_connection_error: true, - name: :"test_conn_#{System.unique_integer([:positive])}" - ) - - Postgrex.query(pid, "SELECT 1") |> dbg() - end - end - - def test_simple do - # Start the connection - - # Execute a literal query - try do - Process.flag(:trap_exit, true) - - {:ok, pid} = - Postgrex.SimpleConnection.start_link(MyConnection, [], - database: "query_canary_dev" - # username: "foobar" - ) - |> dbg() - - Postgrex.SimpleConnection.call(pid, {:query, "SELECT 1"}) |> dbg() - - # Postgrex.query(pid, "SELECT 1") |> dbg() - rescue - e -> - dbg(e) - end - - # => %Postgrex.Result{rows: [["1"]]} - end -end - -defmodule MyConnection do - @behaviour Postgrex.SimpleConnection - - @impl true - def init(_args) do - {:ok, %{from: nil}} - end - - @impl true - def handle_call({:query, query}, from, state) do - {:query, query, %{state | from: from}} - end - - @impl true - def handle_result(results, state) when is_list(results) do - Postgrex.SimpleConnection.reply(state.from, results) - - {:noreply, state} - end - - @impl true - def handle_result(%Postgrex.Error{} = error, state) do - Postgrex.SimpleConnection.reply(state.from, error) - - {:noreply, state} - end - - @impl true - def notify(_binary, _binary, _state) do - :ok - end -end - -Main.test_simple() From 58e621ebe51b1e6aa5059e0f4f31e47b64802b1f Mon Sep 17 00:00:00 2001 From: Luke Strickland Date: Sun, 17 Aug 2025 06:22:29 -0400 Subject: [PATCH 3/4] Fixing SQLite --- .../connections/adapters/sqlite.ex | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/lib/query_canary/connections/adapters/sqlite.ex b/lib/query_canary/connections/adapters/sqlite.ex index 88d824b..535aadf 100644 --- a/lib/query_canary/connections/adapters/sqlite.ex +++ b/lib/query_canary/connections/adapters/sqlite.ex @@ -135,24 +135,16 @@ defmodule QueryCanary.Connections.Adapters.SQLite do @doc """ Gets complete database schema information. - ## Parameters - * conn - Database connection - - ## Returns - * {:ok, schema} - Complete database schema information - * {:error, reason} - Operation failed + Accepts the database name for behaviour conformity; ignored for SQLite. """ - def get_database_schema(conn) do + def get_database_schema(conn, _database_name) do case list_tables(conn) do {:ok, tables} -> schema = Enum.reduce(tables, %{}, fn table, acc -> case get_table_schema(conn, table) do - {:ok, table_schema} -> - Map.put(acc, table, table_schema) - - {:error, _reason} -> - acc + {:ok, table_schema} -> Map.put(acc, table, table_schema) + {:error, _} -> acc end end) @@ -163,6 +155,8 @@ defmodule QueryCanary.Connections.Adapters.SQLite do end end + def get_database_schema(conn), do: get_database_schema(conn, nil) + # Formats query results into a more usable structure defp format_results(%Exqlite.Result{} = result) do columns = Enum.map(result.columns, &String.to_atom/1) From 9d5dde73e4e815c37c26e03b3a5515705ef9fcd4 Mon Sep 17 00:00:00 2001 From: Luke Strickland Date: Sun, 17 Aug 2025 07:25:20 -0400 Subject: [PATCH 4/4] Cleaning up postgres --- .../connections/adapters/postgresql.ex | 50 ++++++++++--------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/lib/query_canary/connections/adapters/postgresql.ex b/lib/query_canary/connections/adapters/postgresql.ex index c3c8d8e..37065fc 100644 --- a/lib/query_canary/connections/adapters/postgresql.ex +++ b/lib/query_canary/connections/adapters/postgresql.ex @@ -23,9 +23,6 @@ defmodule QueryCanary.Connections.Adapters.PostgreSQL do Logger.metadata(db_hostname: conn_details.hostname) Logger.info("QueryCanary.Connections: Connecting to #{conn_details.hostname}") - # try do - Process.flag(:trap_exit, true) - opts = [ hostname: conn_details.hostname, port: conn_details.port, @@ -37,27 +34,32 @@ defmodule QueryCanary.Connections.Adapters.PostgreSQL do ] # Build advanced SSL options if present - ssl_mode = Map.get(conn_details, :ssl_mode, "allow") - - ssl_opts = - [ - # Map ssl_mode to verify options - verify: - case ssl_mode do - "verify-full" -> :verify_peer - "verify-ca" -> :verify_peer - "require" -> :verify_none - "prefer" -> :verify_none - "allow" -> :verify_none - _ -> :verify_none - end - ] - |> maybe_add_ssl_cert(conn_details) - |> maybe_add_ssl_key(conn_details) - |> maybe_add_ssl_ca_cert(conn_details) - |> Enum.reject(&is_nil/1) - - opts = opts ++ [ssl: ssl_opts] + ssl_mode = Map.get(conn_details, :ssl_mode) + + opts = + if ssl_mode do + ssl_opts = + [ + # Map ssl_mode to verify options + verify: + case ssl_mode do + "verify-full" -> :verify_peer + "verify-ca" -> :verify_peer + "require" -> :verify_none + "prefer" -> :verify_none + "allow" -> :verify_none + _ -> :verify_none + end + ] + |> maybe_add_ssl_cert(conn_details) + |> maybe_add_ssl_key(conn_details) + |> maybe_add_ssl_ca_cert(conn_details) + |> Enum.reject(&is_nil/1) + + opts ++ ssl_opts + else + opts + end Postgrex.start_link(opts) end