Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 197 additions & 0 deletions src/osiris_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
directory/1,
delete_directory/1,
counter_fields/0,
stream_offset_samples/2,
make_counter/1,
generate_log/4]).

Expand Down Expand Up @@ -112,6 +113,9 @@

-define(SKIP_SEARCH_JUMP, 2048).
-define(DEFAULT_READ_AHEAD_LIMIT, 4096).
%% Block size for index scan in fold_index_files_closest (records per read)
-define(INDEX_READ_BLOCK_RECORDS, 1024).
-define(INDEX_READ_BLOCK_BYTES, (?INDEX_READ_BLOCK_RECORDS * ?INDEX_RECORD_SIZE_B)).

%% Specification of the Log format.
%%
Expand Down Expand Up @@ -3504,6 +3508,199 @@ write_in_chunks(ToWrite, MsgsPerChunk, Msg, W0) when ToWrite > 0 ->
write_in_chunks(_, _, _, W) ->
W.

%% Returns offset samples at given fractions of the stream's offset range.
%% Fractions is a list of floats in [0.0, 1.0]: 0.0 = first, 1.0 = last;
%% values in between are linear (e.g. 0.5 = midpoint by chunk id).
%% Returns {ok, [ {offset(), timestamp()} ]} in the same order as Fractions,
%% or {error, empty}. Fractions are clamped to [0.0, 1.0].
-spec stream_offset_samples(file:filename_all() | config(), [float()]) ->
    {ok, [{offset(), osiris:timestamp()}]} | {error, empty}.
stream_offset_samples(DirOrConfig, Fractions0) when is_list(Fractions0) ->
    Fractions = [normalize_fraction(F) || F <- Fractions0],
    case non_empty_index_files(sorted_index_files(DirOrConfig)) of
        [] ->
            {error, empty};
        NonEmpty ->
            case first_and_last_seginfos0(NonEmpty) of
                none ->
                    {error, empty};
                {_NumSegs, FstSI, LstSI} ->
                    samples_from_seg_infos(NonEmpty, Fractions, FstSI, LstSI)
            end
    end.

%% Build the sample list once the boundary segments are known; returns
%% {error, empty} when either boundary landmark is missing.
samples_from_seg_infos(NonEmpty, Fractions, FstSI, LstSI) ->
    case {seg_first_landmark(FstSI), seg_last_landmark(LstSI)} of
        {{FstChId, FstTs}, {LastOff, LastTs}} ->
            #seg_info{last = #chunk_info{id = LastChunkId}} = LstSI,
            Range = LastChunkId - FstChId,
            %% First and last come from seg_info; only scan the index for
            %% intermediate fractions.
            Middle = [F || F <- Fractions, F > 0.0, F < 1.0],
            MiddleSamples =
                if Range =:= 0 ->
                       %% single-chunk range: everything collapses onto
                       %% the first chunk
                       [{FstChId, FstTs} || _ <- Middle];
                   Middle =:= [] ->
                       [];
                   true ->
                       Targets = [target_chunk_id(FstChId, Range, F)
                                  || F <- Middle],
                       fold_index_files_closest(NonEmpty, Targets,
                                                {FstChId, FstTs})
                end,
            %% Assemble the result in the same order as Fractions:
            %% first/last from seg_info, the rest from the index scan.
            {ok, assemble_offset_samples(Fractions, {FstChId, FstTs},
                                         {LastOff, LastTs}, MiddleSamples)};
        _ ->
            %% one of the boundary segments had no chunk info
            {error, empty}
    end.

%% Clamp a fraction into the closed interval [0.0, 1.0].
normalize_fraction(F) ->
    min(1.0, max(0.0, F)).

%% Stitch the final sample list together in the same order as Fractions:
%% 0.0 takes the precomputed first landmark, 1.0 the last, and every
%% intermediate fraction consumes the next entry of MiddleSamples.
assemble_offset_samples([], _First, _Last, _Middle) ->
    [];
assemble_offset_samples([+0.0 | Fs], First, Last, Middle) ->
    [First | assemble_offset_samples(Fs, First, Last, Middle)];
