From 2a71b116156304382dc3e805ab838bdfa6ea376d Mon Sep 17 00:00:00 2001 From: Jake Morrison Date: Sat, 26 Oct 2019 14:13:51 +0800 Subject: [PATCH 1/2] Improve documentation --- src/avlizer_confluent.erl | 194 +++++++++++++++++++++++++------------- 1 file changed, 126 insertions(+), 68 deletions(-) diff --git a/src/avlizer_confluent.erl b/src/avlizer_confluent.erl index 0ce0942..f410468 100644 --- a/src/avlizer_confluent.erl +++ b/src/avlizer_confluent.erl @@ -17,29 +17,68 @@ %%% under the License. %%%----------------------------------------------------------------------------- -%% @doc This module is a http client of confluent.io schema registry. -%% It implements two data serializers -%% 1: Confluent.io schema tagging convention. -%% see http://docs.confluent.io/ -%% current/schema-registry/docs/serializer-formatter.html -%% That is: Encoded objects are tagged with 5-byte schema ID. -%% The first byte is currently always 0 (work as version) -%% And the following 4 bytes is the schema registration ID -%% assigned by schema-registry -%% 2: Schema name (subject) + fingerprint based registration. -%% In this case, the payload is encoded avro (binary or JSON) object -%% itself (i.e. no tagging on the playload) schema name and fingerprint -%% are provided by callers. -%% Caller provided schema name is included in registration subject to -%% lower the risk of schema fingerprint collision, since the fingerprint -%% is essentially a 64-bit hash. -%% Caller is also given liberty to assign or generate fingerprint for -%% the schema before registeration. see `register_schema_with_fp/3' +%% @doc This module encodes and decodes Avro binary data tagged with Confluent +%% Schema Registry integer registry IDs or CRC64 fingerprints. %% -%% Registration ID is more 'compact' in payload (only 5 bytes overhead). -%% Fingerprint is more static (can be obtained at compile time) comparing -%% to the assigned registration ID which is assigned at runtime, and -%% most likely different across registries. +%% See https://www.confluent.io/confluent-schema-registry +%% +%% It looks up schemas in the Schema Registry using its HTTP API. +%% See https://docs.confluent.io/current/schema-registry/develop/api.html +%% +%% It maintains a cache, allowing encoders and decoders to be looked +%% up using Schema Registry IDs or name + fingerprint. +%% +%% It implements two data serializers: +%% +%% 1. Confluent serialization wire format +%% +%% https://docs.confluent.io/ +%% current/schema-registry/serializer-formatter.html#wire-format +%% +%% Avro binary encoded objects are prefixed with a five-byte tag. +%% The first byte indicates the Confluent serialization format +%% version number, currently always 0. +%% The following four bytes encode the integer schema ID as +%% returned from the Schema Registry in network byte order. +%% +%% 2. Fingerprint prefixed data +%% +%% When used without a schema registry, it's common to prefix +%% binary data with a hash of the schema that created it. +%% +%% See https://avro.apache.org/docs/1.8.2/spec.html#schema_fingerprints +%% and https://avro.apache.org/docs/1.8.2/spec.html#single_object_encoding_spec +%% +%% In the Avro "Single-object encoding", the Avro binary data is prefixed +%% with a two-byte marker, C3 01, to show that the message is Avro and +%% uses this single-record format (version 1). That is followed by the +%% the 8-byte little-endian CRC-64-AVRO fingerprint of the +%% object's schema. +%% +%% The CRC64 hash is uncommon, but used because it is shorter +%% than e.g. MD5, while still being good enough to detect +%% collisions. The CRC64 hash is implemented in +%% `erlavro:crc64_fingerprint/1'. +%% +%% This module's cache supports using the name and fingerprint as +%% a key. The name is normally the Avro "full name" +%% (https://avro.apache.org/docs/1.8.2/spec.html#names), e.g. +%% com.example.X. The fingerprint is CRC64 by default. You can +%% also register a name with your own fingerprint using +%% `register_schema_with_fp/3'. +%% +%% The Schema Registry ID is more compact in the payload (only five +%% bytes overhead), but relies on the registry. +%% +%% The fingerprint is more static, and can be obtained at compile +%% time, but it is harder to share with other applications and evolve +%% over time. It's also consistent compared with the schema registry +%% ID, which is assigned by the registry, each register server +%% will in general give a schema a different ID. +%% +%% This library allows consumers to look up the schema on demand +%% from the Schema Registry, using the name + fingerprint as the +%% registry subject name. -module(avlizer_confluent). -behaviour(gen_server). @@ -81,7 +120,7 @@ , fp/0 ]). -%% confluent.io convention magic version +%% confluent.io serialization version magic byte -define(VSN, 0). -define(SERVER, ?MODULE). -define(CACHE, ?MODULE). @@ -100,27 +139,30 @@ %%%_* APIs ===================================================================== -%% @doc Start a gen_server which owns a ets cache for schema -%% registered in schema registry. -%% Schema registry URL is configured in app env. +%% @doc Start gen_server which owns an ETS cache for schemas +%% registered in Schema Registry. +%% +%% The schema registry URL is configured in the app env. -spec start_link() -> {ok, pid()} | {error, any()}. start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -%% @doc Stop the gen_server, mostly used for tets. +%% @doc Stop the gen_server -spec stop() -> ok. stop() -> ok = gen_server:call(?SERVER, stop). -%% @doc Make an avro decoder from the given schema reference. -%% This call has an overhead of ets lookup (and maybe a `gen_server' -%% call to perform a http download) to fetch the avro schema, hence -%% the caller should keep the built decoder function and re-use it -%% for the same reference. +%% @doc Make Avro decoder for the given schema reference. +%% +%% This call has the overhead of an ETS lookup and potentially a +%% `gen_server' call to fetch the Avro schema via HTTP. The caller +%% should keep the result and reuse it for future requests for the +%% same reference. +%% %% The decoder made by this API has better performance than the one -%% returned from `get_decoder/1' which has the lookup overhead included -%% in the decoder. It may not be performant when decoding a large -%% number (or a stream) of objects having the same reference. +%% returned from `get_decoder/1', which has the lookup overhead +%% included in the decoder. It may not be performant when decoding a +%% large number (or a stream) of objects having the same reference. -spec make_decoder(ref()) -> avro:simple_decoder(). make_decoder(Ref) -> Schema = maybe_download(Ref), @@ -129,15 +171,17 @@ make_decoder(Ref) -> -spec make_decoder(name(), fp()) -> avro:simple_decoder(). make_decoder(Name, Fp) -> make_decoder({Name, Fp}). -%% @doc Make an avro encoder from the given schema reference. -%% This call has an overhead of ets lookup (and maybe a `gen_server' -%% call to perform a http download) to fetch the avro schema, hence -%% the caller should keep the built decoder function and re-use it -%% for the same reference. +%% @doc Make Avro encoder for the given schema reference. +%% +%% This call has the overhead of an ETS lookup and potentially a +%% `gen_server' call to fetch the Avro schema via HTTP. The caller +%% should keep the result and reuse it for future requests for the +%% same reference. +%% %% The encoder made by this API has better performance than the one -%% returned from `get_encoder/1' which has the lookup overhead included -%% in the encoder. It may not be performant when encoding a large -%% number of objects in a tight loop. +%% returned from `get_encoder/1', which has the lookup overhead +%% included in the encoder. It may not be performant when encoding a +%% large number of objects in a tight loop. -spec make_encoder(ref()) -> avro:simple_encoder(). make_encoder(Ref) -> Schema = maybe_download(Ref), @@ -146,10 +190,10 @@ make_encoder(Ref) -> -spec make_encoder(name(), fp()) -> avro:simple_encoder(). make_encoder(Name, Fp) -> make_encoder({Name, Fp}). -%% @doc Get avro decoder by lookup already decoded avro schema -%% from cache, and make a decoder from it. -%% Schema lookup is performed inside the decoder function for each call, -%% use `make_enodcer/1' for better performance. +%% @doc Get Avro decoder from cached, already-decoded Avro schema. +%% +%% Schema lookup is performed inside the decoder function for each +%% call, use `make_encoder/1' for better performance. -spec get_decoder(ref()) -> avro:simple_decoder(). get_decoder(Ref) -> Decoder = avro:make_decoder(?LKUP(Ref), [{encoding, avro_binary}]), @@ -158,10 +202,10 @@ get_decoder(Ref) -> -spec get_decoder(name(), fp()) -> avro:simple_decoder(). get_decoder(Name, Fp) -> get_decoder({Name, Fp}). -%% @doc Get avro decoder by lookup already decode avro schema -%% from cache, and make a encoder from it. -%% Schema lookup is performed inside the encoder function for each call, -%% use `make_enodcer/1' for better performance. +%% @doc Get Avro encoder from cached, already-decoded Avro schema. +%% +%% Schema lookup is performed inside the encoder function for each +%% call, use `make_encoder/1' for better performance. -spec get_encoder(ref()) -> avro:simple_encoder(). get_encoder(Ref) -> Encoder = avro:make_encoder(?LKUP(Ref), [{encoding, avro_binary}]), @@ -170,7 +214,7 @@ get_encoder(Ref) -> -spec get_encoder(name(), fp()) -> avro:simple_encoder(). get_encoder(Name, Fp) -> get_encoder({Name, Fp}). -%% @doc Lookup cache for decoded schema, try to download if not found. +%% @doc Lookup schema in cache and download if not found. -spec maybe_download(ref()) -> avro:avro_type(). maybe_download(Ref) -> case lookup_cache(Ref) of @@ -185,7 +229,7 @@ maybe_download(Ref) -> Type end. -%% @doc Register a schema. +%% @doc Register schema. -spec register_schema(string(), binary() | avro:type()) -> {ok, regid()} | {error, any()}. register_schema(Subject, JSON) when is_binary(JSON) -> @@ -194,8 +238,9 @@ register_schema(Subject, Schema) -> JSON = avro:encode_schema(Schema), register_schema(Subject, JSON). -%% @doc Register schema with name + fingerprint. -%% crc64 fingerprint is returned. +%% @doc Register Avro schema with name + fingerprint. +%% +%% @returns crc64 fingerprint -spec register_schema_with_fp(string() | binary(), binary() | avro:type()) -> {ok, fp()} | {error, any()}. register_schema_with_fp(Name, JSON) when is_binary(JSON) -> @@ -208,9 +253,10 @@ register_schema_with_fp(Name, Schema) -> JSON = avro:encode_schema(Schema), register_schema_with_fp(Name, JSON). -%% @doc Register schema with name + fingerprint. -%% Different from register_schema_with_fp/2, caller is given the liberty to -%% generate/assign schema fingerprint. +%% @doc Register Avro schema with name and specified fingerprint. +%% +%% Unlike `register_schema_with_fp/2', caller can generate/assign +%% the schema fingerprint. -spec register_schema_with_fp(string() | binary(), fp(), binary() | avro:type()) -> ok | {error, any()}. register_schema_with_fp(Name, Fp, JSON) when is_binary(JSON) -> @@ -226,19 +272,19 @@ register_schema_with_fp(Name, Fp, JSON) when is_binary(JSON) -> false -> DoIt() catch error : badarg -> - %% no ets + %% ETS error DoIt() end; register_schema_with_fp(Name, Fp, Schema) -> JSON = avro:encode_schema(Schema), register_schema_with_fp(Name, Fp, JSON). -%% @doc Tag binary data with schema registration ID. +%% @doc Tag Avro binary data with Schema Registry integer ID. -spec tag_data(regid(), binary()) -> binary(). tag_data(SchemaId, AvroBinary) -> iolist_to_binary([<>, AvroBinary]). -%% @doc Get schema registeration ID and avro binary from tagged data. +%% @doc Split integer schema registration ID and Avro binary from tagged data. -spec untag_data(binary()) -> {regid(), binary()}. untag_data(<>) -> {RegId, Body}. @@ -249,25 +295,25 @@ decode(Bin) -> {RegId, Payload} = untag_data(Bin), decode(RegId, Payload). -%% @doc Decode untagged payload or with a given schema reference. +%% @doc Decode bare Avro binary, looking up decoder by ref() or specifying it. -spec decode(ref() | avro:simple_decoder(), binary()) -> avro:out(). decode(Ref, Bin) when ?IS_REF(Ref) -> decode(get_decoder(Ref), Bin); decode(Decoder, Bin) when is_function(Decoder) -> Decoder(Bin). -%% @doc Decode avro binary with given schema name and fingerprint. +%% @doc Decode bare Avro binary, looking up decoder by name and fingerprint. -spec decode(name(), fp(), binary()) -> avro:out(). decode(Name, Fp, Bin) -> decode({Name, Fp}, Bin). -%% @doc Encoded avro-binary with schema tag. +%% @doc Encode Avro data to binary, looking up encoder by ref() or specifying it -spec encode(ref() | avro:simple_encoder(), avro:in()) -> binary(). encode(Ref, Input) when ?IS_REF(Ref) -> encode(get_encoder(Ref), Input); encode(Encoder, Input) when is_function(Encoder) -> iolist_to_binary(Encoder(Input)). -%% @doc Encode avro-binary with schema name + fingerprint. +%% @doc Encode Avro data to binary, looking up encoder by name and fingerprint. -spec encode(name(), fp(), avro:in()) -> binary(). encode(Name, Fp, Input) -> encode({Name, Fp}, Input). @@ -311,8 +357,13 @@ get_registry_url() -> download(Ref) -> gen_server:call(?SERVER, {download, Ref}, infinity). +%% Download schema from registry and insert into cache. handle_download(Ref) -> - %% lookup again to avoid concurrent callers re-download + %% This is normally called after checking the cache. + %% The gen_server serializes multiple process calls so that they + %% don't all hit the registry at once. A previous caller may have + %% already done it successfully, so check the cache again + %% before trying. case lookup_cache(Ref) of {ok, Schema} -> {ok, Schema}; @@ -345,6 +396,7 @@ lookup_cache(Ref0) -> [{Ref, Schema}] -> {ok, Schema} end. +%% Convert cache reference into standard form unify_ref({Name, Fp}) when is_list(Name) -> unify_ref({iolist_to_binary(Name), Fp}); unify_ref({Name, Fp}) when is_list(Fp) -> @@ -357,10 +409,11 @@ do_download(RegistryURL, RegId) when is_integer(RegId) -> httpc_download(URL); do_download(RegistryURL, {Name, Fp}) -> Subject = fp_to_subject(Name, Fp), - %% fingerprint is unique, hence always version/1 + %% Fingerprint is unique, hence always version 1 URL = RegistryURL ++ "/subjects/" ++ Subject ++ "/versions/1", httpc_download(URL). +%% Call Schema Registry REST API to get schema JSON httpc_download(URL) -> case httpc:request(get, {URL, _Headers = []}, [{timeout, ?HTTPC_TIMEOUT}], []) of @@ -378,6 +431,10 @@ httpc_download(URL) -> Other end. +%% Register schema with Schema Registry under subject. +%% +%% Name + fingerprint is converted into a subject. +%% %% Equivalent cURL command: %% curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \ %% --data '{"schema": "{\"type\": \"string\"}"}' \ @@ -409,11 +466,12 @@ do_register_schema(Subject, SchemaJSON) -> end. %% Make schema registry POST request body. -%% which is: the schema JSON is escaped and wrapped by another JSON object. +%% This is the escaped schema JSON wrapped in another JSON object. -spec make_schema_reg_req_body(binary()) -> binary(). make_schema_reg_req_body(SchemaJSON) -> jsone:encode(#{<<"schema">> => SchemaJSON}). +%% Make Schema Registry subject from name + fingerprint fp_to_subject(Name, Fp) -> FpStr = case is_integer(Fp) of true -> integer_to_list(Fp); From acb7614a6da93003dfa9b0a68da98bd1ffabc318 Mon Sep 17 00:00:00 2001 From: Jake Morrison Date: Mon, 4 Nov 2019 12:31:18 +0800 Subject: [PATCH 2/2] Add @doc tag --- src/avlizer_confluent.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/avlizer_confluent.erl b/src/avlizer_confluent.erl index f410468..40afed8 100644 --- a/src/avlizer_confluent.erl +++ b/src/avlizer_confluent.erl @@ -357,7 +357,7 @@ get_registry_url() -> download(Ref) -> gen_server:call(?SERVER, {download, Ref}, infinity). -%% Download schema from registry and insert into cache. +%% @doc Download schema from registry and insert into cache. handle_download(Ref) -> %% This is normally called after checking the cache. %% The gen_server serializes multiple process calls so that they