Skip to content

Commit

Permalink
Merge pull request #5 from RobThree/mssql
Browse files Browse the repository at this point in the history
SQL Server implementation
  • Loading branch information
AlexeyRaga authored Apr 26, 2024
2 parents 0e86054 + 822d348 commit f87a685
Show file tree
Hide file tree
Showing 28 changed files with 711 additions and 33 deletions.
40 changes: 35 additions & 5 deletions KafkaFlow.Contrib.sln
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@

Microsoft Visual Studio Solution File, Format Version 12.00
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contrib.KafkaFlow.ProcessManagers", "src\Contrib.KafkaFlow.ProcessManagers\Contrib.KafkaFlow.ProcessManagers.csproj", "{1DD0DA5D-63CD-4FC3-972E-CD376139810A}"
# Visual Studio Version 17
VisualStudioVersion = 17.9.34728.123
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Contrib.KafkaFlow.ProcessManagers", "src\Contrib.KafkaFlow.ProcessManagers\Contrib.KafkaFlow.ProcessManagers.csproj", "{1DD0DA5D-63CD-4FC3-972E-CD376139810A}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{CE99781E-E88C-404C-B9AD-D7F713265F31}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{A5E2EC13-2E8B-4EB3-8C0F-D252A04B42F4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.ProcessManagers.IntegrationTests", "tests\KafkaFlow.ProcessManagers.IntegrationTests\KafkaFlow.ProcessManagers.IntegrationTests.csproj", "{F3DB8E66-BEA2-494F-B908-57C83BDF1501}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.ProcessManagers.IntegrationTests", "tests\KafkaFlow.ProcessManagers.IntegrationTests\KafkaFlow.ProcessManagers.IntegrationTests.csproj", "{F3DB8E66-BEA2-494F-B908-57C83BDF1501}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contrib.KafkaFlow.Outbox", "src\Contrib.KafkaFlow.Outbox\Contrib.KafkaFlow.Outbox.csproj", "{39CF3703-98C2-4EE5-872B-AEFE85EEAC9A}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Contrib.KafkaFlow.Outbox", "src\Contrib.KafkaFlow.Outbox\Contrib.KafkaFlow.Outbox.csproj", "{39CF3703-98C2-4EE5-872B-AEFE85EEAC9A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contrib.KafkaFlow.Outbox.Postgres", "src\Contrib.KafkaFlow.Outbox.Postgres\Contrib.KafkaFlow.Outbox.Postgres.csproj", "{95A1F8B4-98B2-42F2-8CF1-0067F3D6B125}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Contrib.KafkaFlow.Outbox.Postgres", "src\Contrib.KafkaFlow.Outbox.Postgres\Contrib.KafkaFlow.Outbox.Postgres.csproj", "{95A1F8B4-98B2-42F2-8CF1-0067F3D6B125}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contrib.KafkaFlow.ProcessManagers.Postgres", "src\Contrib.KafkaFlow.ProcessManagers.Postgres\Contrib.KafkaFlow.ProcessManagers.Postgres.csproj", "{6A9E74CC-3E65-4422-8E3E-862C3A999B72}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Contrib.KafkaFlow.ProcessManagers.Postgres", "src\Contrib.KafkaFlow.ProcessManagers.Postgres\Contrib.KafkaFlow.ProcessManagers.Postgres.csproj", "{6A9E74CC-3E65-4422-8E3E-862C3A999B72}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Contrib.KafkaFlow.Outbox.SqlServer", "src\Contrib.KafkaFlow.Outbox.SqlServer\Contrib.KafkaFlow.Outbox.SqlServer.csproj", "{A4CBB313-9DCC-441E-BB44-1473CDF7A986}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Contrib.KafkaFlow.ProcessManagers.SqlServer", "src\Contrib.KafkaFlow.ProcessManagers.SqlServer\Contrib.KafkaFlow.ProcessManagers.SqlServer.csproj", "{3F0C0AD0-CFD8-4722-BF3B-6259D0878680}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contrib.KafkaFlow.SqlServer", "src\Contrib.KafkaFlow.SqlServer\Contrib.KafkaFlow.SqlServer.csproj", "{6839FA17-DF28-418B-AB5A-2249D44F4CA3}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -40,12 +49,33 @@ Global
{6A9E74CC-3E65-4422-8E3E-862C3A999B72}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6A9E74CC-3E65-4422-8E3E-862C3A999B72}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6A9E74CC-3E65-4422-8E3E-862C3A999B72}.Release|Any CPU.Build.0 = Release|Any CPU
{A4CBB313-9DCC-441E-BB44-1473CDF7A986}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A4CBB313-9DCC-441E-BB44-1473CDF7A986}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A4CBB313-9DCC-441E-BB44-1473CDF7A986}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A4CBB313-9DCC-441E-BB44-1473CDF7A986}.Release|Any CPU.Build.0 = Release|Any CPU
{3F0C0AD0-CFD8-4722-BF3B-6259D0878680}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3F0C0AD0-CFD8-4722-BF3B-6259D0878680}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3F0C0AD0-CFD8-4722-BF3B-6259D0878680}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3F0C0AD0-CFD8-4722-BF3B-6259D0878680}.Release|Any CPU.Build.0 = Release|Any CPU
{6839FA17-DF28-418B-AB5A-2249D44F4CA3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6839FA17-DF28-418B-AB5A-2249D44F4CA3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6839FA17-DF28-418B-AB5A-2249D44F4CA3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6839FA17-DF28-418B-AB5A-2249D44F4CA3}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{1DD0DA5D-63CD-4FC3-972E-CD376139810A} = {CE99781E-E88C-404C-B9AD-D7F713265F31}
{F3DB8E66-BEA2-494F-B908-57C83BDF1501} = {A5E2EC13-2E8B-4EB3-8C0F-D252A04B42F4}
{39CF3703-98C2-4EE5-872B-AEFE85EEAC9A} = {CE99781E-E88C-404C-B9AD-D7F713265F31}
{95A1F8B4-98B2-42F2-8CF1-0067F3D6B125} = {CE99781E-E88C-404C-B9AD-D7F713265F31}
{6A9E74CC-3E65-4422-8E3E-862C3A999B72} = {CE99781E-E88C-404C-B9AD-D7F713265F31}
{A4CBB313-9DCC-441E-BB44-1473CDF7A986} = {CE99781E-E88C-404C-B9AD-D7F713265F31}
{3F0C0AD0-CFD8-4722-BF3B-6259D0878680} = {CE99781E-E88C-404C-B9AD-D7F713265F31}
{6839FA17-DF28-418B-AB5A-2249D44F4CA3} = {CE99781E-E88C-404C-B9AD-D7F713265F31}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {66EE15D8-69C4-4AFB-9EB8-6020B2D8C01A}
EndGlobalSection
EndGlobal
3 changes: 3 additions & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ ecosystem.
The following backends are implemented:

