Kotlin / dataframe

Structured data processing in Kotlin
https://kotlin.github.io/dataframe/overview.html
Apache License 2.0

Allow reading dataframe from an ArrowReader #528

Closed fb64 closed 7 months ago

fb64 commented 7 months ago

Some tools and frameworks, such as DataFusion (Java binding) or DuckDB, do not directly expose an InputStream of Arrow data but an ArrowReader instead. It could be useful, and not too complicated, to add a readArrowReader method.
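For context, an ArrowReader is consumed batch by batch rather than as a byte stream: in Arrow Java you call `loadNextBatch()` until it returns `false` and read each batch from `getVectorSchemaRoot()`. The sketch below models that loop in plain Kotlin so it runs without Arrow on the classpath. `BatchReader`, `ListBatchReader` and `readAllColumns` are hypothetical names, not Arrow or DataFrame API, and each batch is simplified to a map from column name to values.

```kotlin
// Hypothetical stand-in for Arrow's ArrowReader: one batch is "loaded" at a time.
abstract class BatchReader {
    // Advances to the next batch; returns false once the source is exhausted.
    abstract fun loadNextBatch(): Boolean

    // Column name -> values of the currently loaded batch.
    abstract val currentBatch: Map<String, List<Any?>>
}

// Simple in-memory implementation backed by a fixed list of batches.
class ListBatchReader(batches: List<Map<String, List<Any?>>>) : BatchReader() {
    private val iterator = batches.iterator()
    private var current: Map<String, List<Any?>> = emptyMap()

    override fun loadNextBatch(): Boolean {
        if (!iterator.hasNext()) return false
        current = iterator.next()
        return true
    }

    override val currentBatch: Map<String, List<Any?>>
        get() = current
}

// Drains the reader and concatenates every batch column by column,
// roughly what a readArrowReader-style function has to do.
fun readAllColumns(reader: BatchReader): Map<String, List<Any?>> {
    val columns = LinkedHashMap<String, MutableList<Any?>>()
    while (reader.loadNextBatch()) {
        for ((name, values) in reader.currentBatch) {
            columns.getOrPut(name) { mutableListOf() }.addAll(values)
        }
    }
    return columns
}
```

For example, draining a reader that holds the batches `{a=[one, two]}` and `{a=[three]}` yields `{a=[one, two, three]}`. A real implementation would build DataFrame columns from each `VectorSchemaRoot` instead of plain lists.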

Jolanrensen commented 7 months ago

Good idea! I'm not very familiar with Arrow, but it sounds like a reasonable addition, comparable to reading from JDBC ResultSets afaik. I would suggest another name, though, as you cannot really "read a reader". Maybe readWithArrowReader or readUsingArrowReader? Additionally, an extension function ArrowReader.toDataFrame() would help discoverability.

I do have one concern: can you read multiple times from the same ArrowReader instance? DataFrame is functional in the sense that the same input always yields the same output, and this also holds for data sources. Things could get confusing if reading multiple times from the same reader yields different results.
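To make the concern concrete: with a one-shot source, reading a second time silently returns nothing instead of the original data. The hypothetical `OneShotSource` below stands in for an InputStream or a streaming ArrowReader; it is an illustration, not library code.

```kotlin
// Hypothetical one-shot source: like an InputStream or a streaming reader,
// it hands out its rows once and is empty afterwards.
class OneShotSource<T>(rows: List<T>) {
    private val remaining = rows.iterator()

    // Drains whatever is left; a second call sees an exhausted source.
    fun readAll(): List<T> {
        val out = mutableListOf<T>()
        while (remaining.hasNext()) out += remaining.next()
        return out
    }
}
```

Calling `readAll()` twice on `OneShotSource(listOf(1, 2, 3))` returns `[1, 2, 3]` and then `[]`: same input object, different output.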

Jolanrensen commented 7 months ago

Actually, DataFrame.readArrow(ArrowReader) could probably work even better.
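The naming question comes down to where the entry point lives. In Kotlin, an extension on a class's companion object gives the `DataFrame.readArrow(...)` call style, and an extension on the reader type gives the `toDataFrame()` style; the second can simply delegate to the first. The sketch below shows that pattern with hypothetical `Table` and `RowReader` types standing in for `DataFrame` and `ArrowReader`; it is not the library's actual code.

```kotlin
// Hypothetical stand-in for ArrowReader.
interface RowReader {
    fun readRows(): List<Map<String, Any?>>
}

// Hypothetical stand-in for DataFrame.
class Table(val rows: List<Map<String, Any?>>) {
    companion object // empty companion so extensions can be called as Table.readArrow(...)
}

// Called as Table.readArrow(reader), mirroring the DataFrame.readArrow(ArrowReader) proposal.
fun Table.Companion.readArrow(reader: RowReader): Table = Table(reader.readRows())

// Discoverable entry point from the reader side; just delegates to the companion function.
fun RowReader.toTable(): Table = Table.readArrow(this)
```

Keeping the logic in one function and making the other a one-line delegate means both call styles always behave identically.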

fb64 commented 7 months ago

OK for DataFrame.readArrow(ArrowReader). I think an ArrowReader, like an InputStream, should be read only once; it may be different for ArrowFileReader, as it takes a SeekableByteChannel as input. Should I add the ArrowReader.toDataFrame() extension to the arrowReading.kt file too? Maybe I can detect whether a reader has already been read and throw an exception or return an error. I think it's the same case as InputStream, which is generally only read once...
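The stream/seekable distinction can be seen with plain JDK types: a `SeekableByteChannel` can be rewound with `position(0)` and read again, which a plain InputStream does not generally guarantee. The sketch below only illustrates that channel behaviour; `readRemaining` and `readTwice` are hypothetical helpers, and whether an ArrowFileReader can actually be read twice through DataFrame is not established here.

```kotlin
import java.nio.ByteBuffer
import java.nio.channels.SeekableByteChannel
import java.nio.file.Files

// Reads every remaining byte from the channel's current position.
fun readRemaining(channel: SeekableByteChannel): ByteArray {
    val buffer = ByteBuffer.allocate((channel.size() - channel.position()).toInt())
    while (buffer.hasRemaining() && channel.read(buffer) >= 0) {
    }
    return buffer.array()
}

// Reads the same temp file twice through one seekable channel by rewinding in between.
fun readTwice(bytes: ByteArray): Pair<ByteArray, ByteArray> {
    val file = Files.createTempFile("arrow-sketch", ".bin")
    try {
        Files.write(file, bytes)
        Files.newByteChannel(file).use { channel ->
            val first = readRemaining(channel)
            channel.position(0) // rewind: only possible because the channel is seekable
            val second = readRemaining(channel)
            return first to second
        }
    } finally {
        Files.deleteIfExists(file)
    }
}
```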

Jolanrensen commented 7 months ago

It's indeed the same for InputStream, good point.

Yes, please put the extension there. Most of our extensions are currently in :core, but since this is an optional supported type, :core cannot depend on it.

I think an exception is a good way to go.
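One way to follow this suggestion is a wrapper that remembers whether it has already been drained and fails with `IllegalStateException` on a second read. A limitation is visible in the sketch: the wrapper only sees reads that go through it, not reads made directly on the underlying source. `ReadOnceGuard` is a hypothetical name.

```kotlin
// Hypothetical wrapper around a one-shot source. It can only notice reads that
// go through it; reads made directly on the underlying iterator stay invisible.
class ReadOnceGuard<T>(private val rows: Iterator<T>) {
    private var consumed = false

    fun readAll(): List<T> {
        // check() throws IllegalStateException with this message when the condition is false.
        check(!consumed) { "This reader has already been read" }
        consumed = true
        return rows.asSequence().toList()
    }
}
```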

Would it also be possible to create some tests?

fb64 commented 7 months ago

While adding some tests, I discovered that writing to or reading from Arrow IPC changes the type of an inner column. The following test fails because column c holds a LinkedHashMap inside a SingletonList in the expected DataFrame, but a single String inside an ArrayList in the readIpc object. Maybe I should create a separate issue for that...

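    @Test
    fun testReadIPC() {
        val a by columnOf("one")
        val b by columnOf(2.0)
        // c is a column whose single cell is a Map (a Text and two Doubles)
        val c by listOf(
            mapOf(
                "c1" to Text("inner"),
                "c2" to 4.0,
                "c3" to 50.0,
            ) as Map<String, Any?>
        ).toColumn()
        val d by columnOf("four")
        val expected = dataFrameOf(a, b, c, d)
        // Round trip: write to Arrow IPC bytes, then read them back
        val readIpc = DataFrame.readArrowIPC(expected.saveArrowIPCToByteArray())
        // Fails: after the round trip, column c no longer holds the original Map
        readIpc shouldBe expected
    }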

fb64 commented 7 months ago

The extension and tests have been added, but unfortunately it's complicated to detect whether an ArrowReader has already been read (or could be read again), as no such method exists and the underlying source (InputStream, Channel, SeekableReadChannel...) is not accessible.

fb64 commented 7 months ago

By the way, an exception is already thrown if we try to re-read the ArrowReader (its type depends on the underlying source). I don't think catching it and rethrowing another exception would add much.
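The general point can be checked with plain JDK sources: reading from an already-closed source fails on its own, and the exception type depends on the source. The sketch below uses a `BufferedInputStream` and a channel from `Channels.newChannel`, not an ArrowReader; `failureOf` and `closedSourceFailures` are hypothetical helpers.

```kotlin
import java.io.BufferedInputStream
import java.io.ByteArrayInputStream
import java.nio.ByteBuffer
import java.nio.channels.Channels

// Runs a read and returns the exception it threw, or null if it succeeded.
fun failureOf(read: () -> Unit): Throwable? = runCatching(read).exceptionOrNull()

// Reads from two kinds of already-closed sources and reports what each one throws.
fun closedSourceFailures(): List<Throwable?> {
    val bytes = byteArrayOf(1, 2, 3)

    val stream = BufferedInputStream(ByteArrayInputStream(bytes))
    stream.close()
    val streamFailure = failureOf { stream.read() } // a java.io.IOException

    val channel = Channels.newChannel(ByteArrayInputStream(bytes))
    channel.close()
    val channelFailure = failureOf { channel.read(ByteBuffer.allocate(bytes.size)) } // a ClosedChannelException

    return listOf(streamFailure, channelFailure)
}
```

Because `ClosedChannelException` is itself an `IOException`, a caller that wants to handle both cases can catch `IOException` without the library rethrowing anything.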