assemble_offset_samples([1.0 | Fs], First, Last, Middle) ->
    [Last | assemble_offset_samples(Fs, First, Last, Middle)];
assemble_offset_samples([_ | Fs], First, Last, [Sample | Middle]) ->
    [Sample | assemble_offset_samples(Fs, First, Last, Middle)].

%% Map a fraction of the chunk-id range onto an absolute chunk id.
%% Exact endpoints avoid float rounding; interior fractions round to
%% the nearest chunk id.
target_chunk_id(FirstChId, Range, F) ->
    case F of
        +0.0 -> FirstChId;
        1.0 -> FirstChId + Range;
        _ -> FirstChId + round(Range * F)
    end.

%% First-chunk landmark {ChunkId, Timestamp} of a segment, or undefined
%% when the segment carries no first chunk info.
seg_first_landmark(#seg_info{first = First}) when First =/= undefined ->
    case First of
        #chunk_info{id = Id, timestamp = Ts} ->
            {Id, Ts};
        _ ->
            undefined
    end;
seg_first_landmark(_) ->
    undefined.

%% Last-offset landmark of a segment: the offset of the final record in
%% the last chunk (chunk id + record count - 1) with that chunk's
%% timestamp, or undefined when no last chunk info is present.
seg_last_landmark(#seg_info{last = Last}) when is_record(Last, chunk_info) ->
    #chunk_info{id = Id, num = Num, timestamp = Ts} = Last,
    LastOffs = Id + Num - 1,
    {LastOffs, Ts};
seg_last_landmark(_) ->
    undefined.

%% Find the chunk {offset, timestamp} closest to each target chunk id by
%% binary search over the index files (ordered by chunk id). Reads
%% O(num_targets * log(records)) index records instead of all of them.
%% Falls back to FirstChunk for every target if any file's bounds
%% cannot be read.
fold_index_files_closest(IdxFiles, Targets, FirstChunk) when is_list(Targets) ->
    case index_files_bounds(IdxFiles) of
        {ok, Bounds} ->
            lists:map(fun(Target) ->
                              closest_chunk_for_target(Bounds, Target,
                                                       FirstChunk)
                      end, Targets);
        {error, _} ->
            lists:duplicate(length(Targets), FirstChunk)
    end.

%% Collect {File, FirstChId, FirstTs, LastChId, LastTs, NumRecords}
%% bounds for every index file; {error, bounds} if any file fails.
index_files_bounds(IdxFiles) ->
    Results = lists:map(fun index_file_bounds_one/1, IdxFiles),
    case lists:any(fun({error, _}) -> true;
                      (_) -> false
                   end, Results) of
        true ->
            {error, bounds};
        false ->
            {ok, [B || {ok, B} <- Results]}
    end.

%% Read the first and last index records of one index file and return
%% {ok, {File, FirstChId, FirstTs, LastChId, LastTs, NumRecords}}, or
%% {error, File} when the file holds no complete record or a boundary
%% record cannot be read.
index_file_bounds_one(IdxFile) ->
    Size = file_size(IdxFile),
    if
        Size >= ?IDX_HEADER_SIZE + ?INDEX_RECORD_SIZE_B ->
            NumRecords = (Size - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B,
            index_file_bounds_edges(IdxFile, NumRecords);
        true ->
            {error, IdxFile}
    end.

%% Read the boundary records; the last record is only attempted once the
%% first has been read successfully.
index_file_bounds_edges(IdxFile, NumRecords) ->
    case read_idx_chunk_at(IdxFile, 0) of
        {ok, {FstChId, FstTs}} ->
            case read_idx_chunk_at(IdxFile, NumRecords - 1) of
                {ok, {LstChId, LstTs}} ->
                    {ok, {IdxFile, FstChId, FstTs, LstChId, LstTs,
                          NumRecords}};
                _ ->
                    {error, IdxFile}
            end;
        _ ->
            {error, IdxFile}
    end.

%% Read the index record at RecordIndex (0-based) from IdxFile.
%% Returns {ok, {ChunkId, Timestamp}} for a written record, {ok, zero}
%% for an all-zero record (matched via ?ZERO_IDX_MATCH once the
%% non-zero guard has rejected it), and passes any open/pread failure
%% through unchanged (e.g. {error, Reason} or eof).
%% NOTE(review): opens and closes the file on every call, so each binary
%% search step pays an open/close; fine for a handful of samples, but
%% worth a shared fd if this ever runs hot.
read_idx_chunk_at(IdxFile, RecordIndex) when RecordIndex >= 0 ->
    Pos = ?IDX_HEADER_SIZE + RecordIndex * ?INDEX_RECORD_SIZE_B,
    case file:open(IdxFile, [read, raw, binary]) of
        {ok, Fd} ->
            try
                case file:pread(Fd, Pos, ?INDEX_RECORD_SIZE_B) of
                    %% a record is "real" as soon as either field is
                    %% non-zero
                    {ok, <<ChunkId:64/unsigned, Ts:64/signed, _/binary>>}
                      when ChunkId =/= 0 orelse Ts =/= 0 ->
                        {ok, {ChunkId, Ts}};
                    {ok, ?ZERO_IDX_MATCH(_)} ->
                        {ok, zero};
                    X ->
                        X
                end
            after
                %% close even when pread throws; ignore close result
                _ = file:close(Fd)
            end;
        X ->
            X
    end.

%% Return the indexed chunk {offset, timestamp} closest to Target.
%% Targets at or outside the indexed range clamp to the first/last
%% indexed chunk; interior targets are resolved by a binary search
%% within the containing file, or — when the target falls in the gap
%% between two files — by picking the nearer surrounding landmark.
%%
%% The first/last landmarks are taken from Bounds itself rather than
%% pin-matched against the caller-supplied FirstChunk hint: the
%% original pinned match ([{_, F1, Ft1, ...} | _] = Bounds with F1/Ft1
%% pre-bound) would badmatch whenever the index disagreed with the
%% seg_info snapshot, e.g. after a concurrent retention run removed the
%% first segment between the two reads.
closest_chunk_for_target([{_IdxFile1, F1, Ft1, _L1, _Lt1, _N1} | _] = Bounds,
                         Target, _FirstChunk) ->
    {_IdxFileK, _Fk, _Ftk, Lk, Ltk, _Nk} = lists:last(Bounds),
    if
        Target =< F1 ->
            {F1, Ft1};
        Target >= Lk ->
            {Lk, Ltk};
        true ->
            case find_file_for_target(Bounds, Target) of
                {single, IdxFile, F, Ft, L, Lt, NumRec} ->
                    bsearch_closest_in_file(IdxFile, Target, NumRec,
                                            F, Ft, L, Lt);
                {between, {Ca, Ta}, {Cb, Tb}} ->
                    %% gap between two files: nearer landmark wins,
                    %% ties go to the earlier chunk
                    case abs(Ca - Target) =< abs(Cb - Target) of
                        true -> {Ca, Ta};
                        false -> {Cb, Tb}
                    end
            end
    end.

%% Locate the index file whose [first, last] chunk-id span contains
%% Target. The final file wins unconditionally as the base case; when
%% Target falls in the gap between one file's last chunk and the next
%% file's first, both surrounding landmarks are returned as
%% {between, LowerLandmark, UpperLandmark}.
find_file_for_target([{IdxFile, F, Ft, L, Lt, N}], _Target) ->
    {single, IdxFile, F, Ft, L, Lt, N};
find_file_for_target([{IdxFile, F, Ft, L, Lt, N} | _], Target)
  when Target >= F, Target =< L ->
    {single, IdxFile, F, Ft, L, Lt, N};
find_file_for_target([{_IdxFile, _F, _Ft, L, Lt, _N},
                      {_NextFile, F2, Ft2, _L2, _Lt2, _N2} | _], Target)
  when Target > L, Target < F2 ->
    {between, {L, Lt}, {F2, Ft2}};
find_file_for_target([_ | Rest], Target) ->
    find_file_for_target(Rest, Target).

%% Return the {chunk id, timestamp} in IdxFile closest to Target.
%% bsearch_lower yields the last record with ChunkId =< Target, so the
%% true closest is either that record or its immediate successor.
%% Fix vs. original: the successor read was assertively matched as
%% {ok, {C1, T1}}, which badmatched if record Idx+1 read back as
%% {ok, zero} (an unwritten all-zero slot) or as a read error; such a
%% successor is now ignored and the lower hit kept.
bsearch_closest_in_file(IdxFile, Target, NumRecords,
                        _FirstChunkId, _FirstTs, _LastChunkId, _LastTs) ->
    %% largest record index with ChunkId =< Target
    Idx = bsearch_lower(IdxFile, Target, 0, NumRecords - 1),
    {ok, {C0, T0}} = read_idx_chunk_at(IdxFile, Idx),
    case Idx + 1 < NumRecords of
        false ->
            {C0, T0};
        true ->
            case read_idx_chunk_at(IdxFile, Idx + 1) of
                {ok, {C1, T1}} when abs(C1 - Target) < abs(C0 - Target) ->
                    {C1, T1};
                _ ->
                    %% successor is no closer (ties keep the lower
                    %% record, as before), unreadable, or all-zero
                    {C0, T0}
            end
    end.

%% Binary search for the largest record index whose ChunkId is =< Target.
%% Returns 0 when every record exceeds Target (the caller clamps).
%% Fix vs. original: an all-zero record ({ok, zero}, i.e. an unwritten
%% index slot) made the search move RIGHT (Mid+1..High). Zero slots can
%% only trail the written records, so moving right converges inside the
%% zero region and returns an index whose record the caller then fails
%% to match ({ok, {C0, T0}} badmatch). Treat zero — and any read
%% failure — like ChunkId > Target and search the lower half, so the
%% returned index points at a real record whenever one exists.
bsearch_lower(IdxFile, Target, Low, High) when Low =< High ->
    Mid = (Low + High) div 2,
    case read_idx_chunk_at(IdxFile, Mid) of
        {ok, {ChunkId, _}} when ChunkId =< Target ->
            bsearch_lower(IdxFile, Target, Mid + 1, High);
        _ ->
            %% ChunkId > Target, an all-zero record, or a read error:
            %% the answer lies strictly to the left of Mid
            bsearch_lower(IdxFile, Target, Low, Mid - 1)
    end;
bsearch_lower(_IdxFile, _Target, Low, _High) ->
    max(0, Low - 1).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

Expand Down
105 changes: 103 additions & 2 deletions test/osiris_log_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ all_tests() ->
read_ahead_send_file_on_off,
resolve_offset_spec_empty,
resolve_offset_spec_empty_directory,
resolve_offset_spec
resolve_offset_spec,
stream_offset_samples_empty,
stream_offset_samples_single_chunk,
stream_offset_samples_multiple_chunks,
stream_offset_samples_percentiles,
stream_offset_samples_config_map
].

groups() ->
Expand Down Expand Up @@ -2049,6 +2054,102 @@ overview_with_missing_index_at_start(Config) ->
filename:join(?config(dir, Config), "*.index")))),
ok.

