scylladb / scylla-migrator

Migrate data to ScyllaDB using Spark, typically from Cassandra or Parquet files; alternatively from DynamoDB to Scylla Alternator.
https://migrator.docs.scylladb.com/stable/
Apache License 2.0

int conversion problems #46

Closed tarzanek closed 3 years ago

tarzanek commented 3 years ago

Migrator version: https://github.com/scylladb/scylla-migrator/commit/b960adacd9ebf878f4f087b48e448dc31223f6b9. I have a simple CQL schema as the source:

CREATE KEYSPACE mykeyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1};

use mykeyspace;

CREATE TABLE users ( user_id int, fname text, lname text, PRIMARY KEY((user_id)));

insert into users(user_id, fname, lname) values (1, 'rick', 'sanchez');
insert into users(user_id, fname, lname) values (4, 'rust', 'cohle');

When I migrate this table, the Migrator throws errors in the driver logs:

21/01/20 19:51:04 WARN TaskSetManager: Lost task 212.0 in stage 0.0 (TID 212, 192.168.1.207, executor 0): java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer
        at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
        at org.apache.spark.sql.Row$class.getInt(Row.scala:225)
        at org.apache.spark.sql.catalyst.expressions.GenericRow.getInt(rows.scala:166)
        at com.scylladb.migrator.readers.Cassandra$$anonfun$11.apply(Cassandra.scala:109)
        at com.scylladb.migrator.readers.Cassandra$$anonfun$11.apply(Cassandra.scala:100)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at com.scylladb.migrator.readers.Cassandra$.explodeRow(Cassandra.scala:100)
        at com.scylladb.migrator.readers.Cassandra$$anonfun$adjustDataframeForTimestampPreservation$1.apply(Cassandra.scala:192)
        at com.scylladb.migrator.readers.Cassandra$$anonfun$adjustDataframeForTimestampPreservation$1.apply(Cassandra.scala:192)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.hasNext(GroupingBatchBuilder.scala:100)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:30)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:241)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:210)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:129)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
        at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
        at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:188)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:175)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
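The trace shows the writer path calling `Row.getInt` on a value that arrived boxed as a `java.lang.Long`, so `BoxesRunTime.unboxToInt` casts it to `java.lang.Integer` and fails. A minimal Java sketch (hypothetical, not Migrator code) of the failure and a defensive conversion:

```java
// Hypothetical reproduction of the unboxing failure in the stack trace above:
// the int column's value arrives boxed as java.lang.Long, but Row.getInt
// effectively performs a cast to java.lang.Integer, which throws.
public class UnboxRepro {
    public static void main(String[] args) {
        Object userId = Long.valueOf(1L); // value arrives boxed as Long

        try {
            // Mirrors scala.runtime.BoxesRunTime.unboxToInt: a direct
            // cast to Integer fails when the box is actually a Long.
            int broken = (Integer) userId;
            System.out.println("unexpected: " + broken);
        } catch (ClassCastException e) {
            System.out.println("caught ClassCastException");
        }

        // Going through java.lang.Number sidesteps the exact box type.
        int fixed = ((Number) userId).intValue();
        System.out.println("fixed = " + fixed);
    }
}
```

This suggests the reader (here, the timestamp-preservation transform in `Cassandra.explodeRow`) re-boxed the `int` column as a `Long` somewhere upstream; converting through `Number` rather than casting to a specific box type is one way such a mismatch is typically tolerated.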
tarzanek commented 3 years ago

Spark 2.4.7 and JDK 1.8 were used both for the build and for running Spark.

tarzanek commented 3 years ago

I no longer see the problem with https://github.com/scylladb/scylla-migrator/commit/85e5039809dbfe56d58c5bcca2ac4acf13b4b7ed