datastax / cassandra-data-migrator

Cassandra Data Migrator - Migrate & Validate data between origin and target Apache Cassandra®-compatible clusters.
Apache License 2.0

CDM run fails with the below error for complex tables when trying to preserve TTL and Writetime using properties #300

Closed ashu9041 closed 2 weeks ago

ashu9041 commented 2 months ago

The CDM run fails with the below error for complex tables when trying to preserve TTL and writetime using properties. I performed the below steps:

  1. Updated the below properties in cdm.properties:

     spark.cdm.schema.origin.column.ttl.names        complex_column
     spark.cdm.schema.origin.column.writetime.names  complex_column

Table desc:

CREATE TABLE keyspacename.table1 (
    column1 uuid,
    column2 bigint,
    column3 bigint,
    column4 timestamp,
    complex_column set,
    PRIMARY KEY ((column1, column2, column3), column4)
) WITH CLUSTERING ORDER BY (column4 DESC);

CREATE CUSTOM INDEX table1_sai ON table1 (column1) USING 'StorageAttachedIndex';

ERROR ENCOUNTERED

24/08/30 10:00:09 ERROR WritetimeTTL: TTL column emp_group is not a column which can provide a TTL on origin table kpi_embedded_ops_analytics
24/08/30 10:00:09 ERROR WritetimeTTL: Writetime column emp_group is not a column which can provide a WRITETIME on origin table kpi_embedded_ops_analytics
24/08/30 10:00:09 INFO WritetimeTTL: Feature WritetimeTTL is disabled
24/08/30 10:00:09 ERROR CopyJobSession: Feature com.datastax.cdm.feature.WritetimeTTL is not valid. Please check the configuration.
Exception in thread "main" java.lang.RuntimeException: One or more features are not valid. Please check the configuration.
    at com.datastax.cdm.job.AbstractJobSession.<init>(AbstractJobSession.java:89)
    at com.datastax.cdm.job.AbstractJobSession.<init>(AbstractJobSession.java:46)
    at com.datastax.cdm.job.CopyJobSession.<init>(CopyJobSession.java:54)
    at com.datastax.cdm.job.CopyJobSessionFactory.getInstance(CopyJobSessionFactory.java:28)
    at com.datastax.cdm.job.Migrate$.$anonfun$execute$2(Migrate.scala:27)
    at com.datastax.cdm.job.Migrate$.$anonfun$execute$2$adapted(Migrate.scala:26)
    at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
    at com.datastax.cdm.job.Migrate$.$anonfun$execute$1(Migrate.scala:26)
    at com.datastax.cdm.job.Migrate$.$anonfun$execute$1$adapted(Migrate.scala:25)
    at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
    at com.datastax.cdm.job.Migrate$.execute(Migrate.scala:25)
    at com.datastax.cdm.job.Migrate$.delayedEndpoint$com$datastax$cdm$job$Migrate$1(Migrate.scala:20)
    at com.datastax.cdm.job.Migrate$delayedInit$body.apply(Migrate.scala:18)
    at scala.Function0.apply$mcV$sp(Function0.scala:39)
    at scala.Function0.apply$mcV$sp$(Function0.scala:39)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
    at scala.App.$anonfun$main$1(App.scala:76)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:926)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at com.datastax.cdm.job.BaseJob.main(BaseJob.scala:32)
    at com.datastax.cdm.job.Migrate.main(Migrate.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1029)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
24/08/30 10:00:09 INFO SparkContext: Invoking stop() from shutdown hook
24/08/30 10:00:09 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/08/30 10:00:09 INFO SparkUI: Stopped Spark web UI at http://keng03-dev01-ast78-mgr-1724989528-1.int.dev.mykronos.com:4040
24/08/30 10:00:09 INFO CassandraConnector: Disconnected from Cassandra cluster.
24/08/30 10:00:09 INFO CassandraConnector: Disconnected from Cassandra cluster.
24/08/30 10:00:09 INFO SerialShutdownHooks: Successfully executed shutdown hook: Clearing session cache for C* connector
24/08/30 10:00:09 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/08/30 10:00:09 INFO MemoryStore: MemoryStore cleared
24/08/30 10:00:09 INFO BlockManager: BlockManager stopped
24/08/30 10:00:09 INFO BlockManagerMaster: BlockManagerMaster stopped
24/08/30 10:00:09 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/08/30 10:00:09 INFO SparkContext: Successfully stopped SparkContext
24/08/30 10:00:09 INFO ShutdownHookManager: Shutdown hook called
24/08/30 10:00:09 INFO ShutdownHookManager: Deleting directory /tmp/spark-34001d74-f100-4cbb-a75c-9d5530bb442b
24/08/30 10:00:09 INFO ShutdownHookManager: Deleting directory /tmp/spark-b3fbf831-542a-40bd-af3b-1955c10e02f7

pravinbhat commented 1 month ago

Hi @ashu9041

The table definition is not complete, i.e. you have not provided the data type that the column emp_group holds within the set. Also, I see that you have set the below properties:

spark.cdm.schema.origin.column.ttl.names emp_group
spark.cdm.schema.origin.column.writetime.names emp_group

Can you elaborate on why you are trying to use the set column to compute the TTL and writetime? If you do not set the above properties, CDM will auto-calculate the TTL and writetime from the other columns. Note that CDM currently does not support collection columns (like set, list & map) for computing TTL and writetime.
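For context, this mirrors CQL itself: the writetime()/ttl() selection functions are rejected on multi-cell collection columns, which is why CDM cannot source those values from a set. A rough illustration against the table above (regular_col is a hypothetical non-collection column, and the exact error text may vary by Cassandra version):

-- Works on a regular, non-collection, non-PK column (hypothetical here):
SELECT ttl(regular_col), writetime(regular_col) FROM keyspacename.table1;

-- Rejected on a multi-cell collection column:
SELECT writetime(complex_column) FROM keyspacename.table1;
-- InvalidRequest: Cannot use selection function writeTime on collections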

ashu9041 commented 1 month ago

Hi @pravinbhat,

Please find the table definition below. The complex_column column contains a set of bigint data. Since we can retrieve the TTL and writetime for complex-type tables in Astra, we attempted to migrate these using CDM. However, apart from complex_column, the other columns are all part of the primary key, so CDM will not calculate the TTL and writetime from those columns. That's why we added the above properties.

CREATE TABLE keyspacename.table1 (
    column1 uuid,
    column2 bigint,
    column3 bigint,
    column4 timestamp,
    complex_column set<bigint>,
    PRIMARY KEY ((column1, column2, column3), column4)
) WITH CLUSTERING ORDER BY (column4 DESC)
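For reference, CQL itself will not accept writetime()/ttl() on primary-key columns either, which is consistent with CDM skipping them. Roughly (exact error text may vary by version):

SELECT writetime(column1) FROM keyspacename.table1;
-- InvalidRequest: Cannot use selection function writeTime on PRIMARY KEY part column1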

pravinbhat commented 1 month ago

Hi @ashu9041

Thanks for providing the schema. As you can see, the only non-PK column in the above table is the emp_group set, and CDM is unable to auto-calculate TTL & writetime from such fields. In such scenarios, CDM will not auto-calculate TTL & writetime and will go with the defaults, i.e. the current time for the writetime, and the table-level TTL, or -1 if no table-level TTL is defined.
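(The table-level TTL mentioned here is the table's default_time_to_live setting; as a sketch, it can be checked via the system schema:)

SELECT default_time_to_live
FROM system_schema.tables
WHERE keyspace_name = 'keyspacename' AND table_name = 'table1';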

Since you are seeing the above exception, it looks like it's still trying to do some writetime calculation. Can you share the properties file you are using, as well as the full log output that includes the above error?