From 112af94e45f362234eebf4ca31145c0edd48ec90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Fri, 30 Jan 2026 16:29:59 +0100 Subject: [PATCH] Add function to resolve offset specification --- src/osiris.erl | 12 +++++ src/osiris_log.erl | 92 +++++++++++++++++++++++++++------------ test/osiris_log_SUITE.erl | 58 +++++++++++++++++++++++- 3 files changed, 134 insertions(+), 28 deletions(-) diff --git a/src/osiris.erl b/src/osiris.erl index 25396c8e..b23ccbdd 100644 --- a/src/osiris.erl +++ b/src/osiris.erl @@ -18,6 +18,7 @@ fetch_writer_seq/2, init_reader/3, init_reader/4, + resolve_offset_spec/3, register_offset_listener/2, register_offset_listener/3, update_retention/2, @@ -239,6 +240,17 @@ init_reader(Pid, OffsetSpec, {_, _} = CounterSpec, Options) options => Options}, osiris_log:init_offset_reader(OffsetSpec, Ctx). +-spec resolve_offset_spec(pid(), offset_spec(), reader_options()) -> + {ok, offset()} | + {error, no_index_file} | + {error, {offset_out_of_range, osiris_log:range()}} | + {error, retries_exhausted}. +resolve_offset_spec(Pid, OffsetSpec, Options) + when is_pid(Pid) andalso node(Pid) =:= node() -> + Ctx0 = osiris_util:get_reader_context(Pid), + Ctx = Ctx0#{options => Options}, + osiris_log:resolve_offset_spec(OffsetSpec, Ctx). + -spec register_offset_listener(pid(), offset()) -> ok. register_offset_listener(Pid, Offset) -> register_offset_listener(Pid, Offset, undefined). diff --git a/src/osiris_log.erl b/src/osiris_log.erl index e7e137df..95c8ab24 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -28,6 +28,7 @@ send_file/3, init_data_reader/2, init_offset_reader/2, + resolve_offset_spec/2, read_header/1, chunk_iterator/1, chunk_iterator/2, @@ -1145,7 +1146,42 @@ init_offset_reader(OffsetSpec, Conf, Attempt) -> init_offset_reader(NewOffsSpec, NewConf, Attempt - 1) end. -init_offset_reader0({abs, Offs}, #{dir := Dir} = Conf) -> +%% @doc Resolve an offset specification to a numeric offset. +%% @param OffsetSpec specifies where in the log to resolve +%% @param Config The configuration. Requires the `dir' key and `name' for `last'. +%% @returns `{ok, offset()}' or `{error, Error}' +%% @end +-spec resolve_offset_spec(OffsetSpec :: offset_spec(), + Config :: config()) -> + {ok, offset()} | + {error, no_index_file} | + {error, {offset_out_of_range, range()}} | + {error, retries_exhausted}. +resolve_offset_spec(OffsetSpec, Conf) -> + resolve_offset_spec(OffsetSpec, Conf, 3). + +resolve_offset_spec(_OffsetSpec, _Conf, 0) -> + {error, retries_exhausted}; +resolve_offset_spec(OffsetSpec, Conf, Attempt) -> + try + resolve_offset_spec0(OffsetSpec, Conf) + catch + missing_file -> + resolve_offset_spec(OffsetSpec, + maps:remove(index_files, Conf), Attempt - 1); + {retry_with, NewOffsSpec, NewConf} -> + resolve_offset_spec(NewOffsSpec, NewConf, Attempt - 1) + end. + +resolve_offset_spec0(OffsetSpec, Conf) -> + case resolve_offset_location(OffsetSpec, Conf) of + {ok, {ChunkId, _FilePos, _SegmentFile}} -> + {ok, ChunkId}; + {error, _} = Err -> + Err + end. + +resolve_offset_location({abs, Offs}, #{dir := Dir} = Conf) -> case sorted_index_files(Dir) of [] -> {error, no_index_file}; @@ -1157,14 +1193,13 @@ init_offset_reader0({abs, Offs}, #{dir := Dir} = Conf) -> {S, E} when Offs < S orelse Offs > E -> {error, {offset_out_of_range, Range}}; _ -> - %% it is in range, convert to standard offset - init_offset_reader0(Offs, Conf) + resolve_offset_location(Offs, Conf) end end; -init_offset_reader0({timestamp, Ts}, #{} = Conf) -> +resolve_offset_location({timestamp, Ts}, #{} = Conf) -> case sorted_index_files_rev(Conf) of [] -> - init_offset_reader0(next, Conf); + resolve_offset_location(next, Conf); IdxFilesRev -> case timestamp_idx_file_search(Ts, IdxFilesRev) of {scan, IdxFile} -> @@ -1172,52 +1207,50 @@ init_offset_reader0({timestamp, Ts}, #{} = Conf) -> %% find nearest offset {ChunkId, FilePos} = chunk_location_for_timestamp(IdxFile, Ts), SegmentFile = segment_from_index_file(IdxFile), - open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); + {ok, {ChunkId, FilePos, SegmentFile}}; {first_in, IdxFile} -> {ok, Fd} = file:open(IdxFile, [raw, binary, read]), {ok, ?IDX_MATCH(ChunkId, _, FilePos)} = first_idx_record(Fd), + ok = file:close(Fd), SegmentFile = segment_from_index_file(IdxFile), - open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); + {ok, {ChunkId, FilePos, SegmentFile}}; next -> %% segment was not found, attach next - %% this should be rare so no need to call the more optimal - %% open_offset_reader_at/4 function - init_offset_reader0(next, Conf) + resolve_offset_location(next, Conf) end end; -init_offset_reader0(first, #{} = Conf) -> +resolve_offset_location(first, #{} = Conf) -> case sorted_index_files(Conf) of [] -> {error, no_index_file}; - [FstIdxFile | _ ] -> + [FstIdxFile | _] -> case build_seg_info(FstIdxFile) of {ok, #seg_info{file = File, first = undefined}} -> - %% empty log, attach at 0 - open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf); + {ok, {0, ?LOG_HEADER_SIZE, File}}; {ok, #seg_info{file = File, first = #chunk_info{id = FirstChunkId, pos = FilePos}}} -> - open_offset_reader_at(File, FirstChunkId, FilePos, Conf); + {ok, {FirstChunkId, FilePos, File}}; {error, _} = Err -> exit(Err) end end; -init_offset_reader0(next, #{} = Conf) -> +resolve_offset_location(next, #{} = Conf) -> case sorted_index_files_rev(Conf) of [] -> {error, no_index_file}; - [LastIdxFile | _ ] -> + [LastIdxFile | _] -> case build_seg_info(LastIdxFile) of {ok, #seg_info{file = File, last = LastChunk}} -> {NextChunkId, FilePos} = next_location(LastChunk), - open_offset_reader_at(File, NextChunkId, FilePos, Conf); + {ok, {NextChunkId, FilePos, File}}; Err -> exit(Err) end end; -init_offset_reader0(last, #{name := Name} = Conf) -> +resolve_offset_location(last, #{name := Name} = Conf) -> case sorted_index_files_rev(Conf) of [] -> {error, no_index_file}; @@ -1225,14 +1258,13 @@ init_offset_reader0(last, #{name := Name} = Conf) -> case last_user_chunk_location(Name, IdxFiles) of not_found -> ?DEBUG_(Name, "offset spec: 'last', user chunk not found, fall back to next", []), - %% no user chunks in stream, this is awkward, fall back to next - init_offset_reader0(next, Conf); + resolve_offset_location(next, Conf); {ChunkId, FilePos, IdxFile} -> File = segment_from_index_file(IdxFile), - open_offset_reader_at(File, ChunkId, FilePos, Conf) + {ok, {ChunkId, FilePos, File}} end end; -init_offset_reader0(OffsetSpec, #{} = Conf) +resolve_offset_location(OffsetSpec, #{} = Conf) when is_integer(OffsetSpec) -> Name = maps:get(name, Conf, <<>>), case sorted_index_files(Conf) of @@ -1242,7 +1274,6 @@ init_offset_reader0(OffsetSpec, #{} = Conf) {ok, Range} = chunk_id_range_from_idx_files(IdxFiles), ?DEBUG_(Name, " spec ~w chunk_id range ~w Num index files ~b ", [OffsetSpec, Range, length(IdxFiles)]), - %% clamp start offset StartOffset = case {OffsetSpec, Range} of {_, empty} -> @@ -1250,14 +1281,13 @@ init_offset_reader0(OffsetSpec, #{} = Conf) {Offset, {FirstChId, _LastChId}} -> max(FirstChId, Offset) end, - case find_segment_for_offset(StartOffset, IdxFiles) of {not_found, high} -> throw({retry_with, next, Conf}); {end_of_log, #seg_info{file = SegmentFile, last = LastChunk}} -> {ChunkId, FilePos} = next_location(LastChunk), - open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); + {ok, {ChunkId, FilePos, SegmentFile}}; {found, #seg_info{file = SegmentFile} = SegmentInfo} -> {ChunkId, _Epoch, FilePos} = case offset_idx_scan(Name, StartOffset, SegmentInfo) of @@ -1274,10 +1304,18 @@ init_offset_reader0(OffsetSpec, #{} = Conf) end, ?DEBUG_(Name, "resolved chunk_id ~b" " at file pos: ~w ", [ChunkId, FilePos]), - open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf) + {ok, {ChunkId, FilePos, SegmentFile}} end end. +init_offset_reader0(OffsetSpec, Conf) -> + case resolve_offset_location(OffsetSpec, Conf) of + {ok, {ChunkId, FilePos, SegmentFile}} -> + open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf); + {error, _} = Err -> + Err + end. + open_offset_reader_at(SegmentFile, NextChunkId, FilePos, #{dir := Dir, name := Name, diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl index baceac11..9fa5b7bd 100644 --- a/test/osiris_log_SUITE.erl +++ b/test/osiris_log_SUITE.erl @@ -97,7 +97,10 @@ all_tests() -> overview_with_missing_index_at_start, read_ahead_send_file, read_ahead_send_file_filter, - read_ahead_send_file_on_off + read_ahead_send_file_on_off, + resolve_offset_spec_empty, + resolve_offset_spec_empty_directory, + resolve_offset_spec ]. groups() -> @@ -2434,6 +2437,59 @@ read_ahead_send_file_on_off(Config) -> end || RaOn <- RaOnOff, RType <- RTypes], ok. +resolve_offset_spec_empty(Config) -> + Conf = ?config(osiris_conf, Config), + LDir = ?config(leader_dir, Config), + LLog0 = seed_log(LDir, [], Config), + osiris_log:close(LLog0), + RConf = Conf#{dir => LDir}, + ?assertEqual({ok, 0}, osiris_log:resolve_offset_spec(first, RConf)), + ?assertEqual({ok, 0}, osiris_log:resolve_offset_spec(last, RConf)), + ?assertEqual({ok, 0}, osiris_log:resolve_offset_spec(next, RConf)), + ?assertEqual({ok, 0}, osiris_log:resolve_offset_spec(0, RConf)), + ?assertEqual({error, {offset_out_of_range, empty}}, + osiris_log:resolve_offset_spec({abs, 1}, RConf)), + ok. + +resolve_offset_spec_empty_directory(Config) -> + Conf = ?config(osiris_conf, Config), + LDir = ?config(leader_dir, Config), + RConf = Conf#{dir => LDir}, + ?assertEqual({error, no_index_file}, osiris_log:resolve_offset_spec(first, RConf)), + ?assertEqual({error, no_index_file}, osiris_log:resolve_offset_spec(last, RConf)), + ?assertEqual({error, no_index_file}, osiris_log:resolve_offset_spec(next, RConf)), + ?assertEqual({error, no_index_file}, osiris_log:resolve_offset_spec(0, RConf)), + ?assertEqual({error, no_index_file}, osiris_log:resolve_offset_spec({abs, 1}, RConf)), + ok. + +resolve_offset_spec(Config) -> + EpochChunks = + [{1, [<<"one">>]}, {2, [<<"two">>]}, {3, [<<"three">>, <<"four">>]}], + LDir = ?config(leader_dir, Config), + Conf = ?config(osiris_conf, Config), + set_shared(Conf, 3), + LLog0 = seed_log(LDir, EpochChunks, Config), + osiris_log:close(LLog0), + RConf = Conf#{dir => LDir}, + + ?assertEqual({ok, 0}, osiris_log:resolve_offset_spec(first, RConf)), + ?assertEqual({ok, 2}, osiris_log:resolve_offset_spec(last, RConf)), + ?assertEqual({ok, 4}, osiris_log:resolve_offset_spec(next, RConf)), + ?assertEqual({ok, 0}, osiris_log:resolve_offset_spec(0, RConf)), + ?assertEqual({ok, 1}, osiris_log:resolve_offset_spec(1, RConf)), + ?assertEqual({ok, 2}, osiris_log:resolve_offset_spec(2, RConf)), + ?assertEqual({ok, 2}, osiris_log:resolve_offset_spec(3, RConf)), + ?assertEqual({ok, 2}, osiris_log:resolve_offset_spec({abs, 2}, RConf)), + ?assertEqual({ok, 2}, osiris_log:resolve_offset_spec({abs, 3}, RConf)), + %% integer offset too high falls back to next + ?assertEqual({ok, 4}, osiris_log:resolve_offset_spec(100, RConf)), + %% {abs, _} out of range returns error + ?assertEqual({error, {offset_out_of_range, {0, 3}}}, + osiris_log:resolve_offset_spec({abs, 4}, RConf)), + ?assertEqual({error, {offset_out_of_range, {0, 3}}}, + osiris_log:resolve_offset_spec({abs, 100}, RConf)), + ok. + %% Utility init_reader(offset, Conf) ->