Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/osiris.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@
{filter_value(), iodata() | batch()}.
-type features() :: #{committed_offset_calculate => boolean()}.

%% returned when reading
-type entry() :: binary() | batch().
%% returned when reading (entry_iterator when using entry_iterator in iterator_next/2)
-type entry() :: binary() | batch() | osiris_log:entry_iterator().
-type reader_options() :: #{transport => tcp | ssl,
chunk_selector => all | user_data,
filter_spec => osiris_bloom:filter_spec(),
Expand Down
138 changes: 126 additions & 12 deletions src/osiris_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
chunk_iterator/2,
chunk_iterator/3,
iterator_next/1,
iterator_next/2,
iterator_next/3,
entry_iterator_read/2,
entry_iterator_skip/2,
read_chunk/1,
read_chunk_parsed/1,
read_chunk_parsed/2,
Expand Down Expand Up @@ -479,6 +483,8 @@

-export_type([state/0,
chunk_iterator/0,
entry_iterator/0,
iterator_mode/0,
range/0,
config/0,
counter_spec/0,
Expand Down Expand Up @@ -1479,6 +1485,27 @@ read_header(#?MODULE{cfg = #cfg{}} = State0) ->
next_record_pos :: non_neg_integer(),
ra :: #ra{}}).
-opaque chunk_iterator() :: #iterator{}.

%% Entry iterator: streams the payload of a single simple entry without
%% requiring the whole payload to be loaded up front.
%% Produced by iterator_next/3 when called with mode entry_iterator and
%% consumed through entry_iterator_read/2 and entry_iterator_skip/2.
%%
%% Fields:
%%   fd       - File descriptor the entry is read from (taken from the chunk
%%              iterator that produced this entry iterator).
%%   buffer   - Payload bytes that were already available in memory when the
%%              entry iterator was created and have not been consumed yet.
%%              These bytes are served before anything is read from the file.
%%   file_pos - Byte offset in the file of the first payload byte that is
%%              NOT in buffer, i.e. where the next file read will start.
%%   file_len - Number of payload bytes still to be read from the file,
%%              starting at file_pos. Bytes held in buffer are not counted
%%              here; 0 means everything that remains is in buffer.
%%   size     - Total payload size of the entry in bytes. Not modified by
%%              entry_iterator_read/2 or entry_iterator_skip/2.
-record(entry_iterator,
{fd :: file:fd(),
buffer = <<>> :: binary(),
file_pos :: non_neg_integer(),
file_len :: non_neg_integer(),
size :: non_neg_integer()}).
-opaque entry_iterator() :: #entry_iterator{}.

-define(REC_MATCH_SIMPLE(Len, Rem),
<<0:1, Len:31/unsigned, Rem/binary>>).
-define(REC_MATCH_SUBBATCH(CompType, NumRec, UncompLen, Len, Rem),
Expand Down Expand Up @@ -1573,35 +1600,67 @@ chunk_iterator(#?MODULE{cfg = #cfg{},
-spec iterator_next(chunk_iterator()) ->
end_of_chunk | {offset_entry(), chunk_iterator()}.
iterator_next(#iterator{} = I) ->
iterator_next(I, 1).
iterator_next(I, 1, full_entry).

-spec iterator_next(chunk_iterator(), CreditHint :: non_neg_integer()) ->
end_of_chunk | {offset_entry(), chunk_iterator()}.
iterator_next(#iterator{num_left = 0}, _CreditHint) ->
iterator_next(#iterator{} = I, CreditHint) ->
iterator_next(I, CreditHint, full_entry).

-type iterator_mode() :: full_entry | entry_iterator.

-spec iterator_next(chunk_iterator(), CreditHint :: non_neg_integer(), Mode :: iterator_mode()) ->
end_of_chunk | {offset_entry(), chunk_iterator()}.
iterator_next(#iterator{num_left = 0}, _CreditHint, _Mode) ->
end_of_chunk;
iterator_next(#iterator{fd = Fd,
next_offset = NextOffs,
num_left = Num,
data_left = DataLeft,
data_cache = ?REC_MATCH_SIMPLE(Len, Rem0),
next_record_pos = Pos0,
ra = Ra} = I0, CreditHint) ->
ra = Ra} = I0, CreditHint, Mode) ->
Pos = Pos0 + ?REC_HDR_SZ_SIMPLE_B,
case Rem0 of
<<Record:Len/binary, Rem/binary>> ->
case {Rem0, Mode} of
{<<Record:Len/binary, Rem/binary>>, entry_iterator} ->
EntryIt = #entry_iterator{fd = Fd,
buffer = Record,
file_pos = Pos + Len,
file_len = 0,
size = Len},
I = I0#iterator{next_offset = NextOffs + 1,
num_left = Num - 1,
data_left = DataLeft - (Len + ?REC_HDR_SZ_SIMPLE_B),
data_cache = Rem,
next_record_pos = Pos + Len},
{{NextOffs, EntryIt}, I};
{<<Record:Len/binary, Rem/binary>>, full_entry} ->
I = I0#iterator{next_offset = NextOffs + 1,
num_left = Num - 1,
data_left = DataLeft - (Len + ?REC_HDR_SZ_SIMPLE_B),
data_cache = Rem,
next_record_pos = Pos + Len},
{{NextOffs, Record}, I};
_ ->
{_, entry_iterator} ->
Rem0Len = byte_size(Rem0),
EntryIt = #entry_iterator{fd = Fd, buffer = Rem0,
file_pos = Pos + Rem0Len,
file_len = Len - Rem0Len,
size = Len},
I = I0#iterator{next_offset = NextOffs + 1,
num_left = Num - 1,
data_left = DataLeft - (Len + ?REC_HDR_SZ_SIMPLE_B),
data_cache = <<>>,
next_record_pos = Pos + Len,
ra = ra_clear(Ra)},
{{NextOffs, EntryIt}, I};
{_, full_entry} ->
MinReqSize = Len + ?REC_HDR_SZ_SIMPLE_B,
{Data, Ra1} = iter_read_ahead(Fd, Pos0, MinReqSize, CreditHint,
DataLeft, Num, Ra),
iterator_next(I0#iterator{ra = Ra1,
data_cache = Data},
CreditHint)
CreditHint, full_entry)
end;
iterator_next(#iterator{fd = Fd,
next_offset = NextOffs,
Expand All @@ -1611,8 +1670,8 @@ iterator_next(#iterator{fd = Fd,
UncompressedLen,
Len, Rem0),
next_record_pos = Pos,
ra = Ra} = I0, CreditHint) ->
case Rem0 of
ra = Ra} = I0, CreditHint, Mode) ->
case Rem0 of
<<Data:Len/binary, Rem/binary>> ->
Record = {batch, NumRecs, CompType, UncompressedLen, Data},
I = I0#iterator{next_offset = NextOffs + NumRecs,
Expand All @@ -1629,19 +1688,74 @@ iterator_next(#iterator{fd = Fd,
DataLeft, Num, Ra),
iterator_next(I0#iterator{ra = Ra1,
data_cache = Data},
CreditHint)
CreditHint, Mode)
end;
iterator_next(#iterator{fd = Fd,
num_left = Num,
data_left = DataLeft,
ra = Ra,
next_record_pos = Pos} = I0, CreditHint) ->
next_record_pos = Pos} = I0, CreditHint, Mode) ->
MinReq = min(?REC_HDR_SZ_SUBBATCH_B, DataLeft),
{Data, Ra1} = iter_read_ahead(Fd, Pos, MinReq, CreditHint,
DataLeft, Num, Ra),
iterator_next(I0#iterator{ra = Ra1,
data_cache = Data},
CreditHint).
CreditHint, Mode).

