Skip to content

Commit

Permalink
WaitAnnotation and ResourceNotificationService.WaitForDependenciesAsy…
Browse files Browse the repository at this point in the history
…nc (#5842)
  • Loading branch information
mitchdenny authored Sep 23, 2024
1 parent aee5a84 commit 570db52
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 107 deletions.
133 changes: 130 additions & 3 deletions src/Aspire.Hosting/ApplicationModel/ResourceNotificationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Immutable;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

Expand All @@ -20,6 +21,7 @@ public class ResourceNotificationService
private readonly ILogger<ResourceNotificationService> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly CancellationToken _applicationStopping;
private readonly ResourceLoggerService _resourceLoggerService;

private Action<ResourceEvent>? OnResourceUpdated { get; set; }

Expand All @@ -33,28 +35,31 @@ public class ResourceNotificationService
/// <param name="logger">The logger.</param>
/// <param name="hostApplicationLifetime">The host application lifetime.</param>
[Obsolete($"""
{nameof(ResourceNotificationService)} now requires an {nameof(IServiceProvider)}.
Use the constructor that accepts an {nameof(ILogger)}<{nameof(ResourceNotificationService)}>, {nameof(IHostApplicationLifetime)} and {nameof(IServiceProvider)}.
{nameof(ResourceNotificationService)} now requires an {nameof(IServiceProvider)} and {nameof(ResourceLoggerService)}.
Use the constructor that accepts an {nameof(ILogger)}<{nameof(ResourceNotificationService)}>, {nameof(IHostApplicationLifetime)}, {nameof(IServiceProvider)} and {nameof(ResourceLoggerService)}.
This constructor will be removed in the next major version of Aspire.
""")]
public ResourceNotificationService(ILogger<ResourceNotificationService> logger, IHostApplicationLifetime hostApplicationLifetime)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = new NullServiceProvider();
_applicationStopping = hostApplicationLifetime?.ApplicationStopping ?? throw new ArgumentNullException(nameof(hostApplicationLifetime));
_resourceLoggerService = new ResourceLoggerService();
}

/// <summary>
/// Creates a new instance of <see cref="ResourceNotificationService"/>.
/// </summary>
/// <param name="logger">The logger.</param>
/// <param name="hostApplicationLifetime">The host application lifetime.</param>
/// <param name="resourceLoggerService">The resource logger service.</param>
/// <param name="serviceProvider">The service provider.</param>
public ResourceNotificationService(ILogger<ResourceNotificationService> logger, IHostApplicationLifetime hostApplicationLifetime, IServiceProvider serviceProvider)
public ResourceNotificationService(ILogger<ResourceNotificationService> logger, IHostApplicationLifetime hostApplicationLifetime, IServiceProvider serviceProvider, ResourceLoggerService resourceLoggerService)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceProvider = serviceProvider;
_applicationStopping = hostApplicationLifetime?.ApplicationStopping ?? throw new ArgumentNullException(nameof(hostApplicationLifetime));
_resourceLoggerService = resourceLoggerService ?? throw new ArgumentNullException(nameof(resourceLoggerService));
}

private class NullServiceProvider : IServiceProvider
Expand Down Expand Up @@ -115,6 +120,128 @@ public async Task<string> WaitForResourceAsync(string resourceName, IEnumerable<
throw new OperationCanceledException($"The operation was cancelled before the resource reached one of the target states: [{string.Join(", ", targetStates)}]");
}

