Open phuongvu opened 1 month ago
Hey @asddongmen, since you implemented the Glue integration, would you mind sharing some insights on this issue? Maybe there is a workaround? Thank you!
Some logs
May 08 19:18:34 ip-.ec2.internal cdc[541201]: [2024/05/08 19:18:34.426 +00:00] [INFO] [glue_schema_registry.go:108] ["Schema already exists in registry, update it"] [schemaName=stats_events_db_conv_staging_cdc_v0-key] May 08 19:18:34 ip-.ec2.internal cdc[541201]: [2024/05/08 19:18:34.464 +00:00] [ERROR] [glue_schema_registry.go:219] ["GetCachedOrRegister: Could not register schema"] [error="operation error Glue: RegisterSchemaVersion, https response error StatusCode: 400, RequestID: 78473ce4-3981-4230-89ac-bcd16f57f21c, ConcurrentModificationException: Some other operation happened for the schema, please retry again. SchemaName: staging_cdc_v0-key, RegistryName: staging_cdc, SchemaArn: arn:aws:glue:us-east-1::schema/staging_cdc/staging_cdc_v0-key"] [errorVerbose="operation error Glue: RegisterSchemaVersion, https response error StatusCode: 400, RequestID: 78473ce4-3981-4230-89ac-bcd16f57f21c, ConcurrentModificationException: Some other operation happened for the schema, please retry again. SchemaName: staging_cdc_v0-key, RegistryName: staging_cdc, SchemaArn: arn:aws:glue:us-east-1::schema/staging_cdc/staging_cdc_v0-key\ngithub.com/pingcap/errors.AddStack\n\tgithub.com/pingcap/errors@v0.11.5-0.20231212100244-799fae176cfb/errors.go:174\ngithub.com/pingcap/errors.Trace\n\tgithub.com/pingcap/errors@v0.11.5-0.20231212100244-799fae176cfb/juju_adaptor.go:15\ngithub.com/pingcap/tiflow/pkg/sink/codec/avro.(*glueSchemaManager).updateSchema\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/avro/glue_schema_registry.go:289\ngithub.com/pingcap/tiflow/pkg/sink/codec/avro.(*glueSchemaManager).Register\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/avro/glue_schema_registry.go:109\ngithub.com/pingcap/tiflow/pkg/sink/codec/avro.(*glueSchemaManager).GetCachedOrRegister\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/avro/glue_schema_registry.go:217\ngithub.com/pingcap/tiflow/pkg/sink/codec/avro.(*BatchEncoder).getKeySchemaCodec\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/avro/avro.go:154\ngithub.com/pingcap/tiflow/pkg/sink/codec/avro.(*BatchEncoder).encodeKey\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/avro/avro.go:89\ngithub.com/pingcap/tiflow/pkg/sink/codec/avro.(*BatchEncoder).AppendRowChangedEvent\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/avro/avro.go:215\ngithub.com/pingcap/tiflow/pkg/sink/codec.(*encoderGroup).runEncoder\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/encoder_group.go:143\ngithub.com/pingcap/tiflow/pkg/sink/codec.(*encoderGroup).Run.func2\n\tgithub.com/pingcap/tiflow/pkg/sink/codec/encoder_group.go:115\ngolang.org/x/sync/errgroup.(*Group).Go.func1\n\tgolang.org/x/sync@v0.6.0/errgroup/errgroup.go:78\nruntime.goexit\n\truntime/asm_arm64.s:1197"]
It appears that this issue is caused by all the tables sinking to a single Kafka topic, which makes them share one schemaName even though they have different tableVersion values. This inconsistency often makes the check at https://github.com/pingcap/tiflow/blob/6f0f2e3b6084c43a84f1b8ad4dd1dae4577950af/pkg/sink/codec/avro/glue_schema_registry.go#L190 fail, because the tableVersion values differ, so a new schema version gets registered to the schema registry each time.
A possible workaround is to sink different tables to different topics. You can refer to: https://docs.pingcap.com/tidb/dev/ticdc-sink-to-kafka#topic-dispatchers
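For illustration (check the linked topic dispatcher docs for the exact expression rules, including any naming constraints that apply to Avro), a dispatchers block along these lines could send each matched table to its own topic, so each table gets its own schema name in Glue; the "{schema}_{table}" expression here is only an example:

[sink]
dispatchers = [
  # Example only: a per-table topic expression gives every table its own
  # topic, and therefore its own schema name in the Glue registry.
  {matcher = ['sa_events.events_conv_staging_*'], topic = "{schema}_{table}", partition = "index-value"},
]
protocol = "avro"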
@phuongvu
If multiple tables are dispatched to the same topic, they end up sharing the same schema key.
The encoder group runs 32 encoders concurrently, and each one may fetch or register the schema independently, which can cause this issue.
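Just to illustrate the idea (this is not how the current tiflow code works), concurrent registrations for the same schema key could be coalesced so that only one Glue call is in flight per key, for example with golang.org/x/sync/singleflight; registerSchema below is a hypothetical stand-in for the Glue call:

package avroexample

import (
	"context"

	"golang.org/x/sync/singleflight"
)

// registerSchema is a hypothetical stand-in for the Glue
// RegisterSchemaVersion call; it is not part of tiflow.
func registerSchema(ctx context.Context, name, definition string) (string, error) {
	// ... call the Glue API here ...
	return "schema-version-id", nil
}

// inflight coalesces concurrent registrations that share a schema key.
var inflight singleflight.Group

// registerOnce ensures that if many encoders ask to register the same
// schema key at the same time, only one Glue call is made and every
// caller receives its result.
func registerOnce(ctx context.Context, name, definition string) (string, error) {
	v, err, _ := inflight.Do(name, func() (interface{}, error) {
		return registerSchema(ctx, name, definition)
	})
	if err != nil {
		return "", err
	}
	return v.(string), nil
}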
/severity moderate
@phuongvu Could you please provide the changefeed's config to help us investigate further?
It appears that this issue is due to all the tables sinking to a single Kafka topic,
Thank you guys for the insight! It makes sense that my setup causes this issue :( I'll go with the workaround!
Here is my config as requested:
cdc cli changefeed create --changefeed-id="staging-v0" --server=http://localhost:2017 --sink-uri="kafka://localhost:9092/staging_cdc.v0?protocol=avro&partition-num=24&compression=zstd&max-message-bytes=67108864&replication-factor=2&enable-tidb-extension=true&avro-decimal-handling-mode=string&avro-bigint-unsigned-handling-mode=string" --config /etc/tikv/db-conv-staging.toml
With the changefeed.toml config:
case-sensitive = false
changefeed-error-stuck-duration = "30m"
[mounter]
[filter]
rules = ['sa_events.events_conv_staging_*']
[[filter.event-filters]]
ignore-event = ["all ddl"] # Ignore all ddl events.
[scheduler]
enable-table-across-nodes = true
region-threshold = 10000
# write-key-threshold = 30000
[sink]
dispatchers = [
{matcher = ['sa_events.events_conv_staging_*'], partition = "index-value"},
]
protocol = "avro"
[sink.kafka-config.codec-config]
[integrity]
integrity-check-level = "correctness"
It might not be related, but I also tried to sink all the tables to a single Kafka topic using canal-json (since it doesn't require a schema registry), and I got an error doing that as well. So maybe this limitation (i.e. sinking all the tables to a single Kafka topic) also applies to other protocols (e.g. canal-json in this case).
[
{
"id": "staging-v0",
"namespace": "default",
"summary": {
"state": "warning",
"tso": 449651699691815041,
"checkpoint": "2024-05-09 20:05:09.298",
"error": {
"time": "2024-05-09T20:18:56.453120599Z",
"addr": "172.30.67.215:2017",
"code": "CDC:ErrKafkaSendMessage",
"message": "[CDC:ErrKafkaSendMessage]kafka send message failed: kafka: Failed to deliver 24 messages."
}
}
}
]
Interestingly, the error message "Failed to deliver 24 messages." coincides with what I was trying to do: sinking 24 tables to one topic using canal-json.
# https://docs.pingcap.com/tidb/v8.0/ticdc-changefeed-config
case-sensitive = false
changefeed-error-stuck-duration = "30m"

[filter]
rules = ['sa_events.events_conv_staging_*']

[[filter.event-filters]]
ignore-event = ["all ddl","delete"] # Ignore all DDL and delete events.

[scheduler]
enable-table-across-nodes = true
region-threshold = 1000
# write-key-threshold = 30000

[sink]
dispatchers = [
{matcher = ['sa_events.events_conv_staging_*'], partition = "index-value"},
]

protocol = "canal-json"
Some logs:
- May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.961 +00:00] [WARN] [options.go:608] ["topic's `max.message.bytes` less than the `max-message-bytes`,use topic's `max.message.bytes` to initialize the Kafka producer"] [max.message.bytes=1048588] [max-message-bytes=67108864] [real-max-message-bytes=1048460]
May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.961 +00:00] [WARN] [options.go:623] ["topic already exist, TiCDC will not create the topic"] [topic=stats.staging_cdc.v0] [detail="{"Name":"stats.staging_cdc.v0","NumPartitions":24,"ReplicationFactor":0}"]
May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.961 +00:00] [INFO] [kafka_manager.go:136] ["store topic partition number"] [namespace=default] [changefeed=conv-staging-v0] [topic=stats.staging_cdc.v0] [partitionNumber=24]
May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.961 +00:00] [INFO] [tz.go:43] ["Load the timezone specified by the user"] [timezoneName=UTC] [timezone=UTC]
- May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.961 +00:00] [INFO] [kafka_ddl_sink.go:107] ["Try to create a DDL sink producer"] [changefeed=default/conv-staging-v0]
May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.961 +00:00] [INFO] [sarama.go:96] ["Kafka producer uses zstd compression algorithm"]
May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.963 +00:00] [INFO] [kafka_ddl_sink.go:116] ["DDL sink producer client created"] [duration=1.73655ms]
May 09 19:21:22 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:22.969 +00:00] [WARN] [ddl_sink.go:170] ["owner ddl sink fails on action"] [namespace=default] [changefeed=conv-staging-v0] [action=writeCheckpointTs] [retryable=true] [error="[CDC:ErrKafkaSendMessage]kafka send message failed: kafka: Failed to deliver 24 messages."] [errorVerbose="[CDC:ErrKafkaSendMessage]kafka send message failed: kafka: Failed to deliver 24 messages.ngithub.com/pingcap/errors.AddStackntgithub.com/pingcap/errors@v0.11.5-0.20231212100244-799fae176cfb/errors.go:174ngithub.com/pingcap/errors.(*Error).GenWithStackByArgs\n\tgithub.com/pingcap/errors@v0.11.5-0.20231212100244-799fae176cfb/normalize.go:164\ngithub.com/pingcap/tiflow/pkg/errors.WrapError\n\tgithub.com/pingcap/tiflow/pkg/errors/helper.go:34\ngithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer.(*kafkaDDLProducer).SyncBroadcastMessagentgithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer/kafka_ddl_producer.go:74ngithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq.(*DDLSink).WriteCheckpointTs\n\tgithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq/mq_ddl_sink.go:180\ngithub.com/pingcap/tiflow/cdc/owner.(*ddlSinkImpl).writeCheckpointTs.func1ntgithub.com/pingcap/tiflow/cdc/owner/ddl_sink.go:227ngithub.com/pingcap/tiflow/cdc/owner.(*ddlSinkImpl).retrySinkAction\n\tgithub.com/pingcap/tiflow/cdc/owner/ddl_sink.go:166\ngithub.com/pingcap/tiflow/cdc/owner.(*ddlSinkImpl).observedRetrySinkAction.func1ntgithub.com/pingcap/tiflow/cdc/owner/ddl_sink.go:198nruntime.goexitntruntime/asm_arm64.s:1197"]
May 09 19:21:23 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:23.003 +00:00] [WARN] [changefeed.go:325] ["an warning occurred in Owner"] [namespace=default] [changefeed=conv-staging-v0] [error="[CDC:ErrKafkaSendMessage]kafka send message failed: kafka: Failed to deliver 24 messages."] [errorVerbose="[CDC:ErrKafkaSendMessage]kafka send message failed: kafka: Failed to deliver 24 messages.ngithub.com/pingcap/errors.AddStackntgithub.com/pingcap/errors@v0.11.5-0.20231212100244-799fae176cfb/errors.go:174ngithub.com/pingcap/errors.(*Error).GenWithStackByArgs\n\tgithub.com/pingcap/errors@v0.11.5-0.20231212100244-799fae176cfb/normalize.go:164\ngithub.com/pingcap/tiflow/pkg/errors.WrapError\n\tgithub.com/pingcap/tiflow/pkg/errors/helper.go:34\ngithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer.(*kafkaDDLProducer).SyncBroadcastMessagentgithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer/kafka_ddl_producer.go:74ngithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq.(*DDLSink).WriteCheckpointTs\n\tgithub.com/pingcap/tiflow/cdc/sink/ddlsink/mq/mq_ddl_sink.go:180\ngithub.com/pingcap/tiflow/cdc/owner.(*ddlSinkImpl).writeCheckpointTs.func1ntgithub.com/pingcap/tiflow/cdc/owner/ddl_sink.go:227ngithub.com/pingcap/tiflow/cdc/owner.(*ddlSinkImpl).retrySinkAction\n\tgithub.com/pingcap/tiflow/cdc/owner/ddl_sink.go:166\ngithub.com/pingcap/tiflow/cdc/owner.(*ddlSinkImpl).observedRetrySinkAction.func1ntgithub.com/pingcap/tiflow/cdc/owner/ddl_sink.go:198nruntime.goexitntruntime/asm_arm64.s:1197"]
May 09 19:21:23 ip-.ec2.internal cdc[541201]: [2024/05/09 19:21:23.583 +00:00] [INFO] [kafka_manager.go:181] ["Kafka admin client describe topics success"] [namespace=default] [changefeed=conv-staging-v0] [duration=3.774µs]
@phuongvu
May I ask, did the workaround for the Avro protocol work?
Regarding your question about the canal-json protocol, it shouldn't be impacted by this issue. I have not been able to reproduce it in my local environment.
If possible, providing the complete ticdc log could assist us in further investigating this matter.
Hey @asddongmen, we've sort of put the project that's going to use TiCDC on hold right now, so I haven't had the chance to try the workaround yet, but I think it should work. Re: logs for canal-json, I can try to find the log and post it here.
What did you do?
We created:
Whenever the tables are created, the changefeed task tries to register the schema version for all the tables at the same time, which causes rate-limiting errors.
What did you expect to see?
No concurrent updates of the schema version for the same schema: https://github.com/pingcap/tiflow/blob/master/pkg/sink/codec/avro/avro.go#L154
What did you see instead?
operation error Glue: RegisterSchemaVersion, exceeded maximum number of attempts, 3, https response error StatusCode: 400, RequestID: c85a2ee3-b98b-459e-a621-d58a89b045bd, api error ThrottlingException: Rate exceeded
operation error Glue: RegisterSchemaVersion, https response error StatusCode: 400, RequestID: 45b24c85-eb3c-4936-bc37-4e46f18efd61, ConcurrentModificationException: Some other operation happened for the schema, please retry again. SchemaName: staging_cdc_v0-key, RegistryName: staging_cdc, SchemaArn: arn:aws:glue:us-east-1:131234521375:schema/staging_cdc/staging_cdc_v0-key
Pausing and resuming the changefeed sometimes fixes this, but I was hoping we could handle this within TiCDC.
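Just as a sketch of what I mean (hypothetical code, not the current tiflow behavior): the registration call could retry with backoff when Glue reports ConcurrentModificationException or ThrottlingException; registerSchemaVersion here is a made-up wrapper around the Glue API call:

package avroexample

import (
	"context"
	"errors"
	"time"

	"github.com/aws/smithy-go"
)

// registerSchemaVersion is a made-up wrapper around the Glue
// RegisterSchemaVersion API call, used only for this sketch.
func registerSchemaVersion(ctx context.Context, name, definition string) (string, error) {
	// ... call the Glue API here ...
	return "schema-version-id", nil
}

// registerWithRetry retries when Glue reports a transient error such as
// ConcurrentModificationException or ThrottlingException, backing off
// exponentially between attempts.
func registerWithRetry(ctx context.Context, name, definition string) (string, error) {
	backoff := 100 * time.Millisecond
	for attempt := 0; ; attempt++ {
		id, err := registerSchemaVersion(ctx, name, definition)
		if err == nil {
			return id, nil
		}
		var apiErr smithy.APIError
		retryable := errors.As(err, &apiErr) &&
			(apiErr.ErrorCode() == "ConcurrentModificationException" ||
				apiErr.ErrorCode() == "ThrottlingException")
		if !retryable || attempt >= 5 {
			return "", err
		}
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(backoff):
		}
		backoff *= 2
	}
}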
Versions of the cluster
Upstream TiDB cluster version (execute SELECT tidb_version(); in a MySQL client):
Upstream TiKV version (execute tikv-server --version):
TiCDC version (execute cdc version):