diff --git a/lib/commanded/aggregates/aggregate_state_builder.ex b/lib/commanded/aggregates/aggregate_state_builder.ex
index 90bf39c7..a6f135a3 100644
--- a/lib/commanded/aggregates/aggregate_state_builder.ex
+++ b/lib/commanded/aggregates/aggregate_state_builder.ex
@@ -1,9 +1,35 @@
 defmodule Commanded.Aggregates.AggregateStateBuilder do
+  use TelemetryRegistry
   alias Commanded.Aggregates.Aggregate
   alias Commanded.EventStore
   alias Commanded.EventStore.RecordedEvent
   alias Commanded.EventStore.SnapshotData
   alias Commanded.Snapshotting
+  alias Commanded.Telemetry
+
+  telemetry_event(%{
+    event: [:commanded, :aggregate, :populate, :start],
+    description: "Emitted when an aggregate begins loading from the event store",
+    measurements: "%{system_time: integer()}",
+    metadata: """
+    %{application: Commanded.Application.t(),
+      aggregate_uuid: String.t(),
+      aggregate_state: struct(),
+      aggregate_version: non_neg_integer()}
+    """
+  })
+
+  telemetry_event(%{
+    event: [:commanded, :aggregate, :populate, :stop],
+    description: "Emitted when an aggregate completes loading from the event store",
+    measurements: "%{duration: non_neg_integer(), count: non_neg_integer()}",
+    metadata: """
+    %{application: Commanded.Application.t(),
+      aggregate_uuid: String.t(),
+      aggregate_state: struct(),
+      aggregate_version: non_neg_integer()}
+    """
+  })
 
   @read_event_batch_size 1_000
 
@@ -62,15 +88,43 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do
 
   # Rebuild aggregate state from a `Stream` of its events.
   defp rebuild_from_event_stream(event_stream, %Aggregate{} = state) do
-    Enum.reduce(event_stream, state, fn event, state ->
-      %RecordedEvent{data: data, stream_version: stream_version} = event
-      %Aggregate{aggregate_module: aggregate_module, aggregate_state: aggregate_state} = state
+    telemetry_prefix = [:commanded, :aggregate, :populate]
+    start_time = Telemetry.start(telemetry_prefix, telemetry_metadata(state))
 
-      %Aggregate{
-        state
-        | aggregate_version: stream_version,
-          aggregate_state: aggregate_module.apply(aggregate_state, data)
-      }
-    end)
+    # Count the applied events alongside the state for the `:stop` measurements.
+    {state, count} =
+      Enum.reduce(event_stream, {state, 0}, fn event, {state, count} ->
+        %RecordedEvent{data: data, stream_version: stream_version} = event
+        %Aggregate{aggregate_module: aggregate_module, aggregate_state: aggregate_state} = state
+
+        state = %Aggregate{
+          state
+          | aggregate_version: stream_version,
+            aggregate_state: aggregate_module.apply(aggregate_state, data)
+        }
+
+        {state, count + 1}
+      end)
+
+    Telemetry.stop(telemetry_prefix, start_time, telemetry_metadata(state), %{count: count})
+
+    state
+  end
+
+  # Metadata shared by the `populate` start and stop telemetry events.
+  defp telemetry_metadata(%Aggregate{} = state) do
+    %Aggregate{
+      application: application,
+      aggregate_uuid: aggregate_uuid,
+      aggregate_state: aggregate_state,
+      aggregate_version: aggregate_version
+    } = state
+
+    %{
+      application: application,
+      aggregate_uuid: aggregate_uuid,
+      aggregate_state: aggregate_state,
+      aggregate_version: aggregate_version
+    }
   end
 end
diff --git a/lib/commanded/commands/dispatcher.ex b/lib/commanded/commands/dispatcher.ex
index 6021c7b0..c0a0876c 100644
--- a/lib/commanded/commands/dispatcher.ex
+++ b/lib/commanded/commands/dispatcher.ex
@@ -132,6 +132,11 @@ defmodule Commanded.Commands.Dispatcher do
         # Maybe retry command when aggregate process not found on a remote node
         maybe_retry(pipeline, payload, context)
 
+      {:error, :aggregate_execution_timeout} ->
+        # The main reason for a timeout is that aggregate loading is slow, so retrying
+        # is expected to help.
+        maybe_retry(pipeline, payload, context)
+
       {:error, error} ->
         pipeline
         |> Pipeline.respond({:error, error})
