# cassandra-data-migrator

Migrate and Validate Tables between Origin and Target Cassandra Clusters.

> :warning: Please note this job has been tested with Spark version `3.5.3`.

## Install as a Container
- All migration tools (`cassandra-data-migrator` + `dsbulk` + `cqlsh`) would be available in the `/assets/` folder of the container.

## Install as a JAR file

### Prerequisite
- Spark `3.5.3` with Scala `2.13`, installed on a single VM (no cluster necessary) where you want to run this job. Spark can be installed by running the following:

```
wget https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3-scala2.13.tgz
tar -xvzf spark-3.5.3-bin-hadoop3-scala2.13.tgz
```

> :warning: If the above Spark and Scala versions are not properly installed, you will see an exception like the one below when running CDM jobs:

```
Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.Statics.releaseFence()V
```
# Steps for Data-Migration:

> :warning: Note that Version 4 of the tool is not backward-compatible with `.properties` files created in previous versions, and that package names have changed.

The `cdm.properties` file needs to be configured as applicable for the environment. Parameter descriptions and defaults are described in the file. The file can have any name; it does not need to be `cdm.properties`.
- A simplified sample properties file configuration can be found here as `cdm.properties`
- A complete sample properties file configuration can be found here as `cdm-detailed.properties`
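For orientation, below is a minimal sketch of the kind of settings such a file contains. The property names follow the sample files; all values are placeholders, and the sample files remain the authoritative reference for parameters and defaults.

```
# Origin cluster connection
spark.cdm.connect.origin.host        <origin-contact-point>
spark.cdm.connect.origin.port        9042
spark.cdm.connect.origin.username    <username>
spark.cdm.connect.origin.password    <password>

# Target cluster connection
spark.cdm.connect.target.host        <target-contact-point>
spark.cdm.connect.target.port        9042
spark.cdm.connect.target.username    <username>
spark.cdm.connect.target.password    <password>

# Table to migrate/validate (can also be passed via --conf on spark-submit)
spark.cdm.schema.origin.keyspaceTable    <keyspacename>.<tablename>
```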
Run the job using the `spark-submit` command as shown below:

```
spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.Migrate cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

Note:
- The above command redirects the log output to a `logfile_name_*.txt` file to avoid log output on the console.
- To track details of each run in the `target` keyspace, pass the param `--conf spark.cdm.trackRun=true`.

## Migrating or Validating specific partition ranges
To migrate or validate specific partition ranges, pass the below additional params to the `Migration` or `Validation` jobs:

```
--conf spark.cdm.filter.cassandra.partition.min=<token-range-min>
--conf spark.cdm.filter.cassandra.partition.max=<token-range-max>
```
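The min/max values are Murmur3 token bounds. As an illustrative helper (hypothetical, not part of CDM), the snippet below splits the default Murmur3Partitioner token range into contiguous sub-ranges, e.g. to run several partition-filtered jobs in parallel:

```shell
# Illustrative only: split the full Murmur3 token range into N contiguous
# sub-ranges usable as partition.min/partition.max values.
N=4
MIN=$(( -9223372036854775807 - 1 ))   # -2^63, written this way to avoid literal-overflow quirks
MAX=9223372036854775807               #  2^63 - 1
# Slice width, computed as MAX/N - MIN/N so the subtraction cannot overflow 64 bits.
STEP=$(( MAX / N - MIN / N ))
lo=$MIN
for i in $(seq 0 $(( N - 1 ))); do
  # Last slice absorbs the rounding remainder so coverage ends exactly at MAX.
  if [ "$i" -eq $(( N - 1 )) ]; then hi=$MAX; else hi=$(( lo + STEP - 1 )); fi
  echo "--conf spark.cdm.filter.cassandra.partition.min=$lo --conf spark.cdm.filter.cassandra.partition.max=$hi"
  lo=$(( hi + 1 ))
done
```

The first emitted range starts at -2^63 and the last ends at 2^63 - 1, with no gaps or overlaps between slices.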
# Steps for Data-Validation:

To run the job in data-validation mode, use the class option `--class com.datastax.cdm.job.DiffData` as shown below:

```
spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.DiffData cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

The validation job reports differences as `ERROR` entries in the log file, for example:

```
23/04/06 08:43:06 ERROR DiffJobSession: Mismatch row found for key: [key3] Mismatch: Target Index: 1 Origin: valueC Target: value999
23/04/06 08:43:06 ERROR DiffJobSession: Corrected mismatch row in target: [key3]
23/04/06 08:43:06 ERROR DiffJobSession: Missing target row found for key: [key2]
23/04/06 08:43:06 ERROR DiffJobSession: Inserted missing row in target: [key2]
```
Grep for `ERROR` in the output log files to get the list of missing and mismatched records.
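For instance, given a log excerpt like the one shown earlier (the sample file below is fabricated purely for illustration):

```shell
# Create a hypothetical excerpt of a CDM validation log (illustration only).
cat > sample_validation_log.txt <<'EOF'
23/04/06 08:43:05 INFO  DiffJobSession: Validation run in progress
23/04/06 08:43:06 ERROR DiffJobSession: Missing target row found for key: [key2]
23/04/06 08:43:06 ERROR DiffJobSession: Mismatch row found for key: [key3]
EOF

# Keep only the ERROR entries, i.e. the missing/mismatched records.
grep ERROR sample_validation_log.txt
```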
To get the list of differences (i.e. `missing` and `mismatched` rows) into a separate file, you could use the `log4j2.properties` file provided here as shown below:

```
spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.executor.extraJavaOptions='-Dlog4j.configurationFile=log4j2.properties' \
--conf spark.driver.extraJavaOptions='-Dlog4j.configurationFile=log4j2.properties' \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.DiffData cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```
The validation job can also automatically correct any differences it finds. Enable or disable this using one or both of the below settings in the config file:

```
spark.cdm.autocorrect.missing     false|true
spark.cdm.autocorrect.mismatch    false|true
```

Note: To rerun or resume a previously incomplete `Migration` or `Validation` job, pass the `spark.cdm.trackRun.previousRunId` param as shown below:

```
spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.trackRun.previousRunId=<prev_run_id> \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```
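If run tracking was enabled (`--conf spark.cdm.trackRun=true`), the id of the previous run can be looked up in the `cdm_run_info` table in the target keyspace. A hypothetical lookup via `cqlsh` (the exact column layout is not shown here; inspect the table for the run id):

```
cqlsh <target-host> -e "SELECT * FROM <keyspacename>.cdm_run_info;"
```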
# Perform large-field Guardrail violation checks

To check a table for guardrail violations such as oversized columns, use the class option `--class com.datastax.cdm.job.GuardrailCheck` as shown below:

```
spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.feature.guardrail.colSizeInKB=10000 \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.GuardrailCheck cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

Note: When rerunning a `Validation` job, it will include any token-ranges that had differences in the previous run.

# Features
- Filter records from `Origin` using `writetime` and/or CQL conditions and/or a list of token-ranges
- Supports adding `constants` as new columns on `Target`
- Supports expanding `Map` columns on `Origin` into multiple records on `Target`
- Supports extracting a value from a JSON column in `Origin` and mapping it to a specific field on `Target`
- Migrate from any Cassandra `Origin` (Apache Cassandra® / DataStax Enterprise™ / DataStax Astra DB™) to any Cassandra `Target` (Apache Cassandra® / DataStax Enterprise™ / DataStax Astra DB™)
- Supports adding custom fixed `writetime` and/or `ttl`
- Track run information in tables (`cdm_run_info` and `cdm_run_details`) on the target keyspace

# Things to know
- Each run (Migration or Validation) can be tracked. When enabled, you can find the summary and details of each run in the `cdm_run_info` and `cdm_run_details` tables in the target keyspace.
- CDM does not migrate `ttl` & `writetime` at the field-level (for optimization reasons). It instead finds the field with the highest `ttl` & the field with the highest `writetime` within an `origin` row and uses those values on the entire `target` row.
- CDM ignores collection and UDT fields for `ttl` & `writetime` calculations by default for performance reasons. If you want to include such fields, set the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true`.
- If a table has only collection and/or UDT non-key columns and no table-level `ttl` configuration, the target will have no `ttl`, which can lead to inconsistencies between `origin` and `target` as rows expire on `origin` due to `ttl` expiry. If you want to avoid this, we recommend setting the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
- If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be the time the job was run. If you want to avoid this, we recommend setting the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
- Running a migration job multiple times on the same table could lead to duplicate entries in `list` type columns. Note this is due to a Cassandra/DSE bug and not a CDM issue. This can be addressed by enabling and setting a positive value for the `spark.cdm.transform.custom.writetime.incrementBy` param, which was specifically added to address this issue.
- When a job is resumed, the run metrics captured in the `cdm_run_info` table will be only for the current run. If the previous run was killed for some reason, its run metrics may not have been saved. If the previous run did complete (not killed) but with errors, then you will have all run metrics from the previous run as well.

# Performance recommendations
- Performance bottlenecks are usually the result of:
    - Low resource availability on the `Origin` OR `Target` cluster
    - Sub-optimal schema design, e.g. an out-of-balance `Origin` cluster, large partitions (> 100 MB), large rows (> 10 MB) and/or a high column count
- Incorrect configuration of the below properties may also negatively impact performance:
    - `numParts`: Default is 5K, but the ideal value is usually around table-size/10MB.
    - `batchSize`: Default is 5, but this should be set to 1 for tables where primary-key = partition-key OR where the average row-size is > 20 KB. Similarly, this should be set to a value > 5 if row-size is small (< 1 KB) and most partitions have several rows (100+).
    - `fetchSizeInRows`: Default is 1K and this is usually fine. However, you can reduce this if your table has many large rows (over 100 KB).
    - `ratelimit`: Default is 20K. Once you set all the other properties appropriately, set this value to the highest possible value that your cluster (origin & target) is able to handle.
- Using advanced features (e.g. `constantColumns`, `explodeMap`, `extractJson`), transformation functions and/or where-filter-conditions (except partition min/max) may negatively impact performance.
- We recommend running the job on a VM in the same region as the `Origin` and `Target` cluster.

# Building Jar for local development
- Clone this repo
- Move to the repo folder: `cd cassandra-data-migrator`
- Run the build: `mvn clean package` (Needs Maven `3.9.x`)
- The fat jar (`cassandra-data-migrator-4.x.x.jar`) file should now be present in the `target` folder

# Contributors
Check out all our wonderful contributors here.