diff --git a/functions/runtime/src/Perper.WebJobs.Extensions/Model/Agent.cs b/functions/runtime/src/Perper.WebJobs.Extensions/Model/Agent.cs index c3674c78..77cabdbc 100644 --- a/functions/runtime/src/Perper.WebJobs.Extensions/Model/Agent.cs +++ b/functions/runtime/src/Perper.WebJobs.Extensions/Model/Agent.cs @@ -1,6 +1,5 @@ using System; using System.Threading.Tasks; -using Apache.Ignite.Core.Client; using Perper.WebJobs.Extensions.Services; namespace Perper.WebJobs.Extensions.Model @@ -11,17 +10,15 @@ public class Agent : IAgent public string AgentDelegate { get; set; } [NonSerialized] private IContext _context; - [NonSerialized] private IIgniteClient _ignite; [PerperInject] - protected Agent(IContext context, IIgniteClient ignite) + protected Agent(IContext context) { _context = context; - _ignite = ignite; } - public Agent(string agentName, string agentDelegate, IContext context, IIgniteClient ignite) - : this(context, ignite) + public Agent(string agentName, string agentDelegate, IContext context) + : this(context) { AgentName = agentName; AgentDelegate = agentDelegate; diff --git a/functions/runtime/src/Perper.WebJobs.Extensions/Model/Context.cs b/functions/runtime/src/Perper.WebJobs.Extensions/Model/Context.cs index 1e9ae02a..c6a620a9 100644 --- a/functions/runtime/src/Perper.WebJobs.Extensions/Model/Context.cs +++ b/functions/runtime/src/Perper.WebJobs.Extensions/Model/Context.cs @@ -15,7 +15,7 @@ public class Context : IContext private readonly PerperBinarySerializer _serializer; private readonly IState _state; - public IAgent Agent => new Agent(_instance.Agent, _fabric.AgentDelegate, this, _ignite); + public IAgent Agent => new Agent(_instance.Agent, _fabric.AgentDelegate, this); public Context(FabricService fabric, PerperInstanceData instance, IIgniteClient ignite, PerperBinarySerializer serializer, IState state) { @@ -32,7 +32,7 @@ public Context(FabricService fabric, PerperInstanceData instance, IIgniteClient var callDelegate = delegateName; var agentName = GenerateName(agentDelegate); - var agent = new Agent(agentName, agentDelegate, this, _ignite); + var agent = new Agent(agentName, agentDelegate, this); var result = await agent.CallFunctionAsync(callDelegate, parameters); @@ -72,7 +72,7 @@ public async Task InitializeStreamFunctionAsync(IStream stream, ob streamInstance.FunctionName = null; } - public async Task<(IStream, string)> CreateBlankStreamAsync(StreamFlags flags = StreamFlags.Ephemeral) + public async Task<(IStream, string)> CreateBlankStreamAsync(StreamFlags flags = StreamFlags.Default) { var streamName = GenerateName(); await CreateStreamAsync(streamName, StreamDelegateType.External, "", null, typeof(TItem), flags); diff --git a/functions/runtime/src/Perper.WebJobs.Extensions/Perper.WebJobs.Extensions.csproj b/functions/runtime/src/Perper.WebJobs.Extensions/Perper.WebJobs.Extensions.csproj index 74e884b9..0f410221 100644 --- a/functions/runtime/src/Perper.WebJobs.Extensions/Perper.WebJobs.Extensions.csproj +++ b/functions/runtime/src/Perper.WebJobs.Extensions/Perper.WebJobs.Extensions.csproj @@ -59,6 +59,7 @@ + diff --git a/functions/runtime/src/Perper.WebJobs.Extensions/Services/PerperBinarySerializer.cs b/functions/runtime/src/Perper.WebJobs.Extensions/Services/PerperBinarySerializer.cs index 4ab0cb41..084d1800 100644 --- a/functions/runtime/src/Perper.WebJobs.Extensions/Services/PerperBinarySerializer.cs +++ b/functions/runtime/src/Perper.WebJobs.Extensions/Services/PerperBinarySerializer.cs @@ -15,10 +15,10 @@ public class PerperBinarySerializer : IBinarySerializer [PerperData(Name = "")] public struct NullPlaceholder { }; - private readonly IServiceProvider _services; - private IBinary _binary = default!; + private readonly IServiceProvider? _services; + private IBinary? _binary = null; - public PerperBinarySerializer(IServiceProvider services) + public PerperBinarySerializer(IServiceProvider? services) { _services = services; } @@ -157,7 +157,7 @@ private TypeData GetTypeData(Type type) } return serialized; } - case var anonymous when PerperTypeUtils.IsAnonymousType(value.GetType()): + case var anonymous when PerperTypeUtils.IsAnonymousType(value.GetType()) && _binary != null: { var anonymousType = anonymous.GetType(); @@ -329,7 +329,7 @@ public void ReadBinary(object obj, IBinaryReader reader) if (typeData.Constructor != null) { - var parameters = typeData.Constructor.GetParameters().Select(p => _services.GetService(p.ParameterType)).ToArray(); + var parameters = typeData.Constructor.GetParameters().Select(p => _services?.GetService(p.ParameterType)).ToArray(); typeData.Constructor.Invoke(obj, parameters); } diff --git a/functions/runtime/src/Perper.WebJobs.Extensions/Test/TestEnvironment.cs b/functions/runtime/src/Perper.WebJobs.Extensions/Test/TestEnvironment.cs new file mode 100644 index 00000000..7e4ab08b --- /dev/null +++ b/functions/runtime/src/Perper.WebJobs.Extensions/Test/TestEnvironment.cs @@ -0,0 +1,141 @@ +using System; +using System.Collections.Generic; +using System.Threading.Channels; +using System.Threading.Tasks; +using Perper.WebJobs.Extensions.Model; +using Perper.WebJobs.Extensions.Services; + +namespace Perper.WebJobs.Extensions.Test +{ + public class TestEnvironment + { + private Dictionary<(string, string), Func>> functions = new Dictionary<(string, string), Func>>(); + private Dictionary<(string, string), object?> storedState = new Dictionary<(string, string), object?>(); + private Dictionary> blankStreams = new Dictionary>(); + + private PerperBinarySerializer? _serializer; + + public TestEnvironment() + { + _serializer = new PerperBinarySerializer(null); + } + + public TestInstance CreateAgent(string agentDelegate) + { + return new TestInstance(agentDelegate, GenerateName(agentDelegate), this); + } + + private void _RegisterFunction(string agentDelegate, string functionDelegate, Func> function) + { + functions[(agentDelegate, functionDelegate)] = async (input, instance) => Serialize(await function(Deserialize(input), instance)); + } + + public void RegisterFunction(string agentDelegate, string functionDelegate, Func> function) + => _RegisterFunction(agentDelegate, functionDelegate, function); + + public void RegisterFunction(string agentDelegate, string functionDelegate, Func function) + => _RegisterFunction(agentDelegate, functionDelegate, (input, context) => Task.FromResult(function(input, context))); + + public void RegisterFunction(string agentDelegate, string functionDelegate, Func> function) + => _RegisterFunction(agentDelegate, functionDelegate, (input, _context) => function(input)); + + public void RegisterFunction(string agentDelegate, string functionDelegate, Func function) + => _RegisterFunction(agentDelegate, functionDelegate, (input, _context) => Task.FromResult(function(input))); + + public void RegisterFunction(string agentDelegate, string functionDelegate, Func> function) + => _RegisterFunction(agentDelegate, functionDelegate, (_input, context) => function(context)); + + public void RegisterFunction(string agentDelegate, string functionDelegate, Func function) + => _RegisterFunction(agentDelegate, functionDelegate, (_input, context) => Task.FromResult(function(context))); + + public void RegisterFunction(string agentDelegate, string functionDelegate, Func> function) + => _RegisterFunction(agentDelegate, functionDelegate, (_input, _context) => function()); + + public void RegisterFunction(string agentDelegate, string functionDelegate, Func function) + => _RegisterFunction(agentDelegate, functionDelegate, (_input, _context) => Task.FromResult(function())); + + // + public void RegisterFunction(string agentDelegate, string functionDelegate, Func function) + => _RegisterFunction(agentDelegate, functionDelegate, async (input, context) => { await function(input, context); return null; }); + + public void RegisterFunction(string agentDelegate, string functionDelegate, Action function) + => _RegisterFunction(agentDelegate, functionDelegate, (input, context) => { function(input, context); return Task.FromResult(null); }); + + public void RegisterFunction(string agentDelegate, string functionDelegate, Func function) + => _RegisterFunction(agentDelegate, functionDelegate, async (input, _context) => { await function(input); return null; }); + + public void RegisterFunction(string agentDelegate, string functionDelegate, Action function) + => _RegisterFunction(agentDelegate, functionDelegate, (input, _context) => { function(input); return Task.FromResult(null); }); + + public void RegisterFunction(string agentDelegate, string functionDelegate, Func function) + => _RegisterFunction(agentDelegate, functionDelegate, async (_input, context) => { await function(context); return null; }); + + public void RegisterFunction(string agentDelegate, string functionDelegate, Action function) + => _RegisterFunction(agentDelegate, functionDelegate, (_input, context) => { function(context); return Task.FromResult(null); }); + + public void RegisterFunction(string agentDelegate, string functionDelegate, Func function) + => _RegisterFunction(agentDelegate, functionDelegate, async (_input, _context) => { await function(); return null; }); + + public void RegisterFunction(string agentDelegate, string functionDelegate, Action function) + => _RegisterFunction(agentDelegate, functionDelegate, (_input, _context) => { function(); return Task.FromResult(null); }); + + internal async Task CallAsync(TestInstance instance, string functionDelegate, TParams parameters) + { + var input = Serialize(parameters); + var function = functions[(instance.AgentDelegate, functionDelegate)]; + var result = await function(input, instance); + return Deserialize(result); + } + + public IStream CreateStream(IAsyncEnumerable source) + { + var streamName = GenerateName(); + return new AsyncEnumerableTestStream(streamName, this, source); + } + + public IStream CreateStream(IEnumerable source) + { +#pragma warning disable CS1998 + async IAsyncEnumerable helper() + { + foreach (var item in source) + { + yield return item; + } + } +#pragma warning restore CS1998 + + return CreateStream(helper()); + } + + public (IStream, string) CreateBlankStream() + { + var channel = Channel.CreateUnbounded(); + var stream = CreateStream(channel.Reader.ReadAllAsync()); + var name = ((TestStream)stream).StreamName; + blankStreams.Add(name, (item) => channel.Writer.WriteAsync(Deserialize(item))); + return (stream, name); + } + + public async Task WriteToBlankStream(string name, T value) + { + var input = Serialize(value); + await blankStreams[name](input); + } + + internal object? Serialize(T value) + { + return _serializer != null ? _serializer.Serialize(value) : value; + } + + internal T Deserialize(object? serialized) + { + return (T)(_serializer != null ? _serializer.Deserialize(serialized, typeof(T)) : serialized)!; + } + + internal string GenerateName(string? baseName = null) + { + return $"{baseName}-{Guid.NewGuid()}"; + } + } +} \ No newline at end of file diff --git a/functions/runtime/src/Perper.WebJobs.Extensions/Test/TestInstance.cs b/functions/runtime/src/Perper.WebJobs.Extensions/Test/TestInstance.cs new file mode 100644 index 00000000..1278c83c --- /dev/null +++ b/functions/runtime/src/Perper.WebJobs.Extensions/Test/TestInstance.cs @@ -0,0 +1,138 @@ +using System; +using System.Collections.Concurrent; +using System.Threading.Tasks; +using Perper.WebJobs.Extensions.Model; + +namespace Perper.WebJobs.Extensions.Test +{ + public class TestInstance : IAgent, IContext, IState + { + public string AgentDelegate { get; set; } + public string AgentName { get; set; } + + private readonly TestEnvironment _environment; + + public TestInstance(string agentDelegate, string agentName, TestEnvironment environment) + { + AgentDelegate = agentDelegate; + AgentName = agentName; + _environment = environment; + } + + #region State + public class TestStateEntry : IStateEntry + { + public T Value { get; set; } = default(T)!; + + public Func DefaultValueFactory = () => default(T)!; + public string Name { get; private set; } + + private TestInstance _instance; + + public TestStateEntry(TestInstance instance, string name, Func defaultValueFactory) + { + _instance = instance; + Name = name; + DefaultValueFactory = defaultValueFactory; + } + + public async Task Load() + { + Value = await _instance.GetValue(Name, DefaultValueFactory); + } + + public Task Store() + { + return _instance.SetValue(Name, Value); + } + } + + public ConcurrentDictionary Values = new ConcurrentDictionary(); + + public Task GetValue(string key, Func defaultValueFactory) + { + return Task.FromResult(_environment.Deserialize(Values.GetOrAdd(key, _k => defaultValueFactory()))); + } + + public Task SetValue(string key, T value) + { + Values[key] = _environment.Serialize(value); + return Task.CompletedTask; + } + + async Task> IState.Entry(string key, Func defaultValueFactory) + { + var entry = new TestStateEntry(this, key, defaultValueFactory); + await entry.Load(); + return entry; + } + #endregion State + + #region Agent + public Task CallFunctionAsync(string functionName, object? parameters = default) + { + return _environment.CallAsync(this, functionName, parameters); + } + + public Task CallActionAsync(string actionName, object? parameters = default) + { + return CallFunctionAsync(actionName, parameters); + } + #endregion Agent + + #region Context + IAgent IContext.Agent => this; + + public async Task<(IAgent, TResult)> StartAgentAsync(string delegateName, object? parameters = default) + { + var agentDelegate = delegateName; + var callDelegate = delegateName; + + var agent = _environment.CreateAgent(delegateName); + var result = await agent.CallFunctionAsync(callDelegate, parameters); + + return (agent, result); + } + + public Task> StreamFunctionAsync(string functionName, object? parameters = default, StreamFlags flags = StreamFlags.Default) + { + var streamName = _environment.GenerateName(functionName); + + return Task.FromResult>(new FunctionTestStream(streamName, _environment, this, functionName, parameters)); + } + + public Task StreamActionAsync(string functionName, object? parameters = default, StreamFlags flags = StreamFlags.Default) + { + var streamName = _environment.GenerateName(functionName); + + var task = CallActionAsync(functionName, parameters); // Reference to task is lost! + + return Task.FromResult(new TestStream(streamName)); + } + + public IStream DeclareStreamFunction(string functionName) + { + var streamName = _environment.GenerateName(functionName); + + return new FunctionTestStream(streamName, _environment, this, functionName, null); + } + + public Task InitializeStreamFunctionAsync(IStream stream, object? parameters = default, StreamFlags flags = StreamFlags.Default) + { + var streamInstance = (FunctionTestStream)stream; + if (streamInstance.Parameters != null) + { + throw new InvalidOperationException("Stream is already initialized"); + } + streamInstance.Parameters = parameters; + + return Task.CompletedTask; + } + + public Task<(IStream, string)> CreateBlankStreamAsync(StreamFlags flags = StreamFlags.Default) + { + return Task.FromResult(_environment.CreateBlankStream()); + } + #endregion Context + } +} \ No newline at end of file diff --git a/functions/runtime/src/Perper.WebJobs.Extensions/Test/TestStream.cs b/functions/runtime/src/Perper.WebJobs.Extensions/Test/TestStream.cs new file mode 100644 index 00000000..97c62789 --- /dev/null +++ b/functions/runtime/src/Perper.WebJobs.Extensions/Test/TestStream.cs @@ -0,0 +1,199 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Perper.WebJobs.Extensions.Model; + +namespace Perper.WebJobs.Extensions.Test +{ + public class TestStream : IStream + { + public string StreamName { get; protected set; } + + public TestStream(string streamName) + { + StreamName = streamName; + } + } + + public abstract class TestStream : TestStream, IStream + { + public List StoredData { get; set; } = new List(); + + protected readonly TestEnvironment _environment; + + private HashSet> _channels = new HashSet>(); + private Task? _producerTask = null; + + public TestStream(string streamName, TestEnvironment environment) + : base(streamName) + { + _environment = environment; + } + + private TestStreamAsyncEnumerable GetEnumerable(Func filter, bool replay) + { + return new TestStreamAsyncEnumerable(this, filter, replay); + } + + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + return GetEnumerable((_) => true, false).GetAsyncEnumerator(cancellationToken); + } + + public IAsyncEnumerable DataLocal() + { + return GetEnumerable((_) => true, false); + } + + public IAsyncEnumerable Filter(Expression> filter, bool dataLocal = false) + { + return GetEnumerable(filter.Compile(), false); + } + + public IAsyncEnumerable Replay(bool dataLocal = false) + { + return GetEnumerable((_) => true, true); + } + + public IAsyncEnumerable Replay(Expression> filter, bool dataLocal = false) + { + return GetEnumerable(filter.Compile(), true); + } + +#pragma warning disable CS1998 + public async IAsyncEnumerable Query(Func, IQueryable> query) + { + foreach (var value in query(StoredData.AsQueryable().Select(x => _environment.Deserialize(x)))) + { + yield return value; + } + } +#pragma warning restore CS1998 + + private ChannelReader GetChannel() + { + if (_producerTask == null) + { + _producerTask = Task.Run(async () => + { + try + { + await foreach (var item in Produce()) + { + var index = StoredData.Count; + StoredData.Add(_environment.Serialize(item)); + + foreach (var channel in _channels) + { + await channel.WriteAsync(index); + } + } + } + catch (Exception e) + { + Console.WriteLine("Exception while executing stream: {0}", e); + throw; + } + }); + } + + var channel = Channel.CreateUnbounded(); + _channels.Add(channel.Writer); + return channel.Reader; + } + + protected abstract IAsyncEnumerable Produce(); + + public class TestStreamAsyncEnumerable : IAsyncEnumerable + { + protected TestStream _stream; + + public Func Filter { get; private set; } + public bool Replay { get; private set; } + + public TestStreamAsyncEnumerable(TestStream stream, Func filter, bool replay) + { + _stream = stream; + Replay = replay; + Filter = filter; + } + + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken()) + { + return Impl(cancellationToken).GetAsyncEnumerator(cancellationToken); + } + + private async IAsyncEnumerable Impl([EnumeratorCancellation] CancellationToken cancellationToken = default) + { + if (Replay) + { + for (var i = 0; i < _stream.StoredData.Count; i++) // Not using foreach, as StoredData can change asynchronously + { + var value = _stream._environment.Deserialize(_stream.StoredData[i]); + if (Filter(value)) + { + // NOTE: We are not storing state entries here + yield return value; + } + } + } + + // NOTE: Race condition: can miss a few elements while switching from replay to realtime + + await foreach (var i in _stream.GetChannel().ReadAllAsync()) + { + var value = _stream._environment.Deserialize(_stream.StoredData[i]); + + if (Filter(value)) + { + // NOTE: We are not storing state entries here + yield return value; + } + + } + } + } + } + + public class FunctionTestStream : TestStream + { + public TestInstance TestInstance { get; protected set; } + public string FunctionDelegate { get; protected set; } + public object? Parameters { get; set; } + + public FunctionTestStream(string streamName, TestEnvironment environment, TestInstance testInstance, string functionDelegate, object? parameters) + : base(streamName, environment) + { + TestInstance = testInstance; + FunctionDelegate = functionDelegate; + Parameters = parameters; + } + + protected override async IAsyncEnumerable Produce() + { + var producedData = await _environment.CallAsync>(TestInstance, FunctionDelegate, Parameters); + await foreach (var item in producedData) + { + yield return item; + } + } + } + + public class AsyncEnumerableTestStream : TestStream + { + private IAsyncEnumerable _source; + + public AsyncEnumerableTestStream(string streamName, TestEnvironment environment, IAsyncEnumerable source) + : base(streamName, environment) + { + _source = source; + } + + protected override IAsyncEnumerable Produce() => _source; + } +} \ No newline at end of file diff --git a/samples/DotNet.FunctionApp.Test/DotNet.FunctionApp.Test.csproj b/samples/DotNet.FunctionApp.Test/DotNet.FunctionApp.Test.csproj new file mode 100644 index 00000000..8c712363 --- /dev/null +++ b/samples/DotNet.FunctionApp.Test/DotNet.FunctionApp.Test.csproj @@ -0,0 +1,12 @@ + + + + + + + + Exe + netcoreapp3.1 + + + diff --git a/samples/DotNet.FunctionApp.Test/Program.cs b/samples/DotNet.FunctionApp.Test/Program.cs new file mode 100644 index 00000000..8bb50de3 --- /dev/null +++ b/samples/DotNet.FunctionApp.Test/Program.cs @@ -0,0 +1,48 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using DotNet.FunctionApp; +using Perper.WebJobs.Extensions.Model; +using Perper.WebJobs.Extensions.Test; + +namespace DotNet.FunctionApp.Test +{ + class Program + { + static async Task Main(string[] args) + { + var env = new TestEnvironment(); + + env.RegisterFunction("DotNet", "StatefulSum", (int input, TestInstance instance) => { + return Launcher.StatefulSum(input, instance, default); + }); + + var agent = env.CreateAgent("DotNet"); + + Console.WriteLine(await agent.GetValue("StatefulSum")); + + for (var i = 0; i < 100; i++) { + await agent.CallActionAsync("StatefulSum", i); + } + + Console.WriteLine(await agent.GetValue("StatefulSum")); + + var tasks = new List(); + for (var i = 0; i < 10; i ++) + { + tasks.Add(Task.Run(async () => + { + for (var i = 0; i < 100; i++) + { + await agent.CallActionAsync("StatefulSum", i); + } + })); + } + + await Task.WhenAll(tasks); + + Console.WriteLine(await agent.GetValue("StatefulSum")); + } + } +}