diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml
new file mode 100644
index 0000000..6dcc30e
--- /dev/null
+++ b/.github/workflows/dotnet.yml
@@ -0,0 +1,49 @@
+# This workflow will build a .NET project
+# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-net
+
+name: .NET
+
+on:
+ - push
+ - pull_request
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ - name: Setup .NET
+ uses: actions/setup-dotnet@v3
+ with:
+ dotnet-version: 8.x
+ - name: Build
+ run: dotnet build
+ working-directory: ./src
+
+ test:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ - name: Setup .NET
+ uses: actions/setup-dotnet@v3
+ with:
+ dotnet-version: 8.x
+ - name: Test
+ run: dotnet test --no-build --verbosity normal --collect:"XPlat Code Coverage"
+ working-directory: ./src/Fluss.UnitTest
+ - name: Upload coverage reports to Codecov
+ uses: codecov/codecov-action@v3
+ env:
+ CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
+
+ format:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ - name: Setup .NET
+ uses: actions/setup-dotnet@v3
+ with:
+ dotnet-version: 8.x
+ - name: Run dotnet format
+ run: dotnet format --verify-no-changes
+ working-directory: ./src
diff --git a/src/.gitignore b/src/.gitignore
new file mode 100644
index 0000000..9226fb5
--- /dev/null
+++ b/src/.gitignore
@@ -0,0 +1,14 @@
+# Build results
+[Dd]ebug/
+[Dd]ebugPublic/
+[Rr]elease/
+[Rr]eleases/
+x64/
+x86/
+build/
+bld/
+[Bb]in/
+[Oo]bj/
+msbuild.log
+msbuild.err
+msbuild.wrn
\ No newline at end of file
diff --git a/src/.idea/.idea.Fluss/.idea/.gitignore b/src/.idea/.idea.Fluss/.idea/.gitignore
new file mode 100644
index 0000000..066e84e
--- /dev/null
+++ b/src/.idea/.idea.Fluss/.idea/.gitignore
@@ -0,0 +1,13 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Rider ignored files
+/contentModel.xml
+/.idea.Fluss.iml
+/projectSettingsUpdater.xml
+/modules.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/src/.idea/.idea.Fluss/.idea/encodings.xml b/src/.idea/.idea.Fluss/.idea/encodings.xml
new file mode 100644
index 0000000..df87cf9
--- /dev/null
+++ b/src/.idea/.idea.Fluss/.idea/encodings.xml
@@ -0,0 +1,4 @@
+
+
+
+
\ No newline at end of file
diff --git a/src/Fluss.HotChocolate/AddExtensionMiddleware.cs b/src/Fluss.HotChocolate/AddExtensionMiddleware.cs
new file mode 100644
index 0000000..d81f988
--- /dev/null
+++ b/src/Fluss.HotChocolate/AddExtensionMiddleware.cs
@@ -0,0 +1,183 @@
+using Fluss.Events;
+using HotChocolate.AspNetCore.Subscriptions;
+using HotChocolate.Execution;
+using Microsoft.AspNetCore.Http;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using RequestDelegate = HotChocolate.Execution.RequestDelegate;
+
+namespace Fluss.HotChocolate;
+
+public class AddExtensionMiddleware {
+ private const string SubsequentRequestMarker = nameof(AddExtensionMiddleware) + ".subsequentRequestMarker";
+
+ private readonly RequestDelegate _next;
+
+ private readonly IServiceProvider _rootServiceProvider;
+ private readonly ILogger _logger;
+
+ public AddExtensionMiddleware(
+ RequestDelegate next,
+ IServiceProvider rootServiceProvider,
+ ILogger logger
+ ) {
+ _next = next ?? throw new ArgumentNullException(nameof(next));
+ _rootServiceProvider = rootServiceProvider;
+ _logger = logger;
+ }
+
+ public async ValueTask InvokeAsync(IRequestContext context) {
+ await _next.Invoke(context);
+
+ if (!context.ContextData.ContainsKey(nameof(UnitOfWork))) {
+ return;
+ }
+
+ if (true != context.Services.GetRequiredService().HttpContext?.WebSockets
+ .IsWebSocketRequest) {
+ return;
+ }
+
+ if (context.Request.Extensions?.ContainsKey(SubsequentRequestMarker) ?? false) {
+ if (context.Result is QueryResult subsequentQueryResult) {
+ context.Result = QueryResultBuilder.FromResult(subsequentQueryResult).AddContextData(nameof(UnitOfWork),
+ context.ContextData[nameof(UnitOfWork)]).Create();
+ }
+
+ return;
+ }
+
+ if (context.Result is QueryResult qr) {
+ var contextData = new Dictionary(context.ContextData);
+ // Do not inline; this stores a reference to the request because it is set to null on the context eventually
+ var contextRequest = context.Request;
+ context.Result = new ResponseStream(() => LiveResults(contextData, qr, contextRequest));
+ }
+ }
+
+ private async IAsyncEnumerable LiveResults(IReadOnlyDictionary? contextData, QueryResult firstResult, IQueryRequest originalRequest) {
+ yield return firstResult;
+
+ using var serviceScope = _rootServiceProvider.CreateScope();
+ var serviceProvider = serviceScope.ServiceProvider;
+
+ if (contextData == null) {
+ _logger.LogWarning("Trying to add live results but {ContextData} is null!", nameof(contextData));
+ yield break;
+ }
+
+ var foundSocketSession = contextData.TryGetValue(nameof(ISocketSession), out var contextSocketSession); // as ISocketSession
+ var foundOperationId = contextData.TryGetValue("HotChocolate.Execution.Transport.OperationSessionId", out var operationId); // as string
+
+ if (!foundSocketSession || !foundOperationId) {
+ _logger.LogWarning("Trying to add live results but {SocketSession} or {OperationId} is not present in context!", nameof(contextSocketSession), nameof(operationId));
+ yield break;
+ }
+
+ if (contextSocketSession is not ISocketSession socketSession) {
+ _logger.LogWarning("{ContextSocketSession} key present in context but not an {ISocketSession}!", contextSocketSession?.GetType().FullName, nameof(ISocketSession));
+ yield break;
+ }
+
+ while (true) {
+ if (contextData == null || !contextData.ContainsKey(nameof(UnitOfWork))) {
+ break;
+ }
+
+ if (contextData[nameof(UnitOfWork)] is not UnitOfWork.UnitOfWork unitOfWork) {
+ break;
+ }
+
+ var latestPersistedEventVersion = await WaitForChange(
+ serviceProvider,
+ unitOfWork.ReadModels
+ );
+
+ if (socketSession.Operations.All(operationSession => operationSession.Id != operationId?.ToString())) {
+ break;
+ }
+
+ var readOnlyQueryRequest = QueryRequestBuilder
+ .From(originalRequest)
+ .AddExtension(SubsequentRequestMarker, SubsequentRequestMarker)
+ .AddGlobalState(UnitOfWorkParameterExpressionBuilder.PrefillUnitOfWorkVersion,
+ latestPersistedEventVersion)
+ .SetServices(serviceProvider)
+ .Create();
+
+ await using var executionResult = await serviceProvider.ExecuteRequestAsync(readOnlyQueryRequest);
+
+ if (executionResult is not IQueryResult result) {
+ break;
+ }
+
+ yield return result;
+ contextData = executionResult.ContextData;
+
+ if (result.Errors?.Count > 0) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Returns the received latest persistent event version after a change has occured.
+ */
+ private static async ValueTask WaitForChange(IServiceProvider serviceProvider, IEnumerable eventListeners) {
+ var currentEventListener = eventListeners.ToList();
+
+ var newEventNotifier = serviceProvider.GetRequiredService();
+ var newTransientEventNotifier = serviceProvider.GetRequiredService();
+ var eventListenerFactory = serviceProvider.GetRequiredService();
+
+ var cancellationTokenSource = new CancellationTokenSource();
+
+ var latestPersistedEventVersion = currentEventListener.Min(el => el.Tag.LastSeen);
+ var latestTransientEventVersion = currentEventListener.Min(el => el.Tag.LastSeenTransient);
+
+ var persistedEventTask = Task.Run(async () => {
+ while (true) {
+ latestPersistedEventVersion = await newEventNotifier.WaitForEventAfter(latestPersistedEventVersion, cancellationTokenSource.Token);
+
+ for (var index = 0; index < currentEventListener.Count; index++) {
+ var eventListener = currentEventListener[index];
+ var updatedEventListener = await eventListenerFactory.UpdateTo(eventListener, latestPersistedEventVersion);
+
+ if (updatedEventListener.Tag.LastAccepted > eventListener.Tag.LastAccepted) {
+ return;
+ }
+
+ currentEventListener[index] = updatedEventListener;
+ }
+ }
+ }, cancellationTokenSource.Token);
+
+ var transientEventTask = Task.Run(async () => {
+ while (true) {
+ var events = (await newTransientEventNotifier.WaitForEventAfter(latestTransientEventVersion, cancellationTokenSource.Token)).ToList();
+
+ for (var index = 0; index < currentEventListener.Count; index++) {
+ var eventListener = currentEventListener[index];
+ var updatedEventListener = eventListenerFactory.UpdateWithEvents(eventListener, events.ToPagedMemory());
+
+ if (updatedEventListener != eventListener) {
+ return;
+ }
+
+ currentEventListener[index] = updatedEventListener;
+ }
+
+ latestTransientEventVersion = events.Max(el => el.Version);
+ }
+ }, cancellationTokenSource.Token);
+
+ var completedTask = await Task.WhenAny(persistedEventTask, transientEventTask);
+ cancellationTokenSource.Cancel();
+
+ if (completedTask.IsFaulted) {
+ throw completedTask.Exception!;
+ }
+
+ return latestPersistedEventVersion;
+ }
+}
diff --git a/src/Fluss.HotChocolate/AutoGenerateSchema/AutoGenerateSchema.cs b/src/Fluss.HotChocolate/AutoGenerateSchema/AutoGenerateSchema.cs
new file mode 100644
index 0000000..1b60528
--- /dev/null
+++ b/src/Fluss.HotChocolate/AutoGenerateSchema/AutoGenerateSchema.cs
@@ -0,0 +1,133 @@
+using System.ComponentModel;
+using System.Reflection;
+using HotChocolate.Data.Filters;
+using HotChocolate.Execution.Configuration;
+using HotChocolate.Language;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Fluss.HotChocolate.AutoGenerateSchema;
+
+public static class AutoGenerateSchema {
+ public static IRequestExecutorBuilder AutoGenerateStronglyTypedIds(
+ this IRequestExecutorBuilder requestExecutorBuilder, Assembly assembly) {
+ var typesToGenerateFor = assembly.GetTypes().Where(t =>
+ t.IsValueType && t.CustomAttributes.Any(a =>
+ a.AttributeType == typeof(TypeConverterAttribute)));
+
+ foreach (var type in typesToGenerateFor) {
+ var converterType = GetStronglyTypedIdTypeForType(type);
+ requestExecutorBuilder.BindRuntimeType(type, converterType.MakeGenericType(type));
+ }
+
+ return requestExecutorBuilder;
+ }
+
+ private static Type GetBackingType(Type type) {
+ return type.GetProperty("Value")?.PropertyType ??
+ throw new ArgumentException($"Could not determine backing field type for type {type.Name}");
+ }
+
+ private static Type GetStronglyTypedIdTypeForType(Type type) {
+ var backingType = GetBackingType(type);
+ if (backingType == typeof(long)) {
+ return typeof(StronglyTypedLongIdType<>);
+ }
+
+ if (backingType == typeof(Guid)) {
+ return typeof(StronglyTypedGuidIdType<>);
+ }
+
+ throw new ArgumentException(
+ $"Could not find Type converter for strongly typed IDs with backing type {backingType!.Name}");
+ }
+}
+
+public abstract class StronglyTypedIdType : ScalarType
+ where TId : struct where TScalarType : ScalarType where TNodeType : IValueNode {
+ private readonly TScalarType scalarType;
+
+ protected StronglyTypedIdType(TScalarType scalarType) : base(typeof(TId).Name) {
+ this.scalarType = scalarType;
+ }
+
+ protected override TId ParseLiteral(TNodeType valueSyntax) {
+ var guid = (TCLRType)scalarType.ParseLiteral(valueSyntax)!;
+
+ return (TId)Activator.CreateInstance(typeof(TId), guid)!;
+ }
+
+ protected override TNodeType ParseValue(TId runtimeValue) {
+ return (TNodeType)scalarType.ParseValue(GetInternalValue(runtimeValue));
+ }
+
+ public override IValueNode ParseResult(object? resultValue) {
+ if (resultValue is TId id) {
+ resultValue = GetInternalValue(id);
+ }
+
+ return scalarType.ParseResult(resultValue);
+ }
+
+ private TCLRType GetInternalValue(TId obj) {
+ return (TCLRType)typeof(TId).GetProperty("Value")?.GetMethod?.Invoke(obj, null)!;
+ }
+
+ public override bool TrySerialize(object? runtimeValue, out object? resultValue) {
+ if (runtimeValue is TId id) {
+ resultValue = GetInternalValue(id);
+ return true;
+ }
+
+ return base.TrySerialize(runtimeValue, out resultValue);
+ }
+}
+
+public class StronglyTypedGuidIdType : StronglyTypedIdType where TId : struct {
+ public StronglyTypedGuidIdType() : base(new UuidType('D')) { }
+}
+
+public class StronglyTypedLongIdType : StronglyTypedIdType where TId : struct {
+ public StronglyTypedLongIdType() : base(new LongType()) { }
+}
+
+public class StronglyTypedIdFilterConventionExtension : FilterConventionExtension {
+ protected override void Configure(IFilterConventionDescriptor descriptor) {
+ base.Configure(descriptor);
+
+ var typesToGenerateFor = typeof(TAssemblyReference).Assembly.GetTypes().Where(t =>
+ t.IsValueType && t.CustomAttributes.Any(a =>
+ a.AttributeType == typeof(TypeConverterAttribute)));
+
+
+ foreach (var type in typesToGenerateFor) {
+ var filterInputType = typeof(StronglyTypedGuidIdFilterInput<>).MakeGenericType(type);
+ var nullableType = typeof(Nullable<>).MakeGenericType(type);
+ descriptor.BindRuntimeType(type, filterInputType);
+ descriptor.BindRuntimeType(nullableType, filterInputType);
+ }
+ }
+}
+
+public class StronglyTypedGuidIdFilterInput : StringOperationFilterInputType {
+ /*public override bool TrySerialize(object? runtimeValue, out object? resultValue) {
+ if (runtimeValue is TId id) {
+ resultValue = id.ToString();
+ return true;
+ }
+
+ resultValue = null;
+ return false;
+ }
+
+ public override bool TryDeserialize(object? resultValue, out object? runtimeValue) {
+ var canParseGuid = Guid.TryParse(resultValue?.ToString(), out var parsedGuid);
+ if (!canParseGuid) {
+ runtimeValue = null;
+ return false;
+ }
+
+ var tId = Activator.CreateInstance(typeof(TId), parsedGuid);
+ runtimeValue = tId;
+ return true;
+ }*/
+}
diff --git a/src/Fluss.HotChocolate/BuilderExtensions.cs b/src/Fluss.HotChocolate/BuilderExtensions.cs
new file mode 100644
index 0000000..2999869
--- /dev/null
+++ b/src/Fluss.HotChocolate/BuilderExtensions.cs
@@ -0,0 +1,20 @@
+using Fluss.UnitOfWork;
+using HotChocolate.Execution.Configuration;
+using HotChocolate.Internal;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Fluss.HotChocolate;
+
+public static class BuilderExtensions {
+ public static IRequestExecutorBuilder AddLiveEventSourcing(this IRequestExecutorBuilder reb) {
+ reb.UseRequest()
+ .RegisterService(ServiceKind.Synchronized);
+
+ reb.Services
+ .AddSingleton()
+ .AddSingleton()
+ .AddSingleton();
+
+ return reb;
+ }
+}
diff --git a/src/Fluss.HotChocolate/Fluss.HotChocolate.csproj b/src/Fluss.HotChocolate/Fluss.HotChocolate.csproj
new file mode 100644
index 0000000..e60a95d
--- /dev/null
+++ b/src/Fluss.HotChocolate/Fluss.HotChocolate.csproj
@@ -0,0 +1,19 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Fluss.HotChocolate/NewEventNotifier.cs b/src/Fluss.HotChocolate/NewEventNotifier.cs
new file mode 100644
index 0000000..107a5b3
--- /dev/null
+++ b/src/Fluss.HotChocolate/NewEventNotifier.cs
@@ -0,0 +1,63 @@
+using Fluss.Events;
+using Fluss.Extensions;
+
+namespace Fluss.HotChocolate;
+
+public class NewEventNotifier {
+ private long _knownVersion;
+ private readonly List<(long startedAtVersion, SemaphoreSlim semaphoreSlim)> _listeners = new();
+ private readonly SemaphoreSlim _newEventAvailable = new(0);
+
+ public NewEventNotifier(IBaseEventRepository eventRepository) {
+ _knownVersion = eventRepository.GetLatestVersion().GetResult();
+ eventRepository.NewEvents += EventRepositoryOnNewEvents;
+
+ _ = Task.Run(async () => {
+ while (true) {
+ await _newEventAvailable.WaitAsync();
+ var newVersion = await eventRepository.GetLatestVersion();
+ if (newVersion <= _knownVersion) {
+ continue;
+ }
+
+ lock (this) {
+ _knownVersion = newVersion;
+
+ for (var i = _listeners.Count - 1; i >= 0; i--) {
+ if (_listeners[i].startedAtVersion >= newVersion) {
+ continue;
+ }
+
+ _listeners[i].semaphoreSlim.Release();
+ _listeners.RemoveAt(i);
+ }
+ }
+ }
+ });
+ }
+
+ private void EventRepositoryOnNewEvents(object? sender, EventArgs e) {
+ _newEventAvailable.Release();
+ }
+
+ public async Task WaitForEventAfter(long startedAtVersion, CancellationToken ct = default) {
+ if (_knownVersion > startedAtVersion) {
+ return _knownVersion;
+ }
+
+ SemaphoreSlim semaphore;
+
+ lock (this) {
+ if (_knownVersion > startedAtVersion) {
+ return _knownVersion;
+ }
+
+ semaphore = new SemaphoreSlim(0, 1);
+ _listeners.Add((startedAtVersion, semaphore));
+ }
+
+ await semaphore.WaitAsync(ct);
+
+ return _knownVersion;
+ }
+}
diff --git a/src/Fluss.HotChocolate/NewTransientEventNotifier.cs b/src/Fluss.HotChocolate/NewTransientEventNotifier.cs
new file mode 100644
index 0000000..75139d4
--- /dev/null
+++ b/src/Fluss.HotChocolate/NewTransientEventNotifier.cs
@@ -0,0 +1,77 @@
+using Fluss.Events.TransientEvents;
+
+namespace Fluss.HotChocolate;
+
+public class NewTransientEventNotifier {
+ private readonly List<(long startedAtVersion, TaskCompletionSource> task)> _listeners = new();
+ private readonly TransientEventAwareEventRepository _transientEventRepository;
+
+ private readonly SemaphoreSlim _newEventAvailable = new(0);
+
+ public NewTransientEventNotifier(TransientEventAwareEventRepository transientEventRepository) {
+ _transientEventRepository = transientEventRepository;
+ transientEventRepository.NewTransientEvents += OnNewTransientEvents;
+
+ _ = Task.Run(async () => {
+ while (true) {
+ await _newEventAvailable.WaitAsync();
+ var events = transientEventRepository.GetCurrentTransientEvents();
+
+ lock (this) {
+ for (var i = _listeners.Count - 1; i >= 0; i--) {
+ var listener = _listeners[i];
+ var newEvents = new List();
+ foreach (var memory in events) {
+ foreach (var eventEnvelope in memory.ToArray()) {
+ if (eventEnvelope.Version <= listener.startedAtVersion) {
+ continue;
+ }
+ newEvents.Add((TransientEventEnvelope)eventEnvelope);
+ }
+ }
+
+ // If a listener is re-added before all other ones are handled, there might be a situation where
+ // there are no new events for that listener; in that case we keep it around
+ if (newEvents.Count == 0) continue;
+
+ _listeners[i].task.SetResult(newEvents);
+ _listeners.RemoveAt(i);
+ }
+ }
+ }
+ });
+ }
+
+ private void OnNewTransientEvents(object? sender, EventArgs e) {
+ _newEventAvailable.Release();
+ }
+
+ public async ValueTask> WaitForEventAfter(long startedAtVersion, CancellationToken ct = default) {
+ var events = _transientEventRepository.GetCurrentTransientEvents();
+
+ if (events.LastOrDefault() is { Length: > 0 } lastEventPage && lastEventPage.Span[^1] is { } lastKnown && lastKnown.Version > startedAtVersion) {
+ var newEvents = new List();
+ foreach (var memory in events) {
+ for (var index = 0; index < memory.Span.Length; index++) {
+ var eventEnvelope = memory.Span[index];
+ if (eventEnvelope.Version <= startedAtVersion) {
+ continue;
+ }
+
+ newEvents.Add((TransientEventEnvelope)eventEnvelope);
+ }
+ }
+
+ return newEvents;
+ }
+
+ TaskCompletionSource> task;
+
+ lock (this) {
+ task = new TaskCompletionSource>();
+ _listeners.Add((startedAtVersion, task));
+ }
+
+ return await task.Task.WaitAsync(ct);
+ }
+}
diff --git a/src/Fluss.HotChocolate/UnitOfWorkParameterExpressionBuilder.cs b/src/Fluss.HotChocolate/UnitOfWorkParameterExpressionBuilder.cs
new file mode 100644
index 0000000..5ad1878
--- /dev/null
+++ b/src/Fluss.HotChocolate/UnitOfWorkParameterExpressionBuilder.cs
@@ -0,0 +1,70 @@
+using System.Linq.Expressions;
+using System.Reflection;
+using HotChocolate.Internal;
+using HotChocolate.Resolvers;
+
+namespace Fluss.HotChocolate;
+
+public class UnitOfWorkParameterExpressionBuilder : IParameterExpressionBuilder {
+ public const string PrefillUnitOfWorkVersion = nameof(AddExtensionMiddleware) + ".prefillUnitOfWorkVersion";
+
+ private static readonly MethodInfo GetOrSetGlobalStateUnitOfWorkMethod =
+ typeof(ResolverContextExtensions).GetMethods()
+ .First(m => m.Name == nameof(ResolverContextExtensions.GetOrSetGlobalState))
+ .MakeGenericMethod(typeof(UnitOfWork.UnitOfWork));
+
+ private static readonly MethodInfo GetGlobalStateOrDefaultLongMethod =
+ typeof(ResolverContextExtensions).GetMethods()
+ .First(m => m.Name == nameof(ResolverContextExtensions.GetGlobalStateOrDefault))
+ .MakeGenericMethod(typeof(long?));
+
+ private static readonly MethodInfo ServiceUnitOfWorkMethod =
+ typeof(IPureResolverContext).GetMethods().First(
+ method => method.Name == nameof(IPureResolverContext.Service) &&
+ method.IsGenericMethod)
+ .MakeGenericMethod(typeof(UnitOfWork.UnitOfWork));
+
+ private static readonly MethodInfo GetValueOrDefaultMethod =
+ typeof(CollectionExtensions).GetMethods().First(m => m.Name == nameof(CollectionExtensions.GetValueOrDefault) && m.GetParameters().Length == 2);
+
+ private static readonly MethodInfo WithPrefilledVersionMethod =
+ typeof(UnitOfWork.UnitOfWork).GetMethods(BindingFlags.Instance | BindingFlags.Public)
+ .First(m => m.Name == nameof(UnitOfWork.UnitOfWork.WithPrefilledVersion));
+
+ private static readonly PropertyInfo ContextData =
+ typeof(IHasContextData).GetProperty(
+ nameof(IHasContextData.ContextData))!;
+
+ public bool CanHandle(ParameterInfo parameter) => typeof(UnitOfWork.UnitOfWork) == parameter.ParameterType;
+
+ /*
+ * Produces something like this: context.GetOrSetGlobalState(
+ * nameof(UnitOfWork.UnitOfWork),
+ * _ =>
+ * context
+ * .Service()
+ * .WithPrefilledVersion(
+ * context.GetGlobalState(PrefillUnitOfWorkVersion)
+ * ))!;
+ */
+ public Expression Build(ParameterExpressionBuilderContext builderContext) {
+ var context = builderContext.ResolverContext;
+ var getNewUnitOfWork = Expression.Call(
+ Expression.Call(context, ServiceUnitOfWorkMethod),
+ WithPrefilledVersionMethod,
+ Expression.Call(
+ null,
+ GetGlobalStateOrDefaultLongMethod,
+ context,
+ Expression.Constant(PrefillUnitOfWorkVersion)));
+
+ return Expression.Call(null, GetOrSetGlobalStateUnitOfWorkMethod, context, Expression.Constant(nameof(UnitOfWork)),
+ Expression.Lambda>(
+ getNewUnitOfWork,
+ Expression.Parameter(typeof(string))));
+ }
+
+ public ArgumentKind Kind => ArgumentKind.Custom;
+ public bool IsPure => true;
+ public bool IsDefaultHandler => false;
+}
diff --git a/src/Fluss.PostgreSQL/ActivitySource.cs b/src/Fluss.PostgreSQL/ActivitySource.cs
new file mode 100644
index 0000000..b204eb1
--- /dev/null
+++ b/src/Fluss.PostgreSQL/ActivitySource.cs
@@ -0,0 +1,23 @@
+using OpenTelemetry.Trace;
+
+namespace Fluss.PostgreSQL;
+
+internal class ActivitySource {
+ public static System.Diagnostics.ActivitySource Source { get; } = new(GetName(), GetVersion());
+
+ public static string GetName()
+ => typeof(ActivitySource).Assembly.GetName().Name!;
+
+ private static string GetVersion()
+ => typeof(ActivitySource).Assembly.GetName().Version!.ToString();
+}
+
+public static class TracerProviderBuilderExtensions {
+ public static TracerProviderBuilder AddPostgreSQLESInstrumentation(
+ this TracerProviderBuilder builder) {
+ ArgumentNullException.ThrowIfNull(builder);
+
+ builder.AddSource(ActivitySource.GetName());
+ return builder;
+ }
+}
diff --git a/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj b/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj
new file mode 100644
index 0000000..46d2792
--- /dev/null
+++ b/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj
@@ -0,0 +1,27 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
+
+ ..\..\..\..\..\.nuget\packages\opentelemetry.api\1.6.0\lib\net6.0\OpenTelemetry.Api.dll
+
+
+
+
+
+
+
+
diff --git a/src/Fluss.PostgreSQL/Migrations/01_AddEvents.cs b/src/Fluss.PostgreSQL/Migrations/01_AddEvents.cs
new file mode 100644
index 0000000..dfea7cf
--- /dev/null
+++ b/src/Fluss.PostgreSQL/Migrations/01_AddEvents.cs
@@ -0,0 +1,29 @@
+using FluentMigrator;
+
+namespace EventSourcing.PostgreSQL.Migrations {
+ [Migration(1)]
+ public class AddEvent : Migration {
+ public override void Up() {
+ Create.Table("Events")
+ .WithColumn("Version").AsInt64().PrimaryKey().Identity()
+ .WithColumn("At").AsDateTimeOffset().NotNullable()
+ .WithColumn("By").AsGuid().Nullable()
+ .WithColumn("Event").AsCustom("jsonb").NotNullable();
+
+ Execute.Sql(@"CREATE FUNCTION notify_events()
+ RETURNS trigger AS
+ $BODY$
+ BEGIN
+ NOTIFY new_event;
+ RETURN NULL;
+ END;
+ $BODY$ LANGUAGE plpgsql;
+
+ CREATE TRIGGER new_event AFTER INSERT ON ""Events"" EXECUTE PROCEDURE notify_events();");
+ }
+
+ public override void Down() {
+ Delete.Table("Events");
+ }
+ }
+}
diff --git a/src/Fluss.PostgreSQL/Migrations/02_VersionConstraintIsDeferrable.cs b/src/Fluss.PostgreSQL/Migrations/02_VersionConstraintIsDeferrable.cs
new file mode 100644
index 0000000..57e7c81
--- /dev/null
+++ b/src/Fluss.PostgreSQL/Migrations/02_VersionConstraintIsDeferrable.cs
@@ -0,0 +1,16 @@
+using FluentMigrator;
+
+namespace EventSourcing.PostgreSQL.Migrations {
+ [Migration(2)]
+ public class VersionConstraintIsDeferrable : Migration {
+ public override void Up() {
+ Execute.Sql(@"ALTER TABLE ""Events"" DROP CONSTRAINT ""PK_Events"";");
+ Execute.Sql(@"ALTER TABLE ""Events"" ADD CONSTRAINT ""PK_Events"" PRIMARY KEY (""Version"") DEFERRABLE INITIALLY IMMEDIATE;");
+ }
+
+ public override void Down() {
+ Execute.Sql(@"ALTER TABLE ""Events"" DROP CONSTRAINT ""PK_Events"";");
+ Execute.Sql(@"ALTER TABLE ""Events"" ADD CONSTRAINT ""PK_Events"" PRIMARY KEY (""Version"") NOT DEFERRABLE;");
+ }
+ }
+}
diff --git a/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs b/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs
new file mode 100644
index 0000000..edd852e
--- /dev/null
+++ b/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs
@@ -0,0 +1,199 @@
+using System.Collections.ObjectModel;
+using System.Data;
+using System.Diagnostics;
+using EventSourcing.PostgreSQL;
+using Fluss.Events;
+using Fluss.Exceptions;
+using Newtonsoft.Json;
+using Newtonsoft.Json.Linq;
+using Npgsql;
+using NpgsqlTypes;
+
+namespace Fluss.PostgreSQL;
+
+public partial class PostgreSQLEventRepository : IBaseEventRepository {
+ private readonly PostgreSQLConfig config;
+ private readonly NpgsqlDataSource dataSource;
+
+ public PostgreSQLEventRepository(PostgreSQLConfig config) {
+ this.config = config;
+
+ var dataSourceBuilder = new NpgsqlDataSourceBuilder(config.ConnectionString);
+ dataSourceBuilder.UseJsonNet(settings: new JsonSerializerSettings {
+ TypeNameHandling = TypeNameHandling.All,
+ TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full,
+ MetadataPropertyHandling =
+ MetadataPropertyHandling.ReadAhead // While this is marked as a performance hit, profiling approves
+ });
+ this.dataSource = dataSourceBuilder.Build();
+ }
+
+ private async ValueTask Publish(IEnumerable envelopes, Func eventExtractor,
+ NpgsqlConnection? conn = null) where TEnvelope : Envelope {
+ using var activity = ActivitySource.Source.StartActivity();
+ activity?.SetTag("EventSourcing.EventRepository", nameof(PostgreSQLEventRepository));
+
+ // await using var connection has the side-effect that our connection passed from the outside is also disposed, so we split this up.
+ await using var freshConnection = dataSource.OpenConnection();
+ var connection = conn ?? freshConnection;
+
+ activity?.AddEvent(new ActivityEvent("Connection open"));
+
+ await using var writer =
+ connection.BeginBinaryImport(
+ @"COPY ""Events"" (""Version"", ""At"", ""By"", ""Event"") FROM STDIN (FORMAT BINARY)");
+
+ activity?.AddEvent(new ActivityEvent("Got Writer"));
+
+ try {
+ foreach (var eventEnvelope in envelopes.OrderBy(e => e.Version)) {
+ // ReSharper disable MethodHasAsyncOverload
+ writer.StartRow();
+ writer.Write(eventEnvelope.Version);
+ writer.Write(eventEnvelope.At);
+ if (eventEnvelope.By != null) {
+ writer.Write(eventEnvelope.By.Value);
+ }
+ else {
+ writer.Write(DBNull.Value);
+ }
+
+ writer.Write(eventExtractor(eventEnvelope), NpgsqlDbType.Jsonb);
+ // ReSharper enable MethodHasAsyncOverload
+ }
+
+ await writer.CompleteAsync();
+ }
+ catch (PostgresException e) {
+ if (e is { SqlState: "23505", TableName: "Events" }) {
+ throw new RetryException();
+ }
+
+ throw;
+ }
+
+ NotifyNewEvents();
+ }
+
+ public async ValueTask Publish(IEnumerable envelopes) {
+ await Publish(envelopes, e => e.Event);
+ }
+
+ private async ValueTask WithReader(long fromExclusive, long toInclusive,
+ Func> action) {
+ await using var connection = dataSource.OpenConnection();
+ await using var cmd =
+ new NpgsqlCommand(
+ """
+ SELECT "Version", "At", "By", "Event" FROM "Events" WHERE "Version" > @from AND "Version" <= @to ORDER BY "Version"
+ """,
+ connection);
+
+ cmd.Parameters.AddWithValue("@from", fromExclusive);
+ cmd.Parameters.AddWithValue("@to", toInclusive);
+ await cmd.PrepareAsync();
+
+ await using var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess);
+
+ var result = await action(reader);
+
+ await reader.CloseAsync();
+ return result;
+ }
+
+ public async ValueTask>> GetEvents(long fromExclusive,
+ long toInclusive) {
+ using var activity = ActivitySource.Source.StartActivity();
+ activity?.SetTag("EventSourcing.EventRepository", nameof(PostgreSQLEventRepository));
+ activity?.SetTag("EventSourcing.EventRequest", $"{fromExclusive}-{toInclusive}");
+
+ return await WithReader(fromExclusive, toInclusive, async reader => {
+ var envelopes = new List();
+
+ while (await reader.ReadAsync()) {
+ envelopes.Add(new EventEnvelope {
+ Version = reader.GetInt64(0),
+ At = reader.GetDateTime(1),
+ By = reader.IsDBNull(2) ? null : reader.GetGuid(2),
+ Event = reader.GetFieldValue(3)
+ });
+ }
+
+ return envelopes.ToPagedMemory();
+ });
+ }
+
+ public async ValueTask> GetRawEvents() {
+ var latestVersion = await GetLatestVersion();
+ return await WithReader(-1, latestVersion, async reader => {
+ var envelopes = new List();
+
+ while (await reader.ReadAsync()) {
+ envelopes.Add(new RawEventEnvelope {
+ Version = reader.GetInt64(0),
+ At = reader.GetDateTime(1),
+ By = reader.IsDBNull(2) ? null : reader.GetGuid(2),
+ RawEvent = reader.GetFieldValue(3),
+ });
+ }
+
+ return envelopes;
+ });
+ }
+
+ public async ValueTask ReplaceEvent(long at, IEnumerable newEnvelopes) {
+ var envelopes = newEnvelopes.ToList();
+
+ await using var connection = dataSource.OpenConnection();
+ await using var transaction = connection.BeginTransaction();
+
+ await using var deleteCommand =
+ new NpgsqlCommand("""DELETE FROM "Events" WHERE "Version" = @at;""", connection);
+ deleteCommand.Parameters.AddWithValue("at", at);
+ await deleteCommand.ExecuteNonQueryAsync();
+
+ if (envelopes.Count != 1) {
+ // Deferring constraints to allow updating the primary key and shifting the versions
+ await using var deferConstraintsCommand =
+ new NpgsqlCommand(@"SET CONSTRAINTS ""PK_Events"" DEFERRED;", connection);
+ await deferConstraintsCommand.ExecuteNonQueryAsync();
+
+ await using var versionUpdateCommand =
+ new NpgsqlCommand(
+ """UPDATE "Events" e SET "Version" = e."Version" + @offset WHERE e."Version" > @at;""",
+ connection);
+
+ versionUpdateCommand.Parameters.AddWithValue("offset", envelopes.Count - 1);
+ versionUpdateCommand.Parameters.AddWithValue("at", at);
+ await versionUpdateCommand.ExecuteNonQueryAsync();
+ }
+
+ await Publish(envelopes, e => e.RawEvent, connection);
+
+ await transaction.CommitAsync();
+ }
+
+ public async ValueTask GetLatestVersion() {
+ using var activity = ActivitySource.Source.StartActivity();
+ activity?.SetTag("EventSourcing.EventRepository", nameof(PostgreSQLEventRepository));
+
+ await using var connection = dataSource.OpenConnection();
+
+ await using var cmd = new NpgsqlCommand("""SELECT MAX("Version") FROM "Events";""", connection);
+ await cmd.PrepareAsync();
+ var scalar = await cmd.ExecuteScalarAsync();
+
+ if (scalar is DBNull) {
+ return -1;
+ }
+
+ return (long)(scalar ?? -1);
+ }
+
+ public void Dispose() {
+ if (!_cancellationTokenSource.IsCancellationRequested) {
+ _cancellationTokenSource.Cancel();
+ _cancellationTokenSource.Dispose();
+ }
+ }
+}
diff --git a/src/Fluss.PostgreSQL/PostgreSQLEventRepositorySubscriptions.cs b/src/Fluss.PostgreSQL/PostgreSQLEventRepositorySubscriptions.cs
new file mode 100644
index 0000000..72ae7be
--- /dev/null
+++ b/src/Fluss.PostgreSQL/PostgreSQLEventRepositorySubscriptions.cs
@@ -0,0 +1,53 @@
+using Npgsql;
+
+namespace Fluss.PostgreSQL;
+
+public partial class PostgreSQLEventRepository : IDisposable {
+ private readonly CancellationTokenSource _cancellationTokenSource = new();
+ private EventHandler? _newEvents;
+
+ private bool _triggerInitialized;
+
+ public event EventHandler NewEvents {
+ add {
+ _newEvents += value;
+#pragma warning disable 4014
+ InitializeTrigger();
+#pragma warning restore 4014
+ }
+
+ remove {
+ _newEvents -= value;
+ }
+ }
+
+ private async Task InitializeTrigger() {
+ if (_triggerInitialized) {
+ return;
+ }
+
+ _triggerInitialized = true;
+ await using var listenConnection = dataSource.CreateConnection();
+ await listenConnection.OpenAsync(_cancellationTokenSource.Token);
+
+ listenConnection.Notification += (_, _) => {
+ NotifyNewEvents();
+ };
+
+ await using var listen = new NpgsqlCommand(@"LISTEN new_event", listenConnection);
+ await listen.ExecuteNonQueryAsync(_cancellationTokenSource.Token);
+
+ while (!_cancellationTokenSource.Token.IsCancellationRequested) {
+ await listenConnection.WaitAsync(_cancellationTokenSource.Token);
+ }
+
+ await using var unlisten = new NpgsqlCommand(@"UNLISTEN new_event", listenConnection);
+ await unlisten.ExecuteNonQueryAsync(new CancellationToken());
+ }
+
+ private async void NotifyNewEvents() {
+ await Task.Run(() => {
+ _newEvents?.Invoke(this, EventArgs.Empty);
+ });
+ }
+}
diff --git a/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs b/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs
new file mode 100644
index 0000000..a027a5f
--- /dev/null
+++ b/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs
@@ -0,0 +1,124 @@
+using System.Reflection;
+using FluentMigrator.Runner;
+using Fluss.Core;
+using Fluss.Events;
+using Fluss.Upcasting;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace Fluss.PostgreSQL;
+
+public static class ServiceCollectionExtensions {
+ public static IServiceCollection AddPostgresEventSourcingRepository(this IServiceCollection services,
+ string connectionString, Assembly? upcasterSourceAssembly = null) {
+ if (services is null) {
+ throw new ArgumentNullException(nameof(services));
+ }
+
+ if (upcasterSourceAssembly is not null) {
+ services
+ .AddUpcasters(upcasterSourceAssembly)
+ .AddHostedService();
+ }
+
+ return services
+ .AddScoped()
+ .AddFluentMigratorCore()
+ .ConfigureRunner(rb => rb
+ .AddPostgres()
+ .WithGlobalConnectionString(connectionString)
+ .ScanIn(typeof(Fluss.PostgreSQL.PostgreSQLEventRepository).Assembly).For.Migrations())
+ .AddLogging(lb => lb.AddFluentMigratorConsole())
+ .AddSingleton(new PostgreSQLConfig(connectionString))
+ .AddSingleton()
+ .AddHostedService(sp => sp.GetRequiredService());
+ }
+}
+
+public class Migrator : BackgroundService {
+ private readonly ILogger _logger;
+ private readonly IMigrationRunner _migrationRunner;
+ private bool _didFinish;
+
+ private readonly SemaphoreSlim _didFinishChanged = new(0, 1);
+
+ public Migrator(IMigrationRunner migrationRunner, ILogger logger) {
+ _migrationRunner = migrationRunner;
+ _logger = logger;
+ }
+
+ public async Task WaitForFinish() {
+ while (true) {
+ if (_didFinish) {
+ return;
+ }
+ await _didFinishChanged.WaitAsync();
+ if (_didFinish) {
+ _didFinishChanged.Release();
+ return;
+ }
+ }
+ }
+
+ protected override Task ExecuteAsync(CancellationToken stoppingToken) {
+ return Task.Run(() => {
+ try {
+ Migrate();
+ }
+ catch (Exception e) {
+ _logger.LogError(e, "Error while migrating");
+ }
+ }, stoppingToken);
+ }
+
+ public void Migrate() {
+ //_migrationRunner.ListMigrations();
+ try {
+ _migrationRunner.MigrateUp();
+ }
+ catch {
+ Environment.Exit(-1);
+ }
+
+ _didFinish = true;
+ _didFinishChanged.Release();
+ }
+}
+
+public class Upcaster : BackgroundService {
+ private readonly EventUpcasterService _upcasterService;
+ private readonly Migrator _migrator;
+ private readonly ILogger _logger;
+
+ public Upcaster(EventUpcasterService upcasterService, Migrator migrator, ILogger logger) {
+ _upcasterService = upcasterService;
+ _migrator = migrator;
+ _logger = logger;
+ }
+
+ protected override Task ExecuteAsync(CancellationToken stoppingToken) {
+ return Task.Run(async () => {
+ _logger.LogInformation("Waiting for migration to finish");
+ await _migrator.WaitForFinish();
+ _logger.LogInformation("Migration finished, starting event upcasting");
+
+ try {
+ await _upcasterService.Run();
+ _logger.LogInformation("Event upcasting finished");
+ }
+ catch (Exception e) {
+ _logger.LogError(e, "Error while upcasting");
+ throw;
+ }
+ }, stoppingToken);
+ }
+}
+
+public class PostgreSQLConfig {
+ public PostgreSQLConfig(string connectionString) {
+ ConnectionString = connectionString;
+ }
+
+ public string ConnectionString { get; }
+}
diff --git a/src/Fluss.Testing/AggregateTestBed.cs b/src/Fluss.Testing/AggregateTestBed.cs
new file mode 100644
index 0000000..b1045b5
--- /dev/null
+++ b/src/Fluss.Testing/AggregateTestBed.cs
@@ -0,0 +1,88 @@
+using System.Reflection;
+using Fluss.Aggregates;
+using Fluss.Authentication;
+using Fluss.Core.Validation;
+using Fluss.Events;
+using Fluss.Extensions;
+using Moq;
+using Xunit;
+
+namespace Fluss.Testing;
+
+public class AggregateTestBed : EventTestBed where TAggregate : AggregateRoot, new() {
+ private readonly UnitOfWork.UnitOfWork _unitOfWork;
+ private readonly IList _ignoredTypes = new List();
+
+ public AggregateTestBed() {
+ var validator = new Mock();
+ validator.Setup(v => v.ValidateEvent(It.IsAny()))
+ .Returns(_ => Task.CompletedTask);
+ validator.Setup(v => v.ValidateAggregate(It.IsAny(), It.IsAny()))
+ .Returns((_, _) => Task.CompletedTask);
+
+ _unitOfWork = new UnitOfWork.UnitOfWork(EventRepository, EventListenerFactory, new[] { new AllowAllPolicy() },
+ new UserIdProvider(_ => Guid.Empty, null!), validator.Object);
+ }
+
+ public AggregateTestBed Calling(Func action) {
+ action(_unitOfWork).GetAwaiter().GetResult();
+ return this;
+ }
+
+ public AggregateTestBed Calling(TKey key, Func action) {
+ var aggregate = _unitOfWork.GetAggregate(key).GetResult();
+ action(aggregate).GetAwaiter().GetResult();
+ return this;
+ }
+
+ public AggregateTestBed Ignoring() {
+ _ignoredTypes.Add(typeof(TIgnoreType));
+
+ return this;
+ }
+
+ public void ResultsIn(params Event[] expectedEvents) {
+ var publishedEvents = _unitOfWork.PublishedEventEnvelopes.Select(ee => ee.Event).ToArray();
+
+ if (expectedEvents.Length == publishedEvents.Length) {
+ for (int i = 0; i < expectedEvents.Length; i++) {
+ expectedEvents[i] = GetEventRespectingIgnoredTypes(expectedEvents[i], publishedEvents[i]);
+ }
+ }
+
+ Assert.Equal(expectedEvents, publishedEvents);
+ }
+
+ private Event GetEventRespectingIgnoredTypes(Event expected, Event published) {
+ if (expected.GetType() != published.GetType()) {
+ return expected;
+ }
+
+ var cloneMethod = expected.GetType().GetMethod("$");
+ var exp = (Event)cloneMethod!.Invoke(expected, Array.Empty