Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: State Reconstructor #160

Draft
wants to merge 33 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
7c246a4
wip
inxilpro Aug 21, 2024
79e69d4
Fix styling
inxilpro Aug 21, 2024
5c43a99
wip
inxilpro Aug 21, 2024
037c450
Merge branch 'state-reconstructor' of https://github.com/hirethunk/ve…
inxilpro Aug 21, 2024
93582b5
Allow EventStateRegistry to reset
inxilpro Aug 21, 2024
ec6082f
Fix styling
inxilpro Aug 21, 2024
25dbc6c
Undo infinite loop
inxilpro Aug 21, 2024
1989b9c
GREEN
inxilpro Aug 21, 2024
712463f
refactoring
inxilpro Aug 21, 2024
64a0819
Merge branch 'state-reconstructor' of https://github.com/hirethunk/ve…
inxilpro Aug 21, 2024
72188a5
One more failing test
inxilpro Aug 21, 2024
129b031
Make test moar bad
inxilpro Aug 21, 2024
064fb3c
Fix styling
inxilpro Aug 21, 2024
f87eb56
wip
inxilpro Aug 21, 2024
1d18e81
Merge branch 'state-reconstructor' of https://github.com/hirethunk/ve…
inxilpro Aug 21, 2024
4edd585
wip
inxilpro Aug 21, 2024
9609c9f
wip
inxilpro Aug 22, 2024
a04e552
WIP
inxilpro Aug 23, 2024
21283eb
wip
inxilpro Aug 26, 2024
07394a1
Merge branch 'main' into state-reconstructor
inxilpro Sep 15, 2024
fe01611
Fix styling
inxilpro Sep 15, 2024
91f3d87
Move singleton status to the state class
inxilpro Sep 19, 2024
fe66df1
A little bit of docs
inxilpro Sep 20, 2024
5cd6c45
Fix styling
inxilpro Sep 20, 2024
bf8ed4e
Merge branch 'better-singletons' into state-reconstructor
inxilpro Sep 24, 2024
3ea993a
Start to refactor
inxilpro Sep 25, 2024
8569f3e
Fix styling
inxilpro Dec 19, 2024
7c3b226
Merge branch 'main' into state-reconstructor
inxilpro Dec 19, 2024
789aa40
Merge branch 'state-reconstructor' of https://github.com/hirethunk/ve…
inxilpro Dec 19, 2024
399288c
wip
inxilpro Dec 19, 2024
a90c509
Merge branch 'main' into state-reconstructor
inxilpro Jan 8, 2025
f266dbd
Merge branch 'main' into state-reconstructor
inxilpro Jan 8, 2025
00d713f
wip
inxilpro Jan 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Contracts/StoresEvents.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Ramsey\Uuid\UuidInterface;
use Symfony\Component\Uid\AbstractUid;
use Thunk\Verbs\Event;
use Thunk\Verbs\Lifecycle\AggregateStateSummary;
use Thunk\Verbs\State;

interface StoresEvents
Expand All @@ -16,6 +17,10 @@ public function read(
Bits|UuidInterface|AbstractUid|int|string|null $after_id = null,
): LazyCollection;

public function get(iterable $ids): LazyCollection;

/** @param Event[] $events */
public function write(array $events): bool;

public function summarize(State ...$states): AggregateStateSummary;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure this needs to be on the StoresEvents interface. I think maybe it needs access to some store data, but it seems a little odd for it to live here.

}
2 changes: 2 additions & 0 deletions src/Contracts/StoresSnapshots.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ public function loadSingleton(string $type): ?State;
public function write(array $states): bool;

public function reset(): bool;

public function delete(Bits|UuidInterface|AbstractUid|int|string ...$ids): bool;
}
7 changes: 1 addition & 6 deletions src/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
use Thunk\Verbs\Support\EventStateRegistry;
use Thunk\Verbs\Support\PendingEvent;
use Thunk\Verbs\Support\StateCollection;
use WeakMap;

/**
* @method static static fire(...$args)
Expand Down Expand Up @@ -42,11 +41,7 @@ public function metadata(?string $key = null, mixed $default = null): mixed

public function states(): StateCollection
{
// TODO: This is a bit hacky, but is probably OK right now

static $map = new WeakMap;

return $map[$this] ??= app(EventStateRegistry::class)->getStates($this);
return app(EventStateRegistry::class)->getStates($this);
}

/**
Expand Down
90 changes: 90 additions & 0 deletions src/Lifecycle/AggregateStateSummary.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php

namespace Thunk\Verbs\Lifecycle;

use Illuminate\Database\Eloquent\Builder;
use Illuminate\Support\Collection;
use Thunk\Verbs\Models\VerbStateEvent;
use Thunk\Verbs\State;
use Thunk\Verbs\Support\StateIdentity;

class AggregateStateSummary
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name is… obtuse. But the basic point of this class is to find ALL the States and Events that are related to a given set of starting States. We need this because if we're reconstituting state, and it relies on some other related state, we need that state to be in sync.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

class FindAllTheThings
{
    public static function leeroyyyyyyJenkins(State ...$states): static
}

{
public static function summarize(State ...$states): static
{
$summary = new static(
original_states: Collection::make($states),
related_event_ids: new Collection,
related_states: Collection::make($states)->map(StateIdentity::from(...)),
);

return $summary->discover();
}

/**
* @param Collection<int, State> $original_states
* @param Collection<int, int> $related_event_ids
* @param Collection<int, StateIdentity> $related_states
*/
public function __construct(
public Collection $original_states = new Collection,
public Collection $related_event_ids = new Collection,
public Collection $related_states = new Collection,
) {}

