Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions src/osiris.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,12 @@
{abs, offset()} |
offset() |
{timestamp, timestamp()}.
-type retention_fun() :: fun((IdxFiles :: [file:filename_all()]) ->
{ToDelete :: [file:filename_all()], ToKeep :: [file:filename_all()]}).
Comment on lines +71 to +72
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The force pushes were to rebase and modify this retention spec. Before it was

-type retention_fun() :: fun((IdxFile :: file:filename_all()) -> boolean()).

Going through the sorted set of index files, you could return true to delete index+segment file pairs and false to stop evaluating the retention function.

For retention where you want to keep some offset, though, this meant that you had to read the index file contents and a chunk from the segment too, since you don't know the final offset in the segment just from the index file. Passing the full list of index files lets us determine which files to keep and which to delete just from the index file names.

Here is where it's used in rabbitmq_stream_s3 to clean up any segments which have been fully uploaded to the remote tier and are not the current segment: https://github.com/amazon-mq/rabbitmq-stream-s3/blob/57d0a0a93bdb7d3da1e6f03f828e0dfce31019a6/src/rabbitmq_stream_s3_log_manifest.erl#L968-L1069

-type retention_spec() ::
{max_bytes, non_neg_integer()} | {max_age, milliseconds()}.
{max_bytes, non_neg_integer()} |
{max_age, milliseconds()} |
{'fun', retention_fun()}.
-type writer_id() :: binary().
-type batch() :: {batch, NumRecords :: non_neg_integer(),
compression_type(),
Expand All @@ -83,11 +87,6 @@

%% returned when reading
-type entry() :: binary() | batch().
-type reader_options() :: #{transport => tcp | ssl,
chunk_selector => all | user_data,
filter_spec => osiris_bloom:filter_spec(),
read_ahead => boolean() | non_neg_integer()
}.

-export_type([name/0,
config/0,
Expand All @@ -97,6 +96,7 @@
tracking_id/0,
offset_spec/0,
retention_spec/0,
retention_fun/0,
timestamp/0,
writer_id/0,
data/0,
Expand Down Expand Up @@ -228,8 +228,8 @@ init_reader(Pid, OffsetSpec, CounterSpec) ->
chunk_selector => user_data}).

-spec init_reader(pid(), offset_spec(), osiris_log:counter_spec(),
reader_options()) ->
{ok, osiris_log:state()} |
osiris_log_reader:options()) ->
{ok, osiris_log_reader:state()} |
{error, {offset_out_of_range, empty | {offset(), offset()}}} |
{error, {invalid_last_offset_epoch, offset(), offset()}}.
init_reader(Pid, OffsetSpec, {_, _} = CounterSpec, Options)
Expand All @@ -238,9 +238,9 @@ init_reader(Pid, OffsetSpec, {_, _} = CounterSpec, Options)
Ctx0 = osiris_util:get_reader_context(Pid),
Ctx = Ctx0#{counter_spec => CounterSpec,
options => Options},
osiris_log:init_offset_reader(OffsetSpec, Ctx).
(osiris_log_reader:module()):init_offset_reader(OffsetSpec, Ctx).

-spec resolve_offset_spec(pid(), offset_spec(), reader_options()) ->
-spec resolve_offset_spec(pid(), offset_spec(), osiris_log_reader:options()) ->
{ok, offset()} |
{error, no_index_file} |
{error, {offset_out_of_range, osiris_log:range()}} |
Expand All @@ -249,7 +249,7 @@ resolve_offset_spec(Pid, OffsetSpec, Options)
when is_pid(Pid) andalso node(Pid) =:= node() ->
Ctx0 = osiris_util:get_reader_context(Pid),
Ctx = Ctx0#{options => Options},
osiris_log:resolve_offset_spec(OffsetSpec, Ctx).
(osiris_log_reader:module()):resolve_offset_spec(OffsetSpec, Ctx).

-spec register_offset_listener(pid(), offset()) -> ok.
register_offset_listener(Pid, Offset) ->
Expand Down
Loading