Skip to content

Commit

Permalink
fix: event validation (#1)
Browse files Browse the repository at this point in the history
* fix: event validation

* chore: pipeline
  • Loading branch information
mvarendorff2 authored Mar 15, 2024
1 parent 6c19bc5 commit eebee43
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 8 deletions.
4 changes: 2 additions & 2 deletions src/Fluss.Testing/AggregateTestBed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ namespace Fluss.Testing;
public AggregateTestBed()
{
var validator = new Mock<IRootValidator>();
validator.Setup(v => v.ValidateEvent(It.IsAny<EventEnvelope>()))
.Returns<EventEnvelope>(_ => Task.CompletedTask);
validator.Setup(v => v.ValidateEvent(It.IsAny<EventEnvelope>(), It.IsAny<IReadOnlyList<EventEnvelope>?>()))
.Returns<EventEnvelope, IReadOnlyList<EventEnvelope>?>((_, _) => Task.CompletedTask);
validator.Setup(v => v.ValidateAggregate(It.IsAny<AggregateRoot>(), It.IsAny<UnitOfWork.UnitOfWork>()))
.Returns<AggregateRoot, UnitOfWork.UnitOfWork>((_, _) => Task.CompletedTask);

Expand Down
4 changes: 2 additions & 2 deletions src/Fluss.UnitTest/Core/UnitOfWork/UnitOfWorkTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public UnitOfWorkTest()
_policies = new List<Policy>();

_validator = new Mock<IRootValidator>(MockBehavior.Strict);
_validator.Setup(v => v.ValidateEvent(It.IsAny<EventEnvelope>()))
.Returns<EventEnvelope>(_ => Task.CompletedTask);
_validator.Setup(v => v.ValidateEvent(It.IsAny<EventEnvelope>(), It.IsAny<IReadOnlyList<EventEnvelope>?>()))
.Returns<EventEnvelope, IReadOnlyList<EventEnvelope>?>((_, _) => Task.CompletedTask);
_validator.Setup(v => v.ValidateAggregate(It.IsAny<AggregateRoot>(), It.IsAny<Fluss.UnitOfWork.UnitOfWork>()))
.Returns<AggregateRoot, Fluss.UnitOfWork.UnitOfWork>((_, _) => Task.CompletedTask);

Expand Down
7 changes: 6 additions & 1 deletion src/Fluss/UnitOfWork/UnitOfWork.Aggregates.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ internal async ValueTask CommitInternal()
{
using var activity = FlussActivitySource.Source.StartActivity();

await Task.WhenAll(PublishedEventEnvelopes.Select(envelope => _validator.ValidateEvent(envelope)));
var validatedEnvelopes = new List<EventEnvelope>();
foreach (var envelope in PublishedEventEnvelopes)
{
await _validator.ValidateEvent(envelope, validatedEnvelopes);
validatedEnvelopes.Add(envelope);
}

await _eventRepository.Publish(PublishedEventEnvelopes);
_consistentVersion += PublishedEventEnvelopes.Count;
Expand Down
13 changes: 10 additions & 3 deletions src/Fluss/Validation/RootValidator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Fluss.Core.Validation;

public interface IRootValidator
{
public Task ValidateEvent(EventEnvelope envelope);
public Task ValidateEvent(EventEnvelope envelope, IReadOnlyList<EventEnvelope>? PreviousEnvelopes = null);
public Task ValidateAggregate(AggregateRoot aggregate, Fluss.UnitOfWork.UnitOfWork unitOfWork);
}

Expand Down Expand Up @@ -54,11 +54,18 @@ private void CacheEventValidators(IEnumerable<EventValidator> validators)
}
}

public async Task ValidateEvent(EventEnvelope envelope)
public async Task ValidateEvent(EventEnvelope envelope, IReadOnlyList<EventEnvelope>? previousEnvelopes = null)
{
var unitOfWork = _serviceProvider.GetUserUnitOfWork(envelope.By ?? SystemUser.SystemUserGuid);

var versionedUnitOfWork = unitOfWork.WithPrefilledVersion(envelope.Version - 1);
var willBePublishedEnvelopes = previousEnvelopes ?? new List<EventEnvelope>();

var versionedUnitOfWork = unitOfWork.WithPrefilledVersion(envelope.Version - willBePublishedEnvelopes.Count - 1);
foreach (var willBePublishedEnvelope in willBePublishedEnvelopes)
{
versionedUnitOfWork.PublishedEventEnvelopes.Enqueue(willBePublishedEnvelope);
}

var type = envelope.Event.GetType();

if (!_eventValidators.ContainsKey(type)) return;
Expand Down

0 comments on commit eebee43

Please sign in to comment.