Skip to content

Commit

Permalink
Lower framework support (#9)
Browse files Browse the repository at this point in the history
This allows even .Net Framework developers to use this library. I had to add one preprocessor directive in `InMemoryOutboxBackend.cs` because in .NetStandard 2.0 the ConcurrentQueue doesn't support the `.Clear()` method yet so I had to "DIY" the clear. I'm not sure if this needs to be an atomic operation but if it turns out it needs to be then we can implement a lock or something.
  • Loading branch information
RobThree authored Apr 26, 2024
1 parent 560e24d commit 7b90a96
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 74 deletions.
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<RootNamespace>KafkaFlow.Outbox.Postgres</RootNamespace>
</PropertyGroup>
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<RootNamespace>KafkaFlow.Outbox.Postgres</RootNamespace>
<LangVersion>latest</LangVersion>
<EnableNETAnalyzers>true</EnableNETAnalyzers>
<AnalysisLevel>latest</AnalysisLevel>

<ItemGroup>
<ProjectReference Include="..\Contrib.KafkaFlow.Outbox\Contrib.KafkaFlow.Outbox.csproj" />
</ItemGroup>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.35" />
<PackageReference Include="Npgsql" Version="8.0.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Contrib.KafkaFlow.Outbox\Contrib.KafkaFlow.Outbox.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.35" />
<PackageReference Include="Npgsql" Version="8.0.2" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<TargetFramework>netstandard2.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>latest</LangVersion>
Expand All @@ -17,6 +17,7 @@
<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.35" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.6" />
<PackageReference Include="System.Text.Json" Version="8.0.3" />
</ItemGroup>

</Project>
35 changes: 21 additions & 14 deletions src/Contrib.KafkaFlow.Outbox/Contrib.KafkaFlow.Outbox.csproj
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<RootNamespace>KafkaFlow.Outbox</RootNamespace>
<PackageVersion></PackageVersion>
</PropertyGroup>
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<RootNamespace>KafkaFlow.Outbox</RootNamespace>
<LangVersion>latest</LangVersion>
<EnableNETAnalyzers>true</EnableNETAnalyzers>
<AnalysisLevel>latest</AnalysisLevel>
<PackageVersion></PackageVersion>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="KafkaFlow.Abstractions" Version="3.0.7" />
<PackageReference Include="KafkaFlow.Microsoft.DependencyInjection" Version="3.0.7" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="KafkaFlow.Abstractions" Version="3.0.7" />
<PackageReference Include="KafkaFlow.Microsoft.DependencyInjection" Version="3.0.7" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
<PackageReference Include="PolySharp" Version="1.14.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

</Project>
14 changes: 9 additions & 5 deletions src/Contrib.KafkaFlow.Outbox/InMemory/InMemoryOutboxBackend.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System.Collections.Concurrent;
using Confluent.Kafka;
using System.Collections.Concurrent;

namespace KafkaFlow.Outbox.InMemory;

Expand All @@ -10,7 +10,7 @@ public sealed class InMemoryOutboxBackend : IOutboxBackend
public ValueTask Store(TopicPartition topicPartition, Message<byte[], byte[]> message, CancellationToken token = default)
{
_queue.Enqueue((topicPartition, message));
return ValueTask.CompletedTask;
return default;
}

public ValueTask<OutboxRecord[]> Read(int batchSize, CancellationToken token = default)
Expand All @@ -23,13 +23,17 @@ public ValueTask<OutboxRecord[]> Read(int batchSize, CancellationToken token = d
.Select(x => x!)
.ToArray();

return ValueTask.FromResult(batch);
return new ValueTask<OutboxRecord[]>(batch);
}

public ValueTask Purge()
{
_queue.Clear();
return ValueTask.CompletedTask;
#if NETSTANDARD2_1_OR_GREATER
_queue.Clear(); // Unfortunately .NetStandard 2.1+ is required for this
#else
while (_queue.TryDequeue(out _)) { } // So we have to do it this way
#endif
return default;
}

public ValueTask<OutboxRecord[]> GetAll() => Read(int.MaxValue);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<RootNamespace>KafkaFlow.ProcessManagers.Postgres</RootNamespace>
</PropertyGroup>
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<RootNamespace>KafkaFlow.ProcessManagers.Postgres</RootNamespace>
<LangVersion>latest</LangVersion>
<EnableNETAnalyzers>true</EnableNETAnalyzers>
<AnalysisLevel>latest</AnalysisLevel>

<ItemGroup>
<ProjectReference Include="..\Contrib.KafkaFlow.ProcessManagers\Contrib.KafkaFlow.ProcessManagers.csproj" />
</ItemGroup>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.35" />
<PackageReference Include="Npgsql" Version="8.0.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Contrib.KafkaFlow.ProcessManagers\Contrib.KafkaFlow.ProcessManagers.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.35" />
<PackageReference Include="Npgsql" Version="8.0.2" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<TargetFramework>netstandard2.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>latest</LangVersion>
Expand All @@ -17,6 +17,11 @@
<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.35" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.6" />
<PackageReference Include="PolySharp" Version="1.14.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="System.Text.Json" Version="8.0.3" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<RootNamespace>KafkaFlow.ProcessManagers</RootNamespace>
</PropertyGroup>
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<RootNamespace>KafkaFlow.ProcessManagers</RootNamespace>
<LangVersion>latest</LangVersion>
<EnableNETAnalyzers>true</EnableNETAnalyzers>
<AnalysisLevel>latest</AnalysisLevel>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="KafkaFlow.Abstractions" Version="3.0.7" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.1" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="KafkaFlow.Abstractions" Version="3.0.7" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.1" />
<PackageReference Include="PolySharp" Version="1.14.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="System.Text.Json" Version="8.0.3" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,23 @@ public ValueTask Persist(Type processType, Guid processId, VersionedState state)
{
Store.AddOrUpdate((processType, processId), state, (_, currentState) =>
{
if (currentState.Version != state.Version)
throw new OptimisticConcurrencyException(processType, processId,
$"Concurrency error when persisting state {processType.FullName}");
return state;
return currentState.Version != state.Version
? throw new OptimisticConcurrencyException(processType, processId,
$"Concurrency error when persisting state {processType.FullName}")
: state;
});
}

return ValueTask.CompletedTask;
return default;
}

