scylladb / scylla-migrator

Migrate data extract using Spark to Scylla, normally from Cassandra
Apache License 2.0

Support skipping the snapshot transfer when streaming changes #129

Closed · julienrf closed this 2 months ago

julienrf commented 2 months ago

Fixes #120
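For reference, a sketch of how this feature might be enabled in the migrator's YAML configuration. This is an illustrative fragment, not taken from the PR itself: the target section and the `streamChanges` / `skipInitialSnapshotTransfer` names are assumptions based on the feature description and the config dump in the test log below, so check the config reference for the exact keys.

```yaml
# Hypothetical target section for an Alternator/DynamoDB migration
# that replicates only streamed changes, skipping the initial snapshot.
target:
  type: dynamodb
  table: StreamedItems
  streamChanges: true
  # The new behavior added by this PR: do not copy the table snapshot,
  # only apply the changes captured by the DynamoDB Stream.
  skipInitialSnapshotTransfer: true
```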

julienrf commented 2 months ago

Successfully tested with my own AWS account (cannot run the test in the CI because of #113).

Test output

~~~ text
Table status is ACTIVE
Read item from source database: Map(foo -> {S: bar,}, id -> {S: 12345,})
24/04/18 08:53:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
24/04/18 08:53:57 INFO SparkContext: Running Spark version 2.4.4
24/04/18 08:53:57 INFO SparkContext: Submitted application: scylla-migrator
24/04/18 08:53:57 INFO SecurityManager: Changing view acls to: root
24/04/18 08:53:57 INFO SecurityManager: Changing modify acls to: root
24/04/18 08:53:57 INFO SecurityManager: Changing view acls groups to:
24/04/18 08:53:57 INFO SecurityManager: Changing modify acls groups to:
24/04/18 08:53:57 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
24/04/18 08:53:57 INFO Utils: Successfully started service 'sparkDriver' on port 40433.
24/04/18 08:53:57 INFO SparkEnv: Registering MapOutputTracker
24/04/18 08:53:57 INFO SparkEnv: Registering BlockManagerMaster
24/04/18 08:53:57 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
24/04/18 08:53:57 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
24/04/18 08:53:57 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-e99b229b-65a2-414d-90fb-81ab6366ab5f
24/04/18 08:53:57 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
24/04/18 08:53:57 INFO SparkEnv: Registering OutputCommitCoordinator
24/04/18 08:53:57 INFO Utils: Successfully started service 'SparkUI' on port 4040.
24/04/18 08:53:57 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://spark-master:4040
24/04/18 08:53:57 INFO SparkContext: Added JAR file:/jars/scylla-migrator-assembly-0.0.1.jar at spark://spark-master:40433/jars/scylla-migrator-assembly-0.0.1.jar with timestamp 1713430437915
24/04/18 08:53:58 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://spark-master:7077...
24/04/18 08:53:58 INFO TransportClientFactory: Successfully created connection to spark-master/172.19.0.3:7077 after 82 ms (0 ms spent in bootstraps)
24/04/18 08:53:58 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20240418085358-0001
24/04/18 08:53:58 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20240418085358-0001/0 on worker-20240418084856-172.19.0.7-35565 (172.19.0.7:35565) with 3 core(s)
24/04/18 08:53:58 INFO StandaloneSchedulerBackend: Granted executor ID app-20240418085358-0001/0 on hostPort 172.19.0.7:35565 with 3 core(s), 1024.0 MB RAM
24/04/18 08:53:58 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33707.
24/04/18 08:53:58 INFO NettyBlockTransferService: Server created on spark-master:33707
24/04/18 08:53:58 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
24/04/18 08:53:58 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240418085358-0001/0 is now RUNNING
24/04/18 08:53:58 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, spark-master, 33707, None)
24/04/18 08:53:58 INFO BlockManagerMasterEndpoint: Registering block manager spark-master:33707 with 366.3 MB RAM, BlockManagerId(driver, spark-master, 33707, None)
24/04/18 08:53:58 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, spark-master, 33707, None)
24/04/18 08:53:58 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, spark-master, 33707, None)
24/04/18 08:53:58 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
24/04/18 08:54:01 INFO migrator: Loaded config: MigratorConfig(DynamoDB(None,Some(eu-central-1),Some(AWSCredentials(AKI..., )),StreamedItems,None,None,None,Some(1)),DynamoDB(Some(DynamoDBEndpoint(http://scylla,8000)),None,Some(AWSCredentials(dum..., )),StreamedItems,None,None,None,Some(1),true,Some(true)),List(),Savepoints(300,/app/savepoints),Set(),Validation(true,60000,1000,100,0.001,0))
24/04/18 08:54:03 WARN ApacheUtils: NoSuchMethodException was thrown when disabling normalizeUri. This indicates you are using an old version (< 4.5.8) of Apache http client. It is recommended to use http client version >= 4.5.9 to avoid the breaking change introduced in apache client 4.5.7 and the latency in exception handling. See https://github.com/aws/aws-sdk-java/issues/1919 for more information
24/04/18 08:54:05 INFO alternator: We need to transfer: 1 partitions in total
24/04/18 08:54:05 INFO alternator: Starting write...
24/04/18 08:54:05 INFO alternator: Source is a Dynamo table and change streaming requested; enabling Dynamo Stream
24/04/18 08:54:06 INFO DynamoUtils: Stream not yet enabled (status ENABLING); waiting for 5 seconds and retrying
24/04/18 08:54:11 INFO DynamoUtils: Stream enabled successfully
24/04/18 08:54:11 INFO DynamoUtils: Checking for table existence at destination
24/04/18 08:54:11 INFO DynamoUtils: Table StreamedItems does not exist at destination - creating it according to definition:
24/04/18 08:54:11 INFO DynamoUtils: {AttributeDefinitions: [{AttributeName: id,AttributeType: S}],TableName: StreamedItems,KeySchema: [{AttributeName: id,KeyType: HASH}],TableStatus: ACTIVE,CreationDateTime: Thu Apr 18 08:53:38 GMT 2024,ProvisionedThroughput: {NumberOfDecreasesToday: 0,ReadCapacityUnits: 5,WriteCapacityUnits: 5},TableSizeBytes: 0,ItemCount: 0,TableArn: arn:aws:dynamodb:eu-central-1:164221962816:table/StreamedItems,TableId: 9f42869e-5edf-40cf-bc4c-a26cef0481fa,}
24/04/18 08:54:11 INFO DynamoUtils: Table StreamedItems created.
24/04/18 08:54:11 INFO alternator: Skip transferring table snapshot
24/04/18 08:54:11 INFO alternator: Starting to transfer changes
24/04/18 08:54:20 INFO DynamoStreamReplication: No changes to apply
24/04/18 08:54:20 WARN FileOutputCommitter: Output Path is null in setupJob()
24/04/18 08:54:22 WARN FileOutputCommitter: Output Path is null in commitJob()
24/04/18 08:54:25 INFO DynamoStreamReplication: No changes to apply
24/04/18 08:54:25 WARN FileOutputCommitter: Output Path is null in setupJob()
24/04/18 08:54:25 WARN FileOutputCommitter: Output Path is null in commitJob()
24/04/18 08:54:30 INFO DynamoStreamReplication: No changes to apply
24/04/18 08:54:30 WARN FileOutputCommitter: Output Path is null in setupJob()
24/04/18 08:54:30 WARN FileOutputCommitter: Output Path is null in commitJob()
Checking that target database contains initial data
Inserted more data
24/04/18 08:54:35 INFO DynamoStreamReplication: No changes to apply
24/04/18 08:54:35 WARN FileOutputCommitter: Output Path is null in setupJob()
24/04/18 08:54:35 WARN FileOutputCommitter: Output Path is null in commitJob()
24/04/18 08:54:40 INFO DynamoStreamReplication: No changes to apply
24/04/18 08:54:40 WARN FileOutputCommitter: Output Path is null in setupJob()
24/04/18 08:54:40 WARN FileOutputCommitter: Output Path is null in commitJob()
24/04/18 08:54:45 INFO DynamoStreamReplication: Changes to be applied:
24/04/18 08:54:45 INFO DynamoStreamReplication: UPSERT: 2
24/04/18 08:54:45 WARN FileOutputCommitter: Output Path is null in setupJob()
24/04/18 08:54:45 WARN FileOutputCommitter: Output Path is null in commitJob()
24/04/18 08:54:50 INFO DynamoStreamReplication: No changes to apply
24/04/18 08:54:50 WARN FileOutputCommitter: Output Path is null in setupJob()
24/04/18 08:54:50 WARN FileOutputCommitter: Output Path is null in commitJob()
24/04/18 08:54:55 INFO DynamoStreamReplication: No changes to apply
24/04/18 08:54:55 WARN FileOutputCommitter: Output Path is null in setupJob()
24/04/18 08:54:55 WARN FileOutputCommitter: Output Path is null in commitJob()
24/04/18 08:55:00 INFO DynamoStreamReplication: No changes to apply
24/04/18 08:55:00 WARN FileOutputCommitter: Output Path is null in setupJob()
24/04/18 08:55:00 WARN FileOutputCommitter: Output Path is null in commitJob()
Stopping the Spark session
Exception in thread "Thread-9" java.io.IOException: Stream closed
	at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at java.io.FilterInputStream.read(FilterInputStream.java:107)
	at scala.sys.process.BasicIO$.loop$1(BasicIO.scala:234)
	at scala.sys.process.BasicIO$.transferFullyImpl(BasicIO.scala:242)
	at scala.sys.process.BasicIO$.transferFully(BasicIO.scala:223)
	at scala.sys.process.BasicIO$$anonfun$toStdErr$1.apply(BasicIO.scala:212)
	at scala.sys.process.BasicIO$$anonfun$toStdErr$1.apply(BasicIO.scala:212)
	at scala.sys.process.ProcessBuilderImpl$Simple$$anonfun$4.apply$mcV$sp(ProcessBuilderImpl.scala:77)
	at scala.sys.process.ProcessImpl$Spawn$$anon$1.run(ProcessImpl.scala:23)
Checking that the latest items are in the target database
com.scylladb.migrator.alternator.StreamedItemsTest:
  + Stream changes 92.02s
~~~
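The log above shows the migrator polling the DynamoDB Stream until it leaves the `ENABLING` state before starting change replication. A minimal Scala sketch of that polling pattern is below; the `StreamWait` object and its parameters are illustrative, not the actual `DynamoUtils` code, and the status fetch is abstracted behind a function so the loop can be exercised without AWS.

```scala
// Hedged sketch of a poll-until-enabled loop, modeled on the log lines
// "Stream not yet enabled (status ENABLING); waiting for 5 seconds and
// retrying" / "Stream enabled successfully". Not the migrator's code.
object StreamWait {
  @annotation.tailrec
  def awaitEnabled(fetchStatus: () => String, sleep: Long => Unit): Unit =
    fetchStatus() match {
      case "ENABLED" =>
        // Stream is ready; change replication can start.
        println("Stream enabled successfully")
      case status =>
        // Any other status (e.g. ENABLING): back off and poll again.
        println(s"Stream not yet enabled (status $status); waiting for 5 seconds and retrying")
        sleep(5000L)
        awaitEnabled(fetchStatus, sleep)
    }
}
```

In production code `fetchStatus` would call `DescribeStream` and `sleep` would be `Thread.sleep`; injecting both keeps the loop trivially testable.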
tarzanek commented 2 months ago

Awesome. @fee-mendes, is this closer to what you asked for? Merging, thnx

fee-mendes commented 2 months ago

Yes, perfect

On Fri., Apr 26, 2024 at 05:34, Lubos Kosco @.***> wrote:

Merged #129 https://github.com/scylladb/scylla-migrator/pull/129 into master.
