I was following the official guide to allow Prefect reading/writing from/to Snowflake.
I have setup the necessary credentials and blocks in Prefect.
I was using the Sync example in the documentation to test if it's working and while it did write the test data, I was unable to fetch it back:
The flow:
from prefect import flow, task
from prefect_snowflake import SnowflakeConnector
@task
def setup_table(block_name: str) -> None:
with SnowflakeConnector.load(block_name) as connector:
connector.execute(
"CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
)
connector.execute_many(
"INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
seq_of_parameters=[
{"name": "Ford", "address": "Highway 42"},
{"name": "Unknown", "address": "Space"},
{"name": "Me", "address": "Myway 88"},
],
)
@task
def fetch_data(block_name: str) -> list:
all_rows = []
with SnowflakeConnector.load(block_name) as connector:
while True:
# Repeated fetch* calls using the same operation will
# skip re-executing and instead return the next set of results
new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
if len(new_rows) == 0:
break
all_rows.append(new_rows)
return all_rows
@flow
def snowflake_flow(block_name: str) -> list:
setup_table(block_name)
all_rows = fetch_data(block_name)
return all_rows
if __name__=="__main__":
snowflake_flow("test-block")
What it returns on the fetch_many task:
09:26:11.894 | ERROR | Task run 'fetch_data-dde' - Finished in state Failed("Task run encountered an exception AttributeError: 'SnowflakeCursor' object has no attribute 'strip'")
09:26:11.901 | ERROR | Flow run 'unbiased-tortoise' - Encountered exception during execution: AttributeError("'SnowflakeCursor' object has no attribute 'strip'")
Traceback (most recent call last):
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/flow_engine.py", line 652, in run_context
yield self
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/flow_engine.py", line 696, in run_flow_sync
engine.call_flow_fn()
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/flow_engine.py", line 675, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/workspaces/data-analytic-workflows/flows/snowflake_test.py", line 36, in snowflake_flow
#setup_table(block_name)
^^^^^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/tasks.py", line 997, in __call__
return run_task(
^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/task_engine.py", line 1512, in run_task
return run_task_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/task_engine.py", line 1325, in run_task_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/task_engine.py", line 457, in result
raise self._raised
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/task_engine.py", line 763, in run_context
yield self
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/task_engine.py", line 1323, in run_task_sync
engine.call_task_fn(txn)
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/task_engine.py", line 786, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/workspaces/data-analytic-workflows/flows/snowflake_test.py", line 27, in fetch_data
# skip re-executing and instead return the next set of results
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/prefect_snowflake/database.py", line 442, in fetch_many
self.execute(cursor, inputs)
File "/home/vscode/.local/lib/python3.12/site-packages/prefect_snowflake/database.py", line 656, in execute
run_coro_as_sync(self._execute_async(cursor, inputs))
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 243, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
result = await coro
^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 225, in coroutine_wrapper
return await task
^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/prefect_snowflake/database.py", line 199, in _execute_async
response = await run_sync_in_worker_thread(cursor.execute_async, **inputs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 269, in run_sync_in_worker_thread
result = await anyio.to_thread.run_sync(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/anyio/to_thread.py", line 56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2441, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 943, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 279, in call_with_mark
return call()
^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/snowflake/connector/cursor.py", line 1097, in execute_async
return self.execute(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/vscode/.local/lib/python3.12/site-packages/snowflake/connector/cursor.py", line 927, in execute
command = command.strip(" \t\n\r") if command else None
^^^^^^^^^^^^^
AttributeError: 'SnowflakeCursor' object has no attribute 'strip'
I did some digging in the source code but haven't noticed anything which could have caused this. Seems like instead of stripping the SQL operation from unwanted characters, it tries to do it on the cursor object.
Version info (prefect version output)
Version: 3.0.4
API version: 0.8.4
Python version: 3.12.6
Git commit: c068d7e2
Built: Tue, Oct 1, 2024 11:54 AM
OS/Arch: linux/aarch64
Profile: ephemeral
Server type: cloud
Pydantic version: 2.9.2
Integrations:
prefect-aws: 0.5.0
prefect-gcp: 0.6.1
prefect-snowflake: 0.28.0
prefect-github: 0.3.0
Bug summary
Hi!
I was following the official guide to allow Prefect reading/writing from/to Snowflake.
I have setup the necessary credentials and blocks in Prefect.
I was using the
Sync
example in the documentation to test if it's working and while it did write the test data, I was unable to fetch it back:The flow:
What it returns on the fetch_many task:
I did some digging in the source code but haven't noticed anything which could have caused this. Seems like instead of stripping the SQL operation from unwanted characters, it tries to do it on the cursor object.
Version info (
prefect version
output)Additional context
No response