snowflakedb / snowflake-connector-python

Snowflake Connector for Python
https://pypi.python.org/pypi/snowflake-connector-python/
Apache License 2.0

SNOW-742076: PUT file_stream: io.UnsupportedOperation: File or stream is not seekable #1428


pp-gborodin commented 1 year ago
  1. What version of Python are you using?
Python 3.10.6 (main, Nov 14 2022, 16:10:14) [GCC 11.3.0]
  2. What operating system and processor architecture are you using?
Linux-5.15.0-58-generic-x86_64-with-glibc2.35
  3. What are the component versions in the environment (pip freeze)?
apache-airflow-providers-snowflake==2.2.0
  4. What did you do?

I tried to upload data to Snowflake with the PUT command via a Unix pipe. I don't want to store a temporary file on my system, so I tried to stream the data from Postgres to Snowflake through a pipe. Here is a code sample:

import os
from threading import Thread

def _get_data_from_postgres(fd):
    # Write CSV rows into the write end of the pipe.
    with os.fdopen(fd, 'wb') as f:
        f.write(b'fruit,count\n')
        f.write(b'apple,3\n')
        f.write(b'orange,5\n')

def put_data_to_snowflake(f):
    conn = snowflake_connection  # an existing snowflake.connector connection
    with conn.cursor() as cur:
        cur.execute("PUT file:///pipe.csv @fruits OVERWRITE = TRUE", file_stream=f)

def _put_data_to_snowflake(fd):
    with os.fdopen(fd, 'rb') as f:
        put_data_to_snowflake(f)

rf, wf = os.pipe()

get_thread = Thread(target=_get_data_from_postgres, args=(wf,))
put_thread = Thread(target=_put_data_to_snowflake, args=(rf,))

get_thread.start()
put_thread.start()

timeout = 30
get_thread.join(timeout=timeout)
put_thread.join(timeout=timeout)
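As an aside (my own illustration, not from the connector docs): the read end of an os.pipe() reports itself as non-seekable, so the seek(0, os.SEEK_END) call the connector uses to size the upload is guaranteed to fail on it:

```python
import io
import os

# Open both ends of a pipe; the read end is a FIFO,
# which the OS cannot seek.
rf, wf = os.pipe()
reader = os.fdopen(rf, 'rb')
writer = os.fdopen(wf, 'wb')

seekable = reader.seekable()  # False for a pipe
try:
    # This mirrors what the connector does to compute src_file_size.
    reader.seek(0, os.SEEK_END)
    raised = False
except io.UnsupportedOperation:
    raised = True

writer.close()
reader.close()
print(seekable, raised)  # False True
```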
  5. What did you expect to see?

I expected the data to be uploaded.

  6. What happened instead?

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/xometry-dags/dags/common/dwh_snowflake.py", line 37, in _put_data_to_snowflake
    put_data_to_snowflake(f, conn, table)
  File "/opt/xometry-dags/dags/common/dwh_snowflake.py", line 27, in put_data_to_snowflake
    cur.execute(f"PUT file:///pipe.csv @fruits OVERWRITE = TRUE", file_stream=File(f))
  File "/home/***/.local/lib/python3.7/site-packages/snowflake/connector/cursor.py", line 794, in execute
    sf_file_transfer_agent.execute()
  File "/home/***/.local/lib/python3.7/site-packages/snowflake/connector/file_transfer_agent.py", line 359, in execute
    self._init_file_metadata()
  File "/home/***/.local/lib/python3.7/site-packages/snowflake/connector/file_transfer_agent.py", line 973, in _init_file_metadata
    src_file_size=self._source_from_stream.seek(0, os.SEEK_END),
io.UnsupportedOperation: File or stream is not seekable.
  7. Thoughts

I see that the Snowflake connector uses the seek method to calculate the file size. Is this mandatory? Can I upload the data without knowing its size? If so, how?
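One workaround until streaming PUT exists (my own sketch, not an official connector feature): drain the pipe into a seekable buffer before handing it to file_stream. tempfile.SpooledTemporaryFile keeps the payload in memory up to a threshold and only spills to disk beyond it, so small batches never touch the filesystem:

```python
import os
import tempfile

def buffer_pipe(read_fd, max_in_memory=16 * 1024 * 1024):
    # Drain a non-seekable pipe into a seekable spool the connector can size.
    spool = tempfile.SpooledTemporaryFile(max_size=max_in_memory)
    with os.fdopen(read_fd, 'rb') as src:
        for chunk in iter(lambda: src.read(64 * 1024), b''):
            spool.write(chunk)
    spool.seek(0)
    return spool

rf, wf = os.pipe()
os.write(wf, b'fruit,count\napple,3\n')  # stand-in for the Postgres producer
os.close(wf)

stream = buffer_pipe(rf)
stream.seek(0, os.SEEK_END)  # the very call that fails on a raw pipe
size = stream.tell()
stream.seek(0)

# The spool is now safe to pass to the connector, e.g.:
# cur.execute("PUT file:///pipe.csv @fruits OVERWRITE = TRUE", file_stream=stream)
print(size)  # 20
```

A truly unbounded stream will still spill past max_size to a temporary file, which is exactly the limitation this feature request is about; for bounded batches, though, nothing hits disk.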

sfc-gh-stan commented 1 year ago

Hi @pp-gborodin , unfortunately we don't yet support streaming PUT without knowing the content size. This is a very reasonable feature request, thank you for bringing this to our attention. Could you please open a feature request ticket instead?

nicornk commented 1 year ago

+1 for this feature

erikvanzijst commented 1 month ago

Same. My system processes some large data sets, and it's a burden to have to spool everything to a temporary file first.

This is especially problematic in containerized deployments where pods have limited locally attached storage.
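If even spilling to disk is off the table, a possible mitigation (again my own assumption, not connector-supported streaming) is to cut the pipe into fixed-size in-memory parts and PUT each part as its own staged file; a single COPY INTO can then load every part from the stage, and memory use stays bounded by the part size:

```python
import io
import os

def iter_parts(src, part_size):
    # Yield seekable BytesIO chunks of at most part_size bytes.
    while True:
        data = src.read(part_size)
        if not data:
            return
        yield io.BytesIO(data)

rf, wf = os.pipe()
os.write(wf, b'x' * 10)  # stand-in for a large producer
os.close(wf)

parts = []
with os.fdopen(rf, 'rb') as src:
    for i, part in enumerate(iter_parts(src, part_size=4)):
        # Each part is seekable, so it could be uploaded as e.g.:
        # cur.execute(f"PUT file:///part{i}.csv @fruits", file_stream=part)
        parts.append(part.getvalue())
print(parts)  # [b'xxxx', b'xxxx', b'xx']
```

The obvious catch is that a CSV can't be split at arbitrary byte offsets without breaking rows, so a real implementation would need to cut on newline boundaries.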

erikvanzijst commented 1 month ago

FWIW, I've created a feature request for this on the community site: https://community.snowflake.com/s/case/500VI00000IcDOVYA3/allow-put-to-upload-from-a-nonseekable-fifopipe

Not sure if that is accessible to other users though.

erikvanzijst commented 1 month ago

Snowflake Support came back, acknowledging the issue and raising it internally. We'll see what happens.

Hi Erik,

Thanks for your patience.

I have tested the code and can indeed reproduce the issue on a later version of the Python connector.

I have checked internally and can confirm that we don't currently support streaming PUT without knowing the content size; as such, PUT cannot currently be used with pipes, since it relies on the seek method to determine the actual file size. (We have a similar mechanism to stream data to Snowflake directly using Snowpipe Streaming, with a Java SDK.)

Having said that, I have gone ahead and raised a feature request to notify the Accounts team about supporting file streaming in the PUT API.

Further, your Accounts Team has been notified of the community link and the JIRA that you cited, and they can work with the Engineering team on the feature mentioned.

Your Accounts team may reach out to you if they need any further clarification so that they can help you with the next steps regarding the feature concerned.

If you have any follow-up questions regarding this feature request, please reach out to your Account team, copied on this email, as they will own this request going forward. I have made them aware of our engagement, and you can reference case number 00869215 if needed.