martindurant / daskberg

dask client for iceberg (super-alpha)
BSD 3-Clause "New" or "Revised" License
7 stars 0 forks source link

Feedback #1

Open martindurant opened 1 year ago

martindurant commented 1 year ago

cc @Fokko .

This is a super simple implementation of an iceberg client for dask. It works for the limited couple of datasets I have available including

This is POC, but I think an effective one.

That the client works on data locations/paths only, but they can be remote. It is the opposite of pyiceberg in this sense, which talks to the metadata store over thrift or REST.

Note that fastparquet loads decimal columns as float64, and arrow makes python decimal objects (super slow!) even though it supports a decimal type internally.

Fokko commented 1 year ago

Hey @martindurant thanks for pinging me, and this is awesome! Great to see a working Dask integration, and love the name 😄

Looking at the version-hint.txt it looks like your implementation is based around the HadoopCatalog, where all the data lives on HDFS. For PyIceberg we're centered around catalogs because that gives us atomicity when multiple clients are reading/writing (meta)data.

I also see some gaps in the implementation, for example, hidden partitioning which is the secret sauce of Iceberg. But you already mentioned that it is a POC.

Note that fastparquet loads decimal columns as float64, and arrow makes python decimal objects (super slow!) even though it supports a decimal type internally.

I don't think we should choose between PyArrow or fastparquet support, I think we should make them both available.

Once we have https://github.com/apache/iceberg/pull/6069 in, we know which files we need to load, and we can integrate the creation of the dask dataframe into PyIceberg. What do you think?

martindurant commented 1 year ago

Looking at the version-hint.txt it looks like your implementation is based around the HadoopCatalog

More fair to say "filesystem-based datasets", which is more likely to be S3 for me. Getting the same metadata from a service is reasonable, but this method cuts out the middleman.

hidden partitioning which is the secret sauce of Iceberg

Since I don't know what this is, you are clearly right!

I don't think we should choose between PyArrow or fastparquet support

Totally agree, but of course we do need to be aware of the capabilities of both. Since I am responsible for fastparquet, I can make any changes there that might be needed, like the linked PR. Changes in arrow are much harder to achieve (for me, and probably in general).

ddrinka commented 1 year ago

Looking at the version-hint.txt it looks like your implementation is based around the HadoopCatalog, where all the data lives on HDFS. For PyIceberg we're centered around catalogs because that gives us atomicity when multiple clients are reading/writing (meta)data.

@Fokko, for some background here, I've generated some sample data in a public S3 bucket that @martindurant is probably working off of. Configuring cross-account Glue access or a dedicated Nessie server just to provide access to some static tables in S3 is overkill. I realize the criticality of having a catalog for atomicity but in a slow-changing dataset, being able to point Iceberg directly at an S3 location and launching some queries outweighs the complexity of doing it "right", IMO. I think supporting version-hint.text in PyIceberg the same way Spark supports it (with stern cautions that you will lose atomicity if you don't use a catalog) is appropriate.

I'd be interested to hear more about the internal conversations that have led to the current position that a catalog will be required.

ddrinka commented 1 year ago

@martindurant in the sample data we have a date and a time (seconds since midnight EST). We could instead store that data as a datetime. With Hive that would usually mean having a redundant column for date that we can partition over, and requiring queries to consider both date and datetime so that the partition is utilized and the time is available. This is why we originally structured the data into the two columns.

Hidden Partitioning lets you store only a datetime but generate partitions out of only the date portion of that column. There are a few operators available to transform a column into a partition. Those partitions are used automatically in any query against datetime, like magic.

For a full-featured table scan that functionality will be necessary, but hopefully the pyiceberg project will handle that complexity for us soon.

martindurant commented 1 year ago

Hidden Partitioning lets you store only a datetime but generate partitions out of only the date portion of that column. There are a few operators available to transform a column into a partition. Those partitions are used automatically in any query against datetime, like magic.

