cockroachdb / cockroach

CockroachDB — the cloud native, distributed SQL database designed for high availability, effortless scale, and control over data placement.
https://www.cockroachlabs.com
Other
29.87k stars 3.77k forks source link

CDC changefeed to Kafka retrying backfill non stop when Kafka errors out #65211

Open daniel-crlabs opened 3 years ago

daniel-crlabs commented 3 years ago

Describe the problem

Client enabled CDC changefeed into Kafka for a table with more than 4 billion rows. Job never completes It appears potentially some rows are too large for the sink to accept. CRDB does not proceed past that row and it'll keep retrying the backfill non stop.

To Reproduce

Analyzed logs and noticed a pattern and error messages indicating the problem.

Expected behavior Either complete the job or fail, instead of being stuck in an endless loop.

Additional data

Job status, notice high_water_timestamp is 0 since April 21st, which means this job hasn't started yet to send data to kafka.

job_id  job_type    description statement   user_name   descriptor_ids  status  running_status  created started finished    modified    fraction_completed  high_water_timestamp    error   coordinator_id

652005261758005251  CHANGEFEED  CREATE CHANGEFEED FOR TABLE nodes INTO 'kafka://{host}:{port}?metatron_server_app=kaas_kafka&tls_enabled=true&topic_prefix=loadtest_0421_' WITH avro_schema_prefix = 'loadtest_0421_crdb_data_plane_', confluent_schema_registry = 'https://{host}:{port}?metatron_server_app=ksschemaregistry', format = 'experimental_avro', updated     root    {54,57} running NULL    2021-04-21 23:10:15.136101+00:00    2021-04-21 23:10:15.72695+00:00 NULL    2021-04-21 23:10:15.136101+00:00    0   NULLNULL

CRDB errors from Kafka:

I210511 16:19:35.212299 337490368 ccl/changefeedccl/changefeed_processors.go:911 ⋮ [n3,job=652005261758005251] ‹job 652005261758005251› span ‹/Table/57/1/"p\x9{c\x90`\xf1\x83\x11\xea\xa7?\x12s\x9d\f\xfc}"-e\xf6\x
00Fs\x11餝\x12\x15\xb6\x1a/\xec"}› is behind by 450208h19m35.212294571s
I210511 16:19:35.522716 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:19:35.789597 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:19:36.055036 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:19:36.356937 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:19:36.730344 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:19:37.018298 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:19:37.373296 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:19:37.679778 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:19:38.065981 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space

After a few minutes the operations seem to fail

