diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ebc556..0984430 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - add [Time](https://clickhouse.com/docs/sql-reference/data-types/time) and [Time64](https://clickhouse.com/docs/sql-reference/data-types/time64) types support https://github.com/plausible/ch/pull/260 - add [Variant](https://clickhouse.com/docs/sql-reference/data-types/variant) type support https://github.com/plausible/ch/pull/263 +- add [JSON](https://clickhouse.com/docs/sql-reference/data-types/newjson) type support https://github.com/plausible/ch/pull/262 ## 0.4.1 (2025-07-07) diff --git a/lib/ch.ex b/lib/ch.ex index 1173724..9575938 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -149,6 +149,7 @@ defmodule Ch do def cast(value, {:datetime64, _p}), do: Ecto.Type.cast(:naive_datetime_usec, value) def cast(value, {:datetime64, _p, "UTC"}), do: Ecto.Type.cast(:utc_datetime_usec, value) def cast(value, {:fixed_string, _s}), do: Ecto.Type.cast(:string, value) + def cast(value, :json), do: Ecto.Type.cast(:map, value) for size <- [8, 16, 32, 64, 128, 256] do def cast(value, unquote(:"i#{size}")), do: Ecto.Type.cast(:integer, value) diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 40970ae..f94b50a 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -13,13 +13,25 @@ defmodule Ch.Connection do @spec connect([Ch.start_option()]) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()} def connect(opts) do with {:ok, conn} <- do_connect(opts) do - handshake = Query.build("select 1") + handshake = Query.build("select 1, version()") params = DBConnection.Query.encode(handshake, _params = [], _opts = []) case handle_execute(handshake, params, _opts = [], conn) do {:ok, handshake, responses, conn} -> case DBConnection.Query.decode(handshake, responses, _opts = []) do - %Result{rows: [[1]]} -> + %Result{rows: [[1, version]]} -> + conn = + if version >= "24.10" do + settings = + HTTP.get_private(conn, :settings, []) + |> Keyword.put_new(:input_format_binary_read_json_as_string, 1) + |> Keyword.put_new(:output_format_binary_write_json_as_string, 1) + + HTTP.put_private(conn, :settings, settings) + else + conn + end + {:ok, conn} result -> @@ -405,7 +417,8 @@ defmodule Ch.Connection do "The connection was closed by the server; a new connection has been successfully reestablished." ) - new_conn + # copy settings that are set dynamically (e.g. json as text) over to the new connection + maybe_put_private(new_conn, :settings, HTTP.get_private(conn, :settings)) else _ -> conn end diff --git a/lib/ch/row_binary.ex b/lib/ch/row_binary.ex index 434e183..fbd77d4 100644 --- a/lib/ch/row_binary.ex +++ b/lib/ch/row_binary.ex @@ -93,6 +93,7 @@ defmodule Ch.RowBinary do when t in [ :string, :binary, + :json, :boolean, :uuid, :date, @@ -191,6 +192,13 @@ defmodule Ch.RowBinary do end end + def encode(:json, json) do + # assuming it can be sent as text and not "native" binary JSON + # i.e. assumes `settings: [input_format_binary_read_json_as_string: 1]` + # TODO + encode(:string, Jason.encode_to_iodata!(json)) + end + def encode({:fixed_string, size}, str) when byte_size(str) == size do str end @@ -598,6 +606,7 @@ defmodule Ch.RowBinary do when t in [ :string, :binary, + :json, :boolean, :uuid, :date, @@ -769,6 +778,20 @@ defmodule Ch.RowBinary do defp utf8_size(codepoint) when codepoint <= 0xFFFF, do: 3 defp utf8_size(codepoint) when codepoint <= 0x10FFFF, do: 4 + @compile inline: [decode_string_json_decode_rows: 5] + + for {pattern, size} <- varints do + defp decode_string_json_decode_rows( + <>, + types_rest, + row, + rows, + types + ) do + decode_rows(types_rest, bin, [Jason.decode!(s) | row], rows, types) + end + end + @compile inline: [decode_binary_decode_rows: 5] for {pattern, size} <- varints do @@ -913,6 +936,12 @@ defmodule Ch.RowBinary do :binary -> decode_binary_decode_rows(bin, types_rest, row, rows, types) + :json -> + # assuming it arrives as text and not "native" binary JSON + # i.e. assumes `settings: [output_format_binary_write_json_as_string: 1]` + # TODO + decode_string_json_decode_rows(bin, types_rest, row, rows, types) + # TODO utf8? {:fixed_string, size} -> <> = bin diff --git a/lib/ch/types.ex b/lib/ch/types.ex index d48b5a5..881e016 100644 --- a/lib/ch/types.ex +++ b/lib/ch/types.ex @@ -29,6 +29,7 @@ defmodule Ch.Types do {"Time", :time, []}, {"Date32", :date32, []}, {"Date", :date, []}, + {"JSON", :json, []}, {"LowCardinality", :low_cardinality, [:type]}, for size <- [32, 64, 128, 256] do {"Decimal#{size}", :"decimal#{size}", [:int]} @@ -355,6 +356,7 @@ defmodule Ch.Types do end def decode("DateTime"), do: :datetime + def decode("JSON" <> _options), do: :json def decode(type) do try do diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs index d20d984..9a41f4e 100644 --- a/test/ch/faults_test.exs +++ b/test/ch/faults_test.exs @@ -69,7 +69,7 @@ defmodule Ch.FaultsTest do # failed handshake handshake = intercept_packets(mint) - assert handshake =~ "select 1" + assert handshake =~ "select 1, version()" :ok = :gen_tcp.send(clickhouse, handshake) :ok = :gen_tcp.send(mint, first_byte(intercept_packets(clickhouse))) @@ -78,7 +78,7 @@ defmodule Ch.FaultsTest do # handshake handshake = intercept_packets(mint) - assert handshake =~ "select 1" + assert handshake =~ "select 1, version()" :ok = :gen_tcp.send(clickhouse, handshake) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) end) @@ -96,7 +96,7 @@ defmodule Ch.FaultsTest do # failed handshake handshake = intercept_packets(mint) - assert handshake =~ "select 1" + assert handshake =~ "select 1, version()" :ok = :gen_tcp.send(clickhouse, handshake) :ok = :gen_tcp.send(mint, first_byte(intercept_packets(clickhouse))) :gen_tcp.close(mint) @@ -106,7 +106,7 @@ defmodule Ch.FaultsTest do # handshake handshake = intercept_packets(mint) - assert handshake =~ "select 1" + assert handshake =~ "select 1, version()" :ok = :gen_tcp.send(clickhouse, handshake) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) end) @@ -126,8 +126,8 @@ defmodule Ch.FaultsTest do # failed handshake handshake = intercept_packets(mint1) - assert handshake =~ "select 1" - altered_handshake = String.replace(handshake, "select 1", "select ;") + assert handshake =~ "select 1, version()" + altered_handshake = String.replace(handshake, "select 1", "select x") :ok = :gen_tcp.send(clickhouse, altered_handshake) :ok = :gen_tcp.send(mint1, intercept_packets(clickhouse)) @@ -136,7 +136,7 @@ defmodule Ch.FaultsTest do # handshake handshake = intercept_packets(mint2) - assert handshake =~ "select 1" + assert handshake =~ "select 1, version()" :ok = :gen_tcp.send(clickhouse, handshake) :ok = :gen_tcp.send(mint2, intercept_packets(clickhouse)) @@ -145,7 +145,7 @@ defmodule Ch.FaultsTest do assert Port.info(mint2) end) - assert log =~ "failed to connect: ** (Ch.Error) Code: 62. DB::Exception: Syntax error" + assert log =~ "UNKNOWN_IDENTIFIER" end test "reconnects after incorrect query result", ctx do @@ -160,8 +160,11 @@ defmodule Ch.FaultsTest do # failed handshake handshake = intercept_packets(mint1) - assert handshake =~ "select 1" - altered_handshake = String.replace(handshake, "select 1", "select 2") + assert handshake =~ "select 1, version()" + + altered_handshake = + String.replace(handshake, "select 1, version()", "select 2, version()") + :ok = :gen_tcp.send(clickhouse, altered_handshake) :ok = :gen_tcp.send(mint1, intercept_packets(clickhouse)) @@ -170,7 +173,7 @@ defmodule Ch.FaultsTest do # handshake handshake = intercept_packets(mint2) - assert handshake =~ "select 1" + assert handshake =~ "select 1, version()" :ok = :gen_tcp.send(clickhouse, handshake) :ok = :gen_tcp.send(mint2, intercept_packets(clickhouse)) @@ -179,7 +182,7 @@ defmodule Ch.FaultsTest do assert Port.info(mint2) end) - assert log =~ "failed to connect: ** (Ch.Error) unexpected result for 'select 1'" + assert log =~ "failed to connect: ** (Ch.Error) unexpected result for 'select 1, version()'" end end diff --git a/test/ch/json_test.exs b/test/ch/json_test.exs new file mode 100644 index 0000000..0f83c96 --- /dev/null +++ b/test/ch/json_test.exs @@ -0,0 +1,99 @@ +defmodule Ch.JSONTest do + use ExUnit.Case + + @moduletag :json + + setup do + on_exit(fn -> + Ch.Test.query("DROP TABLE IF EXISTS test", [], database: Ch.Test.database()) + end) + + {:ok, conn: start_supervised!({Ch, database: Ch.Test.database()})} + end + + test "simple json", %{conn: conn} do + assert Ch.query!(conn, ~s|select '{"a":"b","c":"d"}'::json|).rows == [ + [%{"a" => "b", "c" => "d"}] + ] + + assert Ch.query!(conn, ~s|select '{"a":42}'::json|).rows == [[%{"a" => "42"}]] + assert Ch.query!(conn, ~s|select '{}'::json|).rows == [[%{}]] + assert Ch.query!(conn, ~s|select '{"a":null}'::json|).rows == [[%{}]] + assert Ch.query!(conn, ~s|select '{"a":3.14}'::json|).rows == [[%{"a" => 3.14}]] + assert Ch.query!(conn, ~s|select '{"a":true}'::json|).rows == [[%{"a" => true}]] + assert Ch.query!(conn, ~s|select '{"a":false}'::json|).rows == [[%{"a" => false}]] + assert Ch.query!(conn, ~s|select '{"a":{"b":"c"}}'::json|).rows == [[%{"a" => %{"b" => "c"}}]] + + assert Ch.query!(conn, ~s|select '{"a":[]}'::json|).rows == [ + [%{"a" => []}] + ] + + assert Ch.query!(conn, ~s|select '{"a":[null]}'::json|).rows == [ + [%{"a" => [nil]}] + ] + + assert Ch.query!(conn, ~s|select '{"a":[1,3.14,"hello",null]}'::json|).rows == [ + [%{"a" => ["1", "3.14", "hello", nil]}] + ] + + assert Ch.query!(conn, ~s|select '{"a":[1,2.13,"s",{"a":"b"}]}'::json|).rows == [ + [%{"a" => ["1", 2.13, "s", %{"a" => "b"}]}] + ] + end + + # https://clickhouse.com/docs/sql-reference/data-types/newjson#using-json-in-a-table-column-definition + test "basic", %{conn: conn} do + Ch.query!(conn, "CREATE TABLE test (json JSON) ENGINE = Memory") + + Ch.query!(conn, """ + INSERT INTO test VALUES + ('{"a" : {"b" : 42}, "c" : [1, 2, 3]}'), + ('{"f" : "Hello, World!"}'), + ('{"a" : {"b" : 43, "e" : 10}, "c" : [4, 5, 6]}') + """) + + assert Ch.query!( + conn, + "SELECT json FROM test" + ).rows == [ + [%{"a" => %{"b" => "42"}, "c" => ["1", "2", "3"]}], + [%{"f" => "Hello, World!"}], + [%{"a" => %{"b" => "43", "e" => "10"}, "c" => ["4", "5", "6"]}] + ] + + Ch.query!( + conn, + "INSERT INTO test FORMAT RowBinary", + [[%{"some other" => "json value", "from" => "rowbinary"}]], + types: ["JSON"] + ) + + assert Ch.query!( + conn, + "SELECT json FROM test where json.from = 'rowbinary'" + ).rows == [ + [%{"from" => "rowbinary", "some other" => "json value"}] + ] + end + + # https://clickhouse.com/docs/sql-reference/data-types/newjson#using-json-in-a-table-column-definition + test "with skip (i.e. extra type options)", %{conn: conn} do + Ch.query!(conn, "CREATE TABLE test (json JSON(a.b UInt32, SKIP a.e)) ENGINE = Memory;") + + Ch.query!(conn, """ + INSERT INTO test VALUES + ('{"a" : {"b" : 42}, "c" : [1, 2, 3]}'), + ('{"f" : "Hello, World!"}'), + ('{"a" : {"b" : 43, "e" : 10}, "c" : [4, 5, 6]}'); + """) + + assert Ch.query!( + conn, + "SELECT json FROM test" + ).rows == [ + [%{"a" => %{"b" => 42}, "c" => ["1", "2", "3"]}], + [%{"a" => %{"b" => 0}, "f" => "Hello, World!"}], + [%{"a" => %{"b" => 43}, "c" => ["4", "5", "6"]}] + ] + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 64c1467..83bc49b 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -30,8 +30,8 @@ extra_exclude = if ch_version >= "25" do [] else - # Time and Variant types are not supported in older ClickHouse versions - [:time, :variant] + # Time, Variant, and JSON types are not supported in older ClickHouse versions we have in the CI + [:time, :variant, :json] end ExUnit.start(exclude: [:slow | extra_exclude])