apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0
14.48k stars 3.52k forks source link

[Java] PySpark3 with pandas 1.1.5 and pyarrow 2.0.0 getting the below error #29636

Open asfimport opened 3 years ago

asfimport commented 3 years ago

While running pyspark3 with pandas 1.1.5 and pyarrow 2.0.0 getting the below error:

Spark Code:


import pyarrow
import pandas as pd

df = pd.DataFrame({'col1' : [1,2,3], 'col2': [4,5,6]})
df_sp = spark.createDataFrame(df)
df_sp.cache().count()
schema = df_sp.schema

def dummy_udf(data):
 return data

res = df_sp.groupby('col1').applyInPandas(dummy_udf, schema=schema)
print(res.cache().count())
print(res.toPandas())

Exception:


21/09/17 07:28:10 ERROR util.Utils: Uncaught exception in thread stdout writer for python3
 java.lang.NoSuchMethodError: com.google.flatbuffers.FlatBufferBuilder.createString(Ljava/lang/CharSequence;)I
 at org.apache.arrow.vector.types.pojo.Field.getField(Field.java:204)
 at org.apache.arrow.vector.types.pojo.Schema.getSchema(Schema.java:178)
 at org.apache.arrow.vector.ipc.message.MessageSerializer.serializeMetadata(MessageSerializer.java:187)
 at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:165)
 at org.apache.arrow.vector.ipc.ArrowWriter.ensureStarted(ArrowWriter.java:159)
 at org.apache.arrow.vector.ipc.ArrowWriter.start(ArrowWriter.java:112)
 at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:86)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
 at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:103)
 at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
 at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)
 21/09/17 07:28:10 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for python3,5,main]
 java.lang.NoSuchMethodError: com.google.flatbuffers.FlatBufferBuilder.createString(Ljava/lang/CharSequence;)I
 at org.apache.arrow.vector.types.pojo.Field.getField(Field.java:204)
 at org.apache.arrow.vector.types.pojo.Schema.getSchema(Schema.java:178)
 at org.apache.arrow.vector.ipc.message.MessageSerializer.serializeMetadata(MessageSerializer.java:187)
 at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:165)
 at org.apache.arrow.vector.ipc.ArrowWriter.ensureStarted(ArrowWriter.java:159)
 at org.apache.arrow.vector.ipc.ArrowWriter.start(ArrowWriter.java:112)
 at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:86)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
 at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:103)
 at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
 at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)
 21/09/17 07:28:10 WARN storage.BlockManager: Putting block rdd_25_69 failed due to exception org.apache.spark.SparkException: Python worker exited unexpectedly (crashed).
 21/09/17 07:28:10 INFO memory.MemoryStore: MemoryStore cleared
 21/09/17 07:28:10 INFO storage.BlockManager: BlockManager stopped
 21/09/17 07:28:10 INFO util.ShutdownHookManager: Shutdown hook called

Reporter: Ranga Reddy

Note: This issue was originally created as ARROW-14038. Please see the migration documentation for further details.

asfimport commented 3 years ago

Ranga Reddy: While checking createString() method implementations in FlatBufferBuilder.java class, it has two methods one is accepting CharSequence and another one is accepting the ByteBuffer as a argument.


public int createString(CharSequence s) {

}

public int createString(ByteBuffer s) {

}

While checking getField() method implementation in Field.java class, here it is passed String value.


public class Field {
   private final String name;
   public int getField(FlatBufferBuilder builder)
   { 
     int nameOffset = name == null ? -1 : builder.createString(name); 
   }
}

To fix this issue, we need to pass either CharSequence or ByteBuffer as the argument in getField() method.

Solution:


public int getField(FlatBufferBuilder builder) {
   java.nio.ByteBuffer bb = java.nio.ByteBuffer.wrap(name.getBytes());
   int nameOffset = name == null ? -1 : builder.createString(bb);
   .......
}

 

asfimport commented 3 years ago

Ranga Reddy: If one of the committer allows my PR, i will contribute the code changes.