Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Co-Authored-By: Daniel Coulbourne <[email protected]>
Co-Authored-By: Skyler Katz <[email protected]>
  • Loading branch information
3 people committed Jan 13, 2025
1 parent f266dbd commit 00d713f
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 39 deletions.
3 changes: 3 additions & 0 deletions src/Lifecycle/AggregateStateSummary.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ protected function discover(): static
$continue = $this->discoverNewStates() && $this->discoverNewEventIds();
} while ($continue);

$this->related_event_ids = $this->related_event_ids->sort();

return $this;
}

Expand All @@ -63,6 +65,7 @@ protected function discoverNewEventIds(): bool
protected function discoverNewStates(): bool
{
$discovered_states = VerbStateEvent::query()
->orderBy('id')
->distinct()
->select(['state_id', 'state_type'])
->whereIn('event_id', $this->related_event_ids)
Expand Down
38 changes: 38 additions & 0 deletions src/Lifecycle/NullSnapshotStore.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

namespace Thunk\Verbs\Lifecycle;

use Glhd\Bits\Bits;
use Ramsey\Uuid\UuidInterface;
use Symfony\Component\Uid\AbstractUid;
use Thunk\Verbs\Contracts\StoresSnapshots;
use Thunk\Verbs\State;
use Thunk\Verbs\Support\StateCollection;

class NullSnapshotStore implements StoresSnapshots
{
public function load(Bits|UuidInterface|AbstractUid|iterable|int|string $id, string $type): State|StateCollection|null
{
return null;
}

public function loadSingleton(string $type): ?State
{
return null;
}

public function write(array $states): bool
{
return true;
}

public function delete(Bits|UuidInterface|AbstractUid|int|string ...$ids): bool
{
return true;
}

public function reset(): bool
{
return true;
}
}
80 changes: 48 additions & 32 deletions src/Lifecycle/StateManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
use LogicException;
use Ramsey\Uuid\UuidInterface;
use ReflectionClass;
use RuntimeException;
use Symfony\Component\Uid\AbstractUid;
use Thunk\Verbs\Contracts\StoresEvents;
use Thunk\Verbs\Contracts\StoresSnapshots;
use Thunk\Verbs\Event;
use Thunk\Verbs\Facades\Id;
use Thunk\Verbs\Models\VerbStateEvent;
use Thunk\Verbs\State;
use Thunk\Verbs\Support\EventStateRegistry;
use Thunk\Verbs\Support\StateCollection;
Expand Down Expand Up @@ -166,7 +166,7 @@ protected function loadOne(Bits|UuidInterface|AbstractUid|int|string $id, string
$this->remember($state);
$this->reconstitute($state);

return $state;
return $this->states->get($key); // FIXME
}

/** @param class-string<State> $type */
Expand All @@ -193,59 +193,75 @@ protected function loadMany(iterable $ids, string $type): StateCollection

// At this point, all the states should be in our cache, so we can just load everything
return StateCollection::make(
$ids->map(fn ($id) => $this->states->get($this->key($id, $type)))
$ids->map(fn ($id) => $this->states->get($this->key($id, $type))),
);
}

protected function reconstitute(State $state): static
{
// When we're replaying, the Broker is in charge of applying the correct events
// to the State, so we need to skip during replays. Similarly, if we're already
// reconstituting in a recursive call, the root call is responsible for applying
// events, so we should also skip in that case.
// FIXME: Only run this if the state is out of date
if (! $this->needsReconstituting($state)) {
// dump('skipping: everything in sync');
return $this;
}

if (! $this->is_replaying && ! $this->is_reconstituting) {
$real_registry = app(EventStateRegistry::class);

try {
$this->is_reconstituting = true;

$summary = $this->events->summarize($state);

// FIXME: We probably need to re-write all the snapshots after we're done
// FIXME: Swap out existing state manager, push all related states into new state manager
// FIXME: run all the event on them, swap them out

$this->events->get($summary->related_event_ids)
->filter(function (Event $event) {
$last_event_ids = $event->states()
->map(fn (State $state) => $state->last_event_id)
->filter();

$min = $last_event_ids->min() ?? PHP_INT_MIN;
$max = $last_event_ids->max() ?? PHP_INT_MIN;
[$temp_manager] = $this->bindNewEmptyStateManager();

// If all states have had this or future events applied, just ignore them
if ($min >= $event->id && $max >= $event->id) {
return false;
}

// We should never be in a situation where some events are ahead and
// others are behind, so if that's the case we'll throw an exception
if ($max > $event->id && $min <= $event->id) {
throw new RuntimeException('Trying to apply an event to states that are out of sync.');
}

return true;
})
$this->events
->get($summary->related_event_ids)
->each($this->dispatcher->apply(...));

foreach ($temp_manager->states->all() as $key => $state) {
$this->states->put($key, $state);
}

} finally {
$this->is_reconstituting = false;

app()->instance(StateManager::class, $this);
app()->instance(EventStateRegistry::class, $real_registry);
}
}

return $this;
}