@@ -241,8 +246,10 @@ defmodule Commanded.Commands.Dispatcher do
       {:ok, context} ->
         execute(pipeline, payload, context)
 
-      reply ->
-        reply
+      {:error, :too_many_attempts} = error ->
+        pipeline
+        |> Pipeline.respond(error)
+        |> after_failure(payload)
     end
   end
 end
diff --git a/test/aggregates/aggregate_telemetry_test.exs b/test/aggregates/aggregate_telemetry_test.exs
index adc2e048..b884cdab 100644
--- a/test/aggregates/aggregate_telemetry_test.exs
+++ b/test/aggregates/aggregate_telemetry_test.exs
@@ -192,6 +192,42 @@ defmodule Commanded.Aggregates.AggregateTelemetryTest do
 
       refute_received {[:commanded, :aggregate, :execute, :stop], _measurements, _metadata}
     end
+
+    test "emit `[:commanded, :aggregate, :populate]` events",
+         %{aggregate_uuid: aggregate_uuid, pid: pid} do
+      context = %ExecutionContext{
+        command: %Ok{message: "ok"},
+        function: :execute,
+        handler: ExampleAggregate
+      }
+
+      # Send some commands, then kill the process to force a reload.
+      count = 3
+
+      for _i <- 1..count do
+        {:ok, _version, _events} = GenServer.call(pid, {:execute_command, context})
+      end
+
+      # NOTE(review): an exit signal with reason `:normal` is ignored by a process that
+      # is not trapping exits, so this may not terminate the aggregate — confirm, or
+      # use `GenServer.stop/1` if the old process must be gone before the reload.
+      Process.exit(pid, :normal)
+
+      # Do the reload, we should now have telemetry
+      start_aggregate(aggregate_uuid)
+
+      assert_receive {[:commanded, :aggregate, :populate, :start], _measurements, _metadata}
+      assert_receive {[:commanded, :aggregate, :populate, :stop], measurements, metadata}
+
+      assert %{count: ^count} = measurements
+
+      assert %{
+               aggregate_state: %ExampleAggregate{},
+               aggregate_uuid: ^aggregate_uuid,
+               aggregate_version: ^count,
+               application: DefaultApp
+             } = metadata
+    end
   end
 
   def start_aggregate(aggregate_uuid) do
@@ -207,7 +243,9 @@ defmodule Commanded.Aggregates.AggregateTelemetryTest do
       [
         [:commanded, :aggregate, :execute, :start],
         [:commanded, :aggregate, :execute, :stop],
-        [:commanded, :aggregate, :execute, :exception]
+        [:commanded, :aggregate, :execute, :exception],
+        [:commanded, :aggregate, :populate, :start],
+        [:commanded, :aggregate, :populate, :stop]
       ],
       fn event_name, measurements, metadata, reply_to ->
         send(reply_to, {event_name, measurements, metadata})
diff --git a/test/commands/command_timeout_test.exs b/test/commands/command_timeout_test.exs
index fe16f86d..67673c3d 100644
--- a/test/commands/command_timeout_test.exs
+++ b/test/commands/command_timeout_test.exs
@@ -16,7 +16,7 @@ defmodule Commanded.Commands.CommandTimeoutTest do
       # Handler is set to take longer than the configured timeout
       case TimeoutRouter.dispatch(command, application: DefaultApp) do
         {:error, :aggregate_execution_failed} -> :ok
-        {:error, :aggregate_execution_timeout} -> :ok
+        {:error, :too_many_attempts} -> :ok
         reply -> flunk("received an unexpected response: #{inspect(reply)}")
       end
     end
diff --git a/test/middleware/middleware_test.exs b/test/middleware/middleware_test.exs
index d2858c09..37019c1b 100644
--- a/test/middleware/middleware_test.exs
+++ b/test/middleware/middleware_test.exs
@@ -136,7 +136,7 @@ defmodule Commanded.Middleware.MiddlewareTest do
       # Force command handling to timeout so the aggregate process is terminated
       :ok =
         case Router.dispatch(command, application: DefaultApp, timeout: 50) do
-          {:error, :aggregate_execution_timeout} -> :ok
+          {:error, :too_many_attempts} -> :ok
           {:error, :aggregate_execution_failed} -> :ok
         end
 