linkedin / brooklin

An extensible distributed system for reliable nearline data streaming at scale
BSD 2-Clause "Simplified" License

Added a timeout on the producer flush call in KafkaMirrorMakerConnectorTask #957

Open jzakaryan opened 1 year ago

jzakaryan commented 1 year ago

Even in flushless mode, BMM's tasks call flush on the producer when shutting down. We have observed that this producer flush call can get stuck indefinitely, which keeps the tasks from shutting down gracefully. The code change in this PR addresses this by wrapping the producer flush call in a future and blocking on that future with a timeout.

If the producer flush doesn't complete within the given timeout window, the task proceeds to commit safe offsets and shut down. The timeout window is exposed through a configuration property.
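The approach described above (run flush in a future, wait with a bounded timeout, then move on to committing offsets) can be sketched in Java roughly as follows. This is a minimal illustration, not the code in #957: the FlushableProducer interface, the flushWithTimeout helper, and the 500 ms timeout are hypothetical stand-ins for the real producer and the new configuration property.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FlushWithTimeout {

  // Hypothetical stand-in for the producer; not Brooklin's actual interface.
  interface FlushableProducer {
    void flush() throws Exception;
  }

  // Flush runs on a daemon thread so a hung flush cannot keep the JVM alive.
  private static final ExecutorService FLUSH_EXECUTOR = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "producer-flush");
    t.setDaemon(true);
    return t;
  });

  /** Returns true if flush finished within timeoutMs, false on timeout or failure. */
  static boolean flushWithTimeout(FlushableProducer producer, long timeoutMs)
      throws InterruptedException {
    Future<?> future = FLUSH_EXECUTOR.submit(() -> {
      producer.flush();
      return null;
    });
    try {
      future.get(timeoutMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      // Stop waiting: interrupt the flush thread and let shutdown continue.
      future.cancel(true);
      return false;
    } catch (ExecutionException e) {
      // flush() itself threw; report "not flushed" and let the caller decide.
      return false;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    FlushableProducer fast = () -> { };
    // Simulates a flush that never returns on its own.
    FlushableProducer stuck = () -> Thread.sleep(Long.MAX_VALUE);

    System.out.println("fast flush completed: " + flushWithTimeout(fast, 500));
    System.out.println("stuck flush completed: " + flushWithTimeout(stuck, 500));
    // A real task would now commit safe offsets and finish shutting down.
  }
}
```

Running main prints "fast flush completed: true" followed by "stuck flush completed: false" after roughly half a second. Cancelling with cancel(true) only helps if the flush responds to interrupts; a flush that ignores interrupts would keep its worker thread busy, which is why the sketch marks that thread as a daemon.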

jzakaryan commented 1 year ago

Why shouldn't we just use the producer config offset.flush.timeout.ms instead of creating a new config to wait on?

ref: https://kafka.apache.org/21/documentation.html#producerconfigs

@shrinandthakkar the problem is that the tasks were found to be stuck on flush after 60 seconds, despite the default value of 5 seconds for offset.flush.timeout.ms. Connector logs show that it interrupted producer.flush to shut down the task. We can reach out to the Kafka team and ask whether LKC's behavior with respect to producer flush and its config values matches Apache Kafka's, but that can happen in parallel with us addressing the issue here.

shrinandthakkar commented 1 year ago

@jzakaryan Within EventProducer's flush call, I think we already have a flush timeout configuration defined (ref). Is the value of that config INT_MAX? Is that why we are waiting forever?

Do you think we should instead reconfigure that value for MM clusters?
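For a sense of scale, if the EventProducer flush timeout really is INT_MAX milliseconds (the comment poses this as a question, so treat it as an assumption), the quick Java calculation below shows why that behaves like an unbounded wait on a shutdown path. The class and method names are hypothetical and exist only for this illustration.

```java
import java.time.Duration;

public class IntMaxTimeout {

  /** Converts a timeout in milliseconds to whole days (truncated). */
  static long timeoutInDays(long millis) {
    return Duration.ofMillis(millis).toDays();
  }

  public static void main(String[] args) {
    // Assumed flush timeout of Integer.MAX_VALUE (2147483647) ms.
    Duration d = Duration.ofMillis(Integer.MAX_VALUE);
    System.out.println(timeoutInDays(Integer.MAX_VALUE) + " days (" + d.toHours() + " hours)");
  }
}
```

This prints "24 days (596 hours)": roughly 24.9 days, far longer than any reasonable task shutdown.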