coiled / dask-snowflake

Dask integration for Snowflake
BSD 3-Clause "New" or "Revised" License

No option to write index to table #43

Open phobson opened 1 year ago

phobson commented 1 year ago

Based on my understanding of snowflake-connector-python, it has no option to write a DataFrame's index to a Snowflake table as a column.

However, we could add an option to the dask-snowflake API that calls reset_index() on the DataFrame before it is sent. The following example shows the current behavior:

import os
import dotenv

import pandas
import dask
from dask.dataframe import from_pandas
from dask_snowflake import to_snowflake, read_snowflake
from distributed import Client, LocalCluster

dotenv.load_dotenv()
cnxn = dict(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
    database=os.environ.get("SNOWFLAKE_DATABASE", "testdb"),
    schema=os.environ.get("SNOWFLAKE_SCHEMA", "public"),
    warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
    role=os.environ["SNOWFLAKE_ROLE"],
)

times = pandas.date_range(start='2000-01-01', periods=4, freq="5min")
data = pandas.DataFrame({
    'dewp': [12.0, 12.5, 12.4, 12.6],
    'wind': [0.1, 0.2, 0.3, 0.4],
}, index=pandas.Index(times, name="time"))

# Ask dask to use pyarrow-backed dtypes for DataFrames
dask.config.set({"dataframe.dtype_backend": "pyarrow"})

if __name__ == "__main__":
    with LocalCluster(n_workers=4) as cluster:
        with Client(cluster) as client:
            # `data` is already a pandas DataFrame, so partition it directly
            ddf = from_pandas(data, npartitions=2)
            print(ddf.compute())

            to_snowflake(ddf, name="_test_no_dates_ddd", connection_kwargs=cnxn)
            no_dates = read_snowflake("select * from _test_no_dates_ddd", connection_kwargs=cnxn)
            print(no_dates.compute())

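This prints the original DataFrame with its "time" index, followed by the table read back from Snowflake, where the index is missing (and the row order is not preserved):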

                     dewp  wind
time                           
2000-01-01 00:00:00  12.0   0.1
2000-01-01 00:05:00  12.5   0.2
2000-01-01 00:10:00  12.4   0.3
2000-01-01 00:15:00  12.6   0.4

   DEWP  WIND
0  12.4   0.3
1  12.6   0.4
2  12.0   0.1
3  12.5   0.2

I'm proposing an API like this:

 def to_snowflake(
     df: dd.DataFrame,
     name: str,
     connection_kwargs: dict,
+    index: bool = False
):
+    if index:
+        df = df.reset_index()
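
To illustrate what the proposed flag would do, here is a minimal sketch on a plain pandas DataFrame, so it runs without a cluster or a Snowflake account. The helper name `index_to_column` is hypothetical and not part of dask-snowflake; it only mirrors the `if index:` branch in the diff above.

```python
import pandas


def index_to_column(df, index=False):
    """Hypothetical helper mirroring the proposed `index` flag."""
    # reset_index() moves the named index into a regular column, so it
    # survives a writer that only sends columns.
    return df.reset_index() if index else df


times = pandas.date_range(start="2000-01-01", periods=4, freq="5min")
data = pandas.DataFrame(
    {"dewp": [12.0, 12.5, 12.4, 12.6], "wind": [0.1, 0.2, 0.3, 0.4]},
    index=pandas.Index(times, name="time"),
)

kept = index_to_column(data, index=True)
dropped = index_to_column(data, index=False)

print(list(kept.columns))     # ['time', 'dewp', 'wind']
print(list(dropped.columns))  # ['dewp', 'wind']
```

Dask DataFrames also have a reset_index() method, so as a workaround today it should be possible to call to_snowflake(ddf.reset_index(), name=..., connection_kwargs=cnxn) from user code. I haven't verified that against a live Snowflake table here.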