Oh those partitions. Yes, this is supported, as many transforms as I could be bothered so far ( https://github.com/martindurant/daskberg/blob/main/daskberg/conversions.py#L46 ); but there are not many. So you may have a column called "name" and partition on buckets, and expect a filter like ("name", "==", "fred") to pick the correct partition, the bucket containing that name. I have a (private) demo showing this.

ddrinka commented 1 year ago

The last note I had on the current implementation is it looks like you'll currently open every manifest file from the manifest list and begin scanning for matching partitions. The manifest list also has partition bounds that should be considered so that you only have to open a manifest file if the partition is within that manifest's bounds. For our data, we might be generating a few manifest files per day, and we wouldn't want to retrieve the hundreds of those from S3 just to see if they contain data for the date we're looking for. Instead we should work from the manifest list, identify which manifest files include datafiles within our bounds, and then open those manifest files and restrict further based on the column bounds.

I might be looking past that functionality in the code, but thought I'd mention just in case.

martindurant commented 1 year ago

Ah, I didn't realise that the manifests had partition max-min values - very clever! It will be trivial to deal with those, although harder to engineer some data to test. Furthermore, we should be using fsspec's magic to collect the bytes of all the required manifest avro files concurrently, rather than the current serial approach; but these optimisations can wait.

I have managed to make my own test data, but only with spark-sql. I found an OK tutorial in dremio documentation.

martindurant commented 1 year ago

2

Fokko commented 1 year ago

Sorry for not sharing this earlier, but I always use this to create test-data: https://github.com/tabular-io/docker-spark-iceberg You could mount the table under the same path on your host system. The metadata is available through the REST catalog. This also allows for the easy creation of different partition transforms.

nickvazz commented 1 year ago

Hi @martindurant, I have been using dask for a while now and have recently started playing around with iceberg coming across this thread :)

I have managed to make my own test data, but only with spark-sql. I found an OK tutorial in dremio documentation.

Sorry for not sharing this earlier, but I always use this to create test-data: https://github.com/tabular-io/docker-spark-iceberg

I noticed that pyiceberg can read tables and daskberg will also be able to read tables (haven't tested myself quite yet). Have you found a way to create tables without requiring spark?

I have mostly stayed in the dask ecosystem and would like to if possible 😛

Thanks!

martindurant commented 1 year ago

No, daskberg does not have a write mode, so not for now. It would not be too hard, at least with pure files (rather than the REST server), but I was wanting first to see if there was any interest in this package or not.

I noticed that pyiceberg can read tables

Really?

nickvazz commented 1 year ago

That would be awesome if Daskberg could 😀

I thought it could read but looks like you have to pass the file list to something else. Seems to support pyarrow and duckdb

https://py.iceberg.apache.org/api/#apache-arrow

martindurant commented 1 year ago

looks like you have to pass the file list

I see, it looks like scan.plan_files() from the section above may do what we need for dask. You an pass any list of parquet files to dask's parquet engines, but I wonder whether arrow actually supports schema changes; fastparquet has support for adding and removing fields, not changing (well maybe, but not tested anyway).

martindurant commented 1 year ago

Note that pyiceberg also has no writing. You can create tables/schemas, but not put any data. Actually you can do that with daskberg too. The next pyiceberg community meeting isn't until the 31st, I'll try to find out what their plans and timelines are. It may well still prove worthwhile to maintain this package as a simpler alternative, where I can fold in secret fastparquet sauce.

nickvazz commented 1 year ago

Note that pyiceberg also has no writing. You can create tables/schemas, but not put any data.

Ah yes, after exploring a bit I noticed that as well. I could not really tell exactly what the spark engine does when writing to the table. I assume it let's the catalog/table know all the relevant information but looks like some of the niceties like schema updates might not be fully baked in (https://github.com/apache/iceberg/blob/cf00f6a06b256e9c4defe226b6a37aa83c40f561/python/pyiceberg/table/__init__.py#L79). That said, I am very new to the iceberg world and still trying to wrap my head around it.

martindurant commented 1 year ago

Metadata updates involve writing a new JSON "v" file, and data updates involve writing the new files (parquet), manifests for those (and for deletes) and a new manifest list "snapshot" file in avro. Communicating this to a server appears to need sending the same information bundled as JSON to the REST endpoint (haven't checked hive/thrift).

The complexity comes in partitioning the writes, and any transforms in the partitioning or sorting to be applied - maybe not necessary for simple operation, but a big part of iceberg's "secret sauce". Plus, dask has no API yet to do anything other than append to a dataset.

Fokko commented 1 year ago

Happy new year everyone!

I see, it looks like scan.plan_files() from the section above may do what we need for dask. You an pass any list of parquet files to dask's parquet engines, but I wonder whether arrow actually supports schema changes; fastparquet has support for adding and removing fields, not changing (well maybe, but not tested anyway).

It is a bit more involved than we initially anticipated. We ended up implementing the ID-based projection ourselves. Instead of relying on the names of the fields, Iceberg relies on the IDs of each field that are embedded in the Parquet file (and Iceberg schema). For example, if you write a bunch of files, and then rename a column, with Iceberg you don't have to rewrite the files that you wrote before because we just map the ID of the column, and rename that column when reading the data.

I would expect that we need to do something similar to fastparquet, but it would be nice to push as much as possible to the library itself.

With respect to writing, that is a bit further along because we want to make sure that we write the Avro files correctly, and test this thoroughly. There is nothing worse than corrupting a table in a table format :) Simple operations like appending files should be quite easy. For more complicated stuff like updating, I think a bit more work is involved (where merge-on-read is easier than the copy-on-write strategy).