Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# v1.1.2

## New

- Add Extreme.set/get_metadata
- Add Extreme.scavenge_database
- Add Extreme.read_events/3 and Extreme.read_events_backwards/3

# v1.1.1

## Breaking changes
Expand Down
200 changes: 200 additions & 0 deletions lib/extreme.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ defmodule Extreme do
@doc false
defmacro __using__(opts \\ []) do
quote do
alias Extreme.Messages, as: ExMsg

@otp_app Keyword.get(unquote(opts), :otp_app, :extreme)

defp _default_config,
Expand Down Expand Up @@ -37,6 +39,204 @@ defmodule Extreme do
)
end

@doc """
Reads events from `stream_name` given `opts`
as keyword list of `Extreme.Reading.Params` keys,
and invokes `fun` for each read event. `fun` should return `:ok` or `:stop`
if further processing of events should be stopped
"""
@spec read_events(String.t(), Keyword.t(), (ExMsg.StreamEventAppeared.t() ->
:ok
| :stop
| {:stop, response :: any()})) ::
:finished
| {:stopped, response :: nil | any()}
| {:error, :no_stream | :stream_hard_deleted}
| {:error, :unexpected_processing_response, any()}
def read_events(stream_name, opts \\ [], fun) do
params =
[{:stream, stream_name} | opts]
|> Enum.into(%{})
|> Extreme.Reading.Params.new()

Extreme.Reading.read_events(__MODULE__, params, fun)
end

@doc """
Reads events backwards from `stream_name` given `opts`
as keyword list of `Extreme.Reading.Params` keys,
and invokes `fun` for each read event. `fun` should return `:ok` or `:stop`
if further processing of events should be stopped
"""
@spec read_events_backwards(String.t(), Keyword.t(), (ExMsg.StreamEventAppeared.t() ->
:ok
| :stop
| {:stop, response :: any()})) ::
:finished
| {:stopped, response :: any()}
| {:error, :no_stream | :stream_hard_deleted}
| {:error, :unexpected_processing_response, any()}
def read_events_backwards(stream_name, opts \\ [], fun) do
params =
[{:stream, stream_name} | opts]
|> Enum.into(%{})
|> Extreme.Reading.Params.new_backwards()

Extreme.Reading.read_events(__MODULE__, params, fun)
end

@spec reduce_events(acc :: any(), String.t(), Keyword.t(), (ExMsg.StreamEventAppeared.t(),
acc :: any() ->
{:ok, acc :: any()}
| {:stop, acc :: any()})) ::
{:finished, acc :: any()}
| {:stopped, acc :: any()}
| {:error, :no_stream | :stream_hard_deleted}
| {:error, :unexpected_processing_response, any()}
def reduce_events(acc, stream_name, opts \\ [], fun) do
params =
[{:stream, stream_name} | opts]
|> Enum.into(%{})
|> Extreme.Reading.Params.new()

Extreme.Reading.reduce_events(__MODULE__, acc, params, fun)
end

@spec reduce_events_backwards(
acc :: any(),
String.t(),
Keyword.t(),
(ExMsg.StreamEventAppeared.t(), acc :: any() ->
{:ok, acc :: any()}
| {:stop, acc :: any()})
) ::
{:finished, acc :: any()}
| {:stopped, acc :: any()}
| {:error, :no_stream | :stream_hard_deleted}
| {:error, :unexpected_processing_response, any()}
def reduce_events_backwards(acc, stream_name, opts \\ [], fun) do
params =
[{:stream, stream_name} | opts]
|> Enum.into(%{})
|> Extreme.Reading.Params.new_backwards()

Extreme.Reading.reduce_events(__MODULE__, acc, params, fun)
end

@doc """
Sets metadata map to stream. To remove metadata, set an empty map.

Example:
metadata = %{ "$maxAge" => max_age_seconds }
:ok = MyConn.set_metadata("user-123", metadata)
"""
@spec set_metadata(String.t(), map()) :: :ok | any()
def set_metadata(stream, %{} = metadata) do
stream
|> _write_metadata(metadata)
|> execute()
|> case do
{:ok, %ExMsg.WriteEventsCompleted{result: :success}} -> :ok
{:ok, %ExMsg.WriteEventsCompleted{result: result}} -> {:error, result}
other -> other
end
end

