Skip to content

Commit

Permalink
chore: Merge branch 'feat/layer-restructure-refactor' as 0.8.0-beta
Browse files Browse the repository at this point in the history
  • Loading branch information
bojidar-bg committed Oct 29, 2021
2 parents aa729de + c712819 commit ca3d63e
Show file tree
Hide file tree
Showing 114 changed files with 2,602 additions and 3,280 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,7 @@ csharp_preserve_single_line_blocks = true
csharp_preserve_single_line_statements = true

# Using directive preferences
csharp_using_directive_placement = true
csharp_using_directive_placement = outside_namespace



Expand Down
104 changes: 39 additions & 65 deletions agent/dotnet/src/Perper/Application/PerperStartup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ public class PerperStartup
public string Agent { get; }
public bool UseInstances { get; set; } = false;
private readonly List<Func<Task>> initHandlers = new();
private readonly Dictionary<string, Func<Task>> callHandlers = new();
private readonly Dictionary<string, Func<Task>> streamHandlers = new();
private readonly Dictionary<string, Func<Task>> executionHandlers = new();

public PerperStartup(string agent) => Agent = agent;

Expand All @@ -32,15 +31,9 @@ public PerperStartup AddInitHandler(Func<Task> handler)
return this;
}

public PerperStartup AddCallHandler(string @delegate, Func<Task> handler)
public PerperStartup AddHandler(string @delegate, Func<Task> handler)
{
callHandlers.Add(@delegate, handler);
return this;
}

public PerperStartup AddStreamHandler(string @delegate, Func<Task> handler)
{
streamHandlers.Add(@delegate, handler);
executionHandlers.Add(@delegate, handler);
return this;
}

Expand All @@ -54,7 +47,7 @@ public PerperStartup WithInstances()

public async Task RunAsync(CancellationToken cancellationToken = default)
{
await EnterServicesContext(Agent, () => RunInServiceContext(cancellationToken), UseInstances).ConfigureAwait(false);
await EnterServicesContext(() => RunInServiceContext(cancellationToken)).ConfigureAwait(false);
}

public static Task RunAsync(string agent, CancellationToken cancellationToken = default)
Expand All @@ -70,31 +63,24 @@ public static Task RunAsync(string agent, string rootNamespace, CancellationToke
#endregion RunAsync
#region Services

public static async Task EnterServicesContext(string agent, Func<Task> context, bool useInstance = false)
public static async Task EnterServicesContext(Func<Task> context)
{
var (cacheService, notificationService) = await EstablishConnection(agent, useInstance).ConfigureAwait(false);
var fabricService = await EstablishConnection().ConfigureAwait(false);

AsyncLocals.SetConnection(cacheService, notificationService);
AsyncLocals.SetConnection(fabricService);

await context().ConfigureAwait(false);

await notificationService.StopAsync().ConfigureAwait(false);
notificationService.Dispose();
await fabricService.DisposeAsync().ConfigureAwait(false);
}

public static async Task<(CacheService, NotificationService)> EstablishConnection(string agent, bool useInstance = false)
public static async Task<FabricService> EstablishConnection()
{
var apacheIgniteEndpoint = Environment.GetEnvironmentVariable("APACHE_IGNITE_ENDPOINT") ?? "127.0.0.1:10800";
var fabricGrpcAddress = Environment.GetEnvironmentVariable("PERPER_FABRIC_ENDPOINT") ?? "http://127.0.0.1:40400";
string? instance = null;

Console.WriteLine($"APACHE_IGNITE_ENDPOINT: {apacheIgniteEndpoint}");
Console.WriteLine($"PERPER_FABRIC_ENDPOINT: {fabricGrpcAddress}");
if (useInstance)
{
instance = Environment.GetEnvironmentVariable("X_PERPER_INSTANCE") ?? "";
Console.WriteLine($"X_PERPER_INSTANCE: {instance}");
}

var igniteConfiguration = new IgniteClientConfiguration
{
Expand All @@ -107,85 +93,73 @@ public static async Task EnterServicesContext(string agent, Func<Task> context,
}
};

AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
var grpcChannel = GrpcChannel.ForAddress(fabricGrpcAddress);

var ignite = await Policy
.HandleInner<System.Net.Sockets.SocketException>()
.WaitAndRetryAsync(10,
attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt - 2)),
(exception, timespan) => Console.WriteLine("Failed to connect to Ignite, retrying in {0}s", timespan.TotalSeconds))
.ExecuteAsync(() => Task.Run(() => Ignition.StartClient(igniteConfiguration))).ConfigureAwait(false);

