mjakubowski84 / parquet4s

Read and write Parquet in Scala. Use Scala classes as schema. No need to start a cluster.
https://mjakubowski84.github.io/parquet4s/
MIT License

Applying `not in` (`in` negation) filter always resulting in 0 rows #292

Closed isharamet closed 1 year ago

isharamet commented 1 year ago

Hey folks,

Not sure if I'm doing something wrong here, but applying a `not in` filter to `ParquetReader.Builder` and then reading the data always results in 0 rows (when it shouldn't).

Simple app to reproduce the issue:

import com.github.mjakubowski84.parquet4s.{Col, ParquetReader, ParquetWriter, Path}

import java.nio.file.Files

object NotInFilterApp extends App {

  case class Data(id: Int, text: String)

  val data = List(
    Data(1, "first_entry"),
    Data(2, "second_entry"),
    Data(3, "third_entry"),
    Data(4, "fourth_entry"),
    Data(5, "fifth_entry")
  )

  val path = Path(Files.createTempDirectory("example"))

  ParquetWriter.of[Data].writeAndClose(path.append("data.parquet"), data)

  val filteredWithIn = ParquetReader.as[Data].filter(Col("id") in Set(1, 2, 3)).read(path)

  println("Filtered with `in`:")
  filteredWithIn.foreach(println) // works as expected, reads 3 records

  val filteredWithNotIn = ParquetReader.as[Data].filter(!(Col("id") in Set(1, 2, 3))).read(path)

  println("Filtered with `not in`:")
  filteredWithNotIn.foreach(println) // empty, while it should read 2 records
}

Reproducible on the latest master and on v2.10.0. I'll try to dig deeper, but I haven't been able to find the cause of this behaviour yet.

mjakubowski84 commented 1 year ago

Hi @isharamet,

It looks like a bug. There is a GitHub issue to replace the custom `in` predicate with the one recently introduced in Parquet: #272.

isharamet commented 1 year ago

I believe the error is in the `inverseCanDrop` method of `InPredicate`:

  override def inverseCanDrop(statistics: Statistics[T]): Boolean = {
    val compare   = statistics.getComparator.compare(_, _)
    val min       = statistics.getMin
    val max       = statistics.getMax
    val isInRange = (value: T) => compare(value, min) >= 0 && compare(value, max) <= 0
    values.exists(isInRange)
  }

While this logic works for `canDrop`, in the `not in` scenario it drops every block whose min/max range contains a value from the set, even though such a block may also contain other values. In the example from my original post, all values are stored in a single block (min = 1, max = 5), so that block is skipped.
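To make the failure concrete, here is a minimal sketch of the same range check applied to the block from the reproduction. It does not use Parquet at all: `BlockStats` and `InverseCanDropSketch` are hypothetical stand-ins for the real statistics type, specialised to `Int`, and nulls are ignored.

```scala
object InverseCanDropSketch {
  // Hypothetical stand-in for the min/max statistics Parquet keeps per block.
  final case class BlockStats(min: Int, max: Int)

  // Same range check as the quoted method, specialised to Int for brevity:
  // returns true as soon as any filter value lies inside [min, max].
  def inverseCanDrop(values: Set[Int], stats: BlockStats): Boolean =
    values.exists(v => v >= stats.min && v <= stats.max)

  // The single block from the reproduction holds ids 1 to 5.
  val stats: BlockStats = BlockStats(min = 1, max = 5)

  // Evaluates to true: the block is dropped although ids 4 and 5
  // would satisfy the `not in` filter.
  val dropped: Boolean = inverseCanDrop(Set(1, 2, 3), stats)
}
```

The check only tells us that the block might contain an excluded value. It says nothing about whether the block contains only excluded values, which is what would be needed to drop it safely.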

SergWh commented 1 year ago

Hi @isharamet,

We ran into this bug too and ended up implementing our own `NinPredicate`, similar to https://github.com/mjakubowski84/parquet4s/blob/master/core/src/main/scala/com/github/mjakubowski84/parquet4s/Filter.scala#L323, with both `canDrop` and `inverseCanDrop` returning false. I don't see a way to reuse the "drop whole block if values not in range" logic, since for `nin` it should rather be "accept whole block if values not in range".

This can serve as a temporary solution until a `nin` predicate is supported.
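A rough sketch of why returning false from both methods helps, again without any Parquet dependency. The `Block` model, `readNotIn`, and the `canSkip` parameter are hypothetical names made up for illustration, not the parquet4s or Parquet API. The sketch only models the decision "skip this block or check its rows one by one".

```scala
object NeverDropSketch {
  // Hypothetical in-memory model of a block: just the ids it holds.
  final case class Block(ids: List[Int]) {
    def min: Int = ids.min
    def max: Int = ids.max
  }

  // Reads blocks under a `not in` filter: a block is skipped entirely when
  // `canSkip` says so; otherwise its rows are checked one by one.
  def readNotIn(blocks: List[Block], values: Set[Int])(canSkip: Block => Boolean): List[Int] =
    blocks.filterNot(canSkip).flatMap(_.ids.filterNot(values.contains))

  val blocks: List[Block] = List(Block(List(1, 2, 3, 4, 5)))
  val excluded: Set[Int]  = Set(1, 2, 3)

  // Range-based skipping, as in the quoted inverseCanDrop: the only block
  // is skipped, so ids 4 and 5 are lost and the result is empty.
  val withRangeCheck: List[Int] =
    readNotIn(blocks, excluded)(b => excluded.exists(v => v >= b.min && v <= b.max))

  // Workaround: never skip a block and rely on row-level filtering only.
  val withNeverDrop: List[Int] = readNotIn(blocks, excluded)(_ => false)
}
```

The trade-off is that no block is ever pruned from statistics, so every block is read and filtered row by row. That is slower on large files but returns the correct rows.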

mjakubowski84 commented 1 year ago

Fix to be released in 2.11.0