dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0
11.3k stars 1.43k forks source link

Utilise snowpark for IO Managers #15991

Open mjclarke94 opened 1 year ago

mjclarke94 commented 1 year ago

What's the use case?

Snowpark is now stable for public usage and seems to be the preferred approach for interacting with snowflake from python. Not only does it allow for complex queries to be composed without the need to manually compose complex SQL queries, but it also offers some performance benefits (based entirely off my non-quantitative experience!)

I think there are a few potential wins here:

Snowpark dataframes can either be converted to pandas dataframes (providing parity with current behaviour albeit a fair bit faster), or operated on directly. When used directly, you effectively get a Lazyframe which can be operated on in various ways, and fed back to the IO manager to be written to a new table. None of this requires the data leave snowflake.

We've had calculations where the operation itself is trivial, but requires huge amounts of data to perform. For these sort of operations, you can knock an order of magnitude off the total materialisation time as you aren't waiting on data transfer and dagster can leverage extremely minimal compute resources for this.

The pythonic API for composing operations on tables is also a lot more readable than manually composing SQL commands. For something like "Fetch the data in this table relating to a given range of time partitions" the query would look something like:

table = session.table(asset_name)

target_range = table.filter(table[partition_col].between(partition_start, partition_end))

if as_pandas:
    return target_range.to_pandas()
return target_range

Pseudo-code, but hopefully shows that it is a bit more readable than needing to slowly build up a SQL string as is needed in the IO manager at the moment.

Ideas of implementation

A new IO manager which uses snowpark rather than the snowflake-connector directly would be ideal. The session object used at the root of most workflows is analogous to the SnowflakeConnection used to directly execute custom queries.

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

jamiedemaria commented 1 year ago

hey @mjclarke94 thanks for the feature request! We're currently prioritize making it easier to use dagster without one of the provided I/O managers (which in turn would make it easier to use snowpark without us needing to write a full integration), but will keep this in mind for future work

In the meantime, if you want to use snowpark to work with your data directly in snowflake, you could look into writing a Resource that would provide a snowpark session to your assets. You could then do your computations in the asset without returning any values, and use deps to create asset dependencies https://docs.dagster.io/concepts/assets/software-defined-assets#defining-basic-dependencies

gnilrets commented 11 months ago

I've started dabbling in Snowpark too. My first thought was to create an asset that returned a Snowpark DataFrame. However, when I attempted that, I got an error that _thread.lock was not pickleable. Not sure if that's the right thing to do here, but it was my first thought.

jamiedemaria commented 11 months ago

You might be getting the pickle error because you're still using the default IO manager. The default IO manager tries to pickle all outputs before storing them in the filesystem. You'd need to write an IO manager that can handle storing Snowpark Dataframes

gnilrets commented 9 months ago

I'm not really sure how we'd write an IO manager for a Snowpark dataframe because it's kind of an ephemeral object. The dataframe doesn't actually store the data (like a Pandas dataframe does). Instead, it's just an object that can be used to compile SQL that ends up being executed in Snowflake. The problem seems to be that these dataframe objects contain the SQL session object as part of them, which are not serializable.

AlaaInflyter commented 4 months ago

Hey there ! Any news on this ?