Skip to content

Commit

Permalink
fix: event validation
Browse files Browse the repository at this point in the history
  • Loading branch information
mvarendorff2 committed Mar 15, 2024
1 parent 6c19bc5 commit ccf7d7c
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/Fluss.Testing/AggregateTestBed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace Fluss.Testing;
public AggregateTestBed()
{
var validator = new Mock<IRootValidator>();
validator.Setup(v => v.ValidateEvent(It.IsAny<EventEnvelope>()))
validator.Setup(v => v.ValidateEvent(It.IsAny<EventEnvelope>(), It.IsAny<IReadOnlyList<EventEnvelope>?>()))
.Returns<EventEnvelope>(_ => Task.CompletedTask);
validator.Setup(v => v.ValidateAggregate(It.IsAny<AggregateRoot>(), It.IsAny<UnitOfWork.UnitOfWork>()))
.Returns<AggregateRoot, UnitOfWork.UnitOfWork>((_, _) => Task.CompletedTask);
Expand Down
2 changes: 1 addition & 1 deletion src/Fluss.UnitTest/Core/UnitOfWork/UnitOfWorkTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public UnitOfWorkTest()
_policies = new List<Policy>();

_validator = new Mock<IRootValidator>(MockBehavior.Strict);
_validator.Setup(v => v.ValidateEvent(It.IsAny<EventEnvelope>()))
_validator.Setup(v => v.ValidateEvent(It.IsAny<EventEnvelope>(), It.IsAny<IReadOnlyList<EventEnvelope>?>()))
.Returns<EventEnvelope>(_ => Task.CompletedTask);
_validator.Setup(v => v.ValidateAggregate(It.IsAny<AggregateRoot>(), It.IsAny<Fluss.UnitOfWork.UnitOfWork>()))
.Returns<AggregateRoot, Fluss.UnitOfWork.UnitOfWork>((_, _) => Task.CompletedTask);
Expand Down
6 changes: 5 additions & 1 deletion src/Fluss/UnitOfWork/UnitOfWork.Aggregates.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ internal async ValueTask CommitInternal()
{
using var activity = FlussActivitySource.Source.StartActivity();

await Task.WhenAll(PublishedEventEnvelopes.Select(envelope => _validator.ValidateEvent(envelope)));
var validatedEnvelopes = new List<EventEnvelope>();
foreach (var envelope in PublishedEventEnvelopes) {
await _validator.ValidateEvent(envelope, validatedEnvelopes);
validatedEnvelopes.Add(envelope);
}

await _eventRepository.Publish(PublishedEventEnvelopes);
_consistentVersion += PublishedEventEnvelopes.Count;
Expand Down
12 changes: 9 additions & 3 deletions src/Fluss/Validation/RootValidator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Fluss.Core.Validation;

public interface IRootValidator
{
public Task ValidateEvent(EventEnvelope envelope);
public Task ValidateEvent(EventEnvelope envelope, IReadOnlyList<EventEnvelope>? PreviousEnvelopes = null);
public Task ValidateAggregate(AggregateRoot aggregate, Fluss.UnitOfWork.UnitOfWork unitOfWork);
}

Expand Down Expand Up @@ -54,11 +54,17 @@ private void CacheEventValidators(IEnumerable<EventValidator> validators)
}
}

public async Task ValidateEvent(EventEnvelope envelope)
public async Task ValidateEvent(EventEnvelope envelope, IReadOnlyList<EventEnvelope>? previousEnvelopes = null)
{
var unitOfWork = _serviceProvider.GetUserUnitOfWork(envelope.By ?? SystemUser.SystemUserGuid);

var versionedUnitOfWork = unitOfWork.WithPrefilledVersion(envelope.Version - 1);
var willBePublishedEnvelopes = previousEnvelopes ?? new List<EventEnvelope>();

var versionedUnitOfWork = unitOfWork.WithPrefilledVersion(envelope.Version - willBePublishedEnvelopes.Count - 1);
foreach (var willBePublishedEnvelope in willBePublishedEnvelopes) {
versionedUnitOfWork.PublishedEventEnvelopes.Enqueue(willBePublishedEnvelope);
}

var type = envelope.Event.GetType();

if (!_eventValidators.ContainsKey(type)) return;
Expand Down

0 comments on commit ccf7d7c

Please sign in to comment.