From c8a3dcc9a54a161bb1c4c2a982a94c8e60297d4e Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 26 Feb 2026 15:54:43 +0100 Subject: [PATCH 1/5] Returns an entry iterator --- src/osiris.erl | 4 +- src/osiris_log.erl | 106 ++++++++++++++++++++++++++++-- test/osiris_log_SUITE.erl | 133 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 235 insertions(+), 8 deletions(-) diff --git a/src/osiris.erl b/src/osiris.erl index b23ccbdd..47edcc96 100644 --- a/src/osiris.erl +++ b/src/osiris.erl @@ -81,8 +81,8 @@ {filter_value(), iodata() | batch()}. -type features() :: #{committed_offset_calculate => boolean()}. -%% returned when reading --type entry() :: binary() | batch(). +%% returned when reading (entry_iterator when using entry_iterator in iterator_next/2) +-type entry() :: binary() | batch() | osiris_log:entry_iterator(). -type reader_options() :: #{transport => tcp | ssl, chunk_selector => all | user_data, filter_spec => osiris_bloom:filter_spec(), diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 63dfafe6..e6f6f786 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -34,6 +34,9 @@ chunk_iterator/2, chunk_iterator/3, iterator_next/1, + iterator_next/2, + entry_iterator_read/2, + entry_iterator_skip/2, read_chunk/1, read_chunk_parsed/1, read_chunk_parsed/2, @@ -479,6 +482,7 @@ -export_type([state/0, chunk_iterator/0, + entry_iterator/0, range/0, config/0, counter_spec/0, @@ -1479,6 +1483,25 @@ read_header(#?MODULE{cfg = #cfg{}} = State0) -> next_record_pos :: non_neg_integer(), ra :: #ra{}}). -opaque chunk_iterator() :: #iterator{}. + +%% Entry iterator: streams a simple entry from file without loading it all. +%% Used when iterator_next/2 is called with entry_iterator. +%% +%% Fields: +%% fd - Open file descriptor of the segment file being read +%% buffer - Bytes already read from the entry and buffered for the next +%% entry_iterator_read; avoids re-reading from file +%% file_pos - Byte offset in the file where the next unread byte of the +%% entry lives (buffer length is not included in file_len) +%% file_len - Number of bytes of the entry still in the file (from file_pos +%% to end of entry); 0 when the whole entry is in buffer +-record(entry_iterator, + {fd :: file:fd(), + buffer = <<>> :: binary(), + file_pos :: non_neg_integer(), + file_len :: non_neg_integer()}). +-opaque entry_iterator() :: #entry_iterator{}. + -define(REC_MATCH_SIMPLE(Len, Rem), <<0:1, Len:31/unsigned, Rem/binary>>). -define(REC_MATCH_SUBBATCH(CompType, NumRec, UncompLen, Len, Rem), @@ -1575,7 +1598,7 @@ chunk_iterator(#?MODULE{cfg = #cfg{}, iterator_next(#iterator{} = I) -> iterator_next(I, 1). --spec iterator_next(chunk_iterator(), CreditHint :: non_neg_integer()) -> +-spec iterator_next(chunk_iterator(), CreditHint :: non_neg_integer() | 'entry_iterator') -> end_of_chunk | {offset_entry(), chunk_iterator()}. iterator_next(#iterator{num_left = 0}, _CreditHint) -> end_of_chunk; @@ -1587,15 +1610,36 @@ iterator_next(#iterator{fd = Fd, next_record_pos = Pos0, ra = Ra} = I0, CreditHint) -> Pos = Pos0 + ?REC_HDR_SZ_SIMPLE_B, - case Rem0 of - <> -> + case {Rem0, CreditHint} of + {<>, entry_iterator} -> + EntryIt = #entry_iterator{fd = Fd, buffer = Record, + file_pos = Pos + Len, file_len = 0}, + I = I0#iterator{next_offset = NextOffs + 1, + num_left = Num - 1, + data_left = DataLeft - (Len + ?REC_HDR_SZ_SIMPLE_B), + data_cache = Rem, + next_record_pos = Pos + Len}, + {{NextOffs, EntryIt}, I}; + {<>, _} -> I = I0#iterator{next_offset = NextOffs + 1, num_left = Num - 1, data_left = DataLeft - (Len + ?REC_HDR_SZ_SIMPLE_B), data_cache = Rem, next_record_pos = Pos + Len}, {{NextOffs, Record}, I}; - _ -> + {_, entry_iterator} -> + PrefixLen = byte_size(Rem0), + EntryIt = #entry_iterator{fd = Fd, buffer = Rem0, + file_pos = Pos + PrefixLen, + file_len = Len - PrefixLen}, + I = I0#iterator{next_offset = NextOffs + 1, + num_left = Num - 1, + data_left = DataLeft - (Len + ?REC_HDR_SZ_SIMPLE_B), + data_cache = <<>>, + next_record_pos = Pos + Len, + ra = ra_clear(Ra)}, + {{NextOffs, EntryIt}, I}; + {_, _} -> MinReqSize = Len + ?REC_HDR_SZ_SIMPLE_B, {Data, Ra1} = iter_read_ahead(Fd, Pos0, MinReqSize, CreditHint, DataLeft, Num, Ra), @@ -1625,7 +1669,8 @@ iterator_next(#iterator{fd = Fd, %% not enough in Rem0 to read the entire record %% so we need to read it from disk MinReqSize = Len + ?REC_HDR_SZ_SUBBATCH_B, - {Data, Ra1} = iter_read_ahead(Fd, Pos, MinReqSize, CreditHint, + Credit = credit_hint_for_read_ahead(CreditHint), + {Data, Ra1} = iter_read_ahead(Fd, Pos, MinReqSize, Credit, DataLeft, Num, Ra), iterator_next(I0#iterator{ra = Ra1, data_cache = Data}, @@ -1637,12 +1682,61 @@ iterator_next(#iterator{fd = Fd, ra = Ra, next_record_pos = Pos} = I0, CreditHint) -> MinReq = min(?REC_HDR_SZ_SUBBATCH_B, DataLeft), - {Data, Ra1} = iter_read_ahead(Fd, Pos, MinReq, CreditHint, + Credit = credit_hint_for_read_ahead(CreditHint), + {Data, Ra1} = iter_read_ahead(Fd, Pos, MinReq, Credit, DataLeft, Num, Ra), iterator_next(I0#iterator{ra = Ra1, data_cache = Data}, CreditHint). +credit_hint_for_read_ahead(entry_iterator) -> 1; +credit_hint_for_read_ahead(N) when is_integer(N), N >= 0 -> N. + +-spec entry_iterator_read(entry_iterator(), non_neg_integer()) -> + {ok, binary(), entry_iterator()} | {eof, entry_iterator()}. +entry_iterator_read(#entry_iterator{buffer = Buffer, file_len = FileLen} = It, _Max) + when Buffer =:= <<>>, FileLen =:= 0 -> + {eof, It}; +entry_iterator_read(#entry_iterator{fd = Fd, buffer = Buffer, + file_pos = FilePos, file_len = FileLen} = It, + Max) -> + case Buffer of + <> -> + {ok, Chunk, It#entry_iterator{buffer = Rest}}; + <<>> when FileLen > 0 -> + ReadLen = min(Max, FileLen), + case file:pread(Fd, FilePos, ReadLen) of + {ok, Data} -> + {ok, Data, + It#entry_iterator{file_pos = FilePos + ReadLen, + file_len = FileLen - ReadLen}}; + {error, _} = Err -> + error(Err) + end; + <<>> -> + {eof, It}; + Rest when byte_size(Rest) =< Max -> + {ok, Rest, It#entry_iterator{buffer = <<>>}}; + Rest -> + Take = min(Max, byte_size(Rest)), + <> = Rest, + {ok, Chunk, It#entry_iterator{buffer = Rest1}} + end. + +-spec entry_iterator_skip(entry_iterator(), non_neg_integer()) -> entry_iterator(). +entry_iterator_skip(#entry_iterator{buffer = Buffer, file_pos = FilePos, + file_len = FileLen} = It, N) -> + BufLen = byte_size(Buffer), + if + N =< BufLen -> + It#entry_iterator{buffer = binary:part(Buffer, N, BufLen - N)}; + true -> + SkipFile = N - BufLen, + It#entry_iterator{buffer = <<>>, + file_pos = FilePos + SkipFile, + file_len = FileLen - SkipFile} + end. + -spec read_chunk(state()) -> {ok, binary(), state()} | {end_of_stream, state()} | diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index 5b3e38b5..0986e7bb 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -41,6 +41,8 @@ all_tests() -> iterator_read_chunk_with_read_ahead, iterator_read_chunk_with_read_ahead_2, iterator_read_chunk_mixed_sizes_with_credit, + iterator_stream_entry, + iterator_stream_entry_sections, read_chunk_parsed, read_chunk_parsed_2, read_chunk_parsed_multiple_chunks, @@ -393,6 +395,137 @@ iterator_read_chunk_mixed_sizes_with_credit(Config) -> osiris_log:close(S1), ok. +iterator_stream_entry(Config) -> + %% With entry_iterator, simple entries are returned as entry_iterator instead of binary. + Conf = ?config(osiris_conf, Config), + S0 = osiris_log:init(Conf), + Shared = osiris_log:get_shared(S0), + RConf = Conf#{shared => Shared}, + {ok, R0} = osiris_log:init_offset_reader(0, RConf), + {end_of_stream, R1} = osiris_log:chunk_iterator(R0), + EntriesRev = [<<"hi">>, <<"ho">>], + {ChId, S1} = write_committed(EntriesRev, S0), + {ok, _H, I0, R2} = osiris_log:chunk_iterator(R1), + %% First simple entry as entry_iterator (write order: first offset = last in list). + {{ChId, EntryIt0}, I1} = osiris_log:iterator_next(I0, entry_iterator), + <<"ho">> = drain_entry_iterator(EntryIt0), + %% Second simple entry as entry_iterator. + HiOffs = ChId + 1, + {{HiOffs, EntryIt1}, I2} = osiris_log:iterator_next(I1, entry_iterator), + <<"hi">> = drain_entry_iterator(EntryIt1), + ?assertMatch(end_of_chunk, osiris_log:iterator_next(I2, entry_iterator)), + %% entry_iterator_skip: re-read same chunk and skip first byte of first entry ("ho"). + {ok, _, I3, _} = osiris_log:chunk_iterator(R1), + {{ChId, EntryIt2}, _} = osiris_log:iterator_next(I3, entry_iterator), + EntryIt2Skipped = osiris_log:entry_iterator_skip(EntryIt2, 1), + <<"o">> = drain_entry_iterator(EntryIt2Skipped), + %% Read in small chunks. + {ok, _, I4, _} = osiris_log:chunk_iterator(R1), + {{ChId, EntryIt3}, _} = osiris_log:iterator_next(I4, entry_iterator), + {ok, <<"h">>, EntryIt3a} = osiris_log:entry_iterator_read(EntryIt3, 1), + {ok, <<"o">>, _} = osiris_log:entry_iterator_read(EntryIt3a, 1), + osiris_log:close(R2), + osiris_log:close(S1), + ok. + +%% Covers iterating over non-data sections, data section, attributes, and skipping +%% sections/attributes. Uses a single entry with a length-prefixed layout: +%% [AttrLen:16, Attrs, NonDataLen:16, NonDataSection, DataLen:16, DataSection] +iterator_stream_entry_sections(Config) -> + Conf = ?config(osiris_conf, Config), + S0 = osiris_log:init(Conf), + Shared = osiris_log:get_shared(S0), + RConf = Conf#{shared => Shared}, + {ok, R0} = osiris_log:init_offset_reader(0, RConf), + {end_of_stream, R1} = osiris_log:chunk_iterator(R0), + %% Build entry: attributes (7 bytes), non-data section "header" (6), data "body" (4). + Attrs = <<"k:v;a:1">>, + NonDataSection = <<"header">>, + DataSection = <<"body">>, + Entry = <<(byte_size(Attrs)):16/big, Attrs/binary, + (byte_size(NonDataSection)):16/big, NonDataSection/binary, + (byte_size(DataSection)):16/big, DataSection/binary>>, + {ChId, S1} = write_committed([Entry], S0), + {ok, _H, I0, R2} = osiris_log:chunk_iterator(R1), + {{ChId, EntryIt}, I0_done} = osiris_log:iterator_next(I0, entry_iterator), + + %% Read attributes (non-data): length then payload. + {ok, <<7:16/big>>, EntryIt1} = osiris_log:entry_iterator_read(EntryIt, 2), + {ok, Attrs, EntryIt2} = osiris_log:entry_iterator_read(EntryIt1, byte_size(Attrs)), + %% Read non-data section (header): length then payload. + {ok, <<6:16/big>>, EntryIt3} = osiris_log:entry_iterator_read(EntryIt2, 2), + {ok, NonDataSection, EntryIt4} = osiris_log:entry_iterator_read(EntryIt3, byte_size(NonDataSection)), + %% Read data section (body). + {ok, <<4:16/big>>, EntryIt5} = osiris_log:entry_iterator_read(EntryIt4, 2), + {ok, DataSection, EntryIt6} = osiris_log:entry_iterator_read(EntryIt5, byte_size(DataSection)), + %% Drain should be empty. + {eof, _} = osiris_log:entry_iterator_read(EntryIt6, 1), + + %% Same chunk: skip attributes entirely, then read non-data section only. + {ok, _, I1, _} = osiris_log:chunk_iterator(R1), + {{ChId, EntryItA}, _} = osiris_log:iterator_next(I1, entry_iterator), + SkipAttrs = 2 + 7, + EntryItB = osiris_log:entry_iterator_skip(EntryItA, SkipAttrs), + %% After skipping attributes we have non-data section then data section. + {ok, <<6:16/big>>, EntryItC} = osiris_log:entry_iterator_read(EntryItB, 2), + {ok, NonDataSection, EntryItD} = osiris_log:entry_iterator_read(EntryItC, byte_size(NonDataSection)), + %% EntryItD is now at the data section (2-byte length + 4-byte body); read it. + {ok, <<4:16/big>>, EntryItF} = osiris_log:entry_iterator_read(EntryItD, 2), + {ok, DataSection, EntryItG} = osiris_log:entry_iterator_read(EntryItF, byte_size(DataSection)), + {eof, _} = osiris_log:entry_iterator_read(EntryItG, 1), + + %% Same chunk: skip attributes and non-data section and data length prefix. + {ok, _, I2, _} = osiris_log:chunk_iterator(R1), + {{ChId, EntryItH}, _} = osiris_log:iterator_next(I2, entry_iterator), + SkipToData = 2 + 7 + 2 + 6 + 2, + EntryItI = osiris_log:entry_iterator_skip(EntryItH, SkipToData), + {ok, DataSection, _} = osiris_log:entry_iterator_read(EntryItI, byte_size(DataSection)), + + %% Same chunk: read first attribute byte, skip rest of attributes, read non-data. + {ok, _, I3, _} = osiris_log:chunk_iterator(R1), + {{ChId, EntryItJ}, _} = osiris_log:iterator_next(I3, entry_iterator), + {ok, <<7:16/big>>, EntryItK} = osiris_log:entry_iterator_read(EntryItJ, 2), + {ok, <<"k">>, EntryItL} = osiris_log:entry_iterator_read(EntryItK, 1), + EntryItM = osiris_log:entry_iterator_skip(EntryItL, 6), + {ok, <<6:16/big>>, EntryItN} = osiris_log:entry_iterator_read(EntryItM, 2), + {ok, NonDataSection, _} = osiris_log:entry_iterator_read(EntryItN, byte_size(NonDataSection)), + + %% Entry with multiple non-data sections: skip one section, read next, skip attr block. + Attrs2 = <<"x">>, + Section1 = <<"sec1">>, + Section2 = <<"section-two">>, + Data2 = <<"payload">>, + Entry2 = <<(byte_size(Attrs2)):16/big, Attrs2/binary, + (byte_size(Section1)):16/big, Section1/binary, + (byte_size(Section2)):16/big, Section2/binary, + (byte_size(Data2)):16/big, Data2/binary>>, + {ChId2, S2} = write_committed([Entry2], S1), + end_of_chunk = osiris_log:iterator_next(I0_done, entry_iterator), + {ok, _, I4, R3} = osiris_log:chunk_iterator(R2, 1, I0_done), + {{ChId2, EntryItP}, _} = osiris_log:iterator_next(I4, entry_iterator), + %% Skip attributes and first non-data section. + EntryItP1 = osiris_log:entry_iterator_skip(EntryItP, 2 + 1 + 2 + 4), + {ok, <<11:16/big>>, EntryItP2} = osiris_log:entry_iterator_read(EntryItP1, 2), + {ok, Section2, EntryItP3} = osiris_log:entry_iterator_read(EntryItP2, byte_size(Section2)), + %% EntryItP3 is now at the data section (2-byte length + 7-byte payload). + {ok, <<7:16/big>>, EntryItP5} = osiris_log:entry_iterator_read(EntryItP3, 2), + {ok, Data2, _} = osiris_log:entry_iterator_read(EntryItP5, byte_size(Data2)), + + osiris_log:close(R3), + osiris_log:close(S2), + ok. + +drain_entry_iterator(EntryIt) -> + drain_entry_iterator(EntryIt, []). + +drain_entry_iterator(EntryIt, Acc) -> + case osiris_log:entry_iterator_read(EntryIt, 64) of + {ok, Data, NewIt} -> + drain_entry_iterator(NewIt, [Data | Acc]); + {eof, _} -> + iolist_to_binary(lists:reverse(Acc)) + end. + iterator_read_chunk_with_read_ahead(Config) -> RAL = 4096, %% read ahead limit Conf = ?config(osiris_conf, Config), From 93850c8c147dd0cd5f6b1d7c36efdd3c1c59fbee Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 26 Feb 2026 18:27:21 +0100 Subject: [PATCH 2/5] Add entry iterator --- src/osiris_log.erl | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index e6f6f786..4a3b4b24 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -1692,6 +1692,18 @@ iterator_next(#iterator{fd = Fd, credit_hint_for_read_ahead(entry_iterator) -> 1; credit_hint_for_read_ahead(N) when is_integer(N), N >= 0 -> N. +%% Reads up to Max bytes from the current position of the entry iterator. +%% Returns the data (or fewer bytes if the remaining stream is shorter) and an +%% updated iterator, or {eof, It} when no data remains. The caller must use the +%% returned iterator for subsequent reads or skip. File read failures raise; +%% they are not returned as {error, _}. +%% +%% Entry iterators exist because chunks can be returned in entry_iterator mode +%% instead of as a full Entry: the stream protocol limits a single message to the +%% frame size (typically 1 MiB), but other protocols (e.g. AMQP 0-9-1) can +%% publish much larger messages to a stream. An entry iterator lets callers read +%% a bounded number of bytes at a time (e.g. the AMQP 1.0 section descriptor and +%% size) and then decide whether to read or skip the section content. -spec entry_iterator_read(entry_iterator(), non_neg_integer()) -> {ok, binary(), entry_iterator()} | {eof, entry_iterator()}. entry_iterator_read(#entry_iterator{buffer = Buffer, file_len = FileLen} = It, _Max) From 30b6236d86fb8d7c6871835abff5f4e8fb4fd6d0 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 26 Feb 2026 18:48:21 +0100 Subject: [PATCH 3/5] Move mode as a 3rd parameter --- src/osiris_log.erl | 53 +++++++++++++++++------------------- test/osiris_log_SUITE.erl | 57 +++++++++++++++++++++++++++++++-------- 2 files changed, 71 insertions(+), 39 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 4a3b4b24..60dc8171 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -35,6 +35,7 @@ chunk_iterator/3, iterator_next/1, iterator_next/2, + iterator_next/3, entry_iterator_read/2, entry_iterator_skip/2, read_chunk/1, @@ -483,6 +484,7 @@ -export_type([state/0, chunk_iterator/0, entry_iterator/0, + iterator_mode/0, range/0, config/0, counter_spec/0, @@ -1485,7 +1487,7 @@ read_header(#?MODULE{cfg = #cfg{}} = State0) -> -opaque chunk_iterator() :: #iterator{}. %% Entry iterator: streams a simple entry from file without loading it all. -%% Used when iterator_next/2 is called with entry_iterator. +%% Used when iterator_next/3 is called with mode entry_iterator. %% %% Fields: %% fd - Open file descriptor of the segment file being read @@ -1596,11 +1598,18 @@ chunk_iterator(#?MODULE{cfg = #cfg{}, -spec iterator_next(chunk_iterator()) -> end_of_chunk | {offset_entry(), chunk_iterator()}. iterator_next(#iterator{} = I) -> - iterator_next(I, 1). + iterator_next(I, 1, full_entry). --spec iterator_next(chunk_iterator(), CreditHint :: non_neg_integer() | 'entry_iterator') -> +-spec iterator_next(chunk_iterator(), CreditHint :: non_neg_integer()) -> end_of_chunk | {offset_entry(), chunk_iterator()}. -iterator_next(#iterator{num_left = 0}, _CreditHint) -> +iterator_next(#iterator{} = I, CreditHint) -> + iterator_next(I, CreditHint, full_entry). + +-type iterator_mode() :: full_entry | entry_iterator. + +-spec iterator_next(chunk_iterator(), CreditHint :: non_neg_integer(), Mode :: iterator_mode()) -> + end_of_chunk | {offset_entry(), chunk_iterator()}. +iterator_next(#iterator{num_left = 0}, _CreditHint, _Mode) -> end_of_chunk; iterator_next(#iterator{fd = Fd, next_offset = NextOffs, @@ -1608,9 +1617,9 @@ iterator_next(#iterator{fd = Fd, data_left = DataLeft, data_cache = ?REC_MATCH_SIMPLE(Len, Rem0), next_record_pos = Pos0, - ra = Ra} = I0, CreditHint) -> + ra = Ra} = I0, CreditHint, Mode) -> Pos = Pos0 + ?REC_HDR_SZ_SIMPLE_B, - case {Rem0, CreditHint} of + case {Rem0, Mode} of {<>, entry_iterator} -> EntryIt = #entry_iterator{fd = Fd, buffer = Record, file_pos = Pos + Len, file_len = 0}, @@ -1620,7 +1629,7 @@ iterator_next(#iterator{fd = Fd, data_cache = Rem, next_record_pos = Pos + Len}, {{NextOffs, EntryIt}, I}; - {<>, _} -> + {<>, full_entry} -> I = I0#iterator{next_offset = NextOffs + 1, num_left = Num - 1, data_left = DataLeft - (Len + ?REC_HDR_SZ_SIMPLE_B), @@ -1639,13 +1648,13 @@ iterator_next(#iterator{fd = Fd, next_record_pos = Pos + Len, ra = ra_clear(Ra)}, {{NextOffs, EntryIt}, I}; - {_, _} -> + {_, full_entry} -> MinReqSize = Len + ?REC_HDR_SZ_SIMPLE_B, {Data, Ra1} = iter_read_ahead(Fd, Pos0, MinReqSize, CreditHint, DataLeft, Num, Ra), iterator_next(I0#iterator{ra = Ra1, data_cache = Data}, - CreditHint) + CreditHint, full_entry) end; iterator_next(#iterator{fd = Fd, next_offset = NextOffs, @@ -1655,8 +1664,8 @@ iterator_next(#iterator{fd = Fd, UncompressedLen, Len, Rem0), next_record_pos = Pos, - ra = Ra} = I0, CreditHint) -> - case Rem0 of + ra = Ra} = I0, CreditHint, Mode) -> + case Rem0 of <> -> Record = {batch, NumRecs, CompType, UncompressedLen, Data}, I = I0#iterator{next_offset = NextOffs + NumRecs, @@ -1669,41 +1678,29 @@ iterator_next(#iterator{fd = Fd, %% not enough in Rem0 to read the entire record %% so we need to read it from disk MinReqSize = Len + ?REC_HDR_SZ_SUBBATCH_B, - Credit = credit_hint_for_read_ahead(CreditHint), - {Data, Ra1} = iter_read_ahead(Fd, Pos, MinReqSize, Credit, + {Data, Ra1} = iter_read_ahead(Fd, Pos, MinReqSize, CreditHint, DataLeft, Num, Ra), iterator_next(I0#iterator{ra = Ra1, data_cache = Data}, - CreditHint) + CreditHint, Mode) end; iterator_next(#iterator{fd = Fd, num_left = Num, data_left = DataLeft, ra = Ra, - next_record_pos = Pos} = I0, CreditHint) -> + next_record_pos = Pos} = I0, CreditHint, Mode) -> MinReq = min(?REC_HDR_SZ_SUBBATCH_B, DataLeft), - Credit = credit_hint_for_read_ahead(CreditHint), - {Data, Ra1} = iter_read_ahead(Fd, Pos, MinReq, Credit, + {Data, Ra1} = iter_read_ahead(Fd, Pos, MinReq, CreditHint, DataLeft, Num, Ra), iterator_next(I0#iterator{ra = Ra1, data_cache = Data}, - CreditHint). - -credit_hint_for_read_ahead(entry_iterator) -> 1; -credit_hint_for_read_ahead(N) when is_integer(N), N >= 0 -> N. + CreditHint, Mode). %% Reads up to Max bytes from the current position of the entry iterator. %% Returns the data (or fewer bytes if the remaining stream is shorter) and an %% updated iterator, or {eof, It} when no data remains. The caller must use the %% returned iterator for subsequent reads or skip. File read failures raise; %% they are not returned as {error, _}. -%% -%% Entry iterators exist because chunks can be returned in entry_iterator mode -%% instead of as a full Entry: the stream protocol limits a single message to the -%% frame size (typically 1 MiB), but other protocols (e.g. AMQP 0-9-1) can -%% publish much larger messages to a stream. An entry iterator lets callers read -%% a bounded number of bytes at a time (e.g. the AMQP 1.0 section descriptor and -%% size) and then decide whether to read or skip the section content. -spec entry_iterator_read(entry_iterator(), non_neg_integer()) -> {ok, binary(), entry_iterator()} | {eof, entry_iterator()}. entry_iterator_read(#entry_iterator{buffer = Buffer, file_len = FileLen} = It, _Max) diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index 0986e7bb..b19c60f8 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -43,6 +43,7 @@ all_tests() -> iterator_read_chunk_mixed_sizes_with_credit, iterator_stream_entry, iterator_stream_entry_sections, + entry_iterator_read_contract, read_chunk_parsed, read_chunk_parsed_2, read_chunk_parsed_multiple_chunks, @@ -407,21 +408,21 @@ iterator_stream_entry(Config) -> {ChId, S1} = write_committed(EntriesRev, S0), {ok, _H, I0, R2} = osiris_log:chunk_iterator(R1), %% First simple entry as entry_iterator (write order: first offset = last in list). - {{ChId, EntryIt0}, I1} = osiris_log:iterator_next(I0, entry_iterator), + {{ChId, EntryIt0}, I1} = osiris_log:iterator_next(I0, 1, entry_iterator), <<"ho">> = drain_entry_iterator(EntryIt0), %% Second simple entry as entry_iterator. HiOffs = ChId + 1, - {{HiOffs, EntryIt1}, I2} = osiris_log:iterator_next(I1, entry_iterator), + {{HiOffs, EntryIt1}, I2} = osiris_log:iterator_next(I1, 1, entry_iterator), <<"hi">> = drain_entry_iterator(EntryIt1), - ?assertMatch(end_of_chunk, osiris_log:iterator_next(I2, entry_iterator)), + ?assertMatch(end_of_chunk, osiris_log:iterator_next(I2, 1, entry_iterator)), %% entry_iterator_skip: re-read same chunk and skip first byte of first entry ("ho"). {ok, _, I3, _} = osiris_log:chunk_iterator(R1), - {{ChId, EntryIt2}, _} = osiris_log:iterator_next(I3, entry_iterator), + {{ChId, EntryIt2}, _} = osiris_log:iterator_next(I3, 1, entry_iterator), EntryIt2Skipped = osiris_log:entry_iterator_skip(EntryIt2, 1), <<"o">> = drain_entry_iterator(EntryIt2Skipped), %% Read in small chunks. {ok, _, I4, _} = osiris_log:chunk_iterator(R1), - {{ChId, EntryIt3}, _} = osiris_log:iterator_next(I4, entry_iterator), + {{ChId, EntryIt3}, _} = osiris_log:iterator_next(I4, 1, entry_iterator), {ok, <<"h">>, EntryIt3a} = osiris_log:entry_iterator_read(EntryIt3, 1), {ok, <<"o">>, _} = osiris_log:entry_iterator_read(EntryIt3a, 1), osiris_log:close(R2), @@ -447,7 +448,7 @@ iterator_stream_entry_sections(Config) -> (byte_size(DataSection)):16/big, DataSection/binary>>, {ChId, S1} = write_committed([Entry], S0), {ok, _H, I0, R2} = osiris_log:chunk_iterator(R1), - {{ChId, EntryIt}, I0_done} = osiris_log:iterator_next(I0, entry_iterator), + {{ChId, EntryIt}, I0_done} = osiris_log:iterator_next(I0, 1, entry_iterator), %% Read attributes (non-data): length then payload. {ok, <<7:16/big>>, EntryIt1} = osiris_log:entry_iterator_read(EntryIt, 2), @@ -463,7 +464,7 @@ iterator_stream_entry_sections(Config) -> %% Same chunk: skip attributes entirely, then read non-data section only. {ok, _, I1, _} = osiris_log:chunk_iterator(R1), - {{ChId, EntryItA}, _} = osiris_log:iterator_next(I1, entry_iterator), + {{ChId, EntryItA}, _} = osiris_log:iterator_next(I1, 1, entry_iterator), SkipAttrs = 2 + 7, EntryItB = osiris_log:entry_iterator_skip(EntryItA, SkipAttrs), %% After skipping attributes we have non-data section then data section. @@ -476,14 +477,14 @@ iterator_stream_entry_sections(Config) -> %% Same chunk: skip attributes and non-data section and data length prefix. {ok, _, I2, _} = osiris_log:chunk_iterator(R1), - {{ChId, EntryItH}, _} = osiris_log:iterator_next(I2, entry_iterator), + {{ChId, EntryItH}, _} = osiris_log:iterator_next(I2, 1, entry_iterator), SkipToData = 2 + 7 + 2 + 6 + 2, EntryItI = osiris_log:entry_iterator_skip(EntryItH, SkipToData), {ok, DataSection, _} = osiris_log:entry_iterator_read(EntryItI, byte_size(DataSection)), %% Same chunk: read first attribute byte, skip rest of attributes, read non-data. {ok, _, I3, _} = osiris_log:chunk_iterator(R1), - {{ChId, EntryItJ}, _} = osiris_log:iterator_next(I3, entry_iterator), + {{ChId, EntryItJ}, _} = osiris_log:iterator_next(I3, 1, entry_iterator), {ok, <<7:16/big>>, EntryItK} = osiris_log:entry_iterator_read(EntryItJ, 2), {ok, <<"k">>, EntryItL} = osiris_log:entry_iterator_read(EntryItK, 1), EntryItM = osiris_log:entry_iterator_skip(EntryItL, 6), @@ -500,9 +501,9 @@ iterator_stream_entry_sections(Config) -> (byte_size(Section2)):16/big, Section2/binary, (byte_size(Data2)):16/big, Data2/binary>>, {ChId2, S2} = write_committed([Entry2], S1), - end_of_chunk = osiris_log:iterator_next(I0_done, entry_iterator), + end_of_chunk = osiris_log:iterator_next(I0_done, 1, entry_iterator), {ok, _, I4, R3} = osiris_log:chunk_iterator(R2, 1, I0_done), - {{ChId2, EntryItP}, _} = osiris_log:iterator_next(I4, entry_iterator), + {{ChId2, EntryItP}, _} = osiris_log:iterator_next(I4, 1, entry_iterator), %% Skip attributes and first non-data section. EntryItP1 = osiris_log:entry_iterator_skip(EntryItP, 2 + 1 + 2 + 4), {ok, <<11:16/big>>, EntryItP2} = osiris_log:entry_iterator_read(EntryItP1, 2), @@ -515,6 +516,40 @@ iterator_stream_entry_sections(Config) -> osiris_log:close(S2), ok. +%% entry_iterator_read(It, Max) contract (for PR / API documentation): +%% - Returns {ok, Data, NewIt} with up to Max bytes; Data may be shorter if +%% the remaining stream is shorter. The caller must use NewIt for the next read. +%% - Returns {eof, It} when no data remains. +%% - File read failures raise; they are not returned as {error, _}. +entry_iterator_read_contract(Config) -> + Conf = ?config(osiris_conf, Config), + S0 = osiris_log:init(Conf), + Shared = osiris_log:get_shared(S0), + RConf = Conf#{shared => Shared}, + {ok, R0} = osiris_log:init_offset_reader(0, RConf), + {end_of_stream, R1} = osiris_log:chunk_iterator(R0), + {ChId, S1} = write_committed([<<"ho">>], S0), + {ok, _H, I0, R2} = osiris_log:chunk_iterator(R1), + {{ChId, EntryIt}, _} = osiris_log:iterator_next(I0, 1, entry_iterator), + + %% Read one byte at a time; each call returns updated iterator for next read. + {ok, <<"h">>, EntryIt1} = osiris_log:entry_iterator_read(EntryIt, 1), + {ok, <<"o">>, EntryIt2} = osiris_log:entry_iterator_read(EntryIt1, 1), + %% No data left. + {eof, _} = osiris_log:entry_iterator_read(EntryIt2, 1), + %% Max larger than remaining: already at eof, so eof again. + {eof, _} = osiris_log:entry_iterator_read(EntryIt2, 10), + + %% Same chunk: single read with Max >= entry size returns full entry then eof. + {ok, _, I1, _} = osiris_log:chunk_iterator(R1), + {{ChId, EntryItB}, _} = osiris_log:iterator_next(I1, 1, entry_iterator), + {ok, <<"ho">>, EntryItB1} = osiris_log:entry_iterator_read(EntryItB, 10), + {eof, _} = osiris_log:entry_iterator_read(EntryItB1, 10), + + osiris_log:close(R2), + osiris_log:close(S1), + ok. + drain_entry_iterator(EntryIt) -> drain_entry_iterator(EntryIt, []). From 5439a4a5e21f5103695a5659ac276fa5aee0eb80 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 26 Feb 2026 19:05:57 +0100 Subject: [PATCH 4/5] Improve tests WIP: Test different read/skip scnearios --- test/osiris_log_SUITE.erl | 121 +++----------------------------------- 1 file changed, 9 insertions(+), 112 deletions(-) diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index b19c60f8..b3c1c875 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -42,8 +42,7 @@ all_tests() -> iterator_read_chunk_with_read_ahead_2, iterator_read_chunk_mixed_sizes_with_credit, iterator_stream_entry, - iterator_stream_entry_sections, - entry_iterator_read_contract, + iterator_next_entry_iterator_returns_iterator_not_binary, read_chunk_parsed, read_chunk_parsed_2, read_chunk_parsed_multiple_chunks, @@ -429,123 +428,21 @@ iterator_stream_entry(Config) -> osiris_log:close(S1), ok. -%% Covers iterating over non-data sections, data section, attributes, and skipping -%% sections/attributes. Uses a single entry with a length-prefixed layout: -%% [AttrLen:16, Attrs, NonDataLen:16, NonDataSection, DataLen:16, DataSection] -iterator_stream_entry_sections(Config) -> +%% iterator_next(_, _, entry_iterator) must return an entry iterator (usable with +%% entry_iterator_read/entry_iterator_skip), not a raw binary. +iterator_next_entry_iterator_returns_iterator_not_binary(Config) -> Conf = ?config(osiris_conf, Config), S0 = osiris_log:init(Conf), Shared = osiris_log:get_shared(S0), RConf = Conf#{shared => Shared}, {ok, R0} = osiris_log:init_offset_reader(0, RConf), {end_of_stream, R1} = osiris_log:chunk_iterator(R0), - %% Build entry: attributes (7 bytes), non-data section "header" (6), data "body" (4). - Attrs = <<"k:v;a:1">>, - NonDataSection = <<"header">>, - DataSection = <<"body">>, - Entry = <<(byte_size(Attrs)):16/big, Attrs/binary, - (byte_size(NonDataSection)):16/big, NonDataSection/binary, - (byte_size(DataSection)):16/big, DataSection/binary>>, - {ChId, S1} = write_committed([Entry], S0), + {ChId, S1} = write_committed([<<"payload">>], S0), {ok, _H, I0, R2} = osiris_log:chunk_iterator(R1), - {{ChId, EntryIt}, I0_done} = osiris_log:iterator_next(I0, 1, entry_iterator), - - %% Read attributes (non-data): length then payload. - {ok, <<7:16/big>>, EntryIt1} = osiris_log:entry_iterator_read(EntryIt, 2), - {ok, Attrs, EntryIt2} = osiris_log:entry_iterator_read(EntryIt1, byte_size(Attrs)), - %% Read non-data section (header): length then payload. - {ok, <<6:16/big>>, EntryIt3} = osiris_log:entry_iterator_read(EntryIt2, 2), - {ok, NonDataSection, EntryIt4} = osiris_log:entry_iterator_read(EntryIt3, byte_size(NonDataSection)), - %% Read data section (body). - {ok, <<4:16/big>>, EntryIt5} = osiris_log:entry_iterator_read(EntryIt4, 2), - {ok, DataSection, EntryIt6} = osiris_log:entry_iterator_read(EntryIt5, byte_size(DataSection)), - %% Drain should be empty. - {eof, _} = osiris_log:entry_iterator_read(EntryIt6, 1), - - %% Same chunk: skip attributes entirely, then read non-data section only. - {ok, _, I1, _} = osiris_log:chunk_iterator(R1), - {{ChId, EntryItA}, _} = osiris_log:iterator_next(I1, 1, entry_iterator), - SkipAttrs = 2 + 7, - EntryItB = osiris_log:entry_iterator_skip(EntryItA, SkipAttrs), - %% After skipping attributes we have non-data section then data section. - {ok, <<6:16/big>>, EntryItC} = osiris_log:entry_iterator_read(EntryItB, 2), - {ok, NonDataSection, EntryItD} = osiris_log:entry_iterator_read(EntryItC, byte_size(NonDataSection)), - %% EntryItD is now at the data section (2-byte length + 4-byte body); read it. - {ok, <<4:16/big>>, EntryItF} = osiris_log:entry_iterator_read(EntryItD, 2), - {ok, DataSection, EntryItG} = osiris_log:entry_iterator_read(EntryItF, byte_size(DataSection)), - {eof, _} = osiris_log:entry_iterator_read(EntryItG, 1), - - %% Same chunk: skip attributes and non-data section and data length prefix. - {ok, _, I2, _} = osiris_log:chunk_iterator(R1), - {{ChId, EntryItH}, _} = osiris_log:iterator_next(I2, 1, entry_iterator), - SkipToData = 2 + 7 + 2 + 6 + 2, - EntryItI = osiris_log:entry_iterator_skip(EntryItH, SkipToData), - {ok, DataSection, _} = osiris_log:entry_iterator_read(EntryItI, byte_size(DataSection)), - - %% Same chunk: read first attribute byte, skip rest of attributes, read non-data. - {ok, _, I3, _} = osiris_log:chunk_iterator(R1), - {{ChId, EntryItJ}, _} = osiris_log:iterator_next(I3, 1, entry_iterator), - {ok, <<7:16/big>>, EntryItK} = osiris_log:entry_iterator_read(EntryItJ, 2), - {ok, <<"k">>, EntryItL} = osiris_log:entry_iterator_read(EntryItK, 1), - EntryItM = osiris_log:entry_iterator_skip(EntryItL, 6), - {ok, <<6:16/big>>, EntryItN} = osiris_log:entry_iterator_read(EntryItM, 2), - {ok, NonDataSection, _} = osiris_log:entry_iterator_read(EntryItN, byte_size(NonDataSection)), - - %% Entry with multiple non-data sections: skip one section, read next, skip attr block. - Attrs2 = <<"x">>, - Section1 = <<"sec1">>, - Section2 = <<"section-two">>, - Data2 = <<"payload">>, - Entry2 = <<(byte_size(Attrs2)):16/big, Attrs2/binary, - (byte_size(Section1)):16/big, Section1/binary, - (byte_size(Section2)):16/big, Section2/binary, - (byte_size(Data2)):16/big, Data2/binary>>, - {ChId2, S2} = write_committed([Entry2], S1), - end_of_chunk = osiris_log:iterator_next(I0_done, 1, entry_iterator), - {ok, _, I4, R3} = osiris_log:chunk_iterator(R2, 1, I0_done), - {{ChId2, EntryItP}, _} = osiris_log:iterator_next(I4, 1, entry_iterator), - %% Skip attributes and first non-data section. - EntryItP1 = osiris_log:entry_iterator_skip(EntryItP, 2 + 1 + 2 + 4), - {ok, <<11:16/big>>, EntryItP2} = osiris_log:entry_iterator_read(EntryItP1, 2), - {ok, Section2, EntryItP3} = osiris_log:entry_iterator_read(EntryItP2, byte_size(Section2)), - %% EntryItP3 is now at the data section (2-byte length + 7-byte payload). - {ok, <<7:16/big>>, EntryItP5} = osiris_log:entry_iterator_read(EntryItP3, 2), - {ok, Data2, _} = osiris_log:entry_iterator_read(EntryItP5, byte_size(Data2)), - - osiris_log:close(R3), - osiris_log:close(S2), - ok. - -%% entry_iterator_read(It, Max) contract (for PR / API documentation): -%% - Returns {ok, Data, NewIt} with up to Max bytes; Data may be shorter if -%% the remaining stream is shorter. The caller must use NewIt for the next read. -%% - Returns {eof, It} when no data remains. -%% - File read failures raise; they are not returned as {error, _}. -entry_iterator_read_contract(Config) -> - Conf = ?config(osiris_conf, Config), - S0 = osiris_log:init(Conf), - Shared = osiris_log:get_shared(S0), - RConf = Conf#{shared => Shared}, - {ok, R0} = osiris_log:init_offset_reader(0, RConf), - {end_of_stream, R1} = osiris_log:chunk_iterator(R0), - {ChId, S1} = write_committed([<<"ho">>], S0), - {ok, _H, I0, R2} = osiris_log:chunk_iterator(R1), - {{ChId, EntryIt}, _} = osiris_log:iterator_next(I0, 1, entry_iterator), - - %% Read one byte at a time; each call returns updated iterator for next read. - {ok, <<"h">>, EntryIt1} = osiris_log:entry_iterator_read(EntryIt, 1), - {ok, <<"o">>, EntryIt2} = osiris_log:entry_iterator_read(EntryIt1, 1), - %% No data left. - {eof, _} = osiris_log:entry_iterator_read(EntryIt2, 1), - %% Max larger than remaining: already at eof, so eof again. - {eof, _} = osiris_log:entry_iterator_read(EntryIt2, 10), - - %% Same chunk: single read with Max >= entry size returns full entry then eof. - {ok, _, I1, _} = osiris_log:chunk_iterator(R1), - {{ChId, EntryItB}, _} = osiris_log:iterator_next(I1, 1, entry_iterator), - {ok, <<"ho">>, EntryItB1} = osiris_log:entry_iterator_read(EntryItB, 10), - {eof, _} = osiris_log:entry_iterator_read(EntryItB1, 10), - + Result = osiris_log:iterator_next(I0, 1, entry_iterator), + {{ChId, EntryIt}, _I1} = Result, + ?assertNot(is_binary(EntryIt), "entry_iterator mode must return an entry iterator, not a binary"), + ?assertEqual(entry_iterator, element(1, EntryIt), "expected #entry_iterator{} record"), osiris_log:close(R2), osiris_log:close(S1), ok. From ee3d4796d497f90e6e14ebe610c9951e7bf4593b Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 26 Feb 2026 21:07:54 +0100 Subject: [PATCH 5/5] Address edge case Where the read number of bytes < requested bytes --- src/osiris_log.erl | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 60dc8171..827aee1d 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -1497,11 +1497,13 @@ read_header(#?MODULE{cfg = #cfg{}} = State0) -> %% entry lives (buffer length is not included in file_len) %% file_len - Number of bytes of the entry still in the file (from file_pos %% to end of entry); 0 when the whole entry is in buffer +%% size - Total size in bytes of the entry (payload); immutable -record(entry_iterator, {fd :: file:fd(), buffer = <<>> :: binary(), file_pos :: non_neg_integer(), - file_len :: non_neg_integer()}). + file_len :: non_neg_integer(), + size :: non_neg_integer()}). -opaque entry_iterator() :: #entry_iterator{}. -define(REC_MATCH_SIMPLE(Len, Rem), @@ -1621,8 +1623,11 @@ iterator_next(#iterator{fd = Fd, Pos = Pos0 + ?REC_HDR_SZ_SIMPLE_B, case {Rem0, Mode} of {<>, entry_iterator} -> - EntryIt = #entry_iterator{fd = Fd, buffer = Record, - file_pos = Pos + Len, file_len = 0}, + EntryIt = #entry_iterator{fd = Fd, + buffer = Record, + file_pos = Pos + Len, + file_len = 0, + size = Len}, I = I0#iterator{next_offset = NextOffs + 1, num_left = Num - 1, data_left = DataLeft - (Len + ?REC_HDR_SZ_SIMPLE_B), @@ -1637,10 +1642,11 @@ iterator_next(#iterator{fd = Fd, next_record_pos = Pos + Len}, {{NextOffs, Record}, I}; {_, entry_iterator} -> - PrefixLen = byte_size(Rem0), + Rem0Len = byte_size(Rem0), EntryIt = #entry_iterator{fd = Fd, buffer = Rem0, - file_pos = Pos + PrefixLen, - file_len = Len - PrefixLen}, + file_pos = Pos + Rem0Len, + file_len = Len - Rem0Len, + size = Len}, I = I0#iterator{next_offset = NextOffs + 1, num_left = Num - 1, data_left = DataLeft - (Len + ?REC_HDR_SZ_SIMPLE_B), @@ -1703,12 +1709,14 @@ iterator_next(#iterator{fd = Fd, %% they are not returned as {error, _}. -spec entry_iterator_read(entry_iterator(), non_neg_integer()) -> {ok, binary(), entry_iterator()} | {eof, entry_iterator()}. -entry_iterator_read(#entry_iterator{buffer = Buffer, file_len = FileLen} = It, _Max) +entry_iterator_read(#entry_iterator{buffer = Buffer, + file_len = FileLen} = It, _Max) when Buffer =:= <<>>, FileLen =:= 0 -> {eof, It}; -entry_iterator_read(#entry_iterator{fd = Fd, buffer = Buffer, - file_pos = FilePos, file_len = FileLen} = It, - Max) -> +entry_iterator_read(#entry_iterator{fd = Fd, + buffer = Buffer, + file_pos = FilePos, + file_len = FileLen} = It, Max) -> case Buffer of <> -> {ok, Chunk, It#entry_iterator{buffer = Rest}}; @@ -1716,9 +1724,11 @@ entry_iterator_read(#entry_iterator{fd = Fd, buffer = Buffer, ReadLen = min(Max, FileLen), case file:pread(Fd, FilePos, ReadLen) of {ok, Data} -> + %% file:pread may return fewer bytes than requested (e.g. near EOF) + DataLen = byte_size(Data), {ok, Data, - It#entry_iterator{file_pos = FilePos + ReadLen, - file_len = FileLen - ReadLen}}; + It#entry_iterator{file_pos = FilePos + DataLen, + file_len = FileLen - DataLen}}; {error, _} = Err -> error(Err) end; @@ -1733,7 +1743,8 @@ entry_iterator_read(#entry_iterator{fd = Fd, buffer = Buffer, end. -spec entry_iterator_skip(entry_iterator(), non_neg_integer()) -> entry_iterator(). -entry_iterator_skip(#entry_iterator{buffer = Buffer, file_pos = FilePos, +entry_iterator_skip(#entry_iterator{buffer = Buffer, + file_pos = FilePos, file_len = FileLen} = It, N) -> BufLen = byte_size(Buffer), if