eto-ai / rikai

Parquet-based ML data format optimized for working with unstructured data
https://rikai.readthedocs.io/en/latest/
Apache License 2.0

Cannot use Image with pandas_udf #90

Open chunyang opened 3 years ago

chunyang commented 3 years ago

I tried to pass an Image into a pandas_udf but got an error.

Code:

import pandas as pd
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import FloatType
from rikai.types.vision import Image

# Trivial pandas_udf: maps every input row to a constant float.
@pandas_udf(returnType=FloatType())
def foo(images: pd.Series) -> pd.Series:
    return images.map(lambda _: 3.14)

spark = (
    SparkSession.builder.master("local")
    .appName("test")
    .config("spark.jars.packages", "ai.eto:rikai_2.12:0.0.2-SNAPSHOT")
    .getOrCreate()
)
spark.udf.register("FOO", foo)

# Single-row DataFrame with one Image column.
test_data = [
    Row(
        image=Image(
            "https://upload.wikimedia.org/wikipedia/commons/thumb/a/ad/Commodore_Grace_M._Hopper%2C_USN_%28covered%29.jpg/819px-Commodore_Grace_M._Hopper%2C_USN_%28covered%29.jpg"
        )
    )
]
test_data_df = spark.createDataFrame(test_data)
test_data_df.createOrReplaceTempView("test_data")

# Calling the pandas_udf on the Image column crashes the Python worker.
df = spark.sql("SELECT FOO(image) FROM test_data")
df.show()

Error:

21/02/16 19:37:06 ERROR ArrowPythonRunner: Python worker exited unexpectedly (crashed)                                                                   
org.apache.spark.api.python.PythonException: Traceback (most recent call last):                                                                          
  File "/home/chuck.yang/src/repro/venv/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 585, in main              
    eval_type = read_int(infile)                                                                                                                         
  File "/home/chuck.yang/src/repro/venv/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 595, in read_int     
    raise EOFError                                                                                                                                       
EOFError                                                                                                                                                 

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)                                     
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:99)                                              
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)                                              
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)                                                   
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.UnsupportedOperationException: Unsupported data type: struct<uri:string>
        at org.apache.spark.sql.util.ArrowUtils$.toArrowType(ArrowUtils.scala:57)
        at org.apache.spark.sql.util.ArrowUtils$.toArrowField(ArrowUtils.scala:103)
        at org.apache.spark.sql.util.ArrowUtils$.$anonfun$toArrowSchema$1(ArrowUtils.scala:132)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at org.apache.spark.sql.types.StructType.map(StructType.scala:102)
        at org.apache.spark.sql.util.ArrowUtils$.toArrowSchema(ArrowUtils.scala:131)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:76)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
21/02/16 19:37:06 ERROR ArrowPythonRunner: This may have been caused by a prior exception:
java.lang.UnsupportedOperationException: Unsupported data type: struct<uri:string>
        [same ArrowUtils stack trace as above]
21/02/16 19:37:06 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.UnsupportedOperationException: Unsupported data type: struct<uri:string>
        [same ArrowUtils stack trace as above]
21/02/16 19:37:06 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.74.33.69, executor driver): java.lang.UnsupportedOperationException: Unsupported data type: struct<uri:string>
        [same ArrowUtils stack trace as above]

21/02/16 19:37:06 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job                                                                 
Traceback (most recent call last):
  File "repro/repro.py", line 31, in <module>                                                                                                         
    df.show()                                                                                                                                            
  File "/home/chuck.yang/src/repro/venv/lib/python3.7/site-packages/pyspark/sql/dataframe.py", line 440, in show                                      
    print(self._jdf.showString(n, 20, vertical))                                                                                                         
  File "/home/chuck.yang/src/repro/venv/lib/python3.7/site-packages/py4j/java_gateway.py", line 1305, in __call__                                     
    answer, self.gateway_client, self.target_id, self.name)                                                                                              
  File "/home/chuck.yang/src/repro/venv/lib/python3.7/site-packages/pyspark/sql/utils.py", line 128, in deco                                          
    return f(*a, **kw)                                                                                                                                   
  File "/home/chuck.yang/src/repro/venv/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value                                  
    format(target_id, ".", name), value)                                                                                                                 
py4j.protocol.Py4JJavaError: An error occurred while calling o63.showString.                                                                             
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 
(TID 0, 10.74.33.69, executor driver): java.lang.UnsupportedOperationException: Unsupported data type: struct<uri:string>                                
        [same ArrowUtils stack trace as above]

Driver stacktrace:                                                                                                                                       
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)                                                  
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)                                                        
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)                                                
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)                                                                      
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)                                                                     
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)                                                                            
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.UnsupportedOperationException: Unsupported data type: struct<uri:string>
        [same ArrowUtils stack trace as above]

Using rikai 0.0.2-SNAPSHOT built from the lei/sql_ml branch.
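
A possible workaround, sketched but not verified here (foo_uri, image_uri, and test_data_uri are illustrative names, not rikai APIs): since the crash comes from Spark's Arrow conversion having no mapping for the Image UDT (underlying SQL type struct<uri:string>), keep the URI as a plain string column and rebuild the Image inside the pandas_udf, so only Arrow-supported primitive types cross the JVM/Python boundary.

@pandas_udf(returnType=FloatType())
def foo_uri(uris: pd.Series) -> pd.Series:
    # uris is a plain pandas Series of str; rebuild rikai Images if needed.
    images = uris.map(Image)
    return images.map(lambda _: 3.14)

spark.udf.register("FOO_URI", foo_uri)

uri = (
    "https://upload.wikimedia.org/wikipedia/commons/thumb/a/ad/"
    "Commodore_Grace_M._Hopper%2C_USN_%28covered%29.jpg/"
    "819px-Commodore_Grace_M._Hopper%2C_USN_%28covered%29.jpg"
)
spark.createDataFrame([Row(image_uri=uri)]).createOrReplaceTempView("test_data_uri")
spark.sql("SELECT FOO_URI(image_uri) FROM test_data_uri").show()

This only sidesteps the problem, though; proper UDT support in the Arrow conversion would still be needed.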

eddyxu commented 3 years ago

Can we use an implicit class to add conversions to Spark's ArrowUtils?

eddyxu commented 3 years ago

Filed https://issues.apache.org/jira/browse/SPARK-34600