jet / FsKafka

Minimal F# wrappers for Confluent.Kafka+librdkafka.redist 1.x
https://github.com/jet/dotnet-templates
Apache License 2.0

Monitor loop not terminating with exception when consumer is already disposed #74

Closed: wantastic84 closed this issue 4 years ago

wantastic84 commented 4 years ago

When the consumer has already been disposed and the monitor tries to get the assignment info from it, an exception is thrown but the monitor loop does not terminate. Instead, the loop keeps incrementing the fail count and hangs, which can cause the client code to hang as well.

The following exception is thrown from FsKafka0:

Confluent.Kafka.KafkaException: Local: Broker handle destroyed
   at Confluent.Kafka.Impl.SafeKafkaHandle.GetAssignment()
   at FsKafka.MonitorImpl.getAssignedPartitions@249-1.GenerateNext(IEnumerable`1& next) in D:\a\1\s\src\FsKafka\Monitor.fs:line 249
   at Microsoft.FSharp.Core.CompilerServices.GeneratedSequenceBase`1.MoveNextImpl() in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\seqcore.fs:line 371
   at Microsoft.FSharp.Core.CompilerServices.GeneratedSequenceBase`1.System-Collections-IEnumerator-MoveNext() in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\seqcore.fs:line 403
   at Microsoft.FSharp.Collections.SetTreeModule.mkFromEnumerator[a](IComparer`1 comparer, SetTree`1 acc, IEnumerator`1 e) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\set.fs:line 499
   at Microsoft.FSharp.Collections.SetTreeModule.ofSeq[a](IComparer`1 comparer, IEnumerable`1 c) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\set.fs:line 505
   at Microsoft.FSharp.Collections.FSharpSet`1..ctor(IEnumerable`1 elements) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\set.fs:line 738
   at FsKafka.MonitorImpl.validateAssignments@253.Invoke(Unit unitVar0) in D:\a\1\s\src\FsKafka\Monitor.fs:line 254
   at FsKafka.MonitorImpl.loop@273-10.Invoke(Unit unitVar) in D:\a\1\s\src\FsKafka\Monitor.fs:line 273
   at Microsoft.FSharp.Control.AsyncPrimitives.CallThenInvoke[T,TResult](AsyncActivation`1 ctxt, TResult result1, FSharpFunc`2 part2) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\async.fs:line 398
   at <StartupCode$FSharp-Core>.$Async.Return@1066.Invoke(AsyncActivation`1 ctxt) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\async.fs:line 1066
   at Microsoft.FSharp.Control.Trampoline.Execute(FSharpFunc`2 firstAction) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\async.fs:line 109
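
For illustration only, here is a minimal sketch of the behavior I would expect: the loop stops as soon as the handle is reported destroyed, while other failures are still counted and retried. This is not the actual FsKafka monitor code. `HandleDestroyed`, `runMonitor`, `getAssignment` and `maxChecks` are hypothetical names, and `HandleDestroyed` merely stands in for the `KafkaException` in the trace above so the snippet runs without Confluent.Kafka.

```fsharp
// Hypothetical stand-in for the KafkaException seen in the trace;
// this is NOT a Confluent.Kafka type.
exception HandleDestroyed of string

/// Polls `getAssignment` up to `maxChecks` times.
/// Returns `Ok failCount` if the loop ran to completion, or `Error msg`
/// as soon as the handle is reported destroyed.
let runMonitor (getAssignment: unit -> int list) (maxChecks: int) : Result<int, string> =
    let mutable checks = 0
    let mutable failCount = 0
    let mutable fatal : string option = None
    while Option.isNone fatal && checks < maxChecks do
        checks <- checks + 1
        try
            getAssignment () |> ignore
        with
        | HandleDestroyed msg -> fatal <- Some msg   // unrecoverable: stop the loop
        | _ -> failCount <- failCount + 1            // assumed transient: count and retry
    match fatal with
    | Some msg -> Error msg
    | None -> Ok failCount

// Simulates a consumer that has already been disposed.
let disposedConsumer () : int list =
    raise (HandleDestroyed "Local: Broker handle destroyed")

let result = runMonitor disposedConsumer 10
```

With the simulated disposed consumer, `result` is `Error "Local: Broker handle destroyed"` after a single check, so the caller can observe the failure rather than waiting on a loop that never exits. Which exceptions count as fatal is an assumption of this sketch; the real monitor may need to distinguish them differently.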