ntent / kafka4net

C# client for Kafka
Apache License 2.0

ObjectDisposedException after closing producer #31

Closed vchekan closed 8 years ago

vchekan commented 8 years ago

Under certain conditions it is possible to get an exception after closing the Producer:

2016-05-24 10:11:31.8848 Error [:13] RecoveryTest Unhandled exception System.ObjectDisposedException: Cannot access a disposed object.
   at System.Reactive.Concurrency.EventLoopScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
   at kafka4net.Utils.WatchdogScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action) in C:\projects\kafka4net\src\Utils\WatchdogScheduler.cs:line 63
   at System.Reactive.Concurrency.LocalScheduler.Schedule[TState](TState state, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Action action)
   at kafka4net.Utils.RxSyncContextFromScheduler.Post(SendOrPostCallback d, Object state) in C:\projects\kafka4net\src\Utils\RxSyncContextFromScheduler.cs:line 20
   at System.Threading.Tasks.SynchronizationContextAwaitTaskContinuation.PostAction(Object state)
   at System.Threading.Tasks.AwaitTaskContinuation.RunCallback(ContextCallback callback, Object state, Task& currentTask)
--- End of stack trace from previous location where exception was thrown ---
   at System.Threading.Tasks.AwaitTaskContinuation.<>c.<ThrowAsyncIfNecessary>b__18_0(Object s)
   at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()

The reason is quite tricky. The Producer informs the caller about the success or failure of individual messages via the callbacks OnSuccess and OnPermError. Those callbacks are executed in the context of the driver's internal thread loop. As long as a subscriber to those events does not consume too much time, everything is fine. But if one creates an observable from those events and does await successMessagesObservable, things break down.

The observable is executed on the driver's internal thread, and when the observable completes, it calls the observer's OnCompleted. If the user code has no synchronization context, the TaskScheduler decides to execute the continuation on the current thread, which exposes the Kafka driver's internal thread to the user's code. From then on, all await continuations are executed in the Kafka driver's synchronization context. But the driver is already closed and its EventLoopScheduler is disposed, so any "await" continuation will cause an ObjectDisposedException.
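The capture behaviour described above can be illustrated without kafka4net. This is a minimal sketch, assuming a hypothetical CountingContext that stands in for the driver's RxSyncContextFromScheduler (CountingContext and CaptureDemo are names made up for illustration, not part of the library). An await that starts while that context is current posts its continuation back to it, while an await using ConfigureAwait(false) does not. In the real driver, that Post call is the point where the ObjectDisposedException in the trace above surfaces once the scheduler is disposed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the driver's synchronization context.
// It only counts how many continuations are posted to it.
public sealed class CountingContext : SynchronizationContext
{
    public int Posts;

    public override void Post(SendOrPostCallback d, object state)
    {
        Interlocked.Increment(ref Posts);
        // A real driver context would schedule on its own event loop
        // (and fail once that loop has been disposed).
        ThreadPool.QueueUserWorkItem(_ => d(state));
    }
}

public static class CaptureDemo
{
    // Starts an await while ctx is the current context, then restores
    // the previous context on the calling thread.
    public static Task Start(SynchronizationContext ctx, bool capture)
    {
        var previous = SynchronizationContext.Current;
        SynchronizationContext.SetSynchronizationContext(ctx);
        try
        {
            // Runs synchronously up to the first await, which is where
            // the current context is captured (or not).
            return DelayAsync(capture);
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    static async Task DelayAsync(bool capture)
    {
        await Task.Delay(50).ConfigureAwait(capture);
    }

    public static void Main()
    {
        var captured = new CountingContext();
        Start(captured, capture: true).Wait();
        Console.WriteLine("default await, posts: " + captured.Posts);

        var skipped = new CountingContext();
        Start(skipped, capture: false).Wait();
        Console.WriteLine("ConfigureAwait(false), posts: " + skipped.Posts);
    }
}
```

Whether adding ConfigureAwait(false) in user code is enough to avoid the exception with kafka4net is not confirmed anywhere in this thread; the sketch only shows the general .NET mechanism.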

dhalfageme commented 6 years ago

I'm using the latest version of the kafka4net NuGet package (2.0.1), which seems to have been released in March 2017 (version 2.0.0 is the one that fixed this bug, in May 2016). However, I'm still experiencing this issue, so I would like to work around it in my own code if possible.

I'm not sure I completely understand this issue. Would the fix be to add a critical section / lock around producer.Disconnect() and the OnSuccess and OnPermError callback methods? Thank you

vchekan commented 6 years ago

@dhalfageme could you give more details, and a stack trace if you can?

dhalfageme commented 6 years ago

Hi @vchekan, I'm doing the following steps:

  1. Connect a producer and wait for the task to complete
  2. Send some messages in a loop
  3. Disconnect the producer and wait for the task to complete

I repeat these steps several times (every time I have enough messages to form a batch) in a streaming fashion.

I defined OnPermError and OnSuccess callbacks. (Initially I tried to resend the messages if the code reached the OnPermError callback, but even after I removed that code it still failed.)

After some loops, the following exception is raised, which seems to be the same exception as in this issue:

Unhandled exception: System.ObjectDisposedException: Cannot access a disposed object.
   at System.Reactive.Concurrency.EventLoopScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
   at kafka4net.Utils.WatchdogScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
   at System.Reactive.Concurrency.LocalScheduler.Schedule[TState](TState state, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Action action)
   at kafka4net.Utils.RxSyncContextFromScheduler.Post(SendOrPostCallback d, Object state)
   at System.Threading.Tasks.SynchronizationContextAwaitTaskContinuation.PostAction(Object state)
   at System.Threading.Tasks.AwaitTaskContinuation.RunCallback(ContextCallback callback, Object state, Task& currentTask)
--- End of stack trace from previous location where exception was thrown ---
   at System.Threading.Tasks.AwaitTaskContinuation.<>c.<ThrowAsyncIfNecessary>b__18_0(Object s)
   at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()

The exception is thrown after the producer is closed, but the code still continues executing (judging by the log messages), and then the application crashes, so it looks like some kind of asynchronous failure, maybe in one of the callbacks?

My code is below (the irrelevant parts have been removed):

            try
            {
                producer = new KafkaProducer("myTopic");

                producer.Connect();

                foreach (Message message in messageBatch)
                {
                    mLog.DebugFormat("Processing file {0}", message.OutputPath);
                    producer.SendToKafka(
                        System.Environment.TickCount.ToString(),
                        SliverImageSerializer.ConvertToJSON(message),
                        message.OutputPath);
                }
            }
            catch (Exception e)
            {
                mLog.ErrorFormat("An error occurred while exporting messages: {0}", e.Message);
                mLog.DebugFormat("Stacktrace: {0}", e.StackTrace);
            }
            finally
            {
                if (producer != null)
                    producer.Disconnect();
            }

    public class KafkaProducer
    {
        static long messagesSent = 0;

        public KafkaProducer(string topic)
        {
            mProducer = new Producer(KAFKA_BROKERS, GetProducerConfiguration(topic));

            mProducer.OnPermError += (exception, messages) =>
            {
                {
                    mLog.Error($"Failed to write {messages.Length} messages because of {exception.Message}");
                }  
            };

            mProducer.OnSuccess += messages =>
            {
                {
                    messagesSent += messages.Length;
                    mLog.Info($"Sent {messages.Length} messages. Total sent {messagesSent}");
                }
            };
        }

        public bool Connect()
        {
            try
            {
                Task task = mProducer.ConnectAsync();
                task.Wait();
                return true;
            }
            catch (Exception e)
            {
                mLog.ErrorFormat("Error connection kafka producer: {0}. Disconnecting...", e.Message);
                Disconnect();
                return false;
            }
        }

        public void Disconnect()
        {
            try
            {
                Task task =  mProducer.CloseAsync(TimeSpan.FromMinutes(10));
                task.Wait();
            }
            catch (Exception e)
            {
                mLog.ErrorFormat("Error disconnecting kafka producer: {0}", e.Message);
            }
        }

        public void SendToKafka(string messageKey, string message, string file)
        {
            try
            {
                ProduceMessage(DateTime.Now.ToString(), message);
            }
            catch (Exception e)
            {
                mLog.ErrorFormat("Couldn't send Kafka message with key {0} for file {1}. Error message {2}",
                    messageKey, file, e.Message);
                mLog.DebugFormat("Stacktrace: {0}", e.StackTrace);
            }
        }

        void ProduceMessage(string key, string message)
        {
            mProducer.Send(new Message{
                Value = Encoding.UTF8.GetBytes(message),
                Key = Encoding.UTF8.GetBytes(key)
            });
        }

        Producer mProducer;

        static ILog mLog = log4net.LogManager.GetLogger("KafkaProdcer");
        const string KAFKA_BROKERS = "10.250.17.242:9092,10.250.17.241:9092";
    }

Thank you in advance

dhalfageme commented 6 years ago

Hi again,

I checked that the error happens even if I don't set the OnSuccess and OnPermError callbacks

vchekan commented 6 years ago

Hmm, the stack trace is too complex to figure out without debugging. The only thing I can spot is that you use Task.Wait(), which is a blocking call. Just for the sake of experiment, can you turn your code into async?
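For reference, here is a rough sketch of what that experiment might look like for the Connect and Disconnect wrappers posted above. It assumes a hypothetical IProducerLike interface exposing only the two calls the wrapper already uses (ConnectAsync and CloseAsync). IProducerLike, AsyncKafkaProducer and FakeProducer are illustrative names, not kafka4net types, and the fake exists only so the sketch runs without a broker.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical interface with only the two Producer calls used by the wrapper.
public interface IProducerLike
{
    Task ConnectAsync();
    Task CloseAsync(TimeSpan timeout);
}

public class AsyncKafkaProducer
{
    readonly IProducerLike mProducer;

    public AsyncKafkaProducer(IProducerLike producer)
    {
        mProducer = producer;
    }

    // Awaits the connection instead of blocking on Task.Wait().
    public async Task<bool> ConnectAsync()
    {
        try
        {
            await mProducer.ConnectAsync();
            return true;
        }
        catch (Exception e)
        {
            Console.Error.WriteLine("Error connecting kafka producer: {0}. Disconnecting...", e.Message);
            await DisconnectAsync();
            return false;
        }
    }

    // Awaits the close instead of blocking on Task.Wait().
    public async Task DisconnectAsync()
    {
        try
        {
            await mProducer.CloseAsync(TimeSpan.FromMinutes(10));
        }
        catch (Exception e)
        {
            Console.Error.WriteLine("Error disconnecting kafka producer: {0}", e.Message);
        }
    }
}

// Hypothetical in-memory fake so the sketch runs without a broker.
public class FakeProducer : IProducerLike
{
    public bool Connected;
    public Task ConnectAsync() { Connected = true; return Task.Delay(10); }
    public Task CloseAsync(TimeSpan timeout) { Connected = false; return Task.Delay(10); }
}

public static class AsyncWrapperDemo
{
    public static async Task Main()
    {
        var fake = new FakeProducer();
        var producer = new AsyncKafkaProducer(fake);
        bool ok = await producer.ConnectAsync();
        Console.WriteLine("connected: " + ok);
        await producer.DisconnectAsync();
        Console.WriteLine("still connected: " + fake.Connected);
    }
}
```

The caller would then await ConnectAsync and DisconnectAsync instead of calling the blocking versions. This only removes the blocking calls and is not known to resolve the exception.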

dhalfageme commented 6 years ago

It happens again! (This time running the disconnect asynchronously, but again using the callback methods)