Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
# Changelog for extreme v1.1.0
# v1.1.1

## Breaking changes

- `ListenerWithBackPressure.on_init/1` doesn't support `:ok` result anymore, it must return `{:ok, client_state}`

## New

- add `on_caught_up(client_state)` callback to ListenerWithBackPressure

## Improvements

- Stop sending already read events to subscriber, if it returned `:stop` from processing previous one.

# v1.1.0

## Breaking changes

Expand All @@ -9,50 +23,50 @@
- Add event_producer functionality for module that uses `Extreme`.
- Add option to exclude catch all `handle_info` when using `Extreme.Listener`

# Changelog for extreme v1.0.7
# v1.0.7

- Fix stoping subscription if `on_event` callback returns `:stop`

# Changelog for extreme v1.0.6
# v1.0.6

- Add `Listener.subscribed?/1` function

# Changelog for extreme v1.0.5
# v1.0.5

- RequestManager buffers live events received after subscription is created,
but before it is registered

# Changelog for extreme v1.0.4
# v1.0.4

- Listener.process_push callback can return `:stop`, meaning subscription should be stopped
and pushes that are already in mailbox should be purged.

# Changelog for extreme v1.0.3
# v1.0.3

- Add subscribe/unsubscribe and auto_subscribe? option for starting `Extreme.Listener`

# Changelog for extreme v1.0.0-beta2
# v1.0.0-beta2

- Restart all subscriptions and subscribers/listeners when connection receives :tcp_closed

# Changelog for extreme v0.13.1
# v0.13.1

- Dependency version upgrades

# Changelog for extreme v0.13.0
# v0.13.0

- Support Elixir 1.7.0 and OTP 21.0
- Listener reads events in chunks of 500 events (instead of 4096)

# Changelog for extreme v0.11.0
# v0.11.0

- Added support for EventStore 4

# Changelog for extreme v0.10.4
# v0.10.4

- Fixed issue with concurrent read and write where messages get stuck into extreme process state

# Changelog for extreme v0.10.3
# v0.10.3

- Extreme.Listener - if get_last_event/1 returns `:from_now`, catching events will start from current event

Expand Down
8 changes: 4 additions & 4 deletions lib/extreme/reading_subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -141,23 +141,23 @@ defmodule Extreme.ReadingSubscription do
Logger.debug(fn -> "Last read event: #{inspect(response.next_event_number - 1)}" end)

