From 71486ffaaca19e6d8c65d8717356d8979c06a096 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20L=C3=B3pez=20Viqueira?= Date: Wed, 11 Sep 2019 17:59:51 +0200 Subject: [PATCH 1/4] Fetch already published schemas by Subject and Version (or latest) --- src/avlizer_confluent.erl | 24 ++++++++++++++++++++---- test/avlizer_confluent_tests.erl | 17 ++++++++++++++++- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/src/avlizer_confluent.erl b/src/avlizer_confluent.erl index 0ce0942..57974d3 100644 --- a/src/avlizer_confluent.erl +++ b/src/avlizer_confluent.erl @@ -69,6 +69,8 @@ , get_decoder/2 , get_encoder/1 , get_encoder/2 + , get_schema/1 + , get_schema/2 , maybe_download/1 , register_schema/2 , register_schema_with_fp/2 @@ -170,6 +172,17 @@ get_encoder(Ref) -> -spec get_encoder(name(), fp()) -> avro:simple_encoder(). get_encoder(Name, Fp) -> get_encoder({Name, Fp}). +get_schema(Subject) -> + get_schema(Subject, "latest"). + +get_schema(Subject, Version) when is_integer(Version)-> + get_schema(Subject, integer_to_list(Version)); +get_schema(Subject, Version) -> + RegistryURL = get_registry_url(), + URL = RegistryURL ++ "/subjects/" ++ Subject ++ "/versions/" ++ Version, + httpc_download(URL). + + %% @doc Lookup cache for decoded schema, try to download if not found. -spec maybe_download(ref()) -> avro:avro_type(). maybe_download(Ref) -> @@ -353,21 +366,24 @@ unify_ref(Ref) -> Ref. -spec do_download(string(), ref()) -> {ok, binary()} | {error, any()}. do_download(RegistryURL, RegId) when is_integer(RegId) -> + RegIdBin = integer_to_binary(RegId), URL = RegistryURL ++ "/schemas/ids/" ++ integer_to_list(RegId), - httpc_download(URL); + {ok, RegIdBin, Schema} = httpc_download(URL), + {ok, Schema}; do_download(RegistryURL, {Name, Fp}) -> Subject = fp_to_subject(Name, Fp), %% fingerprint is unique, hence always version/1 URL = RegistryURL ++ "/subjects/" ++ Subject ++ "/versions/1", - httpc_download(URL). + {ok, _RegIdStr, Schema} = httpc_download(URL), + {ok, Schema}. httpc_download(URL) -> case httpc:request(get, {URL, _Headers = []}, [{timeout, ?HTTPC_TIMEOUT}], []) of {ok, {{_, OK, _}, _RspHeaders, RspBody}} when OK >= 200, OK < 300 -> - #{<<"schema">> := SchemaJSON} = + #{<<"schema">> := SchemaJSON, <<"id">> := Id} = jsone:decode(iolist_to_binary(RspBody)), - {ok, SchemaJSON}; + {ok, Id, SchemaJSON}; {ok, {{_, Other, _}, _RspHeaders, RspBody}}-> error_logger:error_msg("Failed to download schema from from ~s:\n~s", [URL, RspBody]), diff --git a/test/avlizer_confluent_tests.erl b/test/avlizer_confluent_tests.erl index 801299c..ba58471 100644 --- a/test/avlizer_confluent_tests.erl +++ b/test/avlizer_confluent_tests.erl @@ -55,6 +55,21 @@ get_encoder_decoder_assign_fp_test_() -> 42 = avlizer_confluent:decode(Decoder, Bin) end). +get_latest_schema_test() -> + with_meck( + fun() -> + {ok, _Id} = avlizer_confluent:register_schema("subj", Schema = test_schema()), + {ok, Schema} = avlizer_confluent:get_schema("subj") + end). + +get_schema_test() -> + with_meck( + fun() -> + {ok, _Id} = avlizer_confluent:register_schema("subj", Schema = test_schema()), + {ok, Schema} = avlizer_confluent:get_schema("subj", 1), + {ok, Schema} = avlizer_confluent:get_schema("subj", "1") + end). + make_encoder_decoder_test_() -> with_meck( fun() -> @@ -134,7 +149,7 @@ cleanup(_) -> %% make a fake JSON as if downloaded from schema registry test_download() -> SchemaJSON = test_schema(), - jsone:encode(#{<<"schema">> => SchemaJSON}). + jsone:encode(#{<<"schema">> => SchemaJSON, <<"id">> => <<"1">>}). test_schema() -> avro:encode_schema(test_type()). From 2f989361f83a9ca56b707654938926b7a399655c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20L=C3=B3pez=20Viqueira?= Date: Wed, 11 Sep 2019 18:23:13 +0200 Subject: [PATCH 2/4] Fixed bad return --- src/avlizer_confluent.erl | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/avlizer_confluent.erl b/src/avlizer_confluent.erl index 57974d3..9724d1a 100644 --- a/src/avlizer_confluent.erl +++ b/src/avlizer_confluent.erl @@ -368,14 +368,22 @@ unify_ref(Ref) -> Ref. do_download(RegistryURL, RegId) when is_integer(RegId) -> RegIdBin = integer_to_binary(RegId), URL = RegistryURL ++ "/schemas/ids/" ++ integer_to_list(RegId), - {ok, RegIdBin, Schema} = httpc_download(URL), - {ok, Schema}; + case httpc_download(URL) of + {ok, RegIdBin, Schema} -> + {ok, Schema}; + Error -> + Error + end; do_download(RegistryURL, {Name, Fp}) -> Subject = fp_to_subject(Name, Fp), %% fingerprint is unique, hence always version/1 URL = RegistryURL ++ "/subjects/" ++ Subject ++ "/versions/1", - {ok, _RegIdStr, Schema} = httpc_download(URL), - {ok, Schema}. + case httpc_download(URL) of + {ok, _RegIdBin, Schema} -> + {ok, Schema}; + Error -> + Error + end. httpc_download(URL) -> case httpc:request(get, {URL, _Headers = []}, From 60f2b9829b20bb3751abff7841989aee93da911f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20L=C3=B3pez=20Viqueira?= Date: Thu, 12 Sep 2019 12:33:21 +0200 Subject: [PATCH 3/4] feat: Fetch already published schemas by Subject and Version (or latest) Fixed code style and other bugs after code review. --- src/avlizer_confluent.erl | 34 +++++++++++++++++++------------- test/avlizer_confluent_tests.erl | 9 ++++++--- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/src/avlizer_confluent.erl b/src/avlizer_confluent.erl index 9724d1a..08e5224 100644 --- a/src/avlizer_confluent.erl +++ b/src/avlizer_confluent.erl @@ -172,16 +172,22 @@ get_encoder(Ref) -> -spec get_encoder(name(), fp()) -> avro:simple_encoder(). get_encoder(Name, Fp) -> get_encoder({Name, Fp}). +-spec get_schema(string()) -> {ok, binary(), binary()} | {error, any()} . get_schema(Subject) -> get_schema(Subject, "latest"). -get_schema(Subject, Version) when is_integer(Version)-> +-spec get_schema(string(), string()) -> {ok, binary(), binary()} | {error, any()} . +get_schema(Subject, Version) when is_integer(Version) -> get_schema(Subject, integer_to_list(Version)); get_schema(Subject, Version) -> RegistryURL = get_registry_url(), URL = RegistryURL ++ "/subjects/" ++ Subject ++ "/versions/" ++ Version, - httpc_download(URL). - + case httpc_download(URL) of + {ok, #{<<"schema">> := Schema, <<"id">> := Id}} -> + {ok, Id, Schema}; + Error -> + {error, Error} + end. %% @doc Lookup cache for decoded schema, try to download if not found. -spec maybe_download(ref()) -> avro:avro_type(). @@ -366,20 +372,21 @@ unify_ref(Ref) -> Ref. -spec do_download(string(), ref()) -> {ok, binary()} | {error, any()}. do_download(RegistryURL, RegId) when is_integer(RegId) -> - RegIdBin = integer_to_binary(RegId), URL = RegistryURL ++ "/schemas/ids/" ++ integer_to_list(RegId), - case httpc_download(URL) of - {ok, RegIdBin, Schema} -> - {ok, Schema}; - Error -> - Error - end; + case httpc_download(URL) of + {ok, RespMap} -> + #{<<"schema">> := Schema} = RespMap, + {ok, Schema}; + Error -> + Error + end; do_download(RegistryURL, {Name, Fp}) -> Subject = fp_to_subject(Name, Fp), %% fingerprint is unique, hence always version/1 URL = RegistryURL ++ "/subjects/" ++ Subject ++ "/versions/1", case httpc_download(URL) of - {ok, _RegIdBin, Schema} -> + {ok, RespMap} -> + #{<<"schema">> := Schema} = RespMap, {ok, Schema}; Error -> Error @@ -389,9 +396,8 @@ httpc_download(URL) -> case httpc:request(get, {URL, _Headers = []}, [{timeout, ?HTTPC_TIMEOUT}], []) of {ok, {{_, OK, _}, _RspHeaders, RspBody}} when OK >= 200, OK < 300 -> - #{<<"schema">> := SchemaJSON, <<"id">> := Id} = - jsone:decode(iolist_to_binary(RspBody)), - {ok, Id, SchemaJSON}; + RespMap = jsone:decode(iolist_to_binary(RspBody)), + {ok, RespMap}; {ok, {{_, Other, _}, _RspHeaders, RspBody}}-> error_logger:error_msg("Failed to download schema from from ~s:\n~s", [URL, RspBody]), diff --git a/test/avlizer_confluent_tests.erl b/test/avlizer_confluent_tests.erl index ba58471..312a8c6 100644 --- a/test/avlizer_confluent_tests.erl +++ b/test/avlizer_confluent_tests.erl @@ -131,8 +131,8 @@ setup() -> application:ensure_all_started(?APPLICATION), meck:new(httpc, [passthrough]), meck:expect(httpc, request, - fun(get, {"theurl" ++ _, _}, _, _) -> - Body = test_download(), + fun(get, {"theurl" ++ Endpoint, _}, _, _) -> + Body = test_download(Endpoint), {ok, {{ignore, 200, "OK"}, headers, Body}}; (post, {"theurl" ++ _, _, _, _}, _, _) -> Body = <<"{\"id\": 1}">>, @@ -147,7 +147,10 @@ cleanup(_) -> ok. %% make a fake JSON as if downloaded from schema registry -test_download() -> +test_download("/schemas/ids/" ++ _Id) -> + SchemaJSON = test_schema(), + jsone:encode(#{<<"schema">> => SchemaJSON}); +test_download(_) -> SchemaJSON = test_schema(), jsone:encode(#{<<"schema">> => SchemaJSON, <<"id">> => <<"1">>}). From d9ca410d9535b3bfd68263d92485b2e282e2a07a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20L=C3=B3pez=20Viqueira?= Date: Thu, 12 Sep 2019 12:51:31 +0200 Subject: [PATCH 4/4] feat: Fetch already published schemas by Subject and Version (or latest) Fixed spec --- src/avlizer_confluent.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/avlizer_confluent.erl b/src/avlizer_confluent.erl index 08e5224..b57fc2f 100644 --- a/src/avlizer_confluent.erl +++ b/src/avlizer_confluent.erl @@ -176,7 +176,7 @@ get_encoder(Name, Fp) -> get_encoder({Name, Fp}). get_schema(Subject) -> get_schema(Subject, "latest"). --spec get_schema(string(), string()) -> {ok, binary(), binary()} | {error, any()} . +-spec get_schema(string(), string() | integer()) -> {ok, binary(), binary()} | {error, any()} . get_schema(Subject, Version) when is_integer(Version) -> get_schema(Subject, integer_to_list(Version)); get_schema(Subject, Version) ->