%% Reads up to Max bytes from the current position of the entry iterator.
%%
%% Buffered bytes are served first; the file is only touched once the buffer
%% is empty. A single call can therefore return fewer than Max bytes even
%% though more of the entry remains, so callers should keep reading until
%% {eof, It} is returned. Max should be positive: with Max = 0 an empty binary
%% is returned and the iterator does not advance.
%%
%% Returns:
%%   {ok, Data, It1} - Data holds at most Max bytes; It1 must be used for any
%%                     subsequent read or skip.
%%   {eof, It}       - no bytes of the entry remain.
%%
%% Raises (never returns {error, _}):
%%   error({error, Reason})                   - file:pread/3 failed.
%%   error({unexpected_eof, FilePos, FileLen}) - the file ended although
%%                     FileLen bytes of the entry were still expected at
%%                     FilePos (truncated file).
-spec entry_iterator_read(entry_iterator(), non_neg_integer()) ->
    {ok, binary(), entry_iterator()} | {eof, entry_iterator()}.
entry_iterator_read(#entry_iterator{buffer = <<>>,
                                    file_len = FileLen} = It, _Max)
  when FileLen =< 0 ->
    %% Nothing buffered and nothing left in the file. `=<` rather than `=:=`
    %% so that an iterator whose file_len was driven below zero still
    %% terminates cleanly instead of attempting a bogus read.
    {eof, It};
