RedisLabs / spark-redis

A connector for Spark that allows reading and writing to/from Redis cluster
BSD 3-Clause "New" or "Revised" License

Does jar file spark-redis_2.11-2.6.0-SNAPSHOT-jar-with-dependencies.jar work with Spark 3.1.1? #299

Closed: michTalebzadeh closed this issue 3 years ago

michTalebzadeh commented 3 years ago

Hi,

I am new to Redis and am testing the Redis-Spark connector.

I have put the jar file that I built, spark-redis_2.11-2.6.0-SNAPSHOT-jar-with-dependencies.jar, in the directory $SPARK_HOME/jars.

The Redis version is

Redis server v=6.2.1 sha=00000000:0 malloc=jemalloc-5.1.0 bits=64 build=698eb4abaa66a5f9

and the Spark version is 3.1.1.

I wrote some basic Python code in PyCharm as follows:

    spark_session = SparkSession.builder \
        .appName(appName) \
        .config("spark.redis.host", "rhes76") \
        .config("spark.redis.port", "6379") \
        .config("spark.redis.auth", "hduser") \
        .config("spark.redis.db", 1) \
        .getOrCreate()

    dept = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40)]
    deptColumns = ["dept_name", "dept_id"]
    deptDF = self.spark.createDataFrame(data=dept, schema=deptColumns)
    deptDF.printSchema()
    deptDF.show()
    deptDF.write.format("org.apache.spark.sql.redis") \
        .option("table", "testme") \
        .save()

I get the following output and error:

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+

Traceback (most recent call last):
  File "/home/hduser/PycharmProjects/DSBQ/src/exampleRedis.py", line 52, in <module>
    redishive.loadIntoRedisTable()
  File "/home/hduser/PycharmProjects/DSBQ/src/exampleRedis.py", line 40, in loadIntoRedisTable
    .option("table", "testme") \
  File "/d4T/hduser/spark-3.1.1-bin-hadoop3.2/python/pyspark/sql/readwriter.py", line 1107, in save
    self._jwrite.save()
  File "/home/hduser/PycharmProjects/DSBQ/venv/lib64/python3.6/site-packages/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/d4T/hduser/spark-3.1.1-bin-hadoop3.2/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/hduser/PycharmProjects/DSBQ/venv/lib64/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o65.save.
: java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
    at com.redislabs.provider.redis.RedisConfig.clusterEnabled(RedisConfig.scala:198)
    at com.redislabs.provider.redis.RedisConfig.getNodes(RedisConfig.scala:320)
    at com.redislabs.provider.redis.RedisConfig.getHosts(RedisConfig.scala:236)
    at com.redislabs.provider.redis.RedisConfig.<init>(RedisConfig.scala:135)
    at org.apache.spark.sql.redis.RedisSourceRelation.<init>(RedisSourceRelation.scala:34)
    at org.apache.spark.sql.redis.DefaultSource.createRelation(DefaultSource.scala:21)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Is this because the jar file was built with Scala 2.11, whereas Spark 3.1.1 uses Scala 2.12?
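For reference, one quick way to confirm which Scala version a Spark distribution ships with is to look at the scala-library jar bundled under $SPARK_HOME/jars; its file name carries the version. A minimal sketch, assuming SPARK_HOME is set in the environment (the example jar name in the comment is illustrative):

    import glob
    import os

    # The scala-library jar under $SPARK_HOME/jars encodes the Scala version
    # in its name, e.g. scala-library-2.12.10.jar for a Spark 3.1.1 build.
    spark_home = os.environ["SPARK_HOME"]
    for jar in glob.glob(os.path.join(spark_home, "jars", "scala-library-*.jar")):
        print(os.path.basename(jar))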

Thanks

michTalebzadeh commented 3 years ago

Hi,

I rebuilt the jar file following the instructions given in

https://github.com/RedisLabs/spark-redis/issues/193

    mvn -P scala-2.12 clean package -DskipTests

and it worked:

       dept = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40) ]
        deptColumns = ["dept_name", "dept_id"]
        deptDF = self.spark.createDataFrame(data=dept, schema=deptColumns)
        deptDF.printSchema()
        deptDF.show()
        deptDF.write.format("org.apache.spark.sql.redis") \
                      .option("table", "testme") \
                      .option("key.column", "dept_id") \
                      .mode("overwrite") \
                      .save()

        loadedDf = self.spark.read.format("org.apache.spark.sql.redis") \
                      .option("table", "testme") \
                      .option("key.column", "dept_id") \
                      .option("infer.schema", True) \
                      .load()
        loadedDf.show()

Output

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|       IT|     40|
|    Sales|     30|
|Marketing|     20|
|  Finance|     10|
+---------+-------+
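For completeness, the round trip can also be spot-checked outside Spark. A minimal sketch using redis-py, assuming the connector's default layout of one Redis hash per row under keys like testme:<dept_id> (that key pattern is an assumption here), and the same host, port, auth and db as in the SparkSession config above:

    import redis  # redis-py client, assumed to be installed

    # Connect to the same Redis instance and logical db the Spark job wrote to.
    r = redis.Redis(host="rhes76", port=6379, password="hduser", db=1,
                    decode_responses=True)

    # Assumed key layout: one hash per row, keyed "testme:<dept_id>".
    for key in r.scan_iter("testme:*"):
        print(key, r.hgetall(key))

As an aside, instead of copying a locally built jar into $SPARK_HOME/jars, the connector can also be supplied when the session is created via spark.jars.packages. A sketch only; the Maven coordinate and version below are assumptions and should be replaced with whatever matches your build (here, the artifact produced by the scala-2.12 profile):

    from pyspark.sql import SparkSession

    # spark.jars.packages pulls the connector from a Maven repository at
    # start-up; the coordinate/version and the app name are illustrative.
    spark_session = SparkSession.builder \
        .appName("redis-test") \
        .config("spark.jars.packages", "com.redislabs:spark-redis_2.12:2.6.0") \
        .config("spark.redis.host", "rhes76") \
        .config("spark.redis.port", "6379") \
        .config("spark.redis.auth", "hduser") \
        .config("spark.redis.db", 1) \
        .getOrCreate()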
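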
michTalebzadeh commented 3 years ago

Sorted, closing it.