protected function discover(): static
{
$this->discoverNewEventIds();

do {
$continue = $this->discoverNewStates() && $this->discoverNewEventIds();
} while ($continue);

$this->related_event_ids = $this->related_event_ids->sort();

return $this;
}

protected function discoverNewEventIds(): bool
{
$new_event_ids = VerbStateEvent::query()
->distinct()
->select('event_id')
->whereNotIn('event_id', $this->related_event_ids)
->where(fn (Builder $query) => $this->related_states->each(
fn ($state) => $query->orWhere(fn (Builder $query) => $this->addConstraint($state, $query)))
)
->toBase()
->pluck('event_id');

$this->related_event_ids = $this->related_event_ids->merge($new_event_ids);

return $new_event_ids->isNotEmpty();
}

protected function discoverNewStates(): bool
{
$discovered_states = VerbStateEvent::query()
->orderBy('id')
->distinct()
->select(['state_id', 'state_type'])
->whereIn('event_id', $this->related_event_ids)
->where(fn (Builder $query) => $this->related_states->each(
fn ($state) => $query->whereNot(fn (Builder $query) => $this->addConstraint($state, $query)))
)
->toBase()
->chunkMap(StateIdentity::from(...));

$this->related_states = $this->related_states->merge($discovered_states);

return $discovered_states->isNotEmpty();
}

protected function addConstraint(StateIdentity $state, Builder $query): Builder
{
$query->where('state_type', '=', $state->state_type);
$query->where('state_id', '=', $state->state_id);

return $query;
}
}
31 changes: 26 additions & 5 deletions src/Lifecycle/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Glhd\Bits\Bits;
use Illuminate\Database\Eloquent\Builder;
use Illuminate\Database\Query\Builder as BaseBuilder;
use Illuminate\Database\Query\Expression;
use Illuminate\Support\Collection;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\LazyCollection;
Expand Down Expand Up @@ -35,6 +36,15 @@ public function read(
->map(fn (VerbEvent $model) => $model->event());
}

public function get(iterable $ids): LazyCollection
{
return VerbEvent::query()
->whereIn('id', collect($ids))
->lazyById()
->each(fn (VerbEvent $model) => $this->metadata->set($model->event(), $model->metadata()))
->map(fn (VerbEvent $model) => $model->event());
}

public function write(array $events): bool
{
if (empty($events)) {
Expand All @@ -47,6 +57,11 @@ public function write(array $events): bool
&& VerbStateEvent::insert($this->formatRelationshipsForWrite($events));
}

public function summarize(State ...$states): AggregateStateSummary
{
return AggregateStateSummary::summarize(...$states);
}

protected function readEvents(
?State $state,
Bits|UuidInterface|AbstractUid|int|string|null $after_id,
Expand Down Expand Up @@ -78,11 +93,7 @@ protected function guardAgainstConcurrentWrites(array $events): void
$query->select([
'state_type',
'state_id',
DB::raw(sprintf(
'max(%s) as %s',
$query->getGrammar()->wrap('event_id'),
$query->getGrammar()->wrapTable('max_event_id')
)),
$this->aggregateExpression($query, 'event_id', 'max'),
]);

$query->groupBy('state_type', 'state_id');
Expand Down Expand Up @@ -148,4 +159,14 @@ protected function formatRelationshipsForWrite(array $event_objects): array
->values()
->all();
}

protected function aggregateExpression(BaseBuilder $query, string $column, string $function): Expression
{
return DB::raw(sprintf(
'%s(%s) as %s',
$function,
$query->getGrammar()->wrap($column),
$query->getGrammar()->wrapTable("{$function}_{$column}")
));
}
}
38 changes: 38 additions & 0 deletions src/Lifecycle/NullSnapshotStore.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

namespace Thunk\Verbs\Lifecycle;

use Glhd\Bits\Bits;
use Ramsey\Uuid\UuidInterface;
use Symfony\Component\Uid\AbstractUid;
use Thunk\Verbs\Contracts\StoresSnapshots;
use Thunk\Verbs\State;
use Thunk\Verbs\Support\StateCollection;

class NullSnapshotStore implements StoresSnapshots
{
public function load(Bits|UuidInterface|AbstractUid|iterable|int|string $id, string $type): State|StateCollection|null
{
return null;
}

public function loadSingleton(string $type): ?State
{
return null;
}

public function write(array $states): bool
{
return true;
}

public function delete(Bits|UuidInterface|AbstractUid|int|string ...$ids): bool
{
return true;
}

public function reset(): bool
{
return true;
}
}
7 changes: 7 additions & 0 deletions src/Lifecycle/SnapshotStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ public function write(array $states): bool
return true;
}

