Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
ThorstenThiel committed Oct 10, 2023
0 parents commit 4632d0e
Show file tree
Hide file tree
Showing 83 changed files with 4,690 additions and 0 deletions.
49 changes: 49 additions & 0 deletions .github/workflows/dotnet.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# This workflow will build a .NET project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-net

name: .NET

on:
- push
- pull_request

jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup .NET
uses: actions/setup-dotnet@v3
with:
dotnet-version: 8.x
- name: Build
run: dotnet build
working-directory: ./src

test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup .NET
uses: actions/setup-dotnet@v3
with:
dotnet-version: 8.x
- name: Test
run: dotnet test --no-build --verbosity normal --collect:"XPlat Code Coverage"
working-directory: ./src/Fluss.UnitTest
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

format:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup .NET
uses: actions/setup-dotnet@v3
with:
dotnet-version: 8.x
- name: Run dotnet format
run: dotnet format --verify-no-changes
working-directory: ./src
14 changes: 14 additions & 0 deletions src/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Build results
[Dd]ebug/
[Dd]ebugPublic/
[Rr]elease/
[Rr]eleases/
x64/
x86/
build/
bld/
[Bb]in/
[Oo]bj/
msbuild.log
msbuild.err
msbuild.wrn
13 changes: 13 additions & 0 deletions src/.idea/.idea.Fluss/.idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/.idea/.idea.Fluss/.idea/encodings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

183 changes: 183 additions & 0 deletions src/Fluss.HotChocolate/AddExtensionMiddleware.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
using Fluss.Events;
using HotChocolate.AspNetCore.Subscriptions;
using HotChocolate.Execution;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using RequestDelegate = HotChocolate.Execution.RequestDelegate;

namespace Fluss.HotChocolate;

