scylladb / scylla-migrator

Migrate data to Scylla using Spark, typically from Cassandra or Parquet files; alternatively, from DynamoDB to Scylla Alternator.
https://migrator.docs.scylladb.com/stable/
Apache License 2.0

Migrator exits with an exception when a collection data type is found #28

Closed · pidiaquez closed this 3 years ago

pidiaquez commented 3 years ago

Hi

We hit the following exception, related to collection data types:

Exception in thread "main" java.lang.Exception: TTL/Writetime preservation is unsupported for tables with collection types

If the table doesn't use TTL, can this exception be safely skipped?

Can the TTL be set for all values of a collection via an UPDATE with TTL after the data is migrated? (See the sketch after the schema below.)

Suspected table and UDT:

CREATE TABLE ks.orders (
    id text PRIMARY KEY,
    createdat timestamp,
    customer frozen<customer>,
    deliveryaddress frozen<address>,
    deliveryfee float,
    discounts text,
    items set<frozen<item>>,
    merchant frozen<merchant>,
    ordershortnumber text,
    payment frozen<payment>,
    status text,
    subtotal float,
    takeout boolean,
    totalprice float
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'ALL'}
    AND comment = ''
    AND compaction = {'class': 'SizeTieredCompactionStrategy'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';

CREATE TYPE ks.item (
    quantity int,
    name text,
    price float,
    observations text,
    subitems set<frozen<subitem>>,
    seq int
);
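For reference on the second question: in CQL, an UPDATE ... USING TTL sets the TTL only on the cells it writes, so re-applying a TTL to a collection after migration means re-reading each row and rewriting the collection in full. A rough sketch against ks.orders above (the key, TTL value, and item values are placeholders, not taken from this issue):

-- Placeholder values; USING TTL applies only to the cells written
-- by this statement, so the full collection value must be supplied.
UPDATE ks.orders USING TTL 2592000  -- e.g. 30 days
SET items = { { quantity: 1, name: 'example', price: 1.0,
                observations: '', subitems: {}, seq: 1 } }
WHERE id = 'order-123';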

Thanks

Pablo

tarzanek commented 3 years ago

@pidiaquez I think you should be able to just disable writing of timestamps in that case, shouldn't you? In the config, flip

preserveTimestamps: true

to false.
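A minimal config.yaml fragment, assuming the source-section layout of the example config that ships with the migrator (other keys elided):

source:
  type: cassandra
  # ... host/port/keyspace/table settings elided ...
  preserveTimestamps: false  # skip TTL/writetime preservation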

pidiaquez commented 3 years ago

I attempted the transfer with preserveTimestamps: true and got a different error stack:

Caused by: java.lang.RuntimeException: org.apache.spark.unsafe.types.UTF8String is not a valid external type for schema of string

This is using Spark 2.4.4. Full generated code and stack trace:

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(MapObjects_loopValue8, MapObjects_loopIsNull8, ObjectType(class java.lang.Object), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObjects_loopValue8, MapObjects_loopIsNull8, ObjectType(class java.lang.Object), true), StringType), true, false), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, tags), ArrayType(StringType,true)), None) AS tags#19
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, taxonomy_name), StringType), true, false) AS taxonomy_name#20
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, taxonomy_score), DoubleType) AS taxonomy_score#21
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at com.datastax.spark.connector.util.CountingIterator.next(CountingIterator.scala:16)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:105)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:30)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:30)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:241)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:210)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:129)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:188)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:175)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.spark.unsafe.types.UTF8String is not a valid external type for schema of string
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_2_3$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
... 30 more
tarzanek commented 3 years ago

Hello, this last stack was fixed in the latest scylla-migrator; please update from master.
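For anyone following along, a typical way to pick up master, assuming the build.sh script at the repository root still produces the Spark assembly jar:

git clone https://github.com/scylladb/scylla-migrator.git
cd scylla-migrator
./build.sh   # builds the assembly jar passed to spark-submit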