Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 126 additions & 68 deletions src/avlizer_confluent.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,68 @@
%%% under the License.
%%%-----------------------------------------------------------------------------

%% @doc This module is a http client of confluent.io schema registry.
%% It implements two data serializers
%% 1: Confluent.io schema tagging convention.
%% see http://docs.confluent.io/
%% current/schema-registry/docs/serializer-formatter.html
%% That is: Encoded objects are tagged with 5-byte schema ID.
%% The first byte is currently always 0 (work as version)
%% And the following 4 bytes is the schema registration ID
%% assigned by schema-registry
%% 2: Schema name (subject) + fingerprint based registration.
%% In this case, the payload is encoded avro (binary or JSON) object
%% itself (i.e. no tagging on the playload) schema name and fingerprint
%% are provided by callers.
%% Caller provided schema name is included in registration subject to
%% lower the risk of schema fingerprint collision, since the fingerprint
%% is essentially a 64-bit hash.
%% Caller is also given liberty to assign or generate fingerprint for
%% the schema before registeration. see `register_schema_with_fp/3'
%% @doc This module encodes and decodes Avro binary data tagged with Confluent
%% Schema Registry integer registry IDs or CRC64 fingerprints.
%%
%% Registration ID is more 'compact' in payload (only 5 bytes overhead).
%% Fingerprint is more static (can be obtained at compile time) comparing
%% to the assigned registration ID which is assigned at runtime, and
%% most likely different across registries.
%% See https://www.confluent.io/confluent-schema-registry
%%
%% It looks up schemas in the Schema Registry using its HTTP API.
%% See https://docs.confluent.io/current/schema-registry/develop/api.html
%%
%% It maintains a cache, allowing encoders and decoders to be looked
%% up using Schema Registry IDs or name + fingerprint.
%%
%% It implements two data serializers:
%%
%% 1. Confluent serialization wire format
%%
%% https://docs.confluent.io/
%% current/schema-registry/serializer-formatter.html#wire-format
%%
%% Avro binary encoded objects are prefixed with a five-byte tag.
%% The first byte indicates the Confluent serialization format
%% version number, currently always 0.
%% The following four bytes encode the integer schema ID as
%% returned from the Schema Registry in network byte order.
%%
%% 2. Fingerprint prefixed data
%%
%% When used without a schema registry, it's common to prefix
%% binary data with a hash of the schema that created it.
%%
%% See https://avro.apache.org/docs/1.8.2/spec.html#schema_fingerprints
%% and https://avro.apache.org/docs/1.8.2/spec.html#single_object_encoding_spec
%%
%% In the Avro "Single-object encoding", the Avro binary data is prefixed
%% with a two-byte marker, C3 01, to show that the message is Avro and
%% uses this single-record format (version 1). That is followed by the
%% the 8-byte little-endian CRC-64-AVRO fingerprint of the
%% object's schema.
%%
%% The CRC64 hash is uncommon, but used because it is shorter
%% than e.g. MD5, while still being good enough to detect
%% collisions. The CRC64 hash is implemented in
%% `erlavro:crc64_fingerprint/1'.
%%
%% This module's cache supports using the name and fingerprint as
%% a key. The name is normally the Avro "full name"
%% (https://avro.apache.org/docs/1.8.2/spec.html#names), e.g.
%% com.example.X. The fingerprint is CRC64 by default. You can
%% also register a name with your own fingerprint using
%% `register_schema_with_fp/3'.
%%
%% The Schema Registry ID is more compact in the payload (only five
%% bytes overhead), but relies on the registry.
%%
%% The fingerprint is more static, and can be obtained at compile
%% time, but it is harder to share with other applications and evolve
Copy link
Copy Markdown
Contributor

@zmstone zmstone Nov 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To share schema by fp, it is not harder than registration ID. Since they are essentially just two numbers.
One may argue that without fp as a part of the encoded payload makes it harder to share (maybe that’s what you meant)
But I would phrase it as more freedom on choosing means to share.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g fp as payload meta data (it’s common to have envelops )
Or fp in kafka message header. Etc.
Even we in some cases did double registration: the id returned from fp registration can also be used to share the schema.

%% over time. It's also consistent compared with the schema registry
%% ID, which is assigned by the registry, each register server
%% will in general give a schema a different ID.
%%
%% This library allows consumers to look up the schema on demand
%% from the Schema Registry, using the name + fingerprint as the
%% registry subject name.

-module(avlizer_confluent).
-behaviour(gen_server).
Expand Down Expand Up @@ -81,7 +120,7 @@
, fp/0
]).

%% confluent.io convention magic version
%% confluent.io serialization version magic byte
-define(VSN, 0).
-define(SERVER, ?MODULE).
-define(CACHE, ?MODULE).
Expand All @@ -100,27 +139,30 @@

%%%_* APIs =====================================================================

%% @doc Start a gen_server which owns a ets cache for schema
%% registered in schema registry.
%% Schema registry URL is configured in app env.
%% @doc Start gen_server which owns an ETS cache for schemas
%% registered in Schema Registry.
%%
%% The schema registry URL is configured in the app env.
-spec start_link() -> {ok, pid()} | {error, any()}.
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%% @doc Stop the gen_server, mostly used for tets.
%% @doc Stop the gen_server
-spec stop() -> ok.
stop() ->
ok = gen_server:call(?SERVER, stop).

%% @doc Make an avro decoder from the given schema reference.
%% This call has an overhead of ets lookup (and maybe a `gen_server'
%% call to perform a http download) to fetch the avro schema, hence
%% the caller should keep the built decoder function and re-use it
%% for the same reference.
%% @doc Make Avro decoder for the given schema reference.
%%
%% This call has the overhead of an ETS lookup and potentially a
%% `gen_server' call to fetch the Avro schema via HTTP. The caller
%% should keep the result and reuse it for future requests for the
%% same reference.
%%
%% The decoder made by this API has better performance than the one
%% returned from `get_decoder/1' which has the lookup overhead included
%% in the decoder. It may not be performant when decoding a large
%% number (or a stream) of objects having the same reference.
%% returned from `get_decoder/1', which has the lookup overhead
%% included in the decoder. It may not be performant when decoding a
%% large number (or a stream) of objects having the same reference.
-spec make_decoder(ref()) -> avro:simple_decoder().
make_decoder(Ref) ->
Schema = maybe_download(Ref),
Expand All @@ -129,15 +171,17 @@ make_decoder(Ref) ->
-spec make_decoder(name(), fp()) -> avro:simple_decoder().
make_decoder(Name, Fp) -> make_decoder({Name, Fp}).

%% @doc Make an avro encoder from the given schema reference.
%% This call has an overhead of ets lookup (and maybe a `gen_server'
%% call to perform a http download) to fetch the avro schema, hence
%% the caller should keep the built decoder function and re-use it
%% for the same reference.
%% @doc Make Avro encoder for the given schema reference.
%%
%% This call has the overhead of an ETS lookup and potentially a
%% `gen_server' call to fetch the Avro schema via HTTP. The caller
%% should keep the result and reuse it for future requests for the
%% same reference.
%%
%% The encoder made by this API has better performance than the one
%% returned from `get_encoder/1' which has the lookup overhead included
%% in the encoder. It may not be performant when encoding a large
%% number of objects in a tight loop.
%% returned from `get_encoder/1', which has the lookup overhead
%% included in the encoder. It may not be performant when encoding a
%% large number of objects in a tight loop.
-spec make_encoder(ref()) -> avro:simple_encoder().
make_encoder(Ref) ->
Schema = maybe_download(Ref),
Expand All @@ -146,10 +190,10 @@ make_encoder(Ref) ->
-spec make_encoder(name(), fp()) -> avro:simple_encoder().
make_encoder(Name, Fp) -> make_encoder({Name, Fp}).

%% @doc Get avro decoder by lookup already decoded avro schema
%% from cache, and make a decoder from it.
%% Schema lookup is performed inside the decoder function for each call,
%% use `make_enodcer/1' for better performance.
%% @doc Get Avro decoder from cached, already-decoded Avro schema.
%%
%% Schema lookup is performed inside the decoder function for each
%% call, use `make_encoder/1' for better performance.
-spec get_decoder(ref()) -> avro:simple_decoder().
get_decoder(Ref) ->
Decoder = avro:make_decoder(?LKUP(Ref), [{encoding, avro_binary}]),
Expand All @@ -158,10 +202,10 @@ get_decoder(Ref) ->
-spec get_decoder(name(), fp()) -> avro:simple_decoder().
get_decoder(Name, Fp) -> get_decoder({Name, Fp}).

%% @doc Get avro decoder by lookup already decode avro schema
%% from cache, and make a encoder from it.
%% Schema lookup is performed inside the encoder function for each call,
%% use `make_enodcer/1' for better performance.
%% @doc Get Avro encoder from cached, already-decoded Avro schema.
%%
%% Schema lookup is performed inside the encoder function for each
%% call, use `make_encoder/1' for better performance.
-spec get_encoder(ref()) -> avro:simple_encoder().
get_encoder(Ref) ->
Encoder = avro:make_encoder(?LKUP(Ref), [{encoding, avro_binary}]),
Expand All @@ -170,7 +214,7 @@ get_encoder(Ref) ->
-spec get_encoder(name(), fp()) -> avro:simple_encoder().
get_encoder(Name, Fp) -> get_encoder({Name, Fp}).

%% @doc Lookup cache for decoded schema, try to download if not found.
%% @doc Lookup schema in cache and download if not found.
-spec maybe_download(ref()) -> avro:avro_type().
maybe_download(Ref) ->
case lookup_cache(Ref) of
Expand All @@ -185,7 +229,7 @@ maybe_download(Ref) ->
Type
end.

%% @doc Register a schema.
%% @doc Register schema.
-spec register_schema(string(), binary() | avro:type()) ->
{ok, regid()} | {error, any()}.
register_schema(Subject, JSON) when is_binary(JSON) ->
Expand All @@ -194,8 +238,9 @@ register_schema(Subject, Schema) ->
JSON = avro:encode_schema(Schema),
register_schema(Subject, JSON).

%% @doc Register schema with name + fingerprint.
%% crc64 fingerprint is returned.
%% @doc Register Avro schema with name + fingerprint.
%%
%% @returns crc64 fingerprint
-spec register_schema_with_fp(string() | binary(), binary() | avro:type()) ->
{ok, fp()} | {error, any()}.
register_schema_with_fp(Name, JSON) when is_binary(JSON) ->
Expand All @@ -208,9 +253,10 @@ register_schema_with_fp(Name, Schema) ->
JSON = avro:encode_schema(Schema),
register_schema_with_fp(Name, JSON).

%% @doc Register schema with name + fingerprint.
%% Different from register_schema_with_fp/2, caller is given the liberty to
%% generate/assign schema fingerprint.
%% @doc Register Avro schema with name and specified fingerprint.
%%
%% Unlike `register_schema_with_fp/2', caller can generate/assign
%% the schema fingerprint.
-spec register_schema_with_fp(string() | binary(), fp(),
binary() | avro:type()) -> ok | {error, any()}.
register_schema_with_fp(Name, Fp, JSON) when is_binary(JSON) ->
Expand All @@ -226,19 +272,19 @@ register_schema_with_fp(Name, Fp, JSON) when is_binary(JSON) ->
false -> DoIt()
catch
error : badarg ->
%% no ets
%% ETS error
DoIt()
end;
register_schema_with_fp(Name, Fp, Schema) ->
JSON = avro:encode_schema(Schema),
register_schema_with_fp(Name, Fp, JSON).