protected function needsReconstituting(State $state): bool
{
$max_id = VerbStateEvent::query()
->where('state_id', $state->id)
->where('state_type', $state::class)
->max('event_id');

return $max_id !== $state->last_event_id;
}

protected function bindNewEmptyStateManager()
{
$temp_manager = new StateManager(
dispatcher: $this->dispatcher,
snapshots: new NullSnapshotStore,
events: $this->events,
states: new StateInstanceCache,
);
$temp_manager->is_reconstituting = true; // FIXME

$temp_registry = new EventStateRegistry($temp_manager);

app()->instance(StateManager::class, $temp_manager);
app()->instance(EventStateRegistry::class, $temp_registry);

return [$temp_manager, $temp_registry];
}

protected function remember(State $state): State
{
$key = $this->key($state->id, $state::class);
Expand Down
1 change: 1 addition & 0 deletions src/Support/EventStateRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public function getStates(Event $event): StateCollection

protected function discoverStates(Event $event): StateCollection
{
dump('Discovering state: '.$event::class." ($event->id)");
$discovered = new StateCollection;
$deferred = new StateCollection;

Expand Down
31 changes: 24 additions & 7 deletions tests/Unit/StateReconstitutionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
* - One of those Event::apply methods requires state1 and state2, so we need to load state2
* - Reconstituting state2 re-runs the same apply method on state2 before also running it on state1
* - Double-apply happens
*
* ALTERNATE TEST?:
*
* - LeftState and RightState
* - IncrementLeftByRight and IncrementRightByLeft
*/

// FIXME: We need to account for partially up-to-date snapshots that only need *some* events applied but not all
Expand Down Expand Up @@ -61,11 +66,16 @@
});

test('partially up-to-date snapshots', function () {
StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=null, 2=1
StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=null, 2=2
// event 2 increments state 2
// event 1 adds state 2 + state 1, then increments state 2

$event1 = StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=0, 2=1
$event2 = StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=0, 2=2
$event3 = StateReconstitutionTestEvent1::fire(state1_id: 1, state2_id: 2); // 1=2, 2=3
StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=2, 2=4
StateReconstitutionTestEvent1::fire(state1_id: 1, state2_id: 2); // 1=6, 2=5
$event4 = StateReconstitutionTestEvent2::fire(state2_id: 2); // 1=2, 2=4
$event5 = StateReconstitutionTestEvent1::fire(state1_id: 1, state2_id: 2); // 1=6, 2=5

dump([$event1->id, $event2->id, $event3->id, $event4->id, $event5->id]);

Verbs::commit();

Expand All @@ -75,6 +85,8 @@
expect($state1->counter)->toBe(6)
->and($state2->counter)->toBe(5);

// Reset the snapshots to what they looked like at event 3

$snapshot1 = VerbSnapshot::query()->where('state_id', 1)->sole();
$snapshot1->update([
'data' => '{"counter":2}',
Expand All @@ -92,6 +104,9 @@
$state1 = StateReconstitutionTestState1::load(1);
$state2 = StateReconstitutionTestState2::load(2);

dump($state1);
dump(VerbSnapshot::all()->toArray());

expect($state1->counter)->toBe(6);
expect($state2->counter)->toBe(5);
});
Expand Down Expand Up @@ -156,6 +171,8 @@
$state1 = StateReconstitutionTestState1::load(1);
$state2 = StateReconstitutionTestState2::load(2);

dump(app(StateManager::class));

expect($state1->counter)->toBe(6);
expect($state2->counter)->toBe(5);
});
Expand All @@ -180,9 +197,9 @@ class StateReconstitutionTestEvent1 extends \Thunk\Verbs\Event

public function apply(StateReconstitutionTestState1 $state1, StateReconstitutionTestState2 $state2): void
{
// dump("[event 1] incrementing \$state1->counter from {$state1->counter} to ({$state1->counter} + {$state2->counter})");
dump("[event 1] incrementing \$state1->counter from {$state1->counter} to ({$state1->counter} + {$state2->counter})");
$state1->counter = $state1->counter + $state2->counter;
// dump("[event 1] incrementing \$state2->counter from {$state2->counter} to \$state2->counter++");
dump("[event 1] incrementing \$state2->counter from {$state2->counter} to \$state2->counter++");
$state2->counter++;
}
}
Expand All @@ -194,7 +211,7 @@ class StateReconstitutionTestEvent2 extends \Thunk\Verbs\Event

public function apply(StateReconstitutionTestState2 $state2): void
{
// dump("[event 2] incrementing \$state2->counter from {$state2->counter} to \$state2->counter++");
dump("[event 2] incrementing \$state2->counter from {$state2->counter} to \$state2->counter++");
$state2->counter++;
}
}

0 comments on commit 00d713f

Please sign in to comment.