scylladb / scylla-migrator

Migrate data to ScyllaDB using Spark, typically from Cassandra or Parquet files; alternatively, from DynamoDB to Scylla Alternator.
https://migrator.docs.scylladb.com/stable/
Apache License 2.0

java.lang.Integer is not a valid external type for schema of bigint #32

Closed: clad closed this issue 3 years ago

clad commented 3 years ago

Hello,

Getting an exception for the following table:

CREATE TABLE organization_notification (
    partition_timestamp bigint,
    org_id bigint,
    timestamp_uuid timeuuid,
    notification_id bigint,
    error_code text,
    full_details text,
    issuer_user_id bigint,
    issuer_user_ip inet,
    variables blob,
    PRIMARY KEY ((partition_timestamp, org_id), timestamp_uuid, notification_id)
) WITH CLUSTERING ORDER BY (timestamp_uuid DESC, notification_id ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'ALL'}
    AND comment = ''
    AND compaction = {'class': 'SizeTieredCompactionStrategy'}
    AND compression = {}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';

validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, partition_timestamp), LongType) AS partition_timestamp#0L
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, org_id), LongType) AS org_id#1L
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, timestamp_uuid), StringType), true, false) AS timestamp_uuid#2
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, notification_id), LongType) AS notification_id#3L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, error_code), StringType), true, false) AS error_code#4
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, error_code_ttl), LongType) AS error_code_ttl#5L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, error_code_writetime), LongType) AS error_code_writetime#6L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, full_details), StringType), true, false) AS full_details#7
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, full_details_ttl), LongType) AS full_details_ttl#8L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, full_details_writetime), LongType) AS full_details_writetime#9L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, issuer_user_id), LongType) AS issuer_user_id#10L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, issuer_user_id_ttl), LongType) AS issuer_user_id_ttl#11L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, issuer_user_id_writetime), LongType) AS issuer_user_id_writetime#12L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, issuer_user_ip), StringType), true, false) AS issuer_user_ip#13
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, issuer_user_ip_ttl), LongType) AS issuer_user_ip_ttl#14L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 15, issuer_user_ip_writetime), LongType) AS issuer_user_ip_writetime#15L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, variables), BinaryType) AS variables#16
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, variables_ttl), LongType) AS variables_ttl#17L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, variables_writetime), LongType) AS variables_writetime#18L
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.hasNext(GroupingBatchBuilder.scala:100)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:30)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:241)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:129)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:188)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:175)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.Integer is not a valid external type for schema of bigint
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_5$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_3$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
    ... 33 more
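
For context, this encoder error is easy to reproduce outside the migrator: Spark's row encoder rejects a java.lang.Integer wherever the schema declares LongType (CQL bigint). A minimal sketch, assuming Spark 2.4.x (the version these stack frames come from); the field name is illustrative:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object EncoderRepro extends App {
  // LongType corresponds to CQL bigint.
  val schema = StructType(Seq(StructField("error_code_ttl", LongType, nullable = true)))

  // Passing a Scala Int (boxed to java.lang.Integer) where the schema expects
  // a Long throws: java.lang.RuntimeException: java.lang.Integer is not a
  // valid external type for schema of bigint
  RowEncoder(schema).toRow(Row(42))

  // Row(42L) would encode fine: the external type must match the schema exactly.
}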

config.yaml for the table:

source:
  type: scylla
  host: XXX
  port: 9042
  keyspace: cust
  table: organization_notification
  preserveTimestamps: true
  splitCount: 256
  connections: 8
  fetchSize: 1000

target:
  type: scylla
  host: XXX
  port: 9042
  keyspace: cust
  table: organization_notification
  connections: 16

savepoints:
  path: /app/savepoints
  intervalSeconds: 300

renames: []
skipTokenRanges: []

validation:
  # Should WRITETIMEs and TTLs be compared?
  compareTimestamps: true
  # What difference should we allow between TTLs?
  ttlToleranceMillis: 60000
  # What difference should we allow between WRITETIMEs?
  writetimeToleranceMillis: 1000
  # How many differences to fetch and print
  failuresToFetch: 100
  # What difference should we allow between floating point numbers?
  floatingPointTolerance: 0.001
iravid commented 3 years ago

Hey @clad, this doesn't reproduce for me on the latest master with your schema definition and the following test row:

INSERT INTO organization_notification (partition_timestamp, org_id, timestamp_uuid, notification_id, error_code, full_details, issuer_user_id, issuer_user_ip, variables)
VALUES (50, 50, now(), 50, 'bla', 'bla', 50, '172.0.0.1', textAsBlob('blabber'));

If you could provide a test row that demonstrates the issue, that'd be great.

clad commented 3 years ago

organization_notification.csv.gz

Tried with the latest master; same error with the above data. Thanks for looking into this.

By the way, I had to add stripTrailingZerosForDecimals: false to the target section of the YAML.
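
For reference, placed in the config that flag looks like this (other target settings elided):

target:
  type: scylla
  # ...
  stripTrailingZerosForDecimals: false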

tarzanek commented 3 years ago

I could reproduce this today, but not with this data; I reproduced it with uuid and timeuuid columns in the schema.

tarzanek commented 3 years ago

Now I am sure that the problem comes from preserveTimestamps: true. The data above doesn't reproduce it, since it lacks TTLs and timestamps (and the default table TTL is 0). It looks like TTLs get transferred as Integers, hence the error. @iravid
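
That diagnosis fits how the columns are typed: CQL TTLs are 32-bit second counts, so they come off the wire as Int, while the synthesized <col>_ttl and <col>_writetime fields in the Spark schema are LongType (see the expression dump above). A minimal sketch of the widening a fix needs (illustrative only, not the migrator's actual code):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object WidenTtl {
  // Schema shape produced with preserveTimestamps: true.
  val schema = StructType(Seq(
    StructField("error_code", StringType, nullable = true),
    StructField("error_code_ttl", LongType, nullable = true)))

  // TTLs arrive as Int; widen them to Long before building the Row.
  def widen(value: Any): Any = value match {
    case i: Int => i.toLong // avoids "not a valid external type for schema of bigint"
    case other  => other    // already a Long, or null when the column has no TTL
  }

  val row = Row("SOME_ERROR", widen(86400)) // now encodes cleanly against schema
}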

Dobiasd commented 3 years ago

Just ran into the same problem (using the latest version from scylla-migrator/tree/master) when trying to migrate a table from Cassandra to Scylla.

CREATE TABLE foo.bar (
    a    bigint,
    b    bigint,
    c    tinyint,
    d    timestamp,
    PRIMARY KEY (a, b, c)
)
WITH default_time_to_live = 2592000;

It's working fine when setting preserveTimestamps: false (and the values in the d column are copied correctly), but I'd like to preserve my write timestamps as well. :grimacing:
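
That table fits the TTL theory: with default_time_to_live set, every row carries a TTL, so preserveTimestamps: true always exercises the TTL code path. A quick hypothetical check in cqlsh:

SELECT ttl(d) FROM foo.bar LIMIT 3;  -- non-null for every row, due to default_time_to_live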

tarzanek commented 3 years ago

@Dobiasd for now, reset to commit https://github.com/scylladb/scylla-migrator/commit/71c37b32a2b8ea9c125a55a541f21c38aadf7ab6 and use the older migrator; that will work until master is fixed.
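
Concretely, that means building the migrator from the pinned commit, e.g. (build steps as per the repo README):

git clone https://github.com/scylladb/scylla-migrator.git
cd scylla-migrator
git checkout 71c37b32a2b8ea9c125a55a541f21c38aadf7ab6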

iravid commented 3 years ago

Just submitted a fix for this. Sorry it took so long - I missed preserveTimestamps: true in @clad's original config.