Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP feat: support Kafka messages with nullable or empty partition key #87

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Install packages related to your context. The Core package is required for all o
.WithRetryTopicName("test-topic-retry")
.WithRetryConsumerBufferSize(4)
.WithRetryConsumerWorkersCount(2)
.WithRetryConusmerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption)
.WithRetryConsumerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption)
.WithRetryTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Transient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

internal interface IPhysicalStorageAssert
{
Task AssertEmptyKeyRetryDurableMessageRetryingAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int retryCount);

Task AssertRetryDurableMessageCreationAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int count);

Task AssertRetryDurableMessageDoneAsync(RepositoryType repositoryType, RetryDurableTestMessage message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,29 @@ public RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert(IRepositoryP
this.repositoryProvider = repositoryProvider;
}

public async Task AssertEmptyKeyRetryDurableMessageRetryingAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int retryCount)
{
var retryQueue = await this
.repositoryProvider
.GetRepositoryOfType(repositoryType)
.GetRetryQueueAsync(message.Key)
.ConfigureAwait(false);

Assert.True(retryQueue.Id != Guid.Empty, "Retry Durable Creation Get Retry Queue cannot be asserted.");

var retryQueueItems = await this
.repositoryProvider
.GetRepositoryOfType(repositoryType)
.GetRetryQueueItemsAsync(retryQueue.Id, rqi => rqi.Count(item => item.Status == RetryQueueItemStatus.InRetry) != retryCount)
.ConfigureAwait(false);

Assert.True(retryQueueItems != null, "Retry Durable Creation Get Retry Queue Item Message cannot be asserted.");
fernando-a-marins marked this conversation as resolved.
Show resolved Hide resolved

Assert.Equal(retryQueueItems.Count() - 1, retryQueueItems.Max(i => i.Sort));
Assert.True(Enum.Equals(retryQueue.Status, RetryQueueStatus.Active));
Assert.All(retryQueueItems, i => Enum.Equals(i.Status, RetryQueueItemStatus.Waiting));
}

public async Task AssertRetryDurableMessageCreationAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int count)
{
var retryQueue = await this
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow.Retry.IntegrationTests.Core.Storages.Assertion
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using KafkaFlow.Retry.Durable.Repository.Model;
Expand All @@ -18,6 +19,37 @@ public RetryDurableLatestConsumptionPhysicalStorageAssert(IRepositoryProvider re
this.repositoryProvider = repositoryProvider;
}

public async Task AssertEmptyKeyRetryDurableMessageRetryingAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int retryCount)
{
var retryQueue = await this
.repositoryProvider
.GetRepositoryOfType(repositoryType)
.GetRetryQueueAsync(message.Key).ConfigureAwait(false);

Assert.True(retryQueue.Id != Guid.Empty, "Retry Durable Retrying Get Retry Queue cannot be asserted.");

var retryQueueItems = await this
.repositoryProvider
.GetRepositoryOfType(repositoryType)
.GetRetryQueueItemsAsync(
retryQueue.Id,
rqi =>
{
return rqi.Count(item => item.Status == RetryQueueItemStatus.Waiting) != retryCount;
}).ConfigureAwait(false);

var lastRetryItem = retryQueueItems.OrderBy(x => x.Sort).Last();
var numberOrRetryItems = retryQueueItems.Count();
var maxSortValue = retryQueueItems.Max(i => i.Sort);
var cancelledRetryItems = retryQueueItems.Except(new List<RetryQueueItem> { lastRetryItem });

Assert.True(retryQueueItems != null, "Retry Durable Retrying Get Retry Queue Item Message cannot be asserted.");
fernando-a-marins marked this conversation as resolved.
Show resolved Hide resolved
Assert.True(Enum.Equals(retryQueue.Status, RetryQueueStatus.Active), "Actual retry queue should be in active state");
Assert.Equal(numberOrRetryItems - 1, maxSortValue);
Assert.Equal(RetryQueueItemStatus.Waiting, lastRetryItem.Status);
Assert.All(cancelledRetryItems, i => Enum.Equals(i.Status, RetryQueueItemStatus.Cancelled));
}

public async Task AssertRetryDurableMessageCreationAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int count)
{
var retryQueue = await this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,52 @@
internal static class InMemoryAuxiliarStorage<T> where T : ITestMessage
{
private const int TimeoutSec = 60;
private static readonly ConcurrentBag<T> Message = new ConcurrentBag<T>();
private static readonly ConcurrentBag<T> Messages = new ConcurrentBag<T>();

public static bool ThrowException { get; set; }

public static void Add(T message)
{
Message.Add(message);
Messages.Add(message);
}

public static async Task AssertCountMessageAsync(T message, int count)
{
var start = DateTime.Now;

while (Message.Count(x => x.Key == message.Key && x.Value == message.Value) != count)
while (Messages.Count(x => x.Key == message.Key && x.Value == message.Value) != count)
{
if (DateTime.Now.Subtract(start).TotalSeconds > TimeoutSec && !Debugger.IsAttached)
{
Assert.True(false, "Message not received.");
Assert.True(false, $"Message not received - {message.Key}:{message.Value}.");
return;
}

await Task.Delay(100).ConfigureAwait(false);
}
}

public static async Task AssertEmptyPartitionKeyCountMessageAsync(T message, int count, int timoutSeconds = TimeoutSec)
{
var start = DateTime.Now;
int numberOfMessages = 0;
do
{
numberOfMessages = Messages.Count(x => x.Value == message.Value);

if (DateTime.Now.Subtract(start).TotalSeconds > timoutSeconds && !Debugger.IsAttached)
{
Assert.True(false, $"Message {message.Key}:{message.Value} not received. Expected {count}, messages received {numberOfMessages}");
return;
}

await Task.Delay(1000).ConfigureAwait(false);
} while (numberOfMessages != count);
}

public static void Clear()
{
Message.Clear();
Messages.Clear();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
namespace KafkaFlow.Retry.IntegrationTests
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using KafkaFlow.Retry.IntegrationTests.Core.Bootstrappers.Fixtures;
using KafkaFlow.Retry.IntegrationTests.Core.Messages;
using KafkaFlow.Retry.IntegrationTests.Core.Producers;
using KafkaFlow.Retry.IntegrationTests.Core.Storages;
using KafkaFlow.Retry.IntegrationTests.Core.Storages.Assertion;
using KafkaFlow.Retry.IntegrationTests.Core.Storages.Repositories;
using Microsoft.Extensions.DependencyInjection;
using Xunit;

[Collection("BootstrapperHostCollection")]
public class EmptyPartitionKeyRetryDurableTests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's missing tests with the null partition key. But please check my other comment.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't test with a null partition because the Kafka flow doesn't support it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can test it if the test does not start with the production of a Kafka message but with a consumption. If we want to support null partition keys we should have a test for it.

{
private const int defaultWaitingTimeSeconds = 120;
private readonly IRepositoryProvider repositoryProvider;
private readonly IServiceProvider serviceProvider;

public EmptyPartitionKeyRetryDurableTests(BootstrapperHostFixture bootstrapperHostFixture)
{
this.serviceProvider = bootstrapperHostFixture.ServiceProvider;
this.repositoryProvider = bootstrapperHostFixture.ServiceProvider.GetRequiredService<IRepositoryProvider>();
InMemoryAuxiliarStorage<RetryDurableTestMessage>.Clear();
InMemoryAuxiliarStorage<RetryDurableTestMessage>.ThrowException = true;
}

public static IEnumerable<object[]> EmptyKeyScenarios()
{
yield return new object[]
{
RepositoryType.MongoDb,
typeof(IMessageProducer<RetryDurableGuaranteeOrderedConsumptionMongoDbProducer>),
typeof(RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert),
3 //numberOfMessagesToBeProduced
};
yield return new object[]
{
RepositoryType.SqlServer,
typeof(IMessageProducer<RetryDurableGuaranteeOrderedConsumptionSqlServerProducer>),
typeof(RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert),
3
};
yield return new object[]
{
RepositoryType.MongoDb,
typeof(IMessageProducer<RetryDurableLatestConsumptionMongoDbProducer>),
typeof(RetryDurableLatestConsumptionPhysicalStorageAssert),
1
};
yield return new object[]
{
RepositoryType.SqlServer,
typeof(IMessageProducer<RetryDurableLatestConsumptionSqlServerProducer>),
typeof(RetryDurableLatestConsumptionPhysicalStorageAssert),
1
};
}

[Theory]
[MemberData(nameof(EmptyKeyScenarios))]
internal async Task EmptyKeyRetryDurableTest(
RepositoryType repositoryType,
Type producerType,
Type physicalStorageType,
int numberOfMessagesToBeProduced)
{
// Arrange
var numberOfMessagesByEachSameKey = 1;
var numberOfTimesThatEachMessageIsTriedWhenDone = 1;
var numberOfTimesThatEachMessageIsTriedDuringDurable = 1;
var producer = this.serviceProvider.GetRequiredService(producerType) as IMessageProducer;
var physicalStorageAssert = this.serviceProvider.GetRequiredService(physicalStorageType) as IPhysicalStorageAssert;
var messages = new List<RetryDurableTestMessage>();
for (int i = 0; i < numberOfMessagesToBeProduced; i++)
{
messages.Add(new RetryDurableTestMessage { Key = string.Empty, Value = $"Message_{i + 1}" });
}

await this.repositoryProvider.GetRepositoryOfType(repositoryType).CleanDatabaseAsync().ConfigureAwait(false);

// Act
foreach (var message in messages)
{
await producer.ProduceAsync(message.Key, message).ConfigureAwait(false);
}

RetryDurableTestMessage messageToValidate = messages[0];

await physicalStorageAssert
.AssertEmptyKeyRetryDurableMessageRetryingAsync(repositoryType, messageToValidate, numberOfMessagesByEachSameKey)
.ConfigureAwait(false);

// Assert - Retrying
InMemoryAuxiliarStorage<RetryDurableTestMessage>.Clear();

await InMemoryAuxiliarStorage<RetryDurableTestMessage>
.AssertEmptyPartitionKeyCountMessageAsync(messageToValidate, numberOfTimesThatEachMessageIsTriedDuringDurable, defaultWaitingTimeSeconds)
.ConfigureAwait(false);

await physicalStorageAssert
.AssertEmptyKeyRetryDurableMessageRetryingAsync(repositoryType, messageToValidate, numberOfTimesThatEachMessageIsTriedDuringDurable)
.ConfigureAwait(false);

// Assert - Done
InMemoryAuxiliarStorage<RetryDurableTestMessage>.ThrowException = false;
InMemoryAuxiliarStorage<RetryDurableTestMessage>.Clear();

await InMemoryAuxiliarStorage<RetryDurableTestMessage>
.AssertEmptyPartitionKeyCountMessageAsync(messageToValidate, numberOfTimesThatEachMessageIsTriedWhenDone, defaultWaitingTimeSeconds)
.ConfigureAwait(false);

await physicalStorageAssert
.AssertRetryDurableMessageDoneAsync(repositoryType, messageToValidate)
.ConfigureAwait(false);
}
}
}
8 changes: 4 additions & 4 deletions src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
namespace KafkaFlow.Retry.IntegrationTests
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using AutoFixture;
using KafkaFlow.Retry.IntegrationTests.Core.Bootstrappers.Fixtures;
using KafkaFlow.Retry.IntegrationTests.Core.Messages;
Expand All @@ -12,6 +8,10 @@ namespace KafkaFlow.Retry.IntegrationTests
using KafkaFlow.Retry.IntegrationTests.Core.Storages.Assertion;
using KafkaFlow.Retry.IntegrationTests.Core.Storages.Repositories;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Xunit;

[Collection("BootstrapperHostCollection")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
namespace KafkaFlow.Retry.UnitTests.API
{
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using global::KafkaFlow.Retry.Durable.Repository.Model;
using global::KafkaFlow.Retry.UnitTests.API.Surrogate;
using global::KafkaFlow.Retry.UnitTests.API.Utilities;
using Microsoft.AspNetCore.Http;
using Moq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

public class RetryRequestHandlerBaseTests
Expand All @@ -21,7 +22,7 @@ public async Task RetryRequestHandlerBase_HandleAsync_CallsHandleRequestAsync()
// Arrange
var dto = new DtoSurrogate
{
Text = Durable.Repository.Model.RetryQueueStatus.Active
Text = RetryQueueStatus.Active
};

var mockHttpContext = HttpContextHelper.MockHttpContext(ResourcePath, HttpMethod, requestBody: dto);
Expand Down
Loading