From 790dc2adf926d18b08b7562ea5dfcf7644ed5ae6 Mon Sep 17 00:00:00 2001 From: Burmaja Milan Date: Wed, 16 Apr 2025 17:17:31 +0200 Subject: [PATCH 1/2] Provide state in on_init callback --- lib/listener_with_backpressure.ex | 24 ++++++++++------ mix.lock | 16 +++++------ test/extreme_test.exs | 5 +++- test/listener_with_backpressure_test.exs | 35 ++++++++++++++++++++++++ 4 files changed, 62 insertions(+), 18 deletions(-) diff --git a/lib/listener_with_backpressure.ex b/lib/listener_with_backpressure.ex index c8a7922..883b171 100644 --- a/lib/listener_with_backpressure.ex +++ b/lib/listener_with_backpressure.ex @@ -1,5 +1,5 @@ defmodule Extreme.ListenerWithBackPressure do - @moduledoc ~S""" + @moduledoc """ The same as `Extreme.Listener` but uses event_producer functionality which applies backpressure on live events as well. @@ -7,7 +7,7 @@ defmodule Extreme.ListenerWithBackPressure do and creates new subscription when all buffered events are processed. """ - @callback on_init(opts_from_start_link :: Keyword.t()) :: :ok | {:ok, client_state :: any()} + @callback on_init(opts_from_start_link :: Keyword.t()) :: {:ok, client_state :: any()} @callback get_last_event(stream_name :: String.t(), client_state :: any()) :: last_event :: integer() @callback process_push( @@ -18,6 +18,7 @@ defmodule Extreme.ListenerWithBackPressure do {:ok, event_number :: non_neg_integer()} | :ok | :stop + @callback on_caught_up(client_state :: any()) :: any() defmacro __using__(_) do quote location: :keep do @@ -59,13 +60,10 @@ defmodule Extreme.ListenerWithBackPressure do @impl GenServer def init({extreme, opts}) do - client_state = + {:ok, client_state} = opts + |> Keyword.put(:extreme, extreme) |> on_init() - |> case do - :ok -> %{} - {:ok, client_state} -> client_state - end stream_name = Keyword.fetch!(opts, :stream) last_event = fn -> get_last_event(stream_name, client_state) end @@ -119,13 +117,21 @@ defmodule Extreme.ListenerWithBackPressure do end @impl GenServer + def handle_info(:caught_up, %{} = state) do + on_caught_up(state.client_state) + {:noreply, state} + end + def handle_info(_, %{} = state), do: {:noreply, state} @impl Extreme.ListenerWithBackPressure - def on_init(_), do: :ok + def on_init(_), do: {:ok, %{}} + + @impl Extreme.ListenerWithBackPressure + def on_caught_up(_), do: :ok - defoverridable on_init: 1 + defoverridable on_init: 1, on_caught_up: 1 end end end diff --git a/mix.lock b/mix.lock index 7267eb2..ffbb08c 100644 --- a/mix.lock +++ b/mix.lock @@ -1,20 +1,20 @@ %{ "earmark": {:hex, :earmark, "1.4.4", "4821b8d05cda507189d51f2caeef370cf1e18ca5d7dfb7d31e9cafe6688106a4", [:mix], [], "hexpm", "1f93aba7340574847c0f609da787f0d79efcab51b044bb6e242cae5aca9d264d"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, - "ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"}, + "ex_doc": {:hex, :ex_doc, "0.37.3", "f7816881a443cd77872b7d6118e8a55f547f49903aef8747dbcb345a75b462f9", [:mix], [{:earmark_parser, "~> 1.4.42", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "e6aebca7156e7c29b5da4daa17f6361205b2ae5f26e5c7d8ca0d3f7e18972233"}, "exactor": {:hex, :exactor, "2.2.4", "5efb4ddeb2c48d9a1d7c9b465a6fffdd82300eb9618ece5d34c3334d5d7245b1", [:mix], [], "hexpm", "1222419f706e01bfa1095aec9acf6421367dcfab798a6f67c54cf784733cd6b5"}, "exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm", "32e95820a97cffea67830e91514a2ad53b888850442d6d395f53a1ac60c82e07"}, "exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"}, - "exvcr": {:hex, :exvcr, "0.15.2", "2216c8605b5c3e300160c2a5bd896b4928fa51fc3fb3420d3e792ad833ac89ba", [:mix], [{:exactor, "~> 2.2", [hex: :exactor, repo: "hexpm", optional: false]}, {:exjsx, "~> 4.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:finch, "~> 0.16", [hex: :finch, repo: "hexpm", optional: true]}, {:httpoison, "~> 1.0 or ~> 2.0", [hex: :httpoison, repo: "hexpm", optional: true]}, {:httpotion, "~> 3.1", [hex: :httpotion, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:meck, "~> 0.8", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "2bd4125889bd3953d7fbb7b388c34190c31e292f12896da56ecf0743d40439ed"}, - "gpb": {:hex, :gpb, "4.21.1", "72e229c242d252d690addcfd04a6416c26c4d4d2c3521e05570a7a78b48d3bd1", [:make, :rebar3], [], "hexpm", "c05c9aea9e25bd341367a43b3d3eb68e951563911072259c5ec4cb6642f4ef22"}, + "exvcr": {:hex, :exvcr, "0.17.0", "ca7f83f08d378d601b3082c60b0550544ec54241317e2bdb6361e2925c023532", [:mix], [{:exjsx, "~> 4.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:finch, "~> 0.16", [hex: :finch, repo: "hexpm", optional: true]}, {:httpoison, "~> 1.0 or ~> 2.0", [hex: :httpoison, repo: "hexpm", optional: true]}, {:httpotion, "~> 3.1", [hex: :httpotion, repo: "hexpm", optional: true]}, {:ibrowse, "~> 4.4", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:meck, "~> 1.0", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "093561e523d170ad2d48e8afef213b070f75821d51102d582f2a1680891c92b4"}, + "gpb": {:hex, :gpb, "4.21.3", "99cbd5bce894e9287a8f351210afdcd9d4b413087118a0d738b12a2d67d97591", [:make, :rebar3], [], "hexpm", "f31bf9ef24450c978e2010bcec12272049e14f122511a7ae1488aeba8a835158"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [:mix, :rebar3], [], "hexpm", "fc3499fed7a726995aa659143a248534adc754ebd16ccd437cd93b649a95091f"}, "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, - "makeup_elixir": {:hex, :makeup_elixir, "1.0.0", "74bb8348c9b3a51d5c589bf5aebb0466a84b33274150e3b6ece1da45584afc82", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "49159b7d7d999e836bedaf09dcf35ca18b312230cf901b725a64f3f42e407983"}, - "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, - "meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, - "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, + "makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"}, + "makeup_erlang": {:hex, :makeup_erlang, "1.0.2", "03e1804074b3aa64d5fad7aa64601ed0fb395337b982d9bcf04029d68d51b6a7", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "af33ff7ef368d5893e4a267933e7744e46ce3cf1f61e2dccf53a111ed3aa3727"}, + "meck": {:hex, :meck, "1.0.0", "24676cb6ee6951530093a93edcd410cfe4cb59fe89444b875d35c9d3909a15d0", [:rebar3], [], "hexpm", "680a9bcfe52764350beb9fb0335fb75fee8e7329821416cee0a19fec35433882"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, "protobuf": {:hex, :protobuf, "0.5.4", "2e1b8eec211aff034ad8a14e3674220b0158bfb9a3c7128ac9d2a1ed1b3724d3", [:mix], [], "hexpm"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, } diff --git a/test/extreme_test.exs b/test/extreme_test.exs index c92d058..958dfde 100644 --- a/test/extreme_test.exs +++ b/test/extreme_test.exs @@ -475,7 +475,10 @@ defmodule ExtremeTest do test "that doesn't exist is ok" do stream = Helpers.random_stream_name() - {:ok, %ExMsg.DeleteStreamCompleted{}} = + # newer ES versions return an error: + # {:ok, %ExMsg.DeleteStreamCompleted{}} = + {:error, :wrong_expected_version, + %Extreme.Messages.DeleteStreamCompleted{prepare_position: -1, commit_position: -1}} = TestConn.execute(Helpers.delete_stream(stream, false)) end diff --git a/test/listener_with_backpressure_test.exs b/test/listener_with_backpressure_test.exs index 1a74767..b38b29c 100644 --- a/test/listener_with_backpressure_test.exs +++ b/test/listener_with_backpressure_test.exs @@ -130,6 +130,41 @@ defmodule Extreme.ListenerWithBackPressureTest do Helpers.assert_no_leaks(TestConn) end + test "timeout is per event, not per batch" do + sleep = 5_001 + stream = Helpers.random_stream_name() + event1 = %Event.SlowProcessingEventHappened{sleep: sleep} + event2 = %Event.SlowProcessingEventHappened{sleep: sleep} + + assert DB.get_last_event(MyListenerWithBackPressure, stream) == -1 + + # write 2 events to stream + {:ok, %ExMsg.WriteEventsCompleted{}} = + TestConn.execute(Helpers.write_events(stream, [event1, event2])) + + # run listener and expect it to read them + {:ok, listener} = + MyListenerWithBackPressure.start_link(TestConn, stream, + read_per_page: 2, + ack_timeout: sleep + 1_000 + ) + + refute_receive {:processing_push, _event_type, _event}, sleep - 1 + assert_receive {:processing_push, event_type, event} + assert event_type == "Elixir.ExtremeTest.Events.SlowProcessingEventHappened" + assert event1 == :erlang.binary_to_term(event) + + refute_receive {:processing_push, _event_type, _event}, sleep - 1 + assert_receive {:processing_push, event_type, event} + assert event_type == "Elixir.ExtremeTest.Events.SlowProcessingEventHappened" + assert event2 == :erlang.binary_to_term(event) + + assert DB.get_last_event(MyListenerWithBackPressure, stream) == 1 + + :ok = MyListenerWithBackPressure.unsubscribe(listener) + Helpers.assert_no_leaks(TestConn) + end + test "subscribe/unsubscribe" do stream = Helpers.random_stream_name() event1 = %Event.PersonCreated{name: "Pera"} From 9dea4852bd86e3902f1604678180d355357c6bd6 Mon Sep 17 00:00:00 2001 From: Burmaja Milan Date: Thu, 17 Apr 2025 12:43:22 +0200 Subject: [PATCH 2/2] Stop sending read events to listener if it returned :stop while processing previous event --- CHANGELOG.md | 38 ++++++++++++++++++++--------- lib/extreme/reading_subscription.ex | 8 +++--- mix.exs | 2 +- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b02e322..490f542 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,18 @@ -# Changelog for extreme v1.1.0 +# v1.1.1 + +## Breaking changes + +- `ListenerWithBackPressure.on_init/1` doesn't support `:ok` result anymore, it must return `{:ok, client_state}` + +## New + +- add `on_caught_up(client_state)` callback to ListenerWithBackPressure + +## Improvements + +- Stop sending already read events to subscriber, if it returned `:stop` from processing previous one. + +# v1.1.0 ## Breaking changes @@ -9,50 +23,50 @@ - Add event_producer functionality for module that uses `Extreme`. - Add option to exclude catch all `handle_info` when using `Extreme.Listener` -# Changelog for extreme v1.0.7 +# v1.0.7 - Fix stoping subscription if `on_event` callback returns `:stop` -# Changelog for extreme v1.0.6 +# v1.0.6 - Add `Listener.subscribed?/1` function -# Changelog for extreme v1.0.5 +# v1.0.5 - RequestManager buffers live events received after subscription is created, but before it is registered -# Changelog for extreme v1.0.4 +# v1.0.4 - Listener.process_push callback can return `:stop`, meaning subscription should be stopped and pushes that are already in mailbox should be purged. -# Changelog for extreme v1.0.3 +# v1.0.3 - Add subscribe/unsubscribe and auto_subscribe? option for starting `Extreme.Listener` -# Changelog for extreme v1.0.0-beta2 +# v1.0.0-beta2 - Restart all subscriptions and subscribers/listeners when connection receives :tcp_closed -# Changelog for extreme v0.13.1 +# v0.13.1 - Dependency version upgrades -# Changelog for extreme v0.13.0 +# v0.13.0 - Support Elixir 1.7.0 and OTP 21.0 - Listener reads events in chunks of 500 events (instead of 4096) -# Changelog for extreme v0.11.0 +# v0.11.0 - Added support for EventStore 4 -# Changelog for extreme v0.10.4 +# v0.10.4 - Fixed issue with concurrent read and write where messages get stuck into extreme process state -# Changelog for extreme v0.10.3 +# v0.10.3 - Extreme.Listener - if get_last_event/1 returns `:from_now`, catching events will start from current event diff --git a/lib/extreme/reading_subscription.ex b/lib/extreme/reading_subscription.ex index 8efb75e..7655373 100644 --- a/lib/extreme/reading_subscription.ex +++ b/lib/extreme/reading_subscription.ex @@ -141,23 +141,23 @@ defmodule Extreme.ReadingSubscription do Logger.debug(fn -> "Last read event: #{inspect(response.next_event_number - 1)}" end) response.events - |> Enum.any?(fn msg -> + |> Enum.reduce_while(false, fn msg, _ -> state.subscriber |> Shared.on_event(msg, state.read_params.ack_timeout) |> case do :ok -> - false + {:cont, false} :stop -> Logger.info("Processing of read message is stopped") - true + {:halt, true} error -> Logger.warning( "Processing of buffered message didn't succeed: #{inspect([msg, error])}" ) - true + {:halt, true} end end) # One event processing didn't succeed (maybe we got `:stop`) diff --git a/mix.exs b/mix.exs index 1a35602..ce1472d 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Extreme.Mixfile do def project do [ app: :extreme, - version: "1.1.0", + version: "1.1.1", elixir: "~> 1.11", elixirc_paths: _elixirc_paths(Mix.env()), source_url: "https://github.com/exponentially/extreme",