eto-ai / rikai

Parquet-based ML data format optimized for working with unstructured data
https://rikai.readthedocs.io/en/latest/
Apache License 2.0

Rikai format did not pass the predicate correctly. #234

Open eddyxu opened 3 years ago

eddyxu commented 3 years ago
# Download coco dataset into "coco" directory
df = convert(spark, "coco")
df.write.format("rikai").save("dataset")

df = spark.read.format("rikai").load("dataset")
df.registerTempTable("df")
df.printSchema()
df.show(1)
root
 |-- image_id: long (nullable = true)
 |-- annotations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- category_id: integer (nullable = true)
 |    |    |-- category_text: string (nullable = true)
 |    |    |-- area: float (nullable = true)
 |    |    |-- bbox: box2d (nullable = true)
 |-- image: image (nullable = true)
 |-- split: string (nullable = true)

+--------+--------------------+--------------------+-----+
|image_id|         annotations|               image|split|
+--------+--------------------+--------------------+-----+
|  414022|[{13, stop sign, ...|Image(uri='/home/...|train|
+--------+--------------------+--------------------+-----+
only showing top 1 row

spark.sql("""SELECT image_id, image, split FROM df WHERE split = 'val'; """).show(5)

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-4-7f7883882de0> in <module>
      1 spark.sql("SELECT DISTINCT(split) FROM df;").show()
----> 2 spark.sql("""SELECT image_id, image, split FROM df WHERE split = 'val'; """).show(5)

~/miniconda3/envs/coco/lib/python3.8/site-packages/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    482         """
    483         if isinstance(truncate, bool) and truncate:
--> 484             print(self._jdf.showString(n, 20, vertical))
    485         else:
    486             print(self._jdf.showString(n, int(truncate), vertical))

~/miniconda3/envs/coco/lib/python3.8/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

~/miniconda3/envs/coco/lib/python3.8/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: cannot resolve '`val`' given input columns: [image, image_id, split]; line 1 pos 8;
'Filter (split#127 = 'val)
+- Project [image_id#124L, image#126, split#127]
   +- Relation[image_id#124L,annotations#125,image#126,split#127] parquet
eddyxu commented 3 years ago

The query works if the dataset is read with spark.read.format("parquet").load() instead.

eddyxu commented 3 years ago

I suspect this is not related to the coco dataset; it is about how RikaiRelation passes predicates down to Parquet.

I have not tried it myself yet, but I expect something like this should reproduce it:

from pyspark.sql import Row

df = spark.createDataFrame([Row(id=1, split="train"), Row(id=2, split="eval")])
df.write.format("rikai").save("/tmp/rikai-dataset")

df = spark.read.format("rikai").load("/tmp/rikai-dataset")
df.createOrReplaceTempView("df")
spark.sql("SELECT * FROM df WHERE split = 'eval'").show()
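The analyzed plan in the error, `'Filter (split#127 = 'val)`, shows the string literal `'val'` coming back through the analyzer as an unresolved attribute. One plausible failure mode (a guess, not confirmed against the RikaiRelation source): if a pushed-down equality filter is rendered back into a SQL expression string without quoting the string literal, Spark re-parses the value as a column name. A minimal pure-Python sketch of that quoting mistake, with hypothetical function names:

```python
# Hypothetical sketch of how a pushed-down equality filter could lose its
# quoting when rendered back to a SQL expression string. These helpers are
# illustrative only; they do not exist in Rikai or Spark.

def render_filter_buggy(column: str, value: str) -> str:
    # Interpolates the literal bare: "split = val". If Spark re-parses this
    # string, "val" is treated as a column reference, which would produce
    # "cannot resolve '`val`' given input columns: ...".
    return f"{column} = {value}"

def render_filter_fixed(column: str, value: str) -> str:
    # Quotes the string literal so it survives re-parsing: "split = 'val'".
    return f"{column} = '{value}'"

print(render_filter_buggy("split", "val"))  # split = val
print(render_filter_fixed("split", "val"))  # split = 'val'
```

Whether this exact rendering happens inside RikaiRelation is an assumption; the symptom is at least consistent with a literal being re-interpreted as an identifier during analysis.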
da-liii commented 3 years ago

Using bin/rikai from rikai-example.

# filename: issue_234.py
# $ bin/rikai issue_234.py
from pyspark.sql import Row

df = spark.createDataFrame([Row(id=1, split="train"), Row(id=2, split="eval")])
df.write.format("rikai").save("/tmp/rikai_example/issue_234")
df = spark.read.format("rikai").load("/tmp/rikai_example/issue_234")
df.registerTempTable("df")
spark.sql("SELECT * FROM df WHERE split = 'eval'").show()
Traceback (most recent call last):
  File "/Users/da/opt/miniconda3/envs/rikai-example/lib/python3.8/site-packages/pyspark/python/pyspark/shell.py", line 75, in <module>
    exec(code)
  File "issue_234.py", line 7, in <module>
    spark.sql("SELECT * FROM df WHERE split = 'eval'").show()
  File "/Users/da/opt/miniconda3/envs/rikai-example/lib/python3.8/site-packages/pyspark/sql/dataframe.py", line 484, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/Users/da/opt/miniconda3/envs/rikai-example/lib/python3.8/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/Users/da/opt/miniconda3/envs/rikai-example/lib/python3.8/site-packages/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: cannot resolve '`eval`' given input columns: [id, split]; line 1 pos 8;
'Filter (split#49 = 'eval)
+- Project [id#48L, split#49]
   +- Relation[id#48L,split#49] parquet
da-liii commented 3 years ago

Thanks for the reproducer!

da-liii commented 3 years ago

Here is a unit test reproducer in Scala:

  test("Write and read with predicate") {
    examples.write.rikai(testDir.toString)
    val df = spark.read.rikai(testDir.toString)
    df.createOrReplaceTempView("df")
    val query = spark.sql("select * from df where label = 'car'")
    println(query.queryExecution.logical)
    println(query.queryExecution.analyzed)
    println(query.queryExecution.optimizedPlan)
    query.show()
  }
'Project [*]
+- 'Filter ('label = car)
   +- 'UnresolvedRelation [df], [], false
Project [id#106, label#107]
+- Filter (label#107 = car)
   +- SubqueryAlias df
      +- Relation[id#106,label#107] ai.eto.rikai.RikaiRelation@48b790dd
Filter (isnotnull(label#107) AND (label#107 = car))
+- Relation[id#106,label#107] ai.eto.rikai.RikaiRelation@48b790dd
[info] - Write and read with predicate *** FAILED ***
[info]   org.apache.spark.sql.AnalysisException: cannot resolve '`car`' given input columns: [id, label]; line 1 pos 8;
[info] 'Filter (label#137 = 'car)
[info] +- Project [id#136, label#137]
[info]    +- Relation[id#136,label#137] parquet
[info]   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:155)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:152)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:341)
[info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:341)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:338)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:407)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:243)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:405)
da-liii commented 3 years ago

How about https://github.com/eto-ai/rikai/pull/238

What's the difference between rikai and parquet in the latest implementation?