From 378da444b5f31792080bb319ce759d73488b1b0a Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Wed, 16 Jul 2025 14:40:23 +0300 Subject: [PATCH 1/8] decode works --- lib/ch/row_binary.ex | 18 ++++++++++++++++ lib/ch/types.ex | 1 + test/ch/json_test.exs | 50 +++++++++++++++++++++++++++++++++++++++++++ test/test_helper.exs | 4 ++-- 4 files changed, 71 insertions(+), 2 deletions(-) create mode 100644 test/ch/json_test.exs diff --git a/lib/ch/row_binary.ex b/lib/ch/row_binary.ex index 8cefaed..fe7772d 100644 --- a/lib/ch/row_binary.ex +++ b/lib/ch/row_binary.ex @@ -573,6 +573,7 @@ defmodule Ch.RowBinary do when t in [ :string, :binary, + :json, :boolean, :uuid, :date, @@ -740,6 +741,20 @@ defmodule Ch.RowBinary do defp utf8_size(codepoint) when codepoint <= 0xFFFF, do: 3 defp utf8_size(codepoint) when codepoint <= 0x10FFFF, do: 4 + @compile inline: [decode_string_json_decode_rows: 5] + + for {pattern, size} <- varints do + defp decode_string_json_decode_rows( + <>, + types_rest, + row, + rows, + types + ) do + decode_rows(types_rest, bin, [Jason.decode!(s) | row], rows, types) + end + end + @compile inline: [decode_binary_decode_rows: 5] for {pattern, size} <- varints do @@ -884,6 +899,9 @@ defmodule Ch.RowBinary do :binary -> decode_binary_decode_rows(bin, types_rest, row, rows, types) + :json -> + decode_string_json_decode_rows(bin, types_rest, row, rows, types) + # TODO utf8? {:fixed_string, size} -> <> = bin diff --git a/lib/ch/types.ex b/lib/ch/types.ex index 0c96da8..e963d72 100644 --- a/lib/ch/types.ex +++ b/lib/ch/types.ex @@ -28,6 +28,7 @@ defmodule Ch.Types do {"Time", :time, []}, {"Date32", :date32, []}, {"Date", :date, []}, + {"JSON", :json, []}, {"LowCardinality", :low_cardinality, [:type]}, for size <- [32, 64, 128, 256] do {"Decimal#{size}", :"decimal#{size}", [:int]} diff --git a/test/ch/json_test.exs b/test/ch/json_test.exs new file mode 100644 index 0000000..d41ad94 --- /dev/null +++ b/test/ch/json_test.exs @@ -0,0 +1,50 @@ +defmodule Ch.JSONTest do + use ExUnit.Case, async: true + + @moduletag :json + + setup do + conn = + start_supervised!( + {Ch, + database: Ch.Test.database(), + settings: [ + enable_json_type: 1, + output_format_binary_write_json_as_string: 1, + input_format_binary_read_json_as_string: 1 + ]} + ) + + {:ok, conn: conn} + end + + test "simple json", %{conn: conn} do + assert Ch.query!(conn, ~s|select '{"a":"b","c":"d"}'::json|).rows == [ + [%{"a" => "b", "c" => "d"}] + ] + + assert Ch.query!(conn, ~s|select '{"a":42}'::json|).rows == [[%{"a" => "42"}]] + assert Ch.query!(conn, ~s|select '{}'::json|).rows == [[%{}]] + assert Ch.query!(conn, ~s|select '{"a":null}'::json|).rows == [[%{}]] + assert Ch.query!(conn, ~s|select '{"a":3.14}'::json|).rows == [[%{"a" => 3.14}]] + assert Ch.query!(conn, ~s|select '{"a":true}'::json|).rows == [[%{"a" => true}]] + assert Ch.query!(conn, ~s|select '{"a":false}'::json|).rows == [[%{"a" => false}]] + assert Ch.query!(conn, ~s|select '{"a":{"b":"c"}}'::json|).rows == [[%{"a" => %{"b" => "c"}}]] + + assert Ch.query!(conn, ~s|select '{"a":[]}'::json|).rows == [ + [%{"a" => []}] + ] + + assert Ch.query!(conn, ~s|select '{"a":[null]}'::json|).rows == [ + [%{"a" => [nil]}] + ] + + assert Ch.query!(conn, ~s|select '{"a":[1,3.14,"hello",null]}'::json|).rows == [ + [%{"a" => ["1", "3.14", "hello", nil]}] + ] + + assert Ch.query!(conn, ~s|select '{"a":[1,2.13,"s",{"a":"b"}]}'::json|).rows == [ + [%{"a" => ["1", 2.13, "s", %{"a" => "b"}]}] + ] + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index dd11412..189b4d4 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -30,8 +30,8 @@ extra_exclude = if ch_version >= "25" do [] else - # Time type is not supported in ClickHouse < 25 - [:time] + # Time and JSON types are not supported in ClickHouse < 25 + [:time, :json] end ExUnit.start(exclude: [:slow | extra_exclude]) From e808a06292b7020d848bfca76c6b1332b09ea317 Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Wed, 16 Jul 2025 14:46:03 +0300 Subject: [PATCH 2/8] seems to work --- lib/ch/row_binary.ex | 5 +++++ test/ch/json_test.exs | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/lib/ch/row_binary.ex b/lib/ch/row_binary.ex index fe7772d..5688e6b 100644 --- a/lib/ch/row_binary.ex +++ b/lib/ch/row_binary.ex @@ -93,6 +93,7 @@ defmodule Ch.RowBinary do when t in [ :string, :binary, + :json, :boolean, :uuid, :date, @@ -187,6 +188,10 @@ defmodule Ch.RowBinary do end end + def encode(:json, json) do + encode(:string, Jason.encode_to_iodata!(json)) + end + def encode({:fixed_string, size}, str) when byte_size(str) == size do str end diff --git a/test/ch/json_test.exs b/test/ch/json_test.exs index d41ad94..da93f73 100644 --- a/test/ch/json_test.exs +++ b/test/ch/json_test.exs @@ -3,6 +3,8 @@ defmodule Ch.JSONTest do @moduletag :json + @table "json_test" + setup do conn = start_supervised!( @@ -15,6 +17,10 @@ defmodule Ch.JSONTest do ]} ) + on_exit(fn -> + Ch.Test.query("DROP TABLE IF EXISTS #{@table}", [], database: Ch.Test.database()) + end) + {:ok, conn: conn} end @@ -47,4 +53,38 @@ defmodule Ch.JSONTest do [%{"a" => ["1", 2.13, "s", %{"a" => "b"}]}] ] end + + test "creating, inserting, and reading json", %{conn: conn} do + Ch.query!(conn, "CREATE TABLE #{@table} (json JSON) ENGINE = Memory") + + Ch.query!(conn, """ + INSERT INTO #{@table} VALUES + ('{"a" : {"b" : 42}, "c" : [1, 2, 3]}'), + ('{"f" : "Hello, World!"}'), + ('{"a" : {"b" : 43, "e" : 10}, "c" : [4, 5, 6]}') + """) + + assert Ch.query!( + conn, + "SELECT json FROM #{@table}" + ).rows == [ + [%{"a" => %{"b" => "42"}, "c" => ["1", "2", "3"]}], + [%{"f" => "Hello, World!"}], + [%{"a" => %{"b" => "43", "e" => "10"}, "c" => ["4", "5", "6"]}] + ] + + Ch.query!( + conn, + "INSERT INTO #{@table} FORMAT RowBinary", + [[%{"some other" => "json value", "from" => "rowbinary"}]], + types: ["JSON"] + ) + + assert Ch.query!( + conn, + "SELECT json FROM #{@table} where json.from = 'rowbinary'" + ).rows == [ + [%{"from" => "rowbinary", "some other" => "json value"}] + ] + end end From 1001146f2f52b3d3c707f933a5f6fa5a9eecd412 Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Wed, 16 Jul 2025 14:51:10 +0300 Subject: [PATCH 3/8] comments --- lib/ch/row_binary.ex | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/ch/row_binary.ex b/lib/ch/row_binary.ex index 5688e6b..7856b94 100644 --- a/lib/ch/row_binary.ex +++ b/lib/ch/row_binary.ex @@ -189,6 +189,9 @@ defmodule Ch.RowBinary do end def encode(:json, json) do + # assuming it can be sent as text and not "native" binary JSON + # i.e. assumes `settings: [input_format_binary_read_json_as_string: 1]` + # TODO encode(:string, Jason.encode_to_iodata!(json)) end @@ -905,6 +908,9 @@ defmodule Ch.RowBinary do decode_binary_decode_rows(bin, types_rest, row, rows, types) :json -> + # assuming it arrives as text and not "native" binary JSON + # i.e. assumes `settings: [output_format_binary_write_json_as_string: 1]` + # TODO decode_string_json_decode_rows(bin, types_rest, row, rows, types) # TODO utf8? From 2e5fc8cc1a5af86852c17ad5f42c84fc2a5eced9 Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Wed, 16 Jul 2025 14:54:15 +0300 Subject: [PATCH 4/8] support JSON(...) type --- lib/ch/types.ex | 1 + test/ch/json_test.exs | 40 ++++++++++++++++++++++++++++++---------- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/lib/ch/types.ex b/lib/ch/types.ex index e963d72..fcc4b37 100644 --- a/lib/ch/types.ex +++ b/lib/ch/types.ex @@ -325,6 +325,7 @@ defmodule Ch.Types do end def decode("DateTime"), do: :datetime + def decode("JSON" <> _options), do: :json def decode(type) do try do diff --git a/test/ch/json_test.exs b/test/ch/json_test.exs index da93f73..e0b7ca7 100644 --- a/test/ch/json_test.exs +++ b/test/ch/json_test.exs @@ -1,10 +1,8 @@ defmodule Ch.JSONTest do - use ExUnit.Case, async: true + use ExUnit.Case @moduletag :json - @table "json_test" - setup do conn = start_supervised!( @@ -18,7 +16,7 @@ defmodule Ch.JSONTest do ) on_exit(fn -> - Ch.Test.query("DROP TABLE IF EXISTS #{@table}", [], database: Ch.Test.database()) + Ch.Test.query("DROP TABLE IF EXISTS test", [], database: Ch.Test.database()) end) {:ok, conn: conn} @@ -54,11 +52,12 @@ defmodule Ch.JSONTest do ] end - test "creating, inserting, and reading json", %{conn: conn} do - Ch.query!(conn, "CREATE TABLE #{@table} (json JSON) ENGINE = Memory") + # https://clickhouse.com/docs/sql-reference/data-types/newjson#using-json-in-a-table-column-definition + test "basic", %{conn: conn} do + Ch.query!(conn, "CREATE TABLE test (json JSON) ENGINE = Memory") Ch.query!(conn, """ - INSERT INTO #{@table} VALUES + INSERT INTO test VALUES ('{"a" : {"b" : 42}, "c" : [1, 2, 3]}'), ('{"f" : "Hello, World!"}'), ('{"a" : {"b" : 43, "e" : 10}, "c" : [4, 5, 6]}') @@ -66,7 +65,7 @@ defmodule Ch.JSONTest do assert Ch.query!( conn, - "SELECT json FROM #{@table}" + "SELECT json FROM test" ).rows == [ [%{"a" => %{"b" => "42"}, "c" => ["1", "2", "3"]}], [%{"f" => "Hello, World!"}], @@ -75,16 +74,37 @@ defmodule Ch.JSONTest do Ch.query!( conn, - "INSERT INTO #{@table} FORMAT RowBinary", + "INSERT INTO test FORMAT RowBinary", [[%{"some other" => "json value", "from" => "rowbinary"}]], types: ["JSON"] ) assert Ch.query!( conn, - "SELECT json FROM #{@table} where json.from = 'rowbinary'" + "SELECT json FROM test where json.from = 'rowbinary'" ).rows == [ [%{"from" => "rowbinary", "some other" => "json value"}] ] end + + # https://clickhouse.com/docs/sql-reference/data-types/newjson#using-json-in-a-table-column-definition + test "with skip (i.e. extra type options)", %{conn: conn} do + Ch.query!(conn, "CREATE TABLE test (json JSON(a.b UInt32, SKIP a.e)) ENGINE = Memory;") + + Ch.query!(conn, """ + INSERT INTO test VALUES + ('{"a" : {"b" : 42}, "c" : [1, 2, 3]}'), + ('{"f" : "Hello, World!"}'), + ('{"a" : {"b" : 43, "e" : 10}, "c" : [4, 5, 6]}'); + """) + + assert Ch.query!( + conn, + "SELECT json FROM test" + ).rows == [ + [%{"a" => %{"b" => 42}, "c" => ["1", "2", "3"]}], + [%{"a" => %{"b" => 0}, "f" => "Hello, World!"}], + [%{"a" => %{"b" => 43}, "c" => ["4", "5", "6"]}] + ] + end end From abeb446f03d3624f23626822469b7ef8bf42e3fa Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Wed, 16 Jul 2025 15:12:44 +0300 Subject: [PATCH 5/8] allow casting JSON --- lib/ch.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/ch.ex b/lib/ch.ex index 52a5f6b..4f8cf3f 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -147,6 +147,7 @@ defmodule Ch do def cast(value, {:datetime64, _p}), do: Ecto.Type.cast(:naive_datetime_usec, value) def cast(value, {:datetime64, _p, "UTC"}), do: Ecto.Type.cast(:utc_datetime_usec, value) def cast(value, {:fixed_string, _s}), do: Ecto.Type.cast(:string, value) + def cast(value, :json), do: Ecto.Type.cast(:map, value) for size <- [8, 16, 32, 64, 128, 256] do def cast(value, unquote(:"i#{size}")), do: Ecto.Type.cast(:integer, value) From 30c70c681e78b02baf80b2ff0ed5615035b64678 Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Thu, 17 Jul 2025 19:52:57 +0300 Subject: [PATCH 6/8] continue --- CHANGELOG.md | 1 + lib/ch/connection.ex | 9 ++++++++- test/ch/json_test.exs | 13 +------------ 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ebc556..0984430 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - add [Time](https://clickhouse.com/docs/sql-reference/data-types/time) and [Time64](https://clickhouse.com/docs/sql-reference/data-types/time64) types support https://github.com/plausible/ch/pull/260 - add [Variant](https://clickhouse.com/docs/sql-reference/data-types/variant) type support https://github.com/plausible/ch/pull/263 +- add [JSON](https://clickhouse.com/docs/sql-reference/data-types/newjson) type support https://github.com/plausible/ch/pull/262 ## 0.4.1 (2025-07-07) diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 40970ae..8f86531 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -416,6 +416,13 @@ defmodule Ch.Connection do scheme = String.to_existing_atom(opts[:scheme] || "http") address = opts[:hostname] || "localhost" port = opts[:port] || 8123 + settings = opts[:settings] || [] + + settings = + settings + |> Keyword.put_new(:output_format_binary_write_json_as_string, 1) + |> Keyword.put_new(:input_format_binary_read_json_as_string, 1) + mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts]) with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do @@ -425,7 +432,7 @@ defmodule Ch.Connection do |> maybe_put_private(:database, opts[:database]) |> maybe_put_private(:username, opts[:username]) |> maybe_put_private(:password, opts[:password]) - |> maybe_put_private(:settings, opts[:settings]) + |> maybe_put_private(:settings, settings) |> maybe_put_private(:connect_options, opts) {:ok, conn} diff --git a/test/ch/json_test.exs b/test/ch/json_test.exs index e0b7ca7..0f83c96 100644 --- a/test/ch/json_test.exs +++ b/test/ch/json_test.exs @@ -4,22 +4,11 @@ defmodule Ch.JSONTest do @moduletag :json setup do - conn = - start_supervised!( - {Ch, - database: Ch.Test.database(), - settings: [ - enable_json_type: 1, - output_format_binary_write_json_as_string: 1, - input_format_binary_read_json_as_string: 1 - ]} - ) - on_exit(fn -> Ch.Test.query("DROP TABLE IF EXISTS test", [], database: Ch.Test.database()) end) - {:ok, conn: conn} + {:ok, conn: start_supervised!({Ch, database: Ch.Test.database()})} end test "simple json", %{conn: conn} do From 09c927d33dcca80018c07a4bbaf05102194ebe5e Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Thu, 17 Jul 2025 20:12:47 +0300 Subject: [PATCH 7/8] continue --- lib/ch/connection.ex | 19 ++++++++++++++++--- test/ch/faults_test.exs | 27 +++++++++++++++------------ 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 8f86531..509d0d6 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -13,13 +13,25 @@ defmodule Ch.Connection do @spec connect([Ch.start_option()]) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()} def connect(opts) do with {:ok, conn} <- do_connect(opts) do - handshake = Query.build("select 1") + handshake = Query.build("select 1, version()") params = DBConnection.Query.encode(handshake, _params = [], _opts = []) case handle_execute(handshake, params, _opts = [], conn) do {:ok, handshake, responses, conn} -> case DBConnection.Query.decode(handshake, responses, _opts = []) do - %Result{rows: [[1]]} -> + %Result{rows: [[1, version]]} -> + conn = + if version >= "24.10" do + settings = + HTTP.get_private(conn, :settings, []) + |> Keyword.put_new(:input_format_binary_read_json_as_string, 1) + |> Keyword.put_new(:output_format_binary_write_json_as_string, 1) + + HTTP.put_private(conn, :settings, settings) + else + conn + end + {:ok, conn} result -> @@ -405,7 +417,8 @@ defmodule Ch.Connection do "The connection was closed by the server; a new connection has been successfully reestablished." ) - new_conn + # copy settings that are set dynamically (e.g. json as text) over to the new connection + maybe_put_private(new_conn, :settings, HTTP.get_private(conn, :settings)) else _ -> conn end diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs index d20d984..9a41f4e 100644 --- a/test/ch/faults_test.exs +++ b/test/ch/faults_test.exs @@ -69,7 +69,7 @@ defmodule Ch.FaultsTest do # failed handshake handshake = intercept_packets(mint) - assert handshake =~ "select 1" + assert handshake =~ "select 1, version()" :ok = :gen_tcp.send(clickhouse, handshake) :ok = :gen_tcp.send(mint, first_byte(intercept_packets(clickhouse))) @@ -78,7 +78,7 @@ defmodule Ch.FaultsTest do # handshake handshake = intercept_packets(mint) - assert handshake =~ "select 1" + assert handshake =~ "select 1, version()" :ok = :gen_tcp.send(clickhouse, handshake) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) end) @@ -96,7 +96,7 @@ defmodule Ch.FaultsTest do # failed handshake handshake = intercept_packets(mint) - assert handshake =~ "select 1" + assert handshake =~ "select 1, version()" :ok = :gen_tcp.send(clickhouse, handshake) :ok = :gen_tcp.send(mint, first_byte(intercept_packets(clickhouse))) :gen_tcp.close(mint) @@ -106,7 +106,7 @@ defmodule Ch.FaultsTest do # handshake handshake = intercept_packets(mint) - assert handshake =~ "select 1" + assert handshake =~ "select 1, version()" :ok = :gen_tcp.send(clickhouse, handshake) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) end) @@ -126,8 +126,8 @@ defmodule Ch.FaultsTest do # failed handshake handshake = intercept_packets(mint1) - assert handshake =~ "select 1" - altered_handshake = String.replace(handshake, "select 1", "select ;") + assert handshake =~ "select 1, version()" + altered_handshake = String.replace(handshake, "select 1", "select x") :ok = :gen_tcp.send(clickhouse, altered_handshake) :ok = :gen_tcp.send(mint1, intercept_packets(clickhouse)) @@ -136,7 +136,7 @@ defmodule Ch.FaultsTest do # handshake handshake = intercept_packets(mint2) - assert handshake =~ "select 1" + assert handshake =~ "select 1, version()" :ok = :gen_tcp.send(clickhouse, handshake) :ok = :gen_tcp.send(mint2, intercept_packets(clickhouse)) @@ -145,7 +145,7 @@ defmodule Ch.FaultsTest do assert Port.info(mint2) end) - assert log =~ "failed to connect: ** (Ch.Error) Code: 62. DB::Exception: Syntax error" + assert log =~ "UNKNOWN_IDENTIFIER" end test "reconnects after incorrect query result", ctx do @@ -160,8 +160,11 @@ defmodule Ch.FaultsTest do # failed handshake handshake = intercept_packets(mint1) - assert handshake =~ "select 1" - altered_handshake = String.replace(handshake, "select 1", "select 2") + assert handshake =~ "select 1, version()" + + altered_handshake = + String.replace(handshake, "select 1, version()", "select 2, version()") + :ok = :gen_tcp.send(clickhouse, altered_handshake) :ok = :gen_tcp.send(mint1, intercept_packets(clickhouse)) @@ -170,7 +173,7 @@ defmodule Ch.FaultsTest do # handshake handshake = intercept_packets(mint2) - assert handshake =~ "select 1" + assert handshake =~ "select 1, version()" :ok = :gen_tcp.send(clickhouse, handshake) :ok = :gen_tcp.send(mint2, intercept_packets(clickhouse)) @@ -179,7 +182,7 @@ defmodule Ch.FaultsTest do assert Port.info(mint2) end) - assert log =~ "failed to connect: ** (Ch.Error) unexpected result for 'select 1'" + assert log =~ "failed to connect: ** (Ch.Error) unexpected result for 'select 1, version()'" end end From 23ab65c8a86c3af669ab6a55a2caafd768f60321 Mon Sep 17 00:00:00 2001 From: ruslandoga Date: Thu, 17 Jul 2025 20:14:42 +0300 Subject: [PATCH 8/8] cleanup --- lib/ch/connection.ex | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 509d0d6..f94b50a 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -429,13 +429,6 @@ defmodule Ch.Connection do scheme = String.to_existing_atom(opts[:scheme] || "http") address = opts[:hostname] || "localhost" port = opts[:port] || 8123 - settings = opts[:settings] || [] - - settings = - settings - |> Keyword.put_new(:output_format_binary_write_json_as_string, 1) - |> Keyword.put_new(:input_format_binary_read_json_as_string, 1) - mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts]) with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do @@ -445,7 +438,7 @@ defmodule Ch.Connection do |> maybe_put_private(:database, opts[:database]) |> maybe_put_private(:username, opts[:username]) |> maybe_put_private(:password, opts[:password]) - |> maybe_put_private(:settings, settings) + |> maybe_put_private(:settings, opts[:settings]) |> maybe_put_private(:connect_options, opts) {:ok, conn}