Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/osiris.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
fetch_writer_seq/2,
init_reader/3,
init_reader/4,
resolve_offset_spec/3,
register_offset_listener/2,
register_offset_listener/3,
update_retention/2,
Expand Down Expand Up @@ -239,6 +240,17 @@ init_reader(Pid, OffsetSpec, {_, _} = CounterSpec, Options)
options => Options},
osiris_log:init_offset_reader(OffsetSpec, Ctx).

%% @doc Resolve `OffsetSpec' against the log belonging to `Pid'.
%% The reader context is fetched with `osiris_util:get_reader_context/1',
%% its `options' key is set to `Options', and the resolution itself is
%% delegated to `osiris_log:resolve_offset_spec/2'.
%% Only matches when `Pid' is a pid that lives on the calling node.
-spec resolve_offset_spec(pid(), offset_spec(), reader_options()) ->
    {ok, offset()} |
    {error, no_index_file} |
    {error, {offset_out_of_range, osiris_log:range()}} |
    {error, retries_exhausted}.
resolve_offset_spec(Pid, OffsetSpec, Options)
  when is_pid(Pid), node(Pid) =:= node() ->
    ReaderCtx = maps:put(options, Options,
                         osiris_util:get_reader_context(Pid)),
    osiris_log:resolve_offset_spec(OffsetSpec, ReaderCtx).

%% @doc Convenience arity: calls `register_offset_listener/3' with
%% `undefined' as the third argument.
-spec register_offset_listener(pid(), offset()) -> ok.
register_offset_listener(Pid, Offset) ->
register_offset_listener(Pid, Offset, undefined).
Expand Down
92 changes: 65 additions & 27 deletions src/osiris_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
send_file/3,
init_data_reader/2,
init_offset_reader/2,
resolve_offset_spec/2,
read_header/1,
chunk_iterator/1,
chunk_iterator/2,
Expand Down Expand Up @@ -1145,7 +1146,42 @@ init_offset_reader(OffsetSpec, Conf, Attempt) ->
init_offset_reader(NewOffsSpec, NewConf, Attempt - 1)
end.

