Ignite Throwing KeyNotFound Exception, while Function executes properly. #32

Closed
VikVelev opened this issue Jul 15, 2020 · 2 comments
Labels
bug Something isn't working

@VikVelev
Collaborator

Expected Behaviour: Run the module and successfully execute the DebugMonitor.
Actual Behaviour: The function executes successfully, but nothing happens on the client, and Ignite throws an exception and fails to deploy a service.

Note: Everything works when run without a module.

Ignite Trace:

perper_1  | [07:58:28,085][SEVERE][srvc-deploy-#44][GridServiceProcessor] Failed to initialize service (service will not be deployed): f2eac19f-bf0f-4653-94b7-a5e8b52737f3
perper_1  | class org.apache.ignite.IgniteException: Native platform exception occurred.
perper_1  |     at org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:1029)
perper_1  |     at org.apache.ignite.internal.processors.platform.services.PlatformAbstractService.init(PlatformAbstractService.java:107)
perper_1  |     at org.apache.ignite.internal.processors.service.GridServiceProcessor.redeploy(GridServiceProcessor.java:1364)
perper_1  |     at org.apache.ignite.internal.processors.service.GridServiceProcessor.processAssignment(GridServiceProcessor.java:1988)
perper_1  |     at org.apache.ignite.internal.processors.service.GridServiceProcessor.onSystemCacheUpdated(GridServiceProcessor.java:1615)
perper_1  |     at org.apache.ignite.internal.processors.service.GridServiceProcessor.access$300(GridServiceProcessor.java:126)
perper_1  |     at org.apache.ignite.internal.processors.service.GridServiceProcessor$ServiceEntriesListener$1.run0(GridServiceProcessor.java:1597)
perper_1  |     at org.apache.ignite.internal.processors.service.GridServiceProcessor$DepRunnable.run(GridServiceProcessor.java:2064)
perper_1  |     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
perper_1  |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
perper_1  |     at java.lang.Thread.run(Thread.java:748)
perper_1  | Caused by: PlatformNativeException [cause=System.Collections.Generic.KeyNotFoundException [idHash=1404929808, hash=-670460937, ClassName=System.Collections.Generic.KeyNotFoundException, Data=null, ExceptionMethod=null, HResult=-2146232969, HelpURL=null, InnerException=null, Message=The given key was not present in the cache., RemoteStackIndex=0, RemoteStackTraceString=null, Source=Apache.Ignite.Core, StackTraceString=   at Apache.Ignite.Core.Impl.Cache.CacheImpl`2.<Get>b__25(IBinaryStream stream, Int64 res)
perper_1  |    at Apache.Ignite.Core.Impl.PlatformJniTarget.InStreamOutLong[TR](Int32 type, Action`1 outAction, Func`3 inAction, Func`2 readErrorAction)
perper_1  |    at Apache.Ignite.Core.Impl.PlatformTargetAdapter.DoOutInOpX[TR](Int32 type, Action`1 outAction, Func`3 inAction, Func`2 inErrorAction)
perper_1  |    at Apache.Ignite.Core.Impl.Cache.CacheImpl`2.Get(TK key)
perper_1  |    at Apache.Ignite.Core.Impl.Cache.CacheImpl`2.get_Item(TK key)
perper_1  |    at Perper.Fabric.Streams.StreamService.Init(IServiceContext context) in /app/src/Perper.Fabric/Streams/StreamService.cs:line 33
perper_1  |    at Apache.Ignite.Core.Impl.Unmanaged.UnmanagedCallbacks.ServiceInit(Int64 memPtr), WatsonBuckets=null]]
perper_1  |     at org.apache.ignite.internal.processors.platform.PlatformContextImpl.createNativeException(PlatformContextImpl.java:550)
perper_1  |     at org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readInvocationResult(PlatformUtils.java:853)
perper_1  |     at org.apache.ignite.internal.processors.platform.services.PlatformAbstractService.init(PlatformAbstractService.java:104)
perper_1  |     ... 9 more

Perper Output:

[7/15/2020 8:07:55 AM] Executing 'santiment-data' (Reason='', Id=1da21615-55b8-4f52-9dbc-76cbfd7c67cb)
[7/15/2020 8:07:55 AM] Started SantimentData.Module
[7/15/2020 8:07:55 AM] Executed 'santiment-data' (Succeeded, Id=1da21615-55b8-4f52-9dbc-76cbfd7c67cb)

Code to reproduce:

A simple Azure Functions project with the following structure:

  • santiment-data
    --- Streams
    ----- SantimentBaseStream.cs
    ----- DebugMonitor.cs
    --- Module.cs

Module.cs:

namespace SantimentData
{
    public class Module
    {
        [FunctionName("santiment-data")]
        [return: Perper("$return")]
        public async Task<IPerperStream> StartAsync(
            [PerperModuleTrigger(RunOnStartup = true)] PerperModuleContext context,
            [Perper("input")] IPerperStream input,
            CancellationToken cancellationToken,
            ILogger logger)
        {
            logger.LogInformation("Started SantimentData.Module");
            var configPath = Path.Combine(
                Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location),
                "../local.settings.json"
            );

            var santimentQueryConfig = new SantimentQueryConfig();
            var config = new ConfigurationBuilder()
                .AddJsonFile(configPath, optional: false, reloadOnChange: true)
                .AddEnvironmentVariables()
                .Build();
            
            config.Bind("Santiment", santimentQueryConfig);
            
            await using var santimentStream = await context.StreamFunctionAsync(
                nameof(SantimentBaseStream),
                typeof(SantimentBaseStream),
                new { queryConfig = santimentQueryConfig }
            );

            await using var consumer = await context.StreamActionAsync(
                nameof(DebugMonitor),
                typeof(DebugMonitor),
                new { inputStream = santimentStream.Subscribe() }
            );
                
            return santimentStream;
        }
    }
}

DebugMonitor.cs:

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using HatcheryLib.Models.SantimentData;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Perper.WebJobs.Extensions.Config;
using Perper.WebJobs.Extensions.Model;

namespace SantimentData.Streams
{
    public class DebugMonitor
    {
        [FunctionName(nameof(DebugMonitor))]
        public static async Task RunAsync(
            [PerperStreamTrigger] PerperStreamContext context,
            [Perper("inputStream")] IAsyncEnumerable<object> inputStream,
            ILogger logger,
            CancellationToken cancellationToken)
        {
            logger.LogInformation("Started SantimentData.Streams.DebugMonitor");
            
            await foreach (var data in inputStream.WithCancellation(cancellationToken))
            {
                // Skip items that are not TimeSeriesValue instead of dereferencing a null cast result.
                if (data is TimeSeriesValue value)
                {
                    logger.LogInformation($"Date: {value.DateTime}, Value: {value.Value}");
                }
            }
        }
    }
}

SantimentBaseStream runs a GraphQL query, takes a lot of configuration, and just outputs the results. I am guessing this bug can be reproduced with any simple data generator.

namespace SantimentData.Streams
{
    public class SantimentBaseStream
    {
        [FunctionName(nameof(SantimentBaseStream))]
        public async Task Run([PerperStreamTrigger] PerperStreamContext context,
            [Perper("queryConfig")] SantimentQueryConfig queryConfig,
            [Perper("output")] IAsyncCollector<TimeSeriesValue> output,
            CancellationToken cancellationToken,
            ILogger logger)
        {
            
            logger.LogInformation("Starting SantimentData.Streams.SantimentBaseStream");
            
            var client = new GraphQLHttpClient(
                new GraphQLHttpClientOptions{ EndPoint = new Uri(queryConfig.URI) }, 
                new NewtonsoftJsonSerializer());
            
            var query = $@"query {{
                getMetric(metric: ""{queryConfig.Metric}"") {{
                    timeseriesData(
                        slug: ""{queryConfig.Slug}""
                        from: ""{queryConfig.From}""
                        to: ""{queryConfig.To}""
                        interval: ""{queryConfig.Interval}"") {{
                        datetime
                        value
                    }}
                }}
            }}";
            
            var graphQLRequest = new GraphQLRequest { Query = query };
            var response = await client.SendQueryAsync<SantimentMetricResponse<TimeSeriesValue>>(graphQLRequest, cancellationToken);
            var responseData = response.Data.GetMetric.TimeSeriesData.ToList();
            
            logger.LogInformation($"[{queryConfig.Tag}] Fetched {responseData.Count} items.");
            
            var lastData = await context.FetchStateAsync<TimeSeriesValue>() ?? new TimeSeriesValue() { };
            
            foreach (var item in responseData)
            {
                await context.UpdateStateAsync(lastData);
                logger.LogInformation($"[SantimentDataStream] Sending item: {item}.");
                lastData = item;
                await output.AddAsync(item, cancellationToken);
            }
        }
    }
}
@branimirangelov
Collaborator

Re-consider the idea of disposable streams as part of #44

@branimirangelov branimirangelov added this to the Release 0.6 milestone Oct 4, 2020
@branimirangelov branimirangelov added the bug Something isn't working label Oct 5, 2020
@bojidar-bg
Collaborator

bojidar-bg commented Nov 18, 2021

The StreamService cluster singleton was removed in bfd8ed7, thus this should no longer happen. Closing.

Edit: Just realized the original issue was actually caused by the stream being disposed through IAsyncDisposable, which was indeed removed with #14.
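
For anyone reading this later, here is a minimal sketch of the language behaviour involved. It uses only the .NET base library. `FakeStream` and `DisposalDemo` are hypothetical stand-ins and not Perper types. The sketch shows that a value declared with `await using` is disposed when the method exits, even if that same value is returned, which matches the shape of `StartAsync` in the reported Module.cs. That disposal is what removed the entry `StreamService.Init` later looked up is an assumption based on the trace, not something verified here.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a stream handle; not the real Perper type.
public sealed class FakeStream : IAsyncDisposable
{
    public bool IsDisposed { get; private set; }

    public ValueTask DisposeAsync()
    {
        // In this sketch disposal only flips a flag. The assumption is that
        // the real stream cleaned up its backing data at this point.
        IsDisposed = true;
        return default;
    }
}

public static class DisposalDemo
{
    // Same shape as the reported StartAsync: declared with `await using`, then returned.
    public static async Task<FakeStream> StartWithAwaitUsingAsync()
    {
        await using var stream = new FakeStream();
        await Task.Yield();
        // DisposeAsync runs as the method exits, before the caller observes the result.
        return stream;
    }

    // Without `await using`, the caller receives a live stream and owns its lifetime.
    public static async Task<FakeStream> StartWithoutAwaitUsingAsync()
    {
        var stream = new FakeStream();
        await Task.Yield();
        return stream;
    }
}
```

Awaiting `DisposalDemo.StartWithAwaitUsingAsync()` yields a stream whose `IsDisposed` is already `true`, while `StartWithoutAwaitUsingAsync()` yields one that is still live. In the report, both `santimentStream` and `consumer` were declared with `await using`, so both would have been disposed as soon as `StartAsync` returned.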
