diff --git a/src/osiris.erl b/src/osiris.erl
index b23ccbdd..77581403 100644
--- a/src/osiris.erl
+++ b/src/osiris.erl
@@ -68,8 +68,12 @@
     {abs, offset()} | offset() | {timestamp, timestamp()}.
+-type retention_fun() :: fun((IdxFiles :: [file:filename_all()]) ->
+    {ToDelete :: [file:filename_all()], ToKeep :: [file:filename_all()]}).
 -type retention_spec() ::
-    {max_bytes, non_neg_integer()} | {max_age, milliseconds()}.
+    {max_bytes, non_neg_integer()} |
+    {max_age, milliseconds()} |
+    {'fun', retention_fun()}.
 -type writer_id() :: binary().
 -type batch() ::
     {batch, NumRecords :: non_neg_integer(), compression_type(),
@@ -83,11 +87,6 @@
 %% returned when reading
 -type entry() :: binary() | batch().
--type reader_options() :: #{transport => tcp | ssl,
-                            chunk_selector => all | user_data,
-                            filter_spec => osiris_bloom:filter_spec(),
-                            read_ahead => boolean() | non_neg_integer()
-                           }.
 
 -export_type([name/0,
               config/0,
@@ -97,6 +96,7 @@
               tracking_id/0,
               offset_spec/0,
               retention_spec/0,
+              retention_fun/0,
               timestamp/0,
               writer_id/0,
               data/0,
@@ -228,8 +228,8 @@ init_reader(Pid, OffsetSpec, CounterSpec) ->
                 chunk_selector => user_data}).
 
 -spec init_reader(pid(), offset_spec(), osiris_log:counter_spec(),
-                  reader_options()) ->
-    {ok, osiris_log:state()} |
+                  osiris_log_reader:options()) ->
+    {ok, osiris_log_reader:state()} |
     {error, {offset_out_of_range, empty | {offset(), offset()}}} |
     {error, {invalid_last_offset_epoch, offset(), offset()}}.
 init_reader(Pid, OffsetSpec, {_, _} = CounterSpec, Options)
@@ -238,9 +238,9 @@ init_reader(Pid, OffsetSpec, {_, _} = CounterSpec, Options)
     Ctx0 = osiris_util:get_reader_context(Pid),
     Ctx = Ctx0#{counter_spec => CounterSpec,
                 options => Options},
-    osiris_log:init_offset_reader(OffsetSpec, Ctx).
+    (osiris_log_reader:module()):init_offset_reader(OffsetSpec, Ctx).
 
--spec resolve_offset_spec(pid(), offset_spec(), reader_options()) ->
+-spec resolve_offset_spec(pid(), offset_spec(), osiris_log_reader:options()) ->
     {ok, offset()} |
     {error, no_index_file} |
     {error, {offset_out_of_range, osiris_log:range()}} |
@@ -249,7 +249,7 @@ resolve_offset_spec(Pid, OffsetSpec, Options)
     when is_pid(Pid) andalso node(Pid) =:= node() ->
     Ctx0 = osiris_util:get_reader_context(Pid),
     Ctx = Ctx0#{options => Options},
-    osiris_log:resolve_offset_spec(OffsetSpec, Ctx).
+    (osiris_log_reader:module()):resolve_offset_spec(OffsetSpec, Ctx).
 
 -spec register_offset_listener(pid(), offset()) -> ok.
 register_offset_listener(Pid, Offset) ->
diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index 95c8ab24..0f90089a 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -58,7 +58,15 @@
          delete_directory/1,
          counter_fields/0,
          make_counter/1,
-         generate_log/4]).
+         generate_log/4,
+         parse_header/2]).
+
+-behaviour(osiris_log_manifest).
+%% osiris_log_manifest implementations
+-export([init_manifest/2,
+         handle_event/2,
+         close_manifest/1,
+         delete/1]).
 
 -export([dump_init/1,
          dump_init_idx/1,
@@ -364,6 +372,7 @@
 -type offset() :: osiris:offset().
 -type epoch() :: osiris:epoch().
 -type range() :: empty | {From :: offset(), To :: offset()}.
+-type epoch_offsets() :: [{epoch(), offset()}].
 -type counter_spec() :: {Tag :: term(), Fields :: seshat:fields_spec()}.
 -type chunk_type() ::
     ?CHNK_USER |
@@ -395,19 +404,19 @@
 -type offset_spec() :: osiris:offset_spec().
 -type retention_spec() :: osiris:retention_spec().
 -type header_map() ::
-    #{chunk_id => offset(),
-      epoch => epoch(),
-      type => chunk_type(),
-      crc => integer(),
-      num_records => non_neg_integer(),
-      num_entries => non_neg_integer(),
-      timestamp => osiris:timestamp(),
-      data_size => non_neg_integer(),
-      trailer_size => non_neg_integer(),
-      filter_size => 16..255,
-      header_data => binary(),
-      position => non_neg_integer(),
-      next_position => non_neg_integer()}.
+    #{chunk_id := offset(),
+      epoch := epoch(),
+      type := chunk_type(),
+      crc := integer(),
+      num_records := non_neg_integer(),
+      num_entries := non_neg_integer(),
+      timestamp := osiris:timestamp(),
+      data_size := non_neg_integer(),
+      trailer_size := non_neg_integer(),
+      filter_size := 16..255,
+      header_data := binary(),
+      position := non_neg_integer(),
+      next_position := non_neg_integer()}.
 -type transport() :: tcp | ssl.
 
 %% holds static or rarely changing fields
@@ -424,7 +433,8 @@
          %% that will be included in snapshots written to new segments
          readers_counter_fun = fun(_) -> ok end :: function(),
          shared :: atomics:atomics_ref(),
-         filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size()
+         filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size(),
+         manifest_module = ?MODULE :: module()
         }).
 
 -record(ra, {size = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE :: non_neg_integer(),
@@ -446,7 +456,8 @@
        {type = writer :: writer | acceptor,
         segment_size = {?LOG_HEADER_SIZE, 0} :: {non_neg_integer(), non_neg_integer()},
         current_epoch :: non_neg_integer(),
-        tail_info = {0, empty} :: osiris:tail_info()
+        tail_info = {0, empty} :: osiris:tail_info(),
+        manifest :: osiris_log_manifest:state()
        }).
 
 -record(?MODULE,
         {cfg :: #cfg{},
@@ -458,7 +469,7 @@
 %% record chunk_info does not map exactly to an index record (field 'num' differs)
 -record(chunk_info,
         {id :: offset(),
-         timestamp :: non_neg_integer(),
+         timestamp :: osiris:timestamp(),
          epoch :: epoch(),
          num :: non_neg_integer(),
         type :: chunk_type(),
@@ -477,11 +488,14 @@
 -opaque state() :: #?MODULE{}.
 
 -export_type([state/0,
+              chunk_type/0,
               chunk_iterator/0,
               range/0,
+              epoch_offsets/0,
               config/0,
               counter_spec/0,
-              transport/0]).
+              transport/0,
+              header_map/0]).
 
 -spec directory(osiris:config() | list()) -> file:filename_all().
 directory(#{name := Name, dir := Dir}) ->
@@ -500,20 +514,11 @@ init(Config) ->
 
 -spec init(config(), writer | acceptor) -> state().
 init(#{dir := Dir,
        name := Name,
-       epoch := Epoch} = Config,
+       epoch := Epoch} = Config0,
      WriterType) ->
     %% scan directory for segments if in write mode
-    MaxSizeBytes = maps:get(max_segment_size_bytes, Config,
-                            ?DEFAULT_MAX_SEGMENT_SIZE_B),
-    MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks,
-                                        ?DEFAULT_MAX_SEGMENT_SIZE_C),
-    Retention = maps:get(retention, Config, []),
-    FilterSize = maps:get(filter_size, Config, ?DEFAULT_FILTER_SIZE),
     ?INFO("Stream: ~ts will use ~ts for osiris log data directory",
           [Name, Dir]),
-    ?DEBUG_(Name, "max_segment_size_bytes: ~b,
-           max_segment_size_chunks ~b, retention ~w, filter size ~b",
-           [MaxSizeBytes, MaxSizeChunks, Retention, FilterSize]),
     ok = filelib:ensure_dir(Dir),
     case file:make_dir(Dir) of
         ok ->
@@ -523,7 +528,25 @@ init(#{dir := Dir,
         Err ->
             throw(Err)
     end,
+    ok = maybe_fix_corrupted_files(Config0),
+
+    ManifestMod = manifest_module(),
+    {Config, Manifest0} = case Config0 of
+                              #{manifest := M} ->
+                                  {Config0, M};
+                              _ ->
+                                  ManifestMod:init_manifest(Config0, writer)
+                          end,
+    MaxSizeBytes = maps:get(max_segment_size_bytes, Config,
+                            ?DEFAULT_MAX_SEGMENT_SIZE_B),
+    MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks,
+                                        ?DEFAULT_MAX_SEGMENT_SIZE_C),
+    Retention = maps:get(retention, Config, []),
+    FilterSize = maps:get(filter_size, Config, ?DEFAULT_FILTER_SIZE),
+    ?DEBUG_(Name, "max_segment_size_bytes: ~b,
+           max_segment_size_chunks ~b, retention ~w, filter size ~b",
+           [MaxSizeBytes, MaxSizeChunks, Retention, FilterSize]),
     Cnt = make_counter(Config),
     %% initialise offset counter to -1 as 0 is the first offset in the log and
     %% it hasn't necessarily been written yet, for an empty log the first offset
@@ -545,8 +568,8 @@ init(#{dir := Dir,
                counter = Cnt,
                counter_id = counter_id(Config),
                shared = Shared,
-               filter_size = FilterSize},
-    ok = maybe_fix_corrupted_files(Config),
+               filter_size = FilterSize,
+               manifest_module = ManifestMod},
     DefaultNextOffset = case Config of
                             #{initial_offset := IO} when WriterType == acceptor ->
@@ -563,6 +586,7 @@ init(#{dir := Dir,
              #write{type = WriterType,
                     tail_info = {DefaultNextOffset, empty},
+                    manifest = Manifest0,
                     current_epoch = Epoch}});
         {NumSegments,
          #seg_info{first = #chunk_info{id = FstChId,
@@ -604,11 +628,14 @@ init(#{dir := Dir,
             {ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE),
             {ok, IdxEof} = file:position(IdxFd, eof),
             NumChunks = (IdxEof - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B,
+            Event = {segment_opened, undefined, filename:basename(Filename)},
+            Manifest = ManifestMod:handle_event(Event, Manifest0),
             #?MODULE{cfg = Cfg,
                      mode =
                          #write{type = WriterType,
                                 tail_info = TailInfo,
                                 segment_size = {Size, NumChunks},
+                                manifest = Manifest,
                                 current_epoch = Epoch},
                      current_file = filename:basename(Filename),
                      fd = SegFd,
@@ -628,10 +655,13 @@ init(#{dir := Dir,
             {ok, _} = file:position(IdxFd, ?IDX_HEADER_SIZE),
             osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1),
             osiris_log_shared:set_last_chunk_id(Shared, DefaultNextOffset - 1),
+            Event = {segment_opened, undefined, filename:basename(Filename)},
+            Manifest = ManifestMod:handle_event(Event, Manifest0),
             #?MODULE{cfg = Cfg,
                      mode =
                          #write{type = WriterType,
                                 tail_info = {DefaultNextOffset, empty},
+                                manifest = Manifest,
                                 current_epoch = Epoch},
                      current_file = filename:basename(Filename),
                      fd = SegFd,
@@ -868,7 +898,7 @@ evaluate_tracking_snapshot(#?MODULE{mode = #write{type = writer}} = State0, Trk0
 
 -spec init_acceptor(range(), list(), config()) -> state().
 init_acceptor(Range, EpochOffsets0,
-              #{name := Name, dir := Dir} = Conf) ->
+              #{name := Name, dir := Dir} = Conf0) ->
     %% truncate to first common last epoch offset
     %% * if the last local chunk offset has the same epoch but is lower
     %% than the last chunk offset then just attach at next offset.
@@ -880,6 +910,8 @@ init_acceptor(Range, EpochOffsets0,
         lists:reverse(
             lists:sort(EpochOffsets0)),
 
+    {Conf, Manifest} = (manifest_module()):init_manifest(Conf0, acceptor),
+
     %% then truncate to
     IdxFiles = sorted_index_files(Dir),
     ?DEBUG_(Name, "from epoch offsets: ~w range ~w", [EpochOffsets, Range]),
@@ -890,7 +922,8 @@ init_acceptor(Range, EpochOffsets0,
                      {O, _} -> O
                  end,
     init(Conf#{initial_offset => InitOffset,
-               index_files => RemIdxFiles}, acceptor).
+               index_files => RemIdxFiles,
+               manifest => Manifest}, acceptor).
 
 chunk_id_index_scan(IdxFile, ChunkId)
     when ?IS_STRING(IdxFile) ->
@@ -1866,11 +1899,19 @@ needs_handling(_, _, _) ->
 
 -spec close(state()) -> ok.
 close(#?MODULE{cfg = #cfg{counter_id = CntId,
+                          manifest_module = ManifestMod,
                           readers_counter_fun = Fun},
                fd = SegFd,
-               index_fd = IdxFd}) ->
+               index_fd = IdxFd,
+               mode = Mode}) ->
     close_fd(IdxFd),
     close_fd(SegFd),
+    case Mode of
+        #write{manifest = Manifest} ->
+            ok = ManifestMod:close_manifest(Manifest);
+        _ ->
+            ok
+    end,
     Fun(-1),
     case CntId of
         undefined ->
@@ -1879,14 +1920,17 @@ close(#?MODULE{cfg = #cfg{counter_id = CntId,
             osiris_counters:delete(CntId)
     end.
 
-delete_directory(#{name := Name,
-                   dir := _} = Config) ->
+delete_directory(Config) ->
+    (manifest_module()):delete(Config).
+
+delete(#{name := Name,
+         dir := _} = Config) ->
     Dir = directory(Config),
     ?DEBUG_(Name, " deleting directory ~ts", [Dir]),
     delete_dir(Dir);
-delete_directory(#{name := Name}) ->
-    delete_directory(Name);
-delete_directory(Name) when ?IS_STRING(Name) ->
+delete(#{name := Name}) ->
+    delete(Name);
+delete(Name) when ?IS_STRING(Name) ->
     Dir = directory(Name),
     ?DEBUG_(Name, " deleting directory ~ts", [Dir]),
     delete_dir(Dir).
@@ -2167,7 +2211,7 @@ build_segment_info(SegFile, LastChunkPos, IdxFile) ->
     end.
 
 -spec overview(file:filename_all()) ->
-    {range(), [{epoch(), offset()}]}.
+    {range(), epoch_offsets()}.
 overview(Dir) ->
     Files = list_dir(Dir),
     %% index files with matching segment
@@ -2231,11 +2275,19 @@ format_status(#?MODULE{cfg = #cfg{directory = Dir,
 
 -spec update_retention([retention_spec()], state()) -> state().
 update_retention(Retention,
                  #?MODULE{cfg = #cfg{name = Name,
+                                     manifest_module = ?MODULE,
                                      retention = Retention0} = Cfg} = State0)
     when is_list(Retention) ->
     ?DEBUG_(Name, " from: ~w to ~w", [Retention0, Retention]),
     State = State0#?MODULE{cfg = Cfg#cfg{retention = Retention}},
-    trigger_retention_eval(State).
+    trigger_retention_eval(State);
+update_retention(Retention,
+                 #?MODULE{cfg = #cfg{manifest_module = ManifestMod},
+                          mode = #write{manifest = Manifest0} = Write0} =
+                     State0) ->
+    Event = {retention_updated, Retention},
+    Manifest = ManifestMod:handle_event(Event, Manifest0),
+    State0#?MODULE{mode = Write0#write{manifest = Manifest}}.
 
 -spec evaluate_retention(file:filename_all(), [retention_spec()]) ->
     {range(),
      FirstTimestamp :: osiris:timestamp(),
@@ -2257,14 +2309,15 @@ evaluate_retention(Dir, Specs) when is_binary(Dir) ->
     ?DEBUG_(<<>>," (~w) completed in ~fms", [Specs, Time/1_000]),
     Result.
 
-evaluate_retention0(IdxFiles, []) ->
-    IdxFiles;
-evaluate_retention0(IdxFiles, [{max_bytes, MaxSize} | Specs]) ->
-    RemIdxFiles = eval_max_bytes(IdxFiles, MaxSize),
-    evaluate_retention0(RemIdxFiles, Specs);
-evaluate_retention0(IdxFiles, [{max_age, Age} | Specs]) ->
-    RemIdxFiles = eval_age(IdxFiles, Age),
-    evaluate_retention0(RemIdxFiles, Specs).
+evaluate_retention0(IdxFiles, Specs) ->
+    lists:foldl(
+      fun ({max_bytes, MaxSize}, RemIdxFiles) ->
+              eval_max_bytes(RemIdxFiles, MaxSize);
+          ({max_age, Age}, RemIdxFiles) ->
+              eval_age(RemIdxFiles, Age);
+          ({'fun', Fun}, RemIdxFiles) ->
+              eval_retention_fun(RemIdxFiles, Fun)
+      end, IdxFiles, Specs).
 
 eval_age([_] = IdxFiles, _Age) ->
     IdxFiles;
@@ -2329,6 +2382,13 @@ file_size_or_zero(Path) ->
             0
     end.
 
+eval_retention_fun([], _) ->
+    [];
+eval_retention_fun(IdxFiles, Fun) ->
+    {ToDelete, ToKeep} = Fun(IdxFiles),
+    _ = [ok = delete_segment_from_index(Idx) || Idx <- ToDelete],
+    ToKeep.
+
 last_epoch_chunk_ids(Name, IdxFiles) ->
     T1 = erlang:monotonic_time(),
     %% no need to filter out empty index files as
@@ -2523,11 +2583,13 @@ write_chunk(Chunk,
             Epoch,
             NumRecords,
             #?MODULE{cfg = #cfg{counter = CntRef,
+                                manifest_module = ManifestMod,
                                 shared = Shared} = Cfg,
                      fd = Fd,
                      index_fd = IdxFd,
                      mode =
                          #write{segment_size = {SegSizeBytes, SegSizeChunks},
+                                manifest = Manifest0,
                                 tail_info = {Next, _}} =
                              Write} =
                 State) ->
@@ -2557,9 +2619,19 @@ write_chunk(Chunk,
             counters:put(CntRef, ?C_OFFSET, NextOffset - 1),
             counters:add(CntRef, ?C_CHUNKS, 1),
             maybe_set_first_offset(Next, Cfg),
+            ChunkInfo = #{id => Next,
+                          timestamp => Timestamp,
+                          epoch => Epoch,
+                          num => NumRecords,
+                          type => ChType,
+                          size => Size,
+                          pos => Cur},
+            Event = {chunk_written, ChunkInfo, Chunk},
+            Manifest = ManifestMod:handle_event(Event, Manifest0),
             State#?MODULE{mode =
                               Write#write{tail_info = {NextOffset,
                                                        {Epoch, Next, Timestamp}},
+                                          manifest = Manifest,
                                           segment_size = {SegSizeBytes + Size,
                                                           SegSizeChunks + 1}}}
     end.
@@ -2768,10 +2840,13 @@ make_file_name(N, Suff) ->
 
 open_new_segment(#?MODULE{cfg = #cfg{name = Name,
                                      directory = Dir,
-                                     counter = Cnt},
+                                     counter = Cnt,
+                                     manifest_module = ManifestMod},
                           fd = OldFd,
                           index_fd = OldIdxFd,
+                          current_file = OldFilename,
                           mode = #write{type = _WriterType,
+                                        manifest = Manifest0,
                                         tail_info = {NextOffset, _}} = Write} =
                      State0) ->
     _ = close_fd(OldFd),
@@ -2792,11 +2867,15 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name,
     {ok, _} = file:position(IdxFd, eof),
     counters:add(Cnt, ?C_SEGMENTS, 1),
 
+    Event = {segment_opened, OldFilename, Filename},
+    Manifest = ManifestMod:handle_event(Event, Manifest0),
+
     State0#?MODULE{current_file = Filename,
                    fd = Fd,
-                   %% reset segment_size counter
                    index_fd = IdxFd,
-                   mode = Write#write{segment_size = {?LOG_HEADER_SIZE, 0}}}.
+                   mode = Write#write{manifest = Manifest,
+                                      %% reset segment_size counter
+                                      segment_size = {?LOG_HEADER_SIZE, 0}}}.
 
 open_index_read(File) ->
     {ok, Fd} = open(File, [read, raw, binary, read_ahead]),
@@ -3033,7 +3112,8 @@ read_header_with_ra(#?MODULE{cfg = #cfg{directory = Dir,
     case ra_read(Pos, ?HEADER_SIZE_B, Ra0) of
         Bin when is_binary(Bin) andalso
                  byte_size(Bin) == ?HEADER_SIZE_B ->
-            parse_header(Bin, State);
+            {ok, Header} = parse_header(Bin, Pos),
+            read_header_with_ra0(Header, State);
         undefined ->
             case ra_fill(Fd, Pos, Ra0) of
                 {ok, Ra} ->
@@ -3070,60 +3150,36 @@ read_header_with_ra(#?MODULE{cfg = #cfg{directory = Dir,
             end
     end.
 
-parse_header(<<?MAGIC:4/unsigned,
-               ?VERSION:4/unsigned,
-               ChType:8/unsigned,
-               NumEntries:16/unsigned,
-               NumRecords:32/unsigned,
-               Timestamp:64/signed,
-               Epoch:64/unsigned,
-               NextChId0:64/unsigned,
-               Crc:32/integer,
-               DataSize:32/unsigned,
-               TrailerSize:32/unsigned,
-               FilterSize:8/unsigned,
-               _Reserved:24>> = HeaderData0,
-             #?MODULE{cfg = #cfg{counter = CntRef},
-                      fd = Fd,
-                      mode = #read{next_offset = NextChId0,
-                                   position = Pos,
-                                   filter = Filter,
-                                   read_ahead = Ra0} = Read0} = State0) ->
-
+read_header_with_ra0(#{chunk_id := NextChId0,
+                       num_records := NumRecords,
+                       data_size := DataSize,
+                       filter_size := FilterSize,
+                       next_position := NextPos} = Header,
+                     #?MODULE{cfg = #cfg{counter = CntRef},
+                              fd = Fd,
+                              mode = #read{next_offset = NextChId0,
+                                           position = Pos,
+                                           filter = Filter,
+                                           read_ahead = Ra0} = Read0} = State0) ->
     Ra1 = ra_update_size(Filter, FilterSize, DataSize, Ra0),
     case ra_read(Pos + ?HEADER_SIZE_B, FilterSize, Ra1) of
         undefined ->
             {ok, Ra} = ra_fill(Fd, Pos + ?HEADER_SIZE_B, Ra1),
-            parse_header(HeaderData0,
-                         State0#?MODULE{mode = Read0#read{read_ahead = Ra}});
+            State = State0#?MODULE{mode = Read0#read{read_ahead = Ra}},
+            read_header_with_ra0(Header, State);
         ChunkFilter ->
             counters:put(CntRef, ?C_OFFSET, NextChId0 + NumRecords),
             counters:add(CntRef, ?C_CHUNKS, 1),
-            NextPos = Pos + ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize,
             case osiris_bloom:is_match(ChunkFilter, Filter) of
                 true ->
-                    <<HeaderData:?HEADER_SIZE_B/binary, _/binary>> = HeaderData0,
                     State = case Ra1 of
                                 Ra0 ->
                                     State0;
                                 Ra ->
                                     State0#?MODULE{mode = Read0#read{read_ahead = Ra}}
                             end,
-                    {ok, #{chunk_id => NextChId0,
-                           epoch => Epoch,
-                           type => ChType,
-                           crc => Crc,
-                           num_records => NumRecords,
-                           num_entries => NumEntries,
-                           timestamp => Timestamp,
-                           data_size => DataSize,
-                           trailer_size => TrailerSize,
-                           header_data => HeaderData,
-                           filter_size => FilterSize,
-                           next_position => NextPos,
-                           position => Pos},
-                     State};
+                    {ok, Header, State};
                 false ->
                     Read = Read0#read{next_offset = NextChId0 + NumRecords,
                                       position = NextPos,
@@ -3140,6 +3196,40 @@ parse_header(<<?MAGIC:4/unsigned,
 
+-spec parse_header(binary(), Pos :: non_neg_integer()) ->
+    {ok, header_map()} |
+    {error, invalid_chunk_header}.
+parse_header(<<?MAGIC:4/unsigned,
+               ?VERSION:4/unsigned,
+               ChType:8/unsigned,
+               NumEntries:16/unsigned,
+               NumRecords:32/unsigned,
+               Timestamp:64/signed,
+               Epoch:64/unsigned,
+               NextChId0:64/unsigned,
+               Crc:32/integer,
+               DataSize:32/unsigned,
+               TrailerSize:32/unsigned,
+               FilterSize:8/unsigned,
+               _Reserved:24>> = HeaderData,
+             Pos) ->
+    NextPos = Pos + ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize,
+    {ok, #{chunk_id => NextChId0,
+           epoch => Epoch,
+           type => ChType,
+           crc => Crc,
+           num_records => NumRecords,
+           num_entries => NumEntries,
+           timestamp => Timestamp,
+           data_size => DataSize,
+           trailer_size => TrailerSize,
+           header_data => HeaderData,
+           filter_size => FilterSize,
+           next_position => NextPos,
+           position => Pos}};
+parse_header(_, _) ->
+    {error, invalid_chunk_header}.
+
 %% keep the previous value if the current one is 0 (i.e. no filter in the chunk)
 read_ahead_fsize(Previous, 0) ->
     Previous;
@@ -3472,6 +3562,9 @@ ra(#{options := #{read_ahead := Limit}}) when is_integer(Limit) ->
 ra(_) ->
     #ra{}.
 
+manifest_module() ->
+    application:get_env(osiris, log_manifest, ?MODULE).
+
 generate_log(Msg, MsgsPerChunk, NumMessages, Directory) ->
     Name = filename:basename(Directory),
@@ -3504,6 +3597,14 @@ write_in_chunks(ToWrite, MsgsPerChunk, Msg, W0) when ToWrite > 0 ->
 write_in_chunks(_, _, _, W) ->
     W.
 
+%% Default implementation of osiris_log_manifest:
+init_manifest(Config, _WriterType) ->
+    {Config, undefined}.
+handle_event(_Event, undefined) ->
+    undefined.
+close_manifest(undefined) ->
+    ok.
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
diff --git a/src/osiris_log_manifest.erl b/src/osiris_log_manifest.erl
new file mode 100644
index 00000000..f553bd44
--- /dev/null
+++ b/src/osiris_log_manifest.erl
@@ -0,0 +1,42 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
+%%
+
+-module(osiris_log_manifest).
+
+-type state() :: term().
+
+-type chunk_info() ::
+    #{id := osiris:offset(),
+      timestamp := osiris:timestamp(),
+      epoch := osiris:epoch(),
+      num := non_neg_integer(),
+      type := osiris_log:chunk_type(),
+      %% size of data + filter + trailer
+      size := non_neg_integer(),
+      pos := integer()}.
+
+-type event() :: {segment_opened,
+                  OldSegment :: file:filename_all() | undefined,
+                  NewSegment :: file:filename_all()} |
+                 {chunk_written, chunk_info(), iodata()} |
+                 {retention_updated, [osiris:retention_spec()]}.
+
+-export_type([state/0,
+              chunk_info/0,
+              event/0]).
+
+-callback overview(Dir :: file:filename_all()) ->
+    {osiris_log:range(), osiris_log:epoch_offsets()}.
+
+-callback init_manifest(osiris_log:config(), writer | acceptor) ->
+    {osiris_log:config(), state()}.
+
+-callback handle_event(event(), state()) -> state().
+
+-callback close_manifest(state()) -> ok.
+
+-callback delete(osiris_log:config()) -> ok.
diff --git a/src/osiris_log_reader.erl b/src/osiris_log_reader.erl
new file mode 100644
index 00000000..11e0bc81
--- /dev/null
+++ b/src/osiris_log_reader.erl
@@ -0,0 +1,96 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
+%%
+
+-module(osiris_log_reader).
+
+-export([next_offset/1,
+         committed_offset/1,
+         committed_chunk_id/1,
+         close/1,
+         send_file/2,
+         send_file/3,
+         chunk_iterator/2,
+         chunk_iterator/3,
+         iterator_next/1]).
+
+%% Exported for internal usage
+-export([module/0]).
+
+-type options() :: #{transport => tcp | ssl,
+                     chunk_selector => all | user_data,
+                     filter_spec => osiris_bloom:filter_spec(),
+                     read_ahead => boolean() | non_neg_integer()
+                    }.
+-type config() ::
+    osiris:config() |
+    #{counter_spec := osiris_log:counter_spec(),
+      options := options()}.
+-type state() :: term().
+-type chunk_iterator() :: term().
+-type send_file_callback() ::
+    fun((osiris_log:header_map(), BytesToSend :: non_neg_integer()) ->
+            PrefixData :: binary()).
+
+-export_type([options/0,
+              config/0,
+              state/0,
+              chunk_iterator/0]).
+
+-callback init_offset_reader(osiris:offset_spec(), config()) ->
+    {ok, state()} |
+    {error, term()}.
+-callback resolve_offset_spec(osiris:offset_spec(), config()) ->
+    {ok, osiris:offset()} |
+    {error, term()}.
+
+-callback next_offset(state()) -> osiris:offset().
+-callback committed_offset(state()) -> osiris:offset().
+-callback committed_chunk_id(state()) -> osiris:offset().
+-callback close(state()) -> ok.
+
+-callback send_file(gen_tcp:socket() | ssl:socket(), state(),
+                    send_file_callback()) ->
+    {ok, state()} |
+    {error, term()} |
+    {end_of_stream, state()}.
+
+-callback chunk_iterator(state(),
+                         Credit :: pos_integer() | all,
+                         PrevIter :: chunk_iterator() | undefined) ->
+    {ok, osiris_log:header_map(), chunk_iterator(), state()} |
+    {end_of_stream, state()} |
+    {error, term()}.
+
+-callback iterator_next(chunk_iterator()) ->
+    {{osiris:offset(), osiris:entry()}, chunk_iterator()} |
+    end_of_chunk.
+
+next_offset(State) ->
+    (module()):?FUNCTION_NAME(State).
+committed_offset(State) ->
+    (module()):?FUNCTION_NAME(State).
+committed_chunk_id(State) ->
+    (module()):?FUNCTION_NAME(State).
+close(State) ->
+    (module()):?FUNCTION_NAME(State).
+
+send_file(Socket, State) ->
+    send_file(Socket, State, fun(_, _) -> <<>> end).
+send_file(Socket, State, Callback) ->
+    (module()):?FUNCTION_NAME(Socket, State, Callback).
+
+chunk_iterator(State, Credit) ->
+    chunk_iterator(State, Credit, undefined).
+chunk_iterator(State, Credit, PrevChunkIterator) ->
+    (module()):?FUNCTION_NAME(State, Credit, PrevChunkIterator).
+
+iterator_next(Iterator) ->
+    (module()):?FUNCTION_NAME(Iterator).
+
+%% @private
+module() ->
+    application:get_env(osiris, log_reader, osiris_log).
diff --git a/test/osiris_log_SUITE.erl b/test/osiris_log_SUITE.erl
index 9fa5b7bd..81c709c0 100644
--- a/test/osiris_log_SUITE.erl
+++ b/test/osiris_log_SUITE.erl
@@ -86,6 +86,7 @@ all_tests() ->
      evaluate_retention_max_bytes,
      evaluate_retention_max_age,
      evaluate_retention_max_age_empty,
+     evaluate_retention_fun,
      offset_tracking,
      offset_tracking_snapshot,
      many_segment_overview,
@@ -1702,6 +1703,38 @@ evaluate_retention_max_age(Config) ->
                  "the retention process didn't delete the oldest segment"),
     ok.
 
+evaluate_retention_fun(Config) ->
+    Data = crypto:strong_rand_bytes(1500),
+    EpochChunks =
+        [begin {1, [Data || _ <- lists:seq(1, 50)]} end
+         || _ <- lists:seq(1, 20)],
+    LDir = ?config(leader_dir, Config),
+    Log = seed_log(LDir, EpochChunks, Config),
+    osiris_log:close(Log),
+    %% delete only the first segment
+    Fun = fun(IdxFiles) ->
+              ct:pal("Eval retention for ~p", [IdxFiles]),
+              lists:splitwith(fun(IdxFile) ->
+                                  filename:basename(IdxFile) =:=
+                                      <<"00000000000000000000.index">>
+                              end, IdxFiles)
+          end,
+    Spec = {'fun', Fun},
+    Range = osiris_log:evaluate_retention(LDir, [Spec]),
+    %% idempotency check
+    Range = osiris_log:evaluate_retention(LDir, [Spec]),
+    SegFiles =
+        filelib:wildcard(
+            filename:join(LDir, "*.segment")),
+    ?assertMatch([_], SegFiles),
+    ?assertEqual([],
+                 lists:filter(fun(S) ->
+                                  lists:suffix("00000000000000000000.segment", S)
+                              end,
+                              SegFiles),
+                 "the retention process didn't delete the oldest segment"),
+    ok.
+
 offset_tracking(Config) ->
     Conf = ?config(osiris_conf, Config),
     S0 = osiris_log:init(Conf),
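
Notes on the API introduced above (editorial sketches; names called out as illustrative are not part of the patch):

With the new `{'fun', retention_fun()}` spec, `evaluate_retention0/2` folds every spec over the index file list and `eval_retention_fun/2` hands the remaining files to the user fun, deleting whatever comes back in `ToDelete`. A minimal sketch of such a fun, assuming the index files arrive sorted oldest-first (as `sorted_index_files/1` returns them); `make_keep_last_n/1` and `MaxSegments` are illustrative names:

```erlang
%% Build a retention fun that keeps at most MaxSegments of the newest
%% segments and offers everything older for deletion.
make_keep_last_n(MaxSegments) when is_integer(MaxSegments), MaxSegments > 0 ->
    fun(IdxFiles) ->
            case length(IdxFiles) - MaxSegments of
                Excess when Excess > 0 ->
                    %% {ToDelete, ToKeep}: the first Excess index files
                    %% belong to the oldest segments
                    lists:split(Excess, IdxFiles);
                _ ->
                    {[], IdxFiles}
            end
    end.
```

Such a fun would be supplied as `retention => [{'fun', make_keep_last_n(5)}]` in the stream config, or swapped in later via `osiris_log:update_retention/2`.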
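
`osiris_log` itself acts as the no-op default manifest (`manifest_module/0` falls back to `?MODULE`, with `undefined` as the manifest state). A sketch of what an external implementation might look like, mirroring chunk metadata into a named ETS table; the module name `osiris_chunk_index` and the table lifecycle are assumptions for illustration only:

```erlang
-module(osiris_chunk_index).
-behaviour(osiris_log_manifest).

-export([overview/1,
         init_manifest/2,
         handle_event/2,
         close_manifest/1,
         delete/1]).

%% Defer to the default on-disk scan for recovery.
overview(Dir) ->
    osiris_log:overview(Dir).

init_manifest(Config, _WriterType) ->
    %% The returned config may be enriched here (e.g. with index_files);
    %% this sketch passes it through unchanged. Assumes the named ETS
    %% table was created elsewhere, e.g. by a supervisor:
    %% ets:new(osiris_chunk_index, [named_table, public, ordered_set]).
    {Config, #{segment => undefined}}.

handle_event({segment_opened, _OldSeg, NewSeg}, State) ->
    State#{segment := NewSeg};
handle_event({chunk_written, #{id := ChId, pos := Pos}, _IoData},
             #{segment := Seg} = State) ->
    %% Record chunk id -> {segment file, byte position}.
    true = ets:insert(?MODULE, {ChId, Seg, Pos}),
    State;
handle_event({retention_updated, _Specs}, State) ->
    State.

close_manifest(_State) ->
    ok.

delete(Config) ->
    %% Drop externally held metadata, then reuse the default deletion.
    catch ets:delete(?MODULE),
    osiris_log:delete(Config).
```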
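
Both plug points are resolved from the application environment at call time (`manifest_module/0` in `osiris_log` and `module/0` in `osiris_log_reader`), defaulting to `osiris_log`. Wiring a custom pair could then look like the following sys.config fragment, where `my_tiered_reader` is a hypothetical module implementing the `osiris_log_reader` callbacks:

```erlang
%% sys.config: unset keys fall back to the built-in osiris_log modules.
[{osiris, [{log_manifest, osiris_chunk_index},
           {log_reader, my_tiered_reader}]}].
```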