private async Task WaitUntilHealthyAsync(IResource resource, IResource dependency, CancellationToken cancellationToken)
{
var resourceLogger = _resourceLoggerService.GetLogger(resource);
resourceLogger.LogInformation("Waiting for resource '{Name}' to enter the '{State}' state.", dependency.Name, KnownResourceStates.Running);

await PublishUpdateAsync(resource, s => s with { State = KnownResourceStates.Waiting }).ConfigureAwait(false);
var resourceEvent = await WaitForResourceAsync(dependency.Name, re => IsContinuableState(re.Snapshot), cancellationToken: cancellationToken).ConfigureAwait(false);
var snapshot = resourceEvent.Snapshot;

if (snapshot.State?.Text == KnownResourceStates.FailedToStart)
{
resourceLogger.LogError(
"Dependency resource '{ResourceName}' failed to start.",
dependency.Name
);

throw new DistributedApplicationException($"Dependency resource '{dependency.Name}' failed to start.");
}
else if (snapshot.State!.Text == KnownResourceStates.Finished || snapshot.State!.Text == KnownResourceStates.Exited)
{
resourceLogger.LogError(
"Resource '{ResourceName}' has entered the '{State}' state prematurely.",
dependency.Name,
snapshot.State.Text
);

throw new DistributedApplicationException(
$"Resource '{dependency.Name}' has entered the '{snapshot.State.Text}' state prematurely."
);
}

// If our dependency resource has health check annotations we want to wait until they turn healthy
// otherwise we don't care about their health status.
if (dependency.TryGetAnnotationsOfType<HealthCheckAnnotation>(out var _))
{
resourceLogger.LogInformation("Waiting for resource '{Name}' to become healthy.", dependency.Name);
await WaitForResourceAsync(dependency.Name, re => re.Snapshot.HealthStatus == HealthStatus.Healthy, cancellationToken: cancellationToken).ConfigureAwait(false);
}

resourceLogger.LogInformation("Finished waiting for resource '{Name}'.", dependency.Name);

static bool IsContinuableState(CustomResourceSnapshot snapshot) =>
snapshot.State?.Text == KnownResourceStates.Running ||
snapshot.State?.Text == KnownResourceStates.Finished ||
snapshot.State?.Text == KnownResourceStates.Exited ||
snapshot.State?.Text == KnownResourceStates.FailedToStart;
}

private async Task WaitUntilCompletionAsync(IResource resource, IResource dependency, int exitCode, CancellationToken cancellationToken)
{
if (dependency.TryGetLastAnnotation<ReplicaAnnotation>(out var replicaAnnotation) && replicaAnnotation.Replicas > 1)
{
throw new DistributedApplicationException("WaitForCompletion cannot be used with resources that have replicas.");
}

var resourceLogger = _resourceLoggerService.GetLogger(resource);
resourceLogger.LogInformation("Waiting for resource '{Name}' to complete.", dependency.Name);

await PublishUpdateAsync(resource, s => s with { State = KnownResourceStates.Waiting }).ConfigureAwait(false);
var resourceEvent = await WaitForResourceAsync(dependency.Name, re => IsKnownTerminalState(re.Snapshot), cancellationToken: cancellationToken).ConfigureAwait(false);
var snapshot = resourceEvent.Snapshot;

if (snapshot.State?.Text == KnownResourceStates.FailedToStart)
{
resourceLogger.LogError(
"Dependency resource '{ResourceName}' failed to start.",
dependency.Name
);

throw new DistributedApplicationException($"Dependency resource '{dependency.Name}' failed to start.");
}
else if ((snapshot.State!.Text == KnownResourceStates.Finished || snapshot.State!.Text == KnownResourceStates.Exited) && snapshot.ExitCode is not null && snapshot.ExitCode != exitCode)
{
resourceLogger.LogError(
"Resource '{ResourceName}' has entered the '{State}' state with exit code '{ExitCode}' expected '{ExpectedExitCode}'.",
dependency.Name,
snapshot.State.Text,
snapshot.ExitCode,
exitCode
);

throw new DistributedApplicationException(
$"Resource '{dependency.Name}' has entered the '{snapshot.State.Text}' state with exit code '{snapshot.ExitCode}', expected '{exitCode}'."
);
}

resourceLogger.LogInformation("Finished waiting for resource '{Name}'.", dependency.Name);

static bool IsKnownTerminalState(CustomResourceSnapshot snapshot) =>
KnownResourceStates.TerminalStates.Contains(snapshot.State?.Text) ||
snapshot.ExitCode is not null;
}