@doc """
Gets metadata map from stream.

Example:
> stream = "users-123"
> MyConn.get_metadata(stream)
{:error, :no_stream}

> max_age_seconds = 60 * 60 * 24 # keep events 1 day
> metadata = %{ "$maxAge" => max_age_seconds }
> :ok = MyConn.set_metadata(stream, metadata)

> MyConn.get_metadata(stream)
{:ok, %{ "$maxAge" => 86_400 }}
"""
@spec get_metadata(String.t()) :: {:ok, map()} | {:error, :no_stream}
def get_metadata(stream) do
stream
|> _read_metadata()
|> execute()
|> case do
{:error, :no_stream, %ExMsg.ReadStreamEventsCompleted{result: :no_stream}} ->
{:error, :no_stream}

{:ok,
%ExMsg.ReadStreamEventsCompleted{
events: [
%ExMsg.ResolvedIndexedEvent{
event: %Extreme.Messages.EventRecord{
event_stream_id: "$$" <> ^stream,
event_type: "$metadata",
data: data
}
}
| _
],
result: :success
}} ->
data
|> Jason.decode()
|> case do
{:ok, decoded} -> {:ok, decoded}
_ -> data
end
end
end

@doc """
Starts database scavenge on current connection. Pay attention that
if cluster is used, scavenge will be executed only on connected node!
"""
@spec scavenge_database() :: :ok | any()
def scavenge_database() do
ExMsg.ScavengeDatabase.new()
|> execute()
|> case do
{:ok, %Extreme.Messages.ScavengeDatabaseCompleted{result: :success}} -> :ok
{:ok, %Extreme.Messages.ScavengeDatabaseCompleted{result: other}} -> {:error, other}
other -> other
end
end

defp _write_metadata(stream, %{} = metadata) do
metadata_stream_name = "$$" <> stream

proto_event =
ExMsg.NewEvent.new(
event_id: Extreme.Tools.generate_uuid(),
event_type: "$metadata",
data_content_type: 1,
metadata_content_type: 1,
data: Jason.encode!(metadata),
metadata: ""
)

ExMsg.WriteEvents.new(
event_stream_id: metadata_stream_name,
expected_version: -2,
events: [proto_event],
require_master: false
)
end

defp _read_metadata(stream) do
metadata_stream_name = "$$" <> stream

Extreme.Messages.ReadStreamEventsBackward.new(
event_stream_id: metadata_stream_name,
from_event_number: -1,
max_count: 1,
resolve_link_tos: false,
require_master: false
)
end

def subscribe_to(stream, subscriber, resolve_link_tos \\ true, ack_timeout \\ 5_000)
when is_binary(stream) and is_pid(subscriber) and is_boolean(resolve_link_tos) do
Extreme.RequestManager.subscribe_to(
Expand Down
5 changes: 3 additions & 2 deletions lib/extreme/message_resolver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ defmodule Extreme.MessageResolver do
def encode_cmd(:update_persistent_subscription), do: 0xCE
def encode_cmd(:update_persistent_subscription_completed), do: 0xCF

def encode_cmd(:scavenge_database), do: 0xD0
def encode_cmd(:scavenge_database_completed), do: 0xD1
def encode_cmd(Msg.ScavengeDatabase), do: 0xD0

def encode_cmd(:not_handled), do: 0xF1
def encode_cmd(:authenticate), do: 0xF2
Expand Down Expand Up @@ -82,6 +81,8 @@ defmodule Extreme.MessageResolver do
def decode_cmd(0xC9), do: Msg.CreatePersistentSubscriptionCompleted
def decode_cmd(0xCB), do: Msg.DeletePersistentSubscriptionCompleted

def decode_cmd(0xD1), do: Msg.ScavengeDatabaseCompleted

def decode_cmd(0xF0), do: :bad_request
def decode_cmd(0xF4), do: :not_authenticated
def decode_cmd(0xF6), do: :client_identified
Expand Down
Loading
Loading