-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Cleanup & refactor * Minor reformatting of queries * Fix lost-somewhere spaces +semver:minor
- Loading branch information
Showing
49 changed files
with
716 additions
and
832 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2 changes: 1 addition & 1 deletion
2
src/Contrib.KafkaFlow.Outbox.Postgres/Contrib.KafkaFlow.Outbox.Postgres.csproj
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
100 changes: 0 additions & 100 deletions
100
src/Contrib.KafkaFlow.Outbox.Postgres/PostgresOutboxBackend.cs
This file was deleted.
Oops, something went wrong.
50 changes: 50 additions & 0 deletions
50
src/Contrib.KafkaFlow.Outbox.Postgres/PostgresOutboxRepository.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
using Dapper; | ||
using Npgsql; | ||
|
||
namespace KafkaFlow.Outbox.Postgres; | ||
|
||
public class PostgresOutboxRepository(NpgsqlDataSource connectionPool) : IOutboxRepository | ||
{ | ||
private readonly NpgsqlDataSource _connectionPool = connectionPool; | ||
|
||
public async ValueTask Store(OutboxTableRow outboxTableRow, CancellationToken token = default) | ||
{ | ||
var sql = """ | ||
INSERT INTO outbox.outbox(topic_name, partition, message_key, message_headers, message_body) | ||
VALUES (@topic_name, @partition, @message_key, @message_headers, @message_body) | ||
"""; | ||
|
||
await using var conn = _connectionPool.CreateConnection(); | ||
await conn.ExecuteAsync(sql, new | ||
{ | ||
topic_name = outboxTableRow.TopicName, | ||
partition = outboxTableRow.Partition, | ||
message_key = outboxTableRow.MessageKey, | ||
message_headers = outboxTableRow.MessageHeaders, | ||
message_body = outboxTableRow.MessageBody | ||
}).ConfigureAwait(false); | ||
} | ||
|
||
public async Task<IEnumerable<OutboxTableRow>> Read(int batchSize, CancellationToken token = default) | ||
{ | ||
var sql = """ | ||
DELETE FROM outbox.outbox | ||
WHERE | ||
sequence_id = ANY(ARRAY( | ||
SELECT sequence_id FROM outbox.outbox | ||
ORDER BY sequence_id | ||
LIMIT @batch_size | ||
FOR UPDATE | ||
)) | ||
RETURNING | ||
sequence_id as "SequenceId", | ||
topic_name as "TopicName", | ||
partition as "Partition", | ||
message_key as "MessageKey", | ||
message_headers as "MessageHeaders", | ||
message_body as "MessageBody" | ||
"""; | ||
await using var conn = _connectionPool.CreateConnection(); | ||
return await conn.QueryAsync<OutboxTableRow>(sql, new { batch_size = batchSize }).ConfigureAwait(false); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
94 changes: 0 additions & 94 deletions
94
src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxBackend.cs
This file was deleted.
Oops, something went wrong.
50 changes: 50 additions & 0 deletions
50
src/Contrib.KafkaFlow.Outbox.SqlServer/SqlServerOutboxRepository.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
using Dapper; | ||
using KafkaFlow.SqlServer; | ||
using Microsoft.Extensions.Options; | ||
using System.Data.SqlClient; | ||
|
||
namespace KafkaFlow.Outbox.SqlServer; | ||
|
||
public class SqlServerOutboxRepository(IOptions<SqlServerBackendOptions> options) : IOutboxRepository | ||
{ | ||
private readonly SqlServerBackendOptions _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); | ||
|
||
public async ValueTask Store(OutboxTableRow outboxTableRow, CancellationToken token = default) | ||
{ | ||
var sql = """ | ||
INSERT INTO [outbox].[outbox] ([topic_name], [partition], [message_key], [message_headers], [message_body]) | ||
VALUES (@topic_name, @partition, @message_key, @message_headers, @message_body); | ||
"""; | ||
|
||
using var conn = new SqlConnection(_options.ConnectionString); | ||
await conn.ExecuteAsync(sql, new | ||
{ | ||
topic_name = outboxTableRow.TopicName, | ||
partition = outboxTableRow.Partition, | ||
message_key = outboxTableRow.MessageKey, | ||
message_headers = outboxTableRow.MessageHeaders, | ||
message_body = outboxTableRow.MessageBody | ||
}).ConfigureAwait(false); | ||
} | ||
|
||
public async Task<IEnumerable<OutboxTableRow>> Read(int batchSize, CancellationToken token = default) | ||
{ | ||
var sql = """ | ||
DELETE FROM [outbox].[outbox] | ||
OUTPUT [DELETED].[sequence_id] as [SequenceId], | ||
[DELETED].[topic_name] as [TopicName], | ||
[DELETED].[partition] as [Partition], | ||
[DELETED].[message_key] as [MessageKey], | ||
[DELETED].[message_headers] as [MessageHeaders], | ||
[DELETED].[message_body] as [MessageBody] | ||
WHERE | ||
[sequence_id] IN ( | ||
SELECT TOP (@batch_size) [sequence_id] FROM [outbox].[outbox] | ||
ORDER BY [sequence_id] | ||
); | ||
"""; | ||
|
||
using var conn = new SqlConnection(_options.ConnectionString); | ||
return await conn.QueryAsync<OutboxTableRow>(sql, new { batch_size = batchSize }).ConfigureAwait(false); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
namespace KafkaFlow.Outbox; | ||
|
||
public interface IOutboxDispatcher {} |
Oops, something went wrong.