This patch solves a problem on the agent: when Pulsar is temporarily unavailable and then recovers, a task with partial failures could hang forever. Here is a sample timeline:

[T1] The Pulsar endpoint is not reachable due to a network problem.

[T2] A CDC-enabled table continues to receive mutations. All mutations are sent to `cdc_raw` for async processing.

[T3] The agent picks up the `*.idx` files from `cdc_raw` and submits them to the `pendingTasks` queue for execution. Although there is a logical 1:1 mapping between a `Task` and a segment (represented by a pair of `*.idx` and `*.log` files in the commit log dir), the segment file is mutable: with each segment mutation, a new task is submitted for processing.

[T4] A task starts; it reads the log files and starts sending mutations one by one, up to the available permits in the `inflightMessagesSemaphore`.

[T5] Because Pulsar is not available, each task will wait until Pulsar is back; this is controlled by the `finish()` logic.

[T6] Pulsar becomes reachable again.

[T7] There are two possibilities, depending on what happens to the in-flight Pulsar requests:

If all of them finish successfully, the `finish()` logic will unblock and the task completes. This can happen because the Pulsar client is configured to retry forever - no issues here.
If some of them fail with an exception such as `org.apache.pulsar.client.api.PulsarClientException$AuthenticationException: Failed to authenticate` (like the ones observed in dev) or `PulsarClientException: java.util.concurrent.CompletionException: java.net.UnknownHostException` (like the one observed in prod), the task will partially fail and attempt to resubmit itself here: https://github.com/datastax/cdc-apache-cassandra/blob/7912d85c2be16c7ba9560109b95404e71e13a83d/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderService.java#L283

There are again two possibilities here:

If there are other pending tasks already waiting (because of the mutable nature of the `*.idx` files mentioned above), the current task instance will not be resubmitted, as the other pending task will do the work. This is a lucky case that causes no issue, but it actually masks the next issue.

If not, the task hangs forever - this is the problem this patch fixes.

A thread dump was taken that is in line with the above findings; here is the interesting part:
A unit test is added to mimic what happened during the thread dump as closely as possible.
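The blocking behavior the test exercises can be sketched with a minimal, self-contained model. Note this is an illustrative sketch, not the agent's actual code: only `inflightMessagesSemaphore` and `finish()` come from the description above, and the class and method names are hypothetical. Sends are bounded by the semaphore, `finish()` waits by reacquiring all permits, and while Pulsar is unreachable the in-flight futures simply never complete, so `finish()` stays blocked:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Minimal model of the task lifecycle described in [T4]/[T5].
// Illustrative names; not the agent's actual implementation.
class SegmentTask {
    static final int MAX_INFLIGHT = 4;
    final Semaphore inflightMessagesSemaphore = new Semaphore(MAX_INFLIGHT);

    // Simulates an async send. While the broker is unreachable, a client that
    // retries internally forever looks like a future that never completes.
    CompletableFuture<Void> sendAsync(int mutation, boolean brokerReachable) {
        if (!brokerReachable) {
            return new CompletableFuture<>(); // never completes
        }
        return CompletableFuture.completedFuture(null);
    }

    // [T4]: send mutations one by one, bounded by the available permits.
    void process(int mutations, boolean brokerReachable) throws InterruptedException {
        for (int i = 0; i < mutations; i++) {
            inflightMessagesSemaphore.acquire(); // blocks once MAX_INFLIGHT sends are pending
            sendAsync(i, brokerReachable)
                    .whenComplete((r, e) -> inflightMessagesSemaphore.release());
        }
    }

    // [T5]: finish() waits for all in-flight sends by reacquiring every permit.
    // A timeout is used here only to keep the model testable.
    boolean finish(long timeoutMs) throws InterruptedException {
        boolean done = inflightMessagesSemaphore.tryAcquire(
                MAX_INFLIGHT, timeoutMs, TimeUnit.MILLISECONDS);
        if (done) {
            inflightMessagesSemaphore.release(MAX_INFLIGHT);
        }
        return done;
    }
}
```

With a reachable broker, `finish()` returns promptly; with an unreachable broker and `MAX_INFLIGHT` pending sends, it blocks until the timeout, mirroring [T5].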
Full thread dump: https://jstack.review/?https://gist.github.com/aymkhalil/237de69bb919d13413df30c77570fd93
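The resubmit decision under [T7] can be modelled as below. This is a hypothetical sketch (names and structure are illustrative, not the code behind the link above): a partially failed task re-queues itself only when no other task for the same, mutable segment is already pending - the "lucky case" where it skips resubmission because a newer task will re-send the failed mutations.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the resubmit-on-partial-failure decision in [T7];
// illustrative names, not the agent's actual code.
class PendingTasks {
    // Segments that currently have a task queued for execution.
    private final Set<String> pendingSegments = ConcurrentHashMap.newKeySet();

    // [T3]: a new task is submitted each time a segment's *.idx file mutates.
    void submit(String segment) {
        pendingSegments.add(segment);
    }

    // Called when a task is taken off the queue to run.
    void taken(String segment) {
        pendingSegments.remove(segment);
    }

    // Called by a task that finished with partial failures. Returns true if
    // the task re-queued itself, false in the "lucky case" where a newer
    // pending task for the same segment will re-send the failed mutations.
    boolean maybeResubmit(String segment) {
        if (pendingSegments.contains(segment)) {
            return false; // another pending task will do the work
        }
        submit(segment); // no one else will retry: re-queue ourselves
        return true;
    }
}
```

The masking effect follows directly: as long as the table keeps mutating, a newer pending task always exists and the missing-resubmission path is never taken.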