scylladb / scylla-migrator

Migrate data extracted using Spark to Scylla, normally from Cassandra
Apache License 2.0

[Alternator] ensure streamChanges skips transferring changes #69

Closed: hopugop closed this 2 years ago

hopugop commented 2 years ago

Even with streamChanges set to false on both the source (DynamoDB) and the target (Alternator), the Migrator still tries to transfer changes:

22/02/28 15:25:36 INFO migrator: We need to transfer: 1 partitions in total
22/02/28 15:25:36 INFO migrator: Starting write...
22/02/28 15:25:36 INFO DynamoUtils: Checking for table existence at destination
22/02/28 15:25:36 INFO DynamoUtils: Table migtest exists at destination
22/02/28 15:25:36 INFO DynamoDB: Schema after renames:
root
 |-- City: string (nullable = true)
 |-- Date: string (nullable = true)
22/02/28 15:25:38 INFO migrator: Done transferring table snapshot. Starting to transfer changes
22/02/28 15:25:40 INFO DynamoStreamReplication: Changes to be applied:
+---------------+-----+
|_dynamo_op_type|count|
+---------------+-----+
+---------------+-----+

22/02/28 15:25:41 INFO DynamoDB: Schema after renames:
root
 |-- City: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- _dynamo_op_type: boolean (nullable = true)

22/02/28 15:25:45 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: The security token included in the request is invalid. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: UnrecognizedClientException; Request ID: DN0FN8HAHE4O253F5SKDAKSUJ7VV4KQNSO5AEMVJF66Q9ASUAAJG)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1383)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1359)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
        at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:5110)
        at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:5077)
        at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1981)
        at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1947)
        at org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:191)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:601)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:591)
        at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
        at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
hopugop commented 2 years ago

This change worked for me; let me know if you need further info:

--- a/src/main/scala/com/scylladb/migrator/Migrator.scala
+++ b/src/main/scala/com/scylladb/migrator/Migrator.scala
@@ -151,20 +155,22 @@ object Migrator {

           sourceAndDescriptions.foreach {
             case (source, sourceDesc, targetDesc) =>
-              log.info("Done transferring table snapshot. Starting to transfer changes")
-
-              DynamoStreamReplication.createDStream(
-                spark,
-                streamingContext,
-                source,
-                target,
-                sourceDF.dataFrame.schema,
-                sourceDesc,
-                targetDesc,
-                migratorConfig.renames)
-
-              streamingContext.start()
-              streamingContext.awaitTermination()
+              if (target.streamChanges) {
+                log.info("Done transferring table snapshot. Starting to transfer changes")
+
+                DynamoStreamReplication.createDStream(
+                  spark,
+                  streamingContext,
+                  source,
+                  target,
+                  sourceDF.dataFrame.schema,
+                  sourceDesc,
+                  targetDesc,
+                  migratorConfig.renames)
+
+                streamingContext.start()
+                streamingContext.awaitTermination()
+              }
           }
       }
     } catch {
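For readers who want the logic of the patch without the Spark plumbing, here is a minimal Scala sketch of the same check: stream replication runs only when the target's streamChanges flag is true. TargetSettings, StreamGate.afterSnapshot and the startReplication callback are hypothetical names invented for illustration, not scylla-migrator APIs; the callback stands in for the DynamoStreamReplication.createDStream, streamingContext.start() and awaitTermination() calls in the diff, and only the log line is copied from it.

```scala
// Minimal sketch of gating change streaming on the target's streamChanges flag.
// TargetSettings, StreamGate and the startReplication callback are hypothetical
// stand-ins; they are NOT the real scylla-migrator types or APIs.

/** Hypothetical, trimmed-down target settings: only the fields this sketch needs. */
final case class TargetSettings(table: String, streamChanges: Boolean)

object StreamGate {

  /** Called once the table snapshot has been copied.
    *
    * Runs `startReplication` (standing in for createDStream + start() +
    * awaitTermination() in the real Migrator) only when the target asks for
    * change streaming. Returns true if replication ran, false if it was skipped.
    */
  def afterSnapshot(target: TargetSettings)(startReplication: () => Unit): Boolean =
    if (target.streamChanges) {
      println("Done transferring table snapshot. Starting to transfer changes")
      startReplication()
      true
    } else {
      // streamChanges = false: skip replication entirely. In the real code,
      // skipping this branch means no stream receiver is ever created.
      false
    }
}

object StreamGateDemo {
  def main(args: Array[String]): Unit = {
    var replications = 0
    val started =
      StreamGate.afterSnapshot(TargetSettings("migtest", streamChanges = false)) { () =>
        replications += 1
      }
    // With streamChanges = false the callback is never invoked.
    println(s"started=$started replications=$replications") // prints "started=false replications=0"
  }
}
```

Taking the replication step as a callback keeps the flag check testable without a Spark StreamingContext. Note that the merged diff consults only target.streamChanges, so the source-side setting mentioned in the report does not affect this branch.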
hopugop commented 2 years ago

https://github.com/scylladb/scylla-migrator/pull/77 merged, closing.