zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0

Best practice for recovering a consumer ZStream after any error so messages keep getting consumed #1265

Closed Andycharalambous closed 1 week ago

Andycharalambous commented 1 week ago

I have an application that quite often stops processing messages from a topic, and when I check the logs the cause is usually a CommitTimeout that brought the stream to a close. I have tried various ways to recover from this, but none of them work 100% of the time.

It would be good to have some documentation on the best way to do this. For example, I've tried using .retry(Schedule.forever) to no avail, even though, according to the docs, retry "retries the entire stream, so will re-execute all of the stream's acquire operations".
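
The retry attempt looked roughly like this (a simplified sketch rather than my real code; processRecord is a stand-in for our actual processing):

// processRecord is a placeholder for our real handler
def processRecord(record: CommittableRecord[String, String]): Task[Unit] = ???

val firstAttempt =
  Consumer
    .plainStream(Subscription.topics("topic"), Serde.string, Serde.string)
    .tap(processRecord)
    .map(_.offset)
    .aggregateAsync(Consumer.offsetBatches)
    .mapZIO(_.commit)
    .retry(Schedule.forever) // should re-run the whole stream, including its acquire, after a failure
    .runDrain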

I have also tried:


def createPartitionedStream(topic: String): ZStream[..] =
  Consumer
    .partitionedStream(...)
    .flatMapPar(10) { case (_, partitionStream) =>
      partitionStream
        .tap(..)
        .map(_.offset)
        .retry(Schedule.forever)
    }
    .aggregateAsync(Consumer.offsetBatches)
    .mapZIO(_.commitOrRetry)
    .orElse(createPartitionedStream(topic))

createPartitionedStream("topic").drain

and while this does execute the code that creates the ZStream again, the new ZStream never produces any messages.

What is the prescribed way to deal with errors that cause the ZStream to fail?

Thanks

Andycharalambous commented 1 week ago

I have logging in place and have determined that, while the use of orElse DOES create a new "partitioned stream" just fine, flatMapPar never executes: no per-partition streams are created, the way they are the first time Consumer.partitionedStream is called.

erikvanoosten commented 1 week ago

You should try to prevent the commit timeouts, for example by checking your network, or maybe by tweaking some timeouts. I do not have experience with this situation so I cannot advise more on that. Perhaps the kafka-user mailing list will be more helpful.

I see you're already using .commitOrRetry, though in the example no schedule is passed (so it would not compile). Here is what we usually do at my work: .commitOrRetry(Schedule.exponential(100.millis) && Schedule.upTo(4.minutes)), but you will have to verify if it makes sense for your environment.
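
In a pipeline like the one in your issue, that schedule goes into the commit step, roughly like this (just a sketch; I've written the time bound with the instance method upTo here, which limits the commit retries to about 4 minutes):

someStream // placeholder for whatever produces the offsets, e.g. your flatMapPar block
  .aggregateAsync(Consumer.offsetBatches)
  .mapZIO(_.commitOrRetry(Schedule.exponential(100.millis).upTo(4.minutes)))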

Anyway, this works for restarting consumption:

val runKafkaConsumer =
  Consumer.make(consumerSettings).flatMap { consumer =>
    consumer // <-- note: the instance, all lower case, not the Consumer accessor
      .partitionedStream(...)
      ...
  }

ZIO
  .scoped(runKafkaConsumer)
  .retry(Schedule.exponential(100.millis) && Schedule.upTo(4.minutes))

It shuts down the entire consumer and starts a new one. This will lead to some duplicate processing, since not all processed offsets might have been committed. Preventing commit timeouts therefore remains important.
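
Spelled out a bit more fully, the restart pattern could look like this (a sketch only: the subscription, serdes, settings and handleRecord are placeholders to replace with your own, and I've again used the instance method upTo for the time bounds):

import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

// Placeholder settings; use your own bootstrap servers, group id, etc.
val consumerSettings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092")).withGroupId("my-group")

// Placeholder processing step; replace with your own logic.
def handleRecord(record: CommittableRecord[String, String]): Task[Unit] = ???

// Build a consumer, consume with it, and release it when the stream ends or fails.
val runKafkaConsumer: ZIO[Scope, Throwable, Unit] =
  Consumer.make(consumerSettings).flatMap { consumer =>
    consumer
      .partitionedStream(Subscription.topics("topic"), Serde.string, Serde.string)
      .flatMapPar(Int.MaxValue) { case (_, partitionStream) => // process all partitions in parallel
        partitionStream.mapZIO(r => handleRecord(r).as(r.offset))
      }
      .aggregateAsync(Consumer.offsetBatches)
      .mapZIO(_.commitOrRetry(Schedule.exponential(100.millis).upTo(4.minutes)))
      .runDrain
  }

// Each retry closes the scope (shutting down the failed consumer) and starts a fresh one.
val program: ZIO[Any, Throwable, Unit] =
  ZIO
    .scoped(runKafkaConsumer)
    .retry(Schedule.exponential(100.millis).upTo(4.minutes))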

Andycharalambous commented 1 week ago

Thanks @erikvanoosten - I'll give the scoped Consumer a try. I don't seem to have much control over the commit timeouts. I have tried using commitOrRetry with a Schedule policy (I omitted it from my sample code for brevity - my intent was not for the code to compile, just to show the general pattern I was using), but it still ultimately times out; I don't think the exceptions being thrown are of the retriable variety. I'm using Azure Event Hubs with Kafka, and the commit timeouts have only started happening with relative frequency recently, and not for all topics or clusters, so it's a bit of a crap shoot. Not sure what else I can do except let it fail and restart - reprocessing the messages is acceptable in our case.

erikvanoosten commented 1 week ago

I'm using Azure Event Hubs with Kafka, and the commit timeouts have only started happening with relative frequency recently, and not for all topics or clusters, so it's a bit of a crap shoot

Ouch, that doesn't sound nice. Perhaps the Azure people can help.