scylladb / scylla-migrator

Migrates data to Scylla using Spark, normally from Cassandra or Parquet files. Alternatively, migrates from DynamoDB to Scylla Alternator.
https://migrator.docs.scylladb.com/stable/
Apache License 2.0

Getting timeout from scylla migrator if system is configured with ipv6 #61

Open amit78del opened 2 years ago

amit78del commented 2 years ago

Hi,

I am trying to run scylla-migrator on my local system, which is configured with IPv6, but I get a timeout error when the migrator tries to select data from the source DB.

The source and target DBs are running on ports 9043 and 9044. Both are accessible via cqlsh, but the migrator returns the error below.

21/11/25 11:23:04 WARN TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3, 127.0.1.1, executor 1): java.io.IOException: Exception during execution of SELECT "tenant_id", "created_at", TTL("created_at"), WRITETIME("created_at"), "created_by", TTL("created_by"), WRITETIME("created_by"), "description", TTL("description"), WRITETIME("description"), "tenant_name", TTL("tenant_name"), WRITETIME("tenant_name"), "updated_at", TTL("updated_at"), WRITETIME("updated_at"), "updated_by", TTL("updated_by"), WRITETIME("updated_by") FROM "test_keyspace"."tenant_info" WHERE token("tenant_id") > ? AND token("tenant_id") <= ?   ALLOW FILTERING: Query timed out after PT2M
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:357)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$18.apply(CassandraTableScanRDD.scala:382)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$18.apply(CassandraTableScanRDD.scala:382)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.hasNext(GroupingBatchBuilder.scala:100)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:30)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:241)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:129)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:188)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:175)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT2M
    at com.datastax.oss.driver.api.core.DriverTimeoutException.copy(DriverTimeoutException.java:34)
    at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
    at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:53)
    at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:30)
    at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:210)
    at com.datastax.oss.driver.api.core.cql.SyncCqlSession.execute(SyncCqlSession.java:53)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:43)
    at com.sun.proxy.$Proxy39.execute(Unknown Source)
    at com.datastax.spark.connector.cql.DefaultScanner.scan(Scanner.scala:38)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:349)
    ... 36 more

Please look into this. @tarzanek adding you here as we discussed it.

tarzanek commented 2 years ago

Just a note: this system runs Ubuntu 18.04 and uses IPv6 (that's what DNS returns for the hostname). It is also on Wi-Fi, so it is a laptop with a dynamic DHCP address.
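To check which addresses the resolver returns for the local hostname, something like the following can help. This is only a diagnostic sketch, not part of the migrator: the helper names `resolve` and `address_families` are made up for illustration, and the ports 9043 and 9044 are the ones mentioned in the report above.

```python
import ipaddress
import socket


def address_families(addresses):
    """Group textual IP addresses by version: {"IPv4": [...], "IPv6": [...]}."""
    groups = {"IPv4": [], "IPv6": []}
    for text in addresses:
        # Strip a zone index such as "%wlan0" from link-local IPv6 addresses.
        ip = ipaddress.ip_address(text.split("%", 1)[0])
        groups["IPv%d" % ip.version].append(text)
    return groups


def resolve(host, port):
    """Return the distinct addresses the system resolver gives for host:port."""
    infos = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    seen = []
    for info in infos:
        addr = info[4][0]  # sockaddr tuple starts with the address string
        if addr not in seen:
            seen.append(addr)
    return seen


if __name__ == "__main__":
    host = socket.gethostname()
    for port in (9043, 9044):  # ports taken from the report above
        try:
            print(host, port, address_families(resolve(host, port)))
        except socket.gaierror as exc:
            print(host, port, "resolution failed:", exc)
```

If the output lists only IPv6 addresses while the databases listen on an IPv4 address (or the other way round), that mismatch would be worth ruling out before looking further at the migrator.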

tarzanek commented 2 years ago

Is https://github.com/scylladb/scylla-migrator/issues/39 a duplicate of this? @sangeethdba, did you run on a similar setup?

wtibbitts commented 1 year ago

I am seeing this error, but just on one table. Don't see anything particularly special about the table and it has been fine to migrate in the past. However, it is a rather large table and larger than when we last migrated.

22/11/15 16:11:45 INFO Cassandra: TableDef retrieved for source:
22/11/15 16:11:45 INFO Cassandra: TableDef(carbine,geo_location,ArrayBuffer(ColumnDef(account_id,PartitionKeyColumn,VarCharType), ColumnDef(device_id,PartitionKeyColumn,VarCharType)),ArrayBuffer(ColumnDef(location_ts,ClusteringColumn(0,ASC),TimestampType)),Stream(ColumnDef(location_data,RegularColumn,VarCharType), ColumnDef(user_id,RegularColumn,VarCharType)),Stream(),false,false,Map())
22/11/15 16:11:45 INFO Cassandra: Original schema loaded:
root
 |-- account_id: string (nullable = false)
 |-- device_id: string (nullable = false)
 |-- location_ts: timestamp (nullable = false)
 |-- location_data: string (nullable = true)
 |-- user_id: string (nullable = true)

22/11/15 16:11:45 INFO Cassandra: ColumnRefs generated for selection:
22/11/15 16:11:45 INFO Cassandra: account_id device_id location_ts location_data ttl(location_data) writetime(location_data) user_id ttl(user_id) writetime(user_id)
22/11/15 16:11:45 INFO Cassandra: Schema generated with TTLs and Writetimes:
root
 |-- account_id: string (nullable = false)
 |-- device_id: string (nullable = false)
 |-- location_ts: timestamp (nullable = false)
 |-- location_data: string (nullable = true)
 |-- location_data_ttl: integer (nullable = true)
 |-- location_data_writetime: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_id_ttl: integer (nullable = true)
 |-- user_id_writetime: long (nullable = true)

22/11/15 16:11:45 INFO Cassandra: Schema that'll be used for writing to Scylla:
22/11/15 16:11:45 INFO Cassandra: root
 |-- account_id: string (nullable = false)
 |-- device_id: string (nullable = false)
 |-- location_ts: timestamp (nullable = false)
 |-- location_data: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- ttl: integer (nullable = true)
 |-- writetime: long (nullable = true)

22/11/15 16:11:45 INFO migrator: Created source dataframe; resulting schema:
root
 |-- account_id: string (nullable = false)
 |-- device_id: string (nullable = false)
 |-- location_ts: timestamp (nullable = false)
 |-- location_data: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- ttl: integer (nullable = true)
 |-- writetime: long (nullable = true)
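
For context on the table-size remark: the failing query in the first stack trace reads one token range per request (`token(...) > ? AND token(...) <= ?`). The sketch below only illustrates the arithmetic of cutting the token ring into equal ranges. It assumes the Murmur3 partitioner's 64-bit token range, and `split_token_ring` is a hypothetical helper, not the connector's actual splitting code. Whether range size is the cause of the timeout here has not been established in this thread.

```python
MIN_TOKEN = -2**63      # assumed Murmur3 partitioner lower bound
MAX_TOKEN = 2**63 - 1   # assumed Murmur3 partitioner upper bound


def split_token_ring(splits):
    """Divide the token ring into `splits` contiguous (start, end] ranges."""
    if splits < 1:
        raise ValueError("splits must be at least 1")
    total = MAX_TOKEN - MIN_TOKEN
    # Integer arithmetic keeps the bounds exact for 64-bit tokens.
    bounds = [MIN_TOKEN + total * i // splits for i in range(splits + 1)]
    return list(zip(bounds[:-1], bounds[1:]))


if __name__ == "__main__":
    for start, end in split_token_ring(4):
        # Each pair would fill the two "?" placeholders of the range query.
        print("token(pk) > %d AND token(pk) <= %d" % (start, end))
```

With the same number of ranges, a table that has grown puts more rows into each range, so each scan has more data to read.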