public class AddExtensionMiddleware {
private const string SubsequentRequestMarker = nameof(AddExtensionMiddleware) + ".subsequentRequestMarker";

private readonly RequestDelegate _next;

private readonly IServiceProvider _rootServiceProvider;
private readonly ILogger<AddExtensionMiddleware> _logger;

public AddExtensionMiddleware(
RequestDelegate next,
IServiceProvider rootServiceProvider,
ILogger<AddExtensionMiddleware> logger
) {
_next = next ?? throw new ArgumentNullException(nameof(next));
_rootServiceProvider = rootServiceProvider;
_logger = logger;
}

public async ValueTask InvokeAsync(IRequestContext context) {
await _next.Invoke(context);

if (!context.ContextData.ContainsKey(nameof(UnitOfWork))) {
return;
}

if (true != context.Services.GetRequiredService<IHttpContextAccessor>().HttpContext?.WebSockets
.IsWebSocketRequest) {
return;
}

if (context.Request.Extensions?.ContainsKey(SubsequentRequestMarker) ?? false) {
if (context.Result is QueryResult subsequentQueryResult) {
context.Result = QueryResultBuilder.FromResult(subsequentQueryResult).AddContextData(nameof(UnitOfWork),
context.ContextData[nameof(UnitOfWork)]).Create();
}

return;
}

if (context.Result is QueryResult qr) {
var contextData = new Dictionary<string, object?>(context.ContextData);
// Do not inline; this stores a reference to the request because it is set to null on the context eventually
var contextRequest = context.Request;
context.Result = new ResponseStream(() => LiveResults(contextData, qr, contextRequest));
}
}

private async IAsyncEnumerable<IQueryResult> LiveResults(IReadOnlyDictionary<string, object?>? contextData, QueryResult firstResult, IQueryRequest originalRequest) {
yield return firstResult;

using var serviceScope = _rootServiceProvider.CreateScope();
var serviceProvider = serviceScope.ServiceProvider;

if (contextData == null) {
_logger.LogWarning("Trying to add live results but {ContextData} is null!", nameof(contextData));
yield break;
}

var foundSocketSession = contextData.TryGetValue(nameof(ISocketSession), out var contextSocketSession); // as ISocketSession
var foundOperationId = contextData.TryGetValue("HotChocolate.Execution.Transport.OperationSessionId", out var operationId); // as string

if (!foundSocketSession || !foundOperationId) {
_logger.LogWarning("Trying to add live results but {SocketSession} or {OperationId} is not present in context!", nameof(contextSocketSession), nameof(operationId));
yield break;
}

if (contextSocketSession is not ISocketSession socketSession) {
_logger.LogWarning("{ContextSocketSession} key present in context but not an {ISocketSession}!", contextSocketSession?.GetType().FullName, nameof(ISocketSession));
yield break;
}

while (true) {
if (contextData == null || !contextData.ContainsKey(nameof(UnitOfWork))) {
break;
}

if (contextData[nameof(UnitOfWork)] is not UnitOfWork.UnitOfWork unitOfWork) {
break;
}

var latestPersistedEventVersion = await WaitForChange(
serviceProvider,
unitOfWork.ReadModels
);

if (socketSession.Operations.All(operationSession => operationSession.Id != operationId?.ToString())) {
break;
}

var readOnlyQueryRequest = QueryRequestBuilder
.From(originalRequest)
.AddExtension(SubsequentRequestMarker, SubsequentRequestMarker)
.AddGlobalState(UnitOfWorkParameterExpressionBuilder.PrefillUnitOfWorkVersion,
latestPersistedEventVersion)
.SetServices(serviceProvider)
.Create();

await using var executionResult = await serviceProvider.ExecuteRequestAsync(readOnlyQueryRequest);

if (executionResult is not IQueryResult result) {
break;
}

yield return result;
contextData = executionResult.ContextData;

if (result.Errors?.Count > 0) {
break;
}
}
}

/**
* Returns the received latest persistent event version after a change has occured.
*/
private static async ValueTask<long> WaitForChange(IServiceProvider serviceProvider, IEnumerable<EventListener> eventListeners) {
var currentEventListener = eventListeners.ToList();

var newEventNotifier = serviceProvider.GetRequiredService<NewEventNotifier>();
var newTransientEventNotifier = serviceProvider.GetRequiredService<NewTransientEventNotifier>();
var eventListenerFactory = serviceProvider.GetRequiredService<EventListenerFactory>();

var cancellationTokenSource = new CancellationTokenSource();

var latestPersistedEventVersion = currentEventListener.Min(el => el.Tag.LastSeen);
var latestTransientEventVersion = currentEventListener.Min(el => el.Tag.LastSeenTransient);

var persistedEventTask = Task.Run(async () => {
while (true) {
latestPersistedEventVersion = await newEventNotifier.WaitForEventAfter(latestPersistedEventVersion, cancellationTokenSource.Token);
for (var index = 0; index < currentEventListener.Count; index++) {
var eventListener = currentEventListener[index];
var updatedEventListener = await eventListenerFactory.UpdateTo(eventListener, latestPersistedEventVersion);
if (updatedEventListener.Tag.LastAccepted > eventListener.Tag.LastAccepted) {
return;
}
currentEventListener[index] = updatedEventListener;
}
}
}, cancellationTokenSource.Token);

var transientEventTask = Task.Run(async () => {
while (true) {
var events = (await newTransientEventNotifier.WaitForEventAfter(latestTransientEventVersion, cancellationTokenSource.Token)).ToList();
for (var index = 0; index < currentEventListener.Count; index++) {
var eventListener = currentEventListener[index];
var updatedEventListener = eventListenerFactory.UpdateWithEvents(eventListener, events.ToPagedMemory());
if (updatedEventListener != eventListener) {
return;
}
currentEventListener[index] = updatedEventListener;
}
latestTransientEventVersion = events.Max(el => el.Version);
}
}, cancellationTokenSource.Token);

var completedTask = await Task.WhenAny(persistedEventTask, transientEventTask);
cancellationTokenSource.Cancel();

if (completedTask.IsFaulted) {
throw completedTask.Exception!;
}

return latestPersistedEventVersion;
}
}
Loading

0 comments on commit 4632d0e

Please sign in to comment.