scylladb / scylla-migrator

Migrate data extracts to Scylla using Spark, normally from Cassandra or Parquet files; alternatively, from DynamoDB to Scylla Alternator.
https://migrator.docs.scylladb.com/stable/
Apache License 2.0

DynamoDB to Scylla Alternator: high parallelism settings with little source data error out #104

Closed: pdbossman closed this issue 1 month ago

pdbossman commented 7 months ago

Source: DynamoDB; target: Scylla Alternator.

Using higher concurrency settings on the source with very little data errors out (no data is copied). In config.yaml:

    scanSegments: 256
    maxMapTasks: 8

24/02/19 19:07:15 WARN TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2, 172.18.0.7, executor 0): java.lang.IllegalArgumentException: Field "user_id" does not exist.
Available fields: 
    at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:274)
    at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:274)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:59)
    at org.apache.spark.sql.types.StructType.apply(StructType.scala:273)
    at com.audienceproject.spark.dynamodb.connector.ColumnSchema.<init>(ColumnSchema.scala:42)
    at com.audienceproject.spark.dynamodb.datasource.DynamoWriterFactory.createDataWriter(DynamoWriterFactory.scala:45)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:113)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

With the same source and target and all other parameters unchanged, but scanSegments and maxMapTasks set to 1 in config.yaml:

    scanSegments: 1
    maxMapTasks: 1

No errors, and the row is copied.
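
For context, the empty "Available fields:" line in the trace means the schema the writer received contains no fields at all. Spark's StructType lookup throws exactly this error when a named field is absent; a minimal, self-contained sketch (not the migrator's code) reproduces it:

    import org.apache.spark.sql.types.StructType

    // An empty schema, as when nothing was inferred from the source table.
    val inferred = StructType(Nil)

    // Same failure as in the trace:
    // java.lang.IllegalArgumentException: Field "user_id" does not exist.
    inferred("user_id")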

julienrf commented 6 months ago

This is probably a consequence of #103; see my comment there about using any value other than 1 for scanSegments: https://github.com/scylladb/scylla-migrator/issues/103#issuecomment-1959192238.
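
To make the connection concrete: inference builds the schema from whatever items it samples, so if the sample comes from scan segments that hold no data, the resulting schema has zero fields and the writer's key-column lookup fails as shown above. A rough, hypothetical sketch of that shape (inferSchema here is an illustration, not the connector's actual code, which lives in the spark-dynamodb dependency):

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Hypothetical stand-in for schema inference over sampled items.
    def inferSchema(sampledItems: Seq[Map[String, String]]): StructType =
      StructType(sampledItems.flatMap(_.keys).distinct.map(StructField(_, StringType)))

    // With 256 segments and a single row, a sample can easily be empty:
    val schema = inferSchema(Seq.empty) // zero fields, matching the empty
                                        // "Available fields:" in the trace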

julienrf commented 1 month ago

I am closing this issue since it was likely due to schema inference, which we no longer perform.