scylladb / scylla-migrator

Migrate data to Scylla using Spark, typically from Cassandra or Parquet files; alternatively, from DynamoDB to Scylla Alternator.
https://migrator.docs.scylladb.com/stable/
Apache License 2.0

renames with preserve timestamps don't work together #91

Open wtibbitts opened 2 years ago

wtibbitts commented 2 years ago

Renames were working for me when I pulled and built about 10 days ago, but they have stopped working since then. The schema after renames still shows correctly, but the error is the same as when I had the naming conflict prior to adding renames (the tokn_ttl column of type string in the original schema conflicted with the generated tokn_ttl column of type integer).
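For illustration, a minimal sketch of how the collision appears to arise, assuming the companion columns are named by appending _ttl / _writetime to each regular column (which matches the "Schema generated with TTLs and Writetimes" dump below). This is a hypothetical reconstruction, not the migrator's actual code:

// Hypothetical sketch of the companion-column naming, not the migrator's code.
val regularColumns = List("tokn", "tokn_ttl")

val withCompanions = regularColumns.flatMap { c =>
  List(c, s"${c}_ttl", s"${c}_writetime")
}
// withCompanions == List("tokn", "tokn_ttl", "tokn_writetime",
//                        "tokn_ttl", "tokn_ttl_ttl", "tokn_ttl_writetime")
// "tokn_ttl" appears twice: once as the original string column and once as the
// integer TTL companion generated for "tokn" -- hence the type conflict.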

Config:

source:
  type: cassandra
  host: carbine01.######.com
  port: 9042
  keyspace: carbine
  table: login
  consistencyLevel: LOCAL_QUORUM
  preserveTimestamps: true
  splitCount: 256
  connections: 8
  fetchSize: 1000
target:
  type: scylla
  host: scylla-us-west-2-2a-0
  port: 9042
  keyspace: carbine
  table: login
  consistencyLevel: LOCAL_QUORUM
  connections: 16
  stripTrailingZerosForDecimals: false
savepoints:
  path: /opt/bitnami/spark/tmp/savepoints
  intervalSeconds: 300
renames:
  - from: tokn_ttl
    to: tokn_expir
skipTokenRanges: []
validation:
  compareTimestamps: true
  ttlToleranceMillis: 60000
  writetimeToleranceMillis: 1000
  failuresToFetch: 100
  floatingPointTolerance: 0.001
  timestampMsTolerance: 0

Run logs:

22/09/29 19:44:15 INFO Cassandra: TableDef(carbine,login,ArrayBuffer(ColumnDef(email,PartitionKeyColumn,VarCharType)),ArrayBuffer(),Stream(ColumnDef(account_id,RegularColumn,VarCharType), ColumnDef(details,RegularColumn,VarCharType), ColumnDef(last_updated_ts,RegularColumn,TimestampType), ColumnDef(password_hash,RegularColumn,VarCharType), ColumnDef(password_salt,RegularColumn,VarCharType), ColumnDef(scheme,RegularColumn,VarCharType), ColumnDef(tokn,RegularColumn,VarCharType), ColumnDef(tokn_ttl,RegularColumn,VarCharType)),Stream(),false,false,Map())
22/09/29 19:44:15 INFO Cassandra: Original schema loaded:
root
 |-- email: string (nullable = false)
 |-- account_id: string (nullable = true)
 |-- details: string (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)
 |-- password_hash: string (nullable = true)
 |-- password_salt: string (nullable = true)
 |-- scheme: string (nullable = true)
 |-- tokn: string (nullable = true)
 |-- tokn_ttl: string (nullable = true)

22/09/29 19:44:15 INFO Cassandra: ColumnRefs generated for selection:
22/09/29 19:44:15 INFO Cassandra: email
account_id
ttl(account_id)
writetime(account_id)
details
ttl(details)
writetime(details)
last_updated_ts
ttl(last_updated_ts)
writetime(last_updated_ts)
password_hash
ttl(password_hash)
writetime(password_hash)
password_salt
ttl(password_salt)
writetime(password_salt)
scheme
ttl(scheme)
writetime(scheme)
tokn
ttl(tokn)
writetime(tokn)
tokn_ttl
ttl(tokn_ttl)
writetime(tokn_ttl)
22/09/29 19:44:15 INFO Cassandra: Schema generated with TTLs and Writetimes:
root
 |-- email: string (nullable = false)
 |-- account_id: string (nullable = true)
 |-- account_id_ttl: integer (nullable = true)
 |-- account_id_writetime: long (nullable = true)
 |-- details: string (nullable = true)
 |-- details_ttl: integer (nullable = true)
 |-- details_writetime: long (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)
 |-- last_updated_ts_ttl: integer (nullable = true)
 |-- last_updated_ts_writetime: long (nullable = true)
 |-- password_hash: string (nullable = true)
 |-- password_hash_ttl: integer (nullable = true)
 |-- password_hash_writetime: long (nullable = true)
 |-- password_salt: string (nullable = true)
 |-- password_salt_ttl: integer (nullable = true)
 |-- password_salt_writetime: long (nullable = true)
 |-- scheme: string (nullable = true)
 |-- scheme_ttl: integer (nullable = true)
 |-- scheme_writetime: long (nullable = true)
 |-- tokn: string (nullable = true)
 |-- tokn_ttl: integer (nullable = true)
 |-- tokn_writetime: long (nullable = true)
 |-- tokn_ttl: string (nullable = true)
 |-- tokn_ttl_ttl: integer (nullable = true)
 |-- tokn_ttl_writetime: long (nullable = true)

22/09/29 19:44:16 INFO Cassandra: Schema that'll be used for writing to Scylla:
22/09/29 19:44:16 INFO Cassandra: root
 |-- email: string (nullable = false)
 |-- account_id: string (nullable = true)
 |-- details: string (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)
 |-- password_hash: string (nullable = true)
 |-- password_salt: string (nullable = true)
 |-- scheme: string (nullable = true)
 |-- tokn: string (nullable = true)
 |-- tokn_ttl: string (nullable = true)
 |-- ttl: integer (nullable = true)
 |-- writetime: long (nullable = true)

22/09/29 19:44:16 INFO migrator: Created source dataframe; resulting schema:
root
 |-- email: string (nullable = false)
 |-- account_id: string (nullable = true)
 |-- details: string (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)
 |-- password_hash: string (nullable = true)
 |-- password_salt: string (nullable = true)
 |-- scheme: string (nullable = true)
 |-- tokn: string (nullable = true)
 |-- tokn_ttl: string (nullable = true)
 |-- ttl: integer (nullable = true)
 |-- writetime: long (nullable = true)

22/09/29 19:44:16 INFO migrator: Installing SIGINT/TERM/USR2 handler. Send this to dump the current progress to a savepoint.
22/09/29 19:44:16 INFO migrator: Starting savepoint schedule; will write a savepoint every 300 seconds
22/09/29 19:44:16 INFO migrator: Created a savepoint config at /opt/bitnami/spark/tmp/savepoints/savepoint_1664480656.yaml due to schedule. Ranges added: Set()
22/09/29 19:44:17 INFO migrator: We need to transfer: 280 partitions in total
22/09/29 19:44:17 INFO migrator: All token ranges extracted from partitions size:3073
22/09/29 19:44:17 INFO migrator: Savepoints array defined, size of the array: 0
22/09/29 19:44:17 INFO migrator: Diff ... total diff of full ranges to savepoints is: 3073
22/09/29 19:44:17 INFO migrator: Starting write...
22/09/29 19:44:17 INFO Scylla: Using consistencyLevel [LOCAL_QUORUM] for TARGET based on target config [LOCAL_QUORUM]
22/09/29 19:44:17 INFO Scylla: Schema after renames:
22/09/29 19:44:17 INFO Scylla: root
 |-- email: string (nullable = false)
 |-- account_id: string (nullable = true)
 |-- details: string (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)
 |-- password_hash: string (nullable = true)
 |-- password_salt: string (nullable = true)
 |-- scheme: string (nullable = true)
 |-- tokn: string (nullable = true)
 |-- tokn_expiry: string (nullable = true)
 |-- ttl: integer (nullable = true)
 |-- writetime: long (nullable = true)
22/09/29 19:44:22 WARN TaskSetManager: Lost task 5.0 in stage 0.0 (TID 5, 172.21.5.60, executor 5): java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
    at org.apache.spark.sql.Row$class.getInt(Row.scala:223)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getInt(rows.scala:166)
    at com.scylladb.migrator.readers.Cassandra$$anonfun$11.apply(Cassandra.scala:108)
    at com.scylladb.migrator.readers.Cassandra$$anonfun$11.apply(Cassandra.scala:101)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at com.scylladb.migrator.readers.Cassandra$.explodeRow(Cassandra.scala:101)
    at com.scylladb.migrator.readers.Cassandra$$anonfun$adjustDataframeForTimestampPreservation$1.apply(Cassandra.scala:193)
    at com.scylladb.migrator.readers.Cassandra$$anonfun$adjustDataframeForTimestampPreservation$1.apply(Cassandra.scala:193)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.hasNext(GroupingBatchBuilder.scala:100)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:30)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:241)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:129)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:188)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:175)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

22/09/29 19:44:22 WARN TaskSetManager: Lost task 3.2 in stage 0.0 (TID 12, 172.21.4.113, executor 0): java.lang.ClassCastException
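For reference, a minimal sketch of the failing cast shown in the trace above (an illustration assuming spark-sql is on the classpath, not the migrator's code). Row.getInt on a field that actually holds a String throws exactly this ClassCastException, which is presumably what happens when the TTL lookup resolves to the original string-typed tokn_ttl column:

import org.apache.spark.sql.Row

// Minimal sketch: getInt on a String-valued field fails with ClassCastException.
val row: Row = Row("some-token-value") // a string field, like the original tokn_ttl column
val ttl: Int = row.getInt(0)           // java.lang.ClassCastException: String cannot be cast to Integer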
tarzanek commented 2 years ago

Can you try disabling preserve timestamps and see if it fixes your issue? @jwsomis?

I just want to see whether the ordering of preserve timestamps and column renames conflicts.

wtibbitts commented 2 years ago

That did seem to resolve it, but it also seems to avoid this particular collision altogether, since it no longer generates the duplicate tokn_ttl like it did before. Which leads me to another question: we were using the renames to handle this collision, but would disabling preserveTimestamps be just as effective?

tarzanek commented 2 years ago

So this is not really a regression, but we do have a bug when both renames and preserve timestamps are set.

Both affect schema generation in the code, and the migrator needs to apply the rename to the generated TTL/writetime columns too.
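A rough sketch of that direction, assuming the <column>_ttl / <column>_writetime naming seen in the logs above (a hypothetical helper, not the migrator's actual code):

// Hypothetical helper: apply each configured rename to the base column and to
// its generated _ttl / _writetime companion columns as well.
case class Rename(from: String, to: String)

def renameWithCompanions(columns: List[String], renames: List[Rename]): List[String] = {
  val mapping = renames.flatMap { r =>
    List(
      r.from                 -> r.to,
      s"${r.from}_ttl"       -> s"${r.to}_ttl",
      s"${r.from}_writetime" -> s"${r.to}_writetime"
    )
  }.toMap
  columns.map(c => mapping.getOrElse(c, c))
}

// renameWithCompanions(
//   List("tokn_ttl", "tokn_ttl_ttl", "tokn_ttl_writetime"),
//   List(Rename("tokn_ttl", "tokn_expir"))
// ) returns List("tokn_expir", "tokn_expir_ttl", "tokn_expir_writetime")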

wtibbitts commented 2 years ago

The only reason I consider it a regression is that having both preserveTimestamps and a rename did work for me a couple of weeks ago. However, I have not checked out an older version to reconfirm that the same config that is broken now actually worked before; it is just from memory that nothing was changed.