scylladb / scylla-migrator

Migrate data extract using Spark to Scylla, normally from Cassandra
Apache License 2.0

Improve documentation surrounding tuning and scaling scylla-migrator for DynamoDB migration #133

Open pdbossman opened 2 months ago

pdbossman commented 2 months ago

Background

With the prior version of DynamoDB -> Alternator, scanSegments translated to partitions. The recommendation for scanSegments was to take the total bytes of the source table from DynamoDB DescribeTable and divide by 128MB (128000000). There was also a desire to round up so that the number of scanSegments was divisible by the number of executors, with the goal of uniformly distributing work.
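As a concrete illustration of that rule, here is a minimal sketch (not code from the migrator itself; `tableSizeBytes` and `numExecutors` stand in for the DescribeTable output and the planned Spark cluster):

```scala
// Old sizing heuristic for scanSegments, as described above (illustrative only).
def recommendedScanSegments(tableSizeBytes: Long, numExecutors: Int): Int = {
  val targetSegmentBytes = 128L * 1000 * 1000 // ~128 MB of source data per segment
  val raw = math.ceil(tableSizeBytes.toDouble / targetSegmentBytes).toInt
  // Round up to a multiple of numExecutors so the work distributes evenly.
  val remainder = raw % numExecutors
  if (remainder == 0) raw else raw + (numExecutors - remainder)
}

// Example: a 500 GB table with 8 executors => 3912 scanSegments
// recommendedScanSegments(500L * 1000 * 1000 * 1000, 8)
```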

128MB is mentioned in many places in the Spark documentation as a unit of work that is large enough to justify the scheduling overhead, but not so big that it overwhelms Spark. With 500GB+ datasets, the number of scanSegments (which translated directly to partitions) was much larger than the number of parallel tasks.

It was very useful to over-provision the migration hardware, start a certain number of workers to achieve a particular velocity, and, if the source/target would tolerate it, increase the number of workers to go faster. In my testing with 128MB chunks, adding workers acted very much like a step function in the velocity of the migration: if you had 4 workers and added 2 more with the same CPU, etc., you increased throughput by 50%. This predictable behavior was/is very desirable. I mention over-provisioning because the migration instances are only needed during setup, configuration, and the migration itself, and they can be stopped when not in use. Basically, the cost of more powerful instances for a small amount of time seems worth it to reduce the time humans spend waiting on, monitoring, and tending the migration (just remember to stop the instances when not in use).

Ask

All that TL;DR above brings me to this: I don't know how to set parameters to achieve similar predictability with the new code. scanSegments does not work the same way, I don't know how to get 128MB partitions anymore, and I am not sure whether adding workers would take on additional work.

To use the new code, I need to know how to control and increase the velocity, and which parameters influence the process of creating tasks for the Spark executors to pick up, so that we create enough tasks that adding workers increases the velocity in predictable increments.

If possible, please provide guidance in this regard.

julienrf commented 2 months ago

Here are my notes after investigating which parameters play a role in the performance of reading/writing DynamoDB tables using the emr-dynamodb-connector.

The number of task mappers (which directly translates into the number of RDD partitions) is first defined by the memory capacity of the workers. However, I am not sure the way the memory capacity is estimated is correct because the emr-dynamodb-connector expects to be used within an EMR environment where the memory capacity is configured as described here.

Then, the number of task mappers is capped to make sure it does not go beyond the table provisioned throughput. However, the way we currently configure the read throughput looks wrong to me.

Last, the number of task mappers is capped so that it is not higher than the maximum number of mappers set by maxMapTasks in config.yaml.
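Putting those three constraints together, my understanding is that the effective number of mappers is roughly the minimum of the three. A hypothetical sketch of that relationship (the names are mine, not the connector's API):

```scala
// Rough model of how the connector caps the number of task mappers (illustrative only).
def effectiveMapTasks(
    memoryBasedMappers: Int,     // derived from the workers' memory capacity (EMR-style estimate)
    readThroughput: Double,      // read capacity units the migration is allowed to consume
    throughputPerMapper: Double, // read capacity a single mapper is expected to use
    maxMapTasks: Int             // cap configured via maxMapTasks in config.yaml
): Int = {
  val throughputBasedMappers = math.max(1, (readThroughput / throughputPerMapper).toInt)
  List(memoryBasedMappers, throughputBasedMappers, maxMapTasks).min
}
```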

Note that this logic has changed a little bit in newer versions of the emr-dynamodb-connector: the memory constraint is only applied if the Spark cluster is managed by YARN.

pdbossman commented 2 months ago

I expect us to have the output from DescribeTable, so I know the item size and the number of items. I also typically have a desirable duration - for example, with Kinesis streams or Kafka, customers might have a 24h limit, so they want the data copy (scylla-migrator) portion to finish within 24h. I actually aim lower than that, since we want to make sure we don't even approach the limit.
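The kind of arithmetic I would like the documentation to walk through looks like this (placeholder numbers, not a real table):

```scala
// Back-of-the-envelope sizing from DescribeTable output and a deadline (placeholder values).
val itemCount   = 1000000000L                 // ItemCount from DescribeTable
val tableBytes  = 500L * 1000 * 1000 * 1000   // TableSizeBytes from DescribeTable (~500 GB)
val targetHours = 20.0                        // stay comfortably under a 24h retention window

val requiredBytesPerSec = tableBytes / (targetHours * 3600) // ~6.9 MB/s aggregate velocity
val requiredItemsPerSec = itemCount / (targetHours * 3600)  // ~13,900 items/s
```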

So I want to size a cluster - the master, the slaves, the number of workers, processors, and memory - to move x data in y hours, and I'd generally over-provision the cluster to give myself the opportunity to crush the deadline if the source system can handle the pressure from the migration. The cost of instances for a few days typically pales in comparison to the human resources involved in planning and executing the migration.

pdbossman commented 2 months ago

The functional aspect is estimating the number of partitions so that each is a chunk of work an executor can handle, and sizing the executors with the memory and CPUs to handle that work.

Getting to a target throughput then becomes a question of how many of those executors are needed to achieve a particular velocity.

Then there is the assignment of executors to workers - some might just use the entire capacity of a node for a worker; I used it as a step function to control velocity (and pressure on the source/target systems), as sketched below.
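A hypothetical sketch of that step function (illustrative names and numbers only):

```scala
// Throughput scales in predictable steps with the number of workers, as long as
// there are enough partitions to keep every executor busy (illustrative only).
def expectedThroughput(workers: Int, executorsPerWorker: Int, perExecutorBytesPerSec: Double): Double =
  workers * executorsPerWorker * perExecutorBytesPerSec

// Going from 4 to 6 workers with the same executor layout raises throughput by 50%,
// matching the step-function behavior described earlier in this issue.
```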