mrocklin / nyc-taxi

Fooling around with NYC Taxi data

Stage data into Snowflake #3

Open mrocklin opened 1 year ago

mrocklin commented 1 year ago

@dchudz and I were chatting with @IndexSeek who was curious about extending this example to show how to pre-process data for efficient loading into Snowflake.

@IndexSeek there is a notebook in this repository with the example that you saw in this video. Are you able to run it? (Also, if running this on your personal account becomes onerous, let me know and I'll add your Coiled account to one of ours.)

If so, I'll suggest some next steps:

  1. @IndexSeek sets up a basic Snowflake system and extends/alters the notebook (or a copy of it) to include Snowflake ingestion (or maybe this doesn't happen in a notebook, but instead in some other documentation somewhere). This alone is already an interesting bit of content.
  2. We look at using some workflow management system to update this dataset on demand (presumably a new file lands every month). @dchudz has been looking more at Prefect recently, but we could consider some other configuration as well.

Thoughts?

IndexSeek commented 1 year ago

@mrocklin I'm trying to get this working in my AWS account, but I've been having a little trouble due to the vCPU limit; I'm looking to have that increased now.

Current State

Given the vCPU limit above, so far I've started building out this solution with a Dask LocalCluster, playing with only a single partition from your notebook. That speaks volumes: I have what I would consider a pretty powerful desktop, and I still can't work across the entire nyc-tlc/trip data/fhvhv_tripdata_*.parquet dataset locally.

Working with this partition, I repartitioned the DataFrame and switched to more efficient data types, as you demonstrate in your notebook. Based on the Snowflake documentation on File Sizing Best Practices and Limitations, my first test used a partition size of 512 MB, which produced Parquet files of roughly 130 MB each.

This is where I tweaked some code within the "Repartition to smaller chunks!" section of the notebook, which looked like this:

# Taking only the first partition due to current limitations.
df = df.partitions[0]
df = df.repartition(partition_size="512MB").persist()

# Name the output files fhvhv_tripdata_0.parquet, fhvhv_tripdata_1.parquet, ...
name_function = lambda x: f"fhvhv_tripdata_{x}.parquet"

df.to_parquet(
    path="s3://mybucket/",
    engine="pyarrow",
    compression="snappy",
    name_function=name_function,
)

To take this a step further, I wanted to continue to leverage cloud storage as the current notebook is doing, so I wrote these files to S3 and configured a Storage Integration and an External Stage in Snowflake using the following code, edited for sensitivity:

USE ROLE ACCOUNTADMIN;

CREATE STORAGE INTEGRATION S3_INT
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'role_arn'
  STORAGE_ALLOWED_LOCATIONS = ('s3://mybucket/');

DESCRIBE STORAGE INTEGRATION S3_INT;

GRANT USAGE ON INTEGRATION S3_INT TO ROLE SYSADMIN;

USE ROLE SYSADMIN;

CREATE STAGE FHVHV
  STORAGE INTEGRATION = S3_INT
  URL = 's3://mybucket/'
  FILE_FORMAT = (TYPE = PARQUET);

I ran a quick test to ensure permissions were set correctly between the Snowflake Account and the S3 Bucket by listing the files from the stage.
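The check itself is just a stage listing, roughly like this (a minimal sketch; the optional pattern restricts the listing to the repartitioned files):

LIST @FHVHV;
LIST @FHVHV PATTERN = '.*fhvhv_tripdata_.*[.]parquet';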


With this in place, I wanted to use Snowpark to start working with this data in Snowflake. Back in Python, I wrote some code to explore it.

import json

from snowflake.snowpark import Session

# Load connection parameters from a local secrets file
with open("secrets.json") as f:
    connection_parameters = json.load(f)["SNOWFLAKE_CONNECTION"]

session = Session.builder.configs(connection_parameters).create()

# Read the Parquet files directly from the external stage and preview them
session.read.parquet("@FHVHV").show()

The result of the show command took around 15 seconds to complete on a cold X-Small warehouse.


Rather than reading the contents across the network using the stage, I persisted this data to a Snowflake table, still using the same X-Small warehouse.

session.read.parquet("@FHVHV").write.save_as_table("FHVHV")

This was completed in about 1 minute and 12 seconds.

Now that the table is persisted, the entire table compresses to 462.7 MB inside Snowflake while retaining the same contents. I thought this was interesting, as the separate snappy-compressed files were ~664 MB on their own. I ran another show command against the table on the same X-Small warehouse, and it completed in 1.4 seconds.
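That second run read from the table rather than the stage, roughly like this in Snowpark (a minimal sketch; the count is just an extra sanity check):

session.table("FHVHV").show()
session.table("FHVHV").count()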


Potential Next Steps

We discussed using a workflow management system, on the assumption that a new file lands at a specific cadence and that we're leveraging cloud storage. As part of the puzzle, I think we could use a Snowpipe to automate the ingestion piece once the data has been optimized upstream and landed in cloud storage; a rough sketch is below. I'd like to try to piece this together.
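The Snowpipe could look something like this (a sketch only; the pipe name is a placeholder, and the bucket would still need an event notification wired to the pipe's notification channel):

CREATE PIPE FHVHV_PIPE
  AUTO_INGEST = TRUE
AS
  COPY INTO FHVHV
  FROM @FHVHV
  FILE_FORMAT = (TYPE = PARQUET)
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;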

I would be interested in benchmarking this more with differently sized files and differently sized warehouses to see if there is additional performance to be gained on the Snowflake side. I was only using an X-Small warehouse, but I would anticipate better performance when scaling up.
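Resizing the warehouse between benchmark runs is a one-liner (a sketch; MY_WH is a placeholder warehouse name):

ALTER WAREHOUSE MY_WH SET WAREHOUSE_SIZE = 'MEDIUM';
ALTER WAREHOUSE MY_WH SUSPEND;  -- suspend between runs so each test starts cold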

I hope we can test this with the entire dataset using Coiled soon. Another route I might like to explore is trying different file types aside from Parquet.

mrocklin commented 1 year ago

@mrocklin I'm trying to get this working in my AWS account, but I've been having a little trouble due to the vCPU limit; I'm looking to have that increased now.

I've added you to my account. Add account="mrocklin" to your coiled.Cluster command and you should be good.
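In other words, something like this (a sketch; the worker count is just a placeholder):

import coiled
from dask.distributed import Client

cluster = coiled.Cluster(account="mrocklin", n_workers=10)
client = Client(cluster)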

Potential Next Steps

I'm also curious about the difference in querying between Coiled and Snowflake for typical queries. I wouldn't be surprised to learn that Snowflake is faster/easier, given that it's more specialized for this job. It would be useful to verify.
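For example, one such comparison could be the same aggregation in both systems (a rough sketch; the bucket path and column names are assumptions about the optimized dataset):

import dask.dataframe as dd

# Dask: read the right-sized Parquet files straight from S3
df = dd.read_parquet("s3://mybucket/")
result = df.groupby("hvfhs_license_num").trip_miles.mean().compute()

# Snowflake equivalent against the persisted FHVHV table:
#   SELECT HVFHS_LICENSE_NUM, AVG(TRIP_MILES) FROM FHVHV GROUP BY 1;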

I agree that it would be useful to start looking into keeping this updated. Presumably there is a new file every month. What do we set up, and where, to make sure we keep this warehouse up to date? My tools of choice here would be either GitHub Actions or Prefect. @dchudz any ideas?

dchudz commented 1 year ago

My tools of choice here would be either GitHub Actions or Prefect. @dchudz any ideas?

I don't have a strong view. Many good options: Prefect, Dagster, GitHub Actions, GCP Cloud Functions, AWS Lambda, ...

(I might personally go with one of the latter two since they're what I know, and maybe it's nice that Lambda can trigger based on objects appearing in S3.)

I'd be very happy to see this proceed with any choice of orchestration tool.

mrocklin commented 1 year ago

and maybe it's nice that Lambda can trigger based on objects appearing in S3.)

That does sound nice, and like the right tool for the job. I have no experience with Lambda, unfortunately.

IndexSeek commented 1 year ago

@mrocklin, I was able to access the cluster; thank you! I got a good bit of work done quickly using it. More details are below.

Before I started on the full different-size file analysis, I wanted to log where we're starting, to get a baseline and demonstrate just how useful your DataFrame optimizations are.

| filename | raw_file_size | default_pandas_file_size | optimized_pandas_file_size |
|---|---|---|---|
| fhvhv_tripdata_2019-02.parquet | 489.29 MiB | 10.85 GiB | 2.13 GiB |
| fhvhv_tripdata_2019-03.parquet | 582.56 MiB | 12.88 GiB | 2.53 GiB |
| fhvhv_tripdata_2019-04.parquet | 533.73 MiB | 11.82 GiB | 2.31 GiB |
| fhvhv_tripdata_2019-05.parquet | 544.38 MiB | 12.29 GiB | 2.37 GiB |
| fhvhv_tripdata_2019-06.parquet | 511.35 MiB | 11.97 GiB | 2.22 GiB |
| fhvhv_tripdata_2019-07.parquet | 492.10 MiB | 11.27 GiB | 2.15 GiB |
| fhvhv_tripdata_2019-08.parquet | 478.21 MiB | 11.17 GiB | 2.13 GiB |
| fhvhv_tripdata_2019-09.parquet | 492.04 MiB | 11.15 GiB | 2.12 GiB |
| fhvhv_tripdata_2019-10.parquet | 523.92 MiB | 11.74 GiB | 2.23 GiB |
| fhvhv_tripdata_2019-11.parquet | 535.02 MiB | 12.02 GiB | 2.29 GiB |
| fhvhv_tripdata_2019-12.parquet | 549.94 MiB | 12.35 GiB | 2.35 GiB |
| fhvhv_tripdata_2020-01.parquet | 506.66 MiB | 11.41 GiB | 2.17 GiB |
| fhvhv_tripdata_2020-02.parquet | 532.98 MiB | 12.06 GiB | 2.30 GiB |
| fhvhv_tripdata_2020-03.parquet | 330.43 MiB | 7.44 GiB | 1.42 GiB |
| fhvhv_tripdata_2020-04.parquet | 109.52 MiB | 2.46 GiB | 467.12 MiB |
| fhvhv_tripdata_2020-05.parquet | 153.11 MiB | 3.38 GiB | 658.75 MiB |
| fhvhv_tripdata_2020-06.parquet | 188.80 MiB | 4.18 GiB | 815.54 MiB |
| fhvhv_tripdata_2020-07.parquet | 248.20 MiB | 5.53 GiB | 1.05 GiB |
| fhvhv_tripdata_2020-08.parquet | 276.99 MiB | 6.16 GiB | 1.17 GiB |
| fhvhv_tripdata_2020-09.parquet | 300.11 MiB | 6.73 GiB | 1.28 GiB |
| fhvhv_tripdata_2020-10.parquet | 329.19 MiB | 7.57 GiB | 1.40 GiB |
| fhvhv_tripdata_2020-11.parquet | 287.87 MiB | 6.44 GiB | 1.23 GiB |
| fhvhv_tripdata_2020-12.parquet | 289.22 MiB | 6.46 GiB | 1.23 GiB |
| fhvhv_tripdata_2021-01.parquet | 294.61 MiB | 6.62 GiB | 1.26 GiB |
| fhvhv_tripdata_2021-02.parquet | 288.61 MiB | 6.44 GiB | 1.23 GiB |
| fhvhv_tripdata_2021-03.parquet | 351.31 MiB | 7.90 GiB | 1.50 GiB |
| fhvhv_tripdata_2021-04.parquet | 351.35 MiB | 7.84 GiB | 1.49 GiB |
| fhvhv_tripdata_2021-05.parquet | 369.31 MiB | 8.18 GiB | 1.56 GiB |
| fhvhv_tripdata_2021-06.parquet | 375.86 MiB | 8.31 GiB | 1.58 GiB |
| fhvhv_tripdata_2021-07.parquet | 377.62 MiB | 8.34 GiB | 1.59 GiB |
| fhvhv_tripdata_2021-08.parquet | 364.62 MiB | 8.04 GiB | 1.53 GiB |
| fhvhv_tripdata_2021-09.parquet | 375.45 MiB | 8.26 GiB | 1.57 GiB |
| fhvhv_tripdata_2021-10.parquet | 410.19 MiB | 9.19 GiB | 1.75 GiB |
| fhvhv_tripdata_2021-11.parquet | 392.02 MiB | 8.92 GiB | 1.70 GiB |
| fhvhv_tripdata_2021-12.parquet | 391.90 MiB | 8.92 GiB | 1.70 GiB |
| fhvhv_tripdata_2022-01.parquet | 357.26 MiB | 8.20 GiB | 1.56 GiB |
| fhvhv_tripdata_2022-02.parquet | 388.29 MiB | 8.89 GiB | 1.69 GiB |
| fhvhv_tripdata_2022-03.parquet | 449.40 MiB | 10.24 GiB | 1.95 GiB |
| fhvhv_tripdata_2022-04.parquet | 434.45 MiB | 9.86 GiB | 1.88 GiB |
| fhvhv_tripdata_2022-05.parquet | 446.86 MiB | 10.09 GiB | 1.92 GiB |
| fhvhv_tripdata_2022-06.parquet | 436.97 MiB | 9.88 GiB | 1.88 GiB |
| fhvhv_tripdata_2022-07.parquet | 423.17 MiB | 9.70 GiB | 1.85 GiB |
| fhvhv_tripdata_2022-08.parquet | 416.31 MiB | 9.55 GiB | 1.82 GiB |
| fhvhv_tripdata_2022-09.parquet | 436.79 MiB | 9.88 GiB | 1.88 GiB |

I started the Coiled cluster, read all of the "s3://nyc-tlc/trip data/fhvhv_tripdata_*.parquet" files into a Dask DataFrame, applied your data type optimization, and persisted the DataFrame. I then repartitioned the data into several target sizes, wrote each set out as snappy Parquet files, and pushed them to an S3 bucket in the same region; a sketch of that step is below.
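Roughly (a sketch only; the target sizes and bucket prefix here are placeholders rather than the exact ones I used):

import dask.dataframe as dd

df = dd.read_parquet("s3://nyc-tlc/trip data/fhvhv_tripdata_*.parquet")
# ... data type optimizations from the notebook applied here ...
df = df.persist()

for size in ["128MB", "256MB", "512MB"]:
    df.repartition(partition_size=size).to_parquet(
        f"s3://mybucket/fhvhv_{size}/",
        engine="pyarrow",
        compression="snappy",
    )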

I have yet to test read performance from Snowflake over each "folder" of right-sized files, but I will try this soon.

I would have to answer your question about typical query performance between Coiled and Snowflake with the classic "it depends." In most scenarios, if the data is persisted to a Snowflake table, I'd expect better performance there due to the nature of Snowflake's Micro-partitions & Data Clustering. Performance is likely to be similar if both platforms read files directly from cloud storage (external storage, in Snowflake's case). I agree that it would be useful to test and compare different operations.

Regarding an orchestration solution, I think we can try an example with AWS Lambda using S3 Event Notifications, as @dchudz mentioned, triggered by objects landing in S3. Would it be possible to do a small test with Lambda to see if a Coiled cluster could be spun up to do the file splitting/writing? This might mean deploying the appropriate Python libraries as runtime dependencies. I think this would be feasible, since the Lambda itself would use limited compute; the heavy lifting would mostly happen in the Coiled cluster. A rough sketch of what I have in mind is below.
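Something along these lines (a sketch under assumptions, not a working implementation: the bucket names, worker count, target path, and shutdown behavior are all placeholders/guesses):

import coiled
import dask.dataframe as dd
from dask.distributed import Client, fire_and_forget, get_client


def handler(event, context):
    # The S3 event notification says which raw monthly file just landed
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    key = record["s3"]["object"]["key"]

    cluster = coiled.Cluster(account="mrocklin", n_workers=10, shutdown_on_close=False)
    client = Client(cluster)

    def split_and_write(path):
        # Runs on the cluster; get_client() lets the compute inside to_parquet use it
        get_client()
        df = dd.read_parquet(path)
        # data type optimization from the notebook would go here
        df = df.repartition(partition_size="512MB")
        df.to_parquet("s3://mybucket/fhvhv/", compression="snappy")

    # Submit and return; the cluster keeps working after the Lambda exits
    fire_and_forget(client.submit(split_and_write, f"s3://{bucket}/{key}"))
    return {"status": "submitted", "key": key}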

I hope to have some of those Snowflake benchmarks from the different-size file analysis soon.

dchudz commented 1 year ago

Would it be possible to do a small test with Lambda to see if a Coiled cluster could be spun up and do the file splitting/writing? This might mean the appropriate Python libraries must be deployed for the runtime dependency. I think this would be feasible as the Lambda itself would be using limited compute; this would mostly occur in the Coiled cluster.

Sounds possible!

There are some subtleties if you want the cluster to keep working after the lambda times out (15 minutes).

I have a draft blog post on that topic which I never finished (oops) that I can send you. So that's one reason Lambda isn't ideal. It's definitely feasible for the cluster to finish its work after the Lambda exits, but maybe annoying. Happy to help though, or we can go another route.

dchudz commented 1 year ago

Here's the draft post just in case you do end up with a cluster that you want to keep working when its client lambda goes away:

# How to Launch Batch Dask Jobs

## Submit Big Python Jobs on a Schedule

Sometimes we use Python interactively from a Jupyter notebook or an IDE, but after we've figured out the right workflow we often want to switch and run that job on a regular schedule, like every day or whenever some external event occurs. Often folks use job schedulers for this purpose; these could look like the following:

- Serverless cloud functions like AWS Lambda or Google Cloud Functions
- Your CI/CD system like GitHub Actions
- Cronjobs running from some stable machine
- Workflow managers like Airflow or Prefect

This blog goes through how to compose Dask with these systems so that the job can merely launch the task, rather than track it all the way through.

## Client and Cluster Lifecycles

When you invoke a job from one of these places, you have to ask yourself: *Should the Dask Client (from which you initiate and drive operations on the cluster) run as long as the cluster?*

If your Dask job is pretty quick, then "Yes" might be an okay answer. But for anything longer, you may need to plan for time limits:

* AWS Lambda: 15 minutes
* Google Cloud Functions: 9 minutes
* GitHub Actions: 58 minutes
* Your laptop: As soon as you forget a job is running and close the lid

So you may need the client to stop running long before the cluster finishes its computation. That's what we'll cover in this post.

## How to run a Dask cluster longer than a Dask client

To make sure the client can stop while the cluster still does useful work, there are two requirements:

1. Avoid blocking `compute` calls
2. Use `fire_and_forget` to make sure that the cluster doesn't cancel tasks once the client is gone.

We now walk through modifying a toy example to meet these requirements.

## A toy example

Here's the toy example we'll work with:

```python
import time

import pandas as pd
import dask.dataframe as dd
from dask import delayed

SLEEP = 120

client = ...

@delayed
def load_data(part):
    print("sleeping to simulate slow data load")
    time.sleep(SLEEP)
    print("done sleeping")
    return pd.DataFrame({"a": [part, part]})

print("load")
NUM_PARTS = 2
df = dd.from_delayed([load_data(i) for i in range(NUM_PARTS)])

# Add a column "b" which is "a" doubled.
# In the example, this is a stand-in for whatever more useful transformation you really want to do.
# This step could involve big joins, training a machine learning model, etc.
print("transform")
df["b"] = df.a * 2

print("write output")
df.to_parquet("out.parquet")

print("client is done")
```

## First problem: Make sure loading the data is lazy

When I run this, we see:

```
load...
```

... and then no other output for a couple minutes. The problem here is that while `dask.dataframe` is a lazy collection, it still needs to know something about the schema before it can do much of anything. So we can help it out by specifying a schema in advance:

```
df = dd.from_delayed([load_data(i) for i in range(NUM_PARTS)], meta={"a": int})
```

Now we run this again and see:

```
load...
transform...
write...
```

## Second problem: blocking write

That's progress! We've quickly asked the cluster to load, transform, and write our data. But our client script is still running, and we don't see `client is done`. The reason for this is that by default, `dask.dataframe.to_parquet` (and the other writing methods, like `to_csv`) assumes you want to submit the work immediately, *and* wait for it.

```
Signature: df.to_parquet(path, *args, **kwargs)
Docstring:
Store Dask.dataframe to Parquet files

Parameters
----------
...
compute : bool, default True
    If :obj:`True` (default) then the result is computed immediately.
    If :obj:`False` then a ``dask.dataframe.Scalar`` object is returned
    for future computation.
...
```

To solve this, we add `compute=False` to our `to_parquet` call. Now the method returns a *lazy result*, which we can submit to the cluster to kick off the work:

```
lazy_result = df.to_parquet("out.parquet", compute=False)
future = client.compute(lazy_result)
```

Now when we run the script, we quickly see:

```
load...
transform...
write...
client is done
```

## Third Problem: Cancelled Tasks

The client script ran quickly, but our output data isn't yet written. There's a directory `out.parquet`, but no data in there, and there never will be. The problem is that by default, if no one is waiting around for a future, Dask decides it's not needed and cancels that task. That's because Dask is trying to be as efficient as possible and avoid work that no one cares about. If there's no one waiting for a return value, then Dask thinks that it can stop doing the work.

In this case, however, we still care about the result, even if we're not waiting for it, so we need to tell Dask to keep the task running even after the script launching the job completes. We can do this with the `fire_and_forget` function (TODO link):

```
fire_and_forget(future)
```

Now the client script runs quickly, and when it finishes the cluster is still working on our data. Eventually the cluster completes its work and writes out the output data.

## The full script

```python
import os
import time

import pandas as pd

import coiled
import dask
import dask.dataframe as dd
from dask.distributed import Client, fire_and_forget

SLEEP = 10
NUM_PARTS = 2

# If running locally
client = Client()

# If running remotely
# scheduler_address = os.environ.get("SCHEDULER_ADDRESS")
# cluster = coiled.Cluster(name="fire-and-forget-experiment")
# client = Client(cluster)

@dask.delayed
def load_data(part):
    print("sleeping to simulate slow data load")
    time.sleep(SLEEP)
    print("done sleeping")
    return pd.DataFrame({"a": [part, part]})

print("load...")
df = dd.from_delayed([load_data(i) for i in range(NUM_PARTS)], meta={"a": int})

# Add a column "b" which is "a" doubled.
# In the example, this is a stand-in for whatever more useful transformation you really want to do.
# This step could involve big joins, training a machine learning model, etc.
print("transform...")
df["b"] = df.a * 2

print("write...")
lazy_result = df.to_parquet("out.parquet", compute=False)
future = client.compute(lazy_result)
fire_and_forget(future)

print("client is done")
```
mrocklin commented 1 year ago

There are some subtleties if you want the cluster to keep working after the lambda times out (15 minutes).

I imagine that we'll likely spend less than a minute after the cluster comes up. We can always do fire-and-forget though if we want to be safe (and let the lambda release quickly).

mrocklin commented 1 year ago

@dchudz one can always submit a general function to run on the cluster as well:

import dask.dataframe as dd
from dask.distributed import get_client, fire_and_forget

def f():
    # Runs on the cluster; get_client() picks up the cluster's client for the compute calls
    client = get_client()
    df = dd.read_parquet(...)
    df.to_parquet(...)

# `client` here is the Client that's already connected to the cluster
fire_and_forget(client.submit(f))

This is maybe a little easier than being careful with compute calls.

IndexSeek commented 1 year ago

I want to complete the orchestration pipeline; these do seem like feasible solutions, and I'll see whether another colleague can help me set this up on the Lambda side. We'll probably want to start with fewer workers and smaller files for testing, but the fire_and_forget reference is helpful here. For inspiration, I want to watch this video.

I've put together a repo demonstrating the value of right-sizing files before persisting them to storage; I would love your feedback. Now that the data is persisted, we can do some performance comparisons on data retrieval if you like. I put a few examples at the bottom of the notebook.

https://github.com/IndexSeek/ingestion-snowflake-coiled-benchmarking/blob/main/notebook.ipynb

hayesgb commented 1 year ago

@IndexSeek -- I've been working on creating an orchestration pipeline. I spent some time with Lambda but ended up switching to Prefect for orchestration. It's still a work in progress, but I wanted to share a link to the repo in case you were interested. I'll be refining this over the next day or so to publish.

IndexSeek commented 1 year ago

@hayesgb This is great stuff; it's similar to the road I started down, though it differs in that I was looking to trigger the Prefect flow with an S3 Event Notification and Lambda via the Prefect REST API. Admittedly, your solution involves less configuration, with the daily flow run checking for files.

If we can, I would still like to demonstrate the "end-to-end" solution with Snowflake; this would occur downstream via AWS/Snowflake, with an event notification configured on the bucket that the load_and_clean_data task writes to. This complete solution might demonstrate the following: