From b750116e4b3566a589521aeef41bd4611ed5b9d3 Mon Sep 17 00:00:00 2001 From: Cees de Groot Date: Thu, 15 Feb 2024 14:56:48 -0500 Subject: [PATCH 1/4] Retry on timeouts --- lib/commanded/commands/dispatcher.ex | 11 +++++++++-- test/commands/command_timeout_test.exs | 2 +- test/middleware/middleware_test.exs | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/lib/commanded/commands/dispatcher.ex b/lib/commanded/commands/dispatcher.ex index 6266bb9a..c5fb5d24 100644 --- a/lib/commanded/commands/dispatcher.ex +++ b/lib/commanded/commands/dispatcher.ex @@ -130,6 +130,11 @@ defmodule Commanded.Commands.Dispatcher do # Maybe retry command when aggregate process not found on a remote node maybe_retry(pipeline, payload, context) + {:error, :aggregate_execution_timeout} -> + # The main reason for a timeout is that aggregate loading is slow, so retrying + # is expected to help. + maybe_retry(pipeline, payload, context) + {:error, error} -> pipeline |> Pipeline.respond({:error, error}) @@ -239,8 +244,10 @@ defmodule Commanded.Commands.Dispatcher do {:ok, context} -> execute(pipeline, payload, context) - reply -> - reply + {:error, :too_many_attempts} = error -> + pipeline + |> Pipeline.respond(error) + |> after_failure(payload) end end end diff --git a/test/commands/command_timeout_test.exs b/test/commands/command_timeout_test.exs index fe16f86d..67673c3d 100644 --- a/test/commands/command_timeout_test.exs +++ b/test/commands/command_timeout_test.exs @@ -16,7 +16,7 @@ defmodule Commanded.Commands.CommandTimeoutTest do # Handler is set to take longer than the configured timeout case TimeoutRouter.dispatch(command, application: DefaultApp) do {:error, :aggregate_execution_failed} -> :ok - {:error, :aggregate_execution_timeout} -> :ok + {:error, :too_many_attempts} -> :ok reply -> flunk("received an unexpected response: #{inspect(reply)}") end end diff --git a/test/middleware/middleware_test.exs b/test/middleware/middleware_test.exs index d2858c09..37019c1b 100644 --- a/test/middleware/middleware_test.exs +++ b/test/middleware/middleware_test.exs @@ -136,7 +136,7 @@ defmodule Commanded.Middleware.MiddlewareTest do # Force command handling to timeout so the aggregate process is terminated :ok = case Router.dispatch(command, application: DefaultApp, timeout: 50) do - {:error, :aggregate_execution_timeout} -> :ok + {:error, :too_many_attempts} -> :ok {:error, :aggregate_execution_failed} -> :ok end From 143db2f39ce1f6c7af9ea992282dd8521e9f16f6 Mon Sep 17 00:00:00 2001 From: Cees de Groot Date: Thu, 15 Feb 2024 15:23:53 -0500 Subject: [PATCH 2/4] Add Aggregate State rebuilding telemetry --- .../aggregates/aggregate_state_builder.ex | 56 ++++++++++++++++++- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/lib/commanded/aggregates/aggregate_state_builder.ex b/lib/commanded/aggregates/aggregate_state_builder.ex index 90bf39c7..b20395f9 100644 --- a/lib/commanded/aggregates/aggregate_state_builder.ex +++ b/lib/commanded/aggregates/aggregate_state_builder.ex @@ -1,9 +1,35 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do + use TelemetryRegistry alias Commanded.Aggregates.Aggregate alias Commanded.EventStore alias Commanded.EventStore.RecordedEvent alias Commanded.EventStore.SnapshotData alias Commanded.Snapshotting + alias Commanded.Telemetry + + telemetry_event(%{ + event: [:commanded, :aggregate, :state_builder, :start], + description: "Emitted when an aggregate begins loading from the event store", + measurements: "%{system_time: integer()}", + metadata: """ + %{application: Commanded.Application.t(), + aggregate_uuid: String.t(), + aggregate_state: struct(), + aggregate_version: non_neg_integer()} + """ + }) + + telemetry_event(%{ + event: [:commanded, :aggregate, :state_builder, :stop], + description: "Emitted when an aggregate completes loading from the event store", + measurements: "%{duration: non_neg_integer(), count: non_neg_integer()}", + metadata: """ + %{application: Commanded.Application.t(), + aggregate_uuid: String.t(), + aggregate_state: struct(), + aggregate_version: non_neg_integer()} + """ + }) @read_event_batch_size 1_000 @@ -62,15 +88,41 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do # Rebuild aggregate state from a `Stream` of its events. defp rebuild_from_event_stream(event_stream, %Aggregate{} = state) do - Enum.reduce(event_stream, state, fn event, state -> + telemetry_metadata = telemetry_metadata(state) + telemetry_prefix = [:commanded, :aggregate, :state_builder] + start_time = Telemetry.start(telemetry_prefix, telemetry_metadata) + + {state, count} = Enum.reduce(event_stream, {state, 0}, fn event, {state, count} -> %RecordedEvent{data: data, stream_version: stream_version} = event %Aggregate{aggregate_module: aggregate_module, aggregate_state: aggregate_state} = state - %Aggregate{ + state = %Aggregate{ state | aggregate_version: stream_version, aggregate_state: aggregate_module.apply(aggregate_state, data) } + {state, count + 1} end) + + Telemetry.stop(telemetry_prefix, start_time, telemetry_metadata, %{count: count}) + + state + end + + defp telemetry_metadata(%Aggregate{} = state) do + %Aggregate{ + application: application, + aggregate_uuid: aggregate_uuid, + aggregate_state: aggregate_state, + aggregate_version: aggregate_version + } = state + + %{ + application: application, + aggregate_uuid: aggregate_uuid, + aggregate_state: aggregate_state, + aggregate_version: aggregate_version, + } end + end From a58d24298f5f9e5c5de69ecf30c746e9f751616b Mon Sep 17 00:00:00 2001 From: Cees de Groot Date: Thu, 15 Feb 2024 16:22:31 -0500 Subject: [PATCH 3/4] Put telemetry on dehydration --- test/aggregates/aggregate_telemetry_test.exs | 39 +++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/test/aggregates/aggregate_telemetry_test.exs b/test/aggregates/aggregate_telemetry_test.exs index 4f0c5e7c..338c0e7b 100644 --- a/test/aggregates/aggregate_telemetry_test.exs +++ b/test/aggregates/aggregate_telemetry_test.exs @@ -192,6 +192,41 @@ defmodule Commanded.Aggregates.AggregateTelemetryTest do refute_received {[:commanded, :aggregate, :execute, :stop], _measurements, _metadata} end + + test "emit `[:commanded, :aggregate, :populate]` events", + %{aggregate_uuid: aggregate_uuid, pid: pid} do + + context = %ExecutionContext{ + command: %Ok{message: "ok"}, + function: :execute, + handler: ExampleAggregate + } + + # Send some commands, then kill the process to force a reload. + count = 3 + for _i <- 1..count do + {:ok, _version, _events} = GenServer.call(pid, {:execute_command, context}) + end + Process.exit(pid, :normal) + + # Do the reload, we should now have telemetry + start_aggregate(aggregate_uuid) + + assert_receive {[:commanded, :aggregate, :populate, :start], _measurements, _metadata} + assert_receive {[:commanded, :aggregate, :populate, :stop], measurements, metadata} + + assert match?(%{count: ^count}, measurements) + + assert match?( + %{ + aggregate_state: %ExampleAggregate{}, + aggregate_uuid: ^aggregate_uuid, + aggregate_version: ^count, + application: DefaultApp + }, + metadata + ) + end end def start_aggregate(aggregate_uuid) do @@ -207,7 +242,9 @@ defmodule Commanded.Aggregates.AggregateTelemetryTest do [ [:commanded, :aggregate, :execute, :start], [:commanded, :aggregate, :execute, :stop], - [:commanded, :aggregate, :execute, :exception] + [:commanded, :aggregate, :execute, :exception], + [:commanded, :aggregate, :populate, :start], + [:commanded, :aggregate, :populate, :stop] ], fn event_name, measurements, metadata, reply_to -> send(reply_to, {event_name, measurements, metadata}) From 85c97c3dbbbda0de816e134ed6fc6164bd060f19 Mon Sep 17 00:00:00 2001 From: Cees de Groot Date: Thu, 15 Feb 2024 16:31:46 -0500 Subject: [PATCH 4/4] Mix format --- .../aggregates/aggregate_state_builder.ex | 36 +++++++++---------- test/aggregates/aggregate_telemetry_test.exs | 3 +- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/lib/commanded/aggregates/aggregate_state_builder.ex b/lib/commanded/aggregates/aggregate_state_builder.ex index b20395f9..a6f135a3 100644 --- a/lib/commanded/aggregates/aggregate_state_builder.ex +++ b/lib/commanded/aggregates/aggregate_state_builder.ex @@ -8,7 +8,7 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do alias Commanded.Telemetry telemetry_event(%{ - event: [:commanded, :aggregate, :state_builder, :start], + event: [:commanded, :aggregate, :populate, :start], description: "Emitted when an aggregate begins loading from the event store", measurements: "%{system_time: integer()}", metadata: """ @@ -20,7 +20,7 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do }) telemetry_event(%{ - event: [:commanded, :aggregate, :state_builder, :stop], + event: [:commanded, :aggregate, :populate, :stop], description: "Emitted when an aggregate completes loading from the event store", measurements: "%{duration: non_neg_integer(), count: non_neg_integer()}", metadata: """ @@ -88,23 +88,24 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do # Rebuild aggregate state from a `Stream` of its events. defp rebuild_from_event_stream(event_stream, %Aggregate{} = state) do - telemetry_metadata = telemetry_metadata(state) - telemetry_prefix = [:commanded, :aggregate, :state_builder] - start_time = Telemetry.start(telemetry_prefix, telemetry_metadata) + telemetry_prefix = [:commanded, :aggregate, :populate] + start_time = Telemetry.start(telemetry_prefix, telemetry_metadata(state)) - {state, count} = Enum.reduce(event_stream, {state, 0}, fn event, {state, count} -> - %RecordedEvent{data: data, stream_version: stream_version} = event - %Aggregate{aggregate_module: aggregate_module, aggregate_state: aggregate_state} = state + {state, count} = + Enum.reduce(event_stream, {state, 0}, fn event, {state, count} -> + %RecordedEvent{data: data, stream_version: stream_version} = event + %Aggregate{aggregate_module: aggregate_module, aggregate_state: aggregate_state} = state - state = %Aggregate{ - state - | aggregate_version: stream_version, - aggregate_state: aggregate_module.apply(aggregate_state, data) - } - {state, count + 1} - end) + state = %Aggregate{ + state + | aggregate_version: stream_version, + aggregate_state: aggregate_module.apply(aggregate_state, data) + } + + {state, count + 1} + end) - Telemetry.stop(telemetry_prefix, start_time, telemetry_metadata, %{count: count}) + Telemetry.stop(telemetry_prefix, start_time, telemetry_metadata(state), %{count: count}) state end @@ -121,8 +122,7 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do application: application, aggregate_uuid: aggregate_uuid, aggregate_state: aggregate_state, - aggregate_version: aggregate_version, + aggregate_version: aggregate_version } end - end diff --git a/test/aggregates/aggregate_telemetry_test.exs b/test/aggregates/aggregate_telemetry_test.exs index 338c0e7b..608e6e08 100644 --- a/test/aggregates/aggregate_telemetry_test.exs +++ b/test/aggregates/aggregate_telemetry_test.exs @@ -195,7 +195,6 @@ defmodule Commanded.Aggregates.AggregateTelemetryTest do test "emit `[:commanded, :aggregate, :populate]` events", %{aggregate_uuid: aggregate_uuid, pid: pid} do - context = %ExecutionContext{ command: %Ok{message: "ok"}, function: :execute, @@ -204,9 +203,11 @@ defmodule Commanded.Aggregates.AggregateTelemetryTest do # Send some commands, then kill the process to force a reload. count = 3 + for _i <- 1..count do {:ok, _version, _events} = GenServer.call(pid, {:execute_command, context}) end + Process.exit(pid, :normal) # Do the reload, we should now have telemetry