scylladb / scylla-migrator

Migrate data extract using Spark to Scylla, normally from Cassandra
Apache License 2.0

DynamoDB struct column conversion issue #71

Closed: stjack closed this issue 2 years ago

stjack commented 2 years ago

There is a source column in DynamoDB without explicit elements. The printed schema is:

 |-- name: text (nullable = true)
 |-- role: struct (nullable = true)
 |-- permission_live: boolean (nullable = true)

I tried defining role as a map, a list, and a set; all failed with the error below.

CREATE TYPE devdb.role_type(
    role text -- placeholder only
);
ALTER TABLE devdb.dev_db_4RD ADD role frozen<map<text, role_type>>;

Does anyone know why?

22/03/26 21:41:14 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 172.18.0.5, executor 0): com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [] of type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema to Map[AnyRef,com.datastax.spark.connector.UDTValue].
    at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:46)
    at com.datastax.spark.connector.types.TypeConverter$CollectionConverter$$anonfun$convertPF$37.applyOrElse(TypeConverter.scala:698)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:44)
    at com.datastax.spark.connector.types.TypeConverter$CollectionConverter.convert(TypeConverter.scala:686)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter$$anonfun$convertPF$38.applyOrElse(TypeConverter.scala:902)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:44)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:885)
    at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:57)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.convert(TypeConverter.scala:885)
    at com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$readColumnValues$1.apply$mcVI$sp(SqlRowWriter.scala:26)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:24)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:102)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:105)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:30)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:30)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:241)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:129)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:188)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:175)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

stjack commented 2 years ago

I use the scylla target mode, which is based on Cassandra; that may be what causes this issue.

gongcon commented 1 year ago

@stjack how did you work around this?

hopugop commented 1 year ago

@gongcon when migrating from DynamoDB as a source, Scylla Migrator only supports writing to the Scylla Alternator API (target type dynamodb). If you specify the target as type scylla, it will try to use CQL to write the data, and that is currently not supported.
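For reference, a minimal sketch of what a DynamoDB-to-Alternator migration config could look like is below. The exact key names (endpoint, scanSegments, streamChanges, savepoints, etc.) are assumptions here and should be checked against config.yaml.example in this repository before use.

# Hypothetical config.yaml sketch: DynamoDB source, Scylla Alternator target.
# Key names and values are illustrative assumptions; verify against config.yaml.example.
source:
  type: dynamodb
  table: dev_db_4RD            # example source table name
  region: us-east-1
  scanSegments: 1
target:
  type: dynamodb               # Alternator (DynamoDB API) target, not "scylla" (CQL)
  table: dev_db_4RD
  endpoint:
    host: http://scylla-node   # Alternator endpoint of the Scylla cluster
    port: 8000
  streamChanges: false
savepoints:
  path: /app/savepoints
  intervalSeconds: 300

The key point is that both source and target use the DynamoDB data model, so struct-like attributes are carried over as-is and never have to be mapped onto CQL collection or UDT columns.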