From 56d9665346294bc9a0e44fe586617fd1091e46cb Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 5 Mar 2026 16:37:56 +0100 Subject: [PATCH 1/6] Add function that returns several landmark offsets --- src/osiris_log.erl | 206 ++++++++++++++++++++++++++++++++++++++ test/osiris_log_SUITE.erl | 107 +++++++++++++++++++- 2 files changed, 311 insertions(+), 2 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 95c8ab24..71b95317 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -57,6 +57,8 @@ directory/1, delete_directory/1, counter_fields/0, + stream_offset_landmarks/1, + last_offset_and_timestamp/1, make_counter/1, generate_log/4]). @@ -3504,6 +3506,210 @@ write_in_chunks(ToWrite, MsgsPerChunk, Msg, W0) when ToWrite > 0 -> write_in_chunks(_, _, _, W) -> W. +%% Scans all index files for the log at Dir and returns the first chunk +%% (offset + timestamp), last chunk (offset + timestamp), and the chunk +%% closest to 25%, 50% and 75% of the offset range (with offset and +%% timestamp). Percent positions may not fall on a chunk boundary, so +%% the chunk with the closest offset is chosen. +-spec stream_offset_landmarks(file:filename_all() | config()) -> + {ok, #{first => {offset(), osiris:timestamp()}, + last => {offset(), osiris:timestamp()}, + p25 => {offset(), osiris:timestamp()}, + p50 => {offset(), osiris:timestamp()}, + p75 => {offset(), osiris:timestamp()}}} | + {error, empty}. 
+stream_offset_landmarks(#{dir := Dir}) -> + stream_offset_landmarks(Dir); +stream_offset_landmarks(Dir) when ?IS_STRING(Dir) -> + IdxFiles = sorted_index_files(Dir), + case scan_index_chunks_files(IdxFiles, []) of + {ok, []} -> + {error, empty}; + {ok, [One]} -> + {LastOff, LastTs} = + case last_offset_and_timestamp_from_files(IdxFiles) of + {ok, L} -> L; + _ -> One + end, + {ok, #{first => One, + last => {LastOff, LastTs}, + p25 => One, + p50 => One, + p75 => One}}; + {ok, Chunks} -> + First = hd(Chunks), + LastChunk = lists:last(Chunks), + {FirstOffset, _FirstTs} = First, + {LastChunkId, _LastChunkTs} = LastChunk, + Last = case last_offset_and_timestamp_from_files(IdxFiles) of + {ok, L} -> L; + _ -> LastChunk + end, + Range = LastChunkId - FirstOffset, + Targets = case Range of + 0 -> + [FirstOffset, FirstOffset, FirstOffset]; + _ -> + [FirstOffset + (Range * 25) div 100, + FirstOffset + (Range * 50) div 100, + FirstOffset + (Range * 75) div 100] + end, + [P25, P50, P75] = closest_chunks_to_targets(Chunks, Targets), + {ok, #{first => First, + last => Last, + p25 => P25, + p50 => P50, + p75 => P75}} + end. + +%% Returns {ok, {LastOffset, Timestamp}} where LastOffset is the very last +%% offset in the log (last offset in the last chunk), not the last chunk's +%% first offset. Timestamp is the last chunk's timestamp. +-spec last_offset_and_timestamp(file:filename_all()) -> + {ok, {offset(), osiris:timestamp()}} | {error, empty}. +last_offset_and_timestamp(Dir) -> + last_offset_and_timestamp_from_files(sorted_index_files(Dir)). + +last_offset_and_timestamp_from_files(IdxFiles) -> + case non_empty_index_files(IdxFiles) of + [] -> + {error, empty}; + NonEmpty -> + LastIdxFile = lists:last(NonEmpty), + last_offset_and_timestamp_from_file(LastIdxFile) + end. 
+ +last_offset_and_timestamp_from_file(LastIdxFile) -> + case file:open(LastIdxFile, [read, raw, binary]) of + {ok, IdxFd} -> + try + case position_at_idx_record_boundary(IdxFd, eof) of + {ok, Pos} when Pos >= ?IDX_HEADER_SIZE + ?INDEX_RECORD_SIZE_B -> + ReadPos = Pos - ?INDEX_RECORD_SIZE_B, + case file:pread(IdxFd, ReadPos, ?INDEX_RECORD_SIZE_B) of + {ok, <>} + when ChunkId =/= 0 orelse IdxTs =/= 0 -> + SegFile = segment_from_index_file(LastIdxFile), + case file:open(SegFile, [read, raw, binary]) of + {ok, SegFd} -> + try + case file:pread(SegFd, FilePos, ?HEADER_SIZE_B) of + {ok, <<_:32, + NumRecords:32/unsigned, + SegTs:64/signed, + _/binary>>} -> + LastOffset = ChunkId + NumRecords - 1, + Ts = if IdxTs < 1000000000000 -> SegTs; + true -> IdxTs + end, + {ok, {LastOffset, Ts}}; + _ -> + {ok, {ChunkId, IdxTs}} + end + after + file:close(SegFd) + end; + _ -> + {ok, {ChunkId, IdxTs}} + end; + _ -> + {error, empty} + end; + _ -> + {error, empty} + end + after + file:close(IdxFd) + end; + _ -> + {error, empty} + end. + +scan_index_chunks_files([], Acc) -> + {ok, lists:reverse(Acc)}; +scan_index_chunks_files([IdxFile | Rest], Acc) -> + case scan_one_index_file(IdxFile) of + {ok, Chunks} -> + scan_index_chunks_files(Rest, lists:reverse(Chunks) ++ Acc); + {error, _} = Err -> + Err + end. + +scan_one_index_file(IdxFile) -> + case file:open(IdxFile, [read, raw, binary]) of + {ok, Fd} -> + try + {ok, _} = file:position(Fd, ?IDX_HEADER_SIZE), + scan_index_records(Fd, []) + after + _ = file:close(Fd) + end; + Err -> + Err + end. + +scan_index_records(Fd, Acc) -> + case file:read(Fd, ?INDEX_RECORD_SIZE_B) of + {ok, <>} when ChunkId =/= 0 orelse Timestamp =/= 0 -> + scan_index_records(Fd, [{ChunkId, Timestamp} | Acc]); + {ok, ?ZERO_IDX_MATCH(_)} -> + scan_index_records(Fd, Acc); + {ok, _} -> + scan_index_records(Fd, Acc); + eof -> + {ok, lists:reverse(Acc)} + end. + +%% Returns [chunk closest to T25, to T50, to T75]. Chunks are ordered by offset. 
+%% Uses binary search per target for O(log n) lookups after O(n) list-to-tuple. +closest_chunks_to_targets(Chunks, [T25, T50, T75]) -> + Tuple = list_to_tuple(Chunks), + [closest_to_target(Tuple, T25), + closest_to_target(Tuple, T50), + closest_to_target(Tuple, T75)]. + +%% First 1-based index i such that element(i, Tuple) has offset >= Target, +%% or tuple_size(Tuple) + 1 if all offsets are < Target. +find_first_ge(Tuple, Target, Low, High) when Low < High -> + Mid = (Low + High) div 2, + {O, _} = element(Mid, Tuple), + if O >= Target -> find_first_ge(Tuple, Target, Low, Mid); + true -> find_first_ge(Tuple, Target, Mid + 1, High) + end; +find_first_ge(Tuple, Target, Low, _High) -> + {O, _} = element(Low, Tuple), + if O >= Target -> Low; true -> Low + 1 end. + +find_first_ge(Tuple, Target) -> + Size = tuple_size(Tuple), + find_first_ge(Tuple, Target, 1, Size). + +%% Chunk in Tuple whose offset is closest to Target (Chunks ordered by offset). +closest_to_target(Tuple, Target) -> + Size = tuple_size(Tuple), + Idx = find_first_ge(Tuple, Target), + if Idx =< 1 -> + element(1, Tuple); + Idx > Size -> + element(Size, Tuple); + true -> + C1 = element(Idx, Tuple), + C2 = element(Idx - 1, Tuple), + {O1, _} = C1, + {O2, _} = C2, + if abs(O1 - Target) =< abs(O2 - Target) -> C1; + true -> C2 + end + end. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index 9fa5b7bd..1dd7fb37 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -100,7 +100,12 @@ all_tests() -> read_ahead_send_file_on_off, resolve_offset_spec_empty, resolve_offset_spec_empty_directory, - resolve_offset_spec + resolve_offset_spec, + stream_offset_landmarks_empty, + stream_offset_landmarks_single_chunk, + stream_offset_landmarks_multiple_chunks, + stream_offset_landmarks_percentiles, + stream_offset_landmarks_config_map ]. 
groups() -> @@ -2049,6 +2054,104 @@ overview_with_missing_index_at_start(Config) -> filename:join(?config(dir, Config), "*.index")))), ok. +stream_offset_landmarks_empty(Config) -> + %% Empty log (init but no writes) and non-existent directory return {error, empty}. + LDir = ?config(leader_dir, Config), + Log0 = seed_log(LDir, [], Config), + osiris_log:close(Log0), + ?assertEqual({error, empty}, osiris_log:stream_offset_landmarks(LDir)), + NonExistent = filename:join(?config(priv_dir, Config), "stream_offset_landmarks_empty_nonexistent"), + ?assertEqual({error, empty}, osiris_log:stream_offset_landmarks(NonExistent)), + ok. + +stream_offset_landmarks_single_chunk(Config) -> + %% Single chunk: first, last, p25, p50, p75 all equal. last is the last + %% message offset (same as first when the only chunk has one record). + Now = now_ms(), + FirstTs = Now - 10000, + EpochChunks = [{2, FirstTs, [<<"one">>, <<"two">>]}], + LDir = ?config(leader_dir, Config), + Log0 = seed_log(LDir, EpochChunks, Config), + osiris_log:close(Log0), + {ok, Landmarks} = osiris_log:stream_offset_landmarks(LDir), + ?assertMatch(#{first := {0, FirstTs}, + last := {1, FirstTs}, + p25 := {0, FirstTs}, + p50 := {0, FirstTs}, + p75 := {0, FirstTs}}, Landmarks), + ok. + +stream_offset_landmarks_multiple_chunks(Config) -> + %% Multiple chunks: first < p25 <= p50 <= p75 < last (by offset). last is + %% the very last message offset in the log (last offset in the last chunk), + %% not the last chunk's first offset. Last chunk here has 2 records -> 5. 
+ Now = now_ms(), + FirstTs = Now - 10000, + LastTs = Now - 3000, + EpochChunks = + [{1, FirstTs, [<<"one">>]}, + {1, Now - 8000, [<<"two">>]}, + {2, Now - 5000, [<<"three">>, <<"four">>]}, + {2, LastTs, [<<"five">>, <<"six">>]}], + + LDir = ?config(leader_dir, Config), + Log0 = seed_log(LDir, EpochChunks, Config), + osiris_log:close(Log0), + {ok, Landmarks} = osiris_log:stream_offset_landmarks(LDir), + #{first := First, last := Last, p25 := P25, p50 := P50, p75 := P75} = Landmarks, + {FirstOff, FirstTs} = First, + {LastOff, LastTs} = Last, + {P25Off, _} = P25, + {P50Off, _} = P50, + {P75Off, _} = P75, + ?assert(FirstOff =< P25Off), + ?assert(P25Off =< P50Off), + ?assert(P50Off =< P75Off), + ?assert(P75Off =< LastOff), + ?assertEqual(FirstOff, 0), + ?assertEqual(LastOff, 5), + ok. + +stream_offset_landmarks_percentiles(Config) -> + %% Minimum layout for non-overlapping percentiles: chunk starts at 0,1,2,3,4 + %% so Range=4, T25=1, T50=2, T75=3 each land on a distinct chunk. + Now = now_ms(), + Ts0 = Now - 10000, + Ts1 = Now - 8000, + Ts2 = Now - 5000, + Ts3 = Now - 3000, + Ts4 = Now - 1000, + EpochChunks = + [{1, Ts0, [<<"a">>]}, + {1, Ts1, [<<"b">>]}, + {1, Ts2, [<<"c">>]}, + {1, Ts3, [<<"d">>]}, + {1, Ts4, [<<"e">>]}], + + LDir = ?config(leader_dir, Config), + Log0 = seed_log(LDir, EpochChunks, Config), + osiris_log:close(Log0), + {ok, Landmarks} = osiris_log:stream_offset_landmarks(LDir), + #{first := First, last := Last, p25 := P25, p50 := P50, p75 := P75} = Landmarks, + {0, Ts0} = First, + {4, Ts4} = Last, + {1, Ts1} = P25, + {2, Ts2} = P50, + {3, Ts3} = P75. + +stream_offset_landmarks_config_map(Config) -> + %% Calling with config map #{dir => Dir} works like path. 
+ EpochChunks = [{1, [<<"a">>]}, {1, [<<"b">>]}], + LDir = ?config(leader_dir, Config), + Log0 = seed_log(LDir, EpochChunks, Config), + osiris_log:close(Log0), + {ok, ByPath} = osiris_log:stream_offset_landmarks(LDir), + Conf = ?config(osiris_conf, Config), + RConf = Conf#{dir => LDir}, + {ok, ByConf} = osiris_log:stream_offset_landmarks(RConf), + ?assertEqual(ByPath, ByConf), + ok. + read_ahead_send_file(Config) -> RAL = 4096, %% read ahead limit HS = ?HEADER_SIZE_B, @@ -2587,7 +2690,7 @@ write_chunk(Conf, Epoch, Now, Records, Trk0, Log0) -> %% need to re-init as new epoch osiris_log:close(Log1), Log = osiris_log:init(Conf#{epoch => Epoch}), - {Trk1, osiris_log:write(lists:reverse(Records), Log)} + {Trk1, osiris_log:write(lists:reverse(Records), Now, Log)} end. now_ms() -> From 331e3634d3bfad8d3084974a685ebcbc1090edb5 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 5 Mar 2026 17:59:23 +0100 Subject: [PATCH 2/6] Refactor, including @kjnilsson feedback - Reuse existing scaning functions - Specify -via argments- the exact offset percentiles (between 0.0 and 1.0). - Use samples rather than landmarks --- src/osiris_log.erl | 267 ++++++++++++++------------------------ test/osiris_log_SUITE.erl | 48 ++++--- 2 files changed, 124 insertions(+), 191 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 71b95317..ceb74265 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -57,7 +57,7 @@ directory/1, delete_directory/1, counter_fields/0, - stream_offset_landmarks/1, + stream_offset_samples/2, last_offset_and_timestamp/1, make_counter/1, generate_log/4]). @@ -3506,62 +3506,75 @@ write_in_chunks(ToWrite, MsgsPerChunk, Msg, W0) when ToWrite > 0 -> write_in_chunks(_, _, _, W) -> W. -%% Scans all index files for the log at Dir and returns the first chunk -%% (offset + timestamp), last chunk (offset + timestamp), and the chunk -%% closest to 25%, 50% and 75% of the offset range (with offset and -%% timestamp). 
Percent positions may not fall on a chunk boundary, so -%% the chunk with the closest offset is chosen. --spec stream_offset_landmarks(file:filename_all() | config()) -> - {ok, #{first => {offset(), osiris:timestamp()}, - last => {offset(), osiris:timestamp()}, - p25 => {offset(), osiris:timestamp()}, - p50 => {offset(), osiris:timestamp()}, - p75 => {offset(), osiris:timestamp()}}} | - {error, empty}. -stream_offset_landmarks(#{dir := Dir}) -> - stream_offset_landmarks(Dir); -stream_offset_landmarks(Dir) when ?IS_STRING(Dir) -> - IdxFiles = sorted_index_files(Dir), - case scan_index_chunks_files(IdxFiles, []) of - {ok, []} -> +%% Returns offset samples at given fractions of the stream's offset range. +%% Fractions is a list of floats in [0.0, 1.0]: 0.0 = first, 1.0 = last; +%% values in between are linear (e.g. 0.5 = midpoint by chunk id). +%% Returns {ok, [ {offset(), timestamp()} ]} in the same order as Fractions, +%% or {error, empty}. Fractions are clamped to [0.0, 1.0]. +-spec stream_offset_samples(file:filename_all() | config(), [float()]) -> + {ok, [{offset(), osiris:timestamp()}]} | {error, empty}. 
+stream_offset_samples(DirOrConfig, Fractions0) when is_list(Fractions0) -> + Fractions = [normalize_fraction(F) || F <- Fractions0], + IdxFiles = sorted_index_files(DirOrConfig), + NonEmpty = non_empty_index_files(IdxFiles), + case NonEmpty of + [] -> {error, empty}; - {ok, [One]} -> - {LastOff, LastTs} = - case last_offset_and_timestamp_from_files(IdxFiles) of - {ok, L} -> L; - _ -> One - end, - {ok, #{first => One, - last => {LastOff, LastTs}, - p25 => One, - p50 => One, - p75 => One}}; - {ok, Chunks} -> - First = hd(Chunks), - LastChunk = lists:last(Chunks), - {FirstOffset, _FirstTs} = First, - {LastChunkId, _LastChunkTs} = LastChunk, - Last = case last_offset_and_timestamp_from_files(IdxFiles) of - {ok, L} -> L; - _ -> LastChunk - end, - Range = LastChunkId - FirstOffset, - Targets = case Range of - 0 -> - [FirstOffset, FirstOffset, FirstOffset]; - _ -> - [FirstOffset + (Range * 25) div 100, - FirstOffset + (Range * 50) div 100, - FirstOffset + (Range * 75) div 100] - end, - [P25, P50, P75] = closest_chunks_to_targets(Chunks, Targets), - {ok, #{first => First, - last => Last, - p25 => P25, - p50 => P50, - p75 => P75}} + _ -> + case first_and_last_seginfos0(NonEmpty) of + none -> + {error, empty}; + {_NumSegs, FstSI, LstSI} -> + First = seg_first_landmark(FstSI), + Last = seg_last_landmark(LstSI), + case {First, Last} of + {undefined, _} -> + {error, empty}; + {_, undefined} -> + {error, empty}; + {{FstChId, FstTs}, {LastOff, LastTs}} -> + #seg_info{last = LastChunk} = LstSI, + LastChunkId = LastChunk#chunk_info.id, + Range = LastChunkId - FstChId, + Targets = [target_chunk_id(FstChId, Range, F) || F <- Fractions], + Samples = case Range of + 0 -> + [ {FstChId, FstTs} || _ <- Fractions ]; + _ -> + fold_index_files_closest(NonEmpty, Targets, + {FstChId, FstTs}) + end, + %% Replace first (0.0) and last (1.0) with exact first/last + %% so 0.0 and 1.0 return true first/last offset and timestamp. 
+ Samples2 = lists:zipwith( + fun (0.0, _) -> {FstChId, FstTs}; + (1.0, _) -> {LastOff, LastTs}; + (_, S) -> S + end, Fractions, Samples), + {ok, Samples2} + end + end end. +normalize_fraction(F) when F =< 0.0 -> 0.0; +normalize_fraction(F) when F >= 1.0 -> 1.0; +normalize_fraction(F) -> F. + +target_chunk_id(FirstChId, Range, 0.0) -> FirstChId; +target_chunk_id(FirstChId, Range, 1.0) -> FirstChId + Range; +target_chunk_id(FirstChId, Range, F) -> + FirstChId + round((Range * F)). + +seg_first_landmark(#seg_info{first = #chunk_info{id = Id, timestamp = Ts}}) -> + {Id, Ts}; +seg_first_landmark(_) -> + undefined. + +seg_last_landmark(#seg_info{last = #chunk_info{id = Id, num = Num, timestamp = Ts}}) -> + {Id + Num - 1, Ts}; +seg_last_landmark(_) -> + undefined. + %% Returns {ok, {LastOffset, Timestamp}} where LastOffset is the very last %% offset in the log (last offset in the last chunk), not the last chunk's %% first offset. Timestamp is the last chunk's timestamp. @@ -3575,139 +3588,61 @@ last_offset_and_timestamp_from_files(IdxFiles) -> [] -> {error, empty}; NonEmpty -> - LastIdxFile = lists:last(NonEmpty), - last_offset_and_timestamp_from_file(LastIdxFile) - end. 
- -last_offset_and_timestamp_from_file(LastIdxFile) -> - case file:open(LastIdxFile, [read, raw, binary]) of - {ok, IdxFd} -> - try - case position_at_idx_record_boundary(IdxFd, eof) of - {ok, Pos} when Pos >= ?IDX_HEADER_SIZE + ?INDEX_RECORD_SIZE_B -> - ReadPos = Pos - ?INDEX_RECORD_SIZE_B, - case file:pread(IdxFd, ReadPos, ?INDEX_RECORD_SIZE_B) of - {ok, <>} - when ChunkId =/= 0 orelse IdxTs =/= 0 -> - SegFile = segment_from_index_file(LastIdxFile), - case file:open(SegFile, [read, raw, binary]) of - {ok, SegFd} -> - try - case file:pread(SegFd, FilePos, ?HEADER_SIZE_B) of - {ok, <<_:32, - NumRecords:32/unsigned, - SegTs:64/signed, - _/binary>>} -> - LastOffset = ChunkId + NumRecords - 1, - Ts = if IdxTs < 1000000000000 -> SegTs; - true -> IdxTs - end, - {ok, {LastOffset, Ts}}; - _ -> - {ok, {ChunkId, IdxTs}} - end - after - file:close(SegFd) - end; - _ -> - {ok, {ChunkId, IdxTs}} - end; - _ -> - {error, empty} - end; - _ -> - {error, empty} - end - after - file:close(IdxFd) - end; - _ -> - {error, empty} + case first_and_last_seginfos0(NonEmpty) of + none -> + {error, empty}; + {_NumSegs, _Fst, LstSI} -> + case seg_last_landmark(LstSI) of + undefined -> + {error, empty}; + L -> + {ok, L} + end + end end. -scan_index_chunks_files([], Acc) -> - {ok, lists:reverse(Acc)}; -scan_index_chunks_files([IdxFile | Rest], Acc) -> - case scan_one_index_file(IdxFile) of - {ok, Chunks} -> - scan_index_chunks_files(Rest, lists:reverse(Chunks) ++ Acc); - {error, _} = Err -> - Err - end. +%% Single pass over index files: for each record, update the chunk closest to +%% each target. Targets and Acc are lists of the same length. Returns [Sample]. +fold_index_files_closest(IdxFiles, Targets, FirstChunk) when is_list(Targets) -> + {FstChId, _FstTs} = FirstChunk, + Acc0 = [ {FirstChunk, abs(FstChId - T)} || T <- Targets ], + Acc = lists:foldl(fun (IdxFile, A) -> + fold_one_index_file(IdxFile, Targets, A) + end, + Acc0, IdxFiles), + [ B || {B, _D} <- Acc ]. 
-scan_one_index_file(IdxFile) -> +fold_one_index_file(IdxFile, Targets, Acc) -> case file:open(IdxFile, [read, raw, binary]) of {ok, Fd} -> try {ok, _} = file:position(Fd, ?IDX_HEADER_SIZE), - scan_index_records(Fd, []) + fold_index_records(Fd, Targets, Acc) after _ = file:close(Fd) end; - Err -> - Err + _ -> + Acc end. -scan_index_records(Fd, Acc) -> +fold_index_records(Fd, Targets, Acc) -> case file:read(Fd, ?INDEX_RECORD_SIZE_B) of {ok, <>} when ChunkId =/= 0 orelse Timestamp =/= 0 -> - scan_index_records(Fd, [{ChunkId, Timestamp} | Acc]); + Chunk = {ChunkId, Timestamp}, + NewAcc = [ if abs(ChunkId - T) < D -> {Chunk, abs(ChunkId - T)}; + true -> {B, D} + end || {{B, D}, T} <- lists:zip(Acc, Targets) ], + fold_index_records(Fd, Targets, NewAcc); {ok, ?ZERO_IDX_MATCH(_)} -> - scan_index_records(Fd, Acc); + fold_index_records(Fd, Targets, Acc); {ok, _} -> - scan_index_records(Fd, Acc); + fold_index_records(Fd, Targets, Acc); eof -> - {ok, lists:reverse(Acc)} - end. - -%% Returns [chunk closest to T25, to T50, to T75]. Chunks are ordered by offset. -%% Uses binary search per target for O(log n) lookups after O(n) list-to-tuple. -closest_chunks_to_targets(Chunks, [T25, T50, T75]) -> - Tuple = list_to_tuple(Chunks), - [closest_to_target(Tuple, T25), - closest_to_target(Tuple, T50), - closest_to_target(Tuple, T75)]. - -%% First 1-based index i such that element(i, Tuple) has offset >= Target, -%% or tuple_size(Tuple) + 1 if all offsets are < Target. -find_first_ge(Tuple, Target, Low, High) when Low < High -> - Mid = (Low + High) div 2, - {O, _} = element(Mid, Tuple), - if O >= Target -> find_first_ge(Tuple, Target, Low, Mid); - true -> find_first_ge(Tuple, Target, Mid + 1, High) - end; -find_first_ge(Tuple, Target, Low, _High) -> - {O, _} = element(Low, Tuple), - if O >= Target -> Low; true -> Low + 1 end. - -find_first_ge(Tuple, Target) -> - Size = tuple_size(Tuple), - find_first_ge(Tuple, Target, 1, Size). 
- -%% Chunk in Tuple whose offset is closest to Target (Chunks ordered by offset). -closest_to_target(Tuple, Target) -> - Size = tuple_size(Tuple), - Idx = find_first_ge(Tuple, Target), - if Idx =< 1 -> - element(1, Tuple); - Idx > Size -> - element(Size, Tuple); - true -> - C1 = element(Idx, Tuple), - C2 = element(Idx - 1, Tuple), - {O1, _} = C1, - {O2, _} = C2, - if abs(O1 - Target) =< abs(O2 - Target) -> C1; - true -> C2 - end + Acc end. -ifdef(TEST). diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index 1dd7fb37..6039e160 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -101,11 +101,11 @@ all_tests() -> resolve_offset_spec_empty, resolve_offset_spec_empty_directory, resolve_offset_spec, - stream_offset_landmarks_empty, - stream_offset_landmarks_single_chunk, - stream_offset_landmarks_multiple_chunks, - stream_offset_landmarks_percentiles, - stream_offset_landmarks_config_map + stream_offset_samples_empty, + stream_offset_samples_single_chunk, + stream_offset_samples_multiple_chunks, + stream_offset_samples_percentiles, + stream_offset_samples_config_map ]. groups() -> @@ -2054,17 +2054,17 @@ overview_with_missing_index_at_start(Config) -> filename:join(?config(dir, Config), "*.index")))), ok. -stream_offset_landmarks_empty(Config) -> +stream_offset_samples_empty(Config) -> %% Empty log (init but no writes) and non-existent directory return {error, empty}. 
LDir = ?config(leader_dir, Config), Log0 = seed_log(LDir, [], Config), osiris_log:close(Log0), - ?assertEqual({error, empty}, osiris_log:stream_offset_landmarks(LDir)), - NonExistent = filename:join(?config(priv_dir, Config), "stream_offset_landmarks_empty_nonexistent"), - ?assertEqual({error, empty}, osiris_log:stream_offset_landmarks(NonExistent)), + ?assertEqual({error, empty}, osiris_log:stream_offset_samples(LDir, [0.0, 0.25, 0.5, 0.75, 1.0])), + NonExistent = filename:join(?config(priv_dir, Config), "stream_offset_samples_empty_nonexistent"), + ?assertEqual({error, empty}, osiris_log:stream_offset_samples(NonExistent, [0.0, 0.25, 0.5, 0.75, 1.0])), ok. -stream_offset_landmarks_single_chunk(Config) -> +stream_offset_samples_single_chunk(Config) -> %% Single chunk: first, last, p25, p50, p75 all equal. last is the last %% message offset (same as first when the only chunk has one record). Now = now_ms(), @@ -2073,15 +2073,15 @@ stream_offset_landmarks_single_chunk(Config) -> LDir = ?config(leader_dir, Config), Log0 = seed_log(LDir, EpochChunks, Config), osiris_log:close(Log0), - {ok, Landmarks} = osiris_log:stream_offset_landmarks(LDir), - ?assertMatch(#{first := {0, FirstTs}, - last := {1, FirstTs}, - p25 := {0, FirstTs}, - p50 := {0, FirstTs}, - p75 := {0, FirstTs}}, Landmarks), + {ok, [First, P25, P50, P75, Last]} = osiris_log:stream_offset_samples(LDir, [0.0, 0.25, 0.5, 0.75, 1.0]), + ?assertMatch({0, FirstTs}, First), + ?assertMatch({1, FirstTs}, Last), + ?assertMatch({0, FirstTs}, P25), + ?assertMatch({0, FirstTs}, P50), + ?assertMatch({0, FirstTs}, P75), ok. -stream_offset_landmarks_multiple_chunks(Config) -> +stream_offset_samples_multiple_chunks(Config) -> %% Multiple chunks: first < p25 <= p50 <= p75 < last (by offset). last is %% the very last message offset in the log (last offset in the last chunk), %% not the last chunk's first offset. Last chunk here has 2 records -> 5. 
@@ -2097,8 +2097,7 @@ stream_offset_landmarks_multiple_chunks(Config) -> LDir = ?config(leader_dir, Config), Log0 = seed_log(LDir, EpochChunks, Config), osiris_log:close(Log0), - {ok, Landmarks} = osiris_log:stream_offset_landmarks(LDir), - #{first := First, last := Last, p25 := P25, p50 := P50, p75 := P75} = Landmarks, + {ok, [First, P25, P50, P75, Last]} = osiris_log:stream_offset_samples(LDir, [0.0, 0.25, 0.5, 0.75, 1.0]), {FirstOff, FirstTs} = First, {LastOff, LastTs} = Last, {P25Off, _} = P25, @@ -2112,7 +2111,7 @@ stream_offset_landmarks_multiple_chunks(Config) -> ?assertEqual(LastOff, 5), ok. -stream_offset_landmarks_percentiles(Config) -> +stream_offset_samples_percentiles(Config) -> %% Minimum layout for non-overlapping percentiles: chunk starts at 0,1,2,3,4 %% so Range=4, T25=1, T50=2, T75=3 each land on a distinct chunk. Now = now_ms(), @@ -2131,24 +2130,23 @@ stream_offset_landmarks_percentiles(Config) -> LDir = ?config(leader_dir, Config), Log0 = seed_log(LDir, EpochChunks, Config), osiris_log:close(Log0), - {ok, Landmarks} = osiris_log:stream_offset_landmarks(LDir), - #{first := First, last := Last, p25 := P25, p50 := P50, p75 := P75} = Landmarks, + {ok, [First, P25, P50, P75, Last]} = osiris_log:stream_offset_samples(LDir, [0.0, 0.25, 0.5, 0.75, 1.0]), {0, Ts0} = First, {4, Ts4} = Last, {1, Ts1} = P25, {2, Ts2} = P50, {3, Ts3} = P75. -stream_offset_landmarks_config_map(Config) -> +stream_offset_samples_config_map(Config) -> %% Calling with config map #{dir => Dir} works like path. 
EpochChunks = [{1, [<<"a">>]}, {1, [<<"b">>]}], LDir = ?config(leader_dir, Config), Log0 = seed_log(LDir, EpochChunks, Config), osiris_log:close(Log0), - {ok, ByPath} = osiris_log:stream_offset_landmarks(LDir), + {ok, ByPath} = osiris_log:stream_offset_samples(LDir, [0.0, 0.25, 0.5, 0.75, 1.0]), Conf = ?config(osiris_conf, Config), RConf = Conf#{dir => LDir}, - {ok, ByConf} = osiris_log:stream_offset_landmarks(RConf), + {ok, ByConf} = osiris_log:stream_offset_samples(RConf, [0.0, 0.25, 0.5, 0.75, 1.0]), ?assertEqual(ByPath, ByConf), ok. From b9d2433abd6f69d3f1f504d0085bf762b93883e2 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 5 Mar 2026 18:20:15 +0100 Subject: [PATCH 3/6] Remove unnecessary function --- src/osiris_log.erl | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index ceb74265..04113161 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -58,7 +58,6 @@ delete_directory/1, counter_fields/0, stream_offset_samples/2, - last_offset_and_timestamp/1, make_counter/1, generate_log/4]). @@ -3575,32 +3574,6 @@ seg_last_landmark(#seg_info{last = #chunk_info{id = Id, num = Num, timestamp = T seg_last_landmark(_) -> undefined. -%% Returns {ok, {LastOffset, Timestamp}} where LastOffset is the very last -%% offset in the log (last offset in the last chunk), not the last chunk's -%% first offset. Timestamp is the last chunk's timestamp. --spec last_offset_and_timestamp(file:filename_all()) -> - {ok, {offset(), osiris:timestamp()}} | {error, empty}. -last_offset_and_timestamp(Dir) -> - last_offset_and_timestamp_from_files(sorted_index_files(Dir)). - -last_offset_and_timestamp_from_files(IdxFiles) -> - case non_empty_index_files(IdxFiles) of - [] -> - {error, empty}; - NonEmpty -> - case first_and_last_seginfos0(NonEmpty) of - none -> - {error, empty}; - {_NumSegs, _Fst, LstSI} -> - case seg_last_landmark(LstSI) of - undefined -> - {error, empty}; - L -> - {ok, L} - end - end - end. 
- %% Single pass over index files: for each record, update the chunk closest to %% each target. Targets and Acc are lists of the same length. Returns [Sample]. fold_index_files_closest(IdxFiles, Targets, FirstChunk) when is_list(Targets) -> From 90cde660245624055106a02689d20e5cfac002b9 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 5 Mar 2026 19:32:34 +0100 Subject: [PATCH 4/6] Fix warnings --- src/osiris_log.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 04113161..8355e288 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -3546,7 +3546,7 @@ stream_offset_samples(DirOrConfig, Fractions0) when is_list(Fractions0) -> %% Replace first (0.0) and last (1.0) with exact first/last %% so 0.0 and 1.0 return true first/last offset and timestamp. Samples2 = lists:zipwith( - fun (0.0, _) -> {FstChId, FstTs}; + fun (+0.0, _) -> {FstChId, FstTs}; (1.0, _) -> {LastOff, LastTs}; (_, S) -> S end, Fractions, Samples), @@ -3559,7 +3559,7 @@ normalize_fraction(F) when F =< 0.0 -> 0.0; normalize_fraction(F) when F >= 1.0 -> 1.0; normalize_fraction(F) -> F. -target_chunk_id(FirstChId, Range, 0.0) -> FirstChId; +target_chunk_id(FirstChId, _Range, +0.0) -> FirstChId; target_chunk_id(FirstChId, Range, 1.0) -> FirstChId + Range; target_chunk_id(FirstChId, Range, F) -> FirstChId + round((Range * F)). From 6dc72341b0276c7c26a0dcc0f3e10d121ea4f5f1 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Wed, 11 Mar 2026 11:38:45 +0100 Subject: [PATCH 5/6] Use binary-search when locating intermediate samples --- src/osiris_log.erl | 176 ++++++++++++++++++++++++++++++++------------- 1 file changed, 125 insertions(+), 51 deletions(-) diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 8355e288..6f5c4637 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -113,6 +113,9 @@ -define(SKIP_SEARCH_JUMP, 2048). -define(DEFAULT_READ_AHEAD_LIMIT, 4096). 
+%% Block size for index scan in fold_index_files_closest (records per read) +-define(INDEX_READ_BLOCK_RECORDS, 1024). +-define(INDEX_READ_BLOCK_BYTES, (?INDEX_READ_BLOCK_RECORDS * ?INDEX_RECORD_SIZE_B)). %% Specification of the Log format. %% @@ -3535,21 +3538,22 @@ stream_offset_samples(DirOrConfig, Fractions0) when is_list(Fractions0) -> #seg_info{last = LastChunk} = LstSI, LastChunkId = LastChunk#chunk_info.id, Range = LastChunkId - FstChId, - Targets = [target_chunk_id(FstChId, Range, F) || F <- Fractions], - Samples = case Range of - 0 -> - [ {FstChId, FstTs} || _ <- Fractions ]; - _ -> - fold_index_files_closest(NonEmpty, Targets, - {FstChId, FstTs}) - end, - %% Replace first (0.0) and last (1.0) with exact first/last - %% so 0.0 and 1.0 return true first/last offset and timestamp. - Samples2 = lists:zipwith( - fun (+0.0, _) -> {FstChId, FstTs}; - (1.0, _) -> {LastOff, LastTs}; - (_, S) -> S - end, Fractions, Samples), + %% First and last come from seg_info; only scan index for intermediate fractions. + MiddleFractions = [F || F <- Fractions, F > 0.0, F < 1.0], + MiddleSamples = case Range of + 0 -> + [ {FstChId, FstTs} || _ <- MiddleFractions ]; + _ when MiddleFractions =:= [] -> + []; + _ -> + MiddleTargets = [target_chunk_id(FstChId, Range, F) + || F <- MiddleFractions], + fold_index_files_closest(NonEmpty, MiddleTargets, + {FstChId, FstTs}) + end, + %% Assemble result in same order as Fractions: first/last from seg_info, rest from fold. + Samples2 = assemble_offset_samples(Fractions, {FstChId, FstTs}, + {LastOff, LastTs}, MiddleSamples), {ok, Samples2} end end @@ -3559,6 +3563,14 @@ normalize_fraction(F) when F =< 0.0 -> 0.0; normalize_fraction(F) when F >= 1.0 -> 1.0; normalize_fraction(F) -> F. +assemble_offset_samples(Fractions, First, Last, MiddleSamples) -> + {Samples, _} = lists:mapfoldl( + fun (+0.0, MS) -> {First, MS}; + (1.0, MS) -> {Last, MS}; + (_, [S | MS]) -> {S, MS} + end, MiddleSamples, Fractions), + Samples. 
+ target_chunk_id(FirstChId, _Range, +0.0) -> FirstChId; target_chunk_id(FirstChId, Range, 1.0) -> FirstChId + Range; target_chunk_id(FirstChId, Range, F) -> @@ -3574,50 +3586,112 @@ seg_last_landmark(#seg_info{last = #chunk_info{id = Id, num = Num, timestamp = T seg_last_landmark(_) -> undefined. -%% Single pass over index files: for each record, update the chunk closest to -%% each target. Targets and Acc are lists of the same length. Returns [Sample]. +%% Find chunk (offset, timestamp) closest to each target by binary search in index +%% files (ordered by ChunkId). Reads O(num_targets * log(records)) instead of all records. fold_index_files_closest(IdxFiles, Targets, FirstChunk) when is_list(Targets) -> - {FstChId, _FstTs} = FirstChunk, - Acc0 = [ {FirstChunk, abs(FstChId - T)} || T <- Targets ], - Acc = lists:foldl(fun (IdxFile, A) -> - fold_one_index_file(IdxFile, Targets, A) - end, - Acc0, IdxFiles), - [ B || {B, _D} <- Acc ]. - -fold_one_index_file(IdxFile, Targets, Acc) -> - case file:open(IdxFile, [read, raw, binary]) of - {ok, Fd} -> - try - {ok, _} = file:position(Fd, ?IDX_HEADER_SIZE), - fold_index_records(Fd, Targets, Acc) - after - _ = file:close(Fd) + case index_files_bounds(IdxFiles) of + {ok, Bounds} -> + [closest_chunk_for_target(Bounds, T, FirstChunk) || T <- Targets]; + {error, _} -> + [FirstChunk || _ <- Targets] + end. + +index_files_bounds(IdxFiles) -> + R = [index_file_bounds_one(F) || F <- IdxFiles], + case lists:keyfind(error, 1, R) of + false -> {ok, [B || {ok, B} <- R]}; + _ -> {error, bounds} + end. 
+
+index_file_bounds_one(IdxFile) ->
+    case file_size(IdxFile) of
+        Size when Size > ?IDX_HEADER_SIZE + ?INDEX_RECORD_SIZE_B - 1 ->
+            NumRecords = (Size - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B,
+            case read_idx_chunk_at(IdxFile, 0) of
+                {ok, {FstChId, FstTs}} ->
+                    case read_idx_chunk_at(IdxFile, NumRecords - 1) of
+                        {ok, {LstChId, LstTs}} ->
+                            {ok, {IdxFile, FstChId, FstTs, LstChId, LstTs, NumRecords}};
+                        _ ->
+                            {error, IdxFile}
+                    end;
+                _ ->
+                    {error, IdxFile}
             end;
         _ ->
-            Acc
+            {error, IdxFile}
     end.
 
-fold_index_records(Fd, Targets, Acc) ->
-    case file:read(Fd, ?INDEX_RECORD_SIZE_B) of
-        {ok, <<ChunkId:64/unsigned, Timestamp:64/signed, _:64/unsigned, _:32/unsigned, _:8/unsigned>>} when ChunkId =/= 0 orelse Timestamp =/= 0 ->
-            Chunk = {ChunkId, Timestamp},
-            NewAcc = [ if abs(ChunkId - T) < D -> {Chunk, abs(ChunkId - T)};
-                          true -> {B, D}
-                       end || {{B, D}, T} <- lists:zip(Acc, Targets) ],
-            fold_index_records(Fd, Targets, NewAcc);
+read_idx_chunk_at(IdxFile, RecordIndex) when RecordIndex >= 0 ->
+    Pos = ?IDX_HEADER_SIZE + RecordIndex * ?INDEX_RECORD_SIZE_B,
+    case file:pread(IdxFile, Pos, ?INDEX_RECORD_SIZE_B) of
+        {ok, <<ChunkId:64/unsigned, Ts:64/signed, _:64/unsigned, _:32/unsigned, _:8/unsigned>>}
+          when ChunkId =/= 0 orelse Ts =/= 0 ->
+            {ok, {ChunkId, Ts}};
         {ok, ?ZERO_IDX_MATCH(_)} ->
-            fold_index_records(Fd, Targets, Acc);
-        {ok, _} ->
-            fold_index_records(Fd, Targets, Acc);
-        eof ->
-            Acc
+            {ok, zero};
+        X ->
+            X
+    end.
+
+closest_chunk_for_target(Bounds, Target, FirstChunk) when Bounds =/= [] ->
+    {F1, Ft1} = FirstChunk,
+    [{_IdxFile1, F1, Ft1, _L1, _Lt1, _N1} | _] = Bounds,
+    Last = lists:last(Bounds),
+    {_IdxFileK, _Fk, _Ftk, Lk, Ltk, _Nk} = Last,
+    if
+        Target =< F1 -> {F1, Ft1};
+        Target >= Lk -> {Lk, Ltk};
+        true ->
+            case find_file_for_target(Bounds, Target) of
+                {single, IdxFile, F, Ft, L, Lt, NumRec} ->
+                    bsearch_closest_in_file(IdxFile, Target, NumRec, F, Ft, L, Lt);
+                {between, {Ca, Ta}, {Cb, Tb}} ->
+                    if abs(Ca - Target) =< abs(Cb - Target) -> {Ca, Ta};
+                       true -> {Cb, Tb}
+                    end
+            end
+    end.
+ +find_file_for_target([{IdxFile, F, Ft, L, Lt, N}], _Target) -> + {single, IdxFile, F, Ft, L, Lt, N}; +find_file_for_target([{IdxFile, F, Ft, L, Lt, N} | Rest], Target) -> + if + Target >= F, Target =< L -> {single, IdxFile, F, Ft, L, Lt, N}; + true -> + [{_NextFile, F2, Ft2, _L2, _Lt2, _N2} | _] = Rest, + if + Target > L, Target < F2 -> {between, {L, Lt}, {F2, Ft2}}; + true -> find_file_for_target(Rest, Target) + end end. +bsearch_closest_in_file(IdxFile, Target, NumRecords, _FirstChunkId, _FirstTs, _LastChunkId, _LastTs) -> + %% Largest record index with ChunkId =< Target + Idx = bsearch_lower(IdxFile, Target, 0, NumRecords - 1), + {ok, {C0, T0}} = read_idx_chunk_at(IdxFile, Idx), + if + Idx + 1 >= NumRecords -> {C0, T0}; + true -> + {ok, {C1, T1}} = read_idx_chunk_at(IdxFile, Idx + 1), + if abs(C0 - Target) =< abs(C1 - Target) -> {C0, T0}; + true -> {C1, T1} + end + end. + +bsearch_lower(IdxFile, Target, Low, High) when Low =< High -> + Mid = (Low + High) div 2, + case read_idx_chunk_at(IdxFile, Mid) of + {ok, {ChunkId, _}} when ChunkId =< Target -> + bsearch_lower(IdxFile, Target, Mid + 1, High); + {ok, zero} -> + bsearch_lower(IdxFile, Target, Mid + 1, High); + _ -> + bsearch_lower(IdxFile, Target, Low, Mid - 1) + end; +bsearch_lower(_IdxFile, _Target, Low, _High) -> + max(0, Low - 1). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). 
From 19036044fb19d4b34de1b61d8354f97704f5b591 Mon Sep 17 00:00:00 2001
From: Marcial Rosales
Date: Wed, 11 Mar 2026 11:57:08 +0100
Subject: [PATCH 6/6] Fix issue related to access files

---
 src/osiris_log.erl | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index 6f5c4637..71f13d72 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -3624,12 +3624,21 @@ index_file_bounds_one(IdxFile) ->
 
 read_idx_chunk_at(IdxFile, RecordIndex) when RecordIndex >= 0 ->
     Pos = ?IDX_HEADER_SIZE + RecordIndex * ?INDEX_RECORD_SIZE_B,
-    case file:pread(IdxFile, Pos, ?INDEX_RECORD_SIZE_B) of
-        {ok, <<ChunkId:64/unsigned, Ts:64/signed, _:64/unsigned, _:32/unsigned, _:8/unsigned>>}
-          when ChunkId =/= 0 orelse Ts =/= 0 ->
-            {ok, {ChunkId, Ts}};
-        {ok, ?ZERO_IDX_MATCH(_)} ->
-            {ok, zero};
+    case file:open(IdxFile, [read, raw, binary]) of
+        {ok, Fd} ->
+            try
+                case file:pread(Fd, Pos, ?INDEX_RECORD_SIZE_B) of
+                    {ok, <<ChunkId:64/unsigned, Ts:64/signed, _:64/unsigned, _:32/unsigned, _:8/unsigned>>}
+                      when ChunkId =/= 0 orelse Ts =/= 0 ->
+                        {ok, {ChunkId, Ts}};
+                    {ok, ?ZERO_IDX_MATCH(_)} ->
+                        {ok, zero};
+                    X ->
+                        X
+                end
+            after
+                _ = file:close(Fd)
+            end;
         X ->
             X
     end.