grouzen / zio-apache-arrow

ZIO-powered Apache Arrow library
https://mnedokushev.me/zio-apache-arrow/
Apache License 2.0

Using Apache Arrow with Iceberg #61

Open calvinlfer opened 4 months ago

calvinlfer commented 4 months ago

Hey there,

I was trying to use zio-apache-arrow with the standalone Iceberg Java API, since it exposes a table's data as an iterable of Arrow batches, each of which can be turned into a `VectorSchemaRoot`. The code ends up looking like this:

```scala
import me.mnedokushev.zio.apache.arrow.core.codec.VectorSchemaRootDecoder
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.iceberg.Table
import org.apache.iceberg.arrow.vectorized.{ColumnarBatch, VectorizedTableScanIterable}
import org.apache.iceberg.aws.glue.GlueCatalog
import org.apache.iceberg.catalog.{Namespace, TableIdentifier}
import org.apache.iceberg.expressions.Expressions
import zio.schema.{DeriveSchema, Schema}

import scala.jdk.CollectionConverters.*

final case class Event(date: String, hour: Int, minute: Int)
object Event:
  given Schema[Event] = DeriveSchema.gen[Event]

@main def readIcebergTable(): Unit =
  val catalog = GlueCatalog()
  catalog.initialize("glue_catalog", Map.empty[String, String].asJava)
  val namespace: Namespace = Namespace.of("exampledb")
  val table: Table         = catalog.loadTable(TableIdentifier.of(namespace, "icebergtable"))

  // Project three columns and push the filter down into the scan.
  val scan =
    table
      .newScan()
      .select("date", "hour", "minute")
      .filter(
        Expressions.and(
          Expressions.equal("date", "2024-01-20"),
          Expressions.and(
            Expressions.equal("hour", 10),
            Expressions.equal("minute", 0)
          )
        )
      )

  val batchSize              = 1024
  val reuseContainers        = false
  val vectorizedScanIterable = VectorizedTableScanIterable(scan, batchSize, reuseContainers)

  try
    vectorizedScanIterable.asScala.foreach { (batch: ColumnarBatch) =>
      // Expose the batch's Arrow vectors as a VectorSchemaRoot.
      val root: VectorSchemaRoot = batch.createVectorSchemaRootFromVectors()

      println(root.getSchema)
      println(root.contentToTSVString())
      println(root.getFieldVectors.asScala)

      println(
        VectorSchemaRootDecoder[Event].decode(root).left.map(_.getCause)
      )
    }
  finally vectorizedScanIterable.close() // release the scan's readers and buffers
```

I get an error because the table's Arrow schema is `Schema<date: Utf8, hour: Int(32, true), minute: Int(32, true)>` (every field nullable), whereas the schema the decoder derives is `Schema<date: Utf8 not null, hour: Int(32, true) not null, minute: Int(32, true) not null>`:

```
Schema<date: Utf8, hour: Int(32, true), minute: Int(32, true)>
Left(me.mnedokushev.zio.apache.arrow.core.ValidationError: Schemas are not equal Schema<date: Utf8 not null, hour: Int(32, true) not null, minute: Int(32, true) not null> != Schema<date: Utf8, hour: Int(32, true), minute: Int(32, true)>)
```

However, when I stepped through the code in a debugger, I observed that it actually decodes the data perfectly.

The only problem is that the schemas don't match, so the decoder takes the validation-error branch instead. Is there a way to make the derived Arrow schema more relaxed to allow this use case?
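
The mismatch can be reproduced with Arrow's Java schema classes alone. The sketch below assumes `arrow-vector` is on the classpath; it rebuilds both schemas from the error message, shows that they differ only in nullability, and defines a hypothetical `compatibleIgnoringNullability` check of the kind asked about here. That helper is illustrative and not part of zio-apache-arrow's API; ignoring nullability is only safe if the columns really contain no nulls.

```scala
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
import scala.jdk.CollectionConverters.*

object SchemaNullability:

  // Schema reported by the Iceberg scan: every field is nullable.
  val tableSchema: Schema = new Schema(
    List(
      Field.nullable("date", new ArrowType.Utf8()),
      Field.nullable("hour", new ArrowType.Int(32, true)),
      Field.nullable("minute", new ArrowType.Int(32, true))
    ).asJava
  )

  // Schema derived for Event(date: String, hour: Int, minute: Int): every field is non-null.
  val derivedSchema: Schema = new Schema(
    List(
      Field.notNullable("date", new ArrowType.Utf8()),
      Field.notNullable("hour", new ArrowType.Int(32, true)),
      Field.notNullable("minute", new ArrowType.Int(32, true))
    ).asJava
  )

  // Hypothetical relaxed check: same field names and Arrow types in the same order,
  // nullability ignored. Compares top-level fields only (no children, no metadata).
  def compatibleIgnoringNullability(expected: Schema, actual: Schema): Boolean =
    val e = expected.getFields.asScala
    val a = actual.getFields.asScala
    e.size == a.size && e.lazyZip(a).forall { (ef, af) =>
      ef.getName == af.getName && ef.getType == af.getType
    }

@main def schemaNullabilityDemo(): Unit =
  import SchemaNullability.*
  println(tableSchema == derivedSchema) // false: Field equality includes nullability
  println(compatibleIgnoringNullability(derivedSchema, tableSchema)) // true
```

A stricter variant could accept a nullable column only for fields the case class declares as `Option`, which keeps the nullability information meaningful instead of discarding it.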

Thank you for this really useful library!

grouzen commented 4 months ago

Hello! Thanks for trying the library! It is very alpha and definitely not production-ready yet :). I've never thought about using it with Apache Iceberg, which could be a good candidate to integrate with, actually! Thanks for this idea. At first glance, it looks like the schema of your table consists of nullable fields, but the Event class you defined contains non-optional fields. Try to make the fields in your class optional as follows:

```scala
// Field names must match the column names ("hour", "minute"), not "hours"/"minutes".
final case class Event(date: Option[String], hour: Option[Int], minute: Option[Int])
```
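
The suggestion relies on mapping `Option[A]`, which ZIO Schema represents as `Schema.Optional`, to a nullable Arrow field, and a plain `A` to a non-null one. The sketch below shows that mapping for the column types in this thread; `arrowFieldFor` is a hypothetical helper, not zio-apache-arrow's encoder, and it assumes zio-schema 1.x and `arrow-vector` on the classpath and only handles strings and 32-bit ints.

```scala
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType}
import zio.schema.{Schema, StandardType}

object OptionalToNullable:

  // Hypothetical mapping from a ZIO Schema to an Arrow field:
  // Schema.Optional becomes a nullable field, bare primitives become non-null fields.
  def arrowFieldFor(name: String, schema: Schema[?]): Field =
    schema match
      case Schema.Optional(inner, _) =>
        val field = arrowFieldFor(name, inner)
        new Field(name, FieldType.nullable(field.getType), field.getChildren)
      case Schema.Primitive(StandardType.StringType, _) =>
        Field.notNullable(name, new ArrowType.Utf8())
      case Schema.Primitive(StandardType.IntType, _) =>
        Field.notNullable(name, new ArrowType.Int(32, true))
      case other =>
        throw new IllegalArgumentException(s"Unsupported schema for field $name: $other")

@main def optionalToNullableDemo(): Unit =
  println(OptionalToNullable.arrowFieldFor("date", Schema[Option[String]]).isNullable) // true
  println(OptionalToNullable.arrowFieldFor("hour", Schema[Int]).isNullable) // false
```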

calvinlfer commented 4 months ago

Hey @grouzen, thank you for looking into this! I tried making all the fields `Option`s, but I ended up getting:

```
Error: me.mnedokushev.zio.apache.arrow.core.codec.DecoderError: Unsupported ZIO Schema type Optional(Primitive(string,Chunk()),Chunk())
```

For now, I ended up hand-coding the decoder, and it works properly:

```scala
import java.time.{Instant, OffsetDateTime, ZoneOffset}

import me.mnedokushev.zio.apache.arrow.core.codec.VectorSchemaRootDecoder
import org.apache.arrow.vector.VectorSchemaRoot
import zio.ChunkBuilder

final case class Event(
  date: String,
  hour: Int,
  minute: Int,
  event_name: String,
  event_value: String,
  event_time: OffsetDateTime
)
object Event:
  // Hand-written decoder: reads each column by name, row by row.
  // Assumes no nulls: a null cell makes readText().toString or Int unboxing throw an NPE.
  given decoder: VectorSchemaRootDecoder[Event] =
    (root: VectorSchemaRoot) =>
      val dates      = root.getVector("date").getReader
      val hours      = root.getVector("hour").getReader
      val minutes    = root.getVector("minute").getReader
      val eventTime  = root.getVector("event_time").getReader
      val eventName  = root.getVector("event_name").getReader
      val eventValue = root.getVector("event_value").getReader

      var current = 0
      val len     = root.getRowCount
      val builder = ChunkBuilder.make[Event](len)
      while current < len do
        // Point every column reader at the current row.
        dates.setPosition(current)
        hours.setPosition(current)
        minutes.setPosition(current)
        eventTime.setPosition(current)
        eventName.setPosition(current)
        eventValue.setPosition(current)

        builder += Event(
          date = dates.readText().toString,
          hour = hours.readInteger(),
          minute = minutes.readInteger(),
          // Iceberg timestamps are microseconds since the epoch: micros to millis.
          event_time = Instant.ofEpochMilli(eventTime.readLong() / 1000).atOffset(ZoneOffset.UTC),
          event_name = eventName.readText().toString,
          event_value = eventValue.readText().toString
        )
        current += 1

      builder.result()
```
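
Dividing the `event_time` value by 1000 drops the sub-millisecond part of Iceberg's microsecond timestamps. If that precision matters, `java.time` can add the microseconds to the epoch directly. A minimal JDK-only sketch; the object and method names are illustrative:

```scala
import java.time.{Instant, OffsetDateTime, ZoneOffset}
import java.time.temporal.ChronoUnit

object MicrosConversion:

  // Truncating variant, as in the hand-coded decoder: micros -> millis -> Instant.
  def viaMillis(micros: Long): OffsetDateTime =
    Instant.ofEpochMilli(micros / 1000).atOffset(ZoneOffset.UTC)

  // Precision-preserving variant: add the microseconds to the epoch directly.
  def microsToOffsetDateTime(micros: Long): OffsetDateTime =
    Instant.EPOCH.plus(micros, ChronoUnit.MICROS).atOffset(ZoneOffset.UTC)

@main def microsDemo(): Unit =
  val micros = 1705744800123456L // 2024-01-20T10:00:00.123456Z
  println(MicrosConversion.viaMillis(micros)) // 2024-01-20T10:00:00.123Z
  println(MicrosConversion.microsToOffsetDateTime(micros)) // 2024-01-20T10:00:00.123456Z
```

`Instant.plus` also handles pre-epoch (negative) values correctly, whereas `/ 1000` rounds toward zero rather than down for them.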

grouzen commented 4 months ago

Oh yeah, I just realized that optional fields are not supported yet. Sorry! I haven't touched the library for a couple of months ;) I plan to do a huge refactoring once I finish https://github.com/grouzen/zio-apache-parquet, which is a kind of playground where I'm testing some more advanced features of ZIO Schema.
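
Until optional fields are supported, nullable columns can still be read by hand if each reader is checked with `FieldReader.isSet()` before reading. A minimal sketch, assuming `arrow-vector` plus an Arrow memory implementation (for example `arrow-memory-netty`) on the classpath; `readOptInt`, `readOptString` and `demo` are hypothetical helpers:

```scala
import java.nio.charset.StandardCharsets

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.{IntVector, VarCharVector}
import org.apache.arrow.vector.complex.reader.FieldReader

object NullableReads:

  // Check isSet() first so that a null cell becomes None instead of throwing.
  def readOptInt(reader: FieldReader): Option[Int] =
    if reader.isSet() then Some(reader.readInteger().intValue()) else None

  def readOptString(reader: FieldReader): Option[String] =
    if reader.isSet() then Some(reader.readText().toString) else None

  // Builds an int column and a string column, each with a null in row 1,
  // then reads both back as Options.
  def demo(): (List[Option[Int]], List[Option[String]]) =
    val allocator = new RootAllocator()
    val hours     = new IntVector("hour", allocator)
    val dates     = new VarCharVector("date", allocator)
    try
      hours.allocateNew(2)
      hours.set(0, 10)
      hours.setNull(1)
      hours.setValueCount(2)

      dates.allocateNew(2)
      dates.setSafe(0, "2024-01-20".getBytes(StandardCharsets.UTF_8))
      dates.setNull(1)
      dates.setValueCount(2)

      val hourReader = hours.getReader
      val dateReader = dates.getReader
      val rows       = (0 until 2).toList
      val hourValues = rows.map { i => hourReader.setPosition(i); readOptInt(hourReader) }
      val dateValues = rows.map { i => dateReader.setPosition(i); readOptString(dateReader) }
      (hourValues, dateValues)
    finally
      // Free vector buffers before closing the allocator.
      hours.close()
      dates.close()
      allocator.close()

@main def nullableReadsDemo(): Unit =
  println(NullableReads.demo())
```

Inside the hand-coded decoder, the same helpers would let `Event` declare `Option` fields for columns that may contain nulls.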

calvinlfer commented 4 months ago

No worries, my friend! I'm glad you are working in this space. I've connected with you on Discord; maybe our paths will cross soon.