Source per partition does not support backoff "restart" #1386

opened 3 years ago

fguerout commented 3 years ago

Versions used

Akka version: "2.6.15" Akka Kafka version: "2.1.0"

Expected Behavior

When source per partition (Consumer.committablePartitionedSource - https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#source-per-partition) is combined with source restart (RestartSource.onFailuresWithBackoff - https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html#restarting-the-stream-with-a-backoff-stage), the per-partition source should be restarted when the stream fails.

Actual Behavior

When source per partition (Consumer.committablePartitionedSource - https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#source-per-partition) is combined with source restart (RestartSource.onFailuresWithBackoff - https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html#restarting-the-stream-with-a-backoff-stage), the per-partition source is restarted after a stream failure but then completes, and data is no longer consumed from that partition.

Relevant logs

akka.stream.scaladsl.RestartWithBackoffSource [RestartWithBackoffSource(akka://processor)]                                                                          
        Restarting graph due to failure. stack_trace: 
        Caused by: java.util.concurrent.TimeoutException: Ask timed out on
akka.kafka.internal.CommittableSubSourceStageLogic [CommittableSubSourceStageLogic(akka://processor)] 
        [c1832#1] Starting. Partition test-15
akka.kafka.internal.KafkaConsumerActor [akka://processor@] 
        [5881c] RequestMessages from topic/partition Set(test-15) already requested by other stage Set(test-15)
akka.kafka.internal.CommittableSubSourceStageLogic [CommittableSubSourceStageLogic(akka://processor)] 
        [c1832#1] Completing. Partition test-15

Reproducible Test Case

                .mapAsyncUnordered(10, topicToSource ->
                                () -> topicToSource.second()
                                        .mapAsync(1, message ->
                                                        (ActorRef<StatusReply<Done>> replyTo) -> new ProcessMessage("", replyTo),
                                                        .thenApply(done -> message.committableOffset())))
                                .runWith(Committer.sink(CommitterSettings.create(context.getSystem().classicSystem())), context.getSystem()))
                .toMat(Sink.ignore(), Consumer::createDrainingControl)

mwkohout commented 3 years ago

Is there a workaround for this on the current release (2.1.1)?

fguerout commented 3 years ago

Hi @mwkohout, you could rely on the "retry pattern" (https://doc.akka.io/docs/akka/current/futures.html#retry) in the source processing logic.
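For readers unfamiliar with the retry pattern mentioned above, the following is a minimal, stdlib-only sketch of the idea: a Callable that produces a CompletionStage is invoked again each time the previous attempt fails, until it succeeds or the attempts run out. This is not Akka's `Patterns.retry` implementation. The names `RetrySketch` and `retry` are hypothetical, and the backoff delay between attempts is deliberately left out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical illustration of the retry pattern; not part of Akka or Alpakka Kafka.
final class RetrySketch {
    private RetrySketch() {}

    /** Invokes attempt up to maxAttempts times; yields the first success or the last failure. */
    static <T> CompletionStage<T> retry(Callable<CompletionStage<T>> attempt, int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        tryOnce(attempt, maxAttempts, result);
        return result;
    }

    private static <T> void tryOnce(
            Callable<CompletionStage<T>> attempt, int remaining, CompletableFuture<T> result) {
        CompletionStage<T> stage;
        try {
            // Each attempt calls the Callable again, so a fresh stage is created every time.
            stage = attempt.call();
        } catch (Exception e) {
            // Treat an exception thrown while creating the stage like a failed stage.
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            stage = failed;
        }
        stage.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (remaining > 1) {
                tryOnce(attempt, remaining - 1, result); // no backoff in this sketch
            } else {
                result.completeExceptionally(error);
            }
        });
    }
}
```

Note that a retry only helps if whatever the Callable creates can actually be started again on each attempt.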

mwkohout commented 3 years ago

Thanks for the idea, @fguerout, but either my test case is bad or something prevents it from rereading a message.

Here's a test case that's executed as part of a JUnit 5 TestcontainersKafkaTest subclass:

 ActorSystem testSystem = ActorSystem.apply();
    String topicName = createTopic();
    String groupId = createGroupId();

    AtomicBoolean fail = new AtomicBoolean(true);

    Done published = Source.from(
            new ProducerRecord<String,String>(topicName,"someKey","someValue")
            //        new ProducerRecord<String,String>(topicName,"someKey2","someValue2")

    Source<Pair<TopicPartition, Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed>>, Consumer.Control> source = Consumer.committablePartitionedSource(
        consumerDefaults().withGroupId(groupId).withProperty("auto.offset.reset", "earliest"),

    TestKit tk = new TestKit(testSystem);
    CompletionStage<Done> processedAndCommittedSource = source.mapAsyncUnordered(3, pair -> {
      Callable<CompletionStage<Done>> running = () -> pair.second().map(m -> {
            // Fail only on the first attempt so that a retry has a chance to succeed.
            if (fail.getAndSet(false)) {
              throw new Exception("fail");
            }
            tk.getTestActor().tell(m.record().value(), ActorRef.noSender());
            return m.committableOffset();
          }).runWith(Sink.ignore(), system);

      return Patterns.retry(running,3,Duration.ofSeconds(10),Duration.ofSeconds(40),0.1, system);
    }).runWith(Sink.ignore(), system);

    var processedValue = tk.receiveOne(Duration.ofSeconds(120));

    assertEquals("someValue", processedValue);

Is there an issue with the way I've written my testcase?

mmatloka commented 3 years ago

Hey, quite recently we tested a few variants of using RestartSource after/inside the committablePartitionedSource in our project. The result was that none of the variants worked fully correctly (e.g. if processing of one item failed, the following stream elements were still processed). The only way to make it work was to wrap the whole Consumer.committablePartitionedSource in a RestartSource.

mwkohout commented 3 years ago

Thank you @mmatloka, that did the trick. I just pulled the whole flow, with the per-partition logic set up inside my mapAsyncUnordered, into the restartable source callable. It restarted and reprocessed the messages as I expected.

One question: how are you accessing the control for the flow? Are you using a kill switch instead?
I was using Consumer.Control before (without the restartable source) to start and stop the system in a controlled way, and Consumer.Control.isShutdown() as part of the health check for the system (changing an AtomicBoolean to true when the system was shut down, via CompletionStage.thenRun(Runnable)).

mmatloka commented 3 years ago

@mwkohout Take a look here: https://github.com/akka/alpakka-kafka/blob/master/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala#L541
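The linked example is not reproduced here. One common way to keep a handle on a stream that is re-materialized on every restart is to publish each new control into an AtomicReference from inside the restart factory (with Alpakka Kafka this would typically be done in `mapMaterializedValue`). The sketch below models only that bookkeeping in plain Java. `RestartableControlSketch` and its nested `Control` class are hypothetical stand-ins, not the real `Consumer.Control` interface.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of "remember the latest control across restarts"; not Alpakka Kafka code.
final class RestartableControlSketch {

    /** Stand-in for Consumer.Control: something that can be shut down and queried. */
    static final class Control {
        private volatile boolean shutdown = false;

        void shutdown() {
            shutdown = true;
        }

        boolean isShutdown() {
            return shutdown;
        }
    }

    // Starts with a placeholder so callers never see null before the first start.
    private final AtomicReference<Control> current = new AtomicReference<>(new Control());

    /** Called by the restart wrapper on every (re)start; publishes the fresh control. */
    Control materialize() {
        Control fresh = new Control();
        current.set(fresh);
        return fresh;
    }

    /** Shuts down whichever incarnation of the stream is currently running. */
    void shutdownCurrent() {
        current.get().shutdown();
    }

    /** Health-check view of the incarnation that is currently running. */
    boolean isShutdown() {
        return current.get().isShutdown();
    }
}
```

Because the reference always points at the most recent incarnation, shutdown and health checks act on the stream that is actually running rather than on one that has already failed and been replaced.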