entry_iterator_read(#entry_iterator{fd = Fd,
                                    buffer = Buffer,
                                    file_pos = FilePos,
                                    file_len = FileLen} = It, Max) ->
    case Buffer of
        <<Chunk:Max/binary, Rest/binary>> ->
            %% The buffer alone can satisfy the request.
            {ok, Chunk, It#entry_iterator{buffer = Rest}};
        <<>> ->
            %% Buffer exhausted; the first clause guarantees FileLen > 0.
            case file:pread(Fd, FilePos, min(Max, FileLen)) of
                {ok, Data} ->
                    %% file:pread/3 may return fewer bytes than requested,
                    %% so advance by what was actually read.
                    DataLen = byte_size(Data),
                    {ok, Data,
                     It#entry_iterator{file_pos = FilePos + DataLen,
                                       file_len = FileLen - DataLen}};
                eof ->
                    %% file:pread/3 returns a bare `eof` when FilePos is at or
                    %% past the end of the file; surface it with context
                    %% instead of crashing with a case_clause.
                    error({unexpected_eof, FilePos, FileLen});
                {error, _} = Err ->
                    error(Err)
            end;
        _ ->
            %% Non-empty buffer holding fewer than Max bytes: hand out what is
            %% buffered; the next call continues from the file.
            {ok, Buffer, It#entry_iterator{buffer = <<>>}}
    end.

%% Advances the entry iterator by N bytes without returning any data and
%% without performing file I/O.
%%
%% Buffered bytes are dropped first. Anything beyond the buffer is skipped by
%% moving file_pos forward and shrinking file_len by the same amount.
%% Skipping more bytes than remain in the entry is clamped to the end of the
%% entry, so file_len never becomes negative and file_pos never moves past
%% the entry's last byte.
%%
%% Returns the updated iterator, which must be used for subsequent reads.
-spec entry_iterator_skip(entry_iterator(), non_neg_integer()) -> entry_iterator().
entry_iterator_skip(#entry_iterator{buffer = Buffer,
                                    file_pos = FilePos,
                                    file_len = FileLen} = It, N) ->
    BufLen = byte_size(Buffer),
    if
        N =< BufLen ->
            %% The skip is fully covered by buffered bytes.
            It#entry_iterator{buffer = binary:part(Buffer, N, BufLen - N)};
        true ->
            %% Drop the whole buffer and skip the rest in the file, but never
            %% more than the entry still has there.
            SkipFile = min(N - BufLen, FileLen),
            It#entry_iterator{buffer = <<>>,
                              file_pos = FilePos + SkipFile,
                              file_len = FileLen - SkipFile}
    end.

-spec read_chunk(state()) ->
{ok, binary(), state()} |
Expand Down
65 changes: 65 additions & 0 deletions test/osiris_log_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ all_tests() ->
iterator_read_chunk_with_read_ahead,
iterator_read_chunk_with_read_ahead_2,
iterator_read_chunk_mixed_sizes_with_credit,
iterator_stream_entry,
iterator_next_entry_iterator_returns_iterator_not_binary,
read_chunk_parsed,
read_chunk_parsed_2,
read_chunk_parsed_multiple_chunks,
Expand Down Expand Up @@ -393,6 +395,69 @@ iterator_read_chunk_mixed_sizes_with_credit(Config) ->
osiris_log:close(S1),
ok.

