scylladb / scylla-migrator

Migrate data extract using Spark to Scylla, normally from Cassandra
Apache License 2.0

Migration from DynamoDB falls back to single partition #130

Closed pdbossman closed 1 month ago

pdbossman commented 2 months ago

Using the latest version of scylla-migrator for a DynamoDB -> Alternator migration, I receive an error, and the job falls back to processing a single partition instead of the configured scanSegments.

time spark-submit --class com.scylladb.migrator.Migrator \
  --master spark://spark-master:7077 \
  --conf spark.eventLog.enabled=true \
  --conf spark.scylla.config=/home/ubuntu/scylla-migrator/config.yaml \
  --conf spark.driver.memory=64G \
  /home/ubuntu/scylla-migrator/migrator/target/scala-2.11/scylla-migrator-assembly-0.0.1.jar
24/04/19 15:29:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
24/04/19 15:29:02 INFO SparkContext: Running Spark version 2.4.8
24/04/19 15:29:02 INFO SparkContext: Submitted application: scylla-migrator
24/04/19 15:29:02 INFO SecurityManager: Changing view acls to: ubuntu
24/04/19 15:29:02 INFO SecurityManager: Changing modify acls to: ubuntu
24/04/19 15:29:02 INFO SecurityManager: Changing view acls groups to:
24/04/19 15:29:02 INFO SecurityManager: Changing modify acls groups to:
24/04/19 15:29:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ubuntu); groups with view permissions: Set(); users with modify permissions: Set(ubuntu); groups with modify permissions: Set()
24/04/19 15:29:03 INFO Utils: Successfully started service 'sparkDriver' on port 38471.
24/04/19 15:29:03 INFO SparkEnv: Registering MapOutputTracker
24/04/19 15:29:03 INFO SparkEnv: Registering BlockManagerMaster
24/04/19 15:29:03 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
24/04/19 15:29:03 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
24/04/19 15:29:03 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-fccb888f-178d-4683-b8f6-b8d6269fafcb
24/04/19 15:29:03 INFO MemoryStore: MemoryStore started with capacity 34.0 GB
24/04/19 15:29:03 INFO SparkEnv: Registering OutputCommitCoordinator
24/04/19 15:29:03 INFO Utils: Successfully started service 'SparkUI' on port 4040.
24/04/19 15:29:03 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://spark-master:4040
24/04/19 15:29:03 INFO SparkContext: Added JAR file:/home/ubuntu/scylla-migrator/migrator/target/scala-2.11/scylla-migrator-assembly-0.0.1.jar at spark://spark-master:38471/jars/scylla-migrator-assembly-0.0.1.jar with timestamp 1713540543193
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://spark-master:7077...
24/04/19 15:29:03 INFO TransportClientFactory: Successfully created connection to spark-master/172.31.30.174:7077 after 23 ms (0 ms spent in bootstraps)
24/04/19 15:29:03 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20240419152903-0000
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20240419152903-0000/0 on worker-20240419152554-172.31.79.38-42559 (172.31.79.38:42559) with 12 core(s)
24/04/19 15:29:03 INFO StandaloneSchedulerBackend: Granted executor ID app-20240419152903-0000/0 on hostPort 172.31.79.38:42559 with 12 core(s), 1024.0 MB RAM
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20240419152903-0000/1 on worker-20240419152604-172.31.79.38-42149 (172.31.79.38:42149) with 12 core(s)
24/04/19 15:29:03 INFO StandaloneSchedulerBackend: Granted executor ID app-20240419152903-0000/1 on hostPort 172.31.79.38:42149 with 12 core(s), 1024.0 MB RAM
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20240419152903-0000/2 on worker-20240419152556-172.31.79.38-34579 (172.31.79.38:34579) with 12 core(s)
24/04/19 15:29:03 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 36365.
24/04/19 15:29:03 INFO StandaloneSchedulerBackend: Granted executor ID app-20240419152903-0000/2 on hostPort 172.31.79.38:34579 with 12 core(s), 1024.0 MB RAM
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20240419152903-0000/3 on worker-20240419152601-172.31.79.38-39533 (172.31.79.38:39533) with 12 core(s)
24/04/19 15:29:03 INFO NettyBlockTransferService: Server created on spark-master:36365
24/04/19 15:29:03 INFO StandaloneSchedulerBackend: Granted executor ID app-20240419152903-0000/3 on hostPort 172.31.79.38:39533 with 12 core(s), 1024.0 MB RAM
24/04/19 15:29:03 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20240419152903-0000/4 on worker-20240419152612-172.31.79.38-42791 (172.31.79.38:42791) with 12 core(s)
24/04/19 15:29:03 INFO StandaloneSchedulerBackend: Granted executor ID app-20240419152903-0000/4 on hostPort 172.31.79.38:42791 with 12 core(s), 1024.0 MB RAM
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20240419152903-0000/5 on worker-20240419152609-172.31.79.38-42775 (172.31.79.38:42775) with 12 core(s)
24/04/19 15:29:03 INFO StandaloneSchedulerBackend: Granted executor ID app-20240419152903-0000/5 on hostPort 172.31.79.38:42775 with 12 core(s), 1024.0 MB RAM
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20240419152903-0000/6 on worker-20240419152559-172.31.79.38-42645 (172.31.79.38:42645) with 12 core(s)
24/04/19 15:29:03 INFO StandaloneSchedulerBackend: Granted executor ID app-20240419152903-0000/6 on hostPort 172.31.79.38:42645 with 12 core(s), 1024.0 MB RAM
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20240419152903-0000/7 on worker-20240419152607-172.31.79.38-37553 (172.31.79.38:37553) with 12 core(s)
24/04/19 15:29:03 INFO StandaloneSchedulerBackend: Granted executor ID app-20240419152903-0000/7 on hostPort 172.31.79.38:37553 with 12 core(s), 1024.0 MB RAM
24/04/19 15:29:03 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, spark-master, 36365, None)
24/04/19 15:29:03 INFO BlockManagerMasterEndpoint: Registering block manager spark-master:36365 with 34.0 GB RAM, BlockManagerId(driver, spark-master, 36365, None)
24/04/19 15:29:03 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, spark-master, 36365, None)
24/04/19 15:29:03 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, spark-master, 36365, None)
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240419152903-0000/1 is now RUNNING
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240419152903-0000/0 is now RUNNING
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240419152903-0000/2 is now RUNNING
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240419152903-0000/5 is now RUNNING
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240419152903-0000/3 is now RUNNING
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240419152903-0000/6 is now RUNNING
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240419152903-0000/4 is now RUNNING
24/04/19 15:29:03 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240419152903-0000/7 is now RUNNING
24/04/19 15:29:03 INFO EventLoggingListener: Logging events to file:/tmp/spark-events/app-20240419152903-0000
24/04/19 15:29:03 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
24/04/19 15:29:04 INFO migrator: Loaded config: MigratorConfig(DynamoDB(None,None,None,XXX,Some(4000),None,None,Some(2)),DynamoDB(Some(DynamoDBEndpoint(http://scylla,8000)),None,Some(AWSCredentials(emp..., )),XXX,None,None,None,Some(1),false),List(),Savepoints(300,/app/savepoints),Set(),Validation(false,60000,1000,100,0.001,0))
24/04/19 15:29:05 WARN ApacheUtils: NoSuchMethodException was thrown when disabling normalizeUri. This indicates you are using an old version (< 4.5.8) of Apache http client. It is recommended to use http client version >= 4.5.9 to avoid the breaking change introduced in apache client 4.5.7 and the latency in exception handling.
See https://github.com/aws/aws-sdk-java/issues/1919 for more information
24/04/19 15:29:05 WARN ClusterTopologyNodeCapacityProvider: Exception when trying to determine instance types
java.nio.file.NoSuchFileException: /mnt/var/lib/info/job-flow.json
        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
        at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
        at java.nio.file.Files.newByteChannel(Files.java:361)
        at java.nio.file.Files.newByteChannel(Files.java:407)
        at java.nio.file.Files.readAllBytes(Files.java:3152)
        at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.readJobFlowJsonString(ClusterTopologyNodeCapacityProvider.java:102)
        at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.getCoreNodeMemoryMB(ClusterTopologyNodeCapacityProvider.java:41)
        at org.apache.hadoop.dynamodb.util.TaskCalculator.getMaxMapTasks(TaskCalculator.java:53)
        at org.apache.hadoop.dynamodb.DynamoDBUtil.calcMaxMapTasks(DynamoDBUtil.java:271)
        at org.apache.hadoop.dynamodb.read.AbstractDynamoDBInputFormat.getSplits(AbstractDynamoDBInputFormat.java:46)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:289)
        at com.scylladb.migrator.alternator.AlternatorMigrator$.migrate(AlternatorMigrator.scala:22)
        at com.scylladb.migrator.Migrator$.main(Migrator.scala:43)
        at com.scylladb.migrator.Migrator.main(Migrator.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:855)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:930)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:939)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
24/04/19 15:29:05 INFO alternator: We need to transfer: 1 partitions in total
24/04/19 15:29:05 INFO alternator: Starting write...
24/04/19 15:29:05 INFO DynamoUtils: Checking for table existence at destination
24/04/19 15:29:05 INFO DynamoUtils: Table XXX exists at destination
24/04/19 15:29:05 WARN FileOutputCommitter: Output Path is null in setupJob()

FYI - @tarzanek @hopugop @wpaven

pdbossman commented 2 months ago

Attached config.yaml - had to rename it so git would accept it. (table name was redacted) config.dynamodb.yaml.txt

tarzanek commented 2 months ago

@julienrf can you review above please?

julienrf commented 2 months ago

AFAIU, the exception you see in the logs (“Exception when trying to determine instance types java.nio.file.NoSuchFileException: /mnt/var/lib/info/job-flow.json”) is not a real issue. It happens because the library that we use to load the data from DynamoDB as a Spark RDD has been designed to be used within an AWS EMR environment. See also https://github.com/awslabs/emr-dynamodb-connector/issues/50 for some additional discussion about it.

How many items are in your source database? How many CPUs are in your Spark cluster? Would it be possible to increase the log level of the logger org.apache.hadoop.dynamodb to at least INFO or even DEBUG? (There are many properties taken into account to choose how to split the input, and increasing the log level should help us follow the algorithm.) Also note that it is possible that your issue is the same as the one reported here: https://github.com/awslabs/emr-dynamodb-connector/issues/125.
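For reference, a minimal way to raise that logger's level with Spark's default log4j setup would be something like the following (a sketch; the exact file location depends on your installation, typically conf/log4j.properties copied from the bundled template on the node running the driver):

# conf/log4j.properties (sketch, assuming Spark's default log4j 1.x configuration)
log4j.rootCategory=INFO, console
# Raise verbosity for the emr-dynamodb-connector classes that compute the input splits
log4j.logger.org.apache.hadoop.dynamodb=DEBUG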

tarzanek commented 2 months ago

So this means we should also set dynamodb.numberOfSplits? Maybe we can take it from the old audienceproject library; I remember seeing some logic around splits there.

Or rather, we should have a way to fall back to some manual splits when the EMR-style autodetection is not available.

The goal is that the migrator shouldn't assume the environment; it should be able to run on any Spark cluster (of course EMR is preferred, but it's not a requirement).

(Also, I was under the impression that SCAN_SEGMENTS, set at https://github.com/scylladb/scylla-migrator/blob/master/migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala#L172, controls the splits in the end... but AFAIK in Patrick's tests it didn't change anything.)
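For context, that line passes the setting to the emr-dynamodb-connector through the Hadoop JobConf. A rough sketch of that kind of wiring follows; the helper name is hypothetical and, apart from DynamoDBConstants.SCAN_SEGMENTS referenced above, everything here is an assumption rather than the migrator's actual code:

import org.apache.hadoop.dynamodb.DynamoDBConstants
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext

// Hypothetical sketch: forward the configured scanSegments to the connector.
// Whether it results in that many Spark partitions still depends on the
// maximum number of map tasks the connector computes from the (EMR-specific)
// cluster topology.
def withScanSegments(sc: SparkContext, scanSegments: Option[Int]): JobConf = {
  val jobConf = new JobConf(sc.hadoopConfiguration)
  scanSegments.foreach(n => jobConf.set(DynamoDBConstants.SCAN_SEGMENTS, n.toString))
  jobConf
}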

julienrf commented 2 months ago

The logic in emr-dynamodb-connector is pretty involved and it would be good to know better which parameters are significant. Maybe we should replace scanSegments with something else. @pdbossman are you able to provide the logs with more details (see my post above)?

julienrf commented 2 months ago

maybe we can take it from old audienceproject library, I remember seeing some logic around splits there

From their README:

readPartitions number of partitions to split the initial RDD when loading the data into Spark. Defaults to the size of the DynamoDB table divided into chunks of maxPartitionBytes

hopugop commented 2 months ago

Defaults to the size of the DynamoDB table divided into chunks of maxPartitionBytes

How is the size of the DynamoDB table collected? We don't export the table size in Alternator. If it relies on that it may be getting bogus results.

julienrf commented 2 months ago

They use [tableSizeBytes](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/model/TableDescription.html#tableSizeBytes()). But anyway, we are not using the audienceproject connector anymore.
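For reference, that value comes from DescribeTable; a minimal sketch with the AWS SDK v2 (the table name is a placeholder):

import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest

// Sketch: the approximate table size such a connector could use to size partitions.
// DynamoDB only refreshes TableSizeBytes periodically (roughly every six hours),
// so it is an estimate rather than a live value.
val client = DynamoDbClient.create()
val table = client
  .describeTable(DescribeTableRequest.builder().tableName("my-table").build())
  .table()
val sizeBytes: Long = table.tableSizeBytes()
val itemCount: Long = table.itemCount()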

pdbossman commented 2 months ago

I re-set this up and recreated the error again. It's certainly only starting one task now, whereas previously it would create scanSegments tasks and then distribute those tasks across all the workers and executors I had.

I have two instances:
x2iedn.2xlarge - master, 4 CPUs (0 workers)
i4i.2xlarge - I start 2 workers, each using 4 processors. The machine has 8 CPUs.

I use multiple workers on an instance because I usually start small and step up the number of workers while monitoring pressure on the source and target systems.

How many items are in your source database? How many CPUs are in your Spark cluster? Would it be possible to increase the log level of the logger org.apache.hadoop.dynamodb to at least INFO or even DEBUG?

I don't know how to change the logger level. Where would I set this for scylla-migrator?

pdbossman commented 2 months ago

"From their README:

readPartitions number of partitions to split the initial RDD when loading the data into Spark. Defaults to the size of the DynamoDB table divided into chunks of maxPartitionBytes"

... the computation of scanSegments that I used takes the output of describe table: the number of bytes divided by 128 MB. My understanding of what/how to set scanSegments sounds exactly like how readPartitions is described here.
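For what it's worth, a quick back-of-the-envelope version of that computation (the table size here is only an example value):

// Sketch: scanSegments sized so that each segment covers roughly 128 MB,
// using the TableSizeBytes reported by `aws dynamodb describe-table`.
val tableSizeBytes = 3200L * 1000 * 1000 * 1000  // e.g. a ~3.2 TB table (example value)
val targetSegmentBytes = 128L * 1024 * 1024      // 128 MB per segment
val scanSegments = math.max(1, math.ceil(tableSizeBytes.toDouble / targetSegmentBytes).toInt)
// => 23842 segments for this example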

julienrf commented 2 months ago

@pdbossman Thank you for your feedback. I ran some investigations, and the configuration logic for the Hadoop job is complicated. It turns out the table read throughput plays an important role in it. Would you mind running your job from my branch here: https://github.com/julienrf/scylla-migrator/tree/tuned-read-throughput and setting readThroughput in your config.yaml file:

source:
  type: dynamodb
  ...
  readThroughput: 4096

Please let me know if you see more tasks created with that change.

pdbossman commented 2 months ago

I installed from the fork, built it, added readThroughput: 4096, and stopped and restarted the master and all the slaves.

I still got this error:

time spark-submit --class com.scylladb.migrator.Migrator --master spark://spark-master:7077 --conf spark.eventLog.enabled=true --conf spark.scylla.config=/home/ubuntu/scylla-migrator/config.yaml --conf spark.driver.memory=64G /home/ubuntu/scylla-migrator/migrator/target/scala-2.11/scylla-migrator-assembly-0.0.1.jar
24/04/28 21:50:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
24/04/28 21:50:33 INFO SparkContext: Running Spark version 2.4.8
24/04/28 21:50:33 INFO SparkContext: Submitted application: scylla-migrator
24/04/28 21:50:33 INFO SecurityManager: Changing view acls to: ubuntu
24/04/28 21:50:33 INFO SecurityManager: Changing modify acls to: ubuntu
24/04/28 21:50:33 INFO SecurityManager: Changing view acls groups to:
24/04/28 21:50:33 INFO SecurityManager: Changing modify acls groups to:
24/04/28 21:50:33 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ubuntu); groups with view permissions: Set(); users with modify permissions: Set(ubuntu); groups with modify permissions: Set()
24/04/28 21:50:33 INFO Utils: Successfully started service 'sparkDriver' on port 44423.
24/04/28 21:50:33 INFO SparkEnv: Registering MapOutputTracker
24/04/28 21:50:33 INFO SparkEnv: Registering BlockManagerMaster
24/04/28 21:50:33 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
24/04/28 21:50:33 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
24/04/28 21:50:33 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-ce6fe8e1-fdd7-46ec-9eb5-23728c973fab
24/04/28 21:50:33 INFO MemoryStore: MemoryStore started with capacity 34.0 GB
24/04/28 21:50:33 INFO SparkEnv: Registering OutputCommitCoordinator
24/04/28 21:50:33 INFO Utils: Successfully started service 'SparkUI' on port 4040.
24/04/28 21:50:33 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://spark-master:4040
24/04/28 21:50:34 INFO SparkContext: Added JAR file:/home/ubuntu/scylla-migrator/migrator/target/scala-2.11/scylla-migrator-assembly-0.0.1.jar at spark://spark-master:44423/jars/scylla-migrator-assembly-0.0.1.jar with timestamp 1714341034009
24/04/28 21:50:34 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://spark-master:7077...
24/04/28 21:50:34 INFO TransportClientFactory: Successfully created connection to spark-master/172.31.22.139:7077 after 27 ms (0 ms spent in bootstraps)
24/04/28 21:50:34 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20240428215034-0007
24/04/28 21:50:34 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20240428215034-0007/0 on worker-20240428001527-172.31.66.134-41501 (172.31.66.134:41501) with 4 core(s)
24/04/28 21:50:34 INFO StandaloneSchedulerBackend: Granted executor ID app-20240428215034-0007/0 on hostPort 172.31.66.134:41501 with 4 core(s), 1024.0 MB RAM
24/04/28 21:50:34 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20240428215034-0007/1 on worker-20240428001524-172.31.66.134-38687 (172.31.66.134:38687) with 4 core(s)
24/04/28 21:50:34 INFO StandaloneSchedulerBackend: Granted executor ID app-20240428215034-0007/1 on hostPort 172.31.66.134:38687 with 4 core(s), 1024.0 MB RAM
24/04/28 21:50:34 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 41691.
24/04/28 21:50:34 INFO NettyBlockTransferService: Server created on spark-master:41691
24/04/28 21:50:34 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
24/04/28 21:50:34 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240428215034-0007/1 is now RUNNING
24/04/28 21:50:34 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240428215034-0007/0 is now RUNNING
24/04/28 21:50:34 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, spark-master, 41691, None)
24/04/28 21:50:34 INFO BlockManagerMasterEndpoint: Registering block manager spark-master:41691 with 34.0 GB RAM, BlockManagerId(driver, spark-master, 41691, None)
24/04/28 21:50:34 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, spark-master, 41691, None)
24/04/28 21:50:34 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, spark-master, 41691, None)
24/04/28 21:50:34 INFO EventLoggingListener: Logging events to file:/tmp/spark-events/app-20240428215034-0007
24/04/28 21:50:34 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
24/04/28 21:50:35 INFO migrator: Loaded config: MigratorConfig(DynamoDB(None,None,None,partner_user_id_mappings,Some(4000),Some(4096),None,Some(2)),DynamoDB(Some(DynamoDBEndpoint(http://scylla,8000)),None,Some(AWSCredentials(emp..., )),partner_user_id_mappings,Some(4000),None,None,Some(1),false,None),List(),Savepoints(300,/app/savepoints),Set(),Validation(false,60000,1000,100,0.001,0))
24/04/28 21:50:35 WARN ApacheUtils: NoSuchMethodException was thrown when disabling normalizeUri. This indicates you are using an old version (< 4.5.8) of Apache http client. It is recommended to use http client version >= 4.5.9 to avoid the breaking change introduced in apache client 4.5.7 and the latency in exception handling.
See https://github.com/aws/aws-sdk-java/issues/1919 for more information
24/04/28 21:50:36 WARN ClusterTopologyNodeCapacityProvider: Exception when trying to determine instance types
java.nio.file.NoSuchFileException: /mnt/var/lib/info/job-flow.json
        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
        at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
        at java.nio.file.Files.newByteChannel(Files.java:361)
        at java.nio.file.Files.newByteChannel(Files.java:407)
        at java.nio.file.Files.readAllBytes(Files.java:3152)
        at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.readJobFlowJsonString(ClusterTopologyNodeCapacityProvider.java:102)
        at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.getCoreNodeMemoryMB(ClusterTopologyNodeCapacityProvider.java:41)
        at org.apache.hadoop.dynamodb.util.TaskCalculator.getMaxMapTasks(TaskCalculator.java:53)
        at org.apache.hadoop.dynamodb.DynamoDBUtil.calcMaxMapTasks(DynamoDBUtil.java:271)
        at org.apache.hadoop.dynamodb.read.AbstractDynamoDBInputFormat.getSplits(AbstractDynamoDBInputFormat.java:46)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:289)
        at com.scylladb.migrator.alternator.AlternatorMigrator$.migrate(AlternatorMigrator.scala:22)
        at com.scylladb.migrator.Migrator$.main(Migrator.scala:43)
        at com.scylladb.migrator.Migrator.main(Migrator.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:855)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:930)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:939)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
24/04/28 21:50:36 INFO alternator: We need to transfer: 1 partitions in total
24/04/28 21:50:36 INFO alternator: Starting write...
24/04/28 21:50:36 INFO DynamoUtils: Checking for table existence at destination
24/04/28 21:50:36 INFO DynamoUtils: Table partner_user_id_mappings exists at destination
24/04/28 21:50:36 WARN FileOutputCommitter: Output Path is null in setupJob()

pdbossman commented 2 months ago

For a prior POC, we used the following configuration: 3 servers to run the Spark/Scylla migration:
1 x x2iedn.2xlarge - 8 vCPU + 256 GB RAM, to run spark-submit and the master
2 x i4i.4xlarge - 16 vCPU + 128 GB RAM, for worker nodes, 250 GB disk

Scylla: 6 x i4i.8xlarge

We moved 28B items, ~3.2 TB of data, in around 9 hours. The other consideration is that in DynamoDB we do not have a write time, so they need the migration to run fast because they need to replay transactions afterwards. The longer the migration takes, the more transactions they have queued up to replay. So being able to scale out the migration cluster is important.

If there's a way to make it work with EMR, I would try that; I've always just created a stand-alone Spark cluster.

julienrf commented 2 months ago

Thank you for your experiments, @pdbossman. I cannot reproduce your output, though, even when I use the same config as you…

See my logs:

24/04/29 06:25:19 WARN ClusterTopologyNodeCapacityProvider: Exception when trying to determine instance types
java.nio.file.NoSuchFileException: /mnt/var/lib/info/job-flow.json
        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
        …
24/04/29 06:25:19 INFO alternator: We need to transfer: 2 partitions in total

In the end, I get 2 partitions because I set maxMapTasks: 2 in the config. Maybe we should schedule a 1:1 chat to investigate this further together?
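For reference, the relevant part of the source section in config.yaml would look something like this (a sketch; the values are illustrative, and only maxMapTasks: 2 reflects the config used above):

source:
  type: dynamodb
  table: <table name>    # placeholder
  scanSegments: 4000     # requested number of scan segments
  maxMapTasks: 2         # caps the number of map tasks, hence Spark partitions
  readThroughput: 4096   # only with the tuned-read-throughput branch above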

The exception is probably not a real issue. It happens when the DynamoDB connector tries to determine how much memory should be available on the worker node. Depending on the type of node, it will read the memory setting from different properties (with a default of 8192 MB). You can remove the exception by adding a file like the following to your Spark master node at the path /mnt/var/lib/info/job-flow.json:

{
  "jobFlowId": "j-2AO77MNLG17NW",
  "jobFlowCreationInstant": 1429046932628,
  "instanceCount": 2,
  "masterInstanceId": "i-08dea4f4",
  "masterPrivateDnsName": "localhost",
  "masterInstanceType": "m1.medium",
  "slaveInstanceType": "m1.xlarge",
  "hadoopVersion": "2.4.0",
  "instanceGroups": [
    {
      "instanceGroupId": "ig-16NXM94TY33LB",
      "instanceGroupName": "CORE",
      "instanceRole": "Core",
      "marketType": "OnDemand",
      "instanceType": "m3.xlarge",
      "requestedInstanceCount": 1
    },
    {
      "instanceGroupId": "ig-2XQ29JGCTKLBL",
      "instanceGroupName": "MASTER",
      "instanceRole": "Master",
      "marketType": "OnDemand",
      "instanceType": "m1.medium",
      "requestedInstanceCount": 1
    }
  ]
}

(Feel free to change the instanceType properties to your actual instance types, but keep in mind that we do not currently configure the Hadoop job properties to set the memory settings, so the default value (8192 MB) will always be used.)