Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- add [Time](https://clickhouse.com/docs/sql-reference/data-types/time) and [Time64](https://clickhouse.com/docs/sql-reference/data-types/time64) types support https://github.com/plausible/ch/pull/260
- add [Variant](https://clickhouse.com/docs/sql-reference/data-types/variant) type support https://github.com/plausible/ch/pull/263
- add [JSON](https://clickhouse.com/docs/sql-reference/data-types/newjson) type support https://github.com/plausible/ch/pull/262

## 0.4.1 (2025-07-07)

Expand Down
1 change: 1 addition & 0 deletions lib/ch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ defmodule Ch do
def cast(value, {:datetime64, _p}), do: Ecto.Type.cast(:naive_datetime_usec, value)
def cast(value, {:datetime64, _p, "UTC"}), do: Ecto.Type.cast(:utc_datetime_usec, value)
def cast(value, {:fixed_string, _s}), do: Ecto.Type.cast(:string, value)
def cast(value, :json), do: Ecto.Type.cast(:map, value)

for size <- [8, 16, 32, 64, 128, 256] do
def cast(value, unquote(:"i#{size}")), do: Ecto.Type.cast(:integer, value)
Expand Down
19 changes: 16 additions & 3 deletions lib/ch/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,25 @@ defmodule Ch.Connection do
@spec connect([Ch.start_option()]) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()}
def connect(opts) do
with {:ok, conn} <- do_connect(opts) do
handshake = Query.build("select 1")
handshake = Query.build("select 1, version()")
params = DBConnection.Query.encode(handshake, _params = [], _opts = [])

case handle_execute(handshake, params, _opts = [], conn) do
{:ok, handshake, responses, conn} ->
case DBConnection.Query.decode(handshake, responses, _opts = []) do
%Result{rows: [[1]]} ->
%Result{rows: [[1, version]]} ->
conn =
if version >= "24.10" do
settings =
HTTP.get_private(conn, :settings, [])
|> Keyword.put_new(:input_format_binary_read_json_as_string, 1)
|> Keyword.put_new(:output_format_binary_write_json_as_string, 1)

HTTP.put_private(conn, :settings, settings)
else
conn
end

{:ok, conn}

result ->
Expand Down Expand Up @@ -405,7 +417,8 @@ defmodule Ch.Connection do
"The connection was closed by the server; a new connection has been successfully reestablished."
)

new_conn
# copy settings that are set dynamically (e.g. json as text) over to the new connection
maybe_put_private(new_conn, :settings, HTTP.get_private(conn, :settings))
else
_ -> conn
end
Expand Down
29 changes: 29 additions & 0 deletions lib/ch/row_binary.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ defmodule Ch.RowBinary do
when t in [
:string,
:binary,
:json,
:boolean,
:uuid,
:date,
Expand Down Expand Up @@ -191,6 +192,13 @@ defmodule Ch.RowBinary do
end
end

def encode(:json, json) do
# assuming it can be sent as text and not "native" binary JSON
# i.e. assumes `settings: [input_format_binary_read_json_as_string: 1]`
# TODO
encode(:string, Jason.encode_to_iodata!(json))
end

def encode({:fixed_string, size}, str) when byte_size(str) == size do
str
end
Expand Down Expand Up @@ -598,6 +606,7 @@ defmodule Ch.RowBinary do
when t in [
:string,
:binary,
:json,
:boolean,
:uuid,
:date,
Expand Down Expand Up @@ -769,6 +778,20 @@ defmodule Ch.RowBinary do
defp utf8_size(codepoint) when codepoint <= 0xFFFF, do: 3
defp utf8_size(codepoint) when codepoint <= 0x10FFFF, do: 4

@compile inline: [decode_string_json_decode_rows: 5]

for {pattern, size} <- varints do
defp decode_string_json_decode_rows(
<<unquote(pattern), s::size(unquote(size))-bytes, bin::bytes>>,
types_rest,
row,
rows,
types
) do
decode_rows(types_rest, bin, [Jason.decode!(s) | row], rows, types)
end
end

@compile inline: [decode_binary_decode_rows: 5]

for {pattern, size} <- varints do
Expand Down Expand Up @@ -913,6 +936,12 @@ defmodule Ch.RowBinary do
:binary ->
decode_binary_decode_rows(bin, types_rest, row, rows, types)

:json ->
# assuming it arrives as text and not "native" binary JSON
# i.e. assumes `settings: [output_format_binary_write_json_as_string: 1]`
# TODO
decode_string_json_decode_rows(bin, types_rest, row, rows, types)

# TODO utf8?
{:fixed_string, size} ->
<<s::size(size)-bytes, bin::bytes>> = bin
Expand Down
2 changes: 2 additions & 0 deletions lib/ch/types.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ defmodule Ch.Types do
{"Time", :time, []},
{"Date32", :date32, []},
{"Date", :date, []},
{"JSON", :json, []},
{"LowCardinality", :low_cardinality, [:type]},
for size <- [32, 64, 128, 256] do
{"Decimal#{size}", :"decimal#{size}", [:int]}
Expand Down Expand Up @@ -355,6 +356,7 @@ defmodule Ch.Types do
end

def decode("DateTime"), do: :datetime
def decode("JSON" <> _options), do: :json

def decode(type) do
try do
Expand Down
27 changes: 15 additions & 12 deletions test/ch/faults_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ defmodule Ch.FaultsTest do

# failed handshake
handshake = intercept_packets(mint)
assert handshake =~ "select 1"
assert handshake =~ "select 1, version()"
:ok = :gen_tcp.send(clickhouse, handshake)
:ok = :gen_tcp.send(mint, first_byte(intercept_packets(clickhouse)))

Expand All @@ -78,7 +78,7 @@ defmodule Ch.FaultsTest do

# handshake
handshake = intercept_packets(mint)
assert handshake =~ "select 1"
assert handshake =~ "select 1, version()"
:ok = :gen_tcp.send(clickhouse, handshake)
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))
end)
Expand All @@ -96,7 +96,7 @@ defmodule Ch.FaultsTest do

