diff --git a/src/Lifecycle/AggregateStateSummary.php b/src/Lifecycle/AggregateStateSummary.php index 4479c5b6..24d3d8f0 100644 --- a/src/Lifecycle/AggregateStateSummary.php +++ b/src/Lifecycle/AggregateStateSummary.php @@ -40,6 +40,8 @@ protected function discover(): static $continue = $this->discoverNewStates() && $this->discoverNewEventIds(); } while ($continue); + $this->related_event_ids = $this->related_event_ids->sort(); + return $this; } @@ -63,6 +65,7 @@ protected function discoverNewEventIds(): bool protected function discoverNewStates(): bool { $discovered_states = VerbStateEvent::query() + ->orderBy('id') ->distinct() ->select(['state_id', 'state_type']) ->whereIn('event_id', $this->related_event_ids) diff --git a/src/Lifecycle/NullSnapshotStore.php b/src/Lifecycle/NullSnapshotStore.php new file mode 100644 index 00000000..5d4879c0 --- /dev/null +++ b/src/Lifecycle/NullSnapshotStore.php @@ -0,0 +1,38 @@ +remember($state); $this->reconstitute($state); - return $state; + return $this->states->get($key); // FIXME } /** @param class-string $type */ @@ -193,59 +193,75 @@ protected function loadMany(iterable $ids, string $type): StateCollection // At this point, all the states should be in our cache, so we can just load everything return StateCollection::make( - $ids->map(fn ($id) => $this->states->get($this->key($id, $type))) + $ids->map(fn ($id) => $this->states->get($this->key($id, $type))), ); } protected function reconstitute(State $state): static { - // When we're replaying, the Broker is in charge of applying the correct events - // to the State, so we need to skip during replays. Similarly, if we're already - // reconstituting in a recursive call, the root call is responsible for applying - // events, so we should also skip in that case. + // FIXME: Only run this if the state is out of date + if (! $this->needsReconstituting($state)) { + // dump('skipping: everything in sync'); + return $this; + } if (! $this->is_replaying && ! $this->is_reconstituting) { + $real_registry = app(EventStateRegistry::class); + try { $this->is_reconstituting = true; $summary = $this->events->summarize($state); - // FIXME: We probably need to re-write all the snapshots after we're done - // FIXME: Swap out existing state manager, push all related states into new state manager - // FIXME: run all the event on them, swap them out - - $this->events->get($summary->related_event_ids) - ->filter(function (Event $event) { - $last_event_ids = $event->states() - ->map(fn (State $state) => $state->last_event_id) - ->filter(); - - $min = $last_event_ids->min() ?? PHP_INT_MIN; - $max = $last_event_ids->max() ?? PHP_INT_MIN; + [$temp_manager] = $this->bindNewEmptyStateManager(); - // If all states have had this or future events applied, just ignore them - if ($min >= $event->id && $max >= $event->id) { - return false; - } - - // We should never be in a situation where some events are ahead and - // others are behind, so if that's the case we'll throw an exception - if ($max > $event->id && $min <= $event->id) { - throw new RuntimeException('Trying to apply an event to states that are out of sync.'); - } - - return true; - }) + $this->events + ->get($summary->related_event_ids) ->each($this->dispatcher->apply(...)); + foreach ($temp_manager->states->all() as $key => $state) { + $this->states->put($key, $state); + } + } finally { $this->is_reconstituting = false; + + app()->instance(StateManager::class, $this); + app()->instance(EventStateRegistry::class, $real_registry); } } return $this; } + protected function needsReconstituting(State $state): bool + { + $max_id = VerbStateEvent::query() + ->where('state_id', $state->id) + ->where('state_type', $state::class) + ->max('event_id'); + + return $max_id !== $state->last_event_id; + } + + protected function bindNewEmptyStateManager() + { + $temp_manager = new StateManager( + dispatcher: $this->dispatcher, + snapshots: new NullSnapshotStore, + events: $this->events, + states: new StateInstanceCache, + ); + $temp_manager->is_reconstituting = true; // FIXME + + $temp_registry = new EventStateRegistry($temp_manager); + + app()->instance(StateManager::class, $temp_manager); + app()->instance(EventStateRegistry::class, $temp_registry); + + return [$temp_manager, $temp_registry]; + } + protected function remember(State $state): State { $key = $this->key($state->id, $state::class); diff --git a/src/Support/EventStateRegistry.php b/src/Support/EventStateRegistry.php index 533c992f..77382c4c 100644 --- a/src/Support/EventStateRegistry.php +++ b/src/Support/EventStateRegistry.php @@ -43,6 +43,7 @@ public function getStates(Event $event): StateCollection protected function discoverStates(Event $event): StateCollection { + dump('Discovering state: '.$event::class." ($event->id)"); $discovered = new StateCollection; $deferred = new StateCollection; diff --git a/tests/Unit/StateReconstitutionTest.php b/tests/Unit/StateReconstitutionTest.php index 9cf852f3..53face52 100644 --- a/tests/Unit/StateReconstitutionTest.php +++ b/tests/Unit/StateReconstitutionTest.php @@ -28,6 +28,11 @@ * - One of those Event::apply methods requires state1 and state2, so we need to load state2 * - Reconstituting state2 re-runs the same apply method on state2 before also running it on state1 * - Double-apply happens + * + * ALTERNATE TEST?: + * + * - LeftState and RightState + * - IncrementLeftByRight and IncrementRightByLeft */ // FIXME: We need to account for partially up-to-date snapshots that only need *some* events applied but not all @@ -61,11 +66,16 @@ }); test('partially up-to-date snapshots', function () { - StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=null, 2=1 - StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=null, 2=2 + // event 2 increments state 2 + // event 1 adds state 2 + state 1, then increments state 2 + + $event1 = StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=0, 2=1 + $event2 = StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=0, 2=2 $event3 = StateReconstitutionTestEvent1::fire(state1_id: 1, state2_id: 2); // 1=2, 2=3 - StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=2, 2=4 - StateReconstitutionTestEvent1::fire(state1_id: 1, state2_id: 2); // 1=6, 2=5 + $event4 = StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=2, 2=4 + $event5 = StateReconstitutionTestEvent1::fire(state1_id: 1, state2_id: 2); // 1=6, 2=5 + + dump([$event1->id, $event2->id, $event3->id, $event4->id, $event5->id]); Verbs::commit(); @@ -75,6 +85,8 @@ expect($state1->counter)->toBe(6) ->and($state2->counter)->toBe(5); + // Reset the snapshots to what they looked like at event 3 + $snapshot1 = VerbSnapshot::query()->where('state_id', 1)->sole(); $snapshot1->update([ 'data' => '{"counter":2}', @@ -92,6 +104,9 @@ $state1 = StateReconstitutionTestState1::load(1); $state2 = StateReconstitutionTestState2::load(2); + dump($state1); + dump(VerbSnapshot::all()->toArray()); + expect($state1->counter)->toBe(6); expect($state2->counter)->toBe(5); }); @@ -156,6 +171,8 @@ $state1 = StateReconstitutionTestState1::load(1); $state2 = StateReconstitutionTestState2::load(2); + dump(app(StateManager::class)); + expect($state1->counter)->toBe(6); expect($state2->counter)->toBe(5); }); @@ -180,9 +197,9 @@ class StateReconstitutionTestEvent1 extends \Thunk\Verbs\Event public function apply(StateReconstitutionTestState1 $state1, StateReconstitutionTestState2 $state2): void { - // dump("[event 1] incrementing \$state1->counter from {$state1->counter} to ({$state1->counter} + {$state2->counter})"); + dump("[event 1] incrementing \$state1->counter from {$state1->counter} to ({$state1->counter} + {$state2->counter})"); $state1->counter = $state1->counter + $state2->counter; - // dump("[event 1] incrementing \$state2->counter from {$state2->counter} to \$state2->counter++"); + dump("[event 1] incrementing \$state2->counter from {$state2->counter} to \$state2->counter++"); $state2->counter++; } } @@ -194,7 +211,7 @@ class StateReconstitutionTestEvent2 extends \Thunk\Verbs\Event public function apply(StateReconstitutionTestState2 $state2): void { - // dump("[event 2] incrementing \$state2->counter from {$state2->counter} to \$state2->counter++"); + dump("[event 2] incrementing \$state2->counter from {$state2->counter} to \$state2->counter++"); $state2->counter++; } }