public ValueTask<VersionedState> Load(Type processType, Guid processId)
{
return Store.TryGetValue((processType, processId), out var state)
? ValueTask.FromResult(state)
: ValueTask.FromResult(new VersionedState(0, default));
}
public ValueTask<VersionedState> Load(Type processType, Guid processId) => Store.TryGetValue((processType, processId), out var state)
? new ValueTask<VersionedState>(state)
: new ValueTask<VersionedState>(new VersionedState(0, default));

public ValueTask Delete(Type processType, Guid processId, int version)
{
Store.TryRemove((processType, processId), out _);
return ValueTask.CompletedTask;
return default;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ from messageType in GetMessageTypes(processType)
group processType by messageType)
.ToDictionary(x => x.Key, x => x.ToList());

var mapping = new HandlerTypeMapping(maps.AsReadOnly());
var mapping = new HandlerTypeMapping(maps);

foreach (var processType in _processManagers)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<TargetFramework>netstandard2.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>latest</LangVersion>
Expand All @@ -11,6 +11,10 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="8.0.0" />
<PackageReference Include="PolySharp" Version="1.14.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>

<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
<LangVersion>latest</LangVersion>
<EnableNETAnalyzers>true</EnableNETAnalyzers>
<AnalysisLevel>latest</AnalysisLevel>
</PropertyGroup>

<ItemGroup>
Expand Down

0 comments on commit 7b90a96

Please sign in to comment.