This repository accompanies the Amazon MSK migration lab. It includes resources used in the lab, including AWS CloudFormation templates, configuration files, and Java code. This repository differs from the lab in deploying MirrorMaker on ECS Fargate for improved resilience, scalability, and maintainability.
For more background on Kafka Connect, please see kafka-connect.
* The MirrorMaker task running in Kafka Connect reads from the configured source cluster, replicating topics, consumer group offsets, and ACLs 1:1 to the target cluster.
* ECS services are deployed privately in a multi-AZ configuration, with autoscaling based on task CPU to automatically meet Kafka cluster load and ensure fault tolerance when tasks fail.
* Prometheus is used to scrape metrics from Kafka Connect tasks to monitor replication latency and task status over time.
* Grafana is used to visualize Prometheus metrics.
* Consumers can be migrated to the target cluster over time as topics and consumer groups are kept in sync by Kafka Connect. Once consumers are migrated to the target cluster, producers can migrate as well.
* A bastion host or virtual desktop is used to access private resources, such as configuring Kafka Connect tasks and monitoring replica lag in Grafana.
This project relies on Docker images running in ECS Fargate to deploy Kafka Connect, Prometheus, and Grafana.
The `build_images.sh` script builds and pushes the Kafka Connect and Prometheus images to ECR repositories. It requires that the ECR repositories have already been created and are named `kafka-connect` and `prometheus`. The Terraform resources will create the ECR repositories on your behalf - please see the automated build instructions for more information.
Usage:

```shell
./build_images.sh ACCOUNT_ID REGION
./build_images.sh 012345678910 us-east-1
```
The build script includes environment variables to build x86 (amd64) images, even when running on ARM hosts. If you choose to build and deploy your images manually without the build script, please ensure you build amd64 images:

```shell
DOCKER_DEFAULT_PLATFORM="linux/amd64" docker build .
```
The root folder contains the definitions for CustomMM2ReplicationPolicy and Centos-based Java dependencies necessary for running Kafka Connect in the Dockerfile, as outlined below. It also contains the Kafka Connect configuration examples for MirrorMaker tasks in a variety of scenarios (such as IAM authentication, mTLS authentication, etc.).
The prometheus folder contains a custom Prometheus image that includes the necessary scrape targets and intervals to gather Prometheus metrics from the Kafka brokers.
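For orientation, a scrape job in that image's `prometheus.yml` might look like the following sketch. The broker hostnames are placeholders, and port 11001 is the JMX exporter port MSK exposes when open monitoring is enabled - adjust both to match your cluster.

```yaml
# Illustrative Prometheus scrape job for MSK brokers (hostnames are placeholders).
scrape_configs:
  - job_name: 'kafka-brokers'
    scrape_interval: 60s
    static_configs:
      - targets:
          - 'b-1.mycluster.example.us-east-1.amazonaws.com:11001'
          - 'b-2.mycluster.example.us-east-1.amazonaws.com:11001'
```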
First, we need to build the backend infrastructure (ECS tasks, Kafka clusters, etc) for the migration tasks. We can do this either with the automated build scripts, or manually.
The majority of the required infrastructure for this example can be built and deployed using the Terraform source located in terraform/. The only things not provisioned in the Terraform example are the VPC to deploy into and the build/push of the Docker images. After the Terraform has been deployed, the images can be automatically built and pushed to ECR using the provided build script.
```shell
cd terraform/
terraform init
terraform apply -var-file main.tfvars
cd ..
./build_images.sh 012345678910 us-east-1
```
Finally, you will need to deploy the Kafka Connect tasks (see below).
Please see the manual build instructions for steps on deploying infrastructure manually via the AWS CLI.
Once the infrastructure is deployed and our ECS tasks reach the RUNNING state, we can set up the monitoring and MirrorMaker tasks. To access the ECS tasks, ensure you have an SSH tunnel/proxy running to set up the connectivity, or use a bastion host / Amazon WorkSpaces virtual desktop.
To create an SSH tunnel to your Amazon EC2 bastion and specify the port your proxy is using:

```shell
ssh -i privatekey.pem ec2-user@ec2-xx-xxx-xxx-xxx.compute-1.amazonaws.com -ND 8157
```
Navigate to http://prometheus.monitoring:9090 and verify you can view the main page. Note that this URL may differ if you used the automated build - the URLs for these services can be found in the Terraform outputs.
Navigate to http://grafana.monitoring:3000 and verify you can view the dashboard
* The default username and password is `admin`
Add a new Prometheus data source:
1. Enter `http://prometheus.monitoring:9090` as the URL
2. Click the Test and Save button
3. Import the `grafana/MM2-dashboard-1.json` monitoring dashboard
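As an alternative to clicking through the UI, the data source can be provisioned through Grafana's HTTP API. This sketch builds the data-source payload and shows (commented) the `curl` call to run from a host with tunnel access; the default `admin`/`admin` credentials are assumed.

```shell
# Build the Prometheus data-source payload for Grafana's /api/datasources endpoint.
payload='{"name":"Prometheus","type":"prometheus","url":"http://prometheus.monitoring:9090","access":"proxy","isDefault":true}'
echo "$payload"

# From a host with connectivity to the monitoring namespace, POST it:
#   curl -s -X POST -u admin:admin -H "Content-Type: application/json" \
#        --data "$payload" http://grafana.monitoring:3000/api/datasources
```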
If you used the manual build, clone this repository on your instance
```shell
git clone https://github.com/aws-samples/mirrormaker2-msk-migration.git
```
If you used the automated build, copy the configured task definitions from S3 to your instance - the S3 URIs for these files can be found in the terraform outputs:
```shell
aws s3 cp s3://my-config-bucket/connector/mm2-msc-iam-auth.json .
aws s3 cp s3://my-config-bucket/connector/mm2-hbc-iam-auth.json .
aws s3 cp s3://my-config-bucket/connector/mm2-cpc-iam-auth.json .
```
Edit the connector JSON files in the configurations directory with your broker addresses, if they are not already populated.
Run the source connector. Example for IAM:

```shell
curl -X PUT -H "Content-Type: application/json" --data @mm2-msc-iam-auth.json http://kafkaconnect.migration:8083/connectors/mm2-msc/config | jq '.'
```
Check the status of the connector to make sure it's running:

```shell
curl -s kafkaconnect.migration:8083/connectors/mm2-msc/status | jq .
```
Repeat the same PUT and status checks for the other two connectors:
```shell
curl -X PUT -H "Content-Type: application/json" --data @mm2-cpc-iam-auth.json http://kafkaconnect.migration:8083/connectors/mm2-cpc/config | jq '.'
curl -s kafkaconnect.migration:8083/connectors/mm2-cpc/status | jq .
curl -X PUT -H "Content-Type: application/json" --data @mm2-hbc-iam-auth.json http://kafkaconnect.migration:8083/connectors/mm2-hbc/config | jq '.'
curl -s kafkaconnect.migration:8083/connectors/mm2-hbc/status | jq .
```
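As a convenience, the three status checks can be generated in one loop. This sketch only prints the commands (pipe the output to `sh` to execute them from a host with connectivity to the Kafka Connect service); the connector names assume the ones used above.

```shell
# Print a status-check command for each of the three MM2 connectors.
for c in mm2-msc mm2-cpc mm2-hbc; do
  echo "curl -s kafkaconnect.migration:8083/connectors/${c}/status | jq -r '.connector.state'"
done
```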
If you need help running a sample Kafka producer/consumer, refer to the MSK Labs Migration Workshop.
There are a few main use cases for MirrorMaker2 in migrations:
There are a few key components to the overall cost of running Kafka Connect on ECS Fargate to run MirrorMaker2:
Baseline ECS Costs
There are 3 ECS services recommended for running Kafka Connect and MirrorMaker2: Kafka Connect, Prometheus, and Grafana.
Each service may have 1 or more tasks. For Prometheus and Grafana, 2-3 tasks may be used for high availability. These services do not have a backend data store configured, and therefore aren't persistent across reboots. Adding persistent storage would increase the cost of the solution.
The ECS costs will be based on the total number of ECS tasks and their CPU/memory configuration, as outlined in the ECS Fargate pricing page.
Scaling Considerations
For Kafka Connect, the service is autoscaled based on the load in the Kafka cluster being replicated. This can be capped to a maximum task limit to limit the overall cost of the solution.
In MirrorMaker2, the Kafka Connect tasks are used to consume from the cluster partitions. For example:
* 10 partitions, 5 MirrorMaker2 tasks yields 2 partitions per task
* 10 partitions, 10 MirrorMaker2 tasks yields 1 partition per task
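The partitions-per-task arithmetic above is just a ceiling division; a quick sketch:

```shell
# Partitions handled per MirrorMaker2 task = ceil(partitions / tasks.max).
partitions=10
tasks_max=5
echo $(( (partitions + tasks_max - 1) / tasks_max ))  # prints 2
```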
ECS will autoscale the number of ECS tasks, and therefore MirrorMaker2 tasks, based on the CPU of the ECS tasks. At most, for a cluster with `X` partitions you can expect a total of `X` Kafka Connect tasks, and therefore `X/10` ECS tasks for Kafka Connect (assuming `tasks.max=10`).
MSK Costs
During the migration you will use an MSK cluster for storing the replicated topic data. You can use this blog to help with right sizing your cluster and understanding cluster costs.
There are several MirrorMaker settings for the MirrorSourceConnector (MSC) and MirrorCheckpointConnector (CPC) tasks that can be used to fine-tune replication:
Config | Default Setting in Sample | Description
---|---|---
`replication.policy.class` | `...CustomMM2ReplicationPolicy` | Custom Java code that renames topics from the source cluster to the destination cluster. Allows changing (or keeping) topic names to assist with producer/consumer logic during migration.
`tasks.max` | 4 | The overall number of Kafka Connect tasks running across all distributed worker nodes. The ideal setting allows for 5-10 partitions per task (e.g. `tasks.max = Total Partition Count / 5`).
`replication.factor` | 3 | The replication factor for newly created topics - set based on the configuration of the destination cluster.
`offset-syncs.topic.replication.factor` | 3 | The replication factor for the internal MirrorMaker topic used to replicate offsets to the destination cluster - set based on the configuration of the destination cluster.
`sync.topic.acls.interval.seconds` | 600 | Frequency of the ACL sync. Setting this too low can disrupt the source cluster due to the overhead of high-frequency ACL polling.
`sync.topic.configs.interval.seconds` | 600 | Frequency of the topic configuration sync. Setting this too low can disrupt the source cluster due to the overhead of high-frequency config polling.
`refresh.topics.interval.seconds` | 300 | Frequency of finding and replicating newly added topics. Setting this too low can disrupt the source cluster due to the overhead of high-frequency topic polling.
`refresh.groups.interval.seconds` | 20 | Frequency of the Kafka consumer group sync. Setting this too low can disrupt the source cluster due to the overhead of high-frequency group polling.
`producer.enable.idempotence` | true | Gives the MirrorMaker2 producers PIDs and message sequence numbers, allowing Kafka brokers to reject messages if lower sequence numbers are ever received (the default behavior as of Kafka 3.0).
`max.poll.records` | 50,000 | Maximum number of records returned in a single poll operation. Tune based on your throughput and message size, but this is generally a sensible default.
`receive.buffer.bytes` | 33,554,432 | TCP receive buffer size. The Kafka default is 64 KB, so a higher value here reduces disruption in high-throughput workloads.
`send.buffer.bytes` | 33,554,432 | TCP send buffer size. The Kafka default is 128 KB, so a higher value here reduces disruption in high-throughput workloads.
`max.partition.fetch.bytes` | 33,554,432 | The maximum amount of data returned for a single partition - set to match the send/receive buffer sizes since MirrorMaker2 always reads and writes the same amount of data.
`message.max.bytes` | 37,755,000 | Broker-level setting for the maximum message size; may not be needed here.
`compression.type` | gzip | Broker-level setting for compression; may not be needed here.
`max.request.size` | 26,214,400 | Maximum request size from a producer - must be larger than the largest message being sent (e.g. `max.message.bytes` on the topic and `message.max.bytes` on the broker).
`buffer.memory` | 524,288,000 | In-memory buffer for batching messages and for holding messages that can't be sent to the broker. Allows higher throughput by sending larger message batches. After this buffer fills and `max.block.ms` elapses, the producer shuts down. Adjust if you want to tune producer shutdown behavior during a broker disruption.
`batch.size` | 524,288 | The maximum amount of data that can be sent in a single request. Should be smaller than `buffer.memory` to allow efficient use of the buffer.
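For orientation, a hypothetical fragment of an MSC connector config using a few of the sample defaults above might look like the following. The connector class is the standard MM2 `MirrorSourceConnector`; the full package of `CustomMM2ReplicationPolicy` is abbreviated here just as it is in the table.

```json
{
  "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "replication.policy.class": "...CustomMM2ReplicationPolicy",
  "tasks.max": "4",
  "replication.factor": "3",
  "offset-syncs.topic.replication.factor": "3",
  "sync.topic.acls.interval.seconds": "600",
  "refresh.topics.interval.seconds": "300",
  "producer.enable.idempotence": "true"
}
```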
Config | Default Setting in Sample | Description
---|---|---
`tasks.max` | 1 | Should always be 1 for the CPC.
`replication.factor` | 3 | Replication factor for new topics - set based on the configuration of the destination cluster.
`checkpoints.topic.replication.factor` | 3 | Replication factor for the internal offset tracking topic - set based on the configuration of the destination cluster.
`emit.checkpoints.interval.seconds` | 20 | Frequency of retrieving and syncing consumer group offsets to the replication topic.
`sync.group.offsets.interval.seconds` | 20 | Frequency of syncing consumer group offsets in the destination cluster.
`sync.group.offsets.enabled` | true | Whether or not to automatically sync offsets to the destination consumer groups (vs. only tracking them in the offset topic). Set to false if you want to manage offsets in the destination manually.
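A hypothetical CPC connector config fragment using the sample defaults above might look like:

```json
{
  "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
  "tasks.max": "1",
  "checkpoints.topic.replication.factor": "3",
  "emit.checkpoints.interval.seconds": "20",
  "sync.group.offsets.interval.seconds": "20",
  "sync.group.offsets.enabled": "true"
}
```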
Consumer group offsets are replicated by the MirrorMaker2 CPC task. The offsets are replicated periodically while there is an active consumer in the source cluster. This means that when the consumer is stopped on the source cluster, its offsets stop being replicated, which causes duplicate messages to be consumed when the consumer starts on the target cluster. The number of duplicate messages depends on the frequency of the offset sync.
The offsets are replicated based on two configurations:
Config | Default Setting in Sample | Description
---|---|---
`emit.checkpoints.interval.seconds` | 20 | Frequency of retrieving and syncing consumer group offsets to the replication topic.
`sync.group.offsets.interval.seconds` | 20 | Frequency of syncing consumer group offsets in the destination cluster.
For example, if these are set to 20 seconds, then there may be up to 20 seconds of data that are consumed and committed on the source cluster but not yet synced to the target cluster. To minimize these duplicates, you can reduce these intervals. Note that setting them lower can impact performance on the source/target cluster. We recommend monitoring cluster CPU/memory closely if you reduce them below 20 seconds.
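To put a rough number on the duplicate window: the worst case is approximately the sync interval times the source consumer's commit throughput. Both values in this sketch are illustrative.

```shell
# Worst-case duplicates ~= sync interval (s) x messages committed per second.
sync_interval_s=20
msgs_per_s=1000
echo $(( sync_interval_s * msgs_per_s ))  # prints 20000
```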
To understand the current replicated offset positions in the source/target cluster, you can use the `kafka-consumer-groups.sh` script provided by Kafka:

```shell
bin/kafka-consumer-groups.sh \
  --describe \
  --bootstrap-server "${CLUSTER_BROKER_CONNECTION_STRING}" \
  --group "${CONSUMER_GROUP_NAME}" \
  --command-config client.properties
```
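The `--command-config client.properties` file supplies the client's auth settings. For an IAM-authenticated MSK cluster, a typical `client.properties` looks like the following (this assumes the aws-msk-iam-auth library is on the client's classpath):

```properties
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
```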
This functionality may change in the future, as the Kafka community is actively investigating methods to further reduce offset replication delay (KAFKA-16364).
Check the active record count metric (`kafka_connect_source_task_source_record_active_count`) to see if records are evenly balanced across tasks, and that tasks generally have 2,000-20,000 records each. If tasks have more than this, it's possible that they are over-provisioned with partitions. Adjust `tasks.max` to balance partitions across the tasks so that each task gets 5-10 partitions.

After validating these settings, if your replication latency is still high, you may need to investigate the settings for `max.message.bytes` and `message.max.bytes` to ensure that buffers are sized correctly for the Kafka Connect producer and consumer sides in the MSC settings.
How should I choose the `tasks.max` setting?

Kafka Connect is built to be a distributed framework scaling horizontally across task compute nodes. To scale Kafka Connect, we simply add more worker nodes to load-balance tasks across workers. For MirrorMaker2, size `tasks.max` so that each MirrorSourceConnector task has 5-10 partitions to replicate.
Key Prometheus metrics to monitor during replication:

* `kafka_connect_mirror_source_connector_replication_latency_ms_avg`: average replication latency from the source to the target cluster
* `kafka_consumer_fetch_manager_records_lag`: consumer lag of the MirrorMaker tasks against the source cluster
* `kafka_connect_mirror_source_connector_record_age_ms_avg`: average age of records when they are replicated
* `kafka_consumer_fetch_manager_fetch_latency_avg`: average fetch latency against the source cluster
* `kafka_connect_source_task_source_record_active_count`: records read but not yet committed per task; useful when tuning `tasks.max`