coiled / dask-snowflake

Dask integration for Snowflake
BSD 3-Clause "New" or "Revised" License

Questioning lock necessity in `write_snowflake` #29

Closed DamianBarabonkovQC closed 1 year ago

DamianBarabonkovQC commented 2 years ago

Hi there,

Looking at the implementation, there is a SerializableLock in the write_snowflake function. Judging by the issue linked in the comment above the lock, it seems to be related to the SQL PARALLEL = <n_workers> query option. However, according to the write_pandas documentation, passing parallel=1 (as write_snowflake does) appears to set PARALLEL = 1.

Therefore, does the lock buy us anything?

The lock has always been part of this codebase, so I cannot point to a PR where it was added for a specific reason. I am opening this issue to shed some light on this lock, which might be unnecessary.
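
For readers unfamiliar with the idea behind a token-based, process-wide lock, here is a minimal standard-library sketch. `TokenLock` is a hypothetical stand-in written for illustration, not dask's actual `SerializableLock` implementation. It only shows the general pattern: every lock object created or unpickled with the same token in one process shares a single underlying `threading.Lock`.

```python
import pickle
import threading


class TokenLock:
    """Hypothetical illustration of a per-process lock keyed by a token."""

    # Registry of underlying locks, shared by all instances in this process.
    _locks = {}
    _registry_guard = threading.Lock()

    def __init__(self, token):
        self.token = token
        # Reuse the lock already registered for this token, or create one.
        with TokenLock._registry_guard:
            self.lock = TokenLock._locks.setdefault(token, threading.Lock())

    def __enter__(self):
        self.lock.acquire()
        return self

    def __exit__(self, *exc_info):
        self.lock.release()

    def __getstate__(self):
        # Only the token is pickled; a raw threading.Lock cannot be pickled.
        return self.token

    def __setstate__(self, token):
        # On unpickling, look the lock up again by token.
        self.__init__(token)


original = TokenLock("write_snowflake")
roundtripped = pickle.loads(pickle.dumps(original))
other = TokenLock("some_other_token")

# Both objects guard the same critical section within this process.
shares_lock = roundtripped.lock is original.lock
# A different token gets an independent lock.
independent = other.lock is not original.lock
```

Under this pattern, every task that enters `with TokenLock("write_snowflake"):` in the same worker process waits on the same lock, even if the lock object reached the worker by pickling. Tasks in other processes are not blocked by it.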

phobson commented 2 years ago

The issue mentioned: https://github.com/snowflakedb/snowflake-connector-python/issues/156

jrbourbeau commented 2 years ago

Thanks @DamianBarabonkovQC! I recall needing the lock to avoid multithreading issues but don't remember the specifics. I'm curious, do things work if you remove the lock?

DamianBarabonkovQC commented 2 years ago

Hi @jrbourbeau. I will try it out. Currently, I have some custom code that is essentially the same as what your library does during a write, and it works just fine.

phobson commented 1 year ago

I just checked, and removing the lock shortens the local run time of the test suite from about 190 seconds to about 120 seconds on my machine.

The diff is super short. Want me to push to a PR @jrbourbeau / @mrocklin ?

diff --git a/dask_snowflake/core.py b/dask_snowflake/core.py
index b2eaaeb..d0ff165 100644
--- a/dask_snowflake/core.py
+++ b/dask_snowflake/core.py
@@ -34,16 +34,16 @@ def write_snowflake(
     with snowflake.connector.connect(**connection_kwargs) as conn:
         # NOTE: Use a process-wide lock to avoid a `boto` multithreading issue
         # https://github.com/snowflakedb/snowflake-connector-python/issues/156
-        with SerializableLock(token="write_snowflake"):
-            write_pandas(
-                conn=conn,
-                df=df,
-                schema=connection_kwargs.get("schema", None),
-                # NOTE: since ensure_db_exists uses uppercase for the table name
-                table_name=name.upper(),
-                parallel=1,
-                quote_identifiers=False,
-            )
+        write_pandas(
+            conn=conn,
+            df=df,
+            schema=connection_kwargs.get("schema", None),
+            # NOTE: since ensure_db_exists uses uppercase for the table name
+            table_name=name.upper(),
+            parallel=1,
+            quote_identifiers=False,
+        )
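
As a rough illustration of why removing a process-wide lock can shorten run time, the sketch below counts how many simulated writes overlap with and without a shared lock. This is an assumption-laden toy: `fake_write` merely sleeps as a stand-in for the upload done by `write_pandas`, and `peak_concurrency` is a hypothetical helper. It does not touch Snowflake or reproduce the `boto` issue.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import nullcontext


def peak_concurrency(guard, n_tasks=4, seconds=0.05):
    """Run n_tasks simulated writes in threads; return the peak overlap."""
    active = 0
    peak = 0
    counter_lock = threading.Lock()  # protects the two counters only

    def fake_write(_):
        nonlocal active, peak
        with guard:  # the lock under discussion, or a no-op context
            with counter_lock:
                active += 1
                peak = max(peak, active)
            time.sleep(seconds)  # stand-in for the actual upload
            with counter_lock:
                active -= 1

    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        list(pool.map(fake_write, range(n_tasks)))
    return peak


# With a shared lock, the simulated writes run one at a time.
peak_locked = peak_concurrency(threading.Lock())
# Without it, the writes are free to overlap across threads.
peak_unlocked = peak_concurrency(nullcontext())
```

With the shared lock, the simulated writes within one process run strictly one after another, so threads on the same worker cannot overlap their uploads. Whether that overlap is safe in practice is exactly what the larger-cluster test later in this thread checks.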

phobson commented 1 year ago

Just dropping in a note here to say that, in a discussion I had with James, it came up that the multithreading issues he observed required a larger cluster, not just a larger dataset. I'll be trying that out shortly.

phobson commented 1 year ago

I tried this out and got everything working successfully!

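import dask
import dask.dataframe as dd
from coiled import Cluster
from dask_snowflake import read_snowflake, to_snowflake
from distributed import Client

# NOTE: test_table (a table name) and connection_kwargs (Snowflake connection
# settings) are not defined in this snippet; they are assumed to be set beforehand.

# Use a large cluster, since the multithreading issue needed many workers to show up
cluster = Cluster(n_workers=100, package_sync=True, name="big-sflake")
client = Client(cluster)

# Ten years of synthetic data at 10-second frequency, with uppercase column names
ddf = (
    dask.datasets.timeseries(start="2000-01-01", end="2010-01-01", freq="10s", seed=1)
    .reset_index(drop=True)
    .rename(columns=lambda c: c.upper())
)

# Write without the lock, then read the table back
to_snowflake(ddf, name=test_table, connection_kwargs=connection_kwargs)
# Test partition_size logic
partition_size = "10 MiB"
ddf_out = read_snowflake(
    f"SELECT * FROM {test_table}",
    connection_kwargs=connection_kwargs,
    partition_size=partition_size,
)
# The round-tripped data should match what was written
dd.utils.assert_eq(ddf, ddf_out, scheduler=client, check_index=False, check_dtype=False)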

And the assertion passed!