PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
17.51k stars, 1.64k forks

Prefect: Reading contents of BytesIO objects #8844

Open: kokybo opened this issue 1 year ago

kokybo commented 1 year ago


Bug summary

I currently have a Prefect flow that downloads a large (60 MB) gzip-compressed file from a website. The file is kept in memory and passed, as a BytesIO object, into a second task that decompresses it. Prefect seems to be changing the read position within the BytesIO object: I have confirmed that the position is 0 when the object is created but is not 0 when the second task receives it.
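To separate the symptom from Prefect, the following standard-library-only sketch (the payload is made up and stands in for the downloaded file) shows that gzip.open starts reading at the BytesIO object's current position, not at the start of the buffer. After two bytes are consumed, header parsing fails with BadGzipFile, and seek(0) restores normal behavior.

```python
import gzip
import io

# Made-up sample standing in for the downloaded pageviews file.
payload = gzip.compress(b"en Main_Page 1 0\nde Hauptseite 1 0\n")

buf = io.BytesIO(payload)
start_position = buf.tell()  # a fresh BytesIO starts at position 0

buf.read(2)  # simulate something consuming the first two bytes

try:
    with gzip.open(buf, "rb") as f:
        f.read()
    header_error = None
except gzip.BadGzipFile as exc:
    # Header parsing began at offset 2, so the gzip magic bytes did not match.
    header_error = exc

buf.seek(0)  # rewind, like the commented-out data.seek(0) in the reproduction
with gzip.open(buf, "rb") as f:
    text = f.read().decode()
```

Closing the GzipFile does not close a file object that was passed in, which is why buf can still be rewound after the failed attempt.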

Reproduction

import requests
import io
import gzip

from prefect import task, flow

@task
def download_gzip_file() -> io.BytesIO:

    data_url = "https://dumps.wikimedia.org/other/pageviews/2023/2023-03/pageviews-20230309-220000.gz"
    resp = requests.get(data_url)

    data = io.BytesIO(resp.content)
    print(data.tell())
    return data

@task
def parse_response(data: io.BytesIO) -> str:
    output = ""
    print(data.tell())
    # data.seek(0)  # Uncommenting this line makes the code work

    with gzip.open(data, "r") as file:
        for line in file:
            if line[0:3] == b'en ':
                line_str = line.decode()
                output += line_str

    return output

@flow
def ingest_data():
    data = download_gzip_file.submit()

    output = parse_response.submit(data)

    return output

if __name__ == "__main__":
    ingest_data()
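Until the cause is found, one possible workaround (an assumption, not something confirmed in this thread) is to pass the raw bytes between tasks and wrap them in a fresh BytesIO only inside the consuming task. Bytes are immutable and have no cursor, so there is nothing to drift. The helper name extract_english_lines is hypothetical.

```python
import gzip
import io


def extract_english_lines(raw: bytes) -> str:
    """Decompress a gzip payload and keep only lines starting with b'en '.

    Hypothetical helper: the consuming task would call this on the raw bytes
    returned by the download task, so no BytesIO is shared between tasks.
    """
    # A BytesIO created here always starts at position 0.
    with gzip.open(io.BytesIO(raw), "rb") as file:
        return "".join(line.decode() for line in file if line.startswith(b"en "))
```

In the flow above, download_gzip_file would return resp.content, and parse_response would accept data: bytes and return extract_english_lines(data). Using str.join also avoids building the output through repeated += concatenation.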

Error

Traceback (most recent call last):
  File "c:\Users\user\Desktop\Projects\data-collection\wiki\wiki\test.py", line 44, in <module>
    ingest_data()
  File "C:\Users\user\Desktop\Projects\data-collection\wiki\.venv\lib\site-packages\prefect\flows.py", line 456, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "C:\Users\user\Desktop\Projects\data-collection\wiki\.venv\lib\site-packages\prefect\engine.py", line 170, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "C:\Users\user\Desktop\Projects\data-collection\wiki\.venv\lib\site-packages\anyio\_core\_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "C:\Users\user\Desktop\Projects\data-collection\wiki\.venv\lib\site-packages\anyio\_backends\_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "C:\Python310\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Python310\lib\asyncio\base_events.py", line 641, in run_until_complete
    return future.result()
  File "C:\Users\user\Desktop\Projects\data-collection\wiki\.venv\lib\site-packages\anyio\_backends\_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "C:\Users\user\Desktop\Projects\data-collection\wiki\.venv\lib\site-packages\prefect\client\utilities.py", line 47, in with_injected_client    
    return await fn(*args, **kwargs)
  File "C:\Users\user\Desktop\Projects\data-collection\wiki\.venv\lib\site-packages\prefect\engine.py", line 250, in create_then_begin_flow_run       
    return await state.result(fetch=True)
  File "C:\Users\user\Desktop\Projects\data-collection\wiki\.venv\lib\site-packages\prefect\states.py", line 89, in _get_state_result
    raise await get_state_exception(state)
  File "C:\Users\user\Desktop\Projects\data-collection\wiki\.venv\lib\site-packages\prefect\engine.py", line 1512, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "C:\Users\user\Desktop\Projects\data-collection\wiki\.venv\lib\site-packages\prefect\utilities\asyncutils.py", line 154, in run_sync_in_interruptible_worker_thread
    async with anyio.create_task_group() as tg:
  File "C:\Users\user\Desktop\Projects\data-collection\wiki\.venv\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "C:\Users\cmein\Desktop\Projects\data-collection\wiki\.venv\lib\site-packages\anyio\to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "C:\Users\user\Desktop\Projects\data-collection\wiki\.venv\lib\site-packages\anyio\_backends\_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "C:\Users\user\Desktop\Projects\data-collection\wiki\.venv\lib\site-packages\anyio\_backends\_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "C:\Users\user\Desktop\Projects\data-collection\wiki\.venv\lib\site-packages\prefect\utilities\asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "c:\Users\user\Desktop\Projects\data-collection\wiki\wiki\test.py", line 26, in parse_response
    for line in file:
  File "C:\Python310\lib\gzip.py", line 399, in readline
    return self._buffer.readline(size)
  File "C:\Python310\lib\_compression.py", line 68, in readinto
    data = self.read(len(byte_view))
  File "C:\Python310\lib\gzip.py", line 488, in read
    if not self._read_gzip_header():
  File "C:\Python310\lib\gzip.py", line 436, in _read_gzip_header
    raise BadGzipFile('Not a gzipped file (%r)' % magic)
gzip.BadGzipFile: Not a gzipped file (b'\x7f\t')
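Every gzip member begins with the two magic bytes b'\x1f\x8b' (RFC 1952). The error reports the two bytes it found instead, b'\x7f\t', which suggests that reading started somewhere inside the compressed data rather than at its header. A small, hypothetical helper like the one below can check the bytes at the cursor without moving it, which may be useful for logging at the top of a task.

```python
import io
from typing import BinaryIO

GZIP_MAGIC = b"\x1f\x8b"  # ID1, ID2 bytes that open every gzip member


def gzip_magic_at_cursor(buf: BinaryIO) -> bool:
    """Return True if the bytes at the current position look like a gzip header.

    Hypothetical helper. The original position is restored, so the check
    itself does not move the cursor.
    """
    start = buf.tell()
    try:
        return buf.read(2) == GZIP_MAGIC
    finally:
        buf.seek(start)
```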

Versions

Version:             2.82
API version:         0.8.4
Python version:      3.10.2
Git commit:          afbed19d
Built:               Fri, Feb 17, 2023 10:02 AM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.35.5

Additional context

No response

zanieb commented 1 year ago

Thanks for the report! I presume this is caused by visit_collection and that we will need to add an exception for data of this type.

If you do

from prefect.utilities.annotations import quote

....

    output = parse_response.submit(quote(data))

Does this work as expected?

kokybo commented 1 year ago

That does not seem to work. I updated my code to:

@flow
def ingest_data():
    data = download_gzip_file.submit()

    output = parse_response.submit(quote(data))

    return output

I also added the line data = data.result() to the parse_response task to resolve the future manually.

zanieb commented 1 year ago

@kokybo In that case, is the BytesIO object still consumed?

kokybo commented 1 year ago

That is correct. The read position (data.tell()) is still partway into the buffer rather than at 0.
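The thread does not establish whether Prefect serializes this result between the two tasks. If it does, note that a pickle round-trip of a BytesIO restores both its contents and its current position, so serialization alone neither causes nor undoes a moved cursor. This sketch uses only the standard library.

```python
import io
import pickle

buf = io.BytesIO(b"0123456789")
buf.read(4)  # move the cursor to position 4

# A pickle round-trip restores the contents and the current position.
restored = pickle.loads(pickle.dumps(buf))
position_after_roundtrip = restored.tell()

restored.seek(0)  # the consumer can always rewind before reading
```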

zanieb commented 1 year ago

I'm not sure where we are consuming it, then. We'll investigate when we get a chance, and we're also happy to review a contribution that resolves it :)
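One way to find where the object is being consumed is to hand the tasks a BytesIO subclass that records each read and seek together with its calling site. TracingBytesIO is a hypothetical debugging aid sketched here, not a Prefect or standard-library feature. Only calls that go through the overridden Python-level methods are recorded, so access from C code inside the io module (for example, line iteration) may not show up.

```python
import io
import traceback


class TracingBytesIO(io.BytesIO):
    """BytesIO that logs read/readline/seek calls with positions and caller.

    Hypothetical debugging aid; not part of Prefect or the standard library.
    """

    def __init__(self, initial_bytes: bytes = b"") -> None:
        self.events = []  # (method, position_before, position_after, "file:line")
        super().__init__(initial_bytes)

    def _record(self, method: str, before: int) -> None:
        # extract_stack()[-1] is _record, [-2] the overridden method,
        # [-3] whoever called read()/readline()/seek().
        caller = traceback.extract_stack()[-3]
        self.events.append(
            (method, before, self.tell(), f"{caller.filename}:{caller.lineno}")
        )

    def read(self, size=-1):
        before = self.tell()
        data = super().read(size)
        self._record("read", before)
        return data

    def readline(self, size=-1):
        before = self.tell()
        line = super().readline(size)
        self._record("readline", before)
        return line

    def seek(self, pos, whence=io.SEEK_SET):
        before = self.tell()
        result = super().seek(pos, whence)
        self._record("seek", before)
        return result
```

In the reproduction, download_gzip_file could return TracingBytesIO(resp.content), and parse_response could print(data.events) before reading. If the object is pickled on the way, the events list travels with it, because a BytesIO's pickled state includes its instance __dict__.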

aaronroffel commented 7 months ago

It looks like you could be using a yanked version of Prefect (2.82).