Skip to content

Commit

Permalink
feat: Unit test mocks for IState, IStream, IContext, and IAgent
Browse files Browse the repository at this point in the history
Addresses part of #55
  • Loading branch information
bojidar-bg committed Jan 19, 2021
1 parent 63e3865 commit ea5b328
Show file tree
Hide file tree
Showing 9 changed files with 550 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Threading.Tasks;
using Apache.Ignite.Core.Client;
using Perper.WebJobs.Extensions.Services;

namespace Perper.WebJobs.Extensions.Model
Expand All @@ -11,17 +10,15 @@ public class Agent : IAgent
public string AgentDelegate { get; set; }

[NonSerialized] private IContext _context;
[NonSerialized] private IIgniteClient _ignite;

[PerperInject]
protected Agent(IContext context, IIgniteClient ignite)
protected Agent(IContext context)
{
_context = context;
_ignite = ignite;
}

public Agent(string agentName, string agentDelegate, IContext context, IIgniteClient ignite)
: this(context, ignite)
public Agent(string agentName, string agentDelegate, IContext context)
: this(context)
{
AgentName = agentName;
AgentDelegate = agentDelegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class Context : IContext
private readonly PerperBinarySerializer _serializer;
private readonly IState _state;

public IAgent Agent => new Agent(_instance.Agent, _fabric.AgentDelegate, this, _ignite);
public IAgent Agent => new Agent(_instance.Agent, _fabric.AgentDelegate, this);

public Context(FabricService fabric, PerperInstanceData instance, IIgniteClient ignite, PerperBinarySerializer serializer, IState state)
{
Expand All @@ -32,7 +32,7 @@ public Context(FabricService fabric, PerperInstanceData instance, IIgniteClient
var callDelegate = delegateName;

var agentName = GenerateName(agentDelegate);
var agent = new Agent(agentName, agentDelegate, this, _ignite);
var agent = new Agent(agentName, agentDelegate, this);

var result = await agent.CallFunctionAsync<TResult>(callDelegate, parameters);

Expand Down Expand Up @@ -72,7 +72,7 @@ public async Task InitializeStreamFunctionAsync<TItem>(IStream<TItem> stream, ob
streamInstance.FunctionName = null;
}

public async Task<(IStream<TItem>, string)> CreateBlankStreamAsync<TItem>(StreamFlags flags = StreamFlags.Ephemeral)
public async Task<(IStream<TItem>, string)> CreateBlankStreamAsync<TItem>(StreamFlags flags = StreamFlags.Default)
{
var streamName = GenerateName();
await CreateStreamAsync(streamName, StreamDelegateType.External, "", null, typeof(TItem), flags);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
<ItemGroup>
<Compile Include="Model/**/*.cs" />
<Compile Include="Services/**/*.cs" />
<Compile Include="Test/**/*.cs" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp3.1'">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ public class PerperBinarySerializer : IBinarySerializer
[PerperData(Name = "<null>")]
public struct NullPlaceholder { };

private readonly IServiceProvider _services;
private IBinary _binary = default!;
private readonly IServiceProvider? _services;
private IBinary? _binary = null;

public PerperBinarySerializer(IServiceProvider services)
public PerperBinarySerializer(IServiceProvider? services)
{
_services = services;
}
Expand Down Expand Up @@ -157,7 +157,7 @@ private TypeData GetTypeData(Type type)
}
return serialized;
}
case var anonymous when PerperTypeUtils.IsAnonymousType(value.GetType()):
case var anonymous when PerperTypeUtils.IsAnonymousType(value.GetType()) && _binary != null:
{
var anonymousType = anonymous.GetType();

Expand Down Expand Up @@ -329,7 +329,7 @@ public void ReadBinary(object obj, IBinaryReader reader)

if (typeData.Constructor != null)
{
var parameters = typeData.Constructor.GetParameters().Select(p => _services.GetService(p.ParameterType)).ToArray();
var parameters = typeData.Constructor.GetParameters().Select(p => _services?.GetService(p.ParameterType)).ToArray();
typeData.Constructor.Invoke(obj, parameters);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
using Perper.WebJobs.Extensions.Model;
using Perper.WebJobs.Extensions.Services;

namespace Perper.WebJobs.Extensions.Test
{
public class TestEnvironment
{
private Dictionary<(string, string), Func<object?, TestInstance, Task<object?>>> functions = new Dictionary<(string, string), Func<object?, TestInstance, Task<object?>>>();
private Dictionary<(string, string), object?> storedState = new Dictionary<(string, string), object?>();
private Dictionary<string, Func<object?, ValueTask>> blankStreams = new Dictionary<string, Func<object?, ValueTask>>();

private PerperBinarySerializer? _serializer;

public TestEnvironment()
{
_serializer = new PerperBinarySerializer(null);
}

public TestInstance CreateAgent(string agentDelegate)
{
return new TestInstance(agentDelegate, GenerateName(agentDelegate), this);
}

private void _RegisterFunction<TParams, TResult>(string agentDelegate, string functionDelegate, Func<TParams, TestInstance, Task<TResult>> function)
{
functions[(agentDelegate, functionDelegate)] = async (input, instance) => Serialize(await function(Deserialize<TParams>(input), instance));
}

public void RegisterFunction<TParams, TResult>(string agentDelegate, string functionDelegate, Func<TParams, TestInstance, Task<TResult>> function)
=> _RegisterFunction<TParams, TResult>(agentDelegate, functionDelegate, function);

public void RegisterFunction<TParams, TResult>(string agentDelegate, string functionDelegate, Func<TParams, TestInstance, TResult> function)
=> _RegisterFunction<TParams, TResult>(agentDelegate, functionDelegate, (input, context) => Task.FromResult(function(input, context)));

public void RegisterFunction<TParams, TResult>(string agentDelegate, string functionDelegate, Func<TParams, Task<TResult>> function)
=> _RegisterFunction<TParams, TResult>(agentDelegate, functionDelegate, (input, _context) => function(input));

public void RegisterFunction<TParams, TResult>(string agentDelegate, string functionDelegate, Func<TParams, TResult> function)
=> _RegisterFunction<TParams, TResult>(agentDelegate, functionDelegate, (input, _context) => Task.FromResult(function(input)));

public void RegisterFunction<TResult>(string agentDelegate, string functionDelegate, Func<TestInstance, Task<TResult>> function)
=> _RegisterFunction<object?, TResult>(agentDelegate, functionDelegate, (_input, context) => function(context));

public void RegisterFunction<TResult>(string agentDelegate, string functionDelegate, Func<TestInstance, TResult> function)
=> _RegisterFunction<object?, TResult>(agentDelegate, functionDelegate, (_input, context) => Task.FromResult(function(context)));

public void RegisterFunction<TResult>(string agentDelegate, string functionDelegate, Func<Task<TResult>> function)
=> _RegisterFunction<object?, TResult>(agentDelegate, functionDelegate, (_input, _context) => function());

public void RegisterFunction<TResult>(string agentDelegate, string functionDelegate, Func<TResult> function)
=> _RegisterFunction<object?, TResult>(agentDelegate, functionDelegate, (_input, _context) => Task.FromResult(function()));

//
public void RegisterFunction<TParams>(string agentDelegate, string functionDelegate, Func<TParams, TestInstance, Task> function)
=> _RegisterFunction<TParams, object?>(agentDelegate, functionDelegate, async (input, context) => { await function(input, context); return null; });

public void RegisterFunction<TParams>(string agentDelegate, string functionDelegate, Action<TParams, TestInstance> function)
=> _RegisterFunction<TParams, object?>(agentDelegate, functionDelegate, (input, context) => { function(input, context); return Task.FromResult<object?>(null); });

public void RegisterFunction<TParams>(string agentDelegate, string functionDelegate, Func<TParams, Task> function)
=> _RegisterFunction<TParams, object?>(agentDelegate, functionDelegate, async (input, _context) => { await function(input); return null; });

public void RegisterFunction<TParams>(string agentDelegate, string functionDelegate, Action<TParams> function)
=> _RegisterFunction<TParams, object?>(agentDelegate, functionDelegate, (input, _context) => { function(input); return Task.FromResult<object?>(null); });

public void RegisterFunction(string agentDelegate, string functionDelegate, Func<TestInstance, Task> function)
=> _RegisterFunction<object?, object?>(agentDelegate, functionDelegate, async (_input, context) => { await function(context); return null; });

public void RegisterFunction(string agentDelegate, string functionDelegate, Action<TestInstance> function)
=> _RegisterFunction<object?, object?>(agentDelegate, functionDelegate, (_input, context) => { function(context); return Task.FromResult<object?>(null); });

public void RegisterFunction(string agentDelegate, string functionDelegate, Func<Task> function)
=> _RegisterFunction<object?, object?>(agentDelegate, functionDelegate, async (_input, _context) => { await function(); return null; });

public void RegisterFunction(string agentDelegate, string functionDelegate, Action function)
=> _RegisterFunction<object?, object?>(agentDelegate, functionDelegate, (_input, _context) => { function(); return Task.FromResult<object?>(null); });

internal async Task<TResult> CallAsync<TParams, TResult>(TestInstance instance, string functionDelegate, TParams parameters)
{
var input = Serialize(parameters);
var function = functions[(instance.AgentDelegate, functionDelegate)];
var result = await function(input, instance);
return Deserialize<TResult>(result);
}

public IStream<T> CreateStream<T>(IAsyncEnumerable<T> source)
{
var streamName = GenerateName();
return new AsyncEnumerableTestStream<T>(streamName, this, source);
}

public IStream<T> CreateStream<T>(IEnumerable<T> source)
{
#pragma warning disable CS1998
async IAsyncEnumerable<T> helper()
{
foreach (var item in source)
{
yield return item;
}
}
#pragma warning restore CS1998

return CreateStream(helper());
}

public (IStream<T>, string) CreateBlankStream<T>()
{
var channel = Channel.CreateUnbounded<T>();
var stream = CreateStream(channel.Reader.ReadAllAsync());
var name = ((TestStream)stream).StreamName;
blankStreams.Add(name, (item) => channel.Writer.WriteAsync(Deserialize<T>(item)));
return (stream, name);
}

public async Task WriteToBlankStream<T>(string name, T value)
{
var input = Serialize(value);
await blankStreams[name](input);
}

internal object? Serialize<T>(T value)
{
return _serializer != null ? _serializer.Serialize(value) : value;
}

internal T Deserialize<T>(object? serialized)
{
return (T)(_serializer != null ? _serializer.Deserialize(serialized, typeof(T)) : serialized)!;
}

internal string GenerateName(string? baseName = null)
{
return $"{baseName}-{Guid.NewGuid()}";
}
}
}
138 changes: 138 additions & 0 deletions functions/runtime/src/Perper.WebJobs.Extensions/Test/TestInstance.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Perper.WebJobs.Extensions.Model;

namespace Perper.WebJobs.Extensions.Test
{
public class TestInstance : IAgent, IContext, IState
{
public string AgentDelegate { get; set; }
public string AgentName { get; set; }

private readonly TestEnvironment _environment;

public TestInstance(string agentDelegate, string agentName, TestEnvironment environment)
{
AgentDelegate = agentDelegate;
AgentName = agentName;
_environment = environment;
}

#region State
public class TestStateEntry<T> : IStateEntry<T>
{
public T Value { get; set; } = default(T)!;

public Func<T> DefaultValueFactory = () => default(T)!;
public string Name { get; private set; }

private TestInstance _instance;

public TestStateEntry(TestInstance instance, string name, Func<T> defaultValueFactory)
{
_instance = instance;
Name = name;
DefaultValueFactory = defaultValueFactory;
}

public async Task Load()
{
Value = await _instance.GetValue<T>(Name, DefaultValueFactory);
}

public Task Store()
{
return _instance.SetValue<T>(Name, Value);
}
}

public ConcurrentDictionary<string, object?> Values = new ConcurrentDictionary<string, object?>();

public Task<T> GetValue<T>(string key, Func<T> defaultValueFactory)
{
return Task.FromResult(_environment.Deserialize<T>(Values.GetOrAdd(key, _k => defaultValueFactory())));
}

public Task SetValue<T>(string key, T value)
{
Values[key] = _environment.Serialize(value);
return Task.CompletedTask;
}

async Task<IStateEntry<T>> IState.Entry<T>(string key, Func<T> defaultValueFactory)
{
var entry = new TestStateEntry<T>(this, key, defaultValueFactory);
await entry.Load();
return entry;
}
#endregion State

#region Agent
public Task<TResult> CallFunctionAsync<TResult>(string functionName, object? parameters = default)
{
return _environment.CallAsync<object?, TResult>(this, functionName, parameters);
}

public Task CallActionAsync(string actionName, object? parameters = default)
{
return CallFunctionAsync<object?>(actionName, parameters);
}
#endregion Agent

#region Context
IAgent IContext.Agent => this;

public async Task<(IAgent, TResult)> StartAgentAsync<TResult>(string delegateName, object? parameters = default)
{
var agentDelegate = delegateName;
var callDelegate = delegateName;

var agent = _environment.CreateAgent(delegateName);
var result = await agent.CallFunctionAsync<TResult>(callDelegate, parameters);

return (agent, result);
}

public Task<IStream<TItem>> StreamFunctionAsync<TItem>(string functionName, object? parameters = default, StreamFlags flags = StreamFlags.Default)
{
var streamName = _environment.GenerateName(functionName);

return Task.FromResult<IStream<TItem>>(new FunctionTestStream<TItem>(streamName, _environment, this, functionName, parameters));
}

public Task<IStream> StreamActionAsync(string functionName, object? parameters = default, StreamFlags flags = StreamFlags.Default)
{
var streamName = _environment.GenerateName(functionName);

var task = CallActionAsync(functionName, parameters); // Reference to task is lost!

return Task.FromResult<IStream>(new TestStream(streamName));
}

public IStream<TItem> DeclareStreamFunction<TItem>(string functionName)
{
var streamName = _environment.GenerateName(functionName);

return new FunctionTestStream<TItem>(streamName, _environment, this, functionName, null);
}

public Task InitializeStreamFunctionAsync<TItem>(IStream<TItem> stream, object? parameters = default, StreamFlags flags = StreamFlags.Default)
{
var streamInstance = (FunctionTestStream<TItem>)stream;
if (streamInstance.Parameters != null)
{
throw new InvalidOperationException("Stream is already initialized");
}
streamInstance.Parameters = parameters;

return Task.CompletedTask;
}

public Task<(IStream<TItem>, string)> CreateBlankStreamAsync<TItem>(StreamFlags flags = StreamFlags.Default)
{
return Task.FromResult(_environment.CreateBlankStream<TItem>());
}
#endregion Context
}
}
Loading

0 comments on commit ea5b328

Please sign in to comment.