- [Contrib.KafkaFlow.Outbox.Postgres](./src/Contrib.KafkaFlow.Outbox.Postgres) - Postgres SQL backend
- [Contrib.KafkaFlow.Outbox.SqlServer](./src/Contrib.KafkaFlow.Outbox.SqlServer) - SQL Server backend


- [Contrib.KafkaFlow.ProcessManagers](./src/Contrib.KafkaFlow.ProcessManagers/Readme.md)
Expand All @@ -22,6 +23,8 @@ ecosystem.

- [Contrib.KafkaFlow.ProcessManagers.Postgres](./src/Contrib.KafkaFlow.ProcessManagers.Postgres) -
Postgres SQL backend for storing process' state
- [Contrib.KafkaFlow.ProcessManagers.SqlServer](./src/Contrib.KafkaFlow.ProcessManagers.SqlServer) -
SQL Server backend for storing process' state

## Usage example

Expand Down
22 changes: 21 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ services:
- KAFKA_SCHEMAREGISTRY_ENABLED=TRUE
- KAFKA_SCHEMAREGISTRY_URLS=http://kafka:8081

db:
postgres:
image: public.ecr.aws/docker/library/postgres:14.7-alpine
environment:
PGDATA: /var/lib/postgresql/data/pgdata
Expand All @@ -61,3 +61,23 @@ services:
- "./local-env/postgres:/docker-entrypoint-initdb.d"
- "./src/Contrib.KafkaFlow.Outbox.Postgres/schema:/docker-entrypoint-initdb.d/0001.outbox"
- "./src/Contrib.KafkaFlow.ProcessManagers.Postgres/schema:/docker-entrypoint-initdb.d/0002.processes"

mssql:
image: mcr.microsoft.com/mssql/server:2022-CU12-ubuntu-22.04
environment:
ACCEPT_EULA: "Y"
MSSQL_SA_PASSWORD: "4YiUmU2YJ8$6eqbSXF8765Ck3"
MSSQL_PID: Developer
healthcheck:
test: /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P "$${MSSQL_SA_PASSWORD}" -Q "SELECT 1" || exit 1
interval: 5s
timeout: 5s
retries: 5
ports:
- "1433:1433/tcp"
restart: unless-stopped
command: ["/bin/bash", "-c", "/docker-entrypoint-initdb.d/initdb.sh"]
volumes:
- "./local-env/mssql:/docker-entrypoint-initdb.d"
- "./src/Contrib.KafkaFlow.Outbox.SqlServer/schema:/docker-entrypoint-initdb.d/0001.outbox"
- "./src/Contrib.KafkaFlow.ProcessManagers.SqlServer/schema:/docker-entrypoint-initdb.d/0002.processes"
48 changes: 48 additions & 0 deletions local-env/mssql/initdb.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/usr/bin/env bash

