scylladb / scylla-migrator

Migrate data to ScyllaDB using Spark, typically from Apache Cassandra or Parquet files, or alternatively from DynamoDB to Scylla Alternator.
https://migrator.docs.scylladb.com/stable/
Apache License 2.0

Error migrating null columns #26

Closed · fcosta-td closed this issue 3 years ago

fcosta-td commented 3 years ago

When running scylla-migrator, I am getting the following error on a table with 200 records that allows null values:

20/10/06 08:58:41 WARN TaskSetManager: Lost task 8.16 in stage 0.0 (TID 122, 172.17.0.3, executor 0): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: org.apache.spark.unsafe.types.UTF8String is not a valid external type for schema of string
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, app_id), StringType), true, false) AS app_id#0
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, app_version_id), StringType), true, false) AS app_version_id#1
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, active), BooleanType) AS active#2
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, created), TimestampType), true, false) AS created#3
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, destination), StringType), true, false) AS destination#4
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), true), StringType), true, false), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, events), ArrayType(StringType,true)), None) AS events#5
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, modified), TimestampType), true, false) AS modified#6
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, shared_secret_id), StringType), true, false) AS shared_secret_id#7
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at com.datastax.spark.connector.util.CountingIterator.next(CountingIterator.scala:16)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:105)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:30)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:30)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:241)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:129)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:188)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:175)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.spark.unsafe.types.UTF8String is not a valid external type for schema of string
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
    ... 30 more
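
For context on what the encoder is complaining about: Spark expects rows handed to a `RowEncoder` to carry external JVM types (`java.lang.String`), not Spark's internal `UTF8String`. The following standalone sketch is not taken from the migrator; it assumes Spark 2.4.x (matching the `ExpressionEncoder.toRow` frames in the trace) and reproduces the same class of error:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object EncoderRepro {
  def main(args: Array[String]): Unit = {
    // Schema with a single text column, as in the failing table.
    val schema  = StructType(Seq(StructField("app_id", StringType)))
    val encoder = RowEncoder(schema)

    // A Row must carry the *external* type java.lang.String here.
    // Supplying Spark's *internal* UTF8String instead throws:
    //   "org.apache.spark.unsafe.types.UTF8String is not a valid
    //    external type for schema of string"
    val badRow = Row(UTF8String.fromString("some-app"))
    encoder.toRow(badRow)
  }
}
```
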
fcosta-td commented 3 years ago

Please let me know if you need additional information. Thanks

avikivity commented 3 years ago

/cc @iravid

tarzanek commented 3 years ago

@fcosta-td could you share the schema? It seems we are missing an encoder for some type.

tarzanek commented 3 years ago

ok, I think this looks similar to https://github.com/scylladb/scylla-migrator/pull/16

@fcosta-td can you patch your clone with the above PR and retry please?

fcosta-td commented 3 years ago

CREATE TABLE apps_subscriptions (
    app_id text,
    app_version_id text,
    active boolean,
    created timestamp,
    destination text,
    events set<text>,
    modified timestamp,
    shared_secret_id text,
    PRIMARY KEY (app_id, app_version_id)
) WITH CLUSTERING ORDER BY (app_version_id ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'ALL'}
    AND comment = ''
    AND compaction = {'class': 'SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';

fcosta-td commented 3 years ago

> ok, I think this looks similar to #16
>
> @fcosta-td can you patch your clone with the above PR and retry please?

Hi @tarzanek, I've already tried that and it did not work. That function (explodeRow) was moved from Migrator.scala to Cassandra.scala, but when I change it to match that PR, it fails to compile.
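
For readers unfamiliar with the code under discussion: `explodeRow` expands a row that contains collection columns into several rows. As a rough, hypothetical simplification of the null-guarding idea (not the migrator's actual implementation; `collectionIdx` is an illustrative parameter), the shape is roughly:

```scala
import org.apache.spark.sql.Row

// Hypothetical sketch, not the real explodeRow: expand the collection at
// `collectionIdx` into one row per element, passing null collections
// through unchanged instead of dereferencing them.
def explodeRow(row: Row, collectionIdx: Int): Seq[Row] =
  if (row.isNullAt(collectionIdx)) Seq(row)
  else
    row.getSeq[Any](collectionIdx).map { elem =>
      Row.fromSeq(row.toSeq.updated(collectionIdx, elem))
    }
```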

tarzanek commented 3 years ago

ok, let me try to update the patch

tarzanek commented 3 years ago

@fcosta-td can you check https://github.com/tarzanek/scylla-migrator/tree/null-regular-columns-rebase, build it, and try to use it? Worst case, ping me on Slack and I can give you a build of the above if needed.

fcosta-td commented 3 years ago

Hi @tarzanek, I've just tested it and it still fails with the same error.

sangeethdba commented 3 years ago

Any update?

We are getting the same issue with null columns.

sangeethdba commented 3 years ago

org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), true), StringType), true, false), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, exclusions), ArrayType(StringType,true)), None) AS exclusions#2
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at com.datastax.spark.connector.util.CountingIterator.next(CountingIterator.scala:16)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:105)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.

iravid commented 3 years ago

Hi folks, sorry for the problems you're running into here. This looks familiar to me so it must have been a regression from the recent driver upgrade. I will take a stab at this later today.

anonymoose commented 3 years ago

Thanks @iravid. Seeing the same issue here.

anonymoose commented 3 years ago

@iravid This is our setup that causes the problem.

table definition

CREATE TABLE qa2.exclusions (
    sku text,
    exclusion_type text,
    exclusions list<text>,
    PRIMARY KEY (sku, exclusion_type)
) WITH CLUSTERING ORDER BY (exclusion_type ASC)
    AND bloom_filter_fp_chance = 0.1
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.DeflateCompressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

exception trace

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: org.apache.spark.unsafe.types.UTF8String is not a valid external type for schema of string
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, sku), StringType), true, false) AS sku#0
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, exclusion_type), StringType), true, false) AS exclusion_type#1
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), true), StringType), true, false), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, exclusions), ArrayType(StringType,true)), None) AS exclusions#2
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at com.datastax.spark.connector.util.CountingIterator.next(CountingIterator.scala:16)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:105)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:30)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:30)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:241)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:129)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:188)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:175)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.spark.unsafe.types.UTF8String is not a valid external type for schema of string
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
    ... 30 more

anonymoose commented 3 years ago

Tried this with a small table with known data. Ensured there were no nulls in the 'exclusions' column collections. It still failed with the same exception described above.

cassuser@cqlsh> select * from qa2.exclusions_2;
 sku     | exclusion_type | exclusions
---------+----------------+------------------
 1000000 |              x |          ['159']
 1000001 |              x | ['5157', '1371']
 1000002 |              x |         ['5788']
 1000003 |              x |         ['6397']
 1000004 |              x |         ['4031']
 1000005 |              x |         ['1231']
 1000006 |              x |         ['7128']
 1000007 |              x |          ['907']
 1000008 |              x | ['3192', '7327']
 1000009 |              x |         ['9529']
(10 rows)

iravid commented 3 years ago

Thanks for the info everyone! The problem is with composite data types (lists, sets, maps, tuples, UDTs) that contain strings. Will have a fix soon.
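
In other words, string elements inside collections survive as Spark's internal `UTF8String` and are rejected when the row is re-encoded. A hedged sketch of the conversion such a fix needs to perform (hypothetical helper, not necessarily the shape of the actual patch):

```scala
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical helper: recursively replace internal UTF8String values with
// external java.lang.String inside lists, sets, and maps so the row
// encoder accepts them.
def toExternal(value: Any): Any = value match {
  case s: UTF8String  => s.toString
  case seq: Seq[_]    => seq.map(toExternal)
  case set: Set[_]    => set.map(toExternal)
  case map: Map[_, _] => map.map { case (k, v) => (toExternal(k), toExternal(v)) }
  case other          => other
}
```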

sangeethdba commented 3 years ago

@iravid thanks for looking into it. Looking forward to the fix.

iravid commented 3 years ago

Latest master should fix this 👍 Thank you for your patience!