%% Both an initialised-but-empty log and a directory that does not exist
%% must report {error, empty}.
stream_offset_samples_empty(Config) ->
    Fracs = [0.0, 0.25, 0.5, 0.75, 1.0],
    LDir = ?config(leader_dir, Config),
    EmptyLog = seed_log(LDir, [], Config),
    osiris_log:close(EmptyLog),
    ?assertEqual({error, empty},
                 osiris_log:stream_offset_samples(LDir, Fracs)),
    Missing = filename:join(?config(priv_dir, Config),
                            "stream_offset_samples_empty_nonexistent"),
    ?assertEqual({error, empty},
                 osiris_log:stream_offset_samples(Missing, Fracs)),
    ok.

%% Single chunk with two records: every intermediate percentile collapses
%% to the first chunk {0, FirstTs}; 1.0 reports the last message offset
%% (1), which shares the chunk's timestamp.
stream_offset_samples_single_chunk(Config) ->
    FirstTs = now_ms() - 10000,
    LDir = ?config(leader_dir, Config),
    Log = seed_log(LDir, [{2, FirstTs, [<<"one">>, <<"two">>]}], Config),
    osiris_log:close(Log),
    {ok, Samples} =
        osiris_log:stream_offset_samples(LDir, [0.0, 0.25, 0.5, 0.75, 1.0]),
    ?assertEqual([{0, FirstTs},
                  {0, FirstTs},
                  {0, FirstTs},
                  {0, FirstTs},
                  {1, FirstTs}],
                 Samples),
    ok.