/// <summary>
/// Waits for all dependencies of the resource to be ready.
/// </summary>
/// <param name="resource">The resource with dependencies to wait for.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task.</returns>
/// <exception cref="DistributedApplicationException"></exception>
public async Task WaitForDependenciesAsync(IResource resource, CancellationToken cancellationToken)
{
if (!resource.TryGetAnnotationsOfType<WaitAnnotation>(out var waitAnnotations))
{
return;
}

var pendingDependencies = new List<Task>();
foreach (var waitAnnotation in waitAnnotations)
{
var pendingDependency = waitAnnotation.WaitType switch
{
WaitType.WaitUntilHealthy => WaitUntilHealthyAsync(resource, waitAnnotation.Resource, cancellationToken),
WaitType.WaitForCompletion => WaitUntilCompletionAsync(resource, waitAnnotation.Resource, waitAnnotation.ExitCode, cancellationToken),
_ => throw new DistributedApplicationException($"Unexpected wait type: {waitAnnotation.WaitType}")
};
pendingDependencies.Add(pendingDependency);
}

await Task.WhenAll(pendingDependencies).ConfigureAwait(false);
}

/// <summary>
/// Waits until a resource satisfies the specified predicate.
/// </summary>
Expand Down
50 changes: 50 additions & 0 deletions src/Aspire.Hosting/ApplicationModel/WaitAnnotation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;

namespace Aspire.Hosting.ApplicationModel;

/// <summary>
/// Represents a wait relationship between two resources.
/// </summary>
/// <param name="resource">The resource that will be waited on.</param>
/// <param name="waitType">The type of wait to apply to the dependency resource.</param>
/// <param name="exitCode">The exit code that the resource must return for the wait to be satisfied.</param>
/// <remarks>
/// The holder of this annotation is waiting on the resource in the <see cref="WaitAnnotation.Resource"/> property.
/// </remarks>
[DebuggerDisplay("Resource = {Resource.Name}")]
public class WaitAnnotation(IResource resource, WaitType waitType, int exitCode = 0) : IResourceAnnotation
{
/// <summary>
/// The resource that will be waited on.
/// </summary>
public IResource Resource { get; } = resource;

/// <summary>
/// The type of wait to apply to the dependency resource.
/// </summary>
public WaitType WaitType { get; } = waitType;

/// <summary>
/// The exit code that the resource must return for the wait to be satisfied.
/// </summary>
public int ExitCode { get; } = exitCode;
}

/// <summary>
/// Specifies the type of Wait applied to dependency resources.
/// </summary>
public enum WaitType
{
/// <summary>
/// Dependent resource will wait until resource starts and all health checks are satisfied.
/// </summary>
WaitUntilHealthy,

/// <summary>
/// Dependent resource will wait until resource completes.
/// </summary>
WaitForCompletion
}
7 changes: 7 additions & 0 deletions src/Aspire.Hosting/DistributedApplicationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ public DistributedApplicationBuilder(DistributedApplicationOptions options)

ExecutionContext = new DistributedApplicationExecutionContext(_executionContextOptions);

//
Eventing.Subscribe<BeforeResourceStartedEvent>(async (@event, ct) =>
{
var rns = @event.Services.GetRequiredService<ResourceNotificationService>();
await rns.WaitForDependenciesAsync(@event.Resource, ct).ConfigureAwait(false);
});

