diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 95c8ab24..71f13d72 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -57,6 +57,7 @@ directory/1, delete_directory/1, counter_fields/0, + stream_offset_samples/2, make_counter/1, generate_log/4]). @@ -112,6 +113,9 @@ -define(SKIP_SEARCH_JUMP, 2048). -define(DEFAULT_READ_AHEAD_LIMIT, 4096). +%% Block size for index scan in fold_index_files_closest (records per read) +-define(INDEX_READ_BLOCK_RECORDS, 1024). +-define(INDEX_READ_BLOCK_BYTES, (?INDEX_READ_BLOCK_RECORDS * ?INDEX_RECORD_SIZE_B)). %% Specification of the Log format. %% @@ -3504,6 +3508,199 @@ write_in_chunks(ToWrite, MsgsPerChunk, Msg, W0) when ToWrite > 0 -> write_in_chunks(_, _, _, W) -> W. +%% Returns offset samples at given fractions of the stream's offset range. +%% Fractions is a list of floats in [0.0, 1.0]: 0.0 = first, 1.0 = last; +%% values in between are linear (e.g. 0.5 = midpoint by chunk id). +%% Returns {ok, [ {offset(), timestamp()} ]} in the same order as Fractions, +%% or {error, empty}. Fractions are clamped to [0.0, 1.0]. +-spec stream_offset_samples(file:filename_all() | config(), [float()]) -> + {ok, [{offset(), osiris:timestamp()}]} | {error, empty}. 
+stream_offset_samples(DirOrConfig, Fractions0) when is_list(Fractions0) ->
+    %% Clamp every requested fraction into [0.0, 1.0] before sampling.
+    Fractions = [normalize_fraction(F) || F <- Fractions0],
+    IdxFiles = sorted_index_files(DirOrConfig),
+    NonEmpty = non_empty_index_files(IdxFiles),
+    case NonEmpty of
+        [] ->
+            {error, empty};
+        _ ->
+            case first_and_last_seginfos0(NonEmpty) of
+                none ->
+                    {error, empty};
+                {_NumSegs, FstSI, LstSI} ->
+                    First = seg_first_landmark(FstSI),
+                    Last = seg_last_landmark(LstSI),
+                    case {First, Last} of
+                        {undefined, _} ->
+                            {error, empty};
+                        {_, undefined} ->
+                            {error, empty};
+                        {{FstChId, FstTs}, {LastOff, LastTs}} ->
+                            #seg_info{last = LastChunk} = LstSI,
+                            LastChunkId = LastChunk#chunk_info.id,
+                            %% Range is measured in chunk ids (first chunk id of
+                            %% the last chunk minus first chunk id of the stream).
+                            Range = LastChunkId - FstChId,
+                            %% First and last come from seg_info; only scan index for intermediate fractions.
+                            MiddleFractions = [F || F <- Fractions, F > 0.0, F < 1.0],
+                            MiddleSamples =
+                                case Range of
+                                    0 ->
+                                        %% Single-chunk stream: every middle
+                                        %% fraction collapses to the first chunk.
+                                        [{FstChId, FstTs} || _ <- MiddleFractions];
+                                    _ when MiddleFractions =:= [] ->
+                                        [];
+                                    _ ->
+                                        MiddleTargets = [target_chunk_id(FstChId, Range, F)
+                                                         || F <- MiddleFractions],
+                                        fold_index_files_closest(NonEmpty, MiddleTargets,
+                                                                 {FstChId, FstTs})
+                                end,
+                            %% Assemble result in same order as Fractions: first/last from seg_info, rest from fold.
+                            Samples2 = assemble_offset_samples(Fractions, {FstChId, FstTs},
+                                                               {LastOff, LastTs}, MiddleSamples),
+                            {ok, Samples2}
+                    end
+            end
+    end.
+
+%% Clamp a fraction into [0.0, 1.0]. Note F =< 0.0 (including -0.0) maps to
+%% the literal 0.0, i.e. positive zero — assemble_offset_samples relies on this.
+normalize_fraction(F) when F =< 0.0 -> 0.0;
+normalize_fraction(F) when F >= 1.0 -> 1.0;
+normalize_fraction(F) -> F.
+
+%% Re-associate the (normalized) fractions with their samples in caller order:
+%% 0.0 -> First, 1.0 -> Last, anything strictly in between is consumed, in
+%% order, from MiddleSamples. MiddleSamples was built from the same filtered
+%% list ([F || F > 0.0, F < 1.0]) so the two orders line up exactly.
+assemble_offset_samples(Fractions, First, Last, MiddleSamples) ->
+    {Samples, _} = lists:mapfoldl(
+        %% +0.0 matches positive zero only (OTP 27+ zero-sign matching);
+        %% normalize_fraction never produces -0.0, so this is exhaustive.
+        fun (+0.0, MS) -> {First, MS};
+            (1.0, MS) -> {Last, MS};
+            (_, [S | MS]) -> {S, MS}
+        end, MiddleSamples, Fractions),
+    Samples.
+
+%% Linear interpolation of a target chunk id across the stream's id range.
+%% The 0.0/1.0 clauses are exact endpoints; intermediate fractions round to
+%% the nearest id. (Callers currently pass only 0.0 < F < 1.0 here.)
+target_chunk_id(FirstChId, _Range, +0.0) -> FirstChId;
+target_chunk_id(FirstChId, Range, 1.0) -> FirstChId + Range;
+target_chunk_id(FirstChId, Range, F) ->
+    FirstChId + round((Range * F)).
+
+%% {ChunkId, Timestamp} of a segment's first chunk, or undefined when the
+%% seg_info carries no first chunk_info.
+seg_first_landmark(#seg_info{first = #chunk_info{id = Id, timestamp = Ts}}) ->
+    {Id, Ts};
+seg_first_landmark(_) ->
+    undefined.
+
+%% Offset of the last record in a segment's last chunk (chunk id + record
+%% count - 1) paired with that chunk's timestamp, or undefined when the
+%% seg_info carries no last chunk_info.
+seg_last_landmark(#seg_info{last = #chunk_info{id = Id, num = Num, timestamp = Ts}}) ->
+    {Id + Num - 1, Ts};
+seg_last_landmark(_) ->
+    undefined.
+
+%% Find chunk (offset, timestamp) closest to each target by binary search in index
+%% files (ordered by ChunkId). Reads O(num_targets * log(records)) instead of all records.
+%% If the bounds of any index file cannot be determined, degrades gracefully by
+%% answering FirstChunk for every target.
+fold_index_files_closest(IdxFiles, Targets, FirstChunk) when is_list(Targets) ->
+    case index_files_bounds(IdxFiles) of
+        {ok, Bounds} ->
+            [closest_chunk_for_target(Bounds, T, FirstChunk) || T <- Targets];
+        {error, _} ->
+            [FirstChunk || _ <- Targets]
+    end.
+
+%% Bounds tuple {File, FirstChId, FirstTs, LastChId, LastTs, NumRecords} for
+%% every index file, or {error, bounds} if any single file failed.
+index_files_bounds(IdxFiles) ->
+    R = [index_file_bounds_one(F) || F <- IdxFiles],
+    %% Elements are {ok, Bounds} | {error, File}; keyfind spots any failure.
+    case lists:keyfind(error, 1, R) of
+        false -> {ok, [B || {ok, B} <- R]};
+        _ -> {error, bounds}
+    end.
+
+%% Bounds for one index file: first and last readable records plus count.
+%% Requires at least one full record past the index header.
+index_file_bounds_one(IdxFile) ->
+    case file_size(IdxFile) of
+        %% i.e. Size >= ?IDX_HEADER_SIZE + ?INDEX_RECORD_SIZE_B
+        Size when Size > ?IDX_HEADER_SIZE + ?INDEX_RECORD_SIZE_B - 1 ->
+            NumRecords = (Size - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B,
+            case read_idx_chunk_at(IdxFile, 0) of
+                {ok, {FstChId, FstTs}} ->
+                    case read_idx_chunk_at(IdxFile, NumRecords - 1) of
+                        {ok, {LstChId, LstTs}} ->
+                            {ok, {IdxFile, FstChId, FstTs, LstChId, LstTs, NumRecords}};
+                        _ ->
+                            %% zero (unwritten) or error at the tail record
+                            {error, IdxFile}
+                    end;
+                _ ->
+                    {error, IdxFile}
+            end;
+        _ ->
+            {error, IdxFile}
+    end.
+
+%% Read a single index record at RecordIndex (0-based). Returns
+%% {ok, {ChunkId, Timestamp}} for a written record, {ok, zero} for an
+%% all-zero (preallocated, not yet written) record, or the file error/eof.
+read_idx_chunk_at(IdxFile, RecordIndex) when RecordIndex >= 0 ->
+    Pos = ?IDX_HEADER_SIZE + RecordIndex * ?INDEX_RECORD_SIZE_B,
+    case file:open(IdxFile, [read, raw, binary]) of
+        {ok, Fd} ->
+            try
+                case file:pread(Fd, Pos, ?INDEX_RECORD_SIZE_B) of
+                    %% NOTE(review): this pattern was corrupted in the submitted
+                    %% diff ("{ok, <>}" with ChunkId/Ts unbound). Reconstructed
+                    %% from the index record layout: ChunkId:64, Timestamp:64
+                    %% signed, Epoch:64, FilePos:32, ChType:8 — 29 bytes, which
+                    %% must equal ?INDEX_RECORD_SIZE_B. Confirm against the
+                    %% ?IDX_MATCH / ?ZERO_IDX_MATCH macros in this module.
+                    {ok, <<ChunkId:64/unsigned, Ts:64/signed, _Epoch:64/unsigned,
+                           _FilePos:32/unsigned, _ChType:8/unsigned>>}
+                      when ChunkId =/= 0 orelse Ts =/= 0 ->
+                        {ok, {ChunkId, Ts}};
+                    {ok, ?ZERO_IDX_MATCH(_)} ->
+                        {ok, zero};
+                    X ->
+                        %% eof or {error, Reason} passed through to the caller
+                        X
+                end
+            after
+                _ = file:close(Fd)
+            end;
+        X ->
+            X
+    end.
+
+%% Pick the chunk whose id is closest to Target across all index files.
+%% Bounds must be non-empty and ordered by chunk id; FirstChunk is the
+%% stream's first chunk {Id, Ts}.
+closest_chunk_for_target(Bounds, Target, FirstChunk) when Bounds =/= [] ->
+    {F1, Ft1} = FirstChunk,
+    %% NOTE(review): F1/Ft1 are already bound above, so this head match also
+    %% asserts that the first bounds entry equals FirstChunk — it will
+    %% badmatch if seg_info and the index file ever disagree. Intentional?
+    [{_IdxFile1, F1, Ft1, _L1, _Lt1, _N1} | _] = Bounds,
+    Last = lists:last(Bounds),
+    {_IdxFileK, _Fk, _Ftk, Lk, Ltk, _Nk} = Last,
+    if
+        %% Targets outside the covered id range clamp to the extremes.
+        Target =< F1 -> {F1, Ft1};
+        Target >= Lk -> {Lk, Ltk};
+        true ->
+            case find_file_for_target(Bounds, Target) of
+                {single, IdxFile, F, Ft, L, Lt, NumRec} ->
+                    bsearch_closest_in_file(IdxFile, Target, NumRec, F, Ft, L, Lt);
+                {between, {Ca, Ta}, {Cb, Tb}} ->
+                    %% Target falls in a gap between two files: nearer edge
+                    %% wins; ties go to the earlier chunk.
+                    if abs(Ca - Target) =< abs(Cb - Target) -> {Ca, Ta};
+                       true -> {Cb, Tb}
+                    end
+            end
+    end.
+
+%% Locate the index file whose [First, Last] chunk-id range contains Target,
+%% or the pair of chunks bracketing the gap between two consecutive files.
+%% The single-element clause is the fallback for the last file.
+find_file_for_target([{IdxFile, F, Ft, L, Lt, N}], _Target) ->
+    {single, IdxFile, F, Ft, L, Lt, N};
+find_file_for_target([{IdxFile, F, Ft, L, Lt, N} | Rest], Target) ->
+    if
+        Target >= F, Target =< L -> {single, IdxFile, F, Ft, L, Lt, N};
+        true ->
+            [{_NextFile, F2, Ft2, _L2, _Lt2, _N2} | _] = Rest,
+            if
+                Target > L, Target < F2 -> {between, {L, Lt}, {F2, Ft2}};
+                true -> find_file_for_target(Rest, Target)
+            end
+    end.
+
+%% Closest record to Target inside one index file: find the greatest record
+%% with ChunkId =< Target, then compare it with its successor; ties prefer
+%% the earlier chunk. The first/last id/ts arguments are currently unused.
+bsearch_closest_in_file(IdxFile, Target, NumRecords, _FirstChunkId, _FirstTs, _LastChunkId, _LastTs) ->
+    %% Largest record index with ChunkId =< Target
+    Idx = bsearch_lower(IdxFile, Target, 0, NumRecords - 1),
+    %% NOTE(review): if Idx ever lands on a zero (unwritten) record this match
+    %% badmatches; bounds checking guarantees the last record is written, and
+    %% interior zero records are assumed not to occur — confirm.
+    {ok, {C0, T0}} = read_idx_chunk_at(IdxFile, Idx),
+    if
+        Idx + 1 >= NumRecords -> {C0, T0};
+        true ->
+            {ok, {C1, T1}} = read_idx_chunk_at(IdxFile, Idx + 1),
+            if abs(C0 - Target) =< abs(C1 - Target) -> {C0, T0};
+               true -> {C1, T1}
+            end
+    end.
+
+%% Binary search for the greatest record index whose ChunkId =< Target.
+%% Zero (unwritten) records are treated as "=< Target" so the search moves
+%% right past them. Terminal clause: Low has overshot, so Low - 1 is the
+%% answer (floored at 0).
+bsearch_lower(IdxFile, Target, Low, High) when Low =< High ->
+    Mid = (Low + High) div 2,
+    case read_idx_chunk_at(IdxFile, Mid) of
+        {ok, {ChunkId, _}} when ChunkId =< Target ->
+            bsearch_lower(IdxFile, Target, Mid + 1, High);
+        {ok, zero} ->
+            bsearch_lower(IdxFile, Target, Mid + 1, High);
+        _ ->
+            bsearch_lower(IdxFile, Target, Low, Mid - 1)
+    end;
+bsearch_lower(_IdxFile, _Target, Low, _High) ->
+    max(0, Low - 1).
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl
index 9fa5b7bd..6039e160 100644
--- a/test/osiris_log_SUITE.erl
+++ b/test/osiris_log_SUITE.erl
@@ -100,7 +100,12 @@ all_tests() ->
      read_ahead_send_file_on_off,
      resolve_offset_spec_empty,
      resolve_offset_spec_empty_directory,
-     resolve_offset_spec
+     resolve_offset_spec,
+     stream_offset_samples_empty,
+     stream_offset_samples_single_chunk,
+     stream_offset_samples_multiple_chunks,
+     stream_offset_samples_percentiles,
+     stream_offset_samples_config_map
     ].
 
 groups() ->
@@ -2049,6 +2054,102 @@ overview_with_missing_index_at_start(Config) ->
                       filename:join(?config(dir, Config), "*.index")))),
     ok.
 
