cockroachdb / cockroach


changefeedccl: Kafka batch retry might not work correctly; or at all. #90029

Closed miretskiy closed 2 years ago

miretskiy commented 2 years ago

While testing kafka batch retries, the following behavior was observed:

1. Topic created with max message size 128K.
2. Changefeed created with a somewhat insane configuration:

 create changefeed for tpcc.history into 'kafka://10.11.43.129:9092?topic_name=testTopic' with initial_scan='only', kafka_sink_config='{"Flush": {"Messages": 5000, "MaxMessages": 10000, "Frequency": "1s"}}';
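
For reference, the Flush block in that JSON presumably maps onto Sarama's producer flush settings along these lines; this is a standalone sketch based on the flush config echoed in the logs below, not the sink's actual config-building code:

package main

import (
    "time"

    "github.com/Shopify/sarama"
)

func main() {
    cfg := sarama.NewConfig()
    // Mirrors the kafka_sink_config JSON above.
    cfg.Producer.Flush.Messages = 5000             // best-effort message count that triggers a flush
    cfg.Producer.Flush.MaxMessages = 10000         // cap on messages sent in a single broker request
    cfg.Producer.Flush.Frequency = 1 * time.Second // flush at least this often
    _ = cfg // a real sink would hand this to sarama.NewAsyncProducer(brokers, cfg)
}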

Each row in tpcc.history is about 216 bytes -- thus, only ~600 messages could ever fit under that limit (see the back-of-envelope sketch after the log excerpt below). The following error messages were observed in the logs:

ccl/changefeedccl/sink_kafka.go:452 ⋮ kafka sink with flush config ({Bytes:0 Messages:5000 Frequency:1s MaxMessages:10000}) beginning internal retry with 5640 inflight messages due to error: ‹while sending message with key=[1034, "1a7c3a46-862f-4300-8000-000001d997eb"], size=272, stats=m=5637/b=1508686/largest=546: kafka server: Message was too large, server rejected it to avoid allocation error.›
ccl/changefeedccl/sink_kafka.go:558 ⋮  kafka sink handling 5641 buffered messages for internal retry
ccl/changefeedccl/sink_kafka.go:587 ⋮ kafka sink retrying 5641 messages with reduced flush config: ({Bytes:0 Messages:2500 Frequency:1s MaxMessages:5000})
ccl/changefeedccl/sink_kafka.go:580 ⋮ 17822  kafka sink abandoning internal retry due to error: ‹kafka: Failed to deliver 5641 messages.›
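
For context, a quick back-of-envelope sketch (standalone Go, not sink code) of why even the halved retry config cannot fit under the 128K limit; the numbers come from the description and log lines above:

package main

import "fmt"

func main() {
    const (
        maxMessageBytes = 128 << 10 // topic max message size from the test setup
        rowBytes        = 216       // approximate tpcc.history row size cited above
    )
    // Only ~600 rows can ever fit under the limit.
    fmt.Println("rows that fit in one request:", maxMessageBytes/rowBytes) // 606

    // Flush.Messages as logged: 5000 initially, halved to 2500 by the internal retry.
    for _, batch := range []int{5000, 2500} {
        fmt.Printf("batch of %d rows ≈ %d bytes (limit %d)\n", batch, batch*rowBytes, maxMessageBytes)
    }
    // Even the halved batch is ~4x over the limit, so the retry with the reduced
    // flush config is still doomed to hit "Message was too large".
}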

The last message, kafka sink abandoning internal retry due to error: ‹kafka: Failed to deliver 5641 messages.›, is particularly disconcerting. If I'm reading the code correctly, when that happens, the error is returned up the stack: https://github.com/cockroachdb/cockroach/blob/master/pkg/ccl/changefeedccl/sink_kafka.go#L518-L518

if isRetrying() && s.mu.inflight == 0 {
  if err := s.handleBufferedRetries(retryBuf, retryErr); err != nil {
    s.mu.flushErr = err
  }
  endInternalRetry()
}

So the good news is that we do set flushErr; the bad news is that endInternalRetry then clears out both retryBuf and retryErr:

endInternalRetry := func() {
    retryErr = nil
    retryBuf = nil
}

So at this point, we have swallowed 5641 messages and set flushErr. Now, clearing out retryErr above causes the isRetrying function to return false:

isRetrying := func() bool {
    return retryErr != nil
}

This is very bad because new messages can now come in -- that alone is fatally broken, since we might emit a newer version of a key without ever having emitted the previous version (e.g. if the previous version was among those 5641 dropped messages).
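
For illustration only, here is a minimal standalone sketch of the kind of guard that would prevent this: refuse new messages once a flush error has been recorded. The type and method names are made up for the sketch; the real sink's EmitRow signature and locking are more involved:

package main

import (
    "errors"
    "fmt"
    "sync"
)

// sketchSink stands in for the kafka sink's relevant state.
type sketchSink struct {
    mu struct {
        sync.Mutex
        flushErr error
    }
}

// emit refuses new messages once a flush error has been recorded, so a newer
// version of a key can never be emitted after an older version was dropped.
func (s *sketchSink) emit(key, value []byte) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.mu.flushErr != nil {
        return s.mu.flushErr
    }
    // ... hand the message off to the async producer here ...
    return nil
}

func main() {
    s := &sketchSink{}
    s.mu.flushErr = errors.New("kafka: Failed to deliver 5641 messages")
    fmt.Println(s.emit([]byte("k"), []byte("v@t2"))) // surfaces the stored error instead of emitting
}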

The changefeed not exiting with an error is highly problematic. My best guess, and it is just a guess, is that the change aggregator does attempt to flush the sink, and the flush probably does return flushErr, but that error probably gets swallowed here:

if errors.As(err, &e) {
    // ErrBufferClosed is a signal that
    // our kvfeed has exited expectedly.
    err = e.Unwrap()
    if errors.Is(err, kvevent.ErrNormalRestartReason) {
        err = nil
    }
} else {

It appears this issue is very severe, and that batching retries are entirely broken.

Unfortunately, it appears that disabling this feature with changefeed.batch_reduction_retry_enabled=false does not work either, for the same reason as above: we seem to rely on flushErr, which is wrong. I'm probably missing some details here (and I hope I'm wrong!), but at this point I think we have to consider the Kafka sink entirely broken.

Jira issue: CRDB-20557

Epic CRDB-11783

blathers-crl[bot] commented 2 years ago

Hi @miretskiy, please add branch-* labels to identify which branch(es) this release-blocker affects.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is otan.

blathers-crl[bot] commented 2 years ago

cc @cockroachdb/cdc

miretskiy commented 2 years ago

From @samiskin:

Yeah, looking at async_producer.go, it seems like Sarama will keep track of what's errored and block emitting it while it's retrying, but once it's failed it'll just fail the broker; later a new one would get created for that partition and it'd lose the map of what has failed.

I don't know if there's even a way to deal with this rigorously using Sarama, as it looks like it's still possible that between the time Sarama does "run out of retries for k@t1, mark the broker as failed, and put the error in Errors()" and "we see the error on Errors() and set flushErr", we've already done "insert k@t2 into Input -> new brokerProducer is made with no knowledge of the failure -> k@t2 is emitted".

Seems like Sarama only guarantees that "Order [within partitions] is preserved even when there are network hiccups and certain messages have to be retried." [1], but doesn't guarantee ordering when messages fail outright.
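
For reference, a minimal standalone sketch of the Sarama async-producer pattern being described (not CockroachDB code); it only illustrates the window between a message permanently failing inside Sarama and the caller observing that failure on Errors():

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    cfg := sarama.NewConfig()
    cfg.Producer.Return.Errors = true // permanently failed messages come back on Errors()

    producer, err := sarama.NewAsyncProducer([]string{"10.11.43.129:9092"}, cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // The caller only learns that k@t1 is lost here, possibly well after
    // Sarama gave up retrying and failed the broker producer internally...
    go func() {
        for perr := range producer.Errors() {
            log.Printf("failed: key=%v err=%v", perr.Msg.Key, perr.Err)
        }
    }()

    // ...but nothing stops the caller from having already queued k@t2 on
    // Input() in the meantime; a freshly created broker producer will happily
    // deliver it with no memory of the earlier failure.
    producer.Input() <- &sarama.ProducerMessage{
        Topic: "testTopic",
        Key:   sarama.StringEncoder("k"),
        Value: sarama.StringEncoder("v@t2"),
    }
}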

miretskiy commented 2 years ago

Removing GA blockers now that the backports have merged.