coiled / dask-snowflake

Dask integration for Snowflake
BSD 3-Clause "New" or "Revised" License

Read/write performance issues #22

Closed avriiil closed 2 years ago

avriiil commented 2 years ago

I'm seeing some performance issues while reading / writing data using the dask-snowflake connector.

Specifically, the `to_snowflake` call in the code below (taken from the Coiled docs) sometimes runs in ~25 seconds and other times takes more than a minute.

```python
import os

import coiled
import dask
import snowflake.connector
from dask.distributed import Client
from dask_snowflake import to_snowflake

cluster = coiled.Cluster(
    name="coiled-snowflake",
    software="coiled-examples/snowflake",
    n_workers=20,
    shutdown_on_close=False,
    scheduler_options={"idle_timeout": "2 hours"},
    backend_options={"spot": True},
)
client = Client(cluster)

ddf = dask.datasets.timeseries(
    start="2021-01-01",
    end="2021-03-31",
)

ctx = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
)
cs = ctx.cursor()

cs.execute("CREATE WAREHOUSE IF NOT EXISTS dask_snowflake_wh")
cs.execute("CREATE DATABASE IF NOT EXISTS dask_snowflake_db")
cs.execute("USE DATABASE dask_snowflake_db")

connection_kwargs = {
    "user": os.environ["SNOWFLAKE_USER"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "warehouse": os.environ["SNOWFLAKE_WAREHOUSE"],
    "database": "dask_snowflake_db",
    "schema": "PUBLIC",
}
```

```python
%%time
to_snowflake(
    ddf,
    name="dask_snowflake_table",
    connection_kwargs=connection_kwargs,
)
```
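(As an aside: when running this as a plain script rather than a notebook, the `%%time` magic isn't available; a stdlib-only sketch of the same measurement, with a placeholder standing in for the timed call:)

```python
import time

start = time.perf_counter()
# to_snowflake(ddf, name="dask_snowflake_table", connection_kwargs=connection_kwargs)
time.sleep(0.1)  # placeholder for the call being timed in this self-contained sketch
elapsed = time.perf_counter() - start
print(f"elapsed: {elapsed:.2f}s")
```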

And the `read_snowflake` call in the code below ran in ~40 seconds in @fjetter's original demo video but takes more than 4 minutes in my case.

```python
import os

import coiled
import snowflake.connector
from dask.distributed import Client
from dask_snowflake import read_snowflake

# spin up cluster
cluster = coiled.Cluster(
    name="coiled-snowflake",
    software="coiled-examples/snowflake",
    n_workers=20,
    shutdown_on_close=False,
    scheduler_options={"idle_timeout": "2 hours"},
    backend_options={"spot": True},
)
client = Client(cluster)

ctx = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
)

SCHEMA = "SNOWFLAKE_SAMPLE_DATA.TPCH_SF100"

example_query = f"""
SELECT
    C_CUSTKEY,
    C_NAME,
    SUM(L_QUANTITY) AS sum_qty,
    SUM(PS_AVAILQTY) AS sum_avail_qty,
    MAX(P_RETAILPRICE) AS max_retail_price
FROM {SCHEMA}.CUSTOMER
    JOIN {SCHEMA}.ORDERS ON C_CUSTKEY = O_CUSTKEY
    JOIN {SCHEMA}.LINEITEM ON L_ORDERKEY = O_ORDERKEY
    JOIN {SCHEMA}.PART ON P_PARTKEY = L_PARTKEY
    JOIN {SCHEMA}.PARTSUPP ON P_PARTKEY = PS_PARTKEY
WHERE PS_SUPPLYCOST > 11
GROUP BY C_CUSTKEY, C_NAME
"""

connection_kwargs = {
    "user": os.environ["SNOWFLAKE_USER"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "warehouse": os.environ["SNOWFLAKE_WAREHOUSE"],
}
```

```python
%%time
ddf = read_snowflake(
    query=example_query,
    connection_kwargs=connection_kwargs,
)
```

I'm running this from a notebook on an EC2 instance because `snowflake.connector` still doesn't run on Apple M1. I've asked @pavithraes to run the code above on her local non-M1 machine to see if she can reproduce.

fjetter commented 2 years ago

How large is the compute warehouse? This sounds like you might have used a smaller warehouse than I did. I'm not sure the video shows what I used back then, but I would simply suggest bumping that up.

Either way, it would be helpful if you could find out what takes that much time (e.g. with a performance report). The dask-snowflake connector doesn't perform any computations itself, so I strongly suspect this is on Snowflake's side, and the most likely cause is the warehouse size.

pavithraes commented 2 years ago

> I've asked @pavithraes to run the code above on her local non-M1 machine to see if she can reproduce.

Here are my timings:

avriiil commented 2 years ago

Confirming that this was an issue due to warehouse size and closing this issue.
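(For anyone landing here later: resizing the warehouse is a single SQL statement. A hypothetical sketch follows; the warehouse name matches the snippets above, but the `LARGE` size is an assumption, and the `execute` call is commented out because it needs live credentials:)

```python
# Hypothetical sketch: resize the warehouse used by dask-snowflake.
# 'LARGE' is an assumed target; pick the size that fits your workload
# and budget.
resize_sql = "ALTER WAREHOUSE dask_snowflake_wh SET WAREHOUSE_SIZE = 'LARGE'"

# With a live connection (see the snippets above):
# cs = ctx.cursor()
# cs.execute(resize_sql)
print(resize_sql)
```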