coiled / dask-snowflake

Dask integration for Snowflake
BSD 3-Clause "New" or "Revised" License

Partition size as argument for read_snowflake #7

Closed · fjetter closed this 1 year ago

fjetter commented 3 years ago

The performance of many Dask workflows is highly sensitive to partition size. Currently, many of the result batches coming back from Snowflake are small, and repartitioning should improve downstream performance. As a user I would like an easy way to control this when reading data from Snowflake.

While the Snowflake backend currently doesn't expose this as an option, it should (currently buggy, waiting for a response to a bug report) provide partition-level size statistics. Using these statistics we should be able to implement a fast size-based repartitioning and offer it as a read argument, e.g.


read_snowflake(query=..., ..., partition_size="100MB")
mrocklin commented 3 years ago

@fjetter do we get information from snowflake on the size of their partitions? Maybe we can batch these up on our end and have one task read many partitions?

fjetter commented 3 years ago

do we get information from snowflake on the size of their partitions?

Theoretically (currently a bit buggy) we get row count, compressed and uncompressed size, and we could do some simple bin packing. My first draft included this already, but I removed it again because it was not reliable. We could already start implementing the logic, though, and enable it once the connector is released.
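
As a rough sketch of what that bin packing could look like (assumptions on my part: a connector cursor with get_result_batches(), batches that expose an uncompressed_size attribute and a to_pandas() method; none of this is final API):

from dask import delayed
import dask.dataframe as dd

def pack_batches(batches, target_bytes):
    # Greedily bin-pack result batches so each group totals roughly target_bytes
    groups, current, current_size = [], [], 0
    for batch in batches:
        size = batch.uncompressed_size   # assumed per-batch size statistic
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(batch)
        current_size += size
    if current:
        groups.append(current)
    return groups

def read_group(group):
    # One task materializes many small batches into a single pandas frame
    import pandas as pd
    return pd.concat([batch.to_pandas() for batch in group])

batches = cursor.get_result_batches()   # batch metadata from the connector cursor (assumed API)
groups = pack_batches(batches, target_bytes=100 * 2**20)   # ~100 MB per partition
ddf = dd.from_delayed([delayed(read_group)(group) for group in groups])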

fjetter commented 3 years ago

For the interested reader: a simple, if less accurate, way to do this yourself is to read the dataframe and then repartition it, e.g.

ddf = read_snowflake(...)
ddf = ddf.repartition(npartitions=ddf.npartitions // 2)

That's not optimal, but it should already speed things up. Feel free to experiment with the factor of 2, or consider one of the other options of DataFrame.repartition (sketched below).
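
One of those options, if your Dask version supports it, is to target a partition size directly instead of a partition count; a minimal sketch:

ddf = read_snowflake(...)
# Aim for roughly 100 MB per output partition rather than halving the count
ddf = ddf.repartition(partition_size="100MB")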

evanmc2 commented 2 years ago

Heya, I'm the one who reached out initially about this issue. I'm wondering if it might be easier / also useful to allow partition keys at read time, something like:

ddf = read_snowflake("select a, b from mytable", partition_on=["a"])

I'm not super familiar with Snowflake's backend or whether it allows something like this in an intelligent way, but I've been doing partitioning at the query level, kind of like:

from sqlalchemy.orm import Query
from dask import delayed
import dask.dataframe

q: Query = ...
partitions = [q.filter_by(partition_col=v) for v in partition_vals]   # often a known date range
read_partitions = [delayed(query_to_pandas)(p) for p in partitions]
ddf = dask.dataframe.from_delayed(read_partitions)

It works, but it feels messy. Not sure if it's possible to do a group-by on the Snowflake side and read the chunks into separate partitions?
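
For what it's worth, a rough sketch of that group-by idea with the same hypothetical query_to_pandas helper as above (assumed here to also accept a raw SQL string): first ask Snowflake for the distinct partition values, then issue one filtered query per value.

from dask import delayed
import dask.dataframe

# Discover the partition values on the Snowflake side...
partition_vals = query_to_pandas("SELECT DISTINCT a FROM mytable").iloc[:, 0].tolist()

# ...then read each group into its own partition (mind quoting/escaping if the
# values are not trusted)
queries = [f"SELECT a, b FROM mytable WHERE a = '{v}'" for v in partition_vals]
read_partitions = [delayed(query_to_pandas)(q) for q in queries]
ddf = dask.dataframe.from_delayed(read_partitions)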

mrocklin commented 1 year ago

@mdwgrogan you were running into this as well, is that correct?

mdwgrogan commented 1 year ago

Yes. I implemented a way to do this using the row count, but using an actual size value would probably be better.

mrocklin commented 1 year ago

@jrbourbeau can I ask your team to own this?

My guess is that this also affects another workflow we've seen recently.

mrocklin commented 1 year ago

@mdwgrogan, if you have time to test this out, that would be welcome.