public function delete(Bits|UuidInterface|AbstractUid|int|string ...$ids): bool
{
$ids = array_map(Id::from(...), $ids);

return VerbSnapshot::whereIn('state_id', $ids)->delete() === true;
}

public function reset(): bool
{
VerbSnapshot::truncate();
Expand Down
87 changes: 72 additions & 15 deletions src/Lifecycle/StateManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@
use Thunk\Verbs\Contracts\StoresSnapshots;
use Thunk\Verbs\Event;
use Thunk\Verbs\Facades\Id;
use Thunk\Verbs\Models\VerbStateEvent;
use Thunk\Verbs\State;
use Thunk\Verbs\Support\EventStateRegistry;
use Thunk\Verbs\Support\StateCollection;
use Thunk\Verbs\Support\StateInstanceCache;
use UnexpectedValueException;

class StateManager
{
protected bool $is_reconstituting = false;

protected bool $is_replaying = false;

public function __construct(
Expand All @@ -42,6 +46,12 @@ public function register(State $state): State
*/
public function load(Bits|UuidInterface|AbstractUid|iterable|int|string $id, string $type): StateCollection|State
{
// FIXME: This was not written to support loading multiple states
// $summary = $this->events->summarize($state);
// if ($summary->out_of_sync) {
// $this->snapshots->delete(...$summary->related_state_ids);
// }

return is_iterable($id)
? $this->loadMany($id, $type)
: $this->loadOne($id, $type);
Expand Down Expand Up @@ -110,6 +120,8 @@ public function setReplaying(bool $replaying): static
public function reset(bool $include_storage = false): static
{
$this->states->reset();
app(EventStateRegistry::class)->reset();

$this->is_replaying = false;

if ($include_storage) {
Expand Down Expand Up @@ -154,7 +166,7 @@ protected function loadOne(Bits|UuidInterface|AbstractUid|int|string $id, string
$this->remember($state);
$this->reconstitute($state);

return $state;
return $this->states->get($key); // FIXME
}

/** @param class-string<State> $type */
Expand All @@ -181,30 +193,75 @@ protected function loadMany(iterable $ids, string $type): StateCollection

// At this point, all the states should be in our cache, so we can just load everything
return StateCollection::make(
$ids->map(fn ($id) => $this->states->get($this->key($id, $type)))
$ids->map(fn ($id) => $this->states->get($this->key($id, $type))),
);
}

protected function reconstitute(State $state): static
{
// When we're replaying, the Broker is in charge of applying the correct events
// to the State, so we only need to do it *outside* of replays.
if (! $this->is_replaying) {
$this->events
->read(state: $state, after_id: $state->last_event_id)
->each(fn (Event $event) => $this->dispatcher->apply($event));

// It's possible for an event to mutate state out of order when reconstituting,
// so as a precaution, we'll clear all other states from the store and reload
// them from snapshots as needed in the rest of the request.
// FIXME: We still need to figure this out
// $this->states->reset();
// $this->remember($state);
// FIXME: Only run this if the state is out of date
if (! $this->needsReconstituting($state)) {
// dump('skipping: everything in sync');
return $this;
}

if (! $this->is_replaying && ! $this->is_reconstituting) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be useful to combine is_replaying and is_reconstituting into a single concept that means "should the state manager try to reconstitute state or not" — I don't know that the distinction matters inside the state manager.

$real_registry = app(EventStateRegistry::class);

try {
$this->is_reconstituting = true;

$summary = $this->events->summarize($state);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably should be able to take multiple states


[$temp_manager] = $this->bindNewEmptyStateManager();

$this->events
->get($summary->related_event_ids)
->each($this->dispatcher->apply(...));

foreach ($temp_manager->states->all() as $key => $state) {
$this->states->put($key, $state);
}

} finally {
$this->is_reconstituting = false;

app()->instance(StateManager::class, $this);
app()->instance(EventStateRegistry::class, $real_registry);
}
}

return $this;
}

protected function needsReconstituting(State $state): bool
{
$max_id = VerbStateEvent::query()
->where('state_id', $state->id)
->where('state_type', $state::class)
->max('event_id');

return $max_id !== $state->last_event_id;
}

protected function bindNewEmptyStateManager()
{
$temp_manager = new StateManager(
dispatcher: $this->dispatcher,
snapshots: new NullSnapshotStore,
events: $this->events,
states: new StateInstanceCache,
);
$temp_manager->is_reconstituting = true; // FIXME

$temp_registry = new EventStateRegistry($temp_manager);

app()->instance(StateManager::class, $temp_manager);
app()->instance(EventStateRegistry::class, $temp_registry);

return [$temp_manager, $temp_registry];
}

protected function remember(State $state): State
{
$key = $this->key($state->id, $state::class);
Expand Down
Loading
Loading