diff --git a/src/avlizer_confluent.erl b/src/avlizer_confluent.erl index 0ce0942..b57fc2f 100644 --- a/src/avlizer_confluent.erl +++ b/src/avlizer_confluent.erl @@ -69,6 +69,8 @@ , get_decoder/2 , get_encoder/1 , get_encoder/2 + , get_schema/1 + , get_schema/2 , maybe_download/1 , register_schema/2 , register_schema_with_fp/2 @@ -170,6 +172,23 @@ get_encoder(Ref) -> -spec get_encoder(name(), fp()) -> avro:simple_encoder(). get_encoder(Name, Fp) -> get_encoder({Name, Fp}). +-spec get_schema(string()) -> {ok, binary(), binary()} | {error, any()} . +get_schema(Subject) -> + get_schema(Subject, "latest"). + +-spec get_schema(string(), string() | integer()) -> {ok, binary(), binary()} | {error, any()} . +get_schema(Subject, Version) when is_integer(Version) -> + get_schema(Subject, integer_to_list(Version)); +get_schema(Subject, Version) -> + RegistryURL = get_registry_url(), + URL = RegistryURL ++ "/subjects/" ++ Subject ++ "/versions/" ++ Version, + case httpc_download(URL) of + {ok, #{<<"schema">> := Schema, <<"id">> := Id}} -> + {ok, Id, Schema}; + Error -> + {error, Error} + end. + %% @doc Lookup cache for decoded schema, try to download if not found. -spec maybe_download(ref()) -> avro:avro_type(). maybe_download(Ref) -> @@ -354,20 +373,31 @@ unify_ref(Ref) -> Ref. -spec do_download(string(), ref()) -> {ok, binary()} | {error, any()}. do_download(RegistryURL, RegId) when is_integer(RegId) -> URL = RegistryURL ++ "/schemas/ids/" ++ integer_to_list(RegId), - httpc_download(URL); + case httpc_download(URL) of + {ok, RespMap} -> + #{<<"schema">> := Schema} = RespMap, + {ok, Schema}; + Error -> + Error + end; do_download(RegistryURL, {Name, Fp}) -> Subject = fp_to_subject(Name, Fp), %% fingerprint is unique, hence always version/1 URL = RegistryURL ++ "/subjects/" ++ Subject ++ "/versions/1", - httpc_download(URL). + case httpc_download(URL) of + {ok, RespMap} -> + #{<<"schema">> := Schema} = RespMap, + {ok, Schema}; + Error -> + Error + end. httpc_download(URL) -> case httpc:request(get, {URL, _Headers = []}, [{timeout, ?HTTPC_TIMEOUT}], []) of {ok, {{_, OK, _}, _RspHeaders, RspBody}} when OK >= 200, OK < 300 -> - #{<<"schema">> := SchemaJSON} = - jsone:decode(iolist_to_binary(RspBody)), - {ok, SchemaJSON}; + RespMap = jsone:decode(iolist_to_binary(RspBody)), + {ok, RespMap}; {ok, {{_, Other, _}, _RspHeaders, RspBody}}-> error_logger:error_msg("Failed to download schema from from ~s:\n~s", [URL, RspBody]), diff --git a/test/avlizer_confluent_tests.erl b/test/avlizer_confluent_tests.erl index 801299c..312a8c6 100644 --- a/test/avlizer_confluent_tests.erl +++ b/test/avlizer_confluent_tests.erl @@ -55,6 +55,21 @@ get_encoder_decoder_assign_fp_test_() -> 42 = avlizer_confluent:decode(Decoder, Bin) end). +get_latest_schema_test() -> + with_meck( + fun() -> + {ok, _Id} = avlizer_confluent:register_schema("subj", Schema = test_schema()), + {ok, Schema} = avlizer_confluent:get_schema("subj") + end). + +get_schema_test() -> + with_meck( + fun() -> + {ok, _Id} = avlizer_confluent:register_schema("subj", Schema = test_schema()), + {ok, Schema} = avlizer_confluent:get_schema("subj", 1), + {ok, Schema} = avlizer_confluent:get_schema("subj", "1") + end). + make_encoder_decoder_test_() -> with_meck( fun() -> @@ -116,8 +131,8 @@ setup() -> application:ensure_all_started(?APPLICATION), meck:new(httpc, [passthrough]), meck:expect(httpc, request, - fun(get, {"theurl" ++ _, _}, _, _) -> - Body = test_download(), + fun(get, {"theurl" ++ Endpoint, _}, _, _) -> + Body = test_download(Endpoint), {ok, {{ignore, 200, "OK"}, headers, Body}}; (post, {"theurl" ++ _, _, _, _}, _, _) -> Body = <<"{\"id\": 1}">>, @@ -132,9 +147,12 @@ cleanup(_) -> ok. %% make a fake JSON as if downloaded from schema registry -test_download() -> +test_download("/schemas/ids/" ++ _Id) -> + SchemaJSON = test_schema(), + jsone:encode(#{<<"schema">> => SchemaJSON}); +test_download(_) -> SchemaJSON = test_schema(), - jsone:encode(#{<<"schema">> => SchemaJSON}). + jsone:encode(#{<<"schema">> => SchemaJSON, <<"id">> => <<"1">>}). test_schema() -> avro:encode_schema(test_type()).