%% @doc Tag binary data with schema registration ID.
%% @doc Tag Avro binary data with Schema Registry integer ID.
-spec tag_data(regid(), binary()) -> binary().
tag_data(SchemaId, AvroBinary) ->
iolist_to_binary([<<?VSN:8, SchemaId:32/unsigned-integer>>, AvroBinary]).

%% @doc Get schema registeration ID and avro binary from tagged data.
%% @doc Split integer schema registration ID and Avro binary from tagged data.
-spec untag_data(binary()) -> {regid(), binary()}.
untag_data(<<?VSN:8, RegId:32/unsigned-integer, Body/binary>>) ->
{RegId, Body}.
Expand All @@ -249,25 +295,25 @@ decode(Bin) ->
{RegId, Payload} = untag_data(Bin),
decode(RegId, Payload).

%% @doc Decode untagged payload or with a given schema reference.
%% @doc Decode bare Avro binary, looking up decoder by ref() or specifying it.
-spec decode(ref() | avro:simple_decoder(), binary()) -> avro:out().
decode(Ref, Bin) when ?IS_REF(Ref) ->
decode(get_decoder(Ref), Bin);
decode(Decoder, Bin) when is_function(Decoder) ->
Decoder(Bin).

%% @doc Decode avro binary with given schema name and fingerprint.
%% @doc Decode bare Avro binary, looking up decoder by name and fingerprint.
-spec decode(name(), fp(), binary()) -> avro:out().
decode(Name, Fp, Bin) -> decode({Name, Fp}, Bin).

