Skip to content

Commit

Permalink
refactor!: Cleanup C# and Kotlin code
Browse files Browse the repository at this point in the history
  • Loading branch information
bojidar-bg committed Oct 29, 2021
1 parent 92cb833 commit d367dde
Show file tree
Hide file tree
Showing 29 changed files with 593 additions and 553 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,7 @@ csharp_preserve_single_line_blocks = true
csharp_preserve_single_line_statements = true

# Using directive preferences
csharp_using_directive_placement = true
csharp_using_directive_placement = outside_namespace



Expand Down
31 changes: 18 additions & 13 deletions agent/dotnet/src/Perper/Application/PerperStartup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,16 @@ public static Task RunAsync(string agent, string rootNamespace, CancellationToke

public static async Task EnterServicesContext(Func<Task> context)
{
var (cacheService, notificationService) = await EstablishConnection().ConfigureAwait(false);
var fabricService = await EstablishConnection().ConfigureAwait(false);

AsyncLocals.SetConnection(cacheService, notificationService);
AsyncLocals.SetConnection(fabricService);

await context().ConfigureAwait(false);

await notificationService.DisposeAsync().ConfigureAwait(false);
await fabricService.DisposeAsync().ConfigureAwait(false);
}

public static async Task<(CacheService, NotificationService)> EstablishConnection()
public static async Task<FabricService> EstablishConnection()
{
var apacheIgniteEndpoint = Environment.GetEnvironmentVariable("APACHE_IGNITE_ENDPOINT") ?? "127.0.0.1:10800";
var fabricGrpcAddress = Environment.GetEnvironmentVariable("PERPER_FABRIC_ENDPOINT") ?? "http://127.0.0.1:40400";
Expand All @@ -100,13 +100,10 @@ public static async Task EnterServicesContext(Func<Task> context)
(exception, timespan) => Console.WriteLine("Failed to connect to Ignite, retrying in {0}s", timespan.TotalSeconds))
.ExecuteAsync(() => Task.Run(() => Ignition.StartClient(igniteConfiguration))).ConfigureAwait(false);;

var cacheService = new CacheService(ignite);

AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
var grpcChannel = GrpcChannel.ForAddress(fabricGrpcAddress);
var notificationService = new NotificationService(grpcChannel);

return (cacheService, notificationService);
return new FabricService(ignite, grpcChannel);
}

#endregion Services
Expand All @@ -125,13 +122,17 @@ public Task RunInServiceContext(CancellationToken cancellationToken = default)

executionHandlers.TryAdd(PerperContext.StartupFunctionName, async () =>
{
await AsyncLocals.CacheService.WriteExecutionFinished(AsyncLocals.Execution).ConfigureAwait(false);
await AsyncLocals.FabricService.WriteExecutionFinished(AsyncLocals.Execution).ConfigureAwait(false);
});

var initExecution = new Execution(Agent, instance ?? $"{Agent}-init", "Init", $"{Agent}-init", cancellationToken);
foreach (var handler in initHandlers)
{
var initExecution = new ExecutionRecord(Agent, instance ?? $"{Agent}-init", "Init", $"{Agent}-init", cancellationToken);
taskCollection.Add(AsyncLocals.EnterContext(initExecution, handler));
taskCollection.Add(async () =>
{
AsyncLocals.SetExecution(initExecution);
await handler().ConfigureAwait(false);
});
}