iterator_stream_entry(Config) ->
    %% In entry_iterator mode a simple entry comes back as an entry iterator
    %% that has to be read (or skipped) explicitly, not as a binary.
    Conf = ?config(osiris_conf, Config),
    Writer0 = osiris_log:init(Conf),
    SharedRef = osiris_log:get_shared(Writer0),
    ReaderConf = Conf#{shared => SharedRef},
    {ok, Reader0} = osiris_log:init_offset_reader(0, ReaderConf),
    {end_of_stream, Reader1} = osiris_log:chunk_iterator(Reader0),
    %% write_committed takes the entries in reverse write order, so "ho" ends
    %% up at the chunk's first offset and "hi" at the second.
    {ChId, Writer1} = write_committed([<<"hi">>, <<"ho">>], Writer0),
    {ok, _Header, Iter0, Reader2} = osiris_log:chunk_iterator(Reader1),

    %% Walk the whole chunk, draining each entry iterator completely.
    {{ChId, FirstIt}, Iter1} =
        osiris_log:iterator_next(Iter0, 1, entry_iterator),
    <<"ho">> = drain_entry_iterator(FirstIt),
    SecondOffs = ChId + 1,
    {{SecondOffs, SecondIt}, Iter2} =
        osiris_log:iterator_next(Iter1, 1, entry_iterator),
    <<"hi">> = drain_entry_iterator(SecondIt),
    ?assertMatch(end_of_chunk, osiris_log:iterator_next(Iter2, 1, entry_iterator)),

    %% Skipping: open the same chunk again and drop the first byte of "ho".
    {ok, _, SkipIter, _} = osiris_log:chunk_iterator(Reader1),
    {{ChId, ToSkipIt}, _} =
        osiris_log:iterator_next(SkipIter, 1, entry_iterator),
    AfterSkipIt = osiris_log:entry_iterator_skip(ToSkipIt, 1),
    <<"o">> = drain_entry_iterator(AfterSkipIt),

    %% Partial reads: fetch "ho" one byte at a time.
    {ok, _, ByteIter, _} = osiris_log:chunk_iterator(Reader1),
    {{ChId, ByteIt0}, _} =
        osiris_log:iterator_next(ByteIter, 1, entry_iterator),
    {ok, <<"h">>, ByteIt1} = osiris_log:entry_iterator_read(ByteIt0, 1),
    {ok, <<"o">>, _} = osiris_log:entry_iterator_read(ByteIt1, 1),

    osiris_log:close(Reader2),
    osiris_log:close(Writer1),
    ok.

%% Guards the contract of iterator_next/3 in entry_iterator mode: the entry
%% handed back must be an #entry_iterator{} record (something that can be fed
%% to entry_iterator_read/entry_iterator_skip) and never the raw payload.
iterator_next_entry_iterator_returns_iterator_not_binary(Config) ->
    Conf = ?config(osiris_conf, Config),
    Writer0 = osiris_log:init(Conf),
    SharedRef = osiris_log:get_shared(Writer0),
    ReaderConf = Conf#{shared => SharedRef},
    {ok, Reader0} = osiris_log:init_offset_reader(0, ReaderConf),
    {end_of_stream, Reader1} = osiris_log:chunk_iterator(Reader0),
    {ChId, Writer1} = write_committed([<<"payload">>], Writer0),
    {ok, _Header, Iter0, Reader2} = osiris_log:chunk_iterator(Reader1),
    %% The single written entry sits at the chunk's first offset.
    {{ChId, EntryIt}, _Iter1} =
        osiris_log:iterator_next(Iter0, 1, entry_iterator),
    ?assertNot(is_binary(EntryIt), "entry_iterator mode must return an entry iterator, not a binary"),
    %% Records are tagged tuples, so the first element is the record name.
    RecordTag = element(1, EntryIt),
    ?assertEqual(entry_iterator, RecordTag, "expected #entry_iterator{} record"),
    osiris_log:close(Reader2),
    osiris_log:close(Writer1),
    ok.

%% Reads an entry iterator to exhaustion and returns the whole payload as a
%% single binary.
drain_entry_iterator(It) ->
    drain_entry_iterator(It, []).

%% Worker: pulls at most 64 bytes per read and collects the pieces newest
%% first; they are put back in order and flattened once eof is reached.
drain_entry_iterator(It0, PiecesRev) ->
    ReadResult = osiris_log:entry_iterator_read(It0, 64),
    case ReadResult of
        {eof, _} ->
            Pieces = lists:reverse(PiecesRev),
            iolist_to_binary(Pieces);
        {ok, Piece, It1} ->
            drain_entry_iterator(It1, [Piece | PiecesRev])
    end.

iterator_read_chunk_with_read_ahead(Config) ->
RAL = 4096, %% read ahead limit
Conf = ?config(osiris_conf, Config),
Expand Down