diff --git a/src/osiris.erl b/src/osiris.erl index b23ccbdd..47edcc96 100644 --- a/src/osiris.erl +++ b/src/osiris.erl @@ -81,8 +81,8 @@ {filter_value(), iodata() | batch()}. -type features() :: #{committed_offset_calculate => boolean()}. -%% returned when reading --type entry() :: binary() | batch(). +%% returned when reading (a simple entry is an entry_iterator when iterator_next/3 is called with mode entry_iterator) +-type entry() :: binary() | batch() | osiris_log:entry_iterator(). -type reader_options() :: #{transport => tcp | ssl, chunk_selector => all | user_data, filter_spec => osiris_bloom:filter_spec(), diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 63dfafe6..827aee1d 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -34,6 +34,10 @@ chunk_iterator/2, chunk_iterator/3, iterator_next/1, + iterator_next/2, + iterator_next/3, + entry_iterator_read/2, + entry_iterator_skip/2, read_chunk/1, read_chunk_parsed/1, read_chunk_parsed/2, @@ -479,6 +483,8 @@ -export_type([state/0, chunk_iterator/0, + entry_iterator/0, + iterator_mode/0, range/0, config/0, counter_spec/0, @@ -1479,6 +1485,27 @@ read_header(#?MODULE{cfg = #cfg{}} = State0) -> next_record_pos :: non_neg_integer(), ra :: #ra{}}). -opaque chunk_iterator() :: #iterator{}. + +%% Entry iterator: streams a simple entry from file without loading it all. +%% Used when iterator_next/3 is called with mode entry_iterator. 
+%% +%% Fields: +%% fd - Open file descriptor of the segment file being read +%% buffer - Bytes already read from the entry and buffered for the next +%% entry_iterator_read; avoids re-reading from file +%% file_pos - Byte offset in the file of the next entry byte to read from fd; +%% the bytes held in buffer precede this offset +%% file_len - Number of entry bytes still to be read from the file, starting at +%% file_pos; 0 when nothing is left to read beyond the buffer +%% size - Total size in bytes of the entry (payload); immutable +-record(entry_iterator, + {fd :: file:fd(), + buffer = <<>> :: binary(), + file_pos :: non_neg_integer(), + file_len :: non_neg_integer(), + size :: non_neg_integer()}). +-opaque entry_iterator() :: #entry_iterator{}. + -define(REC_MATCH_SIMPLE(Len, Rem), <<0:1, Len:31/unsigned, Rem/binary>>). -define(REC_MATCH_SUBBATCH(CompType, NumRec, UncompLen, Len, Rem), @@ -1573,11 +1600,18 @@ chunk_iterator(#?MODULE{cfg = #cfg{}, -spec iterator_next(chunk_iterator()) -> end_of_chunk | {offset_entry(), chunk_iterator()}. iterator_next(#iterator{} = I) -> - iterator_next(I, 1). + iterator_next(I, 1, full_entry). -spec iterator_next(chunk_iterator(), CreditHint :: non_neg_integer()) -> end_of_chunk | {offset_entry(), chunk_iterator()}. -iterator_next(#iterator{num_left = 0}, _CreditHint) -> +iterator_next(#iterator{} = I, CreditHint) -> + iterator_next(I, CreditHint, full_entry). + +-type iterator_mode() :: full_entry | entry_iterator. + +-spec iterator_next(chunk_iterator(), CreditHint :: non_neg_integer(), Mode :: iterator_mode()) -> + end_of_chunk | {offset_entry(), chunk_iterator()}. 
+iterator_next(#iterator{num_left = 0}, _CreditHint, _Mode) -> end_of_chunk; iterator_next(#iterator{fd = Fd, next_offset = NextOffs, @@ -1585,23 +1619,48 @@ iterator_next(#iterator{fd = Fd, data_left = DataLeft, data_cache = ?REC_MATCH_SIMPLE(Len, Rem0), next_record_pos = Pos0, - ra = Ra} = I0, CreditHint) -> + ra = Ra} = I0, CreditHint, Mode) -> Pos = Pos0 + ?REC_HDR_SZ_SIMPLE_B, - case Rem0 of - <> -> + case {Rem0, Mode} of + {<>, entry_iterator} -> + EntryIt = #entry_iterator{fd = Fd, + buffer = Record, + file_pos = Pos + Len, + file_len = 0, + size = Len}, + I = I0#iterator{next_offset = NextOffs + 1, + num_left = Num - 1, + data_left = DataLeft - (Len + ?REC_HDR_SZ_SIMPLE_B), + data_cache = Rem, + next_record_pos = Pos + Len}, + {{NextOffs, EntryIt}, I}; + {<>, full_entry} -> I = I0#iterator{next_offset = NextOffs + 1, num_left = Num - 1, data_left = DataLeft - (Len + ?REC_HDR_SZ_SIMPLE_B), data_cache = Rem, next_record_pos = Pos + Len}, {{NextOffs, Record}, I}; - _ -> + {_, entry_iterator} -> + Rem0Len = byte_size(Rem0), + EntryIt = #entry_iterator{fd = Fd, buffer = Rem0, + file_pos = Pos + Rem0Len, + file_len = Len - Rem0Len, + size = Len}, + I = I0#iterator{next_offset = NextOffs + 1, + num_left = Num - 1, + data_left = DataLeft - (Len + ?REC_HDR_SZ_SIMPLE_B), + data_cache = <<>>, + next_record_pos = Pos + Len, + ra = ra_clear(Ra)}, + {{NextOffs, EntryIt}, I}; + {_, full_entry} -> MinReqSize = Len + ?REC_HDR_SZ_SIMPLE_B, {Data, Ra1} = iter_read_ahead(Fd, Pos0, MinReqSize, CreditHint, DataLeft, Num, Ra), iterator_next(I0#iterator{ra = Ra1, data_cache = Data}, - CreditHint) + CreditHint, full_entry) end; iterator_next(#iterator{fd = Fd, next_offset = NextOffs, @@ -1611,8 +1670,8 @@ iterator_next(#iterator{fd = Fd, UncompressedLen, Len, Rem0), next_record_pos = Pos, - ra = Ra} = I0, CreditHint) -> - case Rem0 of + ra = Ra} = I0, CreditHint, Mode) -> + case Rem0 of <> -> Record = {batch, NumRecs, CompType, UncompressedLen, Data}, I = I0#iterator{next_offset = 
NextOffs + NumRecs, @@ -1629,19 +1688,74 @@ iterator_next(#iterator{fd = Fd, DataLeft, Num, Ra), iterator_next(I0#iterator{ra = Ra1, data_cache = Data}, - CreditHint) + CreditHint, Mode) end; iterator_next(#iterator{fd = Fd, num_left = Num, data_left = DataLeft, ra = Ra, - next_record_pos = Pos} = I0, CreditHint) -> + next_record_pos = Pos} = I0, CreditHint, Mode) -> MinReq = min(?REC_HDR_SZ_SUBBATCH_B, DataLeft), {Data, Ra1} = iter_read_ahead(Fd, Pos, MinReq, CreditHint, DataLeft, Num, Ra), iterator_next(I0#iterator{ra = Ra1, data_cache = Data}, - CreditHint). + CreditHint, Mode). + +%% Reads up to Max bytes from the current position of the entry iterator. +%% A call is served from the buffer or from the file, never both, so fewer than +%% Max bytes may be returned before the end of the entry. Returns {eof, It} when +%% no data remains. The caller must use the returned iterator for subsequent +%% reads or skip. File read failures raise; they are not returned as {error, _}. +-spec entry_iterator_read(entry_iterator(), non_neg_integer()) -> + {ok, binary(), entry_iterator()} | {eof, entry_iterator()}. +entry_iterator_read(#entry_iterator{buffer = Buffer, + file_len = FileLen} = It, _Max) + when Buffer =:= <<>>, FileLen =:= 0 -> + {eof, It}; +entry_iterator_read(#entry_iterator{fd = Fd, + buffer = Buffer, + file_pos = FilePos, + file_len = FileLen} = It, Max) -> + case Buffer of + <> -> + {ok, Chunk, It#entry_iterator{buffer = Rest}}; + <<>> when FileLen > 0 -> + ReadLen = min(Max, FileLen), + case file:pread(Fd, FilePos, ReadLen) of + {ok, Data} -> + %% file:pread may return fewer bytes than requested (e.g. near EOF) + DataLen = byte_size(Data), + {ok, Data, + It#entry_iterator{file_pos = FilePos + DataLen, + file_len = FileLen - DataLen}}; + {error, _} = Err -> + error(Err) + end; + <<>> -> + {eof, It}; + Rest when byte_size(Rest) =< Max -> + {ok, Rest, It#entry_iterator{buffer = <<>>}}; + Rest -> + Take = min(Max, byte_size(Rest)), + <> = Rest, + {ok, Chunk, It#entry_iterator{buffer = Rest1}} + end. 
+ +-spec entry_iterator_skip(entry_iterator(), non_neg_integer()) -> entry_iterator(). %% Skips up to N unread bytes of the entry: buffered bytes first, then bytes not yet read from the file. +entry_iterator_skip(#entry_iterator{buffer = Buffer, + file_pos = FilePos, + file_len = FileLen} = It, N) -> + BufLen = byte_size(Buffer), + if + N =< BufLen -> + It#entry_iterator{buffer = binary:part(Buffer, N, BufLen - N)}; + true -> + SkipFile = min(N - BufLen, FileLen), %% clamp so file_len never goes negative and file_pos never passes the end of the entry + It#entry_iterator{buffer = <<>>, + file_pos = FilePos + SkipFile, + file_len = FileLen - SkipFile} + end. -spec read_chunk(state()) -> {ok, binary(), state()} | diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index 5b3e38b5..b3c1c875 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -41,6 +41,8 @@ all_tests() -> iterator_read_chunk_with_read_ahead, iterator_read_chunk_with_read_ahead_2, iterator_read_chunk_mixed_sizes_with_credit, + iterator_stream_entry, + iterator_next_entry_iterator_returns_iterator_not_binary, read_chunk_parsed, read_chunk_parsed_2, read_chunk_parsed_multiple_chunks, @@ -393,6 +395,69 @@ iterator_read_chunk_mixed_sizes_with_credit(Config) -> osiris_log:close(S1), ok. +iterator_stream_entry(Config) -> + %% With entry_iterator, simple entries are returned as entry_iterator instead of binary. + Conf = ?config(osiris_conf, Config), + S0 = osiris_log:init(Conf), + Shared = osiris_log:get_shared(S0), + RConf = Conf#{shared => Shared}, + {ok, R0} = osiris_log:init_offset_reader(0, RConf), + {end_of_stream, R1} = osiris_log:chunk_iterator(R0), + EntriesRev = [<<"hi">>, <<"ho">>], + {ChId, S1} = write_committed(EntriesRev, S0), + {ok, _H, I0, R2} = osiris_log:chunk_iterator(R1), + %% First simple entry as entry_iterator (write order: first offset = last in list). + {{ChId, EntryIt0}, I1} = osiris_log:iterator_next(I0, 1, entry_iterator), + <<"ho">> = drain_entry_iterator(EntryIt0), + %% Second simple entry as entry_iterator. 
+ HiOffs = ChId + 1, + {{HiOffs, EntryIt1}, I2} = osiris_log:iterator_next(I1, 1, entry_iterator), + <<"hi">> = drain_entry_iterator(EntryIt1), + ?assertMatch(end_of_chunk, osiris_log:iterator_next(I2, 1, entry_iterator)), + %% entry_iterator_skip: re-read same chunk and skip first byte of first entry ("ho"). + {ok, _, I3, _} = osiris_log:chunk_iterator(R1), + {{ChId, EntryIt2}, _} = osiris_log:iterator_next(I3, 1, entry_iterator), + EntryIt2Skipped = osiris_log:entry_iterator_skip(EntryIt2, 1), + <<"o">> = drain_entry_iterator(EntryIt2Skipped), + %% Read in small chunks. + {ok, _, I4, _} = osiris_log:chunk_iterator(R1), + {{ChId, EntryIt3}, _} = osiris_log:iterator_next(I4, 1, entry_iterator), + {ok, <<"h">>, EntryIt3a} = osiris_log:entry_iterator_read(EntryIt3, 1), + {ok, <<"o">>, _} = osiris_log:entry_iterator_read(EntryIt3a, 1), + osiris_log:close(R2), + osiris_log:close(S1), + ok. + +%% iterator_next(_, _, entry_iterator) must return an entry iterator (usable with +%% entry_iterator_read/entry_iterator_skip), not a raw binary. +iterator_next_entry_iterator_returns_iterator_not_binary(Config) -> + Conf = ?config(osiris_conf, Config), + S0 = osiris_log:init(Conf), + Shared = osiris_log:get_shared(S0), + RConf = Conf#{shared => Shared}, + {ok, R0} = osiris_log:init_offset_reader(0, RConf), + {end_of_stream, R1} = osiris_log:chunk_iterator(R0), + {ChId, S1} = write_committed([<<"payload">>], S0), + {ok, _H, I0, R2} = osiris_log:chunk_iterator(R1), + Result = osiris_log:iterator_next(I0, 1, entry_iterator), + {{ChId, EntryIt}, _I1} = Result, + ?assertNot(is_binary(EntryIt), "entry_iterator mode must return an entry iterator, not a binary"), + ?assertEqual(entry_iterator, element(1, EntryIt), "expected #entry_iterator{} record"), + osiris_log:close(R2), + osiris_log:close(S1), + ok. + +drain_entry_iterator(EntryIt) -> + drain_entry_iterator(EntryIt, []). 
+ +drain_entry_iterator(EntryIt, Acc) -> %% reads the entry in pieces of at most 64 bytes until eof; Acc holds the pieces newest first + case osiris_log:entry_iterator_read(EntryIt, 64) of + {ok, Data, NewIt} -> + drain_entry_iterator(NewIt, [Data | Acc]); + {eof, _} -> + iolist_to_binary(lists:reverse(Acc)) + end. + iterator_read_chunk_with_read_ahead(Config) -> RAL = 4096, %% read ahead limit Conf = ?config(osiris_conf, Config),