# failed handshake
handshake = intercept_packets(mint)
assert handshake =~ "select 1"
assert handshake =~ "select 1, version()"
:ok = :gen_tcp.send(clickhouse, handshake)
:ok = :gen_tcp.send(mint, first_byte(intercept_packets(clickhouse)))
:gen_tcp.close(mint)
Expand All @@ -106,7 +106,7 @@ defmodule Ch.FaultsTest do

# handshake
handshake = intercept_packets(mint)
assert handshake =~ "select 1"
assert handshake =~ "select 1, version()"
:ok = :gen_tcp.send(clickhouse, handshake)
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))
end)
Expand All @@ -126,8 +126,8 @@ defmodule Ch.FaultsTest do

# failed handshake
handshake = intercept_packets(mint1)
assert handshake =~ "select 1"
altered_handshake = String.replace(handshake, "select 1", "select ;")
assert handshake =~ "select 1, version()"
altered_handshake = String.replace(handshake, "select 1", "select x")
:ok = :gen_tcp.send(clickhouse, altered_handshake)
:ok = :gen_tcp.send(mint1, intercept_packets(clickhouse))

Expand All @@ -136,7 +136,7 @@ defmodule Ch.FaultsTest do

# handshake
handshake = intercept_packets(mint2)
assert handshake =~ "select 1"
assert handshake =~ "select 1, version()"
:ok = :gen_tcp.send(clickhouse, handshake)
:ok = :gen_tcp.send(mint2, intercept_packets(clickhouse))

Expand All @@ -145,7 +145,7 @@ defmodule Ch.FaultsTest do
assert Port.info(mint2)
end)

assert log =~ "failed to connect: ** (Ch.Error) Code: 62. DB::Exception: Syntax error"
assert log =~ "UNKNOWN_IDENTIFIER"
end

test "reconnects after incorrect query result", ctx do
Expand All @@ -160,8 +160,11 @@ defmodule Ch.FaultsTest do

# failed handshake
handshake = intercept_packets(mint1)
assert handshake =~ "select 1"
altered_handshake = String.replace(handshake, "select 1", "select 2")
assert handshake =~ "select 1, version()"

altered_handshake =
String.replace(handshake, "select 1, version()", "select 2, version()")

:ok = :gen_tcp.send(clickhouse, altered_handshake)
:ok = :gen_tcp.send(mint1, intercept_packets(clickhouse))

Expand All @@ -170,7 +173,7 @@ defmodule Ch.FaultsTest do

# handshake
handshake = intercept_packets(mint2)
assert handshake =~ "select 1"
assert handshake =~ "select 1, version()"
:ok = :gen_tcp.send(clickhouse, handshake)
:ok = :gen_tcp.send(mint2, intercept_packets(clickhouse))

Expand All @@ -179,7 +182,7 @@ defmodule Ch.FaultsTest do
assert Port.info(mint2)
end)

assert log =~ "failed to connect: ** (Ch.Error) unexpected result for 'select 1'"
assert log =~ "failed to connect: ** (Ch.Error) unexpected result for 'select 1, version()'"
end
end