foreach (var (@delegate, handler) in executionHandlers)
Expand All @@ -146,9 +147,13 @@ public static void ListenExecutions(TaskCollection taskCollection, string agent,
{
taskCollection.Add(async () =>
{
await foreach (var execution in AsyncLocals.NotificationService.GetExecutionsReader(agent, instance, @delegate).ReadAllAsync(cancellationToken))
await foreach (var execution in AsyncLocals.FabricService.GetExecutionsReader(agent, instance, @delegate).ReadAllAsync(cancellationToken))
{
taskCollection.Add(AsyncLocals.EnterContext(execution, handler));
taskCollection.Add(async () =>
{
AsyncLocals.SetExecution(execution);
await handler().ConfigureAwait(false);
});
}
});
}
Expand Down
266 changes: 109 additions & 157 deletions agent/dotnet/src/Perper/Application/PerperStartupExtensions.cs

Large diffs are not rendered by default.

36 changes: 8 additions & 28 deletions agent/dotnet/src/Perper/Extensions/AsyncLocals.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,23 @@ namespace Perper.Extensions
{
public static class AsyncLocals
{
private static readonly AsyncLocal<(CacheService, NotificationService)> _connection = new();
private static readonly AsyncLocal<ExecutionRecord> _execution = new();
private static readonly AsyncLocal<FabricService> _fabricService = new();
private static readonly AsyncLocal<Execution> _execution = new();

public static CacheService CacheService => _connection.Value.Item1;
public static NotificationService NotificationService => _connection.Value.Item2;
public static FabricService FabricService => _fabricService.Value!;
public static string Agent => _execution.Value?.Agent!;
public static string Instance => _execution.Value?.Instance!;
public static string Delegate => _execution.Value?.Delegate!;
public static string Execution => _execution.Value?.Execution!;
public static string Execution => _execution.Value?.ExecutionId!;
public static CancellationToken CancellationToken => _execution.Value?.CancellationToken ?? default;

public static void SetConnection(CacheService cacheService, NotificationService notificationService)
public static void SetConnection(FabricService fabricService)
{
SetConnection((cacheService, notificationService));
_fabricService.Value = fabricService;
}

public static void SetConnection((CacheService, NotificationService) connection)
{
_connection.Value = connection;
}

public static Task EnterContext(ExecutionRecord execution, Func<Task> action)
{
return Task.Run(() =>
{
_execution.Value = execution;
return action();
});
}

public static Task<T> EnterContext<T>(ExecutionRecord execution, Func<Task<T>> action)
public static void SetExecution(Execution execution)
{
return Task.Run(() =>
{
_execution.Value = execution;
return action();
});
_execution.Value = execution;
}
}
}
48 changes: 0 additions & 48 deletions agent/dotnet/src/Perper/Extensions/CacheServiceExtensions.cs

This file was deleted.

48 changes: 48 additions & 0 deletions agent/dotnet/src/Perper/Extensions/FabricServiceExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;

using Perper.Model;
using Perper.Protocol;

namespace Perper.Extensions
{
public static class FabricServiceExtensions
{
public static Task WriteStreamItem<T>(this FabricService fabricService, string stream, T item, bool keepBinary = false)
{
return fabricService.WriteStreamItem(stream, FabricService.CurrentTicks, item, keepBinary);
}

[SuppressMessage("Design", "CA1031: Do not catch general exception types", Justification = "Exception is logged/handled through other means; rethrowing from handler will crash whole application.")]
public static async Task WriteExecutionResultTask(this FabricService fabricService, string call, Task<object?[]> task)
{
try
{
var result = await task.ConfigureAwait(false);
await fabricService.WriteExecutionResult(call, result).ConfigureAwait(false);
}
catch (Exception exception)
{
await fabricService.WriteExecutionException(call, exception).ConfigureAwait(false);
}
}

public static Task WriteExecutionException(this FabricService fabricService, string call, Exception exception)
{
return fabricService.WriteExecutionError(call, exception.Message);
}

public static async Task<object?[]?> ReadExecutionResult(this FabricService fabricService, string call)
{
var (error, result) = await fabricService.ReadExecutionErrorAndResult(call).ConfigureAwait(false);

if (error != null)
{
throw new InvalidOperationException($"Execution failed with error: {error}");
}

return result;
}
}
}
30 changes: 20 additions & 10 deletions agent/dotnet/src/Perper/Extensions/PerperAgentExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

using Perper.Model;
Expand All @@ -11,7 +12,7 @@ public static class PerperAgentExtensions
{
public static async Task<TResult> CallAsync<TResult>(this PerperAgent agent, string functionName, params object?[] parameters)
{
var results = await InternalCallAsync(agent, functionName, parameters).ConfigureAwait(false);
var results = await InternalCallAsync(agent, functionName, parameters, default).ConfigureAwait(false);

if (results is null)
{
Expand All @@ -37,26 +38,35 @@ public static async Task<TResult> CallAsync<TResult>(this PerperAgent agent, str

public static async Task CallAsync(this PerperAgent agent, string actionName, params object?[] parameters)
{
await InternalCallAsync(agent, actionName, parameters).ConfigureAwait(false);
await InternalCallAsync(agent, actionName, parameters, default).ConfigureAwait(false);
}

private static async Task<object?[]?> InternalCallAsync(PerperAgent agent, string @delegate, object?[] parameters)
private static async Task<object?[]?> InternalCallAsync(PerperAgent agent, string @delegate, object?[] parameters, CancellationToken cancellationToken)
{
var execution = CacheService.GenerateName(@delegate);
var execution = FabricService.GenerateName(@delegate);

await AsyncLocals.CacheService.CreateExecution(execution, agent.Agent, agent.Instance, @delegate, parameters).ConfigureAwait(false);
await AsyncLocals.FabricService.CreateExecution(execution, agent.Agent, agent.Instance, @delegate, parameters).ConfigureAwait(false);

await AsyncLocals.NotificationService.WaitExecutionFinished(execution).ConfigureAwait(false);
try
{
await AsyncLocals.FabricService.WaitExecutionFinished(execution, cancellationToken).ConfigureAwait(false);

var results = await AsyncLocals.FabricService.ReadExecutionResult(execution).ConfigureAwait(false);
await AsyncLocals.FabricService.RemoveExecution(execution).ConfigureAwait(false);

var results = await AsyncLocals.CacheService.ReadExecutionResult(execution).ConfigureAwait(false);
await AsyncLocals.CacheService.RemoveExecution(execution).ConfigureAwait(false);
return results;
}
catch (OperationCanceledException)
{
await AsyncLocals.FabricService.RemoveExecution(execution).ConfigureAwait(false);
throw;
}

return results;
}

public static async Task DestroyAsync(this PerperAgent agent)
{
await AsyncLocals.CacheService.RemoveInstance(agent.Instance).ConfigureAwait(false);
await AsyncLocals.FabricService.RemoveInstance(agent.Instance).ConfigureAwait(false);
}
}
}
11 changes: 8 additions & 3 deletions agent/dotnet/src/Perper/Extensions/PerperContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public static async Task<PerperAgent> StartAgentAsync(string agent, params objec

private static async Task<PerperAgent> CreateInstanceAsync(string agent)
{
var instance = CacheService.GenerateName(agent);
await AsyncLocals.CacheService.CreateInstance(instance, agent).ConfigureAwait(false);
var instance = FabricService.GenerateName(agent);
await AsyncLocals.FabricService.CreateInstance(instance, agent).ConfigureAwait(false);
return new PerperAgent(agent, instance);
}

Expand All @@ -46,9 +46,14 @@ public static PerperStreamBuilder BlankStream()

public static Task CallAsync(string actionName, params object[] parameters) => Agent.CallAsync(actionName, parameters);

public static async Task WriteToBlankStream<TItem>(PerperStream stream, long key, TItem item, bool keepBinary = false)
{
await AsyncLocals.FabricService.WriteStreamItem(stream.Stream, key, item, keepBinary).ConfigureAwait(false);
}

public static async Task WriteToBlankStream<TItem>(PerperStream stream, TItem item, bool keepBinary = false)
{
await AsyncLocals.CacheService.WriteStreamItem(stream.Stream, item, keepBinary).ConfigureAwait(false);
await AsyncLocals.FabricService.WriteStreamItem(stream.Stream, item, keepBinary).ConfigureAwait(false);
}
}
}
15 changes: 13 additions & 2 deletions agent/dotnet/src/Perper/Extensions/PerperState.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Threading.Tasks;

namespace Perper.Extensions
Expand All @@ -6,12 +7,22 @@ public static class PerperState
{
public static async Task<(bool, T)> TryGetAsync<T>(string key)
{
return await AsyncLocals.CacheService.TryGetStateValue<T>(AsyncLocals.Instance, key).ConfigureAwait(false);
return await AsyncLocals.FabricService.TryGetStateValue<T>(AsyncLocals.Instance, key).ConfigureAwait(false);
}

public static Task<T> GetOrDefaultAsync<T>(string key, T @default = default!) => GetOrNewAsync(key, () => @default);

public static Task<T> GetOrNewAsync<T>(string key) where T : new() => GetOrNewAsync(key, () => new T());

public static async Task<T> GetOrNewAsync<T>(string key, Func<T> createFunc)
{
var (success, value) = await TryGetAsync<T>(key).ConfigureAwait(false);
return success ? value : createFunc();
}

public static async Task SetAsync<T>(string key, T value)
{
await AsyncLocals.CacheService.SetStateValue(AsyncLocals.Instance, key, value).ConfigureAwait(false);
await AsyncLocals.FabricService.SetStateValue(AsyncLocals.Instance, key, value).ConfigureAwait(false);
}
}
}
Loading

0 comments on commit d367dde

Please sign in to comment.