# Start SQL Server
/opt/mssql/bin/sqlservr &

# Wait for SQL to start
TRIES=60
DBSTATUS=1
ERRCODE=1
i=0

echo "Waiting for SQL Server to start"
while [[ $DBSTATUS -ne 0 ]] && [[ $i -lt $TRIES ]]; do
i=$((i+1))
DBSTATUS=$(/opt/mssql-tools/bin/sqlcmd -h -1 -t 1 -U sa -P $MSSQL_SA_PASSWORD -Q "SET NOCOUNT ON; Select COALESCE(SUM(state), 0) from sys.databases") || DBSTATUS=1
if [ $DBSTATUS -ne 0 ]; then
sleep 1s
fi
done

sleep 5s
if [ $DBSTATUS -ne 0 ]; then
echo "SQL Server took more than $TRIES seconds to start up or one or more databases are not in an ONLINE state"
exit 1
fi

mssql=( /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P "$MSSQL_SA_PASSWORD" -d master -i )

find /docker-entrypoint-initdb.d -mindepth 2 -type f | sort | while read f; do
case "$f" in
*.sh)
if [ -x "$f" ]; then
echo "$0: running $f"
"$f"
else
echo "$0: sourcing $f"
. "$f"
fi
;;
*.sql) echo "$0: running $f"; "${mssql[@]}" "$f"; echo ;;
*.sql.gz) echo "$0: running $f"; gunzip -c "$f" | "${mssql[@]}"; echo ;;
*) echo "$0: ignoring $f" ;;
esac
echo
done

echo "SQL Server is running"
sleep infinity
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.28" />
<PackageReference Include="Dapper" Version="2.1.35" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using KafkaFlow;
using KafkaFlow.SqlServer;
using Microsoft.Extensions.DependencyInjection;

namespace KafkaFlow.Outbox.SqlServer;