var cacheService = new CacheService(ignite);
var notificationService = new NotificationService(ignite, grpcChannel, agent, instance);
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
var grpcChannel = GrpcChannel.ForAddress(fabricGrpcAddress);

await Policy
.Handle<Grpc.Core.RpcException>(ex => ex.Status.DebugException is System.Net.Http.HttpRequestException)
.WaitAndRetryAsync(10,
attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt - 2)),
(exception, timespan) => Console.WriteLine("Failed to connect to GRPC, retrying in {0}s", timespan.TotalSeconds))
.ExecuteAsync(notificationService.StartAsync).ConfigureAwait(false);
return new FabricService(ignite, grpcChannel);
}

return (cacheService, notificationService);
public static string GetConfiguredInstance()
{
return Environment.GetEnvironmentVariable("X_PERPER_INSTANCE") ?? "";
}

#endregion Services
#region ListenNotifications

public Task RunInServiceContext(CancellationToken cancellationToken = default)
{
var instance = UseInstances ? null : GetConfiguredInstance();

if (instance != null)
{
Console.WriteLine($"X_PERPER_INSTANCE: {instance}");
}

var taskCollection = new TaskCollection();

callHandlers.TryAdd(PerperContext.StartupFunctionName, async () =>
executionHandlers.TryAdd(PerperContext.StartupFunctionName, async () =>
{
await AsyncLocals.CacheService.CallWriteFinished(AsyncLocals.Execution).ConfigureAwait(false);
await AsyncLocals.FabricService.WriteExecutionFinished(AsyncLocals.Execution).ConfigureAwait(false);
});

var initInstance = instance ?? $"{Agent}-init";
var initExecution = new FabricExecution(Agent, initInstance, "Init", $"{initInstance}-init", cancellationToken);
foreach (var handler in initHandlers)
{
taskCollection.Add(AsyncLocals.EnterContext($"{AsyncLocals.Agent}-init", $"Init-init", handler));
}

foreach (var (@delegate, handler) in callHandlers)
{
ListenCallNotifications(taskCollection, @delegate, handler, cancellationToken);
taskCollection.Add(async () =>
{
AsyncLocals.SetExecution(initExecution);
await handler().ConfigureAwait(false);
});
}

foreach (var (@delegate, handler) in streamHandlers)
foreach (var (@delegate, handler) in executionHandlers)
{
ListenStreamNotifications(taskCollection, @delegate, handler, cancellationToken);
ListenExecutions(taskCollection, Agent, instance, @delegate, handler, cancellationToken);
}

return taskCollection.GetTask();
}

public static void ListenCallNotifications(TaskCollection taskCollection, string @delegate, Func<Task> handler, CancellationToken cancellationToken)
{
taskCollection.Add(async () =>
{
await foreach (var (key, notification) in AsyncLocals.NotificationService.GetCallTriggerNotifications(@delegate).ReadAllAsync(cancellationToken))
{
taskCollection.Add(AsyncLocals.EnterContext(notification.Instance, notification.Call, async () =>
{
await handler().ConfigureAwait(false);
await AsyncLocals.NotificationService.ConsumeNotification(key).ConfigureAwait(false); // TODO?
}));
}
});
}

public static void ListenStreamNotifications(TaskCollection taskCollection, string @delegate, Func<Task> handler, CancellationToken cancellationToken)
public static void ListenExecutions(TaskCollection taskCollection, string agent, string? instance, string @delegate, Func<Task> handler, CancellationToken cancellationToken)
{
taskCollection.Add(async () =>
{
await foreach (var (key, notification) in AsyncLocals.NotificationService.GetStreamTriggerNotifications(@delegate).ReadAllAsync(cancellationToken))
await foreach (var execution in AsyncLocals.FabricService.GetExecutionsReader(agent, instance, @delegate).ReadAllAsync(cancellationToken))
{
taskCollection.Add(AsyncLocals.EnterContext(notification.Instance, notification.Stream, async () =>
taskCollection.Add(async () =>
{
AsyncLocals.SetExecution(execution);
await handler().ConfigureAwait(false);
await AsyncLocals.NotificationService.ConsumeNotification(key).ConfigureAwait(false); // TODO?
}));
});
}
});
}
Expand Down
Loading

0 comments on commit ca3d63e

Please sign in to comment.