Expand Down
99 changes: 99 additions & 0 deletions test/ch/json_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
defmodule Ch.JSONTest do
use ExUnit.Case

@moduletag :json

setup do
on_exit(fn ->
Ch.Test.query("DROP TABLE IF EXISTS test", [], database: Ch.Test.database())
end)

{:ok, conn: start_supervised!({Ch, database: Ch.Test.database()})}
end

test "simple json", %{conn: conn} do
assert Ch.query!(conn, ~s|select '{"a":"b","c":"d"}'::json|).rows == [
[%{"a" => "b", "c" => "d"}]
]

assert Ch.query!(conn, ~s|select '{"a":42}'::json|).rows == [[%{"a" => "42"}]]
assert Ch.query!(conn, ~s|select '{}'::json|).rows == [[%{}]]
assert Ch.query!(conn, ~s|select '{"a":null}'::json|).rows == [[%{}]]
assert Ch.query!(conn, ~s|select '{"a":3.14}'::json|).rows == [[%{"a" => 3.14}]]
assert Ch.query!(conn, ~s|select '{"a":true}'::json|).rows == [[%{"a" => true}]]
assert Ch.query!(conn, ~s|select '{"a":false}'::json|).rows == [[%{"a" => false}]]
assert Ch.query!(conn, ~s|select '{"a":{"b":"c"}}'::json|).rows == [[%{"a" => %{"b" => "c"}}]]

assert Ch.query!(conn, ~s|select '{"a":[]}'::json|).rows == [
[%{"a" => []}]
]

assert Ch.query!(conn, ~s|select '{"a":[null]}'::json|).rows == [
[%{"a" => [nil]}]
]

assert Ch.query!(conn, ~s|select '{"a":[1,3.14,"hello",null]}'::json|).rows == [
[%{"a" => ["1", "3.14", "hello", nil]}]
]

assert Ch.query!(conn, ~s|select '{"a":[1,2.13,"s",{"a":"b"}]}'::json|).rows == [
[%{"a" => ["1", 2.13, "s", %{"a" => "b"}]}]
]
end

# https://clickhouse.com/docs/sql-reference/data-types/newjson#using-json-in-a-table-column-definition
test "basic", %{conn: conn} do
Ch.query!(conn, "CREATE TABLE test (json JSON) ENGINE = Memory")

Ch.query!(conn, """
INSERT INTO test VALUES
('{"a" : {"b" : 42}, "c" : [1, 2, 3]}'),
('{"f" : "Hello, World!"}'),
('{"a" : {"b" : 43, "e" : 10}, "c" : [4, 5, 6]}')
""")

assert Ch.query!(
conn,
"SELECT json FROM test"
).rows == [
[%{"a" => %{"b" => "42"}, "c" => ["1", "2", "3"]}],
[%{"f" => "Hello, World!"}],
[%{"a" => %{"b" => "43", "e" => "10"}, "c" => ["4", "5", "6"]}]
]

Ch.query!(
conn,
"INSERT INTO test FORMAT RowBinary",
[[%{"some other" => "json value", "from" => "rowbinary"}]],
types: ["JSON"]
)

assert Ch.query!(
conn,
"SELECT json FROM test where json.from = 'rowbinary'"
).rows == [
[%{"from" => "rowbinary", "some other" => "json value"}]
]
end

# https://clickhouse.com/docs/sql-reference/data-types/newjson#using-json-in-a-table-column-definition
test "with skip (i.e. extra type options)", %{conn: conn} do
Ch.query!(conn, "CREATE TABLE test (json JSON(a.b UInt32, SKIP a.e)) ENGINE = Memory;")

Ch.query!(conn, """
INSERT INTO test VALUES
('{"a" : {"b" : 42}, "c" : [1, 2, 3]}'),
('{"f" : "Hello, World!"}'),
('{"a" : {"b" : 43, "e" : 10}, "c" : [4, 5, 6]}');
""")

assert Ch.query!(
conn,
"SELECT json FROM test"
).rows == [
[%{"a" => %{"b" => 42}, "c" => ["1", "2", "3"]}],
[%{"a" => %{"b" => 0}, "f" => "Hello, World!"}],
[%{"a" => %{"b" => 43}, "c" => ["4", "5", "6"]}]
]
end
end
4 changes: 2 additions & 2 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ extra_exclude =
if ch_version >= "25" do
[]
else
# Time and Variant types are not supported in older ClickHouse versions
[:time, :variant]
# Time, Variant, and JSON types are not supported in older ClickHouse versions we have in the CI
[:time, :variant, :json]
end

ExUnit.start(exclude: [:slow | extra_exclude])