From 60fd16d6fd449764bf62b563246887c0bcc9a135 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Fri, 7 Feb 2025 08:36:31 -0800 Subject: [PATCH] Fix Azure Queue Streaming IConfiguration provider --- Directory.Packages.props | 1 + .../AzureQueueStreamProviderBuilder.cs | 19 +- .../Orleans.Streaming.AzureStorage.csproj | 1 + .../Hosting/SiloBuilderStreamingExtensions.cs | 2 +- src/Orleans.TestingHost/InProcTestCluster.cs | 32 +++- .../Streaming/AQStreamingTests.cs | 179 +++++++++++------- .../TesterAzureUtils/Tester.AzureUtils.csproj | 1 + 7 files changed, 148 insertions(+), 87 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index af90d041ba..450486392d 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -10,6 +10,7 @@ + diff --git a/src/Azure/Orleans.Streaming.AzureStorage/Hosting/AzureQueueStreamProviderBuilder.cs b/src/Azure/Orleans.Streaming.AzureStorage/Hosting/AzureQueueStreamProviderBuilder.cs index 478cc167ad..d3e3edab3b 100644 --- a/src/Azure/Orleans.Streaming.AzureStorage/Hosting/AzureQueueStreamProviderBuilder.cs +++ b/src/Azure/Orleans.Streaming.AzureStorage/Hosting/AzureQueueStreamProviderBuilder.cs @@ -63,9 +63,22 @@ private static Action> GetQueueOptionBuilder(I if (!string.IsNullOrEmpty(connectionString)) { - options.QueueServiceClient = Uri.TryCreate(connectionString, UriKind.Absolute, out var uri) - ? new QueueServiceClient(uri) - : new QueueServiceClient(connectionString); + if (Uri.TryCreate(connectionString, UriKind.Absolute, out var uri)) + { + if (!string.IsNullOrEmpty(uri.Query)) + { + // SAS URI + options.QueueServiceClient = new QueueServiceClient(uri); + } + else + { + options.QueueServiceClient = new QueueServiceClient(uri, credential: new Azure.Identity.DefaultAzureCredential()); + } + } + else + { + options.QueueServiceClient = new QueueServiceClient(connectionString); + } } } }); diff --git a/src/Azure/Orleans.Streaming.AzureStorage/Orleans.Streaming.AzureStorage.csproj b/src/Azure/Orleans.Streaming.AzureStorage/Orleans.Streaming.AzureStorage.csproj index 60fb7471ac..a6b5e866f9 100644 --- a/src/Azure/Orleans.Streaming.AzureStorage/Orleans.Streaming.AzureStorage.csproj +++ b/src/Azure/Orleans.Streaming.AzureStorage/Orleans.Streaming.AzureStorage.csproj @@ -23,6 +23,7 @@ + diff --git a/src/Orleans.Streaming/Hosting/SiloBuilderStreamingExtensions.cs b/src/Orleans.Streaming/Hosting/SiloBuilderStreamingExtensions.cs index 15d0977970..766103ebb5 100644 --- a/src/Orleans.Streaming/Hosting/SiloBuilderStreamingExtensions.cs +++ b/src/Orleans.Streaming/Hosting/SiloBuilderStreamingExtensions.cs @@ -5,7 +5,7 @@ namespace Orleans.Hosting { /// - /// Extension methods for confiiguring streaming on silos. + /// Extension methods for configuring streaming on silos. /// public static class SiloBuilderStreamingExtensions { diff --git a/src/Orleans.TestingHost/InProcTestCluster.cs b/src/Orleans.TestingHost/InProcTestCluster.cs index 5256724292..ec2c87310f 100644 --- a/src/Orleans.TestingHost/InProcTestCluster.cs +++ b/src/Orleans.TestingHost/InProcTestCluster.cs @@ -497,8 +497,19 @@ public async Task InitializeClientAsync() DisableDefaults = true, }); + foreach (var hostDelegate in Options.ClientHostConfigurationDelegates) + { + hostDelegate(hostBuilder); + } + hostBuilder.UseOrleansClient(clientBuilder => { + clientBuilder.Configure(o => + { + o.ClusterId = Options.ClusterId; + o.ServiceId = Options.ServiceId; + }); + if (Options.UseTestClusterMembership) { clientBuilder.Services.AddSingleton(_membershipTable); @@ -509,11 +520,6 @@ public async Task InitializeClientAsync() TryConfigureFileLogging(Options, hostBuilder.Services, "TestClusterClient"); - foreach (var hostDelegate in Options.ClientHostConfigurationDelegates) - { - hostDelegate(hostBuilder); - } - ClientHost = hostBuilder.Build(); await ClientHost.StartAsync(); } @@ -557,8 +563,19 @@ public async Task CreateSiloAsync(InProcessTestSiloSpecific services.Configure(op => op.ResponseTimeout = TimeSpan.FromMilliseconds(1000000)); } + foreach (var hostDelegate in Options.SiloHostConfigurationDelegates) + { + hostDelegate(siloOptions, appBuilder); + } + appBuilder.UseOrleans(siloBuilder => { + siloBuilder.Configure(o => + { + o.ClusterId = Options.ClusterId; + o.ServiceId = Options.ServiceId; + }); + siloBuilder.Configure(o => { o.SiloName = siloOptions.SiloName; @@ -590,11 +607,6 @@ public async Task CreateSiloAsync(InProcessTestSiloSpecific } }); - foreach (var hostDelegate in Options.SiloHostConfigurationDelegates) - { - hostDelegate(siloOptions, appBuilder); - } - var host = appBuilder.Build(); InitializeTestHooksSystemTarget(host); await host.StartAsync(); diff --git a/test/Extensions/TesterAzureUtils/Streaming/AQStreamingTests.cs b/test/Extensions/TesterAzureUtils/Streaming/AQStreamingTests.cs index e3d708f117..311b381aad 100644 --- a/test/Extensions/TesterAzureUtils/Streaming/AQStreamingTests.cs +++ b/test/Extensions/TesterAzureUtils/Streaming/AQStreamingTests.cs @@ -1,5 +1,7 @@ +using System.Runtime.CompilerServices; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using Orleans.Configuration; @@ -13,106 +15,137 @@ namespace Tester.AzureUtils.Streaming { [TestCategory("Streaming"), TestCategory("AzureStorage"), TestCategory("AzureQueue")] - public class AQStreamingTests : TestClusterPerTest + public class AQStreamingTests : IClassFixture { public const string AzureQueueStreamProviderName = StreamTestsConstants.AZURE_QUEUE_STREAM_PROVIDER_NAME; public const string SmsStreamProviderName = StreamTestsConstants.SMS_STREAM_PROVIDER_NAME; - private SingleStreamTestRunner runner; private const int queueCount = 8; - protected override void ConfigureTestCluster(TestClusterBuilder builder) + private readonly Fixture _fixture; + + public sealed class Fixture : IAsyncLifetime { - TestUtils.CheckForAzureStorage(); - builder.ConfigureHostConfiguration(cb => + public Fixture() { - Dictionary queueConfig = []; - void ConfigureStreaming(string option, string value) - { - var prefix = $"Orleans:Streaming:{AzureQueueStreamProviderName}:"; - queueConfig[$"{prefix}{option}"] = value; - } - - ConfigureStreaming("ProviderType", "AzureQueueStorage"); - ConfigureStreaming("ConnectionString", TestDefaultConfiguration.UseAadAuthentication - ? TestDefaultConfiguration.DataQueueUri.AbsoluteUri - : TestDefaultConfiguration.DataConnectionString); + var builder = new InProcessTestClusterBuilder(); - var names = AzureQueueUtilities.GenerateQueueNames(builder.Options.ClusterId, queueCount); - for (var i = 0; i < names.Count; i++) + TestUtils.CheckForAzureStorage(); + builder.ConfigureHost(cb => { - ConfigureStreaming($"QueueNames:{i}", names[i]); - } - - cb.AddInMemoryCollection(queueConfig); - }); + Dictionary queueConfig = []; + void ConfigureStreaming(string option, string value) + { + var prefix = $"Orleans:Streaming:{AzureQueueStreamProviderName}:"; + queueConfig[$"{prefix}{option}"] = value; + } - builder.AddSiloBuilderConfigurator(); - } + ConfigureStreaming("ProviderType", "AzureQueueStorage"); + if (TestDefaultConfiguration.UseAadAuthentication) + { + cb.AddKeyedAzureQueueClient(AzureQueueStreamProviderName, settings => + { + settings.ServiceUri = TestDefaultConfiguration.DataQueueUri; + settings.Credential = TestDefaultConfiguration.TokenCredential; + }); + ConfigureStreaming("ServiceKey", AzureQueueStreamProviderName); + } + else + { + ConfigureStreaming("ConnectionString", TestDefaultConfiguration.DataConnectionString); + } - private class SiloBuilderConfigurator : ISiloConfigurator - { - public void Configure(ISiloBuilder hostBuilder) - { - hostBuilder - .AddAzureTableGrainStorage("AzureStore", builder => builder.Configure>((options, silo) => + var names = AzureQueueUtilities.GenerateQueueNames(builder.Options.ClusterId, queueCount); + for (var i = 0; i < names.Count; i++) { - options.ConfigureTestDefaults(); - options.DeleteStateOnClear = true; - })) - .AddAzureTableGrainStorage("PubSubStore", builder => builder.Configure>((options, silo) => + ConfigureStreaming($"QueueNames:{i}", names[i]); + } + + cb.Configuration.AddInMemoryCollection(queueConfig); + }); + builder.ConfigureSilo((options, siloBuilder) => + { + siloBuilder + .AddAzureTableGrainStorage("AzureStore", builder => builder.Configure(options => { options.ConfigureTestDefaults(); options.DeleteStateOnClear = true; })) - .AddMemoryGrainStorage("MemoryStore"); + .AddAzureTableGrainStorage("PubSubStore", builder => builder.Configure(options => + { + options.ConfigureTestDefaults(); + options.DeleteStateOnClear = true; + })) + .AddMemoryGrainStorage("MemoryStore"); + }); + builder.ConfigureClient(clientBuilder => + { + clientBuilder + .AddAzureQueueStreams(AzureQueueStreamProviderName, b=> + b.ConfigureAzureQueue(ob=>ob.Configure>( + (options, dep) => + { + options.ConfigureTestDefaults(); + options.QueueNames = AzureQueueUtilities.GenerateQueueNames(dep.Value.ClusterId, queueCount); + }))); + }); + Cluster = builder.Build(); } - } - public override async Task InitializeAsync() - { - await base.InitializeAsync(); - runner = new SingleStreamTestRunner(this.InternalClient, SingleStreamTestRunner.AQ_STREAM_PROVIDER_NAME); - } + public InProcessTestCluster Cluster { get; } + public SingleStreamTestRunner Runner { get; private set; } - public override async Task DisposeAsync() - { - await base.DisposeAsync(); - try + public async Task DisposeAsync() { - TestUtils.CheckForAzureStorage(); - await AzureQueueStreamProviderUtils.ClearAllUsedAzureQueues(NullLoggerFactory.Instance, - AzureQueueUtilities.GenerateQueueNames(this.HostedCluster.Options.ClusterId, queueCount), - new AzureQueueOptions().ConfigureTestDefaults()); + try + { + TestUtils.CheckForAzureStorage(); + await AzureQueueStreamProviderUtils.ClearAllUsedAzureQueues(NullLoggerFactory.Instance, + AzureQueueUtilities.GenerateQueueNames(Cluster.Options.ClusterId, queueCount), + new AzureQueueOptions().ConfigureTestDefaults()); + } + catch (SkipException) + { + // ignore + } + + await Cluster.DisposeAsync(); } - catch (SkipException) + + public async Task InitializeAsync() { - // ignore + await Cluster.DeployAsync(); + Runner = new SingleStreamTestRunner(Cluster.InternalClient, SingleStreamTestRunner.AQ_STREAM_PROVIDER_NAME); } } + public AQStreamingTests(Fixture fixture) + { + _fixture = fixture; + } + ////------------------------ One to One ----------------------// [SkippableFact, TestCategory("Functional")] public async Task AQ_01_OneProducerGrainOneConsumerGrain() { - await runner.StreamTest_01_OneProducerGrainOneConsumerGrain(); + await _fixture.Runner.StreamTest_01_OneProducerGrainOneConsumerGrain(); } [SkippableFact, TestCategory("Functional")] public async Task AQ_02_OneProducerGrainOneConsumerClient() { - await runner.StreamTest_02_OneProducerGrainOneConsumerClient(); + await _fixture.Runner.StreamTest_02_OneProducerGrainOneConsumerClient(); } [SkippableFact, TestCategory("Functional")] public async Task AQ_03_OneProducerClientOneConsumerGrain() { - await runner.StreamTest_03_OneProducerClientOneConsumerGrain(); + await _fixture.Runner.StreamTest_03_OneProducerClientOneConsumerGrain(); } [SkippableFact, TestCategory("Functional")] public async Task AQ_04_OneProducerClientOneConsumerClient() { - await runner.StreamTest_04_OneProducerClientOneConsumerClient(); + await _fixture.Runner.StreamTest_04_OneProducerClientOneConsumerClient(); } //------------------------ MANY to Many different grains ----------------------// @@ -120,50 +153,50 @@ public async Task AQ_04_OneProducerClientOneConsumerClient() [SkippableFact, TestCategory("Functional")] public async Task AQ_05_ManyDifferent_ManyProducerGrainsManyConsumerGrains() { - await runner.StreamTest_05_ManyDifferent_ManyProducerGrainsManyConsumerGrains(); + await _fixture.Runner.StreamTest_05_ManyDifferent_ManyProducerGrainsManyConsumerGrains(); } [SkippableFact, TestCategory("Functional")] public async Task AQ_06_ManyDifferent_ManyProducerGrainManyConsumerClients() { - await runner.StreamTest_06_ManyDifferent_ManyProducerGrainManyConsumerClients(); + await _fixture.Runner.StreamTest_06_ManyDifferent_ManyProducerGrainManyConsumerClients(); } [SkippableFact(Skip = "https://github.com/dotnet/orleans/issues/5648"), TestCategory("Functional")] public async Task AQ_07_ManyDifferent_ManyProducerClientsManyConsumerGrains() { - await runner.StreamTest_07_ManyDifferent_ManyProducerClientsManyConsumerGrains(); + await _fixture.Runner.StreamTest_07_ManyDifferent_ManyProducerClientsManyConsumerGrains(); } [SkippableFact, TestCategory("Functional")] public async Task AQ_08_ManyDifferent_ManyProducerClientsManyConsumerClients() { - await runner.StreamTest_08_ManyDifferent_ManyProducerClientsManyConsumerClients(); + await _fixture.Runner.StreamTest_08_ManyDifferent_ManyProducerClientsManyConsumerClients(); } //------------------------ MANY to Many Same grains ----------------------// [SkippableFact, TestCategory("Functional")] public async Task AQ_09_ManySame_ManyProducerGrainsManyConsumerGrains() { - await runner.StreamTest_09_ManySame_ManyProducerGrainsManyConsumerGrains(); + await _fixture.Runner.StreamTest_09_ManySame_ManyProducerGrainsManyConsumerGrains(); } [SkippableFact, TestCategory("Functional")] public async Task AQ_10_ManySame_ManyConsumerGrainsManyProducerGrains() { - await runner.StreamTest_10_ManySame_ManyConsumerGrainsManyProducerGrains(); + await _fixture.Runner.StreamTest_10_ManySame_ManyConsumerGrainsManyProducerGrains(); } [SkippableFact, TestCategory("Functional")] public async Task AQ_11_ManySame_ManyProducerGrainsManyConsumerClients() { - await runner.StreamTest_11_ManySame_ManyProducerGrainsManyConsumerClients(); + await _fixture.Runner.StreamTest_11_ManySame_ManyProducerGrainsManyConsumerClients(); } [SkippableFact, TestCategory("Functional")] public async Task AQ_12_ManySame_ManyProducerClientsManyConsumerGrains() { - await runner.StreamTest_12_ManySame_ManyProducerClientsManyConsumerGrains(); + await _fixture.Runner.StreamTest_12_ManySame_ManyProducerClientsManyConsumerGrains(); } //------------------------ MANY to Many producer consumer same grain ----------------------// @@ -171,13 +204,13 @@ public async Task AQ_12_ManySame_ManyProducerClientsManyConsumerGrains() [SkippableFact, TestCategory("Functional")] public async Task AQ_13_SameGrain_ConsumerFirstProducerLater() { - await runner.StreamTest_13_SameGrain_ConsumerFirstProducerLater(false); + await _fixture.Runner.StreamTest_13_SameGrain_ConsumerFirstProducerLater(false); } [SkippableFact, TestCategory("Functional")] public async Task AQ_14_SameGrain_ProducerFirstConsumerLater() { - await runner.StreamTest_14_SameGrain_ProducerFirstConsumerLater(false); + await _fixture.Runner.StreamTest_14_SameGrain_ProducerFirstConsumerLater(false); } //----------------------------------------------// @@ -185,22 +218,22 @@ public async Task AQ_14_SameGrain_ProducerFirstConsumerLater() [SkippableFact, TestCategory("Functional")] public async Task AQ_15_ConsumeAtProducersRequest() { - await runner.StreamTest_15_ConsumeAtProducersRequest(); + await _fixture.Runner.StreamTest_15_ConsumeAtProducersRequest(); } [SkippableFact, TestCategory("Functional")] public async Task AQ_16_MultipleStreams_ManyDifferent_ManyProducerGrainsManyConsumerGrains() { - var multiRunner = new MultipleStreamsTestRunner(this.InternalClient, SingleStreamTestRunner.AQ_STREAM_PROVIDER_NAME, 16, false); + var multiRunner = new MultipleStreamsTestRunner(_fixture.Cluster.InternalClient, SingleStreamTestRunner.AQ_STREAM_PROVIDER_NAME, 16, false); await multiRunner.StreamTest_MultipleStreams_ManyDifferent_ManyProducerGrainsManyConsumerGrains(); } [SkippableFact, TestCategory("Functional")] public async Task AQ_17_MultipleStreams_1J_ManyProducerGrainsManyConsumerGrains() { - var multiRunner = new MultipleStreamsTestRunner(this.InternalClient, SingleStreamTestRunner.AQ_STREAM_PROVIDER_NAME, 17, false); + var multiRunner = new MultipleStreamsTestRunner(_fixture.Cluster.InternalClient, SingleStreamTestRunner.AQ_STREAM_PROVIDER_NAME, 17, false); await multiRunner.StreamTest_MultipleStreams_ManyDifferent_ManyProducerGrainsManyConsumerGrains( - this.HostedCluster.StartAdditionalSilo); + _fixture.Cluster.StartAdditionalSilo); } //[SkippableFact, TestCategory("BVT")] @@ -216,21 +249,21 @@ await multiRunner.StreamTest_MultipleStreams_ManyDifferent_ManyProducerGrainsMan public async Task AQ_19_ConsumerImplicitlySubscribedToProducerClient() { // todo: currently, the Azure queue queue adaptor doesn't support namespaces, so this test will fail. - await runner.StreamTest_19_ConsumerImplicitlySubscribedToProducerClient(); + await _fixture.Runner.StreamTest_19_ConsumerImplicitlySubscribedToProducerClient(); } [SkippableFact] public async Task AQ_20_ConsumerImplicitlySubscribedToProducerGrain() { // todo: currently, the Azure queue queue adaptor doesn't support namespaces, so this test will fail. - await runner.StreamTest_20_ConsumerImplicitlySubscribedToProducerGrain(); + await _fixture.Runner.StreamTest_20_ConsumerImplicitlySubscribedToProducerGrain(); } [SkippableFact(Skip = "Ignored"), TestCategory("Failures")] public async Task AQ_21_GenericConsumerImplicitlySubscribedToProducerGrain() { // todo: currently, the Azure queue queue adaptor doesn't support namespaces, so this test will fail. - await runner.StreamTest_21_GenericConsumerImplicitlySubscribedToProducerGrain(); + await _fixture.Runner.StreamTest_21_GenericConsumerImplicitlySubscribedToProducerGrain(); } } } diff --git a/test/Extensions/TesterAzureUtils/Tester.AzureUtils.csproj b/test/Extensions/TesterAzureUtils/Tester.AzureUtils.csproj index 5e21aaff96..db398c4ff2 100644 --- a/test/Extensions/TesterAzureUtils/Tester.AzureUtils.csproj +++ b/test/Extensions/TesterAzureUtils/Tester.AzureUtils.csproj @@ -10,6 +10,7 @@ +