aws-samples / mirrormaker2-msk-migration

MIT No Attribution

Consumer offsets are not getting translated to destination #2

Open shravangit20 opened 3 years ago

shravangit20 commented 3 years ago

We ran MirrorMaker 2 on a Kafka Connect cluster following the instructions, and the topics and data were migrated from source to destination. However, the offsets on the destination appear to start from 0 rather than being translated from the source. This would force all clients to consume from the beginning after switching to MSK. I assume something is wrong with the consumer group configuration, but I can't identify what is missing.

Below are the configurations used:

mm2-msc-cust-repl-policy: {
"name": "mm2-msc",
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"clusters": "msksource,mskdest",
"source.cluster.alias": "msksource",
"target.cluster.alias": "mskdest",
"target.cluster.bootstrap.servers": "Destination MSK bootstraps",
"source.cluster.bootstrap.servers": "Source kafka bootstraps",
"topics": ".*",
"tasks.max": "4",
"key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
"replication.factor": "3",
"offset-syncs.topic.replication.factor": "3",
"sync.topic.configs.interval.seconds": "20",
"refresh.topics.interval.seconds": "20",
"refresh.groups.interval.seconds": "20",
"consumer.group.id": "preprod-cgi",
"producer.enable.idempotence": "true",
"sync.topic.acls.enabled": "false"
}

The consumer.group.id specified here was never created by MirrorMaker 2. We used the same consumer.group.id when running the group offset sync application, and it could not find the consumer group. Our understanding was that this is the consumer group of the MirrorMaker consumer, and that after the MirrorMaker producer writes the data to MSK, the offsets for "preprod-cgi" would be translated on the destination.

java -jar /tmp/kafka/MM2GroupOffsetSync-1.0-SNAPSHOT.jar -cgi preprod-cgi -src msksource-ps -pfp /tmp/kafka/consumer.properties -rpc com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy > /tmp/preprod-v4.log 2>&1 &

mm2-cpc-cust-repl-policy: {
"target.cluster.bootstrap.servers": "Destination MSK bootstraps",
"source.cluster.bootstrap.servers": "Source kafka bootstraps",
"tasks.max": "1",
"key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
"replication.factor": "3",
"checkpoints.topic.replication.factor": "3",
"emit.checkpoints.interval.seconds": "20"
}

Also, when starting the Kafka Connect service, it uses a connect-distributed.properties file that contains a group.id. Does this have any particular significance, or can it conflict with the consumer group id in mm2-msc-cust-repl-policy above?

Reference for source vs. destination offsets after migration:

Source:
agent-payloads:0:235943806
agent-payloads:1:0
agent-payloads:2:0
agent-payloads:3:0
agent-payloads:4:0

Destination:
agent-payloads:0:24483600
agent-payloads:1:0
agent-payloads:2:0
agent-payloads:3:0
agent-payloads:4:0
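The destination offsets differ from the source offsets because the mirrored topic's offsets are assigned by the destination cluster as MM2 writes records, so a committed source offset cannot be reused as-is. MM2 relates the two through offset syncs, which pair a source offset with the destination offset it was written at. The Java sketch below is a simplified, hypothetical model of the rule early MM2 releases used: take the most recent offset sync for the partition and, if its source offset is not beyond the committed offset, add the distance between them to its destination offset. It is not the MM2 source code, newer Kafka versions translate more conservatively, and the numbers are illustrative rather than taken from this cluster.

```java
/**
 * Hypothetical, simplified model of how early MirrorMaker 2 releases translated a
 * committed source offset into a destination offset using the latest offset sync.
 * Not the MM2 source code; newer Kafka versions use a more conservative rule.
 */
public class OffsetTranslationSketch {

    /** One offset sync: the record at source offset `upstream` was written at destination offset `downstream`. */
    record OffsetSync(long upstream, long downstream) {}

    /** Returns the translated destination offset, or -1 if the sync is newer than the committed offset. */
    static long translate(OffsetSync sync, long committedUpstream) {
        if (sync.upstream() > committedUpstream) {
            return -1; // committed offset is older than the sync, so it cannot be translated with it
        }
        long step = committedUpstream - sync.upstream(); // records consumed since the synced position
        return sync.downstream() + step;
    }

    public static void main(String[] args) {
        // Illustrative numbers only.
        OffsetSync sync = new OffsetSync(1_000L, 400L);
        System.out.println(translate(sync, 1_050L)); // 450
        System.out.println(translate(sync, 900L));   // -1
    }
}
```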

Thanks, and I appreciate any help on this.

Thanks, Shravan

shravangit20 commented 3 years ago

@rcchakr It would be great if you could share your thoughts on the above situation. Thanks in advance.

rcchakr commented 3 years ago

Hi @shravangit20, the consumer group id for the MM2GroupOffsetSync application (the -cgi parameter) should be the id of the consumer group you're migrating from the source to the destination cluster, not the consumer group id of MM2 or of the underlying Kafka Connect cluster. For example, if a consumer in the source cluster with a group.id of consumer1 reads topic ExampleTopic, which you're migrating to the destination cluster, you would specify consumer1. That is the group MM2 checkpoints into its internal checkpoints topic on the destination cluster, and the MM2GroupOffsetSync application reads those checkpoints and syncs them to the destination cluster's __consumer_offsets internal topic.
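The group-id lookup described above can be modelled in a few lines of Java. This is a hypothetical sketch, not the MM2GroupOffsetSync code: the Checkpoint record and the offsetsToCommit method are illustrative names, and checkpoint records are assumed to be read in the order they were written. In this model, passing MM2's own consumer.group.id (preprod-cgi) finds nothing, because the checkpoints carry the application consumer groups such as consumer1.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model: scan checkpoint records (in write order) and keep, for one
 * consumer group, the latest destination offset per topic-partition.
 * Illustrative names only; not the MM2GroupOffsetSync source code.
 */
public class CheckpointFilterSketch {

    /** Simplified checkpoint: source group, partition, and the offset on each side. */
    record Checkpoint(String consumerGroupId, String topic, int partition,
                      long upstreamOffset, long downstreamOffset) {}

    /** Returns "topic:partition" -> destination offset to commit for the given group. */
    static Map<String, Long> offsetsToCommit(List<Checkpoint> checkpoints, String groupId) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Checkpoint cp : checkpoints) {
            if (!cp.consumerGroupId().equals(groupId)) {
                continue; // checkpoints for other groups are ignored
            }
            // A later checkpoint for the same partition replaces an earlier one.
            result.put(cp.topic() + ":" + cp.partition(), cp.downstreamOffset());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Checkpoint> checkpoints = List.of(
                new Checkpoint("consumer1", "ExampleTopic", 0, 1_000L, 400L),
                new Checkpoint("consumer1", "ExampleTopic", 0, 1_200L, 600L),
                new Checkpoint("consumer2", "ExampleTopic", 1, 50L, 20L)); // hypothetical second group

        System.out.println(offsetsToCommit(checkpoints, "consumer1"));   // {ExampleTopic:0=600}
        System.out.println(offsetsToCommit(checkpoints, "preprod-cgi")); // {}
    }
}
```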

Hope this helps.

shravangit20 commented 3 years ago

Thank you @rcchakr for clarifying. Really appreciate your timely response. So if I understand correctly, I need to use the existing source consumer group id when running the MM2GroupOffsetSync application? We have 20+ consumer groups on the source, so after starting all connectors we need to run the MM2GroupOffsetSync application once for each consumer group, correct? Also, per the AWS lab documentation, the consumer.properties file used by MM2GroupOffsetSync contains only the destination bootstrap servers, so MM2GroupOffsetSync never connects to the source cluster; the only source-side input is the consumer group id we pass, correct? One last question: after running MM2GroupOffsetSync with a source consumer group, will the same consumer group be mirrored onto the destination? Thanks, Shravan

rcchakr commented 3 years ago

Hi @shravangit20, yes, you would have to run a separate instance of the app for each consumer group, or you could modify the code to handle all consumer group ids. And yes, you only need the destination bootstrap brokers, because the app runs entirely against the destination cluster and syncs offsets from the checkpoints topic into the __consumer_offsets topic there. MM2 does the work of syncing the source consumer group ids and offsets from the source to the destination checkpoints topic. The answer to your last question is yes.
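With one instance per group, the command lines can be generated from a list of group ids rather than typed 20+ times. The sketch below reuses the flags from the MM2GroupOffsetSync command earlier in this issue (-cgi, -src, -pfp, -rpc) and only builds the commands; the group ids consumer1 and consumer2 are hypothetical placeholders, and launching each one via ProcessBuilder is shown only as a comment.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Builds one MM2GroupOffsetSync command line per source consumer group, reusing the
 * flags from the command in this issue. Group ids are hypothetical placeholders.
 */
public class PerGroupCommands {

    static List<List<String>> buildCommands(List<String> groupIds, String jar, String sourceAlias,
                                            String propsFile, String policyClass) {
        List<List<String>> commands = new ArrayList<>();
        for (String groupId : groupIds) {
            commands.add(List.of("java", "-jar", jar,
                    "-cgi", groupId,      // the application's source consumer group, not MM2's
                    "-src", sourceAlias,
                    "-pfp", propsFile,    // consumer properties pointing at the destination cluster
                    "-rpc", policyClass));
        }
        return commands;
    }

    public static void main(String[] args) {
        List<List<String>> commands = buildCommands(
                List.of("consumer1", "consumer2"), // hypothetical group ids
                "/tmp/kafka/MM2GroupOffsetSync-1.0-SNAPSHOT.jar",
                "msksource-ps",
                "/tmp/kafka/consumer.properties",
                "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy");
        for (List<String> cmd : commands) {
            System.out.println(String.join(" ", cmd));
            // To launch each one with a per-group log, something like:
            // new ProcessBuilder(cmd).redirectErrorStream(true)
            //         .redirectOutput(new java.io.File("/tmp/" + cmd.get(4) + ".log")).start();
        }
    }
}
```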

shravangit20 commented 3 years ago

Awesome. We will try another run. Thank you!!