scylladb / scylla-migrator

Migrates data to Scylla using Spark, typically from Cassandra or Parquet files; alternatively, from DynamoDB to Scylla Alternator.
https://migrator.docs.scylladb.com/stable/
Apache License 2.0

support conversion for inet type #13

Closed. tarzanek closed this issue 4 years ago.

tarzanek commented 4 years ago

The Scylla Spark migrator doesn't appear to work for all data types; e.g., migrating a table with an inet column results in all tasks crashing.
For reference: com.datastax.spark.connector.types.TypeConversionException (Cannot convert object 50.233.105.0 of type class org.apache.spark.unsafe.types.UTF8String to java.net.InetAddress.)

As per https://docs.scylladb.com/getting-started/types/, we could leave it as a string (needs testing).
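
For illustration, a minimal sketch of the "leave it as a string" idea (the address is the one from the stack trace; nothing here is from the migrator itself):

import java.net.InetAddress

// Keep the value as its textual form on the Spark side and let Scylla
// parse it back into an inet on write. No DNS lookup happens here,
// because the argument is already a literal IP address.
val textual: String = InetAddress.getByName("50.233.105.0").getHostAddress
// textual == "50.233.105.0", a plain string Scylla can coerce to inet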

tarzanek commented 4 years ago

This might work, but it needs tests:

diff --git a/src/main/scala/com/scylladb/migrator/converters.scala b/src/main/scala/com/scylladb/migrator/converters.scala
index d76eec0..6ce23d3 100644
--- a/src/main/scala/com/scylladb/migrator/converters.scala
+++ b/src/main/scala/com/scylladb/migrator/converters.scala
@@ -1,14 +1,10 @@
 package com.scylladb.migrator

+import java.net.InetAddress
 import java.nio.charset.StandardCharsets
 import java.util.UUID

-import com.datastax.spark.connector.types.{
-  CustomDriverConverter,
-  NullableTypeConverter,
-  PrimitiveColumnType,
-  TypeConverter
-}
+import com.datastax.spark.connector.types.{CustomDriverConverter, NullableTypeConverter, PrimitiveColumnType, TypeConverter}
 import org.apache.spark.unsafe.types.UTF8String

 import scala.reflect.runtime.universe.TypeTag
@@ -37,6 +33,21 @@ case object CustomUUIDType extends PrimitiveColumnType[UUID] {
     new TypeConverter.OptionToNullConverter(AnotherCustomUUIDConverter)
 }

+case object CustomInetType extends PrimitiveColumnType[String] {
+  def scalaTypeTag = implicitly[TypeTag[String]]
+  def cqlTypeName = "inet"
+  def converterToCassandra =
+    new TypeConverter.OptionToNullConverter(CustomInetAddressConverter)
+}
+
+case object CustomInetAddressConverter extends NullableTypeConverter[String] {
+  def targetTypeTag = implicitly[TypeTag[String]]
+  def convertPF = {
+    case x: InetAddress => x.getHostAddress
+    case x: String => x
+  }
+}
+
 object CustomUUIDConverter extends CustomDriverConverter {
   import org.apache.spark.sql.{ types => catalystTypes }
   import com.datastax.driver.core.DataType
@@ -47,10 +58,13 @@ object CustomUUIDConverter extends CustomDriverConverter {
       CustomTimeUUIDType
     case dataType if dataType.getName == DataType.uuid().getName =>
       CustomUUIDType
+    case dataType if dataType.getName == DataType.inet().getName =>
+      CustomInetType
   }

   override val catalystDataType: PartialFunction[ColumnType[_], catalystTypes.DataType] = {
     case CustomTimeUUIDType => catalystTypes.StringType
     case CustomUUIDType     => catalystTypes.StringType
+    case CustomInetType     => catalystTypes.StringType
   }
 }
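
If the patch compiles as written, the new converter should pass a standalone check like this (a hedged usage sketch; CustomInetAddressConverter is the object from the diff above, and convert comes from the connector's TypeConverter trait):

import java.net.InetAddress

// InetAddress values are rendered to their textual form...
val fromInet = CustomInetAddressConverter.convert(InetAddress.getByName("192.168.1.2"))
assert(fromInet == "192.168.1.2")

// ...and plain strings pass through unchanged.
val fromString = CustomInetAddressConverter.convert("192.168.1.2")
assert(fromString == "192.168.1.2")
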
iravid commented 4 years ago

@tarzanek The code looks correct to me, assuming Scylla can coerce the string to Inet.

tarzanek commented 4 years ago

Same assumption on my side, so I need to retest to be 100% sure (but per the examples, it seems '1.1.1.1' as input should work, and the same for IPv6 addresses).
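
The IPv6 half of that assumption can be sanity-checked on the JVM side like this (hedged sketch; the address is a documentation-range example, not from this issue):

import java.net.InetAddress

// getHostAddress yields a textual form for IPv6 too, which Scylla
// should likewise be able to coerce back to inet on insert.
println(InetAddress.getByName("2001:db8::1").getHostAddress)
// prints "2001:db8:0:0:0:0:0:1"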

tarzanek commented 4 years ago

Retested with the following table:

CREATE KEYSPACE inettest
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

use inettest;

CREATE TABLE loads (
    machine inet,
    cpu int,
    mtime timeuuid,
    load float,
    PRIMARY KEY ((machine, cpu), mtime)
) WITH CLUSTERING ORDER BY (mtime DESC);

INSERT INTO loads(machine, cpu, mtime, load)
VALUES ('192.168.1.2',23,currentTimeUUID(),0.5);

INSERT INTO inettest.loads(machine, cpu, mtime, load)
VALUES ('192.168.1.20',5,now(),0.7);

select * from inettest.loads ;

It works well; the data was migrated from the above Cassandra table to the new Scylla table.
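
One way to double-check the migrated rows from a spark-shell (a hedged sketch, assuming the spark-cassandra-connector is on the classpath and spark.cassandra.connection.host points at the target Scylla cluster):

import com.datastax.spark.connector._

// Read the migrated table back; machine should print as its textual IP form.
val rows = sc.cassandraTable("inettest", "loads").collect()
rows.foreach(println)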

tarzanek commented 4 years ago

What is puzzling me, though, is that there is an InetAddressConverter (https://github.com/iravid/spark-cassandra-connector/blob/fd3645552ab43c113e2155221010ebc7e2d588a5/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala#L407), but it lacks UTF8String conversion, so I am wondering whether the rest of the migrator isn't doing some explicit UTF8 conversion when reading data that shouldn't be there... @iravid ?
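
If the stray UTF8String does come from the read side, one defensive option would be to cover it in the new converter as well; this is an untested sketch that mirrors the UTF8String case AnotherCustomUUIDConverter already handles in converters.scala:

import java.net.InetAddress

import com.datastax.spark.connector.types.NullableTypeConverter
import org.apache.spark.unsafe.types.UTF8String

import scala.reflect.runtime.universe.TypeTag

case object CustomInetAddressConverter extends NullableTypeConverter[String] {
  def targetTypeTag = implicitly[TypeTag[String]]
  def convertPF = {
    case x: InetAddress => x.getHostAddress
    case x: String      => x
    case x: UTF8String  => x.toString // Spark's internal string type, per the stack trace
  }
}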