// Core things
_innerBuilder.Services.AddSingleton(sp => new DistributedApplicationModel(Resources));
_innerBuilder.Services.AddHostedService<DistributedApplicationLifecycle>();
Expand Down
11 changes: 10 additions & 1 deletion src/Aspire.Hosting/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ Aspire.Hosting.ApplicationModel.ResourceCommandState.Disabled = 1 -> Aspire.Host
Aspire.Hosting.ApplicationModel.ResourceCommandState.Enabled = 0 -> Aspire.Hosting.ApplicationModel.ResourceCommandState
Aspire.Hosting.ApplicationModel.ResourceCommandState.Hidden = 2 -> Aspire.Hosting.ApplicationModel.ResourceCommandState
Aspire.Hosting.ApplicationModel.ResourceNotificationService.ResourceNotificationService(Microsoft.Extensions.Logging.ILogger<Aspire.Hosting.ApplicationModel.ResourceNotificationService!>! logger, Microsoft.Extensions.Hosting.IHostApplicationLifetime! hostApplicationLifetime) -> void
Aspire.Hosting.ApplicationModel.ResourceNotificationService.ResourceNotificationService(Microsoft.Extensions.Logging.ILogger<Aspire.Hosting.ApplicationModel.ResourceNotificationService!>! logger, Microsoft.Extensions.Hosting.IHostApplicationLifetime! hostApplicationLifetime, System.IServiceProvider! serviceProvider) -> void
Aspire.Hosting.ApplicationModel.ResourceNotificationService.ResourceNotificationService(Microsoft.Extensions.Logging.ILogger<Aspire.Hosting.ApplicationModel.ResourceNotificationService!>! logger, Microsoft.Extensions.Hosting.IHostApplicationLifetime! hostApplicationLifetime, System.IServiceProvider! serviceProvider, Aspire.Hosting.ApplicationModel.ResourceLoggerService! resourceLoggerService) -> void
Aspire.Hosting.ApplicationModel.ResourceNotificationService.WaitForDependenciesAsync(Aspire.Hosting.ApplicationModel.IResource! resource, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task!
Aspire.Hosting.ApplicationModel.ResourceNotificationService.WaitForResourceAsync(string! resourceName, System.Func<Aspire.Hosting.ApplicationModel.ResourceEvent!, bool>! predicate, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<Aspire.Hosting.ApplicationModel.ResourceEvent!>!
Aspire.Hosting.ApplicationModel.ResourcePropertySnapshot.IsSensitive.get -> bool
Aspire.Hosting.ApplicationModel.ResourcePropertySnapshot.IsSensitive.init -> void
Expand All @@ -109,6 +110,14 @@ Aspire.Hosting.ApplicationModel.VolumeSnapshot.Source.init -> void
Aspire.Hosting.ApplicationModel.VolumeSnapshot.Target.get -> string!
Aspire.Hosting.ApplicationModel.VolumeSnapshot.Target.init -> void
Aspire.Hosting.ApplicationModel.VolumeSnapshot.VolumeSnapshot(string? Source, string! Target, string! MountType, bool IsReadOnly) -> void
Aspire.Hosting.ApplicationModel.WaitAnnotation
Aspire.Hosting.ApplicationModel.WaitAnnotation.ExitCode.get -> int
Aspire.Hosting.ApplicationModel.WaitAnnotation.Resource.get -> Aspire.Hosting.ApplicationModel.IResource!
Aspire.Hosting.ApplicationModel.WaitAnnotation.WaitAnnotation(Aspire.Hosting.ApplicationModel.IResource! resource, Aspire.Hosting.ApplicationModel.WaitType waitType, int exitCode = 0) -> void
Aspire.Hosting.ApplicationModel.WaitAnnotation.WaitType.get -> Aspire.Hosting.ApplicationModel.WaitType
Aspire.Hosting.ApplicationModel.WaitType
Aspire.Hosting.ApplicationModel.WaitType.WaitForCompletion = 1 -> Aspire.Hosting.ApplicationModel.WaitType
Aspire.Hosting.ApplicationModel.WaitType.WaitUntilHealthy = 0 -> Aspire.Hosting.ApplicationModel.WaitType
Aspire.Hosting.DistributedApplicationBuilder.AppHostPath.get -> string!
Aspire.Hosting.DistributedApplicationBuilder.Eventing.get -> Aspire.Hosting.Eventing.IDistributedApplicationEventing!
Aspire.Hosting.Eventing.DistributedApplicationEventing
Expand Down
Loading

0 comments on commit 570db52

Please sign in to comment.