scylladb / scylla-migrator

Migrate data to ScyllaDB using Spark, typically from Cassandra or Parquet files, or alternatively from DynamoDB to Scylla Alternator.
https://migrator.docs.scylladb.com/stable/
Apache License 2.0

java.lang.RuntimeException: Error while encoding UDTValue #30

Closed: anonymoose closed this 3 years ago

anonymoose commented 3 years ago

Issue

When running scylla-migrator to copy a table that contains a user-defined type (UDT), the migrator throws an exception.
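For context, this failure mode can be reproduced outside the migrator by building a DataFrame from connector rows without converting the UDT values first. The following is a minimal sketch, not the migrator's actual code; the keyspace, table, and UDT field names are taken from the configuration and schema shown below:

import com.datastax.spark.connector._
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().getOrCreate()

// Spark SQL models a set<frozen<udt>> column as an array of structs.
val schema = StructType(Seq(
  StructField("max_loc", ArrayType(StructType(Seq(
    StructField("lid", StringType),
    StructField("ltype", StringType)
  ))))
))

// Passing the connector's UDTValue straight through into a Row leaves a
// value that Catalyst's RowEncoder does not accept for a struct field,
// which yields "UDTValue is not a valid external type for schema of struct<...>".
val rdd = spark.sparkContext
  .cassandraTable("qa3", "zavil")
  .map(r => Row(r.getSet[UDTValue]("max_loc").toSeq))

val df = spark.createDataFrame(rdd, schema)
df.count() // the error surfaces here, when the rows are encoded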

scylla-migrator configuration

source:
  type: cassandra
  host: 172.20.111.11
  port: 9042
  credentials:
    username: cass
    password: "xxx"
  keyspace: qa3
  table: zavil
  preserveTimestamps: false
  splitCount: 350
  connections: 16
  fetchSize: 3000
target:
  type: cassandra
  host: 172.20.111.22
  port: 9042
  credentials:
    username: cass
    password: "xxx"
  keyspace: qa2
  table: zavil
  connections: 16

Keyspace / Table config

#
# keyspace config
#
CREATE KEYSPACE qa2 WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': '3', 'DC2': '3', 'DC3': '3'}  AND durable_writes = true;

CREATE TYPE qa2.max_loc (
    lid text,
    ltype text
);
CREATE TABLE qa2.zavil (
    ktu bigint,
    zc text,
    max_loc set<frozen<max_loc>>,
    mts timestamp,
    musr text,
    z_avil decimal,
    PRIMARY KEY ((ktu, zc))
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.DeflateCompressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

Exception

20/10/02 14:00:57 WARN TaskSetManager: Lost task 122.0 in stage 0.0 (TID 122, 10.16.111.22, executor 5): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: com.datastax.spark.connector.UDTValue is not a valid external type for schema of struct<locationid:string,locationtype:string>
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, sku), LongType) AS sku#0L
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, zip_code), StringType), true, false) AS zip_code#1
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), if (isnull(validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), true), StructField(locationid,StringType,true), StructField(
locationtype,StringType,true)))) null else named_struct(locationid, if (validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), true), StructField(locationid,StringType,true), StructField(locationtype,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, S
tringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), true), StructField(locationid,StringType,true), StructField(locationtype,StringType,true)), 0, locationid), StringType), true, false), locationtype, if (validateexternaltype(lambdav
ariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), true), StructField(locationid,StringType,true), StructField(locationtype,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(
MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), true), StructField(locationid,StringType,true), StructField(locationtype,StringType,true)), 1, locationtype), StringType), true, false)), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, maximal_locations), ArrayType(StructType(S
tructField(locationid,StringType,true), StructField(locationtype,StringType,true)),true)), None) AS maximal_locations#2
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, modify_ts), TimestampType), true, false) AS modify_ts#3
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, modify_user), StringType), true, false) AS modify_user#4
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else CheckOverflow(staticinvoke(class org.apache.spark.sql.types.Decimal$, DecimalType(38,18), fromDecimal, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, zip_availability), DecimalType(38,18)), true, false), DecimalType(38,18)) AS zip
_availability#5
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
        at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
        at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at com.datastax.spark.connector.util.CountingIterator.next(CountingIterator.scala:16)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:105)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:30)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:30)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:241)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:210)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:129)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
        at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
        at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:188)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:175)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: com.datastax.spark.connector.UDTValue is not a valid external type for schema of struct<locationid:string,locationtype:string>
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
        ... 30 more
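The root cause visible in the trace is that the Spark Cassandra connector returns com.datastax.spark.connector.UDTValue for UDT fields, while Spark's Catalyst encoder only accepts org.apache.spark.sql.Row for struct-typed fields. A rough sketch of the kind of conversion needed before encoding (a hedged illustration assuming UDTValue exposes its column values; this is not the project's actual fix):

import com.datastax.spark.connector.UDTValue
import org.apache.spark.sql.Row

// Recursively rewrite connector values into shapes the RowEncoder
// accepts: Row for UDTs, Seq for sets and lists.
def toCatalyst(v: Any): Any = v match {
  case udt: UDTValue => Row(udt.columnValues.map(toCatalyst): _*)
  case s: Set[_]     => s.toSeq.map(toCatalyst)
  case s: Seq[_]     => s.map(toCatalyst)
  case m: Map[_, _]  => m.map { case (k, x) => (toCatalyst(k), toCatalyst(x)) }
  case other         => other
}

Applying such a conversion to every UDT-bearing column when rebuilding the rows would let the DataFrame encode cleanly.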
tarzanek commented 3 years ago

#31 should fix this ...

iravid commented 3 years ago

@tarzanek Not yet, I think this is a different problem :-)

iravid commented 3 years ago

Latest master should fix this 👍 Thank you for your patience!