diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d8a8b9c8..7ef533b4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,6 +1,11 @@ name: Test -on: [push, pull_request] +on: + pull_request: + push: + branches: + - master + - 'v*' env: MIX_ENV: test @@ -10,12 +15,22 @@ jobs: name: Build and test runs-on: ubuntu-latest strategy: + fail-fast: false matrix: - otp: ['25.3'] - elixir: ['1.15.7'] + include: + - elixir: 1.17.x + otp: 27 + - elixir: 1.16.x + otp: 26 + - elixir: 1.15.x + otp: 26 + - elixir: 1.14.x + otp: 26 + - elixir: 1.13.x + otp: 25 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up Elixir id: beam @@ -25,7 +40,7 @@ jobs: otp-version: ${{ matrix.otp }} - name: Restore dependencies cache - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: deps key: ${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-mix-${{ hashFiles('**/mix.lock') }} @@ -49,7 +64,7 @@ jobs: mix test --include distributed - name: Retrieve Dialyzer PLT cache - uses: actions/cache@v1 + uses: actions/cache@v4 id: plt-cache with: path: priv/plts diff --git a/.gitignore b/.gitignore index 2b67907c..57338d2b 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ erl_crash.dump /bench/graphs /bench/snapshots .devbox +.tool-versions diff --git a/.tool-versions b/.tool-versions deleted file mode 100644 index c1cec712..00000000 --- a/.tool-versions +++ /dev/null @@ -1,2 +0,0 @@ -elixir 1.15.7-otp-25 -erlang 25.3.2.8 diff --git a/CHANGELOG.md b/CHANGELOG.md index 4df18f56..44deddd1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,54 @@ # Changelog +## v1.4.8 + +### Bug fixes + +* `Commanded.Event.ErrorHandler` now keeps surounding failure context by @drteeth in https://github.com/commanded/commanded/issues/617 + +## v1.4.7 + +### Enhancements + +* Application-wide event handler error handling by @drteeth in https://github.com/commanded/commanded/pull/605 +* chore: remove asdf file by @yordis in https://github.com/commanded/commanded/pull/570 +* chore: improve docs about aggregate version by @yordis in https://github.com/commanded/commanded/pull/608 +* Update include_aggregate_version documentation by @TylerPachal in https://github.com/commanded/commanded/pull/609 + +### Bug fixes + +* Fix flakey test by @drteeth in https://github.com/commanded/commanded/pull/599 +* feat: default aggregate lifespan configuration by @yordis in https://github.com/commanded/commanded/pull/548 +* Aggregate.handle_* now properly handles lifespans by @drteeth in https://github.com/commanded/commanded/pull/606 +* Allow registration handle_call/cast callbacks to be called by @drteeth in https://github.com/commanded/commanded/pull/607 +* Update local_cluster by @drteeth in https://github.com/commanded/commanded/pull/610 + +## v1.4.6 + +### Enhancements +- Includes changelog updates +- Version bump + +## v1.4.5 + +### Enhancements +- Support OTP 26 and Elixir 1.17 ([#595](https://github.com/commanded/commanded/pull/595)). + +## v1.4.4 + +### Enhancements +- feat: put aggregate_state into assigns of the pipeline ([#502](https://github.com/commanded/commanded/pull/502)). +- Add tag to partition test case ([#525](https://github.com/commanded/commanded/pull/525)). +- Make before_reset/0 an explicit callback function ([#550](https://github.com/commanded/commanded/pull/550)). +- New `Event.Handler.after_start/1` callback allows configuration in the handler's process ([#568](https://github.com/commanded/commanded/pull/568)). + +### Bug fixes +- Fix EventData typespec ([#495](https://github.com/commanded/commanded/pull/495)). +- Fix refute_receive_event examples ([#557](https://github.com/commanded/commanded/pull/557)). +- Fix interested? function doc ([#562](https://github.com/commanded/commanded/pull/562)). +- Use TypeProvider for process managers snapshot serialization ([#558](https://github.com/commanded/commanded/pull/558)). +- fix(router.ex): Telemetry is not emitted if dispatch fails for {:error, :unregistered_command} ([#563](in https://github.com/commanded/commanded/pull/563)). + ## v1.4.3 ### Enhancements diff --git a/README.md b/README.md index e8debf06..e532ecb3 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,7 @@ MIT License - [Domain events](guides/Events.md#domain-events) - [Event handlers](guides/Events.md#event-handlers) - [Consistency guarantee](guides/Events.md#consistency-guarantee) + - [Error handling](guides/Events.md#handling-errors) - [Upcasting events](guides/Events.md#upcasting-events) - [Process managers](guides/Process%20Managers.md) - [Example process manager](guides/Process%20Managers.md#example-process-manager) @@ -180,4 +181,4 @@ Commanded exists thanks to the following people who have contributed. ## Need help? -Please [open an issue](https://github.com/commanded/commanded/issues) if you encounter a problem, or need assistance. You can also seek help in the #commanded channel in the [official Elixir Slack](https://elixir-slackin.herokuapp.com/). +Please [open an issue](https://github.com/commanded/commanded/issues) if you encounter a problem, or need assistance. You can also seek help in the #commanded channel in the [official Elixir Slack](https://elixir-lang.slack.com/). diff --git a/devbox.lock b/devbox.lock index 0300faf9..dea6227d 100644 --- a/devbox.lock +++ b/devbox.lock @@ -1,8 +1,13 @@ { "lockfile_version": "1", "packages": { + "darwin.apple_sdk.frameworks.CoreServices": { + "resolved": "github:NixOS/nixpkgs/2d068ae5c6516b2d04562de50a58c682540de9bf?narHash=sha256-XcdiWLEhjJkMxDLKQJ0CCivmYYCvA5MDxu9pMybM5kM%3D#darwin.apple_sdk.frameworks.CoreServices", + "source": "nixpkg" + }, "elixir@1.16.3": { "last_modified": "2024-08-14T11:41:26Z", + "plugin_version": "0.0.1", "resolved": "github:NixOS/nixpkgs/0cb2fd7c59fed0cd82ef858cbcbdb552b9a33465#elixir", "source": "devbox-search", "version": "1.16.3", @@ -48,6 +53,9 @@ "store_path": "/nix/store/mxr48y86anc2p6lgvfd9l0qq1d4csz2h-elixir-1.16.3" } } + }, + "github:NixOS/nixpkgs/nixpkgs-unstable": { + "resolved": "github:NixOS/nixpkgs/2d068ae5c6516b2d04562de50a58c682540de9bf?lastModified=1740303746&narHash=sha256-XcdiWLEhjJkMxDLKQJ0CCivmYYCvA5MDxu9pMybM5kM%3D" } } } diff --git a/guides/Aggregates.md b/guides/Aggregates.md index fd4671dc..2494291f 100644 --- a/guides/Aggregates.md +++ b/guides/Aggregates.md @@ -20,7 +20,7 @@ end ## Command functions -A command function receives the aggregate's state and the command to execute. It must return the resultant domain events, which may be one event or multiple events. You can return a single event or a list of events: `%Event{}`, `[%Event{}]`, `{:ok, %Event{}}`, or `{:ok, [%Event{}]}`. +A command function receives the aggregate's state and the command to execute. It must return the resultant domain events, which may be one event or multiple events. You can return a single event or a list of events: `%Event{}`, `[%Event{}]`, `{:ok, %Event{}}`, or `{:ok, [%Event{}]}`. To respond without returning an event you can return `:ok`, `nil` or an empty list as either `[]` or `{:ok, []}`. @@ -47,7 +47,7 @@ end The state of an aggregate can only be mutated by applying a domain event to its state. This is achieved by an `apply/2` function that receives the state and the domain event. It returns the modified state. -Pattern matching is used to invoke the respective `apply/2` function for an event. These functions *must never fail* as they are used when rebuilding the aggregate state from its history of domain events. You cannot reject the event once it has occurred. +Pattern matching is used to invoke the respective `apply/2` function for an event. These functions **MUST NOT** fail as they are used when rebuilding the aggregate state from its history of domain events. You cannot reject the event once it has occurred. ```elixir defmodule ExampleAggregate do @@ -160,6 +160,20 @@ defmodule BankAccount do end ``` +> #### Effective Aggregate State {: .tip} +> +> The aggregate state should be carefully designed to maintain only the data required to: +> +> - Enforce business rules and invariants (e.g. preventing withdrawals that would overdraw an account) +> - Provide context for command handling based on previous events (e.g. checking an order's current status) +> - Make decisions that depend on historical events (e.g. verifying a refund doesn't exceed original payment) +> +> A common anti-pattern is blindly copying all event data into aggregate state without considering whether that data +> is actually needed for command handling. If a piece of state is never referenced in any command handling logic, +> it should be removed from the aggregate. This keeps the aggregate focused and maintainable. +> +> State that is only needed for querying/reporting should be maintained in read models rather than aggregate state. + ## Using `Commanded.Aggregate.Multi` to return multiple events Sometimes you need to create multiple events from a single command. You can use `Commanded.Aggregate.Multi` to help track the events and update the aggregate state. This can be useful when you want to emit multiple events that depend upon the aggregate state being updated. @@ -284,8 +298,9 @@ defimpl Commanded.Serialization.JsonDecoder, for: ExampleAggregate do end end ``` + Note: The default JSON encoding of a `DateTime` struct uses the `to_iso8601/1` function which is why we must decode it using the `from_iso8601/1` function. ### Rebuilding an aggregate snapshot -Whenever you change the structure of an aggregate's state you *must* increment the `snapshot_version` number. The aggregate state will be rebuilt from its events, ignoring any existing snapshots. They will be overwritten when the next snapshot is taken. +Whenever you change the structure of an aggregate's state you **MUST** increment the `snapshot_version` number. The aggregate state will be rebuilt from its events, ignoring any existing snapshots. They will be overwritten when the next snapshot is taken. diff --git a/guides/Commands.md b/guides/Commands.md index 58caa6a4..740b05ad 100644 --- a/guides/Commands.md +++ b/guides/Commands.md @@ -141,7 +141,7 @@ The above configuration requires that all commands for the `BankAccount` aggrega #### Identity prefix -An optional identity prefix can be used to distinguish between different aggregates that would otherwise share the same identity. As an example you might have a `User` and a `UserPreferences` aggregate that you wish to share the same identity. In this scenario you should specify a `prefix` for each aggregate (e.g. "user-" and "user-preference-"). +An optional identity prefix can be used to distinguish between different aggregates that would otherwise share the same identity. As an example you might have a `User` and a `UserPreferences` aggregate that you wish to share the same identity. In this scenario you should specify a `prefix` for each aggregate (e.g. "user-" and "user-preference-"). ```elixir defmodule BankRouter do @@ -155,7 +155,7 @@ defmodule BankRouter do end ``` -The prefix is used as the stream identity when appending, and reading, the aggregate's events (e.g. ``). Note you *must not* change the stream prefix once you have events persisted in your event store, otherwise the aggregate's events cannot be read from the event store and its state cannot be rebuilt since the stream name will be different. +The prefix is used as the stream identity when appending, and reading, the aggregate's events (e.g. ``). Note you **must not** change the stream prefix once you have events persisted in your event store, otherwise the aggregate's events cannot be read from the event store and its state cannot be rebuilt since the stream name will be different. #### Custom aggregate identity @@ -231,40 +231,40 @@ end You can choose the consistency guarantee when dispatching a command. -- *Strong consistency* offers up-to-date data but at the cost of high latency. -- *Eventual consistency* offers low latency but read model queries may reply with stale data since they may not have processed the persisted events. +- **Strong consistency** offers up-to-date data but at the cost of high latency. +- **Eventual consistency** offers low latency but read model queries may reply with stale data since they may not have processed the persisted events. In Commanded, the available options during command dispatch are: - - `:eventual` (default) - don't block command dispatch and don't wait for any event handlers, regardless of their own consistency configuration. +- `:eventual` (default) - don't block command dispatch and don't wait for any event handlers, regardless of their own consistency configuration. - ```elixir - :ok = BankApp.dispatch(command) - :ok = BankApp.dispatch(command, consistency: :eventual) - ``` + ```elixir + :ok = BankApp.dispatch(command) + :ok = BankApp.dispatch(command, consistency: :eventual) + ``` - - `:strong` - block command dispatch until all strongly consistent event handlers and process managers have successfully processed all events created by the command. +- `:strong` - block command dispatch until all strongly consistent event handlers and process managers have successfully processed all events created by the command. - ```elixir - :ok = BankApp.dispatch(command, consistency: :strong) - ``` + ```elixir + :ok = BankApp.dispatch(command, consistency: :strong) + ``` - Dispatching a command using `:strong` consistency but without any strongly consistent event handlers configured will have no effect. + Dispatching a command using `:strong` consistency but without any strongly consistent event handlers configured will have no effect. - - Provide an explicit list of event handler and process manager modules (or their configured names), containing only those handlers you'd like to wait for. No other handlers will be awaited on, regardless of their own configured consistency setting. +- Provide an explicit list of event handler and process manager modules (or their configured names), containing only those handlers you'd like to wait for. No other handlers will be awaited on, regardless of their own configured consistency setting. - ```elixir - :ok = BankApp.dispatch(command, consistency: [ExampleHandler, AnotherHandler]) - :ok = BankApp.dispatch(command, consistency: ["ExampleHandler", "AnotherHandler"]) - ``` + ```elixir + :ok = BankApp.dispatch(command, consistency: [ExampleHandler, AnotherHandler]) + :ok = BankApp.dispatch(command, consistency: ["ExampleHandler", "AnotherHandler"]) + ``` - Note you cannot opt-in to strong consistency for a handler that has been configured as eventually consistent. + Note you cannot opt-in to strong consistency for a handler that has been configured as eventually consistent. #### Which consistency guarantee should I use? When dispatching a command using `consistency: :strong` the dispatch will block until all of the strongly consistent event handlers and process managers have handled all events created by the command. This guarantees that when you receive the `:ok` response from dispatch, your strongly consistent read models will have been updated and can safely be queried. -Strong consistency helps to alleviate problems and workarounds you would otherwise encounter when dealing with eventual consistency in your own application. Use `:strong` consistency when you want to query a read model immediately after dispatching a command. You *must* also configure the event handler to use `:strong` consistency. +Strong consistency helps to alleviate problems and workarounds you would otherwise encounter when dealing with eventual consistency in your own application. Use `:strong` consistency when you want to query a read model immediately after dispatching a command. You **must** also configure the event handler to use `:strong` consistency. Using `:eventual` consistency, or omitting the `consistency` option, will cause the command dispatch to immediately return without waiting for any event handlers or process managers. The handlers run independently, and asynchronously, in the background, therefore you will need to deal with potentially stale read model data. @@ -318,13 +318,64 @@ This is useful if you need to get information from the events produced by the ag ### Dispatch returning aggregate version -You can optionally choose to include the aggregate's version as part of the dispatch result by setting the `include_aggregate_version` option to true: +You can optionally choose to include the aggregate's version as part of the dispatch result by setting the `include_aggregate_version` option to true: ```elixir {:ok, aggregate_version} = BankApp.dispatch(command, include_aggregate_version: true) ``` -This is useful when you need to wait for an event handler, such as a read model projection, to be up-to-date before continuing execution or querying its data. +The returned `aggregate_version` can be used as an ETAG, allowing you to synchronize operations across the read-side and write-side of your application. For example, if need to wait for an event handler, such as a read model projection, to be up-to-date before continuing execution or querying its data. + +```elixir +defmodule BankAccountProjector do + use Commanded.Projections.Ecto, + application: BankApp, + name: "BankAccountProjector", + repo: BankApp.Repo + + project(%BankAccountOpened{} = event, metadata, fn multi -> + multi + |> Ecto.Multi.run(:bank_account, &find_bank_account(&1, &2, event.id)) + |> Ecto.Multi.update( + :updated_bank_account, + &BankAccount.changeset(&1.bank_account, %{ + # Notice that I am using the stream version from the metadata + # to update the auction's stream version. + stream_version: metadata.stream_version, + }) + ) + end) +end +``` + +Then you can use the aggregate version to wait for the event handler to be up-to-date: + +```elixir +defmodule BankAccounts do + def get_bank_account(id, stream_version) do + query = + from b in BankAccount, + where: b.id == ^id, + where: b.stream_version >= ^stream_version, + order_by: [asc: b.stream_version] + + case BankApp.one(query) do + nil -> + # not ready yet means the projection is not yet up-to-date + # so you can retry the query + {:error, :not_ready} + + bank_account -> + {:ok, bank_account} + end + end +end +``` + +### Aggregate version as an ETAG + +The aggregate version can be thought of as an ETAG (Entity Tag) for a given resource. ETAGs are commonly used in HTTP caching mechanisms to determine if a resource has changed. Similarly, the aggregate version serves as a unique identifier for a specific state of the aggregate. +When building a REST API, you can use the aggregate version as the ETAG in your HTTP responses. ### Causation and correlation ids @@ -342,7 +393,7 @@ You can set causation and correlation ids when dispatching a command: When dispatching a command in an event handler, you should copy these values from the metadata (second) argument associated with the event you are handling: ```elixir -defmodule ExampleHandler do +defmodule ExampleHandler do use Commanded.Event.Handler, application: ExampleApp, name: "ExampleHandler" diff --git a/guides/Deployment.md b/guides/Deployment.md index fbcb7de4..5e95caf7 100644 --- a/guides/Deployment.md +++ b/guides/Deployment.md @@ -38,7 +38,7 @@ Or configure your application to use the `:global` registry in config: config :my_app, MyApp.Application, registry: :global ``` -Note that when clusters are formed dynamically (e.g. using [libcluster](https://hex.pm/packages/libcluster)]), the typical sequence of events is that first all nodes will start all processes, then the cluster is formed and `:global` will kill off duplicate names. This is ugly in the logs but expected; it also means that if your supervisor's `:max_restarts` is too low - lower than the number of event handlers/projectors you start - it will immediately exit and if that was your +Note that when clusters are formed dynamically (e.g. using [libcluster](https://hex.pm/packages/libcluster)), the typical sequence of events is that first all nodes will start all processes, then the cluster is formed and `:global` will kill off duplicate names. This is ugly in the logs but expected; it also means that if your supervisor's `:max_restarts` is too low - lower than the number of event handlers/projectors you start - it will immediately exit and if that was your application supervisor, your app gets shutdown. The solution is simple: keep `:max_restarts` above the number of event handlers you start under your supervisor and the transition from no cluster to cluster will be clean. ### Commanded Swarm registry diff --git a/guides/Events.md b/guides/Events.md index 4ad9ce5b..4c10a82d 100644 --- a/guides/Events.md +++ b/guides/Events.md @@ -33,6 +33,7 @@ defmodule ExampleHandler do application: ExampleApp, name: "ExampleHandler" + @impl Commanded.Event.Handler def handle(%AnEvent{..}, _metadata) do # ... process the event :ok @@ -70,6 +71,23 @@ Use the `:current` position when you don't want newly created event handlers to You should start your event handlers using an OTP `Supervisor` to ensure they are restarted on error. See the [Supervision guide](https://hexdocs.pm/commanded/supervision.html) for more details. +### Configuration options + +You can choose the default error behaviour for *all* of your event handlers in each Application's configuration: + +```ellxir +config :example, ExampleApp, + on_event_handler_error: :stop # or :backoff or MyCustomErrorHandler +``` + +The default behaviour is to stop the event handler process when any error is encountered. As event handlers are supervised either by a custom supervisor or by the application itself, the handlers are usually restarted right away. If the error is permanent, due to a logic or data bug, then the process will likely crash again right away. This can lead the supervisor itself to give up, crash and this will continue up your supervision tree until it stops your application. + +The `:backoff` option, introduced in v1.4.7, cause the even handler to retry after an exponentially increasing backoff period (up to a maximum of 24 hours). The event handler will still not be able to make forward progress until you address the issue, but it won't take your supervisors or applications down with it. + +If you want to provide your own strategy, you can pass in a module that implements an `c:error/3` function that matches the `c:error/3` callback mentioned above. + +It's important to note that if your event handler overrides the `error/3` callback, then that will be called instead of the application-wide strategy. + ### Subscribing to an individual stream By default event handlers will subscribe to all events appended to any stream. Provide a `subscribe_to` option to subscribe to a single stream. @@ -85,12 +103,54 @@ end This will ensure the handler only receives events appended to that stream. +### Handling errors + +It is important to consider how errors in the handling of events will affect your application. By default any errors encountered are configured to stop the event handler immediately. For this reason it is vital to have handlers under supervision. Errors in handlers occur in two categories: + +#### Transient errors + +Transient errors occur due temporary issues such as network connectivity, flakey downstream services, memory, or disk issues. Retrying here will often solve the problem. + +#### Permanent errors + +Permanent errors occure due to bugs in code, and as such have no hope of resolving themselves. The handler will not be able to make any progress, and will repeatedly crash. + +Repeated crashes become a problem when the handler crashes more than it's supervisor is configured to tolerate. When that happens, the supervisor itself will crash and this process will continue upwards until finally the application itself crashes. This is obviously bad. + +You can opt to have your event handlers backoff when crashing to avoid this: + +```elixir +config :my_app, MyApp.CommandedApp, + on_event_handler_error: :backoff +``` + +This will cause all event handlers from `MyApp.CommandedApp` to back off exponentially when they encounter an error. Errors will backoff to maximum of 24 hours, and will continue at that rate until the issue is resolved. The event handler still can't make any forward progress, but it will at least not bring down your application. + +It is also possible to configure the error handler for the default behaviour `:stop`. + +You can bring your own Commanded application-level behaviour by specifiying a module which implements `c:Commanded.Event.Handler.error/3` + +```elixir +defmodule YoloErrorHandler do + def error(error, failing_event, failure_context) do + # Ignore all errors! + :skip + end +end + +config :my_app, MyApp.CommandedApp, + on_event_handler_error: YoloErrorHandler +``` + ### Event handler callbacks -- `c:Commanded.Event.Handler.init/0` - (optional) initialisation callback function called when the handler starts. - `c:Commanded.Event.Handler.init/1` - (optional) used to configure the handler before it starts. +- `c:Commanded.Event.Handler.after_start/1` - (optional) initialisation callback function called in the process of the started handler. - `c:Commanded.Event.Handler.error/3` - (optional) called when an event handle/2 callback returns an error. +Error event handlers configured on a per handler basis like this will override the application level error handling. + + ### Metadata The `handle/2` function in your handler receives the domain event and a map of metadata associated with that event. You can provide the metadata key/value pairs when dispatching a command: @@ -120,6 +180,7 @@ defmodule ExampleHandler do application: ExampleApp, name: "ExampleHandler" + @impl Commanded.Event.Handler def handle(event, metadata) do IO.inspect(metadata) # %{ @@ -199,7 +260,7 @@ end An event handler can be reset (using a mix task), it will restart the event store subscription from the configured `start_from`. This allow an individual handler to be restart while the app is still running. -You can define a `before_reset/1` function that will be called before resetting the event handler. +You can implement the `before_reset/0` callback that will be called before resetting the event handler. ```elixir defmodule ExampleHandler do @@ -211,6 +272,7 @@ defmodule ExampleHandler do alias Commanded.Event.FailureContext + @impl Commanded.Event.Handler def before_reset do # Do something :ok diff --git a/guides/InMemoryEventStore.md b/guides/InMemoryEventStore.md new file mode 100644 index 00000000..63c600fd --- /dev/null +++ b/guides/InMemoryEventStore.md @@ -0,0 +1,57 @@ +# In memory event store + +Commanded provides an in-memory event store implementation, for **test use only**, in the module `Commanded.EventStore.Adapters.InMemory`. This is a transient event store without persistence. + +### Configuration + +Ensure you configure the in-memory adapter in your environment config file, `config/test.exs`: + +```elixir +config :my_app, MyApp.App, + event_store: [ + adapter: Commanded.EventStore.Adapters.InMemory, + serializer: Commanded.Serialization.JsonSerializer + ] +``` + +You may replace or omit the serializer configuration. By including it here we ensure events used by the tests can be successfully serialized and deserialized. + +### Usage + +#### ExUnit case template + +You can use ExUnit's case template feature to restart the in-memory event store for each test run. + +```elixir +defmodule InMemoryEventStoreCase do + use ExUnit.CaseTemplate + + alias Commanded.EventStore.Adapters.InMemory + + setup do + {:ok, _apps} = Application.ensure_all_started(:my_app) + + on_exit(fn -> + :ok = Application.stop(:my_app) + end) + end +end +``` + +Replace both occurrences of `:my_app` in the above ExUnit case template with the name of your own application. + +The reason why your app must be stopped and then started between resetting the event store is to ensure all application processes are restarted with their initial state to prevent state from one test affecting another. + +Use the `InMemoryEventStoreCase` module within any test files that need to use the event store. + +```elixir +defmodule ExampleTest do + use InMemoryEventStoreCase + + # Define your tests here ... +end +``` + +#### Running your tests + +Run `mix test` as usual to execute the tests using the in-memory event store. diff --git a/guides/Testing.md b/guides/Testing.md index 1e87a3dd..242b9804 100644 --- a/guides/Testing.md +++ b/guides/Testing.md @@ -119,3 +119,179 @@ test "make sure aggregate state are what we wanted" do } end ``` + +## Tests using the event store and read store + +To test your application using an event store and read model projection you can take advantage of ExUnit's case template feature to have the databases reset between each test execution. This guarantees that each test starts from a known good state and one test won't affect any other. + +First, define a `DataCase` module which is used to reset the event store and read store databases after each test run using the `on_exit/0` callback: + +```elixir +# test/support/data_case.ex +defmodule MyApp.DataCase do + use ExUnit.CaseTemplate + + using do + quote do + import Ecto + import Ecto.Changeset + import Ecto.Query + import Commanded.Assertions.EventAssertions + end + end + + setup do + {:ok, _} = Application.ensure_all_started(:my_app) + + on_exit(fn -> + :ok = Application.stop(:my_app) + + MyApp.Storage.reset!() + end) + + :ok + end +end +``` + +The `DataCase` module uses the following `Storage.reset!/0` function to: + +1. Reset the Postgres EventStore database. +2. Truncate the listed tables in the read store database. + +Rename `table1`, `table2`, and `table3` to your own table names and remember to include any new tables when added to your app. + +```elixir +# test/support/storage.ex +defmodule MyApp.Storage do + @doc """ + Clear the event store and read store databases + """ + def reset! do + reset_eventstore() + reset_readstore() + end + + defp reset_eventstore do + config = MyEventStore.config() + + {:ok, conn} = Postgrex.start_link(config) + + EventStore.Storage.Initializer.reset!(conn, config) + end + + defp reset_readstore do + config = Application.get_env(:my_app, MyApp.Repo) + + {:ok, conn} = Postgrex.start_link(config) + + Postgrex.query!(conn, truncate_readstore_tables(), []) + end + + defp truncate_readstore_tables do + """ + TRUNCATE TABLE + table1, + table2, + table3 + RESTART IDENTITY + CASCADE; + """ + end +end +``` + +You need to include the `test/support` files in the test environment Elixir paths by adding the following `elixirc_paths/1` function to your app's `mix.exs` file: + +```elixir +# mix.exs +defmodule MyApp.Mixfile do + use Mix.Project + + # Include `test/support` files in test environment + defp elixirc_paths(:test), do: ["lib", "test/support"] + defp elixirc_paths(_), do: ["lib"] +end +``` + +Finally, you can use the `MyApp.DataCase` case template within any test modules that require access to the databases: + +```elixir +# test/example_test.exs +defmodule MyApp.ExampleTest do + use MyApp.DataCase + + # Each test will be run against clean read and write databases. + # After test execution (regardless of success or failure) the databases will be reset. +end +``` + +Run your tests using: `mix test` + +When these tests run they will execute against empty event store and read store databases. The caveat with the approach is that the databases will be reset after your tests run; it won't be possible to look at data contained within database after a test fails to debug the failure. The workaround is to temporarily disable the reset operation, run a single failing test, and then you will be able to look at the data. Note that the next time you run any test it will fail because the databases won't have been reset. Subsequent tests will behave normally, assuming you reinstate the reset behaviour. + +--- + +#### Using the in-memory event store for testing + +You can use the [[InMemoryEventStore]] to test your application. You can also use an [ExUnit `CaseTemplate` to have the in-memory event store restarted between each test run](https://github.com/commanded/commanded/wiki/In-memory-event-store#exunit-case-template). + +--- + +#### Using `strong` consistency for tests only + +You can configure environment specific consistency setting for Commanded event handlers: + +```elixir +# config/config.exs +use Mix.Config + +config :my_app, consistency: :eventual +``` + +```elixir +# config/test.exs +use Mix.Config + +config :my_app, consistency: :strong +``` + +Then read the setting when defining your event handlers and process managers: + +```elixir +defmodule ExampleEventHandler do + use Commanded.Event.Handler, + name: __MODULE__, + consistency: Application.get_env(:my_app, :consistency, :eventual) +end +``` + +--- + +#### Given *events* / When *command* / Then *assert* tests + +In your test you can append events to the aggregate's event stream to setup its *given* state. Use `Commanded.EventStore.append_to_stream/4` to append events directly to the event store you've configured to use with Commanded. This allows you to configure a different event store for each environment (e.g. in-memory event store for test env). + +You need to map your app's domain events to `Commanded.EventStore.EventData` structs as follows: + +```elixir +causation_id = UUID.uuid4() +correlation_id = UUID.uuid4() + +event_data = + Enum.map(events, fn -> event + %Commanded.EventStore.EventData{ + causation_id: causation_id, + correlation_id: correlation_id, + event_type: Commanded.EventStore.TypeProvider.to_string(event), + data: event, + metadata: %{}, + } + ) + +{:ok, _} = Commanded.EventStore.append_to_stream(application, stream_uuid, expected_version, event_data) +``` + +The `stream_uuid` will be your aggregate's identity and `expected_version` is the aggregate version (count of events already appended to its stream, use `0` when creating a new aggregate). + +Once you've appended the events, you can dispatch the command via your router. The aggregate process will be started, it'll fetch its events, including those you just appended, and then handle the command. diff --git a/guides/Usage.md b/guides/Usage.md index 4d8db967..7fc77bcb 100644 --- a/guides/Usage.md +++ b/guides/Usage.md @@ -104,7 +104,7 @@ Here's an example bank account opening feature built using Commanded to demonstr application: BankApp, name: __MODULE__ - def init do + def after_start(_state) do with {:ok, _pid} <- Agent.start_link(fn -> 0 end, name: __MODULE__) do :ok end diff --git a/lib/application.ex b/lib/application.ex index b166e3b3..ecd99570 100644 --- a/lib/application.ex +++ b/lib/application.ex @@ -398,4 +398,9 @@ defmodule Commanded.Application do @doc false @spec registry_adapter(Commanded.Application.t()) :: {module, map} def registry_adapter(application), do: Config.get(application, :registry) + + @doc false + @spec on_event_handler_error(Commanded.Application.t()) :: atom | module + def on_event_handler_error(application), + do: Config.get(application, :on_event_handler_error) end diff --git a/lib/commanded/aggregates/aggregate.ex b/lib/commanded/aggregates/aggregate.ex index 517137ca..4ff141f2 100644 --- a/lib/commanded/aggregates/aggregate.ex +++ b/lib/commanded/aggregates/aggregate.ex @@ -326,14 +326,13 @@ defmodule Commanded.Aggregates.Aggregate do if Snapshotting.snapshot_required?(snapshotting, aggregate_version) do :ok = GenServer.cast(self(), {:take_snapshot, lifespan_timeout}) + # Don't reply with a lifetime because we just asked for a snapshot to + # be taken. When it finishes, it will set the timeout. {:reply, formatted_reply, state} else state = %Aggregate{state | lifespan_timeout: lifespan_timeout} - case lifespan_timeout do - {:stop, reason} -> {:stop, reason, formatted_reply, state} - lifespan_timeout -> {:reply, formatted_reply, state, lifespan_timeout} - end + reply_with_lifespan(formatted_reply, state) end telemetry_metadata = telemetry_metadata(context, from, state) @@ -347,7 +346,7 @@ defmodule Commanded.Aggregates.Aggregate do def handle_call(:aggregate_state, _from, %Aggregate{} = state) do %Aggregate{aggregate_state: aggregate_state} = state - {:reply, aggregate_state, state} + reply_with_lifespan(aggregate_state, state) end @doc false @@ -355,13 +354,13 @@ defmodule Commanded.Aggregates.Aggregate do def handle_call(:aggregate_version, _from, %Aggregate{} = state) do %Aggregate{aggregate_version: aggregate_version} = state - {:reply, aggregate_version, state} + reply_with_lifespan(aggregate_version, state) end @doc false @impl GenServer def handle_info({:events, events}, %Aggregate{} = state) do - %Aggregate{application: application, lifespan_timeout: lifespan_timeout} = state + %Aggregate{application: application} = state Logger.debug(describe(state) <> " received events: " <> inspect(events)) @@ -372,10 +371,7 @@ defmodule Commanded.Aggregates.Aggregate do |> Upcast.upcast_event_stream(additional_metadata: %{application: application}) |> Enum.reduce(state, &handle_event/2) - case lifespan_timeout do - {:stop, reason} -> {:stop, reason, state} - lifespan_timeout -> {:noreply, state, lifespan_timeout} - end + noreply_with_lifespan(state) catch {:error, error} -> Logger.debug(describe(state) <> " stopping due to: " <> inspect(error)) @@ -393,6 +389,14 @@ defmodule Commanded.Aggregates.Aggregate do {:stop, :normal, state} end + @doc false + @impl GenServer + def handle_info(message, %Aggregate{} = state) do + Logger.debug("received unexpected message in handle_info/2: " <> inspect(message)) + + noreply_with_lifespan(state) + end + defp event_already_seen?(%RecordedEvent{} = event, %Aggregate{} = state) do %RecordedEvent{stream_version: stream_version} = event %Aggregate{aggregate_version: aggregate_version} = state @@ -582,7 +586,6 @@ defmodule Commanded.Aggregates.Aggregate do %Aggregate{ aggregate_state: aggregate_state, aggregate_version: aggregate_version, - lifespan_timeout: lifespan_timeout, snapshotting: snapshotting } = state @@ -599,10 +602,7 @@ defmodule Commanded.Aggregates.Aggregate do state end - case lifespan_timeout do - {:stop, reason} -> {:stop, reason, state} - lifespan_timeout -> {:noreply, state, lifespan_timeout} - end + noreply_with_lifespan(state) end defp telemetry_start(telemetry_metadata) do @@ -666,4 +666,22 @@ defmodule Commanded.Aggregates.Aggregate do "#{inspect(aggregate_module)}<#{aggregate_uuid}@#{aggregate_version}>" end + + defp reply_with_lifespan(reply, state) do + %Aggregate{lifespan_timeout: lifespan_timeout} = state + + case lifespan_timeout do + {:stop, reason} -> {:stop, reason, reply, state} + lifespan_timeout -> {:reply, reply, state, lifespan_timeout} + end + end + + defp noreply_with_lifespan(state) do + %Aggregate{lifespan_timeout: lifespan_timeout} = state + + case lifespan_timeout do + {:stop, reason} -> {:stop, reason, state} + lifespan_timeout -> {:noreply, state, lifespan_timeout} + end + end end diff --git a/lib/commanded/aggregates/execution_context.ex b/lib/commanded/aggregates/execution_context.ex index 043964d9..487e0a87 100644 --- a/lib/commanded/aggregates/execution_context.ex +++ b/lib/commanded/aggregates/execution_context.ex @@ -51,7 +51,7 @@ defmodule Commanded.Aggregates.ExecutionContext do before_execute: nil, retry_attempts: 0, returning: false, - lifespan: DefaultLifespan, + lifespan: Application.compile_env(:commanded, :aggregate_lifespan, DefaultLifespan), metadata: %{} ] @@ -80,13 +80,13 @@ defmodule Commanded.Aggregates.ExecutionContext do case returning do :aggregate_state -> - {:ok, aggregate_version, events, aggregate_state} + {:ok, aggregate_version, events, aggregate_state, aggregate_state} :aggregate_version -> - {:ok, aggregate_version, events, aggregate_version} + {:ok, aggregate_version, events, aggregate_state, aggregate_version} :events -> - {:ok, aggregate_version, events, events} + {:ok, aggregate_version, events, aggregate_state, events} :execution_result -> result = %ExecutionResult{ @@ -97,10 +97,10 @@ defmodule Commanded.Aggregates.ExecutionContext do metadata: metadata } - {:ok, aggregate_version, events, result} + {:ok, aggregate_version, events, aggregate_state, result} false -> - {:ok, aggregate_version, events} + {:ok, aggregate_version, events, aggregate_state} end end diff --git a/lib/commanded/application/config.ex b/lib/commanded/application/config.ex index a71373b6..6553747c 100644 --- a/lib/commanded/application/config.ex +++ b/lib/commanded/application/config.ex @@ -15,6 +15,14 @@ defmodule Commanded.Application.Config do application |> lookup() |> Keyword.get(setting) end + @doc """ + Put a value into the application's config. This is test induced damage and you + probably shouldn't be using it. + """ + def __put__(application, setting, value) when is_atom(application) and is_atom(setting) do + GenServer.call(__MODULE__, {:put, application, setting, value}) + end + @impl GenServer def init(:ok) do table = :ets.new(__MODULE__, [:named_table, read_concurrency: true]) @@ -30,6 +38,17 @@ defmodule Commanded.Application.Config do {:reply, :ok, table} end + @impl GenServer + def handle_call({:put, application, settings, value}, _from, table) do + config = + lookup(application) + |> Keyword.put(settings, value) + + true = :ets.update_element(table, application, {4, config}) + + {:reply, :ok, table} + end + @impl GenServer def handle_info({:DOWN, ref, _type, pid, _reason}, table) do [[application]] = :ets.match(table, {:"$1", pid, ref, :_}) diff --git a/lib/commanded/assertions/event_assertions.ex b/lib/commanded/assertions/event_assertions.ex index 80b235e1..3880e4a1 100644 --- a/lib/commanded/assertions/event_assertions.ex +++ b/lib/commanded/assertions/event_assertions.ex @@ -91,13 +91,13 @@ defmodule Commanded.Assertions.EventAssertions do Refute that `ExampleEvent` is produced by given anonymous function: - refute_receive_event(ExampleApp, ExampleEvent, fn -> - :ok = MyApp.dispatch(command) - end) + refute_receive_event(ExampleApp, ExampleEvent, fn -> + :ok = MyApp.dispatch(command) + end) Refute that `ExampleEvent` is produced by `some_func/0` function: - refute_receive_event(ExampleApp, ExampleEvent, &some_func/0) + refute_receive_event(ExampleApp, ExampleEvent, &some_func/0) Refute that `ExampleEvent` matching given `event_matches?/1` predicate function is produced by `some_func/0` function: diff --git a/lib/commanded/commands/dispatcher.ex b/lib/commanded/commands/dispatcher.ex index c5fb5d24..c0a0876c 100644 --- a/lib/commanded/commands/dispatcher.ex +++ b/lib/commanded/commands/dispatcher.ex @@ -108,17 +108,19 @@ defmodule Commanded.Commands.Dispatcher do end case result do - {:ok, aggregate_version, events} -> + {:ok, aggregate_version, events, aggregate_state} -> pipeline |> Pipeline.assign(:aggregate_version, aggregate_version) |> Pipeline.assign(:events, events) + |> Pipeline.assign(:aggregate_state, aggregate_state) |> after_dispatch(payload) |> Pipeline.respond(:ok) - {:ok, aggregate_version, events, reply} -> + {:ok, aggregate_version, events, aggregate_state, reply} -> pipeline |> Pipeline.assign(:aggregate_version, aggregate_version) |> Pipeline.assign(:events, events) + |> Pipeline.assign(:aggregate_state, aggregate_state) |> after_dispatch(payload) |> Pipeline.respond({:ok, reply}) diff --git a/lib/commanded/commands/router.ex b/lib/commanded/commands/router.ex index 60599301..d726cdb6 100644 --- a/lib/commanded/commands/router.ex +++ b/lib/commanded/commands/router.ex @@ -208,9 +208,13 @@ defmodule Commanded.Commands.Router do alias Commanded.Aggregates.DefaultLifespan alias Commanded.Commands.{ExecutionResult, Router} + alias Commanded.Telemetry alias Commanded.UUID defmacro __using__(opts) do + otp_app = Keyword.get(opts, :otp_app, :commanded) + app_module = Keyword.get(opts, :application) + quote do require Logger @@ -224,11 +228,20 @@ defmodule Commanded.Commands.Router do Module.register_attribute(__MODULE__, :registered_identities, accumulate: false) @default_dispatch_opts [ - application: Keyword.get(unquote(opts), :application), + application: unquote(app_module), consistency: Router.get_opt(unquote(opts), :default_consistency, :eventual), returning: Router.get_default_dispatch_return(unquote(opts)), timeout: 5_000, - lifespan: DefaultLifespan, + lifespan: + if unquote(app_module) do + Application.compile_env( + unquote(otp_app), + [unquote(app_module), :aggregate_lifespan], + DefaultLifespan + ) + else + DefaultLifespan + end, metadata: %{}, retry_attempts: 10 ] @@ -578,11 +591,32 @@ defmodule Commanded.Commands.Router do end # Catch unregistered commands, log and return an error. - defp do_dispatch(command, _opts) do + defp do_dispatch(command, opts) do + event_prefix = [:commanded, :application, :dispatch] + application = Keyword.fetch!(opts, :application) + + context = %Commanded.Aggregates.ExecutionContext{ + command: command + } + + telemetry_metadata = %{ + application: application, + error: nil, + execution_context: context + } + + start_time = Telemetry.start(event_prefix, telemetry_metadata) + Logger.error(fn -> "attempted to dispatch an unregistered command: " <> inspect(command) end) + Telemetry.stop( + event_prefix, + start_time, + Map.put(telemetry_metadata, :error, :unregistered_command) + ) + {:error, :unregistered_command} end diff --git a/lib/commanded/event/error_handler.ex b/lib/commanded/event/error_handler.ex new file mode 100644 index 00000000..0b0124c1 --- /dev/null +++ b/lib/commanded/event/error_handler.ex @@ -0,0 +1,37 @@ +defmodule Commanded.Event.ErrorHandler do + alias Commanded.Event.FailureContext + + @doc """ + Stop the `Commanded.Event.Handler` with the reason given. + """ + def stop_on_error({:error, reason}, _failed_event, _failure_context) do + {:stop, reason} + end + + @doc """ + Backoff exponentially when an error is encountered. + * Adds up to 1 second of jitter. + * Minimum of 1 second delay + * Maximum of 24 hour delay + """ + def backoff(_error, _event, %FailureContext{} = failure_context, opts \\ []) do + jitter_fn = Keyword.get(opts, :jitter_fn, &one_second_jitter/0) + + %FailureContext{context: context} = failure_context + context = Map.update(context, :failures, 1, &(&1 + 1)) + failures = Map.fetch!(context, :failures) + + base_delay = failures ** 2 * 1000 + delay = base_delay + jitter_fn.() + delay = max(delay, :timer.seconds(1)) + delay = min(delay, :timer.hours(24)) + + failure_context = %{failure_context | context: context} + + {:retry, delay, failure_context} + end + + defp one_second_jitter do + :rand.uniform(1000) + end +end diff --git a/lib/commanded/event/handler.ex b/lib/commanded/event/handler.ex index 3967f7e3..cad02a5a 100644 --- a/lib/commanded/event/handler.ex +++ b/lib/commanded/event/handler.ex @@ -404,6 +404,7 @@ defmodule Commanded.Event.Handler do require Logger + alias Commanded.Event.ErrorHandler alias Commanded.Event.FailureContext alias Commanded.Event.Handler alias Commanded.Event.Upcast @@ -417,14 +418,17 @@ defmodule Commanded.Event.Handler do @type subscribe_from :: :origin | :current | non_neg_integer() @type consistency :: :eventual | :strong + @doc deprecated: "Use the after_start/1 callback instead." + @callback init() :: :ok | {:stop, reason :: any()} + @doc """ Optional initialisation callback function called when the handler starts. Can be used to start any related processes when the event handler is started. - This callback function must return `:ok`, or `{:stop, reason}` to stop the - handler process. Any other return value will terminate the event handler with - an error. + This callback function must return `:ok`, `{:ok, state}` to return new state, + or `{:stop, reason}` to stop the handler process. Any other return value + will terminate the event handler with an error. ### Example @@ -434,8 +438,9 @@ defmodule Commanded.Event.Handler do name: "ExampleHandler" # Optional initialisation - def init do - :ok + def after_start(handler_state) do + new_handler_state = Map.put(handler_state, :foo, "bar") + {:ok, new_handler_state} end def handle(%AnEvent{..}, _metadata) do @@ -445,7 +450,8 @@ defmodule Commanded.Event.Handler do end """ - @callback init() :: :ok | {:stop, reason :: any()} + @callback after_start(handler_state :: term()) :: + :ok | {:ok, state :: map()} | {:stop, reason :: any()} @doc """ Optional callback function called to configure the handler before it starts. @@ -606,7 +612,19 @@ defmodule Commanded.Event.Handler do """ @callback partition_by(domain_event, metadata) :: any() - @optional_callbacks init: 0, init: 1, error: 3, partition_by: 2, handle: 2, handle_batch: 1 + @doc """ + Called before an event handler gets reset + """ + + @callback before_reset() :: :ok + + @optional_callbacks init: 0, + init: 1, + error: 3, + partition_by: 2, + before_reset: 0, + handle: 2, + handle_batch: 1 defmacro __using__(using_opts) do quote location: :keep do @@ -694,7 +712,14 @@ defmodule Commanded.Event.Handler do end @doc false - def init, do: :ok + def after_start(_state) do + # TODO: remove this when we remove init/0 + if function_exported?(__MODULE__, :init, 0) do + apply(__MODULE__, :init, []) + else + :ok + end + end @doc false def init(config), do: {:ok, config} @@ -702,7 +727,7 @@ defmodule Commanded.Event.Handler do @doc false def before_reset, do: :ok - defoverridable init: 0, init: 1, before_reset: 0 + defoverridable init: 1, after_start: 1, before_reset: 0 end end @@ -909,12 +934,20 @@ defmodule Commanded.Event.Handler do %Handler{handler_module: handler_module} = state - case handler_module.init() do + if function_exported?(handler_module, :init, 0) do + Logger.warning("#{inspect(handler_module)}.init/0 is deprecated, use after_start/1 instead") + end + + case handler_module.after_start(state.handler_state) do :ok -> {:noreply, state} + {:ok, %{} = new_handler_state} -> + new_state = %{state | handler_state: new_handler_state} + {:noreply, new_state} + {:stop, reason} -> - Logger.debug(describe(state) <> " `init/0` callback has requested to stop") + Logger.debug(describe(state) <> " `after_start/1` callback has requested to stop") {:stop, reason, state} end @@ -1249,9 +1282,7 @@ defmodule Commanded.Event.Handler do nil -> nil end - %Handler{handler_module: handler_module} = state - - case handler_module.error(error, data, failure_context) do + case on_error(state, error, data, failure_context) do {:retry, %FailureContext{context: context}} when is_map(context) -> # Retry the failed event Logger.info(describe(state) <> " is retrying failed event") @@ -1302,6 +1333,25 @@ defmodule Commanded.Event.Handler do end end + defp on_error(%Handler{} = state, error, data, failure_context) do + %Handler{application: application, handler_module: handler_module} = state + + if function_exported?(handler_module, :error, 3) do + handler_module.error(error, data, failure_context) + else + case Commanded.Application.on_event_handler_error(application) do + default when default in [nil, :stop] -> + ErrorHandler.stop_on_error(error, data, failure_context) + + :backoff -> + ErrorHandler.backoff(error, data, failure_context) + + module when is_atom(module) -> + module.error(error, data, failure_context) + end + end + end + defp log_event_error(error, %RecordedEvent{} = failed_event, %Handler{} = state) do reason = case error do diff --git a/lib/commanded/event_store/adapters/in_memory.ex b/lib/commanded/event_store/adapters/in_memory.ex index 8e01f708..edfa0da2 100644 --- a/lib/commanded/event_store/adapters/in_memory.ex +++ b/lib/commanded/event_store/adapters/in_memory.ex @@ -506,9 +506,15 @@ defmodule Commanded.EventStore.Adapters.InMemory do Logger.debug(fn -> "Acknowleding event ##{event_number}" end) update_persistent_subscription(state, pid, fn %PersistentSubscription{} = subscription -> - subscription = PersistentSubscription.ack(subscription, event_number) + case PersistentSubscription.ack(subscription, event_number) do + %PersistentSubscription{} = subscription -> + publish_events(state, subscription) - publish_events(state, subscription) + {:error, :unexpected_ack} -> + # We tried to ack an event but there is no matching in-flight event + # I *think* it's okay to ignore this and leave the subscription as is + subscription + end end) end diff --git a/lib/commanded/event_store/event_data.ex b/lib/commanded/event_store/event_data.ex index 25457d3e..3191b04e 100644 --- a/lib/commanded/event_store/event_data.ex +++ b/lib/commanded/event_store/event_data.ex @@ -7,7 +7,7 @@ defmodule Commanded.EventStore.EventData do @type uuid :: String.t() @type t :: %Commanded.EventStore.EventData{ - causation_id: uuid(), + causation_id: uuid() | nil, correlation_id: uuid(), event_type: String.t(), data: struct(), diff --git a/lib/commanded/process_managers/process_manager.ex b/lib/commanded/process_managers/process_manager.ex index f4ee27a6..f41de7a2 100644 --- a/lib/commanded/process_managers/process_manager.ex +++ b/lib/commanded/process_managers/process_manager.ex @@ -322,7 +322,7 @@ defmodule Commanded.ProcessManagers.ProcessManager do @callback init(config :: Keyword.t()) :: {:ok, Keyword.t()} @doc """ - Is the process manager interested in the given command? + Is the process manager interested in the given event? See `c:interested?/2` for details. """ @@ -335,7 +335,7 @@ defmodule Commanded.ProcessManagers.ProcessManager do | false @doc """ - Is the process manager interested in the given command? + Is the process manager interested in the given event? The `c:interested?/2` function is used to indicate which events the process manager receives. The response is used to route the event to an existing diff --git a/lib/commanded/process_managers/process_manager_instance.ex b/lib/commanded/process_managers/process_manager_instance.ex index 37728830..3a7f193d 100644 --- a/lib/commanded/process_managers/process_manager_instance.ex +++ b/lib/commanded/process_managers/process_manager_instance.ex @@ -6,7 +6,7 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do require Logger alias Commanded.{Application, EventStore, Telemetry} - alias Commanded.EventStore.{RecordedEvent, SnapshotData} + alias Commanded.EventStore.{RecordedEvent, SnapshotData, TypeProvider} alias Commanded.ProcessManagers.{FailureContext, ProcessRouter} defmodule State do @@ -156,7 +156,9 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do @doc false @impl GenServer def handle_info(message, state) do - Logger.error(fn -> describe(state) <> " received unexpected message: " <> inspect(message) end) + Logger.error(fn -> + describe(state) <> " received unexpected message: " <> inspect(message) + end) {:noreply, state} end @@ -551,14 +553,13 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do defp persist_state(source_version, %State{} = state) do %State{ application: application, - process_manager_module: process_manager_module, process_state: process_state } = state snapshot = %SnapshotData{ source_uuid: snapshot_uuid(state), source_version: source_version, - source_type: Atom.to_string(process_manager_module), + source_type: TypeProvider.to_string(process_state), data: process_state } diff --git a/lib/commanded/registration.ex b/lib/commanded/registration.ex index 098f6f67..a9f8e6c8 100644 --- a/lib/commanded/registration.ex +++ b/lib/commanded/registration.ex @@ -83,15 +83,7 @@ defmodule Commanded.Registration do @before_compile unquote(__MODULE__) alias unquote(__MODULE__) - end - end - @doc """ - Allow a registry adapter to handle the standard `GenServer` callback - functions. - """ - defmacro __before_compile__(_env) do - quote generated: true, location: :keep do @doc false def handle_call(request, from, state) do adapter = registry_adapter(state) @@ -106,6 +98,16 @@ defmodule Commanded.Registration do adapter.handle_cast(request, state) end + defoverridable(handle_call: 3, handle_cast: 2) + end + end + + @doc """ + Allow a registry adapter to handle the standard `GenServer` callback + functions. + """ + defmacro __before_compile__(_env) do + quote generated: true, location: :keep do @doc false def handle_info(msg, state) do adapter = registry_adapter(state) diff --git a/mix.exs b/mix.exs index 16146d70..140c7267 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Commanded.Mixfile do use Mix.Project - @version "1.4.3" + @version "1.4.8" def project do [ @@ -70,7 +70,7 @@ defmodule Commanded.Mixfile do {:credo, "~> 1.7", only: [:dev, :test], runtime: false}, {:dialyxir, "~> 1.3", only: [:dev, :test], runtime: false}, {:ex_doc, ">= 0.0.0", only: :dev}, - {:local_cluster, "~> 1.2", only: :test, runtime: false}, + {:local_cluster, "~> 2.1", only: :test, runtime: false}, {:mix_test_watch, "~> 1.1", only: :dev}, {:mox, "~> 1.0", only: [:bench, :test]} ] @@ -103,6 +103,7 @@ defmodule Commanded.Mixfile do "guides/Serialization.md", "guides/Read Model Projections.md", "guides/Testing.md", + "guides/InMemoryEventStore.md", "guides/Deployment.md", "guides/upgrades/0.19-1.0.md": [ filename: "0.19-1.0", @@ -233,6 +234,7 @@ defmodule Commanded.Mixfile do maintainers: ["Ben Smith"], licenses: ["MIT"], links: %{ + "Changelog" => "https://hexdocs.pm/commanded/#{@version}/changelog.html", "GitHub" => "https://github.com/commanded/commanded" } ] diff --git a/mix.lock b/mix.lock index cec1b7b1..b6f27b2e 100644 --- a/mix.lock +++ b/mix.lock @@ -2,22 +2,23 @@ "backoff": {:hex, :backoff, "1.1.6", "83b72ed2108ba1ee8f7d1c22e0b4a00cfe3593a67dbc792799e8cce9f42f796b", [:rebar3], [], "hexpm", "cf0cfff8995fb20562f822e5cc47d8ccf664c5ecdc26a684cbe85c225f9d7c39"}, "benchfella": {:hex, :benchfella, "0.3.5", "b2122c234117b3f91ed7b43b6e915e19e1ab216971154acd0a80ce0e9b8c05f5", [:mix], [], "hexpm", "23f27cbc482cbac03fc8926441eb60a5e111759c17642bac005c3225f5eb809d"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, - "credo": {:hex, :credo, "1.7.3", "05bb11eaf2f2b8db370ecaa6a6bda2ec49b2acd5e0418bc106b73b07128c0436", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "35ea675a094c934c22fb1dca3696f3c31f2728ae6ef5a53b5d648c11180a4535"}, - "dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, - "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"}, - "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, + "credo": {:hex, :credo, "1.7.10", "6e64fe59be8da5e30a1b96273b247b5cf1cc9e336b5fd66302a64b25749ad44d", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "71fbc9a6b8be21d993deca85bf151df023a3097b01e09a2809d460348561d8cd"}, + "dialyxir": {:hex, :dialyxir, "1.4.5", "ca1571ac18e0f88d4ab245f0b60fa31ff1b12cbae2b11bd25d207f865e8ae78a", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b0fb08bb8107c750db5c0b324fa2df5ceaa0f9307690ee3c1f6ba5b9eb5d35c3"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"}, + "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, + "ex_doc": {:hex, :ex_doc, "0.35.1", "de804c590d3df2d9d5b8aec77d758b00c814b356119b3d4455e4b8a8687aecaf", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "2121c6402c8d44b05622677b761371a759143b958c6c19f6558ff64d0aed40df"}, + "file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"}, "global_flags": {:hex, :global_flags, "1.0.0", "ee6b864979a1fb38d1fbc67838565644baf632212bce864adca21042df036433", [:rebar3], [], "hexpm", "85d944cecd0f8f96b20ce70b5b16ebccedfcd25e744376b131e89ce61ba93176"}, - "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, - "local_cluster": {:hex, :local_cluster, "1.2.1", "8eab3b8a387680f0872eacfb1a8bd5a91cb1d4d61256eec6a655b07ac7030c73", [:mix], [{:global_flags, "~> 1.0", [hex: :global_flags, repo: "hexpm", optional: false]}], "hexpm", "aae80c9bc92c911cb0be085fdeea2a9f5b88f81b6bec2ff1fec244bb0acc232c"}, - "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, - "makeup_erlang": {:hex, :makeup_erlang, "0.1.3", "d684f4bac8690e70b06eb52dad65d26de2eefa44cd19d64a8095e1417df7c8fd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "b78dc853d2e670ff6390b605d807263bf606da3c82be37f9d7f68635bd886fc9"}, - "mix_test_watch": {:hex, :mix_test_watch, "1.1.1", "eee6fc570d77ad6851c7bc08de420a47fd1e449ef5ccfa6a77ef68b72e7e51ad", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "f82262b54dee533467021723892e15c3267349849f1f737526523ecba4e6baae"}, - "mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "local_cluster": {:hex, :local_cluster, "2.1.0", "1c847d69a927ef5a62db13236f93146e8a42377a9c9a5bb4cac3372cba69d683", [:mix], [{:global_flags, "~> 1.0", [hex: :global_flags, repo: "hexpm", optional: false]}], "hexpm", "dc1c3abb6fef00198dd53c855b39ea80c55b3a8059d8d9f17d50da46b1e3b858"}, + "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, + "makeup_elixir": {:hex, :makeup_elixir, "1.0.0", "74bb8348c9b3a51d5c589bf5aebb0466a84b33274150e3b6ece1da45584afc82", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "49159b7d7d999e836bedaf09dcf35ca18b312230cf901b725a64f3f42e407983"}, + "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, + "mix_test_watch": {:hex, :mix_test_watch, "1.2.0", "1f9acd9e1104f62f280e30fc2243ae5e6d8ddc2f7f4dc9bceb454b9a41c82b42", [:mix], [{:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "278dc955c20b3fb9a3168b5c2493c2e5cffad133548d307e0a50c7f2cfbf34f6"}, + "mox": {:hex, :mox, "1.2.0", "a2cd96b4b80a3883e3100a221e8adc1b98e4c3a332a8fc434c39526babafd5b3", [:mix], [{:nimble_ownership, "~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}], "hexpm", "c7b92b3cc69ee24a7eeeaf944cd7be22013c52fcb580c1f33f50845ec821089a"}, + "nimble_ownership": {:hex, :nimble_ownership, "1.0.0", "3f87744d42c21b2042a0aa1d48c83c77e6dd9dd357e425a038dd4b49ba8b79a1", [:mix], [], "hexpm", "7c16cc74f4e952464220a73055b557a273e8b1b7ace8489ec9d86e9ad56cb2cc"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.3", "3168d78ba41835aecad272d5e8cd51aa87a7ac9eb836eabc42f6e57538e3731d", [:mix], [], "hexpm", "bba06bc1dcfd8cb086759f0edc94a8ba2bc8896d5331a1e2c2902bf8e36ee502"}, - "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, - "telemetry_registry": {:hex, :telemetry_registry, "0.3.1", "14a3319a7d9027bdbff7ebcacf1a438f5f5c903057b93aee484cca26f05bdcba", [:mix, :rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6d0ca77b691cf854ed074b459a93b87f4c7f5512f8f7743c635ca83da81f939e"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, + "telemetry_registry": {:hex, :telemetry_registry, "0.3.2", "701576890320be6428189bff963e865e8f23e0ff3615eade8f78662be0fc003c", [:mix, :rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e7ed191eb1d115a3034af8e1e35e4e63d5348851d556646d46ca3d1b4e16bab9"}, } diff --git a/test/aggregates/aggregate_concurrency_test.exs b/test/aggregates/aggregate_concurrency_test.exs index bf302907..3d447896 100644 --- a/test/aggregates/aggregate_concurrency_test.exs +++ b/test/aggregates/aggregate_concurrency_test.exs @@ -79,7 +79,7 @@ defmodule Commanded.Aggregates.AggregateConcurrencyTest do :ok end) - assert {:ok, 3, _events} = + assert {:ok, 3, _events, _aggregate_state} = Aggregate.execute(MockedApp, BankAccount, account_number, context) assert Aggregate.aggregate_version(MockedApp, BankAccount, account_number) == 3 @@ -145,7 +145,8 @@ defmodule Commanded.Aggregates.AggregateConcurrencyTest do retry_attempts: 1 } - {:ok, 1, _events} = Aggregate.execute(MockedApp, BankAccount, account_number, context) + {:ok, 1, _events, _aggregate_state} = + Aggregate.execute(MockedApp, BankAccount, account_number, context) [ account_number: account_number diff --git a/test/aggregates/aggregate_lifespan_test.exs b/test/aggregates/aggregate_lifespan_test.exs index 149b3a32..f19e4a4c 100644 --- a/test/aggregates/aggregate_lifespan_test.exs +++ b/test/aggregates/aggregate_lifespan_test.exs @@ -1,7 +1,7 @@ defmodule Commanded.Aggregates.AggregateLifespanTest do use ExUnit.Case - alias Commanded.Aggregates.{DefaultLifespanRouter, LifespanAggregate, LifespanRouter} + alias Commanded.Aggregates.{Aggregate, DefaultLifespanRouter, LifespanAggregate, LifespanRouter} alias Commanded.Aggregates.LifespanAggregate.{Command, Event} alias Commanded.{DefaultApp, EventStore} alias Commanded.EventStore.RecordedEvent @@ -99,6 +99,64 @@ defmodule Commanded.Aggregates.AggregateLifespanTest do assert_receive {:DOWN, ^ref, :process, _pid, :normal} end + test "honours lifespan after a call to aggregate_state", %{ + aggregate_uuid: aggregate_uuid, + ref: ref, + reply_to: reply_to + } do + command = %Command{ + uuid: aggregate_uuid, + reply_to: reply_to, + action: :event, + lifespan: 500 + } + + :ok = LifespanRouter.dispatch(command, application: DefaultApp) + + %{lifespan: 500} = Aggregate.aggregate_state(DefaultApp, LifespanAggregate, aggregate_uuid) + + assert_receive {:DOWN, ^ref, :process, _pid, :normal} + end + + test "honours lifespan after a call to aggregate_version", %{ + aggregate_uuid: aggregate_uuid, + ref: ref, + reply_to: reply_to + } do + command = %Command{ + uuid: aggregate_uuid, + reply_to: reply_to, + action: :event, + lifespan: 500 + } + + :ok = LifespanRouter.dispatch(command, application: DefaultApp) + + 1 = Aggregate.aggregate_version(DefaultApp, LifespanAggregate, aggregate_uuid) + + assert_receive {:DOWN, ^ref, :process, _pid, :normal} + end + + test "honours lifespan after an info message is handled", %{ + aggregate_uuid: aggregate_uuid, + pid: pid, + ref: ref, + reply_to: reply_to + } do + command = %Command{ + uuid: aggregate_uuid, + reply_to: reply_to, + action: :event, + lifespan: 500 + } + + :ok = LifespanRouter.dispatch(command, application: DefaultApp) + + send(pid, :unexpected_message) + + assert_receive {:DOWN, ^ref, :process, _pid, :normal} + end + test "should call `after_command/1` callback function when no domain events", %{ aggregate_uuid: aggregate_uuid, reply_to: reply_to @@ -176,20 +234,30 @@ defmodule Commanded.Aggregates.AggregateLifespanTest do ref: ref, reply_to: reply_to } do + lifespan = 100 + command = %Command{ uuid: aggregate_uuid, action: :noop, reply_to: reply_to, - lifespan: 25 + lifespan: lifespan } - :ok = LifespanRouter.dispatch(command, application: DefaultApp) - :ok = LifespanRouter.dispatch(command, application: DefaultApp) + {elapsed_usec, _} = + :timer.tc(fn -> + :ok = LifespanRouter.dispatch(command, application: DefaultApp) + :ok = LifespanRouter.dispatch(command, application: DefaultApp) - assert_receive :after_command - assert_receive :after_command + assert_receive :after_command + assert_receive :after_command + end) + + # after dispatching and receiving, figure out how much time is left in the lifespan + elapsed = ceil(elapsed_usec / 1000) + remaining_lifespan = lifespan - elapsed - refute_receive {:DOWN, ^ref, :process, _pid, _reason}, 25 + # the aggregate should not shutdown until after the remaining lifespan has elapsed + refute_receive {:DOWN, ^ref, :process, _pid, _reason}, remaining_lifespan assert_receive {:DOWN, ^ref, :process, _pid, :normal} end diff --git a/test/aggregates/aggregate_state_test.exs b/test/aggregates/aggregate_state_test.exs index 5a3f740b..cf7a131b 100644 --- a/test/aggregates/aggregate_state_test.exs +++ b/test/aggregates/aggregate_state_test.exs @@ -89,7 +89,7 @@ defmodule Commanded.Aggregates.AggregateStateTest do {:ok, ^aggregate_uuid} = Supervisor.open_aggregate(DefaultApp, @aggregate_module, aggregate_uuid) - {:ok, _count, _events} = + {:ok, _count, _events, _aggregate_state} = Aggregate.execute(DefaultApp, @aggregate_module, aggregate_uuid, execution_context) end end diff --git a/test/aggregates/aggregate_subscription_test.exs b/test/aggregates/aggregate_subscription_test.exs index 411ece31..54ea430d 100644 --- a/test/aggregates/aggregate_subscription_test.exs +++ b/test/aggregates/aggregate_subscription_test.exs @@ -88,7 +88,8 @@ defmodule Commanded.Aggregates.AggregateSubscriptionTest do retry_attempts: 1 } - {:ok, 1, _events} = Aggregate.execute(DefaultApp, BankAccount, account_number, context) + {:ok, 1, _events, _aggregate_state} = + Aggregate.execute(DefaultApp, BankAccount, account_number, context) [ account_number: account_number diff --git a/test/aggregates/aggregate_telemetry_test.exs b/test/aggregates/aggregate_telemetry_test.exs index 608e6e08..b884cdab 100644 --- a/test/aggregates/aggregate_telemetry_test.exs +++ b/test/aggregates/aggregate_telemetry_test.exs @@ -63,7 +63,7 @@ defmodule Commanded.Aggregates.AggregateTelemetryTest do self = self() - {:ok, 1, _events} = GenServer.call(pid, {:execute_command, context}) + {:ok, 1, _events, _aggregate_state} = GenServer.call(pid, {:execute_command, context}) assert_receive {[:commanded, :aggregate, :execute, :start], measurements, metadata} @@ -96,7 +96,7 @@ defmodule Commanded.Aggregates.AggregateTelemetryTest do self = self() - {:ok, 1, events} = GenServer.call(pid, {:execute_command, context}) + {:ok, 1, events, _aggregate_state} = GenServer.call(pid, {:execute_command, context}) assert_receive {[:commanded, :aggregate, :execute, :start], _measurements, _metadata} assert_receive {[:commanded, :aggregate, :execute, :stop], measurements, metadata} diff --git a/test/aggregates/event_persistence_test.exs b/test/aggregates/event_persistence_test.exs index 094124ac..869b6579 100644 --- a/test/aggregates/event_persistence_test.exs +++ b/test/aggregates/event_persistence_test.exs @@ -20,7 +20,7 @@ defmodule Commanded.Aggregates.EventPersistenceTest do {:ok, ^aggregate_uuid} = Commanded.Aggregates.Supervisor.open_aggregate(DefaultApp, ExampleAggregate, aggregate_uuid) - {:ok, 10, events} = + {:ok, 10, events, _aggregate_state} = Aggregate.execute(DefaultApp, ExampleAggregate, aggregate_uuid, %ExecutionContext{ command: %AppendItems{count: 10}, handler: AppendItemsHandler, @@ -45,7 +45,7 @@ defmodule Commanded.Aggregates.EventPersistenceTest do {:ok, ^aggregate_uuid} = Commanded.Aggregates.Supervisor.open_aggregate(DefaultApp, ExampleAggregate, aggregate_uuid) - {:ok, 1, events} = + {:ok, 1, events, _aggregate_state} = Aggregate.execute(DefaultApp, ExampleAggregate, aggregate_uuid, %ExecutionContext{ command: %AppendItems{count: 1}, handler: AppendItemsHandler, @@ -54,7 +54,7 @@ defmodule Commanded.Aggregates.EventPersistenceTest do assert length(events) == 1 - {:ok, 1, events} = + {:ok, 1, events, _aggregate_state} = Aggregate.execute(DefaultApp, ExampleAggregate, aggregate_uuid, %ExecutionContext{ command: %NoOp{}, handler: ExampleAggregate, @@ -82,7 +82,9 @@ defmodule Commanded.Aggregates.EventPersistenceTest do function: :handle } - {:ok, 10, events} = Aggregate.execute(DefaultApp, ExampleAggregate, aggregate_uuid, context) + {:ok, 10, events, _aggregate_state} = + Aggregate.execute(DefaultApp, ExampleAggregate, aggregate_uuid, context) + assert length(events) == 10 recorded_events = EventStore.stream_forward(DefaultApp, aggregate_uuid, 0) |> Enum.to_list() @@ -98,7 +100,7 @@ defmodule Commanded.Aggregates.EventPersistenceTest do {:ok, ^aggregate_uuid} = Commanded.Aggregates.Supervisor.open_aggregate(DefaultApp, ExampleAggregate, aggregate_uuid) - {:ok, 10, events} = + {:ok, 10, events, _aggregate_state} = Aggregate.execute(DefaultApp, ExampleAggregate, aggregate_uuid, %ExecutionContext{ command: %AppendItems{count: 10}, handler: AppendItemsHandler, @@ -127,21 +129,21 @@ defmodule Commanded.Aggregates.EventPersistenceTest do {:ok, ^aggregate_uuid} = Commanded.Aggregates.Supervisor.open_aggregate(DefaultApp, ExampleAggregate, aggregate_uuid) - {:ok, 100, _events} = + {:ok, 100, _events, _aggregate_state} = Aggregate.execute(DefaultApp, ExampleAggregate, aggregate_uuid, %ExecutionContext{ command: %AppendItems{count: 100}, handler: AppendItemsHandler, function: :handle }) - {:ok, 200, _events} = + {:ok, 200, _events, _aggregate_state} = Aggregate.execute(DefaultApp, ExampleAggregate, aggregate_uuid, %ExecutionContext{ command: %AppendItems{count: 100}, handler: AppendItemsHandler, function: :handle }) - {:ok, 201, _events} = + {:ok, 201, _events, _aggregate_state} = Aggregate.execute(DefaultApp, ExampleAggregate, aggregate_uuid, %ExecutionContext{ command: %AppendItems{count: 1}, handler: AppendItemsHandler, @@ -180,7 +182,7 @@ defmodule Commanded.Aggregates.EventPersistenceTest do function: :handle } - {:ok, 1, events} = + {:ok, 1, events, _aggregate_state} = Aggregate.execute(DefaultApp, ExampleAggregate, prefixed_aggregate_uuid, context) assert length(events) == 1 diff --git a/test/aggregates/execute_command_test.exs b/test/aggregates/execute_command_test.exs index bdf20ea0..44885721 100644 --- a/test/aggregates/execute_command_test.exs +++ b/test/aggregates/execute_command_test.exs @@ -24,7 +24,8 @@ defmodule Commanded.Aggregates.ExecuteCommandTest do command = %OpenAccount{account_number: account_number, initial_balance: 1_000} context = %ExecutionContext{command: command, handler: BankAccount, function: :open_account} - {:ok, 1, events} = Aggregate.execute(BankApp, BankAccount, account_number, context) + {:ok, 1, events, _aggregate_state} = + Aggregate.execute(BankApp, BankAccount, account_number, context) assert events == [%BankAccountOpened{account_number: account_number, initial_balance: 1_000}] @@ -50,7 +51,8 @@ defmodule Commanded.Aggregates.ExecuteCommandTest do command = %OpenAccount{account_number: account_number, initial_balance: 1_000} context = %ExecutionContext{command: command, handler: OpenAccountHandler, function: :handle} - {:ok, 1, events} = Aggregate.execute(BankApp, BankAccount, account_number, context) + {:ok, 1, events, _aggregate_state} = + Aggregate.execute(BankApp, BankAccount, account_number, context) assert events == [%BankAccountOpened{account_number: account_number, initial_balance: 1_000}] @@ -76,7 +78,8 @@ defmodule Commanded.Aggregates.ExecuteCommandTest do command = %OpenAccount{account_number: account_number, initial_balance: 1_000} context = %ExecutionContext{command: command, handler: OpenAccountHandler, function: :handle} - {:ok, 1, _events} = Aggregate.execute(BankApp, BankAccount, account_number, context) + {:ok, 1, _events, _aggregate_state} = + Aggregate.execute(BankApp, BankAccount, account_number, context) state_before = Aggregate.aggregate_state(BankApp, BankAccount, account_number) @@ -133,7 +136,7 @@ defmodule Commanded.Aggregates.ExecuteCommandTest do defp assert_no_events(command_fun) do id = UUID.uuid4() - assert {:ok, 0, []} = execute_aggregate_command(id, command_fun) + assert {:ok, 0, [], _aggregate_state} = execute_aggregate_command(id, command_fun) end defp assert_event_result(command_fun) do @@ -141,7 +144,8 @@ defmodule Commanded.Aggregates.ExecuteCommandTest do id = UUID.uuid4() - assert {:ok, 1, [%Event{id: ^id}]} = execute_aggregate_command(id, command_fun) + assert {:ok, 1, [%Event{id: ^id}], _aggregate_state} = + execute_aggregate_command(id, command_fun) end defp execute_aggregate_command(id, command_fun) do diff --git a/test/aggregates/snapshotting_test.exs b/test/aggregates/snapshotting_test.exs index d7a7cb9d..62a75400 100644 --- a/test/aggregates/snapshotting_test.exs +++ b/test/aggregates/snapshotting_test.exs @@ -280,7 +280,7 @@ defmodule Commanded.Aggregates.SnapshottingTest do {:ok, ^aggregate_uuid} = Supervisor.open_aggregate(DefaultApp, SnapshotAggregate, aggregate_uuid) - {:ok, _count, _events} = + {:ok, _count, _events, _aggregate_state} = Aggregate.execute(DefaultApp, SnapshotAggregate, aggregate_uuid, execution_context) end end @@ -315,7 +315,7 @@ defmodule Commanded.Aggregates.SnapshottingTest do {:ok, ^aggregate_uuid} = Supervisor.open_aggregate(DefaultApp, ExampleAggregate, aggregate_uuid) - {:ok, _count, _events} = + {:ok, _count, _events, _aggregate_state} = Aggregate.execute(DefaultApp, ExampleAggregate, aggregate_uuid, execution_context) end diff --git a/test/application/application_test.exs b/test/application/application_test.exs index 2199b100..4760dbb9 100644 --- a/test/application/application_test.exs +++ b/test/application/application_test.exs @@ -64,13 +64,15 @@ defmodule Commanded.ApplicationTest do test "should fail to start unconfigured application" do Process.flag(:trap_exit, true) - UnconfiguredApplication.start_link() + {:error, reason} = UnconfiguredApplication.start_link() - assert_receive {:EXIT, _pid, - {%ArgumentError{ - message: - "missing :event_store config for application Commanded.UnconfiguredApplication" - }, _}} + assert match?( + {%ArgumentError{ + message: + "missing :event_store config for application Commanded.UnconfiguredApplication" + }, _}, + reason + ) end end end diff --git a/test/application/telemetry_test.exs b/test/application/telemetry_test.exs index fba16ae5..e325aab5 100644 --- a/test/application/telemetry_test.exs +++ b/test/application/telemetry_test.exs @@ -2,6 +2,7 @@ defmodule Commanded.Application.TelemetryTest do use ExUnit.Case alias Commanded.DefaultApp + alias Commanded.Middleware.Commands.Fail alias Commanded.Middleware.Commands.IncrementCount alias Commanded.Middleware.Commands.RaiseError alias Commanded.UUID @@ -58,6 +59,20 @@ defmodule Commanded.Application.TelemetryTest do meta end + test "emit `[:commanded, :application, :dispatch, :start | :stop]` event on unregistered command" do + command = %Fail{aggregate_uuid: UUID.uuid4()} + error = :unregistered_command + + assert {:error, ^error} = TestRouter.dispatch(command, application: DefaultApp) + + assert_receive {[:commanded, :application, :dispatch, :start], 1, _meas, _meta} + + assert_receive {[:commanded, :application, :dispatch, :stop], 2, _meas, meta} + + assert %{application: DefaultApp, error: ^error, execution_context: %{command: ^command}} = + meta + end + defp attach_telemetry do agent = start_supervised!({Agent, fn -> 1 end}) diff --git a/test/commanded/event/error_handler_test.exs b/test/commanded/event/error_handler_test.exs new file mode 100644 index 00000000..128fd4f4 --- /dev/null +++ b/test/commanded/event/error_handler_test.exs @@ -0,0 +1,126 @@ +defmodule Commanded.Event.ErrorHandlerTest do + use ExUnit.Case + + alias Commanded.Event.ErrorHandler + alias Commanded.Event.FailureContext + + defmodule Event do + defstruct [:id] + end + + test "Stop on error" do + context = %FailureContext{} + error = {:error, "an error"} + + result = ErrorHandler.stop_on_error(error, an_event(), context) + assert result == {:stop, "an error"} + end + + describe "backoff" do + test "the first delay is 1 second" do + context = %FailureContext{context: %{}} + + {:retry, delay, context} = + ErrorHandler.backoff(an_error(), an_event(), context, jitter_fn: no_jitter()) + + assert context.context.failures == 1 + assert delay == :timer.seconds(1) + end + + test "applies jitter to delay" do + jitter_fn = fn -> 234 end + + {:retry, delay, _} = + ErrorHandler.backoff(an_error(), an_event(), failure_context(), jitter_fn: jitter_fn) + + assert delay == 1234 + end + + test "backs off exponentially-ish" do + failure_context = %FailureContext{context: %{}} + + expectations = [ + {1, 1_000}, + {2, 4_000}, + {3, 9_000}, + {4, 16_000}, + {5, 25_000}, + {6, 36_000}, + {7, 49_000}, + {8, 64_000}, + {9, 81_000}, + {10, 100_000} + ] + + Enum.reduce(expectations, failure_context, fn {expected_failures, expected_delay}, ctx -> + {:retry, actual_delay, context} = + ErrorHandler.backoff(an_error(), an_event(), ctx, jitter_fn: no_jitter()) + + assert actual_delay == expected_delay + actual_failures = Map.fetch!(context.context, :failures) + assert actual_failures == expected_failures + + context + end) + end + + test "maxes out at 1 day" do + # We should hit the max after 294 failures + context = %FailureContext{context: %{failures: 500}} + + {:retry, actual_delay, _context} = + ErrorHandler.backoff(an_error(), an_event(), context, jitter_fn: no_jitter()) + + assert actual_delay == :timer.hours(24) + end + + test "retains failure_context" do + failure_context = %FailureContext{ + application: MyApp.CommandedApp, + handler_name: "MyApp.FailingEventHandler", + handler_state: nil, + context: %{}, + metadata: %{ + application: MyApp.CommandedApp, + causation_id: "49d9a83c-8bcd-4fa9-9ccb-3c196717415c", + correlation_id: "e9c3b49e-0ac5-44db-8ddb-83835a7c9437", + created_at: ~U[2024-06-25 20:55:24.576545Z], + event_id: "0f117f39-3b2f-491e-b39d-9325fd1d19d1", + event_number: 1, + handler_name: "MyApp.FailingEventHandler", + state: nil, + stream_id: "2iO5kHYbIAGW1rrZ7FeHCRMvbT5", + stream_version: 1 + }, + stacktrace: nil + } + + assert {:retry, _delay, ctx} = ErrorHandler.backoff(an_error(), an_event(), failure_context) + + assert ctx == %FailureContext{ + application: MyApp.CommandedApp, + handler_name: "MyApp.FailingEventHandler", + handler_state: nil, + context: %{failures: 1}, + metadata: %{ + application: MyApp.CommandedApp, + causation_id: "49d9a83c-8bcd-4fa9-9ccb-3c196717415c", + correlation_id: "e9c3b49e-0ac5-44db-8ddb-83835a7c9437", + created_at: ~U[2024-06-25 20:55:24.576545Z], + event_id: "0f117f39-3b2f-491e-b39d-9325fd1d19d1", + event_number: 1, + handler_name: "MyApp.FailingEventHandler", + state: nil, + stream_id: "2iO5kHYbIAGW1rrZ7FeHCRMvbT5", + stream_version: 1 + }, + stacktrace: nil + } + end + end + + defp an_event, do: %Event{id: 123} + defp an_error, do: {:error, "Invalid pizza toppings"} + defp no_jitter, do: fn -> 0 end + defp failure_context, do: %FailureContext{context: %{}} +end diff --git a/test/event/event_handler_error_handling_test.exs b/test/event/event_handler_error_handling_test.exs index fb5f2d01..079d61fc 100644 --- a/test/event/event_handler_error_handling_test.exs +++ b/test/event/event_handler_error_handling_test.exs @@ -3,6 +3,10 @@ defmodule Commanded.Event.EventHandlerErrorHandlingTest do import ExUnit.CaptureLog + alias Commanded.Application.Config, as: AppConfig + + alias Commanded.DefaultApp + alias Commanded.Event.ErrorAggregate.Events.{ ErrorEvent, ExceptionEvent, @@ -12,6 +16,8 @@ defmodule Commanded.Event.EventHandlerErrorHandlingTest do alias Commanded.Event.ErrorEventHandler alias Commanded.Event.FailureContext alias Commanded.Event.Handler + alias Commanded.Event.SimpleErrorEventHandler + alias Commanded.Event.ThreeStrikesErrorHandler alias Commanded.Helpers.EventFactory setup do @@ -25,6 +31,53 @@ defmodule Commanded.Event.EventHandlerErrorHandlingTest do ] end + describe "Configured error handling" do + setup [:listen_for_telemetry_events, :start_simple_error_handler] + + test ":stop stops the handler", %{handler: handler, ref: ref} do + AppConfig.__put__(DefaultApp, :on_event_handler_error, :stop) + + send_error_event(handler) + + assert_receive {:DOWN, ^ref, :process, ^handler, :failed} + end + + test ":backoff delays the next attempt", %{handler: handler} do + AppConfig.__put__(DefaultApp, :on_event_handler_error, :backoff) + + # When we sent the error event + send_error_event(handler) + + # Then the first attempt and failure occur + assert_receive {[:commanded, :event, :handle, :start], 1, _, %{context: %{}}} + assert_receive {[:commanded, :event, :handle, :stop], 2, _, %{context: %{}}} + + # And then the next one is received up to 2 seconds later + assert_receive {[:commanded, :event, :handle, :start], 3, _, %{context: %{failures: 1}}}, + 2100 + + assert_receive {[:commanded, :event, :handle, :stop], 4, _, %{context: %{failures: 1}}}, + 2100 + end + + test "error handler can be a custom module", %{handler: handler, ref: ref} do + AppConfig.__put__(DefaultApp, :on_event_handler_error, ThreeStrikesErrorHandler) + + send_error_event(handler) + + assert_receive {[:commanded, :event, :handle, :start], 1, _, %{context: %{}}} + assert_receive {[:commanded, :event, :handle, :stop], 2, _, %{context: %{}}} + + assert_receive {[:commanded, :event, :handle, :start], 3, _, %{context: %{attempts: 1}}} + assert_receive {[:commanded, :event, :handle, :stop], 4, _, %{context: %{attempts: 1}}} + + assert_receive {[:commanded, :event, :handle, :start], 5, _, %{context: %{attempts: 2}}} + assert_receive {[:commanded, :event, :handle, :stop], 6, _, %{context: %{attempts: 2}}} + + assert_receive {:DOWN, ^ref, :process, ^handler, :too_many} + end + end + describe "event handling exception handling" do test "should print the stack trace", %{handler: handler, ref: ref} do send_error_message = fn -> @@ -180,4 +233,45 @@ defmodule Commanded.Event.EventHandlerErrorHandlingTest do end defp reply_to, do: :erlang.pid_to_list(self()) + + def listen_for_telemetry_events(_) do + agent = start_supervised!({Agent, fn -> 1 end}) + handler_id = :"#{__MODULE__}-handler" + + events = [ + [:commanded, :event, :handle, :start], + [:commanded, :event, :handle, :stop], + [:commanded, :event, :handle, :exception] + ] + + increment = fn n -> {n, n + 1} end + + :telemetry.attach_many( + handler_id, + events, + fn event_name, measurements, metadata, reply_to -> + if Process.alive?(agent) do + num = Agent.get_and_update(agent, increment) + send(reply_to, {event_name, num, measurements, metadata}) + end + end, + self() + ) + + on_exit(fn -> + :telemetry.detach(handler_id) + end) + end + + defp start_simple_error_handler(_) do + start_supervised!(DefaultApp) + handler = start_supervised!(SimpleErrorEventHandler) + true = Process.unlink(handler) + ref = Process.monitor(handler) + + [ + handler: handler, + ref: ref + ] + end end diff --git a/test/event/reset_event_handler_test.exs b/test/event/reset_event_handler_test.exs index 925d5fd6..a992fec7 100644 --- a/test/event/reset_event_handler_test.exs +++ b/test/event/reset_event_handler_test.exs @@ -38,6 +38,7 @@ defmodule Commanded.Event.ResetEventHandlerTest do end) end + @tag :skip test "should be reset when starting from `:current`" do stream_uuid = UUID.uuid4() diff --git a/test/event/support/appending_event_handler.ex b/test/event/support/appending_event_handler.ex index 989a4406..ced4da2e 100644 --- a/test/event/support/appending_event_handler.ex +++ b/test/event/support/appending_event_handler.ex @@ -5,7 +5,7 @@ defmodule Commanded.Event.AppendingEventHandler do application: Commanded.DefaultApp, name: __MODULE__ - def init do + def after_start(_state) do with {:ok, _pid} <- Agent.start_link(fn -> %{events: [], metadata: []} end, name: __MODULE__) do :ok end diff --git a/test/event/support/concurrency/concurrent_event_handler.ex b/test/event/support/concurrency/concurrent_event_handler.ex index 017304af..7d449697 100644 --- a/test/event/support/concurrency/concurrent_event_handler.ex +++ b/test/event/support/concurrency/concurrent_event_handler.ex @@ -8,7 +8,7 @@ defmodule Commanded.Event.ConcurrentEventHandler do concurrency: 5 @impl Commanded.Event.Handler - def init do + def after_start(_state) do Process.send(:test, {:init, self()}, []) end diff --git a/test/event/support/concurrency/partition_event_handler.ex b/test/event/support/concurrency/partition_event_handler.ex index fe7a005f..b1ad10c6 100644 --- a/test/event/support/concurrency/partition_event_handler.ex +++ b/test/event/support/concurrency/partition_event_handler.ex @@ -8,7 +8,7 @@ defmodule Commanded.Event.PartitionEventHandler do concurrency: 5 @impl Commanded.Event.Handler - def init do + def after_start(_state) do Process.send(:test, {:init, self()}, []) end diff --git a/test/event/support/error/simple_error_event_handler.ex b/test/event/support/error/simple_error_event_handler.ex new file mode 100644 index 00000000..7798d43a --- /dev/null +++ b/test/event/support/error/simple_error_event_handler.ex @@ -0,0 +1,27 @@ +defmodule Commanded.Event.SimpleErrorEventHandler do + @moduledoc false + use Commanded.Event.Handler, + application: Commanded.DefaultApp, + name: __MODULE__ + + alias Commanded.Event.ErrorAggregate.Events.{ + ErrorEvent, + ExceptionEvent, + InvalidReturnValueEvent + } + + # Simulate event handling error reply + def handle(%ErrorEvent{}, _metadata) do + {:error, :failed} + end + + # Simulate event handling exception + def handle(%ExceptionEvent{}, _metadata) do + raise "exception" + end + + # Simulate event handling returning an invalid value + def handle(%InvalidReturnValueEvent{}, _metadata) do + nil + end +end diff --git a/test/event/support/error/three_strikes_error_handler.ex b/test/event/support/error/three_strikes_error_handler.ex new file mode 100644 index 00000000..f206c41c --- /dev/null +++ b/test/event/support/error/three_strikes_error_handler.ex @@ -0,0 +1,11 @@ +defmodule Commanded.Event.ThreeStrikesErrorHandler do + def error({:error, :failed}, _event, %{context: context}) do + attempts = Map.get(context, :attempts, 1) + + if attempts >= 3 do + {:stop, :too_many} + else + {:retry, Map.update(context, :attempts, 1, &(&1 + 1))} + end + end +end diff --git a/test/event_handler_after_start_test.exs b/test/event_handler_after_start_test.exs new file mode 100644 index 00000000..782f0b12 --- /dev/null +++ b/test/event_handler_after_start_test.exs @@ -0,0 +1,102 @@ +defmodule Commanded.Event.HandlerAfterStartTest do + use Commanded.MockEventStoreCase + + import Mox + import ExUnit.CaptureLog + + alias Commanded.EventStore.Adapters.Mock, as: MockEventStore + + setup do + stub(MockEventStore, :subscribe_to, fn + _event_store, :all, _handler_name, handler, _subscribe_from, _opts -> + {:ok, handler} + end) + + :ok + end + + describe "event handler `init/0` callback" do + # TODO: remove these test when we remove init/0 + + setup(%{test: test}) do + # HACK: generate a module that can communicate back to our test process + true = Process.register(self(), test) + + Code.eval_string(""" + defmodule DeprecatedHandler do + use Commanded.Event.Handler, + application: Commanded.MockedApp, + name: __MODULE__ + + @impl Commanded.Event.Handler + def init() do + process_name = :"#{test}" + Process.send(process_name, {:init, self()}, []) + end + end + """) + + [handler: start_supervised!(DeprecatedHandler)] + end + + test "should be called and a deprecation warning raised after handler subscribbed", %{ + handler: handler + } do + warning = + capture_log([level: :warning], fn -> + # When the handler subscribes to the eventstore + send_subscribed(handler) + + # Then we expect init/0 to have been called for us + assert_receive {:init, ^handler} + end) + + # And we expect a deprecation warning to have been logged + assert warning =~ "DeprecatedHandler.init/0 is deprecated, use after_start/1 instead" + end + end + + describe "event handler `after_start/1` callback" do + defmodule AfterStartHandler do + use Commanded.Event.Handler, + application: Commanded.MockedApp, + name: __MODULE__ + + def after_start(state) do + test_pid = Map.fetch!(state, :test) + ref = Map.get_lazy(state, :ref, &make_ref/0) + reply = Map.get(state, :reply, :ok) + + Process.send(test_pid, {ref, :after_start, reply}, []) + reply + end + end + + test "should be called after handler subscribed" do + ref = make_ref() + state = %{test: self(), ref: ref} + handler = start_supervised!({AfterStartHandler, state: state}) + + refute_receive {^ref, :after_start, :ok} + + send_subscribed(handler) + + assert_receive {^ref, :after_start, :ok} + end + + test "should reply with new state" do + ref = make_ref() + state = %{test: self(), ref: ref, reply: {:ok, %{something: :new}}} + handler = start_supervised!({AfterStartHandler, state: state}) + + send_subscribed(handler) + + assert_receive {^ref, :after_start, {:ok, new_state}} + assert new_state == %{something: :new} + end + end + + defp send_subscribed(handler) do + send(handler, {:subscribed, handler}) + end +end diff --git a/test/event_store/adapters/in_memory/subscription_test.exs b/test/event_store/adapters/in_memory/subscription_test.exs index 516b8dc6..8a1bf19e 100644 --- a/test/event_store/adapters/in_memory/subscription_test.exs +++ b/test/event_store/adapters/in_memory/subscription_test.exs @@ -3,6 +3,4 @@ defmodule Commanded.EventStore.Adapters.InMemory.SubscriptionTest do use Commanded.EventStore.InMemoryTestCase use Commanded.EventStore.SubscriptionTestCase, event_store: InMemory - - defp event_store_wait(_default \\ nil), do: 1 end diff --git a/test/event_store/support/subscription_test_case.ex b/test/event_store/support/subscription_test_case.ex index d7a2d8db..44c26415 100644 --- a/test/event_store/support/subscription_test_case.ex +++ b/test/event_store/support/subscription_test_case.ex @@ -408,6 +408,7 @@ defmodule Commanded.EventStore.SubscriptionTestCase do assert length(subscribers) == 3 end + @tag :partition test "should distribute events to subscribers using optional partition by function", %{ event_store: event_store, event_store_meta: event_store_meta @@ -777,12 +778,8 @@ defmodule Commanded.EventStore.SubscriptionTestCase do wait_for_event_store() end - # Optionally wait for the event store defp wait_for_event_store do - case event_store_wait() do - nil -> :ok - wait -> :timer.sleep(wait) - end + :timer.sleep(1) end defp assert_receive_events(event_store, event_store_meta, subscription, opts) do diff --git a/test/example_domain/bank_account/account_balance_handler.ex b/test/example_domain/bank_account/account_balance_handler.ex index 58cbf97a..32c401a3 100644 --- a/test/example_domain/bank_account/account_balance_handler.ex +++ b/test/example_domain/bank_account/account_balance_handler.ex @@ -9,7 +9,7 @@ defmodule Commanded.ExampleDomain.BankAccount.AccountBalanceHandler do alias Commanded.ExampleDomain.BankAccount.Events.MoneyDeposited alias Commanded.ExampleDomain.BankAccount.Events.MoneyWithdrawn - def init do + def after_start(_state) do with {:ok, _pid} <- Agent.start_link(fn -> 0 end, name: __MODULE__) do :ok end diff --git a/test/example_domain/bank_account/bank_account_handler.ex b/test/example_domain/bank_account/bank_account_handler.ex index 48843d8d..9bee95bc 100644 --- a/test/example_domain/bank_account/bank_account_handler.ex +++ b/test/example_domain/bank_account/bank_account_handler.ex @@ -6,9 +6,11 @@ defmodule Commanded.ExampleDomain.BankAccount.BankAccountHandler do name: __MODULE__, start_from: :origin + alias Commanded.Event.Handler alias Commanded.ExampleDomain.BankAccount.Events.BankAccountOpened - def init do + @impl Handler + def after_start(_state) do case Agent.start_link(fn -> %{prefix: "", accounts: []} end, name: __MODULE__) do {:ok, _} -> :ok {:error, {:already_started, _}} -> :ok @@ -16,10 +18,12 @@ defmodule Commanded.ExampleDomain.BankAccount.BankAccountHandler do end end + @impl Handler def before_reset do Agent.update(__MODULE__, fn state -> %{state | accounts: []} end) end + @impl Handler def handle(%BankAccountOpened{} = event, _metadata) do %BankAccountOpened{account_number: account_number} = event diff --git a/test/process_managers/process_manager_instance_test.exs b/test/process_managers/process_manager_instance_test.exs index 7389efbf..f01e19a1 100644 --- a/test/process_managers/process_manager_instance_test.exs +++ b/test/process_managers/process_manager_instance_test.exs @@ -17,11 +17,15 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstanceTest do alias Commanded.Registration.LocalRegistry alias Commanded.UUID + alias Commanded.Serialization.ModuleNameTypeProvider + alias Commanded.Serialization.TypeProvider.Mock, as: MockTypeProvider + setup :set_mox_global setup :verify_on_exit! setup do mock_event_store() + mock_type_provider() {:ok, registry_meta} = start_local_registry() @@ -46,6 +50,12 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstanceTest do {:error, :snapshot_not_found} end) + # Ensure a type provider call is made + expect(MockTypeProvider, :to_string, fn process_state -> + assert %TransferMoneyProcessManager{} = process_state + ModuleNameTypeProvider.to_string(process_state) + end) + expect(MockEventStore, :record_snapshot, fn _adapter_meta, snapshot -> assert %SnapshotData{ data: %TransferMoneyProcessManager{ @@ -223,6 +233,20 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstanceTest do stub(MockEventStore, :ack_event, fn _event_store, _pid, _event -> :ok end) end + defp mock_type_provider do + stub(MockTypeProvider, :to_string, fn struct -> ModuleNameTypeProvider.to_string(struct) end) + stub(MockTypeProvider, :to_struct, fn type -> ModuleNameTypeProvider.to_struct(type) end) + + current_type_provider = Application.get_env(:commanded, :type_provider) + Application.put_env(:commanded, :type_provider, MockTypeProvider) + + on_exit(fn -> + if current_type_provider, + do: Application.put_env(:commanded, :type_provider, current_type_provider), + else: Application.delete_env(:commanded, :type_provider) + end) + end + defp start_local_registry do {:ok, registry_child_spec, registry_meta} = LocalRegistry.child_spec(MockApplication, []) diff --git a/test/serialization/json_decoder_test.exs b/test/serialization/json_decoder_test.exs index 3a07dba8..fc7d2822 100644 --- a/test/serialization/json_decoder_test.exs +++ b/test/serialization/json_decoder_test.exs @@ -18,22 +18,32 @@ defmodule Commanded.Serialization.JsonDecoderTest do end end - @serialized_event_json "{\"datetime\":\"2016-09-20T20:01:02Z\",\"name\":\"Ben\"}" - test "should serialize value to JSON" do - {:ok, dt, _} = DateTime.from_iso8601("2016-09-20 20:01:02Z") - event = %ExampleEvent{name: "Ben", datetime: dt} + event = %ExampleEvent{name: "Ben", datetime: ~U[2024-10-22 00:00:00Z]} + + serialized = JsonSerializer.serialize(event) - assert JsonSerializer.serialize(event) == @serialized_event_json + assert serialized =~ "\"datetime\":\"2024-10-22T00:00:00Z\"" + assert serialized =~ "\"name\":\"Ben\"" end test "should allow decoding of deserialized value from JSON" do - {:ok, dt, _} = DateTime.from_iso8601("2016-09-20 20:01:02Z") + serialized = "{\"name\":\"Ben\",\"datetime\":\"2024-10-22T00:00:00Z\"}" + + type = Atom.to_string(ExampleEvent) + deserialized = JsonSerializer.deserialize(serialized, type: type) + + event = %ExampleEvent{name: "Ben", datetime: ~U[2024-10-22 00:00:00Z]} + assert deserialized == event + end + + test "should round-trip serialization-deserialization" do + event = %ExampleEvent{name: "Ben", datetime: ~U[2024-10-22 00:00:00Z]} + type = Atom.to_string(ExampleEvent) - event = %ExampleEvent{name: "Ben", datetime: dt} - type = Atom.to_string(event.__struct__) + deserialized = event |> JsonSerializer.serialize() |> JsonSerializer.deserialize(type: type) - assert JsonSerializer.deserialize(@serialized_event_json, type: type) == event + assert deserialized == event end defmodule ParentEvent do diff --git a/test/subscriptions/distributed_subscriptions_test.exs b/test/subscriptions/distributed_subscriptions_test.exs index 0ea8f6b1..e3e3fa64 100644 --- a/test/subscriptions/distributed_subscriptions_test.exs +++ b/test/subscriptions/distributed_subscriptions_test.exs @@ -6,9 +6,16 @@ defmodule Commanded.DistributedSubscriptionsTest do @moduletag :distributed setup do + {"", 0} = System.cmd("epmd", ["-daemon"]) :ok = LocalCluster.start() - nodes = LocalCluster.start_nodes("commanded", 3, applications: [:commanded]) + {:ok, cluster} = + LocalCluster.start_link(3, + prefix: "commanded", + applications: [:commanded] + ) + + {:ok, nodes} = LocalCluster.nodes(cluster) [nodes: nodes] end diff --git a/test/support/mocks.ex b/test/support/mocks.ex index 6bfad3d7..ad36fa51 100644 --- a/test/support/mocks.ex +++ b/test/support/mocks.ex @@ -1,3 +1,4 @@ Mox.defmock(Commanded.EventStore.Adapters.Mock, for: Commanded.EventStore.Adapter) Mox.defmock(Commanded.Commands.MockRouter, for: Commanded.Commands.Router) Mox.defmock(Commanded.Application.Mock, for: Commanded.Application) +Mox.defmock(Commanded.Serialization.TypeProvider.Mock, for: Commanded.EventStore.TypeProvider)