Eventual-Inc / Daft

Distributed DataFrame for Python designed for the cloud, powered by Rust
https://getdaft.io
Apache License 2.0

Add Unity Catalog Volume support #2482

Open · avriiil opened this issue 1 week ago

avriiil commented 1 week ago

It would be great to start building out Volume support in Daft for Unity Catalog. Images and JSON feel like the highest-priority formats to start with.

Right now, Table support looks like this:

import daft
from daft.unity_catalog import UnityCatalog

# connect to UC
unity = UnityCatalog(endpoint, token)

# list catalog/schema/tables
print(unity.list_tables("unity.default"))
# ['unity.default.numbers', 'unity.default.marksheet_uniform', 'unity.default.marksheet']

# load table into a Daft DataFrame
unity_table = unity.load_table("unity.default.numbers")
df = daft.read_deltalake(unity_table)

It would be cool to be able to do something like this (pseudo-code):

# load volume
unity_volume = unity.load_volume("unity.default.images")

# get refs/urls per image
img_refs = unity_volume.get_references()
df_img = df.with_column("image_refs", img_refs)

df_img = df.with_column("image_bytes", df["image_refs"].uc_volume.download(on_error="null"))

Of course, whatever API we design needs to be able to handle many different data types. Perhaps it makes more sense to introduce a sublevel per dtype, e.g. separate accessors for images, JSON, etc. (a rough sketch is below).
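
A purely hypothetical sketch of what a per-dtype sublevel could look like (none of these accessors exist today; the names are made up for illustration):

# load the volume
unity_volume = unity.load_volume("unity.default.images")  # hypothetical API

# hypothetical per-dtype accessors on the volume object
img_refs = unity_volume.images.get_references()
json_refs = unity_volume.json.get_references()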

In this case we could leverage existing methods/expressions in the url, image and json modules.
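
For reference, the url and image expression namespaces already compose like this on plain URLs today (existing Daft APIs, shown here with placeholder S3 paths and independent of any Unity Catalog integration):

import daft

df = daft.from_pydict({"urls": ["s3://bucket/img1.png", "s3://bucket/img2.png"]})

# url.download fetches the raw bytes; image.decode turns them into an image column
df = df.with_column("image_bytes", df["urls"].url.download(on_error="null"))
df = df.with_column("image", df["image_bytes"].image.decode())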

Curious to hear what other folks think.

avriiil commented 1 week ago

Copying over this comment from @jaychia on Slack:

I’m interested in figuring out the API. I was thinking we could leverage .url.download() like we do for S3 URLs, but the tricky parts to make work are the credentials story and the URL scheme.

Maybe we can recognize URLs like unity://<volume_name>/... and perform a 2-step process:

  1. Convert these URLs to S3 URLs. Grab S3 credentials from Unity Catalog.
  2. Run a normal .url.download() on the S3 URLs using those S3 credentials
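
A rough sketch of that two-step flow, continuing from the df of unity:// references above. The load_volume, get_temporary_credentials and resolve_to_s3 helpers are hypothetical; IOConfig, S3Config, Expression.apply and .url.download() are existing Daft APIs:

import daft
from daft.io import IOConfig, S3Config

volume = unity.load_volume("unity.default.images")   # hypothetical API
creds = volume.get_temporary_credentials()           # hypothetical: short-lived S3 credentials from UC

io_config = IOConfig(
    s3=S3Config(
        key_id=creds.access_key_id,
        access_key=creds.secret_access_key,
        session_token=creds.session_token,
    )
)

# Step 1: rewrite unity://<volume_name>/... references to the backing s3:// paths (hypothetical helper)
df = df.with_column("s3_refs", df["image_refs"].apply(volume.resolve_to_s3, return_dtype=daft.DataType.string()))

# Step 2: a normal .url.download() on the S3 URLs using the UC-issued credentials
df = df.with_column("image_bytes", df["s3_refs"].url.download(io_config=io_config, on_error="null"))
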
jaychia commented 1 week ago

What do you think of something like this instead:

volume = unity.load_volume("unity.default.images")
df = daft.from_unity_catalog_volume(volume)

# Shows references, file sizes etc
df.show()

df_img = df.with_column("image_data_bytes", df["references"].unity_catalog.download(volume))
df_img = df.with_column("image", df["image_data_bytes"].image.decode())
avriiil commented 5 days ago

I like the simplicity of this. My brain hesitates for a minute at the idea of putting unstructured data into a DataFrame... but then I guess that's the basic premise of Daft :) We should document clearly that daft.from_unity_catalog_volume creates a DataFrame containing only the metadata (references, file sizes, etc.), not the file contents themselves.

JSON is the other big use-case here. So that would look something like:

# create df with multiple JSON refs
df_json = df.with_column("json", df["references"].unity_catalog.download(volume))

# perform JSON expressions on the content
df_json = df_json.with_column("json_query", df_json["json"].json.query(...))

What if we want to take a single JSON stored in UC and turn it into a dataframe? We'd need something like:

volume = unity.load_volume("unity.default.json")
uc_json = volume.get(path=<path/to/json>)
df = daft.read_json(uc_json)

Or should users create the full DataFrame containing all the files first, and then selectively filter down to the row(s) they need to operate on?
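
That second option might look something like this, reusing the hypothetical from_unity_catalog_volume / unity_catalog.download API proposed above; the filter itself uses existing Daft expressions:

volume = unity.load_volume("unity.default.json")
df = daft.from_unity_catalog_volume(volume)  # hypothetical: metadata-only DataFrame of references

# filter down to the single file we care about, then download and parse just that row
df_one = df.where(df["references"].str.endswith("<path/to/json>"))
df_one = df_one.with_column("json", df_one["references"].unity_catalog.download(volume))
df_one = df_one.with_column("json_query", df_one["json"].json.query(...))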