From 7b90a96576f8e0ab0bd71f81791337dfaaebef73 Mon Sep 17 00:00:00 2001 From: Rob Janssen Date: Fri, 26 Apr 2024 15:40:18 +0200 Subject: [PATCH] Lower framework support (#9) This allows even .Net Framework developers to use this library. I had to add one preprocessor directive in `InMemoryOutboxBackend.cs` because in .NetStandard 2.0 the ConcurrentQueue doesn't support the `.Clear()` method yet so I had to "DIY" the clear. I'm not sure if this needs to be an atomic operation but if it turns out it needs to be then we can implement a lock or something. --- .../Contrib.KafkaFlow.Outbox.Postgres.csproj | 30 +++++++++------- .../Contrib.KafkaFlow.Outbox.SqlServer.csproj | 3 +- .../Contrib.KafkaFlow.Outbox.csproj | 35 +++++++++++-------- .../InMemory/InMemoryOutboxBackend.cs | 14 +++++--- ....KafkaFlow.ProcessManagers.Postgres.csproj | 30 +++++++++------- ...KafkaFlow.ProcessManagers.SqlServer.csproj | 7 +++- .../Contrib.KafkaFlow.ProcessManagers.csproj | 28 +++++++++------ .../InMemory/InMemoryProcessStateStore.cs | 22 +++++------- .../ProcessManagerConfigurationBuilder.cs | 2 +- .../Contrib.KafkaFlow.SqlServer.csproj | 6 +++- ...ow.ProcessManagers.IntegrationTests.csproj | 6 ++-- 11 files changed, 109 insertions(+), 74 deletions(-) diff --git a/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj b/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj index f967ff9..7535b86 100644 --- a/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj +++ b/src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj @@ -1,19 +1,23 @@ - - net8.0 - enable - enable - KafkaFlow.Outbox.Postgres - + + netstandard2.0 + enable + enable + KafkaFlow.Outbox.Postgres + latest + true + latest - - - + - - - - + + + + + + + + diff --git a/src/Contrib.KafkaFlow.Outbox.SqlServer/Contrib.KafkaFlow.Outbox.SqlServer.csproj b/src/Contrib.KafkaFlow.Outbox.SqlServer/Contrib.KafkaFlow.Outbox.SqlServer.csproj index 90e79f0..83daef0 100644 --- a/src/Contrib.KafkaFlow.Outbox.SqlServer/Contrib.KafkaFlow.Outbox.SqlServer.csproj +++ b/src/Contrib.KafkaFlow.Outbox.SqlServer/Contrib.KafkaFlow.Outbox.SqlServer.csproj @@ -1,7 +1,7 @@  - net8.0 + netstandard2.0 enable enable latest @@ -17,6 +17,7 @@ + diff --git a/src/Contrib.KafkaFlow.Outbox/Contrib.KafkaFlow.Outbox.csproj b/src/Contrib.KafkaFlow.Outbox/Contrib.KafkaFlow.Outbox.csproj index 8680311..f22646c 100644 --- a/src/Contrib.KafkaFlow.Outbox/Contrib.KafkaFlow.Outbox.csproj +++ b/src/Contrib.KafkaFlow.Outbox/Contrib.KafkaFlow.Outbox.csproj @@ -1,18 +1,25 @@ - + - - net8.0 - enable - enable - KafkaFlow.Outbox - - + + netstandard2.0;netstandard2.1 + enable + enable + KafkaFlow.Outbox + latest + true + latest + + - - - - - - + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + diff --git a/src/Contrib.KafkaFlow.Outbox/InMemory/InMemoryOutboxBackend.cs b/src/Contrib.KafkaFlow.Outbox/InMemory/InMemoryOutboxBackend.cs index f53eaac..413fb71 100644 --- a/src/Contrib.KafkaFlow.Outbox/InMemory/InMemoryOutboxBackend.cs +++ b/src/Contrib.KafkaFlow.Outbox/InMemory/InMemoryOutboxBackend.cs @@ -1,5 +1,5 @@ -using System.Collections.Concurrent; using Confluent.Kafka; +using System.Collections.Concurrent; namespace KafkaFlow.Outbox.InMemory; @@ -10,7 +10,7 @@ public sealed class InMemoryOutboxBackend : IOutboxBackend public ValueTask Store(TopicPartition topicPartition, Message message, CancellationToken token = default) { _queue.Enqueue((topicPartition, message)); - return ValueTask.CompletedTask; + return default; } public ValueTask Read(int batchSize, CancellationToken token = default) @@ -23,13 +23,17 @@ public ValueTask Read(int batchSize, CancellationToken token = d .Select(x => x!) .ToArray(); - return ValueTask.FromResult(batch); + return new ValueTask(batch); } public ValueTask Purge() { - _queue.Clear(); - return ValueTask.CompletedTask; +#if NETSTANDARD2_1_OR_GREATER + _queue.Clear(); // Unfortunately .NetStandard 2.1+ is required for this +#else + while (_queue.TryDequeue(out _)) { } // So we have to do it this way +#endif + return default; } public ValueTask GetAll() => Read(int.MaxValue); diff --git a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj index 443739b..852e25b 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj +++ b/src/Contrib.KafkaFlow.ProcessManagers.Postgres/Contrib.KafkaFlow.ProcessManagers.Postgres.csproj @@ -1,19 +1,23 @@ - - net8.0 - enable - enable - KafkaFlow.ProcessManagers.Postgres - + + netstandard2.0 + enable + enable + KafkaFlow.ProcessManagers.Postgres + latest + true + latest - - - + - - - - + + + + + + + + diff --git a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj index ce5ca51..d9fcceb 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj +++ b/src/Contrib.KafkaFlow.ProcessManagers.SqlServer/Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj @@ -1,7 +1,7 @@  - net8.0 + netstandard2.0 enable enable latest @@ -17,6 +17,11 @@ + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + diff --git a/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj b/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj index bd00a0c..0641584 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj +++ b/src/Contrib.KafkaFlow.ProcessManagers/Contrib.KafkaFlow.ProcessManagers.csproj @@ -1,15 +1,23 @@ - - net8.0 - enable - enable - KafkaFlow.ProcessManagers - + + netstandard2.0 + enable + enable + KafkaFlow.ProcessManagers + latest + true + latest + - - - - + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + diff --git a/src/Contrib.KafkaFlow.ProcessManagers/InMemory/InMemoryProcessStateStore.cs b/src/Contrib.KafkaFlow.ProcessManagers/InMemory/InMemoryProcessStateStore.cs index 4bd7882..47874a0 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers/InMemory/InMemoryProcessStateStore.cs +++ b/src/Contrib.KafkaFlow.ProcessManagers/InMemory/InMemoryProcessStateStore.cs @@ -16,27 +16,23 @@ public ValueTask Persist(Type processType, Guid processId, VersionedState state) { Store.AddOrUpdate((processType, processId), state, (_, currentState) => { - if (currentState.Version != state.Version) - throw new OptimisticConcurrencyException(processType, processId, - $"Concurrency error when persisting state {processType.FullName}"); - - return state; + return currentState.Version != state.Version + ? throw new OptimisticConcurrencyException(processType, processId, + $"Concurrency error when persisting state {processType.FullName}") + : state; }); } - return ValueTask.CompletedTask; + return default; } - public ValueTask Load(Type processType, Guid processId) - { - return Store.TryGetValue((processType, processId), out var state) - ? ValueTask.FromResult(state) - : ValueTask.FromResult(new VersionedState(0, default)); - } + public ValueTask Load(Type processType, Guid processId) => Store.TryGetValue((processType, processId), out var state) + ? new ValueTask(state) + : new ValueTask(new VersionedState(0, default)); public ValueTask Delete(Type processType, Guid processId, int version) { Store.TryRemove((processType, processId), out _); - return ValueTask.CompletedTask; + return default; } } diff --git a/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerConfigurationBuilder.cs b/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerConfigurationBuilder.cs index f09880a..b1d8c98 100644 --- a/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerConfigurationBuilder.cs +++ b/src/Contrib.KafkaFlow.ProcessManagers/ProcessManagerConfigurationBuilder.cs @@ -109,7 +109,7 @@ from messageType in GetMessageTypes(processType) group processType by messageType) .ToDictionary(x => x.Key, x => x.ToList()); - var mapping = new HandlerTypeMapping(maps.AsReadOnly()); + var mapping = new HandlerTypeMapping(maps); foreach (var processType in _processManagers) { diff --git a/src/Contrib.KafkaFlow.SqlServer/Contrib.KafkaFlow.SqlServer.csproj b/src/Contrib.KafkaFlow.SqlServer/Contrib.KafkaFlow.SqlServer.csproj index 54e5625..bf63736 100644 --- a/src/Contrib.KafkaFlow.SqlServer/Contrib.KafkaFlow.SqlServer.csproj +++ b/src/Contrib.KafkaFlow.SqlServer/Contrib.KafkaFlow.SqlServer.csproj @@ -1,7 +1,7 @@  - net8.0 + netstandard2.0 enable enable latest @@ -11,6 +11,10 @@ + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj b/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj index 4ac070c..a7c835a 100644 --- a/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj +++ b/tests/KafkaFlow.ProcessManagers.IntegrationTests/KafkaFlow.ProcessManagers.IntegrationTests.csproj @@ -1,12 +1,14 @@ - + net8.0 enable enable - false true + latest + true + latest