public static class ConfigurationBuilderExtensions
{
public static IServiceCollection AddSqlServerOutboxBackend(this IServiceCollection services) =>
services.AddSingleton<IOutboxBackend, SqlServerOutboxBackend>();

public static IServiceCollection AddSqlServerOutboxBackend(this IServiceCollection services, string connectionString)
{
services.ConfigureSqlServerBackend(options => options.ConnectionString = connectionString);
return AddSqlServerOutboxBackend(services);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<LangVersion>latest</LangVersion>
<EnableNETAnalyzers>true</EnableNETAnalyzers>
<AnalysisLevel>latest</AnalysisLevel>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Contrib.KafkaFlow.SqlServer\Contrib.KafkaFlow.SqlServer.csproj" />
<ProjectReference Include="..\Contrib.KafkaFlow.Outbox\Contrib.KafkaFlow.Outbox.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.35" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.DataAnnotations" Version="8.0.0" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.6" />
</ItemGroup>

</Project>
95 changes: 95 additions & 0 deletions src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxBackend.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using Confluent.Kafka;
using Dapper;
using KafkaFlow.SqlServer;
using Microsoft.Extensions.Options;
using System.Data.SqlClient;
using System.Text.Json;

namespace KafkaFlow.Outbox.SqlServer;

public class SqlServerOutboxBackend : IOutboxBackend
{
private readonly SqlServerBackendOptions _options;

public SqlServerOutboxBackend(IOptions<SqlServerBackendOptions> options)
=> _options = options?.Value ?? throw new ArgumentNullException(nameof(options));

public async ValueTask Store(TopicPartition topicPartition, Message<byte[], byte[]> message, CancellationToken token = default)
{
var sql = """
INSERT INTO [outbox].[outbox] ([topic_name], [partition], [message_key], [message_headers], [message_body])
VALUES (@topic_name, @partition, @message_key, @message_headers, @message_body);
""";

using var conn = new SqlConnection(_options.ConnectionString);

var rawHeaders =
message.Headers == null
? null
: JsonSerializer.Serialize(message.Headers.ToDictionary(x => x.Key, x => x.GetValueBytes()));

var res = await conn.ExecuteAsync(sql, new
{
topic_name = topicPartition.Topic,
partition = topicPartition.Partition.IsSpecial ? null : (int?)topicPartition.Partition.Value,
message_key = message.Key,
message_headers = rawHeaders,
message_body = message.Value
}).ConfigureAwait(false);

}

public async ValueTask<OutboxRecord[]> Read(int batchSize, CancellationToken token = default)
{
var sql = """
DELETE FROM [outbox].[outbox]
OUTPUT DELETED.*
WHERE
[sequence_id] IN (
SELECT TOP (@batch_size) [sequence_id] FROM [outbox].[outbox]
ORDER BY [sequence_id]
);
""";
using var conn = new SqlConnection(_options.ConnectionString);
var result = await conn.QueryAsync<OutboxTableRow>(sql, new { batch_size = batchSize });

return result?.Select(ToOutboxRecord).ToArray() ?? Array.Empty<OutboxRecord>();
}

private static OutboxRecord ToOutboxRecord(OutboxTableRow row)
{
var partition = row.partition.HasValue ? new Partition(row.partition.Value) : Partition.Any;
var topicPartition = new TopicPartition(row.topic_name, partition);

var storedHeaders =
row.message_headers == null
? null
: JsonSerializer.Deserialize<Dictionary<string, byte[]>>(row.message_headers);

var headers =
storedHeaders?.Aggregate(new Headers(), (h, x) =>
{
h.Add(x.Key, x.Value);
return h;
});

var msg = new Message<byte[], byte[]>
{
Key = row.message_key!,
Value = row.message_body!,
Headers = headers
};

return new OutboxRecord(topicPartition, msg);
}
}

internal sealed class OutboxTableRow
{
public long sequence_id { get; set; }
public string topic_name { get; set; } = null!;
public int? partition { get; set; }
public byte[]? message_key { get; set; }
public string? message_headers { get; set; }
public byte[]? message_body { get; set; }
}
5 changes: 5 additions & 0 deletions src/Contrib.KafkaFlow.Outbox.SqlServer/schema/0001.Shema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
IF NOT EXISTS(SELECT * FROM sys.schemas WHERE name = 'outbox')
BEGIN
EXEC ('CREATE SCHEMA [outbox] AUTHORIZATION [dbo]')
END
GO
18 changes: 18 additions & 0 deletions src/Contrib.KafkaFlow.Outbox.SqlServer/schema/0002.Table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/* Table */
IF OBJECT_ID(N'[outbox].[outbox]', N'U') IS NULL
BEGIN
CREATE TABLE [outbox].[outbox](
[sequence_id] [bigint] IDENTITY(1,1) NOT NULL,
[topic_name] [nvarchar](255) NOT NULL,
[partition] [int] NULL,
[message_key] [varbinary](max) NULL,
[message_headers] [nvarchar](max) NULL,
[message_body] [varbinary](max) NULL,
[date_added_utc] [datetime2] NOT NULL DEFAULT(SYSUTCDATETIME()),
[rowversion] [int] NOT NULL DEFAULT(1),
CONSTRAINT [PK_outbox] PRIMARY KEY CLUSTERED ([sequence_id] ASC),
CONSTRAINT [CK_headers_not_blank_or_empty] CHECK ((TRIM([message_headers])<>N'')),
CONSTRAINT [CK_topic_name_not_blank_or_empty] CHECK ((TRIM([topic_name])<>N''))
)
END
GO
6 changes: 3 additions & 3 deletions src/Contrib.KafkaFlow.Outbox/Contrib.KafkaFlow.Outbox.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="KafkaFlow.Abstractions" Version="3.0.3" />
<PackageReference Include="KafkaFlow.Microsoft.DependencyInjection" Version="3.0.3" />
<PackageReference Include="KafkaFlow.Abstractions" Version="3.0.7" />
<PackageReference Include="KafkaFlow.Microsoft.DependencyInjection" Version="3.0.7" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.28" />
<PackageReference Include="Dapper" Version="2.1.35" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using KafkaFlow;
using KafkaFlow.SqlServer;
using Microsoft.Extensions.DependencyInjection;

namespace KafkaFlow.ProcessManagers.SqlServer;

public static class ConfigurationBuilderExtensions
{
public static IServiceCollection AddSqlServerProcessManagerState(this IServiceCollection services) =>
services.AddSingleton<IProcessStateStore, SqlServerProcessManagersStore>();


public static IServiceCollection AddSqlServerProcessManagerState(this IServiceCollection services, string connectionString)
{
services.ConfigureSqlServerBackend(options => options.ConnectionString = connectionString);
return AddSqlServerProcessManagerState(services);
}
}
Loading

0 comments on commit f87a685

Please sign in to comment.