monix / monix-kafka

Monix integration with Kafka
Apache License 2.0

Detecting Consumer Failures #20

Open jedossa opened 6 years ago

jedossa commented 6 years ago

I have a use case where message processing time is unpredictable. Following the Kafka consumer documentation, I modified KafkaConsumerObservable.runLoop as follows:

def runLoop(consumer: KafkaConsumer[K, V]): Task[Unit] = {
      val ackTask: Task[Ack] = Task.unsafeCreate { (context, cb) =>
        implicit val s = context.scheduler
        s.executeAsync { () =>
          context.frameRef.reset()
          val ackFuture =
            try consumer.synchronized {
              if (context.connection.isCanceled) Stop
              else {
                val next = blocking(consumer.poll(pollTimeoutMillis))
                // Pause all assigned partitions
                blocking(consumer.pause(consumer.assignment()))
                Observer.feed(out, next.asScala)(out.scheduler)
              }
            } catch {
              case NonFatal(ex) =>
                Future.failed(ex)
            }

          ackFuture.syncOnComplete {
            case Success(ack) =>
              var streamErrors = true
              try consumer.synchronized {
                if (context.connection.isCanceled) {
                  streamErrors = false
                  cb.asyncOnSuccess(Stop)
                } else {
                  // Resume partition and commit offset
                  consumer.resume(consumer.assignment())
                  consumerCommit(consumer)
                  streamErrors = false
                  cb.asyncOnSuccess(ack)
                }
              } catch {
                case NonFatal(ex) =>
                  if (streamErrors) cb.asyncOnError(ex)
                  else s.reportFailure(ex)
              }

            case Failure(ex) =>
              cb.asyncOnError(ex)
          }
        }
      }

      ackTask.flatMap {
        case Stop     => Task.unit
        case Continue => runLoop(consumer)
      }
    }

Is it possible to handle this use case without modifying the exposed observable? Would it be worth creating a PR for this?

Thanks!

leandrob13 commented 6 years ago

@jedossa Can you elaborate a bit more on the justification of these changes?

jedossa commented 6 years ago

I just want to explore an option for use cases where message processing time varies unpredictably. For these cases, the Kafka docs recommend moving message processing to another thread, disabling automatic commits, pausing the partitions, and continuing to call poll. However, I am not 100% sure that these changes are the right solution for such use cases.
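
For reference, the pattern described in the Kafka docs could be sketched roughly as below, outside of monix-kafka. This is only an illustration under several assumptions: `PausedPolling`, `processOneBatch`, `worker` and `handle` are hypothetical names, `enable.auto.commit` is assumed to be `false`, and `poll(Duration)` assumes kafka-clients 2.0 or newer (older clients take a timeout in milliseconds). Rebalances are not handled here; a real implementation would likely need a `ConsumerRebalanceListener`.

```scala
import java.time.Duration
import java.util.concurrent.ExecutorService
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords}

// Hypothetical helper, not part of monix-kafka.
object PausedPolling {

  /** Polls one batch, runs `handle` on `worker`, and keeps polling with all
    * assigned partitions paused until the handler finishes. Offsets are
    * committed only after the handler has completed successfully.
    */
  def processOneBatch[K, V](
      consumer: Consumer[K, V],
      worker: ExecutorService,
      pollTimeout: Duration
  )(handle: ConsumerRecords[K, V] => Unit): Unit = {
    val batch = consumer.poll(pollTimeout)
    if (!batch.isEmpty) {
      // Stop fetching, so the polls below return no new records.
      consumer.pause(consumer.assignment())

      // Run the slow handler off the polling thread.
      val done = worker.submit(new Runnable { def run(): Unit = handle(batch) })

      // Keep calling poll while paused so the consumer stays within
      // max.poll.interval.ms and is not removed from the group.
      while (!done.isDone) consumer.poll(pollTimeout)

      // Rethrows a handler failure (wrapped in ExecutionException),
      // in which case the commit below is skipped.
      done.get()

      // Commit the current position, then start fetching again.
      consumer.commitSync()
      consumer.resume(consumer.assignment())
    }
  }
}
```

The consumer itself is only touched from the calling thread, since KafkaConsumer is not thread-safe; only the handler runs on `worker`. Calling `processOneBatch` in a loop would give behaviour similar to the modified runLoop, except that poll keeps being called while the batch is processed.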