I210511 16:27:13.570463 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:27:13.853258 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:27:14.056935 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:27:14.247631 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:27:14.426737 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:27:14.602590 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:27:14.824212 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:27:15.023263 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:27:15.261271 660506798 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/4 maximum request accumulated, waiting for space
I210511 16:27:17.208938 662132545 vendor/github.com/Shopify/sarama/async_producer.go:1003 ⋮ [kafka-producer] ‹Producer shutting down.›
I210511 16:27:17.208989 660506468 vendor/github.com/Shopify/sarama/client.go:227 ⋮ [kafka-producer] ‹Closing Client›
I210511 16:27:17.209037 660506798 vendor/github.com/Shopify/sarama/async_producer.go:789 ⋮ [kafka-producer] producer/broker/4 shut down
I210511 16:27:17.209039 660506764 vendor/github.com/Shopify/sarama/async_producer.go:789 ⋮ [kafka-producer] producer/broker/2 shut down
I210511 16:27:17.209050 660506767 vendor/github.com/Shopify/sarama/async_producer.go:789 ⋮ [kafka-producer] producer/broker/5 shut down
I210511 16:27:17.209042 660506802 vendor/github.com/Shopify/sarama/async_producer.go:789 ⋮ [kafka-producer] producer/broker/3 shut down
I210511 16:27:17.209066 660506790 vendor/github.com/Shopify/sarama/async_producer.go:789 ⋮ [kafka-producer] producer/broker/0 shut down
I210511 16:27:17.209073 660506821 vendor/github.com/Shopify/sarama/async_producer.go:789 ⋮ [kafka-producer] producer/broker/1 shut down
I210511 16:27:17.209087 662133122 vendor/github.com/Shopify/sarama/broker.go:252 ⋮ [kafka-producer] Closed connection to broker ‹{host}:{port}›
I210511 16:27:17.209059 662133126 vendor/github.com/Shopify/sarama/broker.go:254 ⋮ [kafka-producer] Error while closing connection to broker ‹{host}:{port}›: write tcp ‹100.66.199.169:17684› -> ‹100.65.237.219:7102›: ‹i/o timeout›
I210511 16:27:17.209109 662133124 vendor/github.com/Shopify/sarama/broker.go:252 ⋮ [kafka-producer] Closed connection to broker ‹{host}:{port}›
I210511 16:27:17.209118 662133126 vendor/github.com/Shopify/sarama/utils.go:53 ⋮ [kafka-producer] ‹Error closing broker›-1‹:›write tcp ‹{host}:{port}› -> ‹{host}:{port}›: ‹i/o timeout›
I210511 16:27:17.209124 662133123 vendor/github.com/Shopify/sarama/broker.go:252 ⋮ [kafka-producer] Closed connection to broker ‹{host}:{port}›
I210511 16:27:17.209140 662132673 vendor/github.com/Shopify/sarama/broker.go:252 ⋮ [kafka-producer] Closed connection to broker ‹{host}:{port}›
I210511 16:27:17.209139 662132672 vendor/github.com/Shopify/sarama/broker.go:252 ⋮ [kafka-producer] Closed connection to broker ‹{host}:{port}›
I210511 16:27:17.209158 662133125 vendor/github.com/Shopify/sarama/broker.go:252 ⋮ [kafka-producer] Closed connection to broker ‹{host}:{port}2›
W210511 16:28:11.759487 337490368 ccl/changefeedccl/changefeed_stmt.go:623 ⋮ [n3,job=652005261758005251] CHANGEFEED job 652005261758005251 encountered retryable error: ‹retryable changefeed error›: ‹kafka: Failed to produce message to topic loadtest_0421_nodes: kafka server: Message was too large, server rejected it to avoid allocation error.›
I210511 16:28:11.759524 662145568 vendor/github.com/Shopify/sarama/broker.go:254 ⋮ [kafka-producer] Error while closing connection to broker ‹{host}:{port}›: write tcp ‹100.66.199.169:40620› -> ‹100.65.201.192:7102›: ‹i/o timeout›

Eventually the same process repeats itself. It seems to start up but eventually dies again:

I210511 16:28:22.536059 662147728 vendor/github.com/Shopify/sarama/client.go:174 ⋮ [kafka-producer] ‹Successfully initialized new client›
I210511 16:28:22.538025 662147563 ccl/changefeedccl/schemafeed/schema_feed.go:500 ⋮ [n3,job=652005261758005251] validate ‹57:7@1617410427.314317340,0›
I210511 16:28:22.853993 778 server/status/runtime.go:525 ⋮ [n3] runtime stats: 48 GiB RSS, 1734 goroutines, 2.4 GiB/12 GiB/7.2 GiB GO alloc/idle/total, 32 GiB/40 GiB CGO alloc/total, 1481.8 CGO/sec, 88.9/150.6 %(
u/s)time, 0.0 %gc (1x), 7.5 MiB/1.5 MiB (r/w)net
I210511 16:28:23.258781 337490368 ccl/changefeedccl/changefeed_processors.go:911 ⋮ [n3,job=652005261758005251] ‹job 652005261758005251› span ‹/Table/57/1{-/"\b\x94\xb9\xf2gu\x11\xe9\xafN\n\x81\xb7;\xa4\xba"}› is
behind by 450208h28m23.258777409s
I210511 16:28:23.492685 662148024 vendor/github.com/Shopify/sarama/async_producer.go:710 ⋮ [kafka-producer] producer/broker/0 starting up
I210511 16:28:23.492744 662148024 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/0 state change to [open] on ‹loadtest_0421_nodes›/20
I210511 16:28:23.492809 662148024 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/0 state change to [open] on ‹loadtest_0421_nodes›/56
I210511 16:28:23.493155 662148027 vendor/github.com/Shopify/sarama/async_producer.go:710 ⋮ [kafka-producer] producer/broker/4 starting up
I210511 16:28:23.493191 662148027 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/4 state change to [open] on ‹loadtest_0421_nodes›/16
I210511 16:28:23.493228 662148027 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/4 state change to [open] on ‹loadtest_0421_nodes›/28
I210511 16:28:23.493629 662148230 vendor/github.com/Shopify/sarama/async_producer.go:710 ⋮ [kafka-producer] producer/broker/5 starting up
I210511 16:28:23.493665 662148230 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/5 state change to [open] on ‹loadtest_0421_nodes›/57
I210511 16:28:23.494133 662148024 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/0 state change to [open] on ‹loadtest_0421_nodes›/32
I210511 16:28:23.494128 662148233 vendor/github.com/Shopify/sarama/async_producer.go:710 ⋮ [kafka-producer] producer/broker/2 starting up
I210511 16:28:23.494176 662148024 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/0 state change to [open] on ‹loadtest_0421_nodes›/14
I210511 16:28:23.494167 662148230 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/5 state change to [open] on ‹loadtest_0421_nodes›/9
I210511 16:28:23.494171 662148027 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/4 state change to [open] on ‹loadtest_0421_nodes›/40
I210511 16:28:23.494206 662148230 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/5 state change to [open] on ‹loadtest_0421_nodes›/3
I210511 16:28:23.494173 662148233 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/2 state change to [open] on ‹loadtest_0421_nodes›/55
I210511 16:28:23.494238 662147339 vendor/github.com/Shopify/sarama/async_producer.go:710 ⋮ [kafka-producer] producer/broker/3 starting up
I210511 16:28:23.494241 662148233 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/2 state change to [open] on ‹loadtest_0421_nodes›/1
I210511 16:28:23.494191 662148024 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/0 state change to [open] on ‹loadtest_0421_nodes›/50
I210511 16:28:23.494264 662148233 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/2 state change to [open] on ‹loadtest_0421_nodes›/49
I210511 16:28:23.494263 662147339 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/3 state change to [open] on ‹loadtest_0421_nodes›/6
I210511 16:28:23.494290 662148024 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/0 state change to [open] on ‹loadtest_0421_nodes›/44
I210511 16:28:23.494298 662148233 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/2 state change to [open] on ‹loadtest_0421_nodes›/31
I210511 16:28:23.494199 662147565 vendor/github.com/Shopify/sarama/async_producer.go:710 ⋮ [kafka-producer] producer/broker/1 starting up
I210511 16:28:23.501266 662147565 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/1 state change to [open] on ‹loadtest_0421_nodes›/59
I210511 16:28:23.506869 662148233 vendor/github.com/Shopify/sarama/async_producer.go:721 ⋮ [kafka-producer] producer/broker/2 state change to [open] on ‹loadtest_0421_nodes›/43
I210511 16:28:23.512705 662148023 vendor/github.com/Shopify/sarama/broker.go:211 ⋮ [kafka-producer] Connected to broker at ‹{host}:{port}› (registered as #0)
I210511 16:28:23.513895 662147338 vendor/github.com/Shopify/sarama/broker.go:211 ⋮ [kafka-producer] Connected to broker at ‹{host}:{port}› (registered as #3)
I210511 16:28:23.514520 662148026 vendor/github.com/Shopify/sarama/broker.go:211 ⋮ [kafka-producer] Connected to broker at ‹{host}:{port}› (registered as #4)
I210511 16:28:23.514993 662148232 vendor/github.com/Shopify/sarama/broker.go:211 ⋮ [kafka-producer] Connected to broker at ‹{host}:{port}› (registered as #2)
I210511 16:28:23.515296 662147991 vendor/github.com/Shopify/sarama/broker.go:211 ⋮ [kafka-producer] Connected to broker at ‹{host}:{port}› (registered as #1)
I210511 16:28:23.516158 662148229 vendor/github.com/Shopify/sarama/broker.go:211 ⋮ [kafka-producer] Connected to broker at ‹{host}:{port}› (registered as #5)
E210511 16:28:24.944035 662148441 server/server.go:1901 ⋮ [n3,client=‹127.0.0.1:29492›] serving SQL client conn: ‹EOF›
I210511 16:28:26.304179 662148230 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/5 maximum request accumulated, waiting for space
I210511 16:28:26.608243 662148230 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/5 maximum request accumulated, waiting for space

this only goes on for a few seconds, then the following start showing up again:

I210511 16:28:53.498247 662148230 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/5 maximum request accumulated, waiting for space
I210511 16:28:53.766438 662148230 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/5 maximum request accumulated, waiting for space
I210511 16:28:54.876971 662148233 vendor/github.com/Shopify/sarama/async_producer.go:801 ⋮ [kafka-producer] producer/broker/2 maximum request accumulated, waiting for space

and then a few minutes later:

I210511 16:38:21.999908 337490368 ccl/changefeedccl/changefeed_processors.go:911 ⋮ [n3,job=652005261758005251] ‹job 652005261758005251› span ‹/Table/57/1/"\x00{ \xe6\x10\x1d\xdf\x11\xea\xaa\x06\x12\xba~D\xfa-"-A\
x93M\xdeW\x11ꗋ\n\xf1D\x8eF\xad"}› is behind by 450208h38m21.99990493s
I210511 16:38:22.504931 662147593 vendor/github.com/Shopify/sarama/client.go:775 ⋮ [kafka-producer] client/metadata fetching metadata for all topics from broker ‹{host}:{port}›
I210511 16:38:22.505050 662147593 vendor/github.com/Shopify/sarama/client.go:813 ⋮ [kafka-producer] client/metadata got error from broker -1 while fetching metadata: ‹EOF›
I210511 16:38:22.505122 662147593 vendor/github.com/Shopify/sarama/broker.go:252 ⋮ [kafka-producer] Closed connection to broker ‹{host}:{port}›
I210511 16:38:22.505144 662147593 vendor/github.com/Shopify/sarama/client.go:775 ⋮ [kafka-producer] client/metadata fetching metadata for all topics from broker ‹{host}:{port}›
I210511 16:38:22.532687 662259764 vendor/github.com/Shopify/sarama/broker.go:211 ⋮ [kafka-producer] Connected to broker at ‹{host}:{port}› (registered as #0)
I210511 16:38:22.536310 662147558 vendor/github.com/Shopify/sarama/client.go:775 ⋮ [kafka-producer] client/metadata fetching metadata for all topics from broker ‹{host}:{port}›
I210511 16:38:22.539512 662147558 vendor/github.com/Shopify/sarama/client.go:813 ⋮ [kafka-producer] client/metadata got error from broker -1 while fetching metadata: ‹EOF›
I210511 16:38:22.539589 662147558 vendor/github.com/Shopify/sarama/broker.go:254 ⋮ [kafka-producer] Error while closing connection to broker ‹{host}:{port}›: write tcp ‹{host}:{port}› -> ‹{host}:{port}›: write: connection reset by peer
I210511 16:38:22.539685 662147558 vendor/github.com/Shopify/sarama/client.go:775 ⋮ [kafka-producer] client/metadata fetching metadata for all topics from broker ‹{host}:{port}›
E210511 16:38:22.847401 662259441 server/server.go:1901 ⋮ [n3,client=‹127.0.0.1:30624›] serving SQL client conn: ‹EOF›

Environment:

Additional context What was the impact?

Can't send data from CRDB to Kafka

gz#8388

Jira issue: CRDB-7494

miretskiy commented 3 years ago

It appears that that was happening during the backfill? @daniel-crlabs can you confirm?

daniel-crlabs commented 2 years ago

My apologies, I just saw this message. How would I be able to tell when this happened?

It appears that that was happening during the backfill? @daniel-crlabs can you confirm?

daniel-crlabs commented 2 years ago

This is what the customer stated when they originally created the ticket, so I assume yes:

We have a cluster that is not currently receiving any traffic. There is a table with 4,536,254,464 rows. We enabled CDC changefeed on the table on 4/21. We are seeing approximately 400,000k messages per second emitted by the CRDB cluster to Kafka. At that rate, we calculate the entire backfill should be emitted in 11340.63616 seconds (or significantly less than one day). We are, however, still seeing messages emitted to Kafka over 10 days later at the same rate.

kmanamcheri commented 2 years ago

We are hitting this too. We have a table with ~15M rows and the backfill never completes and the kafka topic has over 15 billion messages with multiple duplicates.

What is the recommendation for this?

In my particular case, the error from Kafka is that the message size is too big. I increased the message size, but I never know how large a row could be and I don't know what to set the message size to be.

amruss commented 2 years ago

Hi guys - wanted to follow up on this. We're looking into product-level ways to address invalid message. I'd love to get your feedback on some features we are considering.

miretskiy commented 1 year ago

Do you think we can close this issue w/ recent dynamic batching?

kmanamcheri commented 1 year ago

@miretskiy Is this particular issue fixed though? Does CDC still continually retry and never finish if there's a Kafka error?

miretskiy commented 1 year ago

22.2.0 has dynamic batching. CDC still continuously retries, but it automatically reduces batch size if necessary.

afilipchik commented 1 year ago

@miretskiy, what is the new behavior? Will it retry failed batch, or will restart the table with a smaller batch?

miretskiy commented 1 year ago

it will retry failed batch -- with fewer messages.

miretskiy commented 1 year ago

@kmanamcheri still retries (at this point forever) since we can't know if the error is transient. This will change in the near future.