diff --git a/CHANGELOG.md b/CHANGELOG.md
index 490f542..8541293 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,12 @@
+# v1.1.2
+
+## New
+
+- Add Extreme.set_metadata/2 and Extreme.get_metadata/1
+- Add Extreme.scavenge_database/0
+- Add Extreme.read_events/3 and Extreme.read_events_backwards/3
+- Add Extreme.reduce_events/4 and Extreme.reduce_events_backwards/4
+
 # v1.1.1
 
 ## Breaking changes
diff --git a/lib/extreme.ex b/lib/extreme.ex
index 8ac6358..6d06071 100644
--- a/lib/extreme.ex
+++ b/lib/extreme.ex
@@ -8,6 +8,8 @@ defmodule Extreme do
   @doc false
   defmacro __using__(opts \\ []) do
     quote do
+      alias Extreme.Messages, as: ExMsg
+
       @otp_app Keyword.get(unquote(opts), :otp_app, :extreme)
 
       defp _default_config,
@@ -37,6 +39,204 @@ defmodule Extreme do
         )
       end
 
+      @doc """
+      Reads events from `stream_name`, with `opts` given as a keyword list of
+      `Extreme.Reading.Params` keys, and invokes `fun` for each event read.
+      `fun` should return `:ok` to continue, or `:stop` / `{:stop, response}`
+      if further processing of events should be stopped.
+      """
+      @spec read_events(String.t(), Keyword.t(), (ExMsg.StreamEventAppeared.t() ->
+                                                    :ok
+                                                    | :stop
+                                                    | {:stop, response :: any()})) ::
+              :finished
+              | {:stopped, response :: nil | any()}
+              | {:error, :no_stream | :stream_hard_deleted}
+              | {:error, :unexpected_processing_response, any()}
+      def read_events(stream_name, opts \\ [], fun) do
+        params =
+          [{:stream, stream_name} | opts]
+          |> Enum.into(%{})
+          |> Extreme.Reading.Params.new()
+
+        Extreme.Reading.read_events(__MODULE__, params, fun)
+      end
+
+      @doc """
+      Reads events backwards from `stream_name`, with `opts` given as a keyword
+      list of `Extreme.Reading.Params` keys, and invokes `fun` for each event
+      read. `fun` should return `:ok` to continue, or `:stop` / `{:stop, response}`
+      if further processing of events should be stopped.
+      """
+      @spec read_events_backwards(String.t(), Keyword.t(), (ExMsg.StreamEventAppeared.t() ->
+                                                              :ok
+                                                              | :stop
+                                                              | {:stop, response :: any()})) ::
+              :finished
+              | {:stopped, response :: any()}
+              | {:error, :no_stream | :stream_hard_deleted}
+              | {:error, :unexpected_processing_response, any()}
+      def read_events_backwards(stream_name, opts \\ [], fun) do
+        params =
+          [{:stream, stream_name} | opts]
+          |> Enum.into(%{})
+          |> Extreme.Reading.Params.new_backwards()
+
+        Extreme.Reading.read_events(__MODULE__, params, fun)
+      end
+
+      @doc """
+      Reduces events read from `stream_name` over the initial accumulator `acc`,
+      with `opts` given as a keyword list of `Extreme.Reading.Params` keys.
+      `fun` receives each event read together with the accumulator and should
+      return `{:ok, acc}` to continue, or `{:stop, acc}` if further processing
+      of events should be stopped.
+      """
+      @spec reduce_events(acc :: any(), String.t(), Keyword.t(), (ExMsg.StreamEventAppeared.t(),
+                                                                  acc :: any() ->
+                                                                    {:ok, acc :: any()}
+                                                                    | {:stop, acc :: any()})) ::
+              {:finished, acc :: any()}
+              | {:stopped, acc :: any()}
+              | {:error, :no_stream | :stream_hard_deleted}
+              | {:error, :unexpected_processing_response, any()}
+      def reduce_events(acc, stream_name, opts \\ [], fun) do
+        params =
+          [{:stream, stream_name} | opts]
+          |> Enum.into(%{})
+          |> Extreme.Reading.Params.new()
+
+        Extreme.Reading.reduce_events(__MODULE__, acc, params, fun)
+      end
+
+      @doc """
+      Same as `reduce_events/4`, but events are read backwards from `stream_name`.
+      """
+      @spec reduce_events_backwards(
+              acc :: any(),
+              String.t(),
+              Keyword.t(),
+              (ExMsg.StreamEventAppeared.t(), acc :: any() ->
+                 {:ok, acc :: any()}
+                 | {:stop, acc :: any()})
+            ) ::
+              {:finished, acc :: any()}
+              | {:stopped, acc :: any()}
+              | {:error, :no_stream | :stream_hard_deleted}
+              | {:error, :unexpected_processing_response, any()}
+      def reduce_events_backwards(acc, stream_name, opts \\ [], fun) do
+        params =
+          [{:stream, stream_name} | opts]
+          |> Enum.into(%{})
+          |> Extreme.Reading.Params.new_backwards()
+
+        Extreme.Reading.reduce_events(__MODULE__, acc, params, fun)
+      end
+
+      @doc """
+      Sets the metadata map for a stream. To remove metadata, set an empty map.
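+      Well-known EventStore metadata keys (such as "$maxAge" in seconds, or
+      "$maxCount") can be combined with custom keys in the same map.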
+
+      Example:
+          metadata = %{ "$maxAge" => max_age_seconds }
+          :ok = MyConn.set_metadata("user-123", metadata)
+      """
+      @spec set_metadata(String.t(), map()) :: :ok | any()
+      def set_metadata(stream, %{} = metadata) do
+        stream
+        |> _write_metadata(metadata)
+        |> execute()
+        |> case do
+          {:ok, %ExMsg.WriteEventsCompleted{result: :success}} -> :ok
+          {:ok, %ExMsg.WriteEventsCompleted{result: result}} -> {:error, result}
+          other -> other
+        end
+      end
+
+      @doc """
+      Gets the metadata map for a stream.
+
+      Example:
+          > stream = "users-123"
+          > MyConn.get_metadata(stream)
+          {:error, :no_stream}
+
+          > max_age_seconds = 60 * 60 * 24 # keep events 1 day
+          > metadata = %{ "$maxAge" => max_age_seconds }
+          > :ok = MyConn.set_metadata(stream, metadata)
+
+          > MyConn.get_metadata(stream)
+          {:ok, %{ "$maxAge" => 86_400 }}
+      """
+      @spec get_metadata(String.t()) :: {:ok, map()} | {:error, :no_stream} | binary()
+      def get_metadata(stream) do
+        stream
+        |> _read_metadata()
+        |> execute()
+        |> case do
+          {:error, :no_stream, %ExMsg.ReadStreamEventsCompleted{result: :no_stream}} ->
+            {:error, :no_stream}
+
+          {:ok,
+           %ExMsg.ReadStreamEventsCompleted{
+             events: [
+               %ExMsg.ResolvedIndexedEvent{
+                 event: %Extreme.Messages.EventRecord{
+                   event_stream_id: "$$" <> ^stream,
+                   event_type: "$metadata",
+                   data: data
+                 }
+               }
+               | _
+             ],
+             result: :success
+           }} ->
+            data
+            |> Jason.decode()
+            |> case do
+              {:ok, decoded} -> {:ok, decoded}
+              # if metadata can't be decoded as JSON, the raw binary is returned
+              _ -> data
+            end
+        end
+      end
+
+      @doc """
+      Starts a database scavenge on the current connection. Note that if a
+      cluster is used, the scavenge is executed only on the connected node!
+      """
+      @spec scavenge_database() :: :ok | any()
+      def scavenge_database() do
+        ExMsg.ScavengeDatabase.new()
+        |> execute()
+        |> case do
+          {:ok, %Extreme.Messages.ScavengeDatabaseCompleted{result: :success}} -> :ok
+          {:ok, %Extreme.Messages.ScavengeDatabaseCompleted{result: other}} -> {:error, other}
+          other -> other
+        end
+      end
+
+      defp _write_metadata(stream, %{} = metadata) do
+        metadata_stream_name = "$$" <> stream
+
+        proto_event =
+          ExMsg.NewEvent.new(
+            event_id: Extreme.Tools.generate_uuid(),
+            event_type: "$metadata",
+            data_content_type: 1,
+            metadata_content_type: 1,
+            data: Jason.encode!(metadata),
+            metadata: ""
+          )
+
+        ExMsg.WriteEvents.new(
+          event_stream_id: metadata_stream_name,
+          expected_version: -2,
+          events: [proto_event],
+          require_master: false
+        )
+      end
+
+      defp _read_metadata(stream) do
+        metadata_stream_name = "$$" <> stream
+
+        Extreme.Messages.ReadStreamEventsBackward.new(
+          event_stream_id: metadata_stream_name,
+          from_event_number: -1,
+          max_count: 1,
+          resolve_link_tos: false,
+          require_master: false
+        )
+      end
+
       def subscribe_to(stream, subscriber, resolve_link_tos \\ true, ack_timeout \\ 5_000)
           when is_binary(stream) and is_pid(subscriber) and is_boolean(resolve_link_tos) do
         Extreme.RequestManager.subscribe_to(
diff --git a/lib/extreme/message_resolver.ex b/lib/extreme/message_resolver.ex
index 06b9e53..5ec1f88 100644
--- a/lib/extreme/message_resolver.ex
+++ b/lib/extreme/message_resolver.ex
@@ -49,8 +49,7 @@ defmodule Extreme.MessageResolver do
   def encode_cmd(:update_persistent_subscription), do: 0xCE
   def encode_cmd(:update_persistent_subscription_completed), do: 0xCF
 
-  def encode_cmd(:scavenge_database), do: 0xD0
-  def encode_cmd(:scavenge_database_completed), do: 0xD1
+  def encode_cmd(Msg.ScavengeDatabase), do: 0xD0
 
   def encode_cmd(:not_handled), do: 0xF1
   def encode_cmd(:authenticate), do: 0xF2
@@ -82,6 +81,8 @@ defmodule Extreme.MessageResolver do
   def decode_cmd(0xC9), do: Msg.CreatePersistentSubscriptionCompleted
   def decode_cmd(0xCB), do: Msg.DeletePersistentSubscriptionCompleted
 
+  def decode_cmd(0xD1), do: Msg.ScavengeDatabaseCompleted
+
   def decode_cmd(0xF0), do: :bad_request
   def decode_cmd(0xF4), do: :not_authenticated
   def decode_cmd(0xF6), do: :client_identified
diff --git a/lib/extreme/reading.ex b/lib/extreme/reading.ex
new file mode 100644
index 0000000..c58c7b4
--- /dev/null
+++ b/lib/extreme/reading.ex
@@ -0,0 +1,332 @@
+defmodule Extreme.Reading do
+  alias Extreme.RequestManager
+  alias Extreme.Messages, as: ExMsg
+
+  require Logger
+
+  @dont_reduce :dont_reduce
+
+  defmodule Params do
+    @type t() :: %__MODULE__{
+            stream: String.t(),
+            direction: :forward | :backwards,
+            from_event_number: integer(),
+            read_until: :all | integer(),
+            per_page: integer(),
+            resolve_link_tos: boolean(),
+            require_master: boolean()
+          }
+
+    defstruct ~w(stream direction from_event_number read_until per_page resolve_link_tos require_master)a
+
+    @spec new(map()) :: t()
+    def new(%{} = opts) do
+      %__MODULE__{
+        direction: :forward,
+        from_event_number: 0,
+        read_until: :all,
+        per_page: 100,
+        resolve_link_tos: true,
+        require_master: false
+      }
+      |> Map.merge(opts)
+    end
+
+    @spec new_backwards(map()) :: t()
+    def new_backwards(%{} = opts) do
+      %__MODULE__{
+        direction: :backwards,
+        from_event_number: -1,
+        read_until: :all,
+        per_page: 100,
+        resolve_link_tos: true,
+        require_master: false
+      }
+      |> Map.merge(opts)
+    end
+  end
+
+  @spec read_events(module(), Params.t(), (ExMsg.StreamEventAppeared.t() ->
+                                             :ok | :stop | {:stop, response :: any()})) ::
+          :finished
+          | {:stopped, response :: any()}
+          | {:error, :no_stream | :stream_hard_deleted}
+          | {:error, :unexpected_processing_response, any()}
+  def read_events(base_name, %Params{} = params, fun) do
+    base_name
+    |> reduce_events(@dont_reduce, params, fun)
+    |> case do
+      {:finished, _} -> :finished
+      other -> other
+    end
+  end
+
+  @spec reduce_events(module(), any(), Params.t(), (ExMsg.StreamEventAppeared.t(), acc :: any() ->
+                                                      {:ok, acc :: any()}
+                                                      | {:stop, acc :: any()})) ::
+          {:finished, acc :: any()}
+          | {:stopped, acc :: any()}
+          | {:error, :no_stream | :stream_hard_deleted}
+          | {:error, :unexpected_processing_response, any()}
+  def reduce_events(base_name, acc, %Params{} = params, fun) do
+    params
+    |> _build_request()
+    |> case do
+      {request, false} ->
+        base_name
+        |> _execute_read(request, params, fun, acc)
+        |> case do
+          {:end_of_stream, acc} -> {:finished, acc}
+          {:continue, _, acc} -> {:finished, acc}
+          other -> other
+        end
+
+      {request, true} ->
+        base_name
+        |> _execute_read(request, params, fun, acc)
+        |> case do
+          {:end_of_stream, acc} ->
+            {:finished, acc}
+
+          {:continue, from, acc} ->
+            reduce_events(base_name, acc, %Params{params | from_event_number: from}, fun)
+
+          other ->
+            other
+        end
+    end
+  end
+
+  defp _execute_read(base_name, request, params, fun, acc) do
+    base_name
+    |> RequestManager.execute(request, Extreme.Tools.generate_uuid())
+    |> _process_read_response(params, fun, acc)
+  end
+
+  @spec _build_request(Params.t()) ::
+          {ExMsg.ReadStreamEvents.t() | ExMsg.ReadStreamEventsBackward.t(), continue? :: boolean()}
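+  # The second element of the returned tuple tells the caller whether another
+  # page should be requested after this one (true) or this request is the last (false).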
+  defp _build_request(
+         %Params{
+           direction: :forward,
+           read_until: :all
+         } = params
+       ) do
+    request =
+      ExMsg.ReadStreamEvents.new(
+        event_stream_id: params.stream,
+        from_event_number: params.from_event_number,
+        max_count: params.per_page,
+        resolve_link_tos: params.resolve_link_tos,
+        require_master: params.require_master
+      )
+
+    {request, true}
+  end
+
+  defp _build_request(
+         %Params{
+           direction: :forward,
+           from_event_number: from,
+           per_page: per_page,
+           read_until: read_until
+         } = params
+       )
+       when is_integer(read_until) and from + per_page < read_until do
+    request =
+      ExMsg.ReadStreamEvents.new(
+        event_stream_id: params.stream,
+        from_event_number: params.from_event_number,
+        max_count: params.per_page,
+        resolve_link_tos: params.resolve_link_tos,
+        require_master: params.require_master
+      )
+
+    {request, true}
+  end
+
+  defp _build_request(%Params{direction: :forward} = params) do
+    request =
+      ExMsg.ReadStreamEvents.new(
+        event_stream_id: params.stream,
+        from_event_number: params.from_event_number,
+        max_count: params.read_until - params.from_event_number,
+        resolve_link_tos: params.resolve_link_tos,
+        require_master: params.require_master
+      )
+
+    {request, false}
+  end
+
+  # backwards
+
+  defp _build_request(
+         %Params{
+           direction: :backwards,
+           from_event_number: -1
+         } = params
+       ) do
+    request =
+      ExMsg.ReadStreamEventsBackward.new(
+        event_stream_id: params.stream,
+        from_event_number: params.from_event_number,
+        max_count: params.per_page,
+        resolve_link_tos: params.resolve_link_tos,
+        require_master: params.require_master
+      )
+
+    {request, true}
+  end
+
+  defp _build_request(
+         %Params{
+           direction: :backwards,
+           read_until: :all
+         } = params
+       ) do
+    request =
+      ExMsg.ReadStreamEventsBackward.new(
+        event_stream_id: params.stream,
+        from_event_number: params.from_event_number,
+        max_count: params.per_page,
+        resolve_link_tos: params.resolve_link_tos,
+        require_master: params.require_master
+      )
+
+    {request, true}
+  end
+
+  defp _build_request(
+         %Params{
+           direction: :backwards,
+           from_event_number: from,
+           per_page: per_page,
+           read_until: read_until
+         } = params
+       )
+       when from - per_page > read_until do
+    request =
+      ExMsg.ReadStreamEventsBackward.new(
+        event_stream_id: params.stream,
+        from_event_number: params.from_event_number,
+        max_count: params.per_page,
+        resolve_link_tos: params.resolve_link_tos,
+        require_master: params.require_master
+      )
+
+    {request, true}
+  end
+
+  defp _build_request(%Params{direction: :backwards} = params) do
+    request =
+      ExMsg.ReadStreamEventsBackward.new(
+        event_stream_id: params.stream,
+        from_event_number: params.from_event_number,
+        max_count: params.from_event_number - params.read_until,
+        resolve_link_tos: params.resolve_link_tos,
+        require_master: params.require_master
+      )
+
+    {request, false}
+  end
+
+  defp _process_read_response({:error, :stream_deleted, _}, _params, _fun, _acc),
+    do: {:error, :stream_hard_deleted}
+
+  defp _process_read_response({:error, :no_stream, _}, _params, _fun, _acc),
+    do: {:error, :no_stream}
+
+  defp _process_read_response(
+         {:ok, %ExMsg.ReadStreamEventsCompleted{} = response},
+         %Params{} = params,
+         fun,
+         acc
+       ) do
+    Logger.debug(fn ->
+      last_read_event_number =
+        case params do
+          %Params{direction: :forward} -> response.next_event_number - 1
+          %Params{direction: :backwards} -> response.next_event_number + 1
+        end
+
+      "Last read event: #{last_read_event_number}"
+    end)
+
+    response.events
+    |> Enum.reduce_while({:continue, response.next_event_number, acc}, fn
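+      # the accumulator is {:continue, next_event_number, acc} while processing
+      # continues; {:stopped, _} or {:error, ...} halts the reduce and bubbles up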
+      msg, {:continue, _, acc} ->
+        msg
+        |> _process_message(params, fun, acc)
+        |> case do
+          {:out_of_requested_bounds, acc} ->
+            {:halt, {:continue, response.next_event_number, acc}}
+
+          {:ok, acc} ->
+            {:cont, {:continue, response.next_event_number, acc}}
+
+          {:stop, @dont_reduce} ->
+            Logger.info("Processing of read message is stopped")
+            {:halt, {:stopped, nil}}
+
+          {{:stop, return}, @dont_reduce} ->
+            Logger.info("Processing of read message is stopped")
+            {:halt, {:stopped, return}}
+
+          {:stop, return} ->
+            Logger.info("Processing of read message is stopped")
+            {:halt, {:stopped, return}}
+
+          {other, @dont_reduce} ->
+            Logger.warning(
+              "Processing of read message didn't succeed [msg, response]: #{inspect([msg, other])}"
+            )
+
+            {:halt, {:error, :unexpected_processing_response, other}}
+
+          other ->
+            Logger.warning(
+              "Processing of read message didn't succeed [msg, response]: #{inspect([msg, other])}"
+            )
+
+            {:halt, {:error, :unexpected_processing_response, other}}
+        end
+    end)
+    |> case do
+      {:continue, _, acc} = resp ->
+        if response.is_end_of_stream,
+          do: {:end_of_stream, acc},
+          else: resp
+
+      other ->
+        other
+    end
+  end
+
+  defp _process_message(
+         msg,
+         %Params{direction: :backwards, from_event_number: -1, read_until: read_until},
+         fun,
+         acc
+       )
+       when is_integer(read_until) do
+    if _event_number(msg) <= read_until do
+      {:out_of_requested_bounds, acc}
+    else
+      if acc == @dont_reduce do
+        {fun.(msg), acc}
+      else
+        fun.(msg, acc)
+      end
+    end
+  end
+
+  defp _process_message(msg, _, fun, @dont_reduce),
+    do: {fun.(msg), @dont_reduce}
+
+  defp _process_message(msg, _, fun, acc),
+    do: fun.(msg, acc)
+
+  defp _event_number(%{event: event, link: nil}),
+    do: event.event_number
+
+  defp _event_number(%{link: link}),
+    do: link.event_number
+end
diff --git a/lib/extreme/reading_subscription.ex b/lib/extreme/reading_subscription.ex
index 7655373..f7f50e0 100644
--- a/lib/extreme/reading_subscription.ex
+++ b/lib/extreme/reading_subscription.ex
@@ -186,9 +186,10 @@ defmodule Extreme.ReadingSubscription do
   end
 
   defp _send_next_request(next_event_number, state) do
+    state = %{state | read_params: %{state.read_params | from_event_number: next_event_number}}
     Logger.debug(fn -> "Reading new batch of events #{inspect(state.read_params)}" end)
     GenServer.cast(self(), :read_events)
-    %{state | read_params: %{state.read_params | from_event_number: next_event_number}}
+    state
   end
 
   defp _read_events_message(%{from_event_number: from, per_page: per_page} = params, read_until)
diff --git a/lib/extreme/response.ex b/lib/extreme/response.ex
index 151fcce..fd46436 100644
--- a/lib/extreme/response.ex
+++ b/lib/extreme/response.ex
@@ -22,7 +22,7 @@ defmodule Extreme.Response do
         {:error, :bad_request, correlation_id}
 
       response_struct ->
-        data = response_struct.decode(data)
+        data = apply(response_struct, :decode, [data])
         # IO.inspect([correlation_id, data.__struct__], label: "Received from ES")
         {auth, correlation_id, data}
     end
diff --git a/mix.exs b/mix.exs
index ce1472d..220b9e2 100644
--- a/mix.exs
+++ b/mix.exs
@@ -4,7 +4,7 @@ defmodule Extreme.Mixfile do
   def project do
     [
       app: :extreme,
-      version: "1.1.1",
+      version: "1.1.2",
       elixir: "~> 1.11",
       elixirc_paths: _elixirc_paths(Mix.env()),
       source_url: "https://github.com/exponentially/extreme",
@@ -15,7 +15,7 @@ defmodule Extreme.Mixfile do
       start_permanent: Mix.env() == :prod,
       test_coverage: [
         summary: [
-          threshold: 83
+          threshold: 84
        ]
      ],
      preferred_cli_env: [
diff --git a/test/extreme/reading_test.exs b/test/extreme/reading_test.exs
new file mode 100644
index 0000000..af21cab
--- /dev/null
+++ b/test/extreme/reading_test.exs
@@ -0,0 +1,541 @@
+defmodule Extreme.ReadingTest do
+  use ExUnit.Case, async: true
+
+  alias ExtremeTest.Helpers
+  alias ExtremeTest.Events, as: Event
+  alias Extreme.Messages, as: ExMsg
+
+  describe "reading forwards" do
+    test "returns error if stream doesn't exist" do
+      assert {:error, :no_stream} =
+               TestConn.read_events("non_existing", fn %{event: event} ->
+                 send(self(), {:event_processed, :erlang.binary_to_term(event.data)})
+                 :ok
+               end)
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "all events from start in a single batch" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 100]
+
+      assert :finished =
+               TestConn.read_events(stream, opts, fn %{event: event} ->
+                 send(self(), {:event_processed, :erlang.binary_to_term(event.data)})
+                 :ok
+               end)
+
+      for i <- 0..(count - 1) do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "all events from start in multiple batches" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 3]
+
+      assert :finished =
+               TestConn.read_events(stream, opts, fn %{event: event} ->
+                 send(self(), {:event_processed, :erlang.binary_to_term(event.data)})
+                 :ok
+               end)
+
+      for i <- 0..(count - 1) do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "events from non-zero starting point" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 3, from_event_number: 10]
+
+      assert :finished =
+               TestConn.read_events(stream, opts, fn %{event: event} ->
+                 send(self(), {:event_processed, :erlang.binary_to_term(event.data)})
+                 :ok
+               end)
+
+      for i <- 10..(count - 1) do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "events from start until specified event number (exclusive) with full last batch" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 3, read_until: 6]
+
+      assert :finished =
+               TestConn.read_events(stream, opts, fn %{event: event} ->
+                 send(self(), {:event_processed, :erlang.binary_to_term(event.data)})
+                 :ok
+               end)
+
+      for i <- 0..5 do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "events from start until specified event number (exclusive) with partial last batch" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 3, read_until: 4]
+
+      assert :finished =
+               TestConn.read_events(stream, opts, fn %{event: event} ->
+                 send(self(), {:event_processed, :erlang.binary_to_term(event.data)})
+                 :ok
+               end)
+
+      for i <- 0..3 do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "can be stopped" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 7]
+
+      assert {:stopped, %ExMsg.ResolvedIndexedEvent{event: %{event_number: 12}}} =
+               TestConn.read_events(stream, opts, fn %{event: event} = msg ->
+                 event.data
+                 |> :erlang.binary_to_term()
+                 |> case do
+                   %Event.PersonCreated{name: "Reading 12"} ->
+                     {:stop, msg}
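+                     # {:stop, response} halts reading; read_events then
+                     # returns {:stopped, response} to the caller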
+
+                   other ->
+                     send(self(), {:event_processed, other})
+                     :ok
+                 end
+               end)
+
+      for i <- 0..11 do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "stops with error on unexpected processing response" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 8]
+
+      assert {:error, :unexpected_processing_response, :invalid_response} =
+               TestConn.read_events(stream, opts, fn %{event: event} ->
+                 event.data
+                 |> :erlang.binary_to_term()
+                 |> case do
+                   %Event.PersonCreated{name: "Reading 13"} ->
+                     :invalid_response
+
+                   other ->
+                     send(self(), {:event_processed, other})
+                     :ok
+                 end
+               end)
+
+      for i <- 0..12 do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+  end
+
+  describe "reading backwards" do
+    test "returns error if stream doesn't exist" do
+      assert {:error, :no_stream} =
+               TestConn.read_events_backwards("non_existing", fn %{event: event} ->
+                 send(self(), {:event_processed, :erlang.binary_to_term(event.data)})
+                 :ok
+               end)
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "all events in a single batch" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 100]
+
+      assert :finished =
+               TestConn.read_events_backwards(stream, opts, fn %{event: event} ->
+                 send(self(), {:event_processed, :erlang.binary_to_term(event.data)})
+                 :ok
+               end)
+
+      for i <- (count - 1)..0//-1 do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "all events in multiple batches" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 3]
+
+      assert :finished =
+               TestConn.read_events_backwards(stream, opts, fn %{event: event} ->
+                 send(self(), {:event_processed, :erlang.binary_to_term(event.data)})
+                 :ok
+               end)
+
+      for i <- (count - 1)..0//-1 do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "events from non-minus-one starting point" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 2, from_event_number: 10]
+
+      assert :finished =
+               TestConn.read_events_backwards(stream, opts, fn %{event: event} ->
+                 send(self(), {:event_processed, :erlang.binary_to_term(event.data)})
+                 :ok
+               end)
+
+      for i <- 10..0//-1 do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "events from minus-one until specified event number (exclusive) in a single batch" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 50, read_until: 6]
+
+      assert :finished =
+               TestConn.read_events_backwards(stream, opts, fn %{event: event} ->
+                 send(self(), {:event_processed, :erlang.binary_to_term(event.data)})
+                 :ok
+               end)
+
+      for i <- 19..7//-1 do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "events from non-minus-one until specified event number (exclusive)" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      # TODO: this works when it is a whole batch; something is off with calculating
+      # when to stop reading early. Also check read_until when starting from -1.
+      opts = [per_page: 3, from_event_number: 10, read_until: 6]
+
+      assert :finished =
+               TestConn.read_events_backwards(stream, opts, fn %{event: event} ->
+                 send(self(), {:event_processed, :erlang.binary_to_term(event.data)})
+                 :ok
+               end)
+
+      for i <- 10..7//-1 do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "can be stopped" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 2]
+
+      assert {:stopped, nil} =
+               TestConn.read_events_backwards(stream, opts, fn %{event: event} ->
+                 event.data
+                 |> :erlang.binary_to_term()
+                 |> case do
+                   %Event.PersonCreated{name: "Reading 13"} ->
+                     :stop
+
+                   other ->
+                     send(self(), {:event_processed, other})
+                     :ok
+                 end
+               end)
+
+      for i <- (count - 1)..14//-1 do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "stops with error on unexpected processing response" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 2]
+
+      assert {:error, :unexpected_processing_response, :invalid_response} =
+               TestConn.read_events_backwards(stream, opts, fn %{event: event} ->
+                 event.data
+                 |> :erlang.binary_to_term()
+                 |> case do
+                   %Event.PersonCreated{name: "Reading 13"} ->
+                     :invalid_response
+
+                   other ->
+                     send(self(), {:event_processed, other})
+                     :ok
+                 end
+               end)
+
+      for i <- (count - 1)..14//-1 do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+  end
+
+  describe "reduce forwards" do
+    test "returns error if stream doesn't exist" do
+      assert {:error, :no_stream} =
+               :whatever
+               |> TestConn.reduce_events("non_existing", fn
+                 %{event: event}, acc ->
+                   send(self(), {:event_processed, :erlang.binary_to_term(event.data), acc})
+                   {:ok, acc}
+               end)
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "all events from start in a single batch" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 100]
+
+      assert {:finished, 20} =
+               TestConn.reduce_events(0, stream, opts, fn
+                 %{event: event}, acc ->
+                   send(self(), {:event_processed, :erlang.binary_to_term(event.data), acc})
+                   {:ok, acc + 1}
+               end)
+
+      for i <- 0..(count - 1) do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}, ^i}
+      end
+
+      refute_receive {:event_processed, _, _}
+    end
+
+    test "all events from start in multiple batches" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 6]
+
+      assert {:finished, 20} =
+               TestConn.reduce_events(0, stream, opts, fn
+                 %{event: event}, acc ->
+                   send(self(), {:event_processed, :erlang.binary_to_term(event.data), acc})
+                   {:ok, acc + 1}
+               end)
+
+      for i <- 0..(count - 1) do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}, ^i}
+      end
+
+      refute_receive {:event_processed, _, _}
+    end
+
+    test "can be stopped" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 7]
+
+      assert {:stopped,
+              %{
+                stopped_on: %ExMsg.ResolvedIndexedEvent{event: %{event_number: 12}},
+                last_processed: 11
+              }} =
+               %{stopped_on: nil, last_processed: nil}
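+               # the initial accumulator is piped in as reduce_events/4's first argument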
+               |> TestConn.reduce_events(stream, opts, fn %{event: event} = msg, acc ->
+                 event.data
+                 |> :erlang.binary_to_term()
+                 |> case do
+                   %Event.PersonCreated{name: "Reading 12"} ->
+                     {:stop, %{acc | stopped_on: msg}}
+
+                   other ->
+                     send(self(), {:event_processed, other})
+                     {:ok, %{acc | last_processed: event.event_number}}
+                 end
+               end)
+
+      for i <- 0..11 do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+  end
+
+  describe "reduce backwards" do
+    test "returns error if stream doesn't exist" do
+      assert {:error, :no_stream} =
+               :whatever
+               |> TestConn.reduce_events_backwards("non_existing", fn
+                 %{event: event}, acc ->
+                   send(self(), {:event_processed, :erlang.binary_to_term(event.data), acc})
+                   {:ok, acc}
+               end)
+
+      refute_receive {:event_processed, _}
+    end
+
+    test "all events from start in a single batch" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 100]
+
+      assert {:finished, 0} =
+               TestConn.reduce_events_backwards(20, stream, opts, fn
+                 %{event: event}, acc ->
+                   send(self(), {:event_processed, :erlang.binary_to_term(event.data), acc - 1})
+                   {:ok, acc - 1}
+               end)
+
+      for i <- 0..(count - 1) do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}, ^i}
+      end
+
+      refute_receive {:event_processed, _, _}
+    end
+
+    test "all events from start in multiple batches" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 6]
+
+      assert {:finished, 0} =
+               TestConn.reduce_events_backwards(20, stream, opts, fn
+                 %{event: event}, acc ->
+                   send(self(), {:event_processed, :erlang.binary_to_term(event.data), acc - 1})
+                   {:ok, acc - 1}
+               end)
+
+      for i <- 0..(count - 1) do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}, ^i}
+      end
+
+      refute_receive {:event_processed, _, _}
+    end
+
+    test "can be stopped" do
+      count = 20
+      {stream, _events} = _write_events(count)
+
+      opts = [per_page: 6]
+
+      assert {:stopped,
+              %{
+                stopped_on: %ExMsg.ResolvedIndexedEvent{event: %{event_number: 12}},
+                last_processed: 13
+              }} =
+               %{stopped_on: nil, last_processed: nil}
+               |> TestConn.reduce_events_backwards(stream, opts, fn %{event: event} = msg, acc ->
+                 event.data
+                 |> :erlang.binary_to_term()
+                 |> case do
+                   %Event.PersonCreated{name: "Reading 12"} ->
+                     {:stop, %{acc | stopped_on: msg}}
+
+                   other ->
+                     send(self(), {:event_processed, other})
+                     {:ok, %{acc | last_processed: event.event_number}}
+                 end
+               end)
+
+      for i <- (count - 1)..13//-1 do
+        expected_name = "Reading #{i}"
+        assert_received {:event_processed, %Event.PersonCreated{name: ^expected_name}}
+      end
+
+      refute_receive {:event_processed, _}
+    end
+  end
+
+  defp _gen_events(count) do
+    0..(count - 1)
+    |> Enum.map(fn i ->
+      %Event.PersonCreated{name: "Reading #{i}"}
+    end)
+  end
+
+  defp _write_events(count) do
+    stream = Helpers.random_stream_name()
+
+    events = _gen_events(count)
+
+    {:ok, %ExMsg.WriteEventsCompleted{}} =
+      TestConn.execute(Helpers.write_events(stream, events))
+
+    {stream, events}
+  end
+end
diff --git a/test/extreme_test.exs b/test/extreme_test.exs
index 958dfde..1f972a6 100644
--- a/test/extreme_test.exs
+++ b/test/extreme_test.exs
@@ -79,6 +79,74 @@ defmodule ExtremeTest do
         }} = TestConn.execute(Helpers.write_events())
   end
 
+  test "setting metadata works" do
+    # write 2 events
+    stream_name = "extreme_test-" <> to_string(UUID.uuid4())
IO.inspect("http://localhost:2113/web/index.html#/streams/" <> stream_name) + + events_1 = [ + %Event.PersonCreated{name: "First round"}, + %Event.PersonChangedName{name: "First round change"} + ] + + assert {:ok, %ExMsg.WriteEventsCompleted{result: :success}} = + TestConn.execute(Helpers.write_events(stream_name, events_1)) + + assert {:error, :no_stream} == TestConn.get_metadata(stream_name) + + # set max age for stream + max_age_seconds = 5 + + metadata = + %{ + "$maxAge" => max_age_seconds, + # "$tb" => 3, + # + # custom metadata is ok + "processed" => true + } + + assert :ok = TestConn.set_metadata(stream_name, metadata) + assert {:ok, ^metadata} = TestConn.get_metadata(stream_name) + + # make sure events are still evailable + {:ok, %ExMsg.ReadStreamEventsCompleted{events: read_events, is_end_of_stream: true}} = + TestConn.execute(Helpers.read_events(stream_name, 0, 20)) + + assert events_1 == + Enum.map(read_events, fn event -> :erlang.binary_to_term(event.event.data) end) + + # wait for max age to expire and then scavenge db + Process.sleep(10_000) + + assert :ok = TestConn.scavenge_database() + + # make sure no events left in stream + {:ok, %ExMsg.ReadStreamEventsCompleted{events: [], is_end_of_stream: true}} = + TestConn.execute(Helpers.read_events(stream_name, 0, 20)) + + # Remove max age is not good idea, because ES eventualy deletes data! + # Even scavenge is not guarantee old ones will be deleted immediately + # assert {:ok, %ExMsg.WriteEventsCompleted{result: :success}} = + # TestConn.execute(write_metadata(stream_name, %{})) + + # write 2 new events + events_2 = [ + %Event.PersonCreated{name: "Second round"}, + %Event.PersonChangedName{name: "Second round change"} + ] + + assert {:ok, %ExMsg.WriteEventsCompleted{result: :success}} = + TestConn.execute(Helpers.write_events(stream_name, events_2)) + + # make sure we have only new events + {:ok, %ExMsg.ReadStreamEventsCompleted{events: read_events_2, is_end_of_stream: true}} = + TestConn.execute(Helpers.read_events(stream_name, 0, 20)) + + assert events_2 == + Enum.map(read_events_2, fn event -> :erlang.binary_to_term(event.event.data) end) + end + test "for existing stream is success" do stream = Helpers.random_stream_name()