theburningmonk / ReactoKinesix

An Rx-based .NET client library for working with Amazon Kinesis
http://theburningmonk.github.io/ReactoKinesix/
MIT License

Getting an exception after a while with standard configuration #48

Closed marcosavini closed 10 years ago

marcosavini commented 10 years ago

Hello

I am testing your ReactoKinesiX framework and it appears to be exactly what I was looking for. However, after a while, I get an exception when running locally.

In one thread I regularly send log messages with my own log4net appender. In parallel I run the processor, which simply prints them out. All configuration values are left at their defaults. However, after a while, maybe 2 minutes, I regularly get the following exception:

[image: screenshot of the exception]

I code in C# and the processor is IMHO straightforward:

using ReactoKinesix;
using ReactoKinesix.Model;
using System;
using System.Text;

namespace KinesisLoggerRunner
{
    public class KinesisLoggerRecordProcessor : IRecordProcessor
    {
        public ErrorHandlingMode GetErrorHandlingMode(Record record)
        {
            return ErrorHandlingMode.NewRetryAndSkip(5);
        }

        public void OnMaxRetryExceeded(Record record, ErrorHandlingMode errorHandlingMode)
        {
            Console.WriteLine("Max retries exceeded for record with payload: {0}", Encoding.UTF8.GetString(record.Data));
        }

        public void Process(Record record)
        {
            Console.WriteLine("{0}: Processing record from partition key {1} with this content: {2}", DateTime.Now, record.PartitionKey, Encoding.UTF8.GetString(record.Data));
        }
    }
}

As said, the code works well but throws an exception after a while. I start the processor as follows in my main program:

var processor = new KinesisLoggerRecordProcessor();
var app = ReactoKinesixApp.CreateNew(
    ConfigurationManager.AppSettings["AWSAccessKeyId"],
    ConfigurationManager.AppSettings["AWSSecretAccessKey"],
    RegionEndpoint.EUWest1,
    "DEV-KinesisLoggerRunner",
    "DEV-LoggerStream",
    "WORMWOOD",
    processor
);

Any input is appreciated and if you need more feedback to reproduce, let me know.

Regards, Marco

theburningmonk commented 10 years ago

Hey, could you paste the stack trace here? It'll probably show me which line this errors on. Which version are you running?

marcosavini commented 10 years ago

Hi

System.ArgumentException was unhandled by user code
  HResult=-2147024809
  Message=The input sequence was empty.
Parametername: source
  Source=ReactoKinesix
  ParamName=source
  StackTrace:
       bei <StartupCode$ReactoKinesix>.$Client.shareLoad@653-1.Invoke(Result`2 _arg27) in E:\SourceCode\personal\ReactoKinesiX\src\ReactoKinesiX\Client.fs:Zeile 676.
       bei Microsoft.FSharp.Control.AsyncBuilderImpl.args@753.Invoke(a a)
  InnerException: 

Unfortunately, I have a German .NET version installed locally. "Zeile" means "line", so the error is at line 676 of Client.fs.

theburningmonk commented 10 years ago

Great, that's useful, thanks. I'll take a look at it in the morning.

marcosavini commented 10 years ago

Thank you! As for the version, I installed the latest NuGet package yesterday.

theburningmonk commented 10 years ago

Hey, I looked into this, and it was actually already fixed on the development branch, along with a bunch of other changes. I didn't get around to documenting them, but most are in the 0.2.0-beta version on NuGet.

Do you need something to work with urgently?