%% Multiple chunks: sampled offsets are monotonically non-decreasing,
%% start at 0 and end at the very last message offset in the log. The
%% last chunk holds two records, so the final offset is 5 — NOT the last
%% chunk's first offset.
stream_offset_samples_multiple_chunks(Config) ->
    Now = now_ms(),
    FirstTs = Now - 10000,
    LastTs = Now - 3000,
    Chunks = [{1, FirstTs, [<<"one">>]},
              {1, Now - 8000, [<<"two">>]},
              {2, Now - 5000, [<<"three">>, <<"four">>]},
              {2, LastTs, [<<"five">>, <<"six">>]}],
    LDir = ?config(leader_dir, Config),
    Log = seed_log(LDir, Chunks, Config),
    osiris_log:close(Log),
    {ok, Samples} =
        osiris_log:stream_offset_samples(LDir, [0.0, 0.25, 0.5, 0.75, 1.0]),
    %% pinned FirstTs/LastTs also assert the boundary timestamps
    [{FirstOff, FirstTs}, {P25Off, _}, {P50Off, _}, {P75Off, _},
     {LastOff, LastTs}] = Samples,
    Offsets = [FirstOff, P25Off, P50Off, P75Off, LastOff],
    ?assertEqual(Offsets, lists:sort(Offsets)),
    ?assertEqual(0, FirstOff),
    ?assertEqual(5, LastOff),
    ok.