+stream_offset_samples_empty(Config) ->
+    %% Both an empty log (init but no writes) and a non-existent directory
+    %% return {error, empty}.
+    LDir = ?config(leader_dir, Config),
+    Log0 = seed_log(LDir, [], Config),
+    osiris_log:close(Log0),
+    ?assertEqual({error, empty},
+                 osiris_log:stream_offset_samples(LDir, [0.0, 0.25, 0.5, 0.75, 1.0])),
+    NonExistent = filename:join(?config(priv_dir, Config),
+                                "stream_offset_samples_empty_nonexistent"),
+    ?assertEqual({error, empty},
+                 osiris_log:stream_offset_samples(NonExistent, [0.0, 0.25, 0.5, 0.75, 1.0])),
+    ok.
+
+stream_offset_samples_single_chunk(Config) ->
+    %% Single chunk with two records: p25/p50/p75 all collapse onto the first
+    %% offset (0); last is the last message offset in the chunk (1), not the
+    %% chunk's first offset. All samples carry the chunk's timestamp.
+    Now = now_ms(),
+    FirstTs = Now - 10000,
+    EpochChunks = [{2, FirstTs, [<<"one">>, <<"two">>]}],
+    LDir = ?config(leader_dir, Config),
+    Log0 = seed_log(LDir, EpochChunks, Config),
+    osiris_log:close(Log0),
+    {ok, [First, P25, P50, P75, Last]} =
+        osiris_log:stream_offset_samples(LDir, [0.0, 0.25, 0.5, 0.75, 1.0]),
+    %% FirstTs is bound, so these ?assertMatch patterns pin the timestamp too.
+    ?assertMatch({0, FirstTs}, First),
+    ?assertMatch({1, FirstTs}, Last),
+    ?assertMatch({0, FirstTs}, P25),
+    ?assertMatch({0, FirstTs}, P50),
+    ?assertMatch({0, FirstTs}, P75),
+    ok.
+
+stream_offset_samples_multiple_chunks(Config) ->
+    %% Multiple chunks: first =< p25 =< p50 =< p75 =< last (by offset), with
+    %% first pinned to 0. last is the very last message offset in the log
+    %% (last offset of the last chunk), not the last chunk's first offset:
+    %% the last chunk starts at 4 and has 2 records -> 5.
+    Now = now_ms(),
+    FirstTs = Now - 10000,
+    LastTs = Now - 3000,
+    EpochChunks =
+        [{1, FirstTs, [<<"one">>]},
+         {1, Now - 8000, [<<"two">>]},
+         {2, Now - 5000, [<<"three">>, <<"four">>]},
+         {2, LastTs, [<<"five">>, <<"six">>]}],
+
+    LDir = ?config(leader_dir, Config),
+    Log0 = seed_log(LDir, EpochChunks, Config),
+    osiris_log:close(Log0),
+    {ok, [First, P25, P50, P75, Last]} =
+        osiris_log:stream_offset_samples(LDir, [0.0, 0.25, 0.5, 0.75, 1.0]),
+    %% FirstTs/LastTs are already bound: these matches assert timestamps too.
+    {FirstOff, FirstTs} = First,
+    {LastOff, LastTs} = Last,
+    {P25Off, _} = P25,
+    {P50Off, _} = P50,
+    {P75Off, _} = P75,
+    ?assert(FirstOff =< P25Off),
+    ?assert(P25Off =< P50Off),
+    ?assert(P50Off =< P75Off),
+    ?assert(P75Off =< LastOff),
+    ?assertEqual(FirstOff, 0),
+    ?assertEqual(LastOff, 5),
+    ok.
+
+stream_offset_samples_percentiles(Config) ->
+    %% Minimum layout for non-overlapping percentiles: one record per chunk,
+    %% so chunks start at offsets 0,1,2,3,4. Range=4 gives targets T25=1,
+    %% T50=2, T75=3, each landing exactly on a distinct chunk.
+    Now = now_ms(),
+    Ts0 = Now - 10000,
+    Ts1 = Now - 8000,
+    Ts2 = Now - 5000,
+    Ts3 = Now - 3000,
+    Ts4 = Now - 1000,
+    EpochChunks =
+        [{1, Ts0, [<<"a">>]},
+         {1, Ts1, [<<"b">>]},
+         {1, Ts2, [<<"c">>]},
+         {1, Ts3, [<<"d">>]},
+         {1, Ts4, [<<"e">>]}],
+
+    LDir = ?config(leader_dir, Config),
+    Log0 = seed_log(LDir, EpochChunks, Config),
+    osiris_log:close(Log0),
+    {ok, [First, P25, P50, P75, Last]} =
+        osiris_log:stream_offset_samples(LDir, [0.0, 0.25, 0.5, 0.75, 1.0]),
+    %% Direct = matches double as assertions (badmatch on failure).
+    {0, Ts0} = First,
+    {4, Ts4} = Last,
+    {1, Ts1} = P25,
+    {2, Ts2} = P50,
+    {3, Ts3} = P75.
+
+stream_offset_samples_config_map(Config) ->
+    %% Calling with config map #{dir => Dir} works like a plain path.
+    %% NOTE(review): chunks here are {Epoch, Records} two-tuples (no
+    %% timestamp) — assumes seed_log accepts that shape; confirm.
+    EpochChunks = [{1, [<<"a">>]}, {1, [<<"b">>]}],
+    LDir = ?config(leader_dir, Config),
+    Log0 = seed_log(LDir, EpochChunks, Config),
+    osiris_log:close(Log0),
+    {ok, ByPath} = osiris_log:stream_offset_samples(LDir, [0.0, 0.25, 0.5, 0.75, 1.0]),
+    Conf = ?config(osiris_conf, Config),
+    RConf = Conf#{dir => LDir},
+    {ok, ByConf} = osiris_log:stream_offset_samples(RConf, [0.0, 0.25, 0.5, 0.75, 1.0]),
+    %% Both call paths must produce identical samples.
+    ?assertEqual(ByPath, ByConf),
+    ok.
+
 read_ahead_send_file(Config) ->
     RAL = 4096, %% read ahead limit
     HS = ?HEADER_SIZE_B,
@@ -2587,7 +2688,7 @@ write_chunk(Conf, Epoch, Now, Records, Trk0, Log0) ->
             %% need to re-init as new epoch
             osiris_log:close(Log1),
             Log = osiris_log:init(Conf#{epoch => Epoch}),
-            {Trk1, osiris_log:write(lists:reverse(Records), Log)}
+            {Trk1, osiris_log:write(lists:reverse(Records), Now, Log)}
         end.
 
 now_ms() ->