Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions src/avlizer_confluent.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
, get_decoder/2
, get_encoder/1
, get_encoder/2
, get_schema/1
, get_schema/2
, maybe_download/1
, register_schema/2
, register_schema_with_fp/2
Expand Down Expand Up @@ -170,6 +172,23 @@ get_encoder(Ref) ->
-spec get_encoder(name(), fp()) -> avro:simple_encoder().
get_encoder(Name, Fp) -> get_encoder({Name, Fp}).

-spec get_schema(string()) -> {ok, binary(), binary()} | {error, any()} .
get_schema(Subject) ->
get_schema(Subject, "latest").

-spec get_schema(string(), string() | integer()) -> {ok, binary(), binary()} | {error, any()} .
get_schema(Subject, Version) when is_integer(Version) ->
get_schema(Subject, integer_to_list(Version));
get_schema(Subject, Version) ->
RegistryURL = get_registry_url(),
URL = RegistryURL ++ "/subjects/" ++ Subject ++ "/versions/" ++ Version,
case httpc_download(URL) of
{ok, #{<<"schema">> := Schema, <<"id">> := Id}} ->
{ok, Id, Schema};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's more useful to decode it to regid() (change the type spec also).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good one

Error ->
{error, Error}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just Error to return here

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

end.

%% @doc Lookup cache for decoded schema, try to download if not found.
-spec maybe_download(ref()) -> avro:avro_type().
maybe_download(Ref) ->
Expand Down Expand Up @@ -354,20 +373,31 @@ unify_ref(Ref) -> Ref.
-spec do_download(string(), ref()) -> {ok, binary()} | {error, any()}.
do_download(RegistryURL, RegId) when is_integer(RegId) ->
URL = RegistryURL ++ "/schemas/ids/" ++ integer_to_list(RegId),
httpc_download(URL);
case httpc_download(URL) of
{ok, RespMap} ->
#{<<"schema">> := Schema} = RespMap,
{ok, Schema};
Error ->
Error
end;
do_download(RegistryURL, {Name, Fp}) ->
Subject = fp_to_subject(Name, Fp),
%% fingerprint is unique, hence always version/1
URL = RegistryURL ++ "/subjects/" ++ Subject ++ "/versions/1",
httpc_download(URL).
case httpc_download(URL) of
{ok, RespMap} ->
#{<<"schema">> := Schema} = RespMap,
{ok, Schema};
Error ->
Error
end.

httpc_download(URL) ->
case httpc:request(get, {URL, _Headers = []},
[{timeout, ?HTTPC_TIMEOUT}], []) of
{ok, {{_, OK, _}, _RspHeaders, RspBody}} when OK >= 200, OK < 300 ->
#{<<"schema">> := SchemaJSON} =
jsone:decode(iolist_to_binary(RspBody)),
{ok, SchemaJSON};
RespMap = jsone:decode(iolist_to_binary(RspBody)),
{ok, RespMap};
{ok, {{_, Other, _}, _RspHeaders, RspBody}}->
error_logger:error_msg("Failed to download schema from from ~s:\n~s",
[URL, RspBody]),
Expand Down
26 changes: 22 additions & 4 deletions test/avlizer_confluent_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@ get_encoder_decoder_assign_fp_test_() ->
42 = avlizer_confluent:decode(Decoder, Bin)
end).

get_latest_schema_test() ->
with_meck(
fun() ->
{ok, _Id} = avlizer_confluent:register_schema("subj", Schema = test_schema()),
{ok, Schema} = avlizer_confluent:get_schema("subj")
end).

get_schema_test() ->
with_meck(
fun() ->
{ok, _Id} = avlizer_confluent:register_schema("subj", Schema = test_schema()),
{ok, Schema} = avlizer_confluent:get_schema("subj", 1),
{ok, Schema} = avlizer_confluent:get_schema("subj", "1")
end).

make_encoder_decoder_test_() ->
with_meck(
fun() ->
Expand Down Expand Up @@ -116,8 +131,8 @@ setup() ->
application:ensure_all_started(?APPLICATION),
meck:new(httpc, [passthrough]),
meck:expect(httpc, request,
fun(get, {"theurl" ++ _, _}, _, _) ->
Body = test_download(),
fun(get, {"theurl" ++ Endpoint, _}, _, _) ->
Body = test_download(Endpoint),
{ok, {{ignore, 200, "OK"}, headers, Body}};
(post, {"theurl" ++ _, _, _, _}, _, _) ->
Body = <<"{\"id\": 1}">>,
Expand All @@ -132,9 +147,12 @@ cleanup(_) ->
ok.

%% make a fake JSON as if downloaded from schema registry
test_download() ->
test_download("/schemas/ids/" ++ _Id) ->
SchemaJSON = test_schema(),
jsone:encode(#{<<"schema">> => SchemaJSON});
test_download(_) ->
SchemaJSON = test_schema(),
jsone:encode(#{<<"schema">> => SchemaJSON}).
jsone:encode(#{<<"schema">> => SchemaJSON, <<"id">> => <<"1">>}).

test_schema() ->
avro:encode_schema(test_type()).
Expand Down