PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

`with ShellOperation` context block results in error #13073

Open discdiver opened 1 year ago

discdiver commented 1 year ago

Reproduction

Use the context-block example from the docs (first, create a folder named after today's date in YYYYMMDD format):

from prefect import flow
from datetime import datetime
from prefect_shell import ShellOperation

@flow
def download_data():
    today = datetime.today().strftime("%Y%m%d")

    # for long running operations, you can use a context manager
    with ShellOperation(
        commands=[
            "curl -O https://masie_web.apps.nsidc.org/pub/DATASETS/NOAA/G02135/north/daily/data/N_seaice_extent_daily_v3.0.csv",
        ],
        working_dir=f"{today}",
    ) as download_csv_operation:
        # trigger runs the process in the background
        download_csv_process = download_csv_operation.trigger()

        # when you're ready, wait for the process to finish
        download_csv_process.wait_for_completion()

        # if you'd like to get the output lines, you can use the `fetch_result` method
        output_lines = download_csv_process.fetch_result()

    return output_lines


if __name__ == "__main__":
    download_data()

Version info

Version:             2.10.11
API version:         0.8.4
Python version:      3.10.8
Git commit:          8c651ffc
Built:               Thu, May 25, 2023 2:59 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         cloud
prefect-shell             0.1.5                    pypi_0    pypi

Traceback


11:19:46.098 | INFO    | prefect.engine - Created flow run 'crazy-mastodon' for flow 'download-data'
11:19:46.099 | INFO    | Flow run 'crazy-mastodon' - View at https://app.prefect.cloud/account/cbad165a-7057-416a-bb31-a0aab3f1d0cf/workspace/a575bdc0-4dcc-4ad4-b286-3639c7077307/flow-runs/flow-run/bb0f53ad-596d-45d2-9e65-9a481a480e8b
11:19:46.586 | INFO    | Flow run 'crazy-mastodon' - PID 54582 triggered with 2 commands running inside the '.' directory.
11:19:46.596 | INFO    | Flow run 'crazy-mastodon' - PID 54582 completed with return code 0.
11:19:46.709 | INFO    | Flow run 'crazy-mastodon' - Finished in state Completed()
(base) jeffhale prefect/linkcheck $ python tshell.py
11:20:26.719 | INFO    | prefect.engine - Created flow run 'careful-terrier' for flow 'download-data'
11:20:26.723 | INFO    | Flow run 'careful-terrier' - View at https://app.prefect.cloud/account/cbad165a-7057-416a-bb31-a0aab3f1d0cf/workspace/a575bdc0-4dcc-4ad4-b286-3639c7077307/flow-runs/flow-run/bfa7557b-e3a2-45f7-9d6c-66a0b1112218
11:20:27.144 | INFO    | Flow run 'careful-terrier' - PID 54654 triggered with 2 commands running inside the '.' directory.
11:20:27.156 | INFO    | Flow run 'careful-terrier' - PID 54654 completed with return code 0.
11:20:27.300 | INFO    | Flow run 'careful-terrier' - Finished in state Completed()
(base) jeffhale prefect/linkcheck $ python tshell.py
11:20:41.903 | INFO    | prefect.engine - Created flow run 'vanilla-manul' for flow 'download-data'
11:20:41.907 | INFO    | Flow run 'vanilla-manul' - View at https://app.prefect.cloud/account/cbad165a-7057-416a-bb31-a0aab3f1d0cf/workspace/a575bdc0-4dcc-4ad4-b286-3639c7077307/flow-runs/flow-run/78626444-9688-4345-abd1-d6cd029983f7
11:20:42.361 | INFO    | Flow run 'vanilla-manul' - PID 54681 triggered with 1 commands running inside the PosixPath('20230602') directory.
11:20:42.390 | ERROR   | asyncio - Exception in callback SubprocessStreamProtocol.pipe_data_received(2, b'  % Total  ...ed\n\r  0    ')
handle: <Handle SubprocessStreamProtocol.pipe_data_received(2, b'  % Total  ...ed\n\r  0    ')>
Traceback (most recent call last):
  File "/Users/jeffhale/miniforge3/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/jeffhale/miniforge3/lib/python3.10/asyncio/subprocess.py", line 72, in pipe_data_received
    reader.feed_data(data)
  File "/Users/jeffhale/miniforge3/lib/python3.10/asyncio/streams.py", line 456, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
11:20:42.393 | ERROR   | asyncio - Exception in callback SubprocessStreamProtocol.pipe_data_received(2, b' 0    0    ...-:--:--     0')
handle: <Handle SubprocessStreamProtocol.pipe_data_received(2, b' 0    0    ...-:--:--     0')>
Traceback (most recent call last):
  File "/Users/jeffhale/miniforge3/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/jeffhale/miniforge3/lib/python3.10/asyncio/subprocess.py", line 72, in pipe_data_received
    reader.feed_data(data)
  File "/Users/jeffhale/miniforge3/lib/python3.10/asyncio/streams.py", line 456, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
11:20:42.870 | ERROR   | asyncio - Exception in callback SubprocessStreamProtocol.pipe_data_received(2, b'\r  0 1620k...0:01:37 17102')
handle: <Handle SubprocessStreamProtocol.pipe_data_received(2, b'\r  0 1620k...0:01:37 17102')>
Traceback (most recent call last):
  File "/Users/jeffhale/miniforge3/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/jeffhale/miniforge3/lib/python3.10/asyncio/subprocess.py", line 72, in pipe_data_received
    reader.feed_data(data)
  File "/Users/jeffhale/miniforge3/lib/python3.10/asyncio/streams.py", line 456, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
11:20:43.203 | ERROR   | asyncio - Exception in callback SubprocessStreamProtocol.pipe_data_received(2, b'\r100 1620k...--:-- 1992k\n')
handle: <Handle SubprocessStreamProtocol.pipe_data_received(2, b'\r100 1620k...--:-- 1992k\n')>
Traceback (most recent call last):
  File "/Users/jeffhale/miniforge3/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/jeffhale/miniforge3/lib/python3.10/asyncio/subprocess.py", line 72, in pipe_data_received
    reader.feed_data(data)
  File "/Users/jeffhale/miniforge3/lib/python3.10/asyncio/streams.py", line 456, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
11:20:43.207 | INFO    | Flow run 'vanilla-manul' - PID 54681 completed with return code 0.
11:20:43.210 | INFO    | Flow run 'vanilla-manul' - Successfully closed all open processes.
11:20:43.311 | INFO    | Flow run 'vanilla-manul' - Finished in state Completed()
(base) jeffhale prefect/linkcheck $ 
egnor commented 1 year ago

Much smaller repro example:

from prefect import flow
from prefect_shell import ShellOperation

@flow
def python_version():
    with ShellOperation(commands=["python3 --version"]) as op:
        process = op.trigger()
        process.wait_for_completion()
        result = process.fetch_result()

    return result

print(python_version())
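
For comparison, here is a minimal sketch of the same start/wait/fetch sequence written directly against asyncio, with every step running inside a single asyncio.run() call. It does not use prefect-shell. run_and_collect is a hypothetical helper, and it runs the current interpreter with --version instead of the literal "python3 --version" so that it works wherever Python is installed.

```python
import asyncio
import sys


async def run_and_collect(*argv: str) -> list[str]:
    # Start the process (roughly what "trigger" does); stderr is merged into stdout
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    # Wait for exit and read all output (roughly "wait_for_completion" + "fetch_result")
    output, _ = await proc.communicate()
    return output.decode().splitlines()


# Process creation, waiting, and reading all happen in this one event loop
lines = asyncio.run(run_and_collect(sys.executable, "--version"))
print(lines)
```

The pipes are created and drained by the same loop that started the process; whether this sidesteps the feed_data after feed_eof assertion in prefect-shell is an expectation, not something verified here.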

On my system (Ubuntu 22.04, Linux 5.19.0-43-generic), this raises a feed_data after feed_eof exception every time. Could this be a manifestation of https://github.com/PrefectHQ/prefect/issues/6335?

egnor commented 1 year ago

On further investigation, this may be a side effect of using the context block from synchronous code, where no event loop is running. In that case, @sync_compatible will spin up a separate event loop for each method call: one event loop is used to create the ShellOperation, another to run op.trigger(), and so on.
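
A minimal sketch of the per-call behavior described above, assuming a wrapper that simply calls asyncio.run() on every call. run_in_fresh_loop and which_loop are hypothetical names; this is not Prefect's actual @sync_compatible implementation, which may differ in its details.

```python
import asyncio
import functools


def run_in_fresh_loop(async_fn):
    """Hypothetical wrapper: each call runs the coroutine in a brand-new
    event loop via asyncio.run(), which closes that loop afterwards."""

    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        return asyncio.run(async_fn(*args, **kwargs))

    return wrapper


@run_in_fresh_loop
async def which_loop():
    # Return the loop object itself so both loops stay alive for comparison
    return asyncio.get_running_loop()


first = which_loop()   # e.g. the call that sets up the operation
second = which_loop()  # e.g. the call that runs op.trigger()
print(first is second)                        # False
print(first.is_closed(), second.is_closed())  # True True
```

Each call gets its own loop, and that loop is already closed by the time the next call starts.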

My hypothesis is that op.trigger() launches a subprocess with several file descriptors attached and registers event-loop callbacks for those descriptors. Tearing down and rebuilding event loops between calls then leaves asyncio's internals in an inconsistent state, which would fit the feed_data after feed_eof assertion in the traceback above.
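
A minimal, standard-library-only illustration of the general hazard: an asyncio object created while one loop is running stays tied to that loop, and awaiting it from a second loop fails. It uses a plain Future rather than a subprocess pipe, so it shows the class of problem, not the exact feed_data after feed_eof path inside prefect-shell. make_pending_future and await_future are hypothetical names.

```python
import asyncio


async def make_pending_future():
    # The future is bound to whichever loop is running right now (loop A)
    return asyncio.get_running_loop().create_future()


async def await_future(fut):
    try:
        await fut
    except RuntimeError as exc:
        # asyncio refuses to wait on a future owned by another loop
        return str(exc)
    return "no error"


fut = asyncio.run(make_pending_future())   # created in loop A, which is then closed
message = asyncio.run(await_future(fut))   # awaited from a new loop B
print("attached to a different loop" in message)  # True
```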

Probably:

eiko4 commented 1 year ago

Any updates?

zhen0 commented 4 months ago

Thanks for the bumps and the additional MREs here. As a side note, these MREs run fine for me in 3.0. I'll leave the issue open because I'm not certain they are resolved in 2.0.