%% @doc Encoded avro-binary with schema tag.
%% @doc Encode Avro data to binary, looking up encoder by ref() or specifying it
-spec encode(ref() | avro:simple_encoder(), avro:in()) -> binary().
encode(Ref, Input) when ?IS_REF(Ref) ->
encode(get_encoder(Ref), Input);
encode(Encoder, Input) when is_function(Encoder) ->
iolist_to_binary(Encoder(Input)).

%% @doc Encode avro-binary with schema name + fingerprint.
%% @doc Encode Avro data to binary, looking up encoder by name and fingerprint.
-spec encode(name(), fp(), avro:in()) -> binary().
encode(Name, Fp, Input) -> encode({Name, Fp}, Input).

Expand Down Expand Up @@ -311,8 +357,13 @@ get_registry_url() ->
download(Ref) ->
gen_server:call(?SERVER, {download, Ref}, infinity).

%% @doc Download schema from registry and insert into cache.
handle_download(Ref) ->
%% lookup again to avoid concurrent callers re-download
%% This is normally called after checking the cache.
%% The gen_server serializes multiple process calls so that they
%% don't all hit the registry at once. A previous caller may have
%% already done it successfully, so check the cache again
%% before trying.
case lookup_cache(Ref) of
{ok, Schema} ->
{ok, Schema};
Expand Down Expand Up @@ -345,6 +396,7 @@ lookup_cache(Ref0) ->
[{Ref, Schema}] -> {ok, Schema}
end.

