zero-one-group / geni

A Clojure dataframe library that runs on Spark
Apache License 2.0
283 stars 28 forks source link

support for writing arrow files #279

Closed behrica closed 3 years ago

behrica commented 3 years ago

This would be super usefull, I believe. Tech.ml.dataset has very good support for working with arrow files.

(memory mapped and larger then heap)

This would nicely bring Geni and tech.ml.dataset.

I currently can convert with them, using R...

anthony-khong commented 3 years ago

Hi @behrica, I tried to make this happen a couple of times. I think it would be a very good addition to the library. I'm still looking into what the best way of doing it!

behrica commented 3 years ago

Potentially there are 3 ways, I think

  1. Extend Spark itself and make it write arrow files.... I searched for this, but did not find anything
  2. Make an easy tool for converting the Spark output of Parquet files into arrow files (on file system level)
  3. Do "something" in Geni, so the result of "collect" type of calls can be "arrow data structures" (raw bytes or pointers to files or similar) and have tech.ml.dataset ready to create tech.ml.Datasets from this (this is nearly done)
behrica commented 3 years ago

1) and 2) are alternatives. 1) is prefered but maybe imposible to start on. Or maybe we need just to wait until the Spark community does it. For now, they seems to be focussing on writing Parquet only

3 could be useful in addition to 1) or 2)

behrica commented 3 years ago

Can 3) potentially work for "data larger the RAM" ? I don't think so. Spark is not meant to be able to return to the spark driver "large data" (larger then RAM), correct ? The "work with data larger then RAM capabilities" of Spark are constraint to the "workers", correct ?

behrica commented 3 years ago

I explored 2) and I think it can be done rather easily by using the libraries provided by arrow: https://github.com/apache/arrow

I have the impression that the C++,Python, R libraries are more advanced then the Java libraries I looked in detail in to the R package for arrow, and it allows this "transform" in 2 or three lines of code.

But I found "bugs" immediately, but is its foressen and API is there.

behrica commented 3 years ago

Additionally I believe that Spark as optimized data transfers (workers to driver and therefore into a client) into:

but not yet for the JVM (Maybe because there is not a single columnar data structure used by everbody)

behrica commented 3 years ago

I was pointed as well to this by Chris Nuerberger:

http://bytedeco.org/javacpp-presets/arrow/apidocs/

This seems to allow to use the C++ API of arrow from java/clojure easely". This could help for doing 2) and 3)

anthony-khong commented 3 years ago

Can 3) potentially work for "data larger the RAM" ? I don't think so. Spark is not meant to be able to return to the spark driver "large data" (larger then RAM), correct ?

You're right, I think collect wouldn't work here. And converting formats (i.e. option 2) is also not ideal, because you incur the IO costs twice, once to load and once to write, which is usually non-negligible.

I'm still stumped on this one. Best case scenario is for Spark to support the data format, and we can just call the appropriate methods :)

behrica commented 3 years ago

I saw that Chris started to implement parquet support for t.d.m: https://github.com/techascent/tech.ml.dataset/tree/parquet-for-real

That is an other option.

anthony-khong commented 3 years ago

Chris is the hero we need, but don't deserve!

behrica commented 3 years ago

I would still think that implementing 2) and accepting that it will not work for larger then RAM is fine.

I still have a lot of "large data" analysis scenarios in my head in which I would use:

(Even Though your benchmark, shows that Geni is slighly faster the "in-memoy t.d.m ...)

So I think to have a function in Geni, which does this:


(-> df
filter
...
...
collect-as-mmaped-arrow-file
)
could be useful.

This should write the spark data frame as an arrow file (or a partition of arrow files) and return a  "file/directory name".
behrica commented 3 years ago

I would still think that implementing 2) and accepting that it will not work for larger then RAM is fine.

I think even larger then RAM can work: https://www.waitingforcode.com/apache-spark/collecting-part-data-driver-rdd-tolocaiIterator/read_

Not larger then "largest" partition. But that is fine.

behrica commented 3 years ago

So basically it is doing this: https://mikestaszel.com/2018/02/05/writing-huge-csvs-easily-and-efficiently-with-pyspark/ But writing several arrow files, instead of a CSV files.

It will still be slow for very large data, as all data need to flow through the driver. (and crash of too large partitions)

I did not find anything, but it would not surprise me if there is somewhere some java code, which does the RDD to arrow conversion.

behrica commented 3 years ago

Maybe we even can do something else:

The localIterator might allow to:

This is of course slow, but as we can re-partition the RDD before in any number of pieces using Spark, we can control the chunk size

The nippy files can then be opened in TDM and eventually concatinated into a single dataset (if fitting in memory)

behrica commented 3 years ago

I looked into this in detail and I believe that it is indeed doable to write a conversation from:

I would try to go "to arrow" directly. It looks rather simple using the Java JDK directly. https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format

The "memory usage" of this is controlled by "2 chunk sizes":

So we would need a function taking as

behrica commented 3 years ago

Maybe this is somehow done in this class already: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala This converts from a https://www.javadoc.io/doc/org.apache.spark/spark-catalyst_2.10/2.2.2/org/apache/spark/sql/catalyst/InternalRow.html

to arrow. The interfcaes are not the same, but have teh same methods....

behrica commented 3 years ago

I have a proof of concept working, and it looks good to me. The transformation of the Spark data frames to arrow files is quite simple.

As a final result, we can have "mixed" pipelines, where the first part is Geni, and the second part is tech.ml.dataset

(println
    (-> data
        (g/limit 1000)
        (first (collect-to-arrow 2000))
        (ds-arrow/read-stream-dataset-in-place )
        (ds/descriptive-stats)
        )
    )
anthony-khong commented 3 years ago

I have a proof of concept working, and it looks good to me.

Really really cool. It's great to see the thought process as you're working on it.

As a final result, we can have "mixed" pipelines, where the first part is Geni, and the second part is tech.ml.dataset

FYI, I'm thinking of working on a Geni-like interfaces but runs on TMD. Very much like how tablecloth tries to add dplyr-like interfaces that runs on TMD. I think it'll smoothen the bridge even more. But yea, it's a big project!

behrica commented 3 years ago

I have a proof of concept working, and it looks good to me.

Really really cool. It's great to see the thought process as you're working on it.

As a final result, we can have "mixed" pipelines, where the first part is Geni, and the second part is tech.ml.dataset

FYI, I'm thinking of working on a Geni-like interfaces but runs on TMD. Very much like how tablecloth tries to add dplyr-like interfaces that runs on TMD. I think it'll smoothen the bridge even more. But yea, it's a big project!

I have seen this very successful in R. The idea of using the same "dplyr" calls on any form of backend. (sparklyr as the example of dplyr syntax for spark)

I can see the usefullness of the goal and I like the way tablecloth is done.

behrica commented 3 years ago

Maybe you can comment on one restriction I want to make, at least initially. Spark dataframes and probably as well Arrow, Support complex types (nested for example)

I would leave this out as a first implementation and only support the simple types:

:string :double :float :long :integer :boolean

We needed to keep in imd that it would require to get complex types working from:

Spark -> Arrow -> t.d.m.

behrica commented 3 years ago

I would like to add as well some integration tests, which use t.d.m to read the arrow files.

This would mean a "test" dependency of Geni to t.d.m What do you think ?

anthony-khong commented 3 years ago

I would leave this out as a first implementation and only support the simple types:

Yes, I think that's sensible.

This would mean a "test" dependency of Geni to t.d.m

Sure, give it a go and put it on the dev profile on the project.clj? My fear is that there are conflicting dependencies between Spark and TMD. But no harm in trying. Do make coverage as soon as you put it in, and see if any of the tests fail?

behrica commented 3 years ago

Having seen this https://github.com/techascent/tech.ml.dataset/pull/156, I should maybe not focussing on TDM as being the natural "target" of this function.

This function should write arrow files, for whoever want to consume them.

We should maybe rethink the "naming". The spark "collect" function is clearly bound by heap space on driver.

While my code is bound by heap space of "largest chunk".

It is based on the Spark function ".toLocalIterator".

behrica commented 3 years ago

I am somehow done with the code and it works for the main data types.

Should I do a PR at this point in time ?

anthony-khong commented 3 years ago

Hi @behrica, absolutely! This would be a great contribution to Geni!

behrica commented 3 years ago

I would leave this out as a first implementation and only support the simple types:

Yes, I think that's sensible.

This would mean a "test" dependency of Geni to t.d.m

Sure, give it a go and put it on the dev profile on the project.clj? My fear is that there are conflicting dependencies between Spark and TMD. But no harm in trying. Do make coverage as soon as you put it in, and see if any of the tests fail?

It looks fine, all test pass. Including TDM has an ugly side effect while running the tests, namely the logging is suddelny on debug, and there is a lot of output. I solved it, by adding a logback.xml to "test-resources"

behrica commented 3 years ago

Can be closed