cockroachdb / cockroach


changefeedccl: kafka changefeeds may send resolved messages to only some partitions #122666

Open andyyang890 opened 5 months ago

andyyang890 commented 5 months ago

After https://github.com/cockroachdb/cockroach/commit/361c142f6a1dacdf91a8efeac181bd6934593efc merged, Kafka changefeeds could sometimes send resolved messages to only some, rather than all, of a topic's partitions.

The reason for this is that we (arguably incorrectly) use the Partition field on sarama.ProducerMessage to assign the partition, when this field is supposed to be written to (and not read) by sarama: https://github.com/cockroachdb/cockroach/blob/b4c8023d2ef7c4e4916359d0e692f3e60542fb5d/pkg/ccl/changefeedccl/sink_kafka.go#L416-L426
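For context, a sketch based on sarama's documented behavior (not CockroachDB code): ProducerMessage.Partition only influences routing when the producer is configured with a partitioner that reads it, such as the one returned by sarama.NewManualPartitioner; under the default hash partitioner the field is just an output that sarama fills in with the partition it chose. The config helper below is illustrative only.

```go
// Sketch, not CockroachDB code: make sarama honor the pre-set
// msg.Partition by installing the manual partitioner.
package example

import "github.com/Shopify/sarama"

func newManualPartitioningConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	// Route each message to the partition pre-set on msg.Partition.
	cfg.Producer.Partitioner = sarama.NewManualPartitioner
	return cfg
}
```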

Before the commit above, we also supplied a custom partitioner to sarama that would read the Partition field, so this was fine: https://github.com/cockroachdb/cockroach/blob/b4c8023d2ef7c4e4916359d0e692f3e60542fb5d/pkg/ccl/changefeedccl/sink_kafka.go#L744-L751
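Roughly, that wrapper worked like the following sketch (names invented, not the removed code verbatim): keyed row messages are hashed as usual, while keyless resolved messages are sent to the partition pre-assigned on the message.

```go
// Illustrative sketch of a wrapper partitioner that honors msg.Partition
// for keyless (resolved) messages and hashes everything else.
package example

import "github.com/Shopify/sarama"

type resolvedAwarePartitioner struct {
	hash sarama.Partitioner
}

// newResolvedAwarePartitioner is a sarama.PartitionerConstructor, e.g.
// cfg.Producer.Partitioner = newResolvedAwarePartitioner.
func newResolvedAwarePartitioner(topic string) sarama.Partitioner {
	return &resolvedAwarePartitioner{hash: sarama.NewHashPartitioner(topic)}
}

func (p *resolvedAwarePartitioner) RequiresConsistency() bool { return true }

func (p *resolvedAwarePartitioner) Partition(
	msg *sarama.ProducerMessage, numPartitions int32,
) (int32, error) {
	if msg.Key == nil {
		// Resolved timestamps are emitted with a nil key; honor the
		// partition the sink wrote into msg.Partition instead of letting
		// sarama pick one at random.
		return msg.Partition, nil
	}
	return p.hash.Partition(msg, numPartitions)
}
```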

But after that commit, the custom partitioner we supplied no longer had this wrapper struct, so we reverted to the default sarama behavior for messages with a nil key, namely picking a partition at random. This means that even though we're emitting O(partitions) resolved events, there is no guarantee that each partition receives one; if the random partitioner is roughly uniform, the expected number of resolved events per partition is one, but some partitions can get none while others get several, as the sketch below illustrates.
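To see why some partitions end up empty (back-of-the-envelope reasoning, not CockroachDB code): with N resolved messages assigned uniformly at random to N partitions, a given partition receives none with probability (1 - 1/N)^N, which tends to 1/e ≈ 37% as N grows. The snippet below simulates a few rounds; the partition count and trial count are arbitrary illustrative values.

```go
// Illustration only: estimate how often partitions miss a resolved message
// when N keyless messages are spread uniformly at random over N partitions
// (sarama's fallback behavior for nil keys).
package main

import (
	"fmt"
	"math"
	"math/rand"
)

func main() {
	const numPartitions = 5 // matches the repro patch further down
	const trials = 100000

	roundsWithMiss := 0
	for t := 0; t < trials; t++ {
		seen := make([]bool, numPartitions)
		for i := 0; i < numPartitions; i++ {
			seen[rand.Intn(numPartitions)] = true
		}
		for _, got := range seen {
			if !got {
				roundsWithMiss++
				break // at least one partition got no resolved message
			}
		}
	}
	fmt.Printf("rounds where some partition was skipped: %.1f%%\n",
		100*float64(roundsWithMiss)/trials)

	// Analytically, a specific partition is skipped with probability
	// (1 - 1/N)^N, which tends to 1/e ≈ 37% for large N.
	fmt.Printf("probability a given partition is skipped: %.1f%%\n",
		100*math.Pow(1-1.0/numPartitions, numPartitions))
}
```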

Jira issue: CRDB-38017

blathers-crl[bot] commented 5 months ago

cc @cockroachdb/cdc

andyyang890 commented 5 months ago

You can repro this issue by applying this patch and running the cdc/bank roachtest. The patch records the intended partition in the message Metadata, panics in finishProducerMessage if sarama sent the message to a different partition than requested, enables resolved timestamps on the changefeed, and creates the Kafka topic with 5 partitions instead of 1:

diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go
index 69194e7018b..d64e30065cf 100644
--- a/pkg/ccl/changefeedccl/sink_kafka.go
+++ b/pkg/ccl/changefeedccl/sink_kafka.go
@@ -377,6 +377,10 @@ func (s *kafkaSink) EmitRow(
        return s.emitMessage(ctx, msg)
 }

+type resolvedMetadata struct {
+       partition int32
+}
+
 // EmitResolvedTimestamp implements the Sink interface.
 func (s *kafkaSink) EmitResolvedTimestamp(
        ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
@@ -419,6 +423,7 @@ func (s *kafkaSink) EmitResolvedTimestamp(
                                Partition: partition,
                                Key:       nil,
                                Value:     sarama.ByteEncoder(payload),
+                               Metadata:  resolvedMetadata{partition: partition},
                        }
                        if err := s.emitMessage(ctx, msg); err != nil {
                                return err
@@ -622,6 +627,11 @@ func (s *kafkaSink) finishProducerMessage(ackMsg *sarama.ProducerMessage, ackErr
                }
                m.alloc.Release(s.ctx)
        }
+       if m, ok := ackMsg.Metadata.(resolvedMetadata); ok {
+               if m.partition != ackMsg.Partition {
+                       panic(fmt.Sprintf("partition mismatch, requested %d, sent to %d", m.partition, ackMsg.Partition))
+               }
+       }
        if s.mu.flushErr == nil && ackError != nil {
                s.mu.flushErr = ackError
        }
diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go
index b419c6bb21d..710bd9e5d2a 100644
--- a/pkg/cmd/roachtest/tests/cdc.go
+++ b/pkg/cmd/roachtest/tests/cdc.go
@@ -1136,7 +1136,10 @@ func registerCDC(r registry.Registry) {
                        feed := ct.newChangefeed(feedArgs{
                                sinkType: kafkaSink,
                                targets:  allTpccTargets,
-                               opts:     map[string]string{"initial_scan": "'no'"},
+                               opts: map[string]string{
+                                       "initial_scan": "'no'",
+                                       "resolved":     "",
+                               },
                        })
                        ct.runFeedLatencyVerifier(feed, latencyTargets{
                                initialScanLatency: 3 * time.Minute,
@@ -2746,7 +2749,7 @@ func (k kafkaManager) createTopic(ctx context.Context, topic string) error {
                        return errors.Wrap(err, "admin client")
                }
                return admin.CreateTopic(topic, &sarama.TopicDetail{
-                       NumPartitions:     1,
+                       NumPartitions:     5,
                        ReplicationFactor: 1,
                }, false)
        })