%% Five single-record chunks at ids 0..4 give Range = 4, so fractions
%% 0.25/0.5/0.75 land exactly on chunk ids 1/2/3, each with its own
%% timestamp.
stream_offset_samples_percentiles(Config) ->
    Now = now_ms(),
    Stamps = [Now - 10000, Now - 8000, Now - 5000, Now - 3000, Now - 1000],
    Bodies = [<<"a">>, <<"b">>, <<"c">>, <<"d">>, <<"e">>],
    Chunks = [{1, Ts, [Body]} || {Ts, Body} <- lists:zip(Stamps, Bodies)],
    LDir = ?config(leader_dir, Config),
    Log = seed_log(LDir, Chunks, Config),
    osiris_log:close(Log),
    {ok, Samples} =
        osiris_log:stream_offset_samples(LDir, [0.0, 0.25, 0.5, 0.75, 1.0]),
    Expected = lists:zip(lists:seq(0, 4), Stamps),
    ?assertEqual(Expected, Samples),
    ok.

%% Passing a config map #{dir => Dir} must yield exactly the same
%% samples as passing the directory path directly.
stream_offset_samples_config_map(Config) ->
    LDir = ?config(leader_dir, Config),
    Log = seed_log(LDir, [{1, [<<"a">>]}, {1, [<<"b">>]}], Config),
    osiris_log:close(Log),
    Fracs = [0.0, 0.25, 0.5, 0.75, 1.0],
    {ok, ByPath} = osiris_log:stream_offset_samples(LDir, Fracs),
    Conf = ?config(osiris_conf, Config),
    {ok, ByConf} = osiris_log:stream_offset_samples(Conf#{dir => LDir}, Fracs),
    ?assertEqual(ByPath, ByConf),
    ok.

read_ahead_send_file(Config) ->
RAL = 4096, %% read ahead limit
HS = ?HEADER_SIZE_B,
Expand Down Expand Up @@ -2587,7 +2688,7 @@ write_chunk(Conf, Epoch, Now, Records, Trk0, Log0) ->
%% need to re-init as new epoch
osiris_log:close(Log1),
Log = osiris_log:init(Conf#{epoch => Epoch}),
{Trk1, osiris_log:write(lists:reverse(Records), Log)}
{Trk1, osiris_log:write(lists:reverse(Records), Now, Log)}
end.

now_ms() ->
Expand Down
Loading