response.events
|> Enum.any?(fn msg ->
|> Enum.reduce_while(false, fn msg, _ ->
state.subscriber
|> Shared.on_event(msg, state.read_params.ack_timeout)
|> case do
:ok ->
false
{:cont, false}

:stop ->
Logger.info("Processing of read message is stopped")
true
{:halt, true}

error ->
Logger.warning(
"Processing of buffered message didn't succeed: #{inspect([msg, error])}"
)

true
{:halt, true}
end
end)
# One event processing didn't succeed (maybe we got `:stop`)
Expand Down
24 changes: 15 additions & 9 deletions lib/listener_with_backpressure.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
defmodule Extreme.ListenerWithBackPressure do
@moduledoc ~S"""
@moduledoc """
The same as `Extreme.Listener` but uses event_producer functionality which applies backpressure
on live events as well.

The way it works is that there's intermediate process which turns off subscription when `max_buffer` is reached
and creates new subscription when all buffered events are processed.
"""

@callback on_init(opts_from_start_link :: Keyword.t()) :: :ok | {:ok, client_state :: any()}
@callback on_init(opts_from_start_link :: Keyword.t()) :: {:ok, client_state :: any()}
@callback get_last_event(stream_name :: String.t(), client_state :: any()) ::
last_event :: integer()
@callback process_push(
Expand All @@ -18,6 +18,7 @@ defmodule Extreme.ListenerWithBackPressure do
{:ok, event_number :: non_neg_integer()}
| :ok
| :stop
@callback on_caught_up(client_state :: any()) :: any()

defmacro __using__(_) do
quote location: :keep do
Expand Down Expand Up @@ -59,13 +60,10 @@ defmodule Extreme.ListenerWithBackPressure do

@impl GenServer
def init({extreme, opts}) do
client_state =
{:ok, client_state} =
opts
|> Keyword.put(:extreme, extreme)
|> on_init()
|> case do
:ok -> %{}
{:ok, client_state} -> client_state
end

stream_name = Keyword.fetch!(opts, :stream)
last_event = fn -> get_last_event(stream_name, client_state) end
Expand Down Expand Up @@ -119,13 +117,21 @@ defmodule Extreme.ListenerWithBackPressure do
end

@impl GenServer
def handle_info(:caught_up, %{} = state) do
on_caught_up(state.client_state)
{:noreply, state}
end

def handle_info(_, %{} = state),
do: {:noreply, state}

@impl Extreme.ListenerWithBackPressure
def on_init(_), do: :ok
def on_init(_), do: {:ok, %{}}

@impl Extreme.ListenerWithBackPressure
def on_caught_up(_), do: :ok

defoverridable on_init: 1
defoverridable on_init: 1, on_caught_up: 1
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Extreme.Mixfile do
def project do
[
app: :extreme,
version: "1.1.0",
version: "1.1.1",
elixir: "~> 1.11",
elixirc_paths: _elixirc_paths(Mix.env()),
source_url: "https://github.com/exponentially/extreme",
Expand Down
16 changes: 8 additions & 8 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
%{
"earmark": {:hex, :earmark, "1.4.4", "4821b8d05cda507189d51f2caeef370cf1e18ca5d7dfb7d31e9cafe6688106a4", [:mix], [], "hexpm", "1f93aba7340574847c0f609da787f0d79efcab51b044bb6e242cae5aca9d264d"},
"earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"},
"earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"},
"ex_doc": {:hex, :ex_doc, "0.37.3", "f7816881a443cd77872b7d6118e8a55f547f49903aef8747dbcb345a75b462f9", [:mix], [{:earmark_parser, "~> 1.4.42", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "e6aebca7156e7c29b5da4daa17f6361205b2ae5f26e5c7d8ca0d3f7e18972233"},
"exactor": {:hex, :exactor, "2.2.4", "5efb4ddeb2c48d9a1d7c9b465a6fffdd82300eb9618ece5d34c3334d5d7245b1", [:mix], [], "hexpm", "1222419f706e01bfa1095aec9acf6421367dcfab798a6f67c54cf784733cd6b5"},
"exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "32e95820a97cffea67830e91514a2ad53b888850442d6d395f53a1ac60c82e07"},
"exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"},
"exvcr": {:hex, :exvcr, "0.15.2", "2216c8605b5c3e300160c2a5bd896b4928fa51fc3fb3420d3e792ad833ac89ba", [:mix], [{:exactor, "~> 2.2", [hex: :exactor, repo: "hexpm", optional: false]}, {:exjsx, "~> 4.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:finch, "~> 0.16", [hex: :finch, repo: "hexpm", optional: true]}, {:httpoison, "~> 1.0 or ~> 2.0", [hex: :httpoison, repo: "hexpm", optional: true]}, {:httpotion, "~> 3.1", [hex: :httpotion, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:meck, "~> 0.8", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "2bd4125889bd3953d7fbb7b388c34190c31e292f12896da56ecf0743d40439ed"},
"gpb": {:hex, :gpb, "4.21.1", "72e229c242d252d690addcfd04a6416c26c4d4d2c3521e05570a7a78b48d3bd1", [:make, :rebar3], [], "hexpm", "c05c9aea9e25bd341367a43b3d3eb68e951563911072259c5ec4cb6642f4ef22"},
"exvcr": {:hex, :exvcr, "0.17.0", "ca7f83f08d378d601b3082c60b0550544ec54241317e2bdb6361e2925c023532", [:mix], [{:exjsx, "~> 4.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:finch, "~> 0.16", [hex: :finch, repo: "hexpm", optional: true]}, {:httpoison, "~> 1.0 or ~> 2.0", [hex: :httpoison, repo: "hexpm", optional: true]}, {:httpotion, "~> 3.1", [hex: :httpotion, repo: "hexpm", optional: true]}, {:ibrowse, "~> 4.4", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:meck, "~> 1.0", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "093561e523d170ad2d48e8afef213b070f75821d51102d582f2a1680891c92b4"},
"gpb": {:hex, :gpb, "4.21.3", "99cbd5bce894e9287a8f351210afdcd9d4b413087118a0d738b12a2d67d97591", [:make, :rebar3], [], "hexpm", "f31bf9ef24450c978e2010bcec12272049e14f122511a7ae1488aeba8a835158"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [:mix, :rebar3], [], "hexpm", "fc3499fed7a726995aa659143a248534adc754ebd16ccd437cd93b649a95091f"},
"makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"},
"makeup_elixir": {:hex, :makeup_elixir, "1.0.0", "74bb8348c9b3a51d5c589bf5aebb0466a84b33274150e3b6ece1da45584afc82", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "49159b7d7d999e836bedaf09dcf35ca18b312230cf901b725a64f3f42e407983"},
"makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"},
"meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"},
"makeup_erlang": {:hex, :makeup_erlang, "1.0.2", "03e1804074b3aa64d5fad7aa64601ed0fb395337b982d9bcf04029d68d51b6a7", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "af33ff7ef368d5893e4a267933e7744e46ce3cf1f61e2dccf53a111ed3aa3727"},
"meck": {:hex, :meck, "1.0.0", "24676cb6ee6951530093a93edcd410cfe4cb59fe89444b875d35c9d3909a15d0", [:rebar3], [], "hexpm", "680a9bcfe52764350beb9fb0335fb75fee8e7329821416cee0a19fec35433882"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
"protobuf": {:hex, :protobuf, "0.5.4", "2e1b8eec211aff034ad8a14e3674220b0158bfb9a3c7128ac9d2a1ed1b3724d3", [:mix], [], "hexpm"},
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
}
5 changes: 4 additions & 1 deletion test/extreme_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,10 @@ defmodule ExtremeTest do
test "that doesn't exist is ok" do
stream = Helpers.random_stream_name()

{:ok, %ExMsg.DeleteStreamCompleted{}} =
# newer ES versions return an error:
# {:ok, %ExMsg.DeleteStreamCompleted{}} =
{:error, :wrong_expected_version,
%Extreme.Messages.DeleteStreamCompleted{prepare_position: -1, commit_position: -1}} =
TestConn.execute(Helpers.delete_stream(stream, false))
end

Expand Down
35 changes: 35 additions & 0 deletions test/listener_with_backpressure_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,41 @@ defmodule Extreme.ListenerWithBackPressureTest do
Helpers.assert_no_leaks(TestConn)
end

test "timeout is per event, not per batch" do
sleep = 5_001
stream = Helpers.random_stream_name()
event1 = %Event.SlowProcessingEventHappened{sleep: sleep}
event2 = %Event.SlowProcessingEventHappened{sleep: sleep}

assert DB.get_last_event(MyListenerWithBackPressure, stream) == -1

# write 2 events to stream
{:ok, %ExMsg.WriteEventsCompleted{}} =
TestConn.execute(Helpers.write_events(stream, [event1, event2]))

# run listener and expect it to read them
{:ok, listener} =
MyListenerWithBackPressure.start_link(TestConn, stream,
read_per_page: 2,
ack_timeout: sleep + 1_000
)

refute_receive {:processing_push, _event_type, _event}, sleep - 1
assert_receive {:processing_push, event_type, event}
assert event_type == "Elixir.ExtremeTest.Events.SlowProcessingEventHappened"
assert event1 == :erlang.binary_to_term(event)

refute_receive {:processing_push, _event_type, _event}, sleep - 1
assert_receive {:processing_push, event_type, event}
assert event_type == "Elixir.ExtremeTest.Events.SlowProcessingEventHappened"
assert event2 == :erlang.binary_to_term(event)

assert DB.get_last_event(MyListenerWithBackPressure, stream) == 1

:ok = MyListenerWithBackPressure.unsubscribe(listener)
Helpers.assert_no_leaks(TestConn)
end

test "subscribe/unsubscribe" do
stream = Helpers.random_stream_name()
event1 = %Event.PersonCreated{name: "Pera"}
Expand Down
Loading