diff --git a/BLOCKING_10.md b/BLOCKING_10.md new file mode 100644 index 00000000..cbdc1232 --- /dev/null +++ b/BLOCKING_10.md @@ -0,0 +1,353 @@ +# State Synchronization Issue - Blocking Verbs 1.0 + +## When we get back + +Add cache layer to state manager + +```php +$cache = [ + new MemoryCache(), + new RedisCache(), + new DatabaseCache(), +]; + +$cache = [ + new MemoryCache(), + new ReadOnlyCache(new RedisCache()), + new WriteOnlyCache(new DatabaseCache()), +]; +``` + +Look into using Timeline to reconstitute after loading from cache. + +- We need to make sure there's some way to determine if snapshots are in sync + +## Problem Statement + +The `StateManager` class currently conflates two responsibilities that should be separated: + +1. **State Repository**: Managing the singleton instances of all states in the system +2. **State Reconstitution**: Loading and replaying events to bring states up-to-date + +This architectural coupling creates critical synchronization issues when events operate on multiple interdependent +states. + +## The Core Issue + +When an event modifies multiple states (e.g., `PlayerState` and `GameState`), and those states have snapshots at +different points in time, the reconstitution process can lead to: + +1. **Temporal Inconsistency**: States being reconstituted independently may end up at different points in the event + stream +2. **Circular Dependencies**: Loading State A triggers reconstitution that loads State B, which may trigger loading + State A again +3. **Double Application**: The same event may be applied multiple times during cross-state reconstitution +4. **Future Data Leakage**: Events may see "future" state when one state is reconstituted ahead of another + +## Daniel Notes + +### Goals + +- State Reconstitution: given a snapshot (or nothing) and any given number of applicable events, you can + calculate the state. +- State Repository: support the existence of multiple state repos, keeping as much of the singleton behavior + as possible. There should be one "singleton" state repository, but there may be others as needed. We may need + to "swap out" the "current" state repository. + +## Chris Notes + +### Problems + +- We load States in the wrong way internally: the public API of `GameState::load()` is good for userland code, + but we should be loading states in a lower-level way inside Verbs so that we have better control over how the + state is loaded in different contexts. +- The reality of loading state requires ALL the context: we need to know all the states and events that will be + used to build up that state. + +### State classes + +- Right now we have `StateManager` and `StateInstanceCache` and `State` and `SnapshotStore` and `VerbSnapshot` + +#### `StateRegistry` + +- `get` +- `put` +- has a curryable cache layer + +#### State functions + +- GLOBAL CONTEXT: Load state from storage or cache and maybe reconstitute +- BOTH CONTEXTS: Remember this state (cache) +- INTERNAL CONTEXT: Get this state if you have it (cache) + +- Load state from snapshot +- Apply events to state +- Load state from cache +- Identify whether a state has a snapshot (or needs to be reconstituted) +- Get a default instance of a state + +### Ways we load state: + +1. For replay +2. For reconstitution +3. For userland `::load()` contexts +4. For event lifecycle hooks (during fire and replay and to some degree reconstitution) + +## Maybe Solutions + +"Everything is a play through of an event stream" + +- Sometimes we play events as they happen +- Sometimes we play them to reconstitute +- Sometimes we play them to replay + +The "unit" is a "Replay" (could be called Saga) in this case. Each replay can have any number of lifecycle hooks +enabled. It's essentially a lazy collection or iterator of an unknown number of events, and builds an internal +collection of states. + +## Concrete Example + +```php +class PlayerEnabledModifier extends Event { + public PlayerState $player; + public GameState $game; + + public function validate(PlayerState $player) { + $this->assert($player->hasItem('modifier_token')); + } + + public function apply(PlayerState $player, GameState $game): void { + $game->active_modifiers[] = $this->modifier_id; + } +} +``` + +If `PlayerState` snapshot is at event #100 and `GameState` snapshot is at event #50, reconstituting them independently +means the validation logic sees a future state of the player when processing a past event. + +## Programming Patterns to Address This Issue + +### 1. **Unit of Work Pattern** + +Treat multi-state reconstitution as a single atomic operation: + +```php +class ReconstitutionUnitOfWork { + protected array $states_to_load = []; + protected array $events_to_reapply = []; + + public function addState(string $type, $id): void { + $this->states_to_load[] = [$type, $id]; + } + + public function execute(): array { + $snapshots = $this->loadAllSnapshots(); + $common_event_id = $this->findEarliestSnapshotEventId($snapshots); + $events = $this->loadEventsAfter($common_event_id); + return $this->reapplyEventsInOrder($events, $snapshots); + } +} +``` + +### 2. **Saga Pattern** + +Coordinate state reconstitution as a distributed transaction: + +```php +class ReconstitutionSaga { + protected array $participating_states = []; + + public function begin(): void { + // Mark states as "reconstituting" to prevent concurrent access + } + + public function addParticipant(State $state): void { + $this->participating_states[] = $state; + } + + public function commit(): void { + // Atomically update all states + DB::transaction(function() { + foreach ($this->participating_states as $state) { + $this->snapshots->write($state); + } + }); + } +} +``` + +### 3. **Event Store Pattern with Global Ordering** + +Ensure all events are replayed in global order: + +```php +class GlobalEventStream { + public function getEventsForStates(array $state_ids, int $after_event_id): Collection { + return VerbEvent::query() + ->whereHas('states', fn($q) => $q->whereIn('state_id', $state_ids)) + ->where('id', '>', $after_event_id) + ->orderBy('id') // Global ordering + ->get(); + } +} +``` + +### 4. **Snapshot Coordination Pattern** + +Ensure related states are snapshotted together: + +```php +class CoordinatedSnapshotStore { + public function writeRelatedSnapshots(array $states): void { + $max_event_id = $this->findMaxEventId($states); + + DB::transaction(function() use ($states, $max_event_id) { + foreach ($states as $state) { + $this->writeSnapshot($state, $max_event_id); + } + }); + } +} +``` + +### 5. **Dependency Graph Resolution** + +Track and resolve state dependencies before reconstitution: + +```php +class StateDependencyResolver { + protected array $dependency_graph = []; + + public function analyze(Event $event): array { + // Analyze which states this event affects + $affected_states = $this->getAffectedStates($event); + + // Build dependency graph + foreach ($affected_states as $state) { + $this->dependency_graph[$state->id] = $affected_states; + } + + return $this->topologicalSort($this->dependency_graph); + } +} +``` + +### 6. **Two-Phase Loading Pattern** + +Separate state loading from reconstitution: + +```php +class TwoPhaseStateLoader { + // Phase 1: Load all required states without reconstitution + public function loadStatesWithoutReconstitution(array $state_specs): array { + return collect($state_specs) + ->map(fn($spec) => $this->loadSnapshot($spec)) + ->all(); + } + + // Phase 2: Reconstitute all states together + public function reconstituteTogether(array $states): array { + $min_event_id = collect($states)->min('last_event_id'); + $events = $this->loadEventsAfter($min_event_id); + + foreach ($events as $event) { + $this->applyEventToRelevantStates($event, $states); + } + + return $states; + } +} +``` + +## Recommended Solution Architecture + +1. **Separate Concerns**: Split `StateManager` into: + - `StateRepository`: Manages state instances + - `StateReconstitutor`: Handles replay logic + - `SnapshotCoordinator`: Manages consistent snapshots + +2. **Implement Global Event Ordering**: Ensure events are always replayed in the order they were originally fired + +3. **Atomic Reconstitution**: When loading multiple states, reconstitute them as a single atomic operation + +4. **Consistent Snapshots**: Implement snapshot sets that capture related states at the same logical point + +## Testing Considerations + +Tests should verify: + +- Circular dependency handling +- Consistent state after multi-state reconstitution +- No double-application of events +- Proper handling of partial snapshot scenarios +- Performance with large event streams + +## Migration Path + +1. Add feature flag for new reconstitution logic +2. Implement parallel reconstitution system +3. Add comprehensive tests comparing old vs new behavior +4. Gradually migrate to new system with monitoring +5. Remove old reconstitution code once stable + +## Additional Considerations + +### Event Sourcing Best Practices + +The synchronization issue described is a classic problem in event sourcing when dealing with aggregate boundaries. Key +principles to consider: + +- **Aggregate Consistency**: Each aggregate (state) should be internally consistent, but cross-aggregate consistency can + be eventual +- **Process Managers**: For coordinating changes across multiple aggregates, consider implementing process managers that + orchestrate multi-state operations +- **Compensating Events**: When reconstitution fails or produces inconsistent state, having a mechanism for compensating + events could help recovery + +### Reconstitution Context Requirements + +Building on Chris's four loading contexts, each has different consistency requirements: + +1. **Replay Context**: Requires strict ordering and full consistency - all states must be at the exact same point in the + event stream +2. **Reconstitution Context**: Needs consistency within the reconstitution boundary but may tolerate some staleness for + states outside the boundary +3. **Userland ::load() Context**: Could potentially accept eventual consistency depending on use case +4. **Event Lifecycle Context**: Needs point-in-time consistency for the specific moment the event is being processed + +### Performance Optimization Strategies + +Beyond the patterns listed, consider: + +- **Parallel Reconstitution**: For states with no interdependencies, reconstitute in parallel +- **Incremental Reconstitution**: Only reconstitute the delta between snapshot and current state +- **Reconstitution Caching**: Cache recently reconstituted states with TTL based on event frequency +- **Lazy Property Loading**: Defer loading of expensive state properties until accessed + +### Snapshot Coordination Strategies + +Different approaches to maintaining snapshot consistency: + +1. **Event-Aligned Snapshots**: All related states snapshot at the same global event ID +2. **Time-Based Snapshots**: Snapshot all states at regular wall-clock intervals +3. **Logical Clock Snapshots**: Use vector clocks or hybrid logical clocks to maintain causality +4. **Demand-Driven Snapshots**: Snapshot when reconstitution cost exceeds threshold + +### Error Recovery and Debugging + +Important considerations for production systems: + +- **Reconstitution Audit Trail**: Log which events were applied during reconstitution for debugging +- **Deterministic Reconstitution**: Same events + same snapshot should always produce identical state +- **Reconstitution Timeouts**: Prevent infinite loops with configurable timeouts +- **State Corruption Detection**: Checksums or invariant checks to detect corrupted state + +### Testing Strategies Beyond Current List + +Additional test scenarios to consider: + +- **Concurrent Reconstitution**: Multiple threads reconstituting overlapping states +- **Memory Pressure**: Reconstituting very large state graphs +- **Network Partitions**: Handling partial event availability +- **Schema Evolution**: Reconstituting states across event schema changes +- **Reconstitution Determinism**: Verify identical results across multiple runs diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 00000000..c5b05a88 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,80 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +Verbs is a Laravel package that provides event sourcing capabilities. It focuses on developer experience, following +Laravel conventions, and minimizing boilerplate. + +## Code Rules + +- Never use `private` or `readonly` keywords +- Never use strict types +- Values are `snake_case` +- Anything callable is `camelCase` (even if it's a variable) +- Paths are `kebab-case` (URLs, files, etc) +- Only apply docblocks where they provide useful IDE/static analysis value + +## Testing + +The project uses Pest PHP for testing. Key testing patterns: + +```php +// Use Verbs::fake() to prevent database writes during tests +Verbs::fake(); + +// Use Verbs::commitImmediately() for integration tests +Verbs::commitImmediately(); + +// Create test states using factories +CustomerState::factory()->id($id)->create(); +``` + +## High-Level Architecture + +### Core Concepts + +1. **Events**: Immutable records of what happened in the system + - Located in `src/Events/` + - Must extend `Verbs\Event` + - Can implement `boot()`, `authorize()`, `validate()`, `apply()`, and `handle()` methods + +2. **States**: Aggregate event data over time + - Located in `src/States/` + - Must extend `Verbs\State` + - Use `#[StateId]` attribute to specify which event property contains the state ID + +3. **Storage**: Three-table structure + - `verb_events`: All events with metadata + - `verb_snapshots`: State snapshots for performance + - `verb_state_events`: Event-to-state mappings + +### Key Directories + +- `src/`: Main package source code + - `Attributes/`: PHP 8 attributes for configuration + - `Commands/`: Artisan commands + - `Contracts/`: Interfaces + - `Events/`: Base event classes and utilities + - `Facades/`: Laravel facades + - `Models/`: Eloquent models for storage + - `States/`: Base state classes + - `Support/`: Utilities and helpers +- `tests/`: Pest tests organized by feature +- `examples/`: Complete example implementations (Bank, Cart, etc.) + +### Important Patterns + +1. **Event Lifecycle**: boot -> authorize → validate → apply → handle +2. **Attribute Usage**: `#[StateId]`, `#[AppliesToState]`, `#[AppliesToSingletonState]` +3. **Serialization**: Custom normalizers in `src/Support/Normalization/` +4. **Replay Safety**: Use `#[Once]` annotations and `Verbs::unlessReplaying()` for side effects + +## Development Guidelines + +- Follow Laravel package conventions +- Use Pest for all new tests +- Run `composer format` before committing +- Ensure compatibility with PHP 8.1+ and Laravel 10.x, 11.x, 12.x +- Test against SQLite, MySQL, and PostgreSQL when modifying storage logic diff --git a/examples/Bank/tests/BankAccountTest.php b/examples/Bank/tests/BankAccountTest.php index 8184f98c..f889db85 100644 --- a/examples/Bank/tests/BankAccountTest.php +++ b/examples/Bank/tests/BankAccountTest.php @@ -11,8 +11,8 @@ use Thunk\Verbs\Examples\Bank\Models\User; use Thunk\Verbs\Examples\Bank\States\AccountState; use Thunk\Verbs\Facades\Verbs; -use Thunk\Verbs\Lifecycle\StateManager; use Thunk\Verbs\Models\VerbEvent; +use Thunk\Verbs\State\StateManager; test('a bank account can be opened and interacted with', function () { Mail::fake(); @@ -99,7 +99,7 @@ // We'll also confirm that the state is correctly loaded without snapshots - app(StateManager::class)->reset(include_storage: true); + app(StateManager::class)->reset(); $account_state = AccountState::load($account->id); expect($account_state->balance_in_cents)->toBe(100_00); diff --git a/examples/Counter/tests/StateRehydrationTest.php b/examples/Counter/tests/StateRehydrationTest.php index 3d6a64f1..925be504 100644 --- a/examples/Counter/tests/StateRehydrationTest.php +++ b/examples/Counter/tests/StateRehydrationTest.php @@ -2,9 +2,9 @@ use Thunk\Verbs\Examples\Counter\Events\IncrementCount; use Thunk\Verbs\Facades\Verbs; -use Thunk\Verbs\Lifecycle\StateManager; use Thunk\Verbs\Models\VerbEvent; use Thunk\Verbs\Models\VerbSnapshot; +use Thunk\Verbs\State\StateManager; beforeEach(function () { Verbs::commitImmediately(); @@ -31,7 +31,7 @@ expect(VerbEvent::query()->count())->toBe(1); - app(StateManager::class)->reset(include_storage: true); + app(StateManager::class)->reset(); $state = IncrementCount::fire()->state(); diff --git a/src/Attributes/Autodiscovery/AppliesToChildState.php b/src/Attributes/Autodiscovery/AppliesToChildState.php index 7333ec07..57ec6bdd 100644 --- a/src/Attributes/Autodiscovery/AppliesToChildState.php +++ b/src/Attributes/Autodiscovery/AppliesToChildState.php @@ -5,8 +5,8 @@ use Attribute; use InvalidArgumentException; use Thunk\Verbs\Event; -use Thunk\Verbs\Lifecycle\StateManager; use Thunk\Verbs\State; +use Thunk\Verbs\State\StateManager; #[Attribute(Attribute::TARGET_CLASS)] class AppliesToChildState extends StateDiscoveryAttribute @@ -35,6 +35,6 @@ public function discoverState(Event $event, StateManager $manager): State { $parent = $this->discovered->first(fn (State $state) => $state instanceof $this->parent_type); - return $manager->load($parent->{$this->id}, $this->state_type); + return $manager->load($this->state_type, $parent->{$this->id}); } } diff --git a/src/Attributes/Autodiscovery/AppliesToState.php b/src/Attributes/Autodiscovery/AppliesToState.php index 7af9d5f4..979d5ec0 100644 --- a/src/Attributes/Autodiscovery/AppliesToState.php +++ b/src/Attributes/Autodiscovery/AppliesToState.php @@ -7,9 +7,9 @@ use Illuminate\Support\Str; use InvalidArgumentException; use Thunk\Verbs\Event; -use Thunk\Verbs\Lifecycle\StateManager; use Thunk\Verbs\SingletonState; use Thunk\Verbs\State; +use Thunk\Verbs\State\StateManager; #[Attribute(Attribute::TARGET_CLASS | Attribute::IS_REPEATABLE)] class AppliesToState extends StateDiscoveryAttribute @@ -43,13 +43,13 @@ public function discoverState(Event $event, StateManager $manager): State|array $id = snowflake_id(); $event->{$property} = $id; - return $manager->make($id, $this->state_type); + return $manager->make($this->state_type, $id); } // TODO: Check type of data return collect(Arr::wrap($id)) - ->map(fn ($id) => $manager->load($id, $this->state_type)) + ->map(fn ($id) => $manager->load($this->state_type, $id)) ->all(); } diff --git a/src/Attributes/Autodiscovery/StateDiscoveryAttribute.php b/src/Attributes/Autodiscovery/StateDiscoveryAttribute.php index 385d97ba..6d628c84 100644 --- a/src/Attributes/Autodiscovery/StateDiscoveryAttribute.php +++ b/src/Attributes/Autodiscovery/StateDiscoveryAttribute.php @@ -6,8 +6,8 @@ use Illuminate\Support\Str; use ReflectionProperty; use Thunk\Verbs\Event; -use Thunk\Verbs\Lifecycle\StateManager; use Thunk\Verbs\State; +use Thunk\Verbs\State\StateManager; abstract class StateDiscoveryAttribute { diff --git a/src/Attributes/Autodiscovery/StateId.php b/src/Attributes/Autodiscovery/StateId.php index 6366241d..f3c67af6 100644 --- a/src/Attributes/Autodiscovery/StateId.php +++ b/src/Attributes/Autodiscovery/StateId.php @@ -6,8 +6,8 @@ use Illuminate\Support\Arr; use InvalidArgumentException; use Thunk\Verbs\Event; -use Thunk\Verbs\Lifecycle\StateManager; use Thunk\Verbs\State; +use Thunk\Verbs\State\StateManager; #[Attribute(Attribute::TARGET_PROPERTY)] class StateId extends StateDiscoveryAttribute @@ -42,15 +42,15 @@ public function discoverState(Event $event, StateManager $manager): State|array $autofilled[$property_name] = true; $meta->put('autofilled', $autofilled); - return $manager->make($id, $this->state_type); + return $manager->make($this->state_type, $id); } // If we autofilled the value when it first fired, then we know this is the // first event for that given state, and we don't need to try to load it if ($meta->get("autofilled.{$property_name}", false)) { - return $manager->make($id, $this->state_type); + return $manager->make($this->state_type, $id); } - return array_map(fn ($id) => $manager->load($id, $this->state_type), Arr::wrap($id)); + return array_map(fn ($id) => $manager->load($this->state_type, $id), Arr::wrap($id)); } } diff --git a/src/Contracts/StoresEvents.php b/src/Contracts/StoresEvents.php index 057356d8..47619319 100644 --- a/src/Contracts/StoresEvents.php +++ b/src/Contracts/StoresEvents.php @@ -7,6 +7,7 @@ use Ramsey\Uuid\UuidInterface; use Symfony\Component\Uid\AbstractUid; use Thunk\Verbs\Event; +use Thunk\Verbs\Lifecycle\AggregateStateSummary; use Thunk\Verbs\State; interface StoresEvents @@ -16,6 +17,10 @@ public function read( Bits|UuidInterface|AbstractUid|int|string|null $after_id = null, ): LazyCollection; + public function get(iterable $ids): LazyCollection; + /** @param Event[] $events */ public function write(array $events): bool; + + public function summarize(State ...$states): AggregateStateSummary; } diff --git a/src/Contracts/StoresSnapshots.php b/src/Contracts/StoresSnapshots.php index 0b9af0d1..140055ba 100644 --- a/src/Contracts/StoresSnapshots.php +++ b/src/Contracts/StoresSnapshots.php @@ -17,4 +17,6 @@ public function loadSingleton(string $type): ?State; public function write(array $states): bool; public function reset(): bool; + + public function delete(Bits|UuidInterface|AbstractUid|int|string ...$ids): bool; } diff --git a/src/Event.php b/src/Event.php index 23951c41..5f69d8b6 100644 --- a/src/Event.php +++ b/src/Event.php @@ -13,7 +13,6 @@ use Thunk\Verbs\Support\EventStateRegistry; use Thunk\Verbs\Support\PendingEvent; use Thunk\Verbs\Support\StateCollection; -use WeakMap; /** * @method static static fire(...$args) @@ -42,11 +41,7 @@ public function metadata(?string $key = null, mixed $default = null): mixed public function states(): StateCollection { - // TODO: This is a bit hacky, but is probably OK right now - - static $map = new WeakMap; - - return $map[$this] ??= app(EventStateRegistry::class)->getStates($this); + return app(EventStateRegistry::class)->getStates($this); } /** diff --git a/src/Lifecycle/AggregateStateSummary.php b/src/Lifecycle/AggregateStateSummary.php new file mode 100644 index 00000000..a41f99b6 --- /dev/null +++ b/src/Lifecycle/AggregateStateSummary.php @@ -0,0 +1,98 @@ +map(StateIdentity::from(...)), + ); + + return $summary->discover(); + } + + /** + * @param Collection $original_states + * @param Collection $related_event_ids + * @param Collection $related_states + */ + public function __construct( + public Collection $original_states = new Collection, + public Collection $related_event_ids = new Collection, + public Collection $related_states = new Collection, + ) {} + + public function events(): Enumerable + { + return app(StoresEvents::class)->get($this->related_event_ids); + } + + protected function discover(): static + { + $this->discoverNewEventIds(); + + do { + $continue = $this->discoverNewStates() && $this->discoverNewEventIds(); + } while ($continue); + + $this->related_event_ids = $this->related_event_ids->sort(); + + return $this; + } + + protected function discoverNewEventIds(): bool + { + $new_event_ids = VerbStateEvent::query() + ->distinct() + ->select('event_id') + ->whereNotIn('event_id', $this->related_event_ids) + ->where(fn (Builder $query) => $this->related_states->each( + fn ($state) => $query->orWhere(fn (Builder $query) => $this->addConstraint($state, $query))), + ) + ->toBase() + ->pluck('event_id'); + + $this->related_event_ids = $this->related_event_ids->merge($new_event_ids); + + return $new_event_ids->isNotEmpty(); + } + + protected function discoverNewStates(): bool + { + $discovered_states = VerbStateEvent::query() + ->orderBy('id') + ->distinct() + ->select(['state_id', 'state_type']) + ->whereIn('event_id', $this->related_event_ids) + ->where(fn (Builder $query) => $this->related_states->each( + fn ($state) => $query->whereNot(fn (Builder $query) => $this->addConstraint($state, $query))), + ) + ->toBase() + ->chunkMap(StateIdentity::from(...)); + + $this->related_states = $this->related_states->merge($discovered_states); + + return $discovered_states->isNotEmpty(); + } + + protected function addConstraint(StateIdentity $state, Builder $query): Builder + { + $query->where('state_type', '=', $state->state_type); + $query->where('state_id', '=', $state->state_id); + + return $query; + } +} diff --git a/src/Lifecycle/Broker.php b/src/Lifecycle/Broker.php index 533a1222..489f19bc 100644 --- a/src/Lifecycle/Broker.php +++ b/src/Lifecycle/Broker.php @@ -8,6 +8,7 @@ use Thunk\Verbs\Event; use Thunk\Verbs\Exceptions\EventNotValid; use Thunk\Verbs\Lifecycle\Queue as EventQueue; +use Thunk\Verbs\State\StateManager; class Broker implements BrokersEvents { @@ -34,22 +35,16 @@ public function fireIfValid(Event $event): ?Event public function fire(Event $event): ?Event { if ($this->is_replaying) { - return null; + return null; // FIXME } - // NOTE: Any changes to how the dispatcher is called here - // should also be applied to the `replay` method - - $this->dispatcher->boot($event); - - Guards::for($event)->check(); - - $this->dispatcher->apply($event); + Lifecycle::run( + event: $event, + phases: Phases::fire(), + ); + // FIXME: This is now in a slightly different execution order $this->queue->queue($event); - - $this->dispatcher->fired($event); - if ($this->commit_immediately || $event instanceof CommitsImmediately) { $this->commit(); } @@ -65,10 +60,9 @@ public function commit(): bool return true; } - // FIXME: Only write changes + handle aggregate versioning - - $this->states->writeSnapshots(); - $this->states->prune(); + // FIXME: + // $this->states->writeSnapshots(); + // $this->states->prune(); foreach ($events as $event) { $this->metadata->setLastResults($event, $this->dispatcher->handle($event)); @@ -82,7 +76,7 @@ public function replay(?callable $beforeEach = null, ?callable $afterEach = null $this->is_replaying = true; try { - $this->states->reset(include_storage: true); + $this->states->reset(); $iteration = 0; diff --git a/src/Lifecycle/EventStore.php b/src/Lifecycle/EventStore.php index 40b8fd44..09b99ece 100644 --- a/src/Lifecycle/EventStore.php +++ b/src/Lifecycle/EventStore.php @@ -5,6 +5,7 @@ use Glhd\Bits\Bits; use Illuminate\Database\Eloquent\Builder; use Illuminate\Database\Query\Builder as BaseBuilder; +use Illuminate\Database\Query\Expression; use Illuminate\Support\Collection; use Illuminate\Support\Facades\DB; use Illuminate\Support\LazyCollection; @@ -35,6 +36,15 @@ public function read( ->map(fn (VerbEvent $model) => $model->event()); } + public function get(iterable $ids): LazyCollection + { + return VerbEvent::query() + ->whereIn('id', collect($ids)) + ->lazyById() + ->each(fn (VerbEvent $model) => $this->metadata->set($model->event(), $model->metadata())) + ->map(fn (VerbEvent $model) => $model->event()); + } + public function write(array $events): bool { if (empty($events)) { @@ -47,6 +57,11 @@ public function write(array $events): bool && VerbStateEvent::insert($this->formatRelationshipsForWrite($events)); } + public function summarize(State ...$states): AggregateStateSummary + { + return AggregateStateSummary::summarize(...$states); + } + protected function readEvents( ?State $state, Bits|UuidInterface|AbstractUid|int|string|null $after_id, @@ -78,11 +93,7 @@ protected function guardAgainstConcurrentWrites(array $events): void $query->select([ 'state_type', 'state_id', - DB::raw(sprintf( - 'max(%s) as %s', - $query->getGrammar()->wrap('event_id'), - $query->getGrammar()->wrapTable('max_event_id') - )), + $this->aggregateExpression($query, 'event_id', 'max'), ]); $query->groupBy('state_type', 'state_id'); @@ -148,4 +159,14 @@ protected function formatRelationshipsForWrite(array $event_objects): array ->values() ->all(); } + + protected function aggregateExpression(BaseBuilder $query, string $column, string $function): Expression + { + return DB::raw(sprintf( + '%s(%s) as %s', + $function, + $query->getGrammar()->wrap($column), + $query->getGrammar()->wrapTable("{$function}_{$column}") + )); + } } diff --git a/src/Lifecycle/Lifecycle.php b/src/Lifecycle/Lifecycle.php new file mode 100644 index 00000000..9349fc8c --- /dev/null +++ b/src/Lifecycle/Lifecycle.php @@ -0,0 +1,47 @@ +handle(); + } + + public function __construct( + public Dispatcher $dispatcher, + public Event $event, + public Phases $phases, + ) {} + + public function handle(): Event + { + if ($this->phases->has(Phase::Boot)) { + $this->dispatcher->boot($this->event); + } + + // FIXME: This is actually two phases + if ($this->phases->has(Phase::Authorize)) { + Guards::for($this->event)->check(); + } + + if ($this->phases->has(Phase::Apply)) { + $this->dispatcher->apply($this->event); + } + + if ($this->phases->has(Phase::Handle)) { + // FIXME + // $this->queue->queue($this->event); + $this->dispatcher->handle($this->event); + } + + if ($this->phases->has(Phase::Fired)) { + $this->dispatcher->fired($this->event); + } + + return $this->event; + } +} diff --git a/src/Lifecycle/NullSnapshotStore.php b/src/Lifecycle/NullSnapshotStore.php new file mode 100644 index 00000000..5d4879c0 --- /dev/null +++ b/src/Lifecycle/NullSnapshotStore.php @@ -0,0 +1,38 @@ +phases = $phases; + } + + public function has(Phase $phase): bool + { + return in_array($phase, $this->phases); + } +} diff --git a/src/Lifecycle/SnapshotStore.php b/src/Lifecycle/SnapshotStore.php index 3a342d56..1872b43b 100644 --- a/src/Lifecycle/SnapshotStore.php +++ b/src/Lifecycle/SnapshotStore.php @@ -64,6 +64,13 @@ public function write(array $states): bool return true; } + public function delete(Bits|UuidInterface|AbstractUid|int|string ...$ids): bool + { + $ids = array_map(Id::from(...), $ids); + + return VerbSnapshot::whereIn('state_id', $ids)->delete() === true; + } + public function reset(): bool { VerbSnapshot::truncate(); diff --git a/src/Lifecycle/StateManager.php b/src/Lifecycle/StateManager.php deleted file mode 100644 index 9ce4553f..00000000 --- a/src/Lifecycle/StateManager.php +++ /dev/null @@ -1,229 +0,0 @@ -id ??= snowflake_id(); - - return $this->remember($state); - } - - /** - * @template S instanceof State - * - * @param class-string $type - * @return S|StateCollection - */ - public function load(Bits|UuidInterface|AbstractUid|iterable|int|string $id, string $type): StateCollection|State - { - return is_iterable($id) - ? $this->loadMany($id, $type) - : $this->loadOne($id, $type); - } - - /** - * @template TStateClass of State - * - * @param class-string $type - * @return TStateClass - */ - public function singleton(string $type): State - { - // FIXME: If the state we're loading has a last_event_id that's ahead of the registry's last_event_id, we need to re-build the state - - if ($state = $this->states->get($type)) { - return $state; - } - - $state = $this->snapshots->loadSingleton($type) ?? new $type; - $state->id ??= snowflake_id(); - - // We'll store a reference to it by the type for future singleton access - $this->states->put($type, $state); - $this->remember($state); - - $this->reconstitute($state); - - return $state; - } - - /** - * @template TState of State - * - * @param class-string $type - * @return TState - */ - public function make(Bits|UuidInterface|AbstractUid|int|string $id, string $type): State - { - // If we've already instantiated this state, we'll load it - if ($existing = $this->states->get($this->key($id, $type))) { - return $existing; - } - - // State::__construct() auto-registers the state with the StateManager, - // so we need to skip the constructor until we've already set the ID. - $state = (new ReflectionClass($type))->newInstanceWithoutConstructor(); - $state->id = Id::from($id); - $state->__construct(); - - return $this->remember($state); - } - - public function writeSnapshots(): bool - { - return $this->snapshots->write($this->states->values()); - } - - public function setReplaying(bool $replaying): static - { - $this->is_replaying = $replaying; - - return $this; - } - - public function reset(bool $include_storage = false): static - { - $this->states->reset(); - $this->is_replaying = false; - - if ($include_storage) { - $this->snapshots->reset(); - } - - return $this; - } - - public function willPrune(): bool - { - return $this->states->willPrune(); - } - - public function prune(): static - { - $this->states->prune(); - - return $this; - } - - /** @param class-string $type */ - protected function loadOne(Bits|UuidInterface|AbstractUid|int|string $id, string $type): State - { - $id = Id::from($id); - $key = $this->key($id, $type); - - // FIXME: If the state we're loading has a last_event_id that's ahead of the registry's last_event_id, we need to re-build the state - - if ($state = $this->states->get($key)) { - return $state; - } - - if ($state = $this->snapshots->load($id, $type)) { - if (! $state instanceof $type) { - throw new UnexpectedValueException(sprintf('Expected State <%d> to be of type "%s" but got "%s"', $id, class_basename($type), class_basename($state))); - } - } else { - $state = $this->make($id, $type); - } - - $this->remember($state); - $this->reconstitute($state); - - return $state; - } - - /** @param class-string $type */ - protected function loadMany(iterable $ids, string $type): StateCollection - { - $ids = collect($ids)->map(Id::from(...)); - - $missing = $ids->reject(fn ($id) => $this->states->has($this->key($id, $type))); - - // Load all available snapshots for missing states - $this->snapshots->load($missing, $type)->each(function (State $state) { - $this->remember($state); - $this->reconstitute($state); - }); - - // Then make any states that don't exist yet - $missing - ->reject(fn ($id) => $this->states->has($this->key($id, $type))) - ->each(function (string|int $id) use ($type) { - $state = $this->make($id, $type); - $this->remember($state); - $this->reconstitute($state); - }); - - // At this point, all the states should be in our cache, so we can just load everything - return StateCollection::make( - $ids->map(fn ($id) => $this->states->get($this->key($id, $type))) - ); - } - - protected function reconstitute(State $state): static - { - // When we're replaying, the Broker is in charge of applying the correct events - // to the State, so we only need to do it *outside* of replays. - if (! $this->is_replaying) { - $this->events - ->read(state: $state, after_id: $state->last_event_id) - ->each(fn (Event $event) => $this->dispatcher->apply($event)); - - // It's possible for an event to mutate state out of order when reconstituting, - // so as a precaution, we'll clear all other states from the store and reload - // them from snapshots as needed in the rest of the request. - // FIXME: We still need to figure this out - // $this->states->reset(); - // $this->remember($state); - } - - return $this; - } - - protected function remember(State $state): State - { - $key = $this->key($state->id, $state::class); - - if ($this->states->get($key) === $state) { - return $state; - } - - if ($this->states->has($key)) { - throw new LogicException('Trying to remember state twice.'); - } - - $this->states->put($key, $state); - - return $state; - } - - protected function key(string|int $id, string $type): string - { - return "{$type}:{$id}"; - } -} diff --git a/src/SingletonState.php b/src/SingletonState.php index a0167f19..718d0508 100644 --- a/src/SingletonState.php +++ b/src/SingletonState.php @@ -4,7 +4,7 @@ use BadMethodCallException; use RuntimeException; -use Thunk\Verbs\Lifecycle\StateManager; +use Thunk\Verbs\State\StateManager; use Thunk\Verbs\Support\StateCollection; abstract class SingletonState extends State @@ -36,7 +36,7 @@ public static function loadByKey($from): static|StateCollection public static function singleton(): static { - return app(StateManager::class)->singleton(static::class); + return app(StateManager::class)->load(static::class, null); } public function resolveRouteBinding($value, $field = null) diff --git a/src/State.php b/src/State.php index 8d243e47..c4e946dc 100644 --- a/src/State.php +++ b/src/State.php @@ -10,7 +10,7 @@ use Symfony\Component\Uid\AbstractUid; use Thunk\Verbs\Contracts\StoresEvents; use Thunk\Verbs\Exceptions\StateNotFoundException; -use Thunk\Verbs\Lifecycle\StateManager; +use Thunk\Verbs\State\StateManager; use Thunk\Verbs\Support\Serializer; use Thunk\Verbs\Support\StateCollection; @@ -77,7 +77,7 @@ public static function load($from): static|StateCollection public static function loadByKey($from): static|StateCollection { - return app(StateManager::class)->load($from, static::class); + return app(StateManager::class)->load(static::class, $from); } protected static function normalizeKey(mixed $from) @@ -96,7 +96,7 @@ public function storedEvents() public function fresh(): static { - return app(StateManager::class)->load($this->id, static::class); + return app(StateManager::class)->load(static::class, $this->id); } public function getRouteKey() diff --git a/src/State/Cache/Contracts/ReadableCache.php b/src/State/Cache/Contracts/ReadableCache.php new file mode 100644 index 00000000..92213e4a --- /dev/null +++ b/src/State/Cache/Contracts/ReadableCache.php @@ -0,0 +1,12 @@ +key($class, $id); + + if ($this->has($class, $id)) { + $this->touch($key); + + return $this->cache[$key]; + } + + return null; + } + + public function put(State $state): State + { + $key = $this->key($state); + + if (isset($this->cache[$key])) { + unset($this->cache[$key]); + } + + $this->cache[$key] = $state; + + return $state; + } + + public function has(string $class, string $id): bool + { + $key = $this->key($class, $id); + + return isset($this->cache[$key]); + } + + public function prune(): static + { + $this->cache = array_slice($this->cache, offset: -1 * $this->capacity, preserve_keys: true); + + return $this; + } + + public function willPrune(): bool + { + return count($this->cache) > $this->capacity; + } + + public function values(): array + { + return $this->cache; + } + + public function reset(): static + { + $this->cache = []; + + return $this; + } + + protected function touch($key): void + { + $value = $this->cache[$key]; + + unset($this->cache[$key]); + + $this->cache[$key] = $value; + } + + protected function key(State|string $type, ?string $id = null): string + { + // Allow passing in state objects. + if ($type instanceof State) { + $id = $type instanceof SingletonState + ? null + : $type->id; + + $type = $type::class; + } + + return "{$type}:{$id}"; + } +} diff --git a/src/State/Cache/MultiCache.php b/src/State/Cache/MultiCache.php new file mode 100644 index 00000000..c2e24561 --- /dev/null +++ b/src/State/Cache/MultiCache.php @@ -0,0 +1,8 @@ +toBase() + ->select(['state_type', 'state_id', DB::raw('max(event_id) as max_event_id')]) + ->where(function ($query) use ($states) { + foreach ($states as $state) { + $query->orWhere(function ($query) use ($state) { + $query->where('state_type', $state::class); + $query->where('state_id', $state->id); + }); + } + }) + ->each(function ($row) { + // TODO: Compare to states + }); + + $summary = AggregateStateSummary::summarize($states); + + $replay = new Replay( + states: new StateManager(new InMemoryCache), // FIXME: Use states from summary + events: $summary->events(), + phases: new Phases(Phase::Apply), + ); + + $replay->handle(); + + // FIXME: Get all states loaded during replay and add them to our cache + + // FIXME return $state; + } +} diff --git a/src/State/StateIdentity.php b/src/State/StateIdentity.php new file mode 100644 index 00000000..05289777 --- /dev/null +++ b/src/State/StateIdentity.php @@ -0,0 +1,35 @@ + $source, + $source instanceof State => new static(state_type: $source::class, state_id: $source->id), + default => static::fromGenericObject($source), + }; + } + + protected static function fromGenericObject(object $source): static + { + $state_id = data_get($source, 'state_id'); + $state_type = data_get($source, 'state_type'); + + if (is_int($state_id) && is_string($state_type)) { + return new static(state_type: $state_type, state_id: $state_id); + } + + throw new InvalidArgumentException('State identity objects must have a "state_id" and "state_type" value.'); + } + + public function __construct( + public readonly string $state_type, + public readonly int|string $state_id, + ) {} +} diff --git a/src/State/StateManager.php b/src/State/StateManager.php new file mode 100644 index 00000000..7f8f6c86 --- /dev/null +++ b/src/State/StateManager.php @@ -0,0 +1,98 @@ +id ??= snowflake_id(); + + return $this->cache->put($state); + } + + /** + * @template S instanceof State + * + * @param class-string $type + * @return S|StateCollection + */ + public function load(string $type, Bits|UuidInterface|AbstractUid|iterable|int|string|null $id): StateCollection|State + { + return is_iterable($id) + ? $this->loadMany($id, $type) + : $this->loadOne($id, $type); + } + + /** + * @template TState of State + * + * @param class-string $type + */ + public function make(string $type, Bits|UuidInterface|AbstractUid|int|string|null $id): State + { + // If we've already instantiated this state, we'll load it + if ($existing = $this->cache->get($type, $id)) { + return $existing; + } + + // State::__construct() auto-registers the state with the StateManager, + // so we need to skip the constructor until we've already set the ID. + /** @var State $state */ + $state = (new ReflectionClass($type))->newInstanceWithoutConstructor(); + $state->id = Id::tryFrom($id) ?? snowflake_id(); + $state->__construct(); + + return $this->cache->put($state); + } + + // @todo - make persistent caches + // public function persist(): bool + // { + // return $this->cache->persist($this->states->values()); + // } + + public function reset(): static + { + $this->cache->reset(); + + return $this; + } + + /** @param class-string $type */ + protected function loadOne(string $type, Bits|UuidInterface|AbstractUid|int|string|null $id = null): State + { + $id = Id::tryFrom($id); + + if ($state = $this->cache->get($type, $id)) { + return $state; + } + + return $this->make($id, $type); + } + + /** @param class-string $type */ + protected function loadMany(iterable $ids, string $type): StateCollection + { + $ids = collect($ids)->map(Id::from(...)); + + return StateCollection::make( + // @todo - add support for getMany() in caches for perf + $ids->map(fn ($id) => $this->cache->get($type, $id)), + ); + } +} diff --git a/src/Support/EventStateRegistry.php b/src/Support/EventStateRegistry.php index 1a956589..53cf4b09 100644 --- a/src/Support/EventStateRegistry.php +++ b/src/Support/EventStateRegistry.php @@ -2,6 +2,7 @@ namespace Thunk\Verbs\Support; +use Illuminate\Contracts\Container\Container; use Illuminate\Support\Arr; use Illuminate\Support\Collection; use InvalidArgumentException; @@ -13,8 +14,9 @@ use ReflectionUnionType; use Thunk\Verbs\Attributes\Autodiscovery\StateDiscoveryAttribute; use Thunk\Verbs\Event; -use Thunk\Verbs\Lifecycle\StateManager; use Thunk\Verbs\State; +use Thunk\Verbs\State\StateManager; +use WeakMap; class EventStateRegistry { @@ -22,11 +24,27 @@ class EventStateRegistry protected array $discovered_properties = []; + protected WeakMap $discovered_states; + public function __construct( - protected StateManager $manager, - ) {} + protected Container $container, + ) { + $this->discovered_states = new WeakMap; + } + + public function reset(): static + { + $this->discovered_states = new WeakMap; + + return $this; + } public function getStates(Event $event): StateCollection + { + return $this->discovered_states[$event] ??= $this->discoverStates($event); + } + + protected function discoverStates(Event $event): StateCollection { $discovered = new StateCollection; $deferred = new StateCollection; @@ -57,7 +75,7 @@ protected function discoverAndPushState(StateDiscoveryAttribute $attribute, Even $states = Arr::wrap( $attribute ->setDiscoveredState($discovered) - ->discoverState($target, $this->manager), + ->discoverState($target, $this->container->make(StateManager::class)), ); $discovered->push(...$states); diff --git a/src/Support/Normalization/StateNormalizer.php b/src/Support/Normalization/StateNormalizer.php index 6156c70c..f4be532b 100644 --- a/src/Support/Normalization/StateNormalizer.php +++ b/src/Support/Normalization/StateNormalizer.php @@ -5,8 +5,8 @@ use InvalidArgumentException; use Symfony\Component\Serializer\Normalizer\DenormalizerInterface; use Symfony\Component\Serializer\Normalizer\NormalizerInterface; -use Thunk\Verbs\Lifecycle\StateManager; use Thunk\Verbs\State; +use Thunk\Verbs\State\StateManager; use Thunk\Verbs\Support\Serializer; class StateNormalizer implements DenormalizerInterface, NormalizerInterface @@ -23,7 +23,11 @@ public function denormalize(mixed $data, string $type, ?string $format = null, a return $data; } - return app(StateManager::class)->load($data, $type); + // $state = new $type; + // $state->id = $data; + // $state->__verbs_initialized = false; + + return app(StateManager::class)->load($type, $data); } public function supportsNormalization(mixed $data, ?string $format = null, array $context = []): bool diff --git a/src/Support/Replay.php b/src/Support/Replay.php new file mode 100644 index 00000000..9f861d33 --- /dev/null +++ b/src/Support/Replay.php @@ -0,0 +1,34 @@ +instance(StateManager::class, $this->states); + + foreach ($this->events as $event) { + Lifecycle::run($event, $this->phases); + } + } finally { + app()->instance(StateManager::class, $global_registry); + } + + return $this; + } +} diff --git a/src/Support/StateInstanceCache.php b/src/Support/StateInstanceCache.php index 58fa300e..4c833365 100644 --- a/src/Support/StateInstanceCache.php +++ b/src/Support/StateInstanceCache.php @@ -6,7 +6,7 @@ class StateInstanceCache { public function __construct( protected int $capacity = 100, - protected array $cache = [], + public array $cache = [], ) {} public function get(string|int $key, mixed $default = null): mixed diff --git a/src/Support/StateReconstructor.php b/src/Support/StateReconstructor.php new file mode 100644 index 00000000..c20dcd58 --- /dev/null +++ b/src/Support/StateReconstructor.php @@ -0,0 +1,61 @@ +container->make(StateManager::class); + $reconstruction_manager = new StateManager( + dispatcher: $this->dispatcher, + snapshots: new NullSnapshotStore, + events: $this->events, + states: new StateInstanceCache, + ); + + $this->container->instance(StateManager::class, $reconstruction_manager); + + try { + $summary = $this->events->summarize($state); + + $this->events + ->get($summary->related_event_ids) + ->each($this->dispatcher->apply(...)); + + foreach ($reconstruction_manager->states() as $state) { + $manager->push($state); + } + } finally { + $this->container->instance(StateManager::class, $original_manager); + } + + return $original_manager->load($state::class, $state->id); + } + + protected function bindNewEmptyStateManager(StateManager $manager) + { + + $temp_manager->is_reconstituting = true; // FIXME + + $temp_registry = new EventStateRegistry($temp_manager); + + app()->instance(StateManager::class, $temp_manager); + app()->instance(EventStateRegistry::class, $temp_registry); + + return [$temp_manager, $temp_registry]; + } +} diff --git a/src/Testing/BrokerFake.php b/src/Testing/BrokerFake.php index 52e1ff76..1744524b 100644 --- a/src/Testing/BrokerFake.php +++ b/src/Testing/BrokerFake.php @@ -8,7 +8,6 @@ use Thunk\Verbs\Contracts\BrokersEvents; use Thunk\Verbs\Contracts\StoresEvents; use Thunk\Verbs\Event; -use Thunk\Verbs\Lifecycle\Broker; use Thunk\Verbs\Lifecycle\BrokerConvenienceMethods; use Thunk\Verbs\Lifecycle\Dispatcher; @@ -21,7 +20,7 @@ class BrokerFake implements BrokersEvents public function __construct( Container $container, public EventStoreFake $store, - public Broker $broker, + public BrokersEvents $broker, ) { // Eventually this will swap out all the necessary fakes and implement // our own versions of fire/commit/replay, but for now this is just a diff --git a/src/Testing/EventStoreFake.php b/src/Testing/EventStoreFake.php index 186b569c..d9fb6523 100644 --- a/src/Testing/EventStoreFake.php +++ b/src/Testing/EventStoreFake.php @@ -13,6 +13,7 @@ use Thunk\Verbs\Contracts\StoresEvents; use Thunk\Verbs\Event; use Thunk\Verbs\Facades\Id; +use Thunk\Verbs\Lifecycle\AggregateStateSummary; use Thunk\Verbs\Lifecycle\MetadataManager; use Thunk\Verbs\SingletonState; use Thunk\Verbs\State; @@ -57,6 +58,17 @@ public function write(array $events): bool return true; } + public function summarize(State ...$states): AggregateStateSummary + { + // FIXME + return new AggregateStateSummary($states[0], collect(), collect(), null, null); + } + + public function get(iterable $ids): LazyCollection + { + return new LazyCollection; + } + /** @return Collection */ public function committed(string $class_name, ?Closure $filter = null): Collection { diff --git a/src/Testing/SnapshotStoreFake.php b/src/Testing/SnapshotStoreFake.php index 0a5ee4bd..e5f8ceae 100644 --- a/src/Testing/SnapshotStoreFake.php +++ b/src/Testing/SnapshotStoreFake.php @@ -60,6 +60,21 @@ public function reset(): bool return true; } + public function delete(Bits|UuidInterface|AbstractUid|int|string ...$ids): bool + { + $ids = array_map(Id::from(...), $ids); + + foreach ($this->states as $type => $states) { + foreach ($states as $id => $state) { + if (in_array($id, $ids)) { + uniqid($this->states[$type][$id]); + } + } + } + + return true; + } + public function assertWritten(string|Closure $state, Closure|int|null $callback = null): static { if ($state instanceof Closure) { diff --git a/src/VerbsServiceProvider.php b/src/VerbsServiceProvider.php index d3f537eb..1e38206d 100644 --- a/src/VerbsServiceProvider.php +++ b/src/VerbsServiceProvider.php @@ -32,12 +32,13 @@ use Thunk\Verbs\Lifecycle\MetadataManager; use Thunk\Verbs\Lifecycle\Queue as EventQueue; use Thunk\Verbs\Lifecycle\SnapshotStore; -use Thunk\Verbs\Lifecycle\StateManager; use Thunk\Verbs\Livewire\SupportVerbs; +use Thunk\Verbs\State\Cache\MultiCache; +use Thunk\Verbs\State\StateManager; use Thunk\Verbs\Support\EventStateRegistry; use Thunk\Verbs\Support\IdManager; use Thunk\Verbs\Support\Serializer; -use Thunk\Verbs\Support\StateInstanceCache; +use Thunk\Verbs\Support\StateReconstructor; use Thunk\Verbs\Support\Wormhole; class VerbsServiceProvider extends PackageServiceProvider @@ -66,17 +67,13 @@ public function packageRegistered() $this->app->scoped(EventStore::class); $this->app->singleton(SnapshotStore::class); $this->app->scoped(EventQueue::class); - $this->app->scoped(EventStateRegistry::class); + $this->app->scoped(EventStateRegistry::class); // FIXME: Pretty sure this should be hidden behind the StateManager $this->app->singleton(MetadataManager::class); + $this->app->singleton(StateReconstructor::class); $this->app->scoped(StateManager::class, function (Container $app) { return new StateManager( - dispatcher: $app->make(Dispatcher::class), - snapshots: $app->make(StoresSnapshots::class), - events: $app->make(StoresEvents::class), - states: new StateInstanceCache( - capacity: $app->make(Repository::class)->get('verbs.state_cache_size', 100) - ), + cache: new MultiCache ); }); diff --git a/tests/Feature/ReplayClassTest.php b/tests/Feature/ReplayClassTest.php new file mode 100644 index 00000000..11846b18 --- /dev/null +++ b/tests/Feature/ReplayClassTest.php @@ -0,0 +1,56 @@ +event)); + $replay = new Replay( + states: new StateManager( + cache: new InMemoryCache + ), + events: $events, + phases: Phases::all() + ); + + $replay->handle(); + + expect($replay->states->cache->values()) + ->toHaveCount(1); + + expect($replay->states->load('1', ReplayClassTestState::class)->count) + ->toBe(10); +}); + +it('can cache and retrieve state across events', function () { + $events = collect(array_fill(0, 10, ReplayClassTestEvent::make(state: 1)->event)); + + $replay = new Replay( + states: new StateManager( + cache: new InMemoryCache + ), + events: $events, + phases: Phases::all() + ); +}); + +class ReplayClassTestEvent extends Event +{ + #[StateId(ReplayClassTestState::class)] // FIXME: Breaks with State type hint + public int $state; + + public function apply(ReplayClassTestState $state) + { + $state->count++; + } +} + +class ReplayClassTestState extends State +{ + public int $count = 0; +} diff --git a/tests/Feature/ReplayCommandTest.php b/tests/Feature/ReplayCommandTest.php index feaf3b27..d8d2db5f 100644 --- a/tests/Feature/ReplayCommandTest.php +++ b/tests/Feature/ReplayCommandTest.php @@ -7,9 +7,9 @@ use Thunk\Verbs\Event; use Thunk\Verbs\Facades\Id; use Thunk\Verbs\Facades\Verbs; -use Thunk\Verbs\Lifecycle\StateManager; use Thunk\Verbs\Models\VerbSnapshot; use Thunk\Verbs\State; +use Thunk\Verbs\State\StateManager; beforeEach(function () { $GLOBALS['replay_test_counts'] = []; @@ -37,11 +37,11 @@ Verbs::commit(); - expect(app(StateManager::class)->load($state1_id, ReplayCommandTestState::class)->count) + expect(app(StateManager::class)->load(ReplayCommandTestState::class, $state1_id)->count) ->toBe(2) ->and($GLOBALS['replay_test_counts'][$state1_id]) ->toBe(2) - ->and(app(StateManager::class)->load($state2_id, ReplayCommandTestState::class)->count) + ->and(app(StateManager::class)->load(ReplayCommandTestState::class, $state2_id)->count) ->toBe(4) ->and($GLOBALS['replay_test_counts'][$state2_id]) ->toBe(4) @@ -54,11 +54,11 @@ config(['app.env' => 'testing']); $this->artisan(ReplayCommand::class); - expect(app(StateManager::class)->load($state1_id, ReplayCommandTestState::class)->count) + expect(app(StateManager::class)->load(ReplayCommandTestState::class, $state1_id)->count) ->toBe(2) ->and($GLOBALS['replay_test_counts'][$state1_id]) ->toBe(2) - ->and(app(StateManager::class)->load($state2_id, ReplayCommandTestState::class)->count) + ->and(app(StateManager::class)->load(ReplayCommandTestState::class, $state2_id)->count) ->toBe(4) ->and($GLOBALS['replay_test_counts'][$state2_id]) ->toBe(4) @@ -72,7 +72,7 @@ Verbs::commit(); - expect(app(StateManager::class)->load($state_id, ReplayCommandTestWormholeState::class)->time->unix()) + expect(app(StateManager::class)->load(ReplayCommandTestWormholeState::class, $state_id)->time->unix()) ->toBe(CarbonImmutable::parse('2024-04-01 12:00:00')->unix()) ->and($GLOBALS['time'][$state_id]->unix()) ->toBe(CarbonImmutable::parse('2024-04-01 12:00:00')->unix()); @@ -83,7 +83,7 @@ config(['app.env' => 'testing']); $this->artisan(ReplayCommand::class); - expect(app(StateManager::class)->load($state_id, ReplayCommandTestWormholeState::class)->time->unix()) + expect(app(StateManager::class)->load(ReplayCommandTestWormholeState::class, $state_id)->time->unix()) ->toBe(CarbonImmutable::parse('2024-04-01 12:00:00')->unix()) ->and($GLOBALS['time'][$state_id]->unix()) ->toBe(CarbonImmutable::parse('2024-04-01 12:00:00')->unix()); diff --git a/tests/Unit/AggregateStateSummaryTest.php b/tests/Unit/AggregateStateSummaryTest.php new file mode 100644 index 00000000..d1abbaac --- /dev/null +++ b/tests/Unit/AggregateStateSummaryTest.php @@ -0,0 +1,120 @@ + $matching_state_id) { + foreach ($matching_event_ids as $matching_event_id) { + VerbStateEvent::insert([ + 'id' => snowflake_id(), + 'event_id' => $matching_event_id, + 'state_id' => $matching_state_id, + 'state_type' => $matching_state_types[$state_index % count($matching_state_types)], + ]); + } + } + + $target_state = new AggregateStateSummaryTestState1; + $target_state->id = 10; + + $summary = AggregateStateSummary::summarize($target_state); + + expect($summary->original_states->all())->toBe([$target_state]) + ->and($summary->related_states)->toHaveCount(5) + ->and($summary->related_event_ids)->toHaveCount(5); + + $related_state_ids = $summary->related_states + ->map(fn (StateIdentity $state) => $state->state_id) + ->sort() + ->toArray(); + + $related_state_types = $summary->related_states + ->map(fn (StateIdentity $state) => $state->state_type) + ->unique() + ->sort() + ->toArray(); + + expect($related_state_ids)->toBe($matching_state_ids) + ->and($related_state_types)->toBe($matching_state_types); +}); + +test('it finds the correct states and events for multiple states', function () { + $matching_state_types = [ + AggregateStateSummaryTestState1::class, + AggregateStateSummaryTestState2::class, + AggregateStateSummaryTestState3::class, + ]; + $matching_state_ids = [10, 11, 12, 13, 14]; + $matching_event_ids = [100, 101, 102, 103, 105]; + + $other_state_types = [ + AggregateStateSummaryTestState4::class, + AggregateStateSummaryTestState5::class, + AggregateStateSummaryTestState6::class, + ]; + $other_state_ids = [20, 21, 22, 23, 24]; + $other_event_ids = [200, 201, 202, 203, 205]; + + foreach ($matching_state_ids as $state_index => $matching_state_id) { + foreach ($matching_event_ids as $matching_event_id) { + VerbStateEvent::insert([ + 'id' => snowflake_id(), + 'event_id' => $matching_event_id, + 'state_id' => $matching_state_id, + 'state_type' => $matching_state_types[$state_index % count($matching_state_types)], + ]); + } + } + + $target_state1 = new AggregateStateSummaryTestState1; + $target_state1->id = 10; + + $target_state2 = new AggregateStateSummaryTestState2; + $target_state2->id = 11; + + $summary = AggregateStateSummary::summarize($target_state1, $target_state2); + + expect($summary->original_states->all())->toBe([$target_state1, $target_state2]) + ->and($summary->related_states)->toHaveCount(5) + ->and($summary->related_event_ids)->toHaveCount(5); + + $related_state_ids = $summary->related_states + ->map(fn (StateIdentity $state) => $state->state_id) + ->sort() + ->toArray(); + + $related_state_types = $summary->related_states + ->map(fn (StateIdentity $state) => $state->state_type) + ->unique() + ->sort() + ->toArray(); + + expect($related_state_ids)->toBe($matching_state_ids) + ->and($related_state_types)->toBe($matching_state_types); +}); + +class AggregateStateSummaryTestState1 extends State {} +class AggregateStateSummaryTestState2 extends State {} +class AggregateStateSummaryTestState3 extends State {} +class AggregateStateSummaryTestState4 extends State {} +class AggregateStateSummaryTestState5 extends State {} +class AggregateStateSummaryTestState6 extends State {} diff --git a/tests/Unit/CollectionNormalizerTest.php b/tests/Unit/CollectionNormalizerTest.php index 4b769faa..cd4a733b 100644 --- a/tests/Unit/CollectionNormalizerTest.php +++ b/tests/Unit/CollectionNormalizerTest.php @@ -7,9 +7,9 @@ use Symfony\Component\Serializer\Encoder\JsonEncoder; use Symfony\Component\Serializer\Normalizer\PropertyNormalizer; use Symfony\Component\Serializer\Serializer as SymfonySerializer; -use Thunk\Verbs\Lifecycle\StateManager; use Thunk\Verbs\SerializedByVerbs; use Thunk\Verbs\State; +use Thunk\Verbs\State\StateManager; use Thunk\Verbs\Support\Normalization\CarbonNormalizer; use Thunk\Verbs\Support\Normalization\CollectionNormalizer; use Thunk\Verbs\Support\Normalization\NormalizeToPropertiesAndClassName; diff --git a/tests/Unit/EventStoreFakeTest.php b/tests/Unit/EventStoreFakeTest.php index b1e89331..3c36b468 100644 --- a/tests/Unit/EventStoreFakeTest.php +++ b/tests/Unit/EventStoreFakeTest.php @@ -4,8 +4,8 @@ use Thunk\Verbs\Contracts\StoresEvents; use Thunk\Verbs\Event; use Thunk\Verbs\Lifecycle\MetadataManager; -use Thunk\Verbs\Lifecycle\StateManager; use Thunk\Verbs\State; +use Thunk\Verbs\State\StateManager; use Thunk\Verbs\Testing\EventStoreFake; it('performs assertions', function () { @@ -76,13 +76,13 @@ app()->instance(StoresEvents::class, $store = new EventStoreFake(app(MetadataManager::class))); $state1 = app(StateManager::class)->load( - 1001, type: EventStoreFakeTestState::class, + id: 1001, ); $state2 = app(StateManager::class)->load( - 1002, type: EventStoreFakeTestState::class, + id: 1002, ); // State IDs = 100X, Event IDs = X0Y (X = state, Y = event) diff --git a/tests/Unit/FactoryTest.php b/tests/Unit/FactoryTest.php index c846be11..a4919603 100644 --- a/tests/Unit/FactoryTest.php +++ b/tests/Unit/FactoryTest.php @@ -1,9 +1,9 @@ counter)->toBe(0) + ->and($state2->counter)->toBe(1); + + StateReconstitutionTestEvent2::fire(state2_id: $state2_id); + StateReconstitutionTestEvent1::fire(state1_id: $state1_id, state2_id: $state2_id); + + expect($state1->counter)->toBe(2) + ->and($state2->counter)->toBe(3); + + Verbs::commit(); + app(StateManager::class)->reset(); + + $state1 = StateReconstitutionTestState1::load($state1_id); + $state2 = StateReconstitutionTestState2::load($state2_id); + + expect($state1->counter)->toBe(2) + ->and($state2->counter)->toBe(3); +}); + +test('partially up-to-date snapshots', function () { + // event 2 increments state 2 + // event 1 adds state 2 + state 1, then increments state 2 + + $event1 = StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=0, 2=1 + $event2 = StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=0, 2=2 + $event3 = StateReconstitutionTestEvent1::fire(state1_id: 1, state2_id: 2); // 1=2, 2=3 + $event4 = StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=2, 2=4 + $event5 = StateReconstitutionTestEvent1::fire(state1_id: 1, state2_id: 2); // 1=6, 2=5 + + dump([$event1->id, $event2->id, $event3->id, $event4->id, $event5->id]); + + Verbs::commit(); + + $state1 = StateReconstitutionTestState1::load(1); + $state2 = StateReconstitutionTestState2::load(2); + + expect($state1->counter)->toBe(6) + ->and($state2->counter)->toBe(5); + + // Reset the snapshots to what they looked like at event 3 + + $snapshot1 = VerbSnapshot::query()->where('state_id', 1)->sole(); + $snapshot1->update([ + 'data' => '{"counter":2}', + 'last_event_id' => $event3->id, + ]); + + $snapshot2 = VerbSnapshot::query()->where('state_id', 2)->sole(); + $snapshot2->update([ + 'data' => '{"counter":3}', + 'last_event_id' => $event3->id, + ]); + + app(StateManager::class)->reset(); + + $state1 = StateReconstitutionTestState1::load(1); + $state2 = StateReconstitutionTestState2::load(2); + + dump($state1); + dump(VerbSnapshot::all()->toArray()); + + expect($state1->counter)->toBe(6); + expect($state2->counter)->toBe(5); +}); + +test('partially deleted snapshots', function () { + StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=null, 2=1 + StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=null, 2=2 + StateReconstitutionTestEvent1::fire(state1_id: 1, state2_id: 2); // 1=2, 2=3 + StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=2, 2=4 + StateReconstitutionTestEvent1::fire(state1_id: 1, state2_id: 2); // 1=6, 2=5 + + Verbs::commit(); + + $state1 = StateReconstitutionTestState1::load(1); + $state2 = StateReconstitutionTestState2::load(2); + + expect($state1->counter)->toBe(6) + ->and($state2->counter)->toBe(5); + + VerbSnapshot::query()->where('state_id', 1)->delete(); + + app(StateManager::class)->reset(); + + $state1 = StateReconstitutionTestState1::load(1); + $state2 = StateReconstitutionTestState2::load(2); + + expect($state1->counter)->toBe(6); + expect($state2->counter)->toBe(5); +}); + +test('partially up-to-date, but out of sync snapshots', function () { + StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=null, 2=1 + $event2 = StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=null, 2=2 + $event3 = StateReconstitutionTestEvent1::fire(state1_id: 1, state2_id: 2); // 1=2, 2=3 + StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=2, 2=4 + StateReconstitutionTestEvent1::fire(state1_id: 1, state2_id: 2); // 1=6, 2=5 + + Verbs::commit(); + + $state1 = StateReconstitutionTestState1::load(1); + $state2 = StateReconstitutionTestState2::load(2); + + expect($state1->counter)->toBe(6) + ->and($state2->counter)->toBe(5); + + $snapshot1 = VerbSnapshot::query()->where('state_id', 1)->sole(); + $snapshot1->update([ + 'data' => '{"counter":2}', + 'last_event_id' => $event3->id, + ]); + + $snapshot2 = VerbSnapshot::query()->where('state_id', 2)->sole(); + $snapshot2->update([ + 'data' => '{"counter":2}', // FIXME: This maybe can't happen? + 'last_event_id' => $event2->id, + ]); + + app(StateManager::class)->reset(); + + // dump('---- RESET ----'); + + $state1 = StateReconstitutionTestState1::load(1); + $state2 = StateReconstitutionTestState2::load(2); + + dump(app(StateManager::class)); + + expect($state1->counter)->toBe(6); + expect($state2->counter)->toBe(5); +}); + +class StateReconstitutionTestState1 extends State +{ + public int $counter = 0; +} + +class StateReconstitutionTestState2 extends State +{ + public int $counter = 0; +} + +class StateReconstitutionTestEvent1 extends \Thunk\Verbs\Event +{ + #[StateId(StateReconstitutionTestState1::class)] + public int $state1_id; + + #[StateId(StateReconstitutionTestState2::class)] + public int $state2_id; + + public function apply(StateReconstitutionTestState1 $state1, StateReconstitutionTestState2 $state2): void + { + dump("[event 1] incrementing \$state1->counter from {$state1->counter} to ({$state1->counter} + {$state2->counter})"); + $state1->counter = $state1->counter + $state2->counter; + dump("[event 1] incrementing \$state2->counter from {$state2->counter} to \$state2->counter++"); + $state2->counter++; + } +} + +class StateReconstitutionTestEvent2 extends \Thunk\Verbs\Event +{ + #[StateId(StateReconstitutionTestState2::class)] + public int $state2_id; + + public function apply(StateReconstitutionTestState2 $state2): void + { + dump("[event 2] incrementing \$state2->counter from {$state2->counter} to \$state2->counter++"); + $state2->counter++; + } +} diff --git a/tests/Unit/SupportUuidsTest.php b/tests/Unit/SupportUuidsTest.php index fbe25631..f0f6472a 100644 --- a/tests/Unit/SupportUuidsTest.php +++ b/tests/Unit/SupportUuidsTest.php @@ -3,8 +3,8 @@ use Illuminate\Support\Facades\Facade; use Illuminate\Support\Str; use Thunk\Verbs\Event; -use Thunk\Verbs\Lifecycle\StateManager; use Thunk\Verbs\State; +use Thunk\Verbs\State\StateManager; use Thunk\Verbs\Support\IdManager; use function Pest\Laravel\artisan; @@ -49,7 +49,7 @@ state: $state, ); - app(StateManager::class)->reset(include_storage: true); + app(StateManager::class)->reset(); $state = UuidState::load($uuid);