init_offset_reader0({abs, Offs}, #{dir := Dir} = Conf) ->
%% @doc Turn an offset specification into a concrete numeric offset.
%% On success the result is the id of the chunk the specification
%% resolves to. Resolution is attempted a bounded number of times; once
%% the attempts are used up `{error, retries_exhausted}' is returned.
%% @param OffsetSpec the position in the log to resolve
%% @param Config The configuration. Requires the `dir' key and `name' for `last'.
%% @returns `{ok, offset()}' or `{error, Error}'
%% @end
-spec resolve_offset_spec(OffsetSpec :: offset_spec(),
                          Config :: config()) ->
    {ok, offset()} |
    {error, no_index_file} |
    {error, {offset_out_of_range, range()}} |
    {error, retries_exhausted}.
resolve_offset_spec(OffsetSpec, Conf) ->
    %% upper bound on resolution attempts
    MaxAttempts = 3,
    resolve_offset_spec(OffsetSpec, Conf, MaxAttempts).

%% Retry loop behind resolve_offset_spec/2. `AttemptsLeft' is decremented
%% on every retry; reaching 0 yields `{error, retries_exhausted}'.
%% Two thrown terms trigger a retry:
%%   * `{retry_with, Spec, Conf}' - retry with the spec and config
%%     carried in the thrown term
%%   * `missing_file' - retry with the same spec after dropping the
%%     `index_files' key from the config
%%     (presumably so the index file list is looked up again - TODO confirm)
resolve_offset_spec(_OffsetSpec, _Conf, 0) ->
    {error, retries_exhausted};
resolve_offset_spec(OffsetSpec, Conf0, AttemptsLeft) ->
    try
        resolve_offset_spec0(OffsetSpec, Conf0)
    catch
        throw:{retry_with, NextSpec, NextConf} ->
            resolve_offset_spec(NextSpec, NextConf, AttemptsLeft - 1);
        throw:missing_file ->
            Conf = maps:remove(index_files, Conf0),
            resolve_offset_spec(OffsetSpec, Conf, AttemptsLeft - 1)
    end.

%% Single resolution attempt: look up the location for `OffsetSpec' and
%% keep only the chunk id, discarding the file position and segment file.
%% Error tuples from resolve_offset_location/2 are returned unchanged.
resolve_offset_spec0(OffsetSpec, Conf) ->
    Location = resolve_offset_location(OffsetSpec, Conf),
    case Location of
        {error, _} = Error ->
            Error;
        {ok, {ChunkId, _FilePos, _SegmentFile}} ->
            {ok, ChunkId}
    end.

resolve_offset_location({abs, Offs}, #{dir := Dir} = Conf) ->
case sorted_index_files(Dir) of
[] ->
{error, no_index_file};
Expand All @@ -1157,82 +1193,78 @@ init_offset_reader0({abs, Offs}, #{dir := Dir} = Conf) ->
{S, E} when Offs < S orelse Offs > E ->
{error, {offset_out_of_range, Range}};
_ ->
%% it is in range, convert to standard offset
init_offset_reader0(Offs, Conf)
resolve_offset_location(Offs, Conf)
end
end;
init_offset_reader0({timestamp, Ts}, #{} = Conf) ->
resolve_offset_location({timestamp, Ts}, #{} = Conf) ->
case sorted_index_files_rev(Conf) of
[] ->
init_offset_reader0(next, Conf);
resolve_offset_location(next, Conf);
IdxFilesRev ->
case timestamp_idx_file_search(Ts, IdxFilesRev) of
{scan, IdxFile} ->
%% segment was found, now we need to scan index to
%% find nearest offset
{ChunkId, FilePos} = chunk_location_for_timestamp(IdxFile, Ts),
SegmentFile = segment_from_index_file(IdxFile),
open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
{ok, {ChunkId, FilePos, SegmentFile}};
{first_in, IdxFile} ->
{ok, Fd} = file:open(IdxFile, [raw, binary, read]),
{ok, ?IDX_MATCH(ChunkId, _, FilePos)} = first_idx_record(Fd),
ok = file:close(Fd),
SegmentFile = segment_from_index_file(IdxFile),
open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
{ok, {ChunkId, FilePos, SegmentFile}};
next ->
%% segment was not found, attach next
%% this should be rare so no need to call the more optimal
%% open_offset_reader_at/4 function
init_offset_reader0(next, Conf)
resolve_offset_location(next, Conf)
end
end;
init_offset_reader0(first, #{} = Conf) ->
resolve_offset_location(first, #{} = Conf) ->
case sorted_index_files(Conf) of
[] ->
{error, no_index_file};
[FstIdxFile | _ ] ->
[FstIdxFile | _] ->
case build_seg_info(FstIdxFile) of
{ok, #seg_info{file = File,
first = undefined}} ->
%% empty log, attach at 0
open_offset_reader_at(File, 0, ?LOG_HEADER_SIZE, Conf);
{ok, {0, ?LOG_HEADER_SIZE, File}};
{ok, #seg_info{file = File,
first = #chunk_info{id = FirstChunkId,
pos = FilePos}}} ->
open_offset_reader_at(File, FirstChunkId, FilePos, Conf);
{ok, {FirstChunkId, FilePos, File}};
{error, _} = Err ->
exit(Err)
end
end;
init_offset_reader0(next, #{} = Conf) ->
resolve_offset_location(next, #{} = Conf) ->
case sorted_index_files_rev(Conf) of
[] ->
{error, no_index_file};
[LastIdxFile | _ ] ->
[LastIdxFile | _] ->
case build_seg_info(LastIdxFile) of
{ok, #seg_info{file = File,
last = LastChunk}} ->
{NextChunkId, FilePos} = next_location(LastChunk),
open_offset_reader_at(File, NextChunkId, FilePos, Conf);
{ok, {NextChunkId, FilePos, File}};
Err ->
exit(Err)
end
end;
init_offset_reader0(last, #{name := Name} = Conf) ->
resolve_offset_location(last, #{name := Name} = Conf) ->
case sorted_index_files_rev(Conf) of
[] ->
{error, no_index_file};
IdxFiles ->
case last_user_chunk_location(Name, IdxFiles) of
not_found ->
?DEBUG_(Name, "offset spec: 'last', user chunk not found, fall back to next", []),
%% no user chunks in stream, this is awkward, fall back to next
init_offset_reader0(next, Conf);
resolve_offset_location(next, Conf);
{ChunkId, FilePos, IdxFile} ->
File = segment_from_index_file(IdxFile),
open_offset_reader_at(File, ChunkId, FilePos, Conf)
{ok, {ChunkId, FilePos, File}}
end
end;
init_offset_reader0(OffsetSpec, #{} = Conf)
resolve_offset_location(OffsetSpec, #{} = Conf)
when is_integer(OffsetSpec) ->
Name = maps:get(name, Conf, <<>>),
case sorted_index_files(Conf) of
Expand All @@ -1242,22 +1274,20 @@ init_offset_reader0(OffsetSpec, #{} = Conf)
{ok, Range} = chunk_id_range_from_idx_files(IdxFiles),
?DEBUG_(Name, " spec ~w chunk_id range ~w Num index files ~b ",
[OffsetSpec, Range, length(IdxFiles)]),

%% clamp start offset
StartOffset = case {OffsetSpec, Range} of
{_, empty} ->
0;
{Offset, {FirstChId, _LastChId}} ->
max(FirstChId, Offset)
end,

case find_segment_for_offset(StartOffset, IdxFiles) of
{not_found, high} ->
throw({retry_with, next, Conf});
{end_of_log, #seg_info{file = SegmentFile,
last = LastChunk}} ->
{ChunkId, FilePos} = next_location(LastChunk),
open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
{ok, {ChunkId, FilePos, SegmentFile}};
{found, #seg_info{file = SegmentFile} = SegmentInfo} ->
{ChunkId, _Epoch, FilePos} =
case offset_idx_scan(Name, StartOffset, SegmentInfo) of
Expand All @@ -1274,10 +1304,18 @@ init_offset_reader0(OffsetSpec, #{} = Conf)
end,
?DEBUG_(Name, "resolved chunk_id ~b"
" at file pos: ~w ", [ChunkId, FilePos]),
open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf)
{ok, {ChunkId, FilePos, SegmentFile}}
end
end.

%% Resolve `OffsetSpec' to a {ChunkId, FilePos, SegmentFile} location and
%% open an offset reader at that location. Error tuples from
%% resolve_offset_location/2 are returned unchanged and no reader is opened.
init_offset_reader0(OffsetSpec, Conf) ->
    Location = resolve_offset_location(OffsetSpec, Conf),
    case Location of
        {error, _} = Error ->
            Error;
        {ok, {ChunkId, FilePos, SegmentFile}} ->
            open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf)
    end.

open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
#{dir := Dir,
name := Name,
Expand Down
58 changes: 57 additions & 1 deletion test/osiris_log_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ all_tests() ->
overview_with_missing_index_at_start,
read_ahead_send_file,
read_ahead_send_file_filter,
read_ahead_send_file_on_off
read_ahead_send_file_on_off,
resolve_offset_spec_empty,
resolve_offset_spec_empty_directory,
resolve_offset_spec
].

groups() ->
Expand Down Expand Up @@ -2434,6 +2437,59 @@ read_ahead_send_file_on_off(Config) ->
end || RaOn <- RaOnOff, RType <- RTypes],
ok.

%% A log seeded with no chunks: first, last, next and 0 all resolve to
%% offset 0, while {abs, 1} is reported as out of range for an empty log.
resolve_offset_spec_empty(Config) ->
    BaseConf = ?config(osiris_conf, Config),
    LeaderDir = ?config(leader_dir, Config),
    %% seed the log without any chunks, then close it again
    osiris_log:close(seed_log(LeaderDir, [], Config)),
    ReaderConf = BaseConf#{dir => LeaderDir},
    [?assertEqual({ok, 0},
                  osiris_log:resolve_offset_spec(Spec, ReaderConf))
     || Spec <- [first, last, next, 0]],
    ?assertEqual({error, {offset_out_of_range, empty}},
                 osiris_log:resolve_offset_spec({abs, 1}, ReaderConf)),
    ok.

%% No log is seeded here, so every offset spec is expected to resolve to
%% {error, no_index_file}.
resolve_offset_spec_empty_directory(Config) ->
    BaseConf = ?config(osiris_conf, Config),
    LeaderDir = ?config(leader_dir, Config),
    ReaderConf = BaseConf#{dir => LeaderDir},
    [?assertEqual({error, no_index_file},
                  osiris_log:resolve_offset_spec(Spec, ReaderConf))
     || Spec <- [first, last, next, 0, {abs, 1}]],
    ok.

%% Seeds three chunks (the last one holding two entries) and checks the
%% offset each spec resolves to. The expectations below imply chunk ids
%% 0, 1 and 2 with a valid offset range of {0, 3}.
resolve_offset_spec(Config) ->
    Chunks = [{1, [<<"one">>]},
              {2, [<<"two">>]},
              {3, [<<"three">>, <<"four">>]}],
    LeaderDir = ?config(leader_dir, Config),
    BaseConf = ?config(osiris_conf, Config),
    set_shared(BaseConf, 3),
    osiris_log:close(seed_log(LeaderDir, Chunks, Config)),
    ReaderConf = BaseConf#{dir => LeaderDir},

    OutOfRange = {error, {offset_out_of_range, {0, 3}}},
    %% {OffsetSpec, ExpectedResult}
    Cases = [{first, {ok, 0}},
             {last, {ok, 2}},
             {next, {ok, 4}},
             {0, {ok, 0}},
             {1, {ok, 1}},
             {2, {ok, 2}},
             %% offset 3 resolves to the chunk with id 2
             {3, {ok, 2}},
             {{abs, 2}, {ok, 2}},
             {{abs, 3}, {ok, 2}},
             %% integer offset too high falls back to next
             {100, {ok, 4}},
             %% {abs, _} out of range returns error
             {{abs, 4}, OutOfRange},
             {{abs, 100}, OutOfRange}],
    [?assertEqual(Expected,
                  osiris_log:resolve_offset_spec(Spec, ReaderConf))
     || {Spec, Expected} <- Cases],
    ok.

%% Utility

init_reader(offset, Conf) ->
Expand Down