coiled / dask-snowflake

Dask integration for Snowflake
BSD 3-Clause "New" or "Revised" License

`_fetch_batch` AttributeError when reading from Snowflake #25

Closed · avriiil closed this issue 2 years ago

avriiil commented 2 years ago

Trying to read data from Snowflake with `read_snowflake` throws `AttributeError: Can't get attribute '_fetch_batch' on <module 'dask_snowflake.core' from '/opt/conda/envs/coiled/lib/python3.9/site-packages/dask_snowflake/core.py'>`.

The code I'm running:

# imports and Snowflake connection setup added for completeness; a Coiled
# cluster / distributed Client is assumed to be running already
import os

import dask.datasets
import snowflake.connector
from dask_snowflake import read_snowflake, to_snowflake

# create Snowflake Python connector and cursor
ctx = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
)
cs = ctx.cursor()

# generate synthetic timeseries data
ddf = dask.datasets.timeseries(
    start="2021-01-01",
    end="2021-03-31",
)

# create warehouse and database
cs.execute("CREATE WAREHOUSE IF NOT EXISTS dask_snowflake_wh")
cs.execute("CREATE DATABASE IF NOT EXISTS dask_snowflake_db")
cs.execute("USE DATABASE dask_snowflake_db")

connection_kwargs = {
    "user": os.environ["SNOWFLAKE_USER"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "warehouse": os.environ["SNOWFLAKE_WAREHOUSE"],
    "database": "dask_snowflake_db",
    "schema": "PUBLIC",
}

# write Dask dataframe to Snowflake in parallel <-------- this WORKS
to_snowflake(
    ddf,
    name="dask_snowflake_table",
    connection_kwargs=connection_kwargs,
)

# read data from snowflake into a Dask dataframe <----- this DOESN'T WORK
snowflake_data = read_snowflake(
    query="""
      SELECT *
      FROM dask_snowflake_table;
   """,
    connection_kwargs=connection_kwargs,
)

print(snowflake_data.head())

I'm running:

I've tried connecting to my personal Snowflake account instead of the Coiled account and I get the same error. @fjetter -- any idea what could be going wrong here?

Full Traceback:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
File <timed exec>:10, in <module>

File ~/mambaforge/lib/python3.9/site-packages/dask/dataframe/core.py:1140, in _Frame.head(self, n, npartitions, compute)
   1138 # No need to warn if we're already looking at all partitions
   1139 safe = npartitions != self.npartitions
-> 1140 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)

File ~/mambaforge/lib/python3.9/site-packages/dask/dataframe/core.py:1174, in _Frame._head(self, n, npartitions, compute, safe)
   1169 result = new_dd_object(
   1170     graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
   1171 )
   1173 if compute:
-> 1174     result = result.compute()
   1175 return result

File ~/mambaforge/lib/python3.9/site-packages/dask/base.py:290, in DaskMethodsMixin.compute(self, **kwargs)
    266 def compute(self, **kwargs):
    267     """Compute this dask collection
    268 
    269     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    288     dask.base.compute
    289     """
--> 290     (result,) = compute(self, traverse=False, **kwargs)
    291     return result

File ~/mambaforge/lib/python3.9/site-packages/dask/base.py:573, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    570     keys.append(x.__dask_keys__())
    571     postcomputes.append(x.__dask_postcompute__())
--> 573 results = schedule(dsk, keys, **kwargs)
    574 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/mambaforge/lib/python3.9/site-packages/distributed/client.py:3010, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3008         should_rejoin = False
   3009 try:
-> 3010     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3011 finally:
   3012     for f in futures.values():

File ~/mambaforge/lib/python3.9/site-packages/distributed/client.py:2162, in Client.gather(self, futures, errors, direct, asynchronous)
   2160 else:
   2161     local_worker = None
-> 2162 return self.sync(
   2163     self._gather,
   2164     futures,
   2165     errors=errors,
   2166     direct=direct,
   2167     local_worker=local_worker,
   2168     asynchronous=asynchronous,
   2169 )

File ~/mambaforge/lib/python3.9/site-packages/distributed/utils.py:311, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    309     return future
    310 else:
--> 311     return sync(
    312         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    313     )

File ~/mambaforge/lib/python3.9/site-packages/distributed/utils.py:378, in sync(loop, func, callback_timeout, *args, **kwargs)
    376 if error:
    377     typ, exc, tb = error
--> 378     raise exc.with_traceback(tb)
    379 else:
    380     return result

File ~/mambaforge/lib/python3.9/site-packages/distributed/utils.py:351, in sync.<locals>.f()
    349         future = asyncio.wait_for(future, callback_timeout)
    350     future = asyncio.ensure_future(future)
--> 351     result = yield future
    352 except Exception:
    353     error = sys.exc_info()

File ~/mambaforge/lib/python3.9/site-packages/tornado/gen.py:762, in Runner.run(self)
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File ~/mambaforge/lib/python3.9/site-packages/distributed/client.py:2025, in Client._gather(self, futures, errors, direct, local_worker)
   2023         exc = CancelledError(key)
   2024     else:
-> 2025         raise exception.with_traceback(traceback)
   2026     raise exc
   2027 if errors == "skip":

File /opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/protocol/pickle.py:75, in loads()

AttributeError: Can't get attribute '_fetch_batch' on <module 'dask_snowflake.core' from '/opt/conda/envs/coiled/lib/python3.9/site-packages/dask_snowflake/core.py'>

avriiil commented 2 years ago

Additionally, I can confirm that the code below does work:

import os

import snowflake.connector

# create Snowflake Python connector
ctx = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
)

# run sample code to test connection
cs = ctx.cursor()

schema = "TPCDS_SF100TCL"
table = "CALL_CENTER"

cs.execute("USE SNOWFLAKE_SAMPLE_DATA")
cs.execute("SELECT * FROM " + schema + "." + table)

one_row = str(cs.fetchone())

print(one_row)

fjetter commented 2 years ago

The function `_fetch_batch` was only introduced in 0.1.0 and had a different name in earlier releases. Can you please verify that the cluster and the client are running the same version of dask-snowflake?
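
As a minimal sketch of one way to check this, assuming an already-connected distributed Client (the `client` name and scheduler address below are placeholders, not taken from this report):

import importlib.metadata

from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder address, adjust to your cluster

# Compare Python and library versions across client, scheduler, and workers;
# check=True raises an error if a mismatch is found. "dask_snowflake" is the
# importable module name, added on top of the default package list.
client.get_versions(check=True, packages=["dask_snowflake"])

# Or query the installed dask-snowflake version on every worker and locally.
worker_versions = client.run(lambda: importlib.metadata.version("dask-snowflake"))
print(importlib.metadata.version("dask-snowflake"))  # client-side version
print(worker_versions)                               # per-worker versions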

fjetter commented 2 years ago

Additionally, it would be helpful to check whether there are any other errors, e.g. in the Dask worker logs.
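
For reference, a sketch of one way to pull recent worker logs from the client side, again assuming a connected Client (names and address are placeholders):

from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder address

# Fetch the most recent log entries from every worker to look for import
# errors or version warnings around the failing task.
logs = client.get_worker_logs(n=100)
for worker_addr, entries in logs.items():
    print(worker_addr)
    for entry in entries:
        print("   ", entry)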

avriiil commented 2 years ago

@fjetter -- the cluster was indeed running an old version of dask-snowflake. Updated and that solved the issue, thanks!
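
For anyone landing here with the same mismatch: one possible way to bring worker environments in line from the client is the distributed PipInstall worker plugin. This is only a sketch under the assumption of a connected Client, not how the cluster was updated in this issue:

from dask.distributed import Client, PipInstall

client = Client("tcp://scheduler-address:8786")  # placeholder address

# Upgrade dask-snowflake on every worker; restart=True restarts the workers so
# the newly installed version is actually imported.
plugin = PipInstall(packages=["dask-snowflake"], pip_options=["--upgrade"], restart=True)
client.register_worker_plugin(plugin)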