%% Convert cache reference into standard form
unify_ref({Name, Fp}) when is_list(Name) ->
unify_ref({iolist_to_binary(Name), Fp});
unify_ref({Name, Fp}) when is_list(Fp) ->
Expand All @@ -357,10 +409,11 @@ do_download(RegistryURL, RegId) when is_integer(RegId) ->
httpc_download(URL);
do_download(RegistryURL, {Name, Fp}) ->
Subject = fp_to_subject(Name, Fp),
%% fingerprint is unique, hence always version/1
%% Fingerprint is unique, hence always version 1
URL = RegistryURL ++ "/subjects/" ++ Subject ++ "/versions/1",
httpc_download(URL).

%% Call Schema Registry REST API to get schema JSON
httpc_download(URL) ->
case httpc:request(get, {URL, _Headers = []},
[{timeout, ?HTTPC_TIMEOUT}], []) of
Expand All @@ -378,6 +431,10 @@ httpc_download(URL) ->
Other
end.

%% Register schema with Schema Registry under subject.
%%
%% Name + fingerprint is converted into a subject.
%%
%% Equivalent cURL command:
%% curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
%% --data '{"schema": "{\"type\": \"string\"}"}' \
Expand Down Expand Up @@ -409,11 +466,12 @@ do_register_schema(Subject, SchemaJSON) ->
end.

%% Make schema registry POST request body.
%% which is: the schema JSON is escaped and wrapped by another JSON object.
%% This is the escaped schema JSON wrapped in another JSON object.
-spec make_schema_reg_req_body(binary()) -> binary().
make_schema_reg_req_body(SchemaJSON) ->
jsone:encode(#{<<"schema">> => SchemaJSON}).

%% Make Schema Registry subject from name + fingerprint
fp_to_subject(Name, Fp) ->
FpStr = case is_integer(Fp) of
true -> integer_to_list(Fp);
Expand Down