PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
17.46k stars 1.64k forks source link

Pause/Resume doesn't work post 2.8.6 #8878

Closed EmilRex closed 1 year ago

EmilRex commented 1 year ago

First check

Bug summary

Pausing a flow with pause_flow_run crashes when installing from main (g1897139b7). I'm guessing this is related to the #8702.

Reproduction

from prefect import flow, task, pause_flow_run

@task
def my_task():
    return 2 * 2

@flow
def pause_test():
    my_task()
    pause_flow_run()
    my_task()

if __name__ == "__main__":
    pause_test()

Error

(default) ➜  examples python pause.py                                              
10:49:19.181 | INFO    | prefect.engine - Created flow run 'jovial-orca' for flow 'pause-test'
10:49:20.201 | INFO    | Flow run 'jovial-orca' - Created task run 'my_task-0' for task 'my_task'
10:49:20.202 | INFO    | Flow run 'jovial-orca' - Executing 'my_task-0' immediately...
10:49:20.771 | INFO    | Task run 'my_task-0' - Finished in state Completed()
10:49:20.774 | INFO    | Flow run 'jovial-orca' - Pausing flow, execution will continue when this flow run is resumed.
10:49:20.779 | ERROR   | Flow run 'jovial-orca' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/anyio/streams/tls.py", line 130, in _call_sslobject_method
    result = func(*args)
  File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/ssl.py", line 889, in read
    v = self._sslobj.read(len)
ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2633)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/engine.py", line 671, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/emilchristensen/github/examples/pause.py", line 12, in pause_test
    pause_flow_run()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 260, in coroutine_wrapper
    return call()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 245, in __call__
    return self.result()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/engine.py", line 808, in pause_flow_run
    return await _in_process_pause(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/engine.py", line 831, in _in_process_pause
    state = await propose_state(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/engine.py", line 1873, in propose_state
    response = await set_state_and_handle_waits(set_state)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/engine.py", line 1857, in set_state_and_handle_waits
    response = await set_state_func()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/client/orchestration.py", line 1713, in set_flow_run_state
    response = await self._client.post(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpx/_client.py", line 1842, in post
    return await self.request(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/client/base.py", line 243, in send
    response = await self._send_with_retry(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/client/base.py", line 189, in _send_with_retry
    response = await request()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpx/_client.py", line 1614, in send
    response = await self._send_handling_auth(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpx/_client.py", line 1716, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/http2.py", line 146, in handle_async_request
    raise exc
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/http2.py", line 114, in handle_async_request
    status, headers = await self._receive_response(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/http2.py", line 231, in _receive_response
    event = await self._receive_stream_event(request, stream_id)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/http2.py", line 262, in _receive_stream_event
    await self._receive_events(request, stream_id)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/http2.py", line 283, in _receive_events
    events = await self._read_incoming_data(request)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/http2.py", line 343, in _read_incoming_data
    raise exc
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/http2.py", line 329, in _read_incoming_data
    data = await self._network_stream.read(self.READ_NUM_BYTES, timeout)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 33, in read
    return await self._stream.receive(max_bytes=max_bytes)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/anyio/streams/tls.py", line 195, in receive
    data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/anyio/streams/tls.py", line 137, in _call_sslobject_method
    data = await self.transport_stream.receive()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
    await self._protocol.read_event.wait()
  File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/locks.py", line 226, in wait
    await fut
RuntimeError: Task <Task pending name='Task-13' coro=<Call._run_async() running at /Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py:218> cb=[_run_until_complete_cb() at /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop
10:49:22.365 | ERROR   | Flow run 'jovial-orca' - Finished in state Failed('Flow run encountered an exception. Traceback (most recent call last):\n  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/anyio/streams/tls.py", line 130, in _call_sslobject_method\n    result = func(*args)\n  File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/ssl.py", line 889, in read\n    v = self._sslobj.read(len)\nssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2633)\n\nDuring handling of the above exception, another exception occurred:\n\nRuntimeError: Task <Task pending name=\'Task-13\' coro=<Call._run_async() running at /Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py:218> cb=[_run_until_complete_cb() at /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop\n')
Traceback (most recent call last):
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/anyio/streams/tls.py", line 130, in _call_sslobject_method
    result = func(*args)
  File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/ssl.py", line 889, in read
    v = self._sslobj.read(len)
ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2633)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/emilchristensen/github/examples/pause.py", line 17, in <module>
    pause_test()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/flows.py", line 468, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/engine.py", line 182, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/engine.py", line 257, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/engine.py", line 671, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/emilchristensen/github/examples/pause.py", line 12, in pause_test
    pause_flow_run()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 260, in coroutine_wrapper
    return call()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 245, in __call__
    return self.result()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/engine.py", line 808, in pause_flow_run
    return await _in_process_pause(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/engine.py", line 831, in _in_process_pause
    state = await propose_state(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/engine.py", line 1873, in propose_state
    response = await set_state_and_handle_waits(set_state)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/engine.py", line 1857, in set_state_and_handle_waits
    response = await set_state_func()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/client/orchestration.py", line 1713, in set_flow_run_state
    response = await self._client.post(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpx/_client.py", line 1842, in post
    return await self.request(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/client/base.py", line 243, in send
    response = await self._send_with_retry(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/client/base.py", line 189, in _send_with_retry
    response = await request()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpx/_client.py", line 1614, in send
    response = await self._send_handling_auth(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpx/_client.py", line 1716, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/http2.py", line 146, in handle_async_request
    raise exc
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/http2.py", line 114, in handle_async_request
    status, headers = await self._receive_response(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/http2.py", line 231, in _receive_response
    event = await self._receive_stream_event(request, stream_id)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/http2.py", line 262, in _receive_stream_event
    await self._receive_events(request, stream_id)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/http2.py", line 283, in _receive_events
    events = await self._read_incoming_data(request)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/http2.py", line 343, in _read_incoming_data
    raise exc
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/_async/http2.py", line 329, in _read_incoming_data
    data = await self._network_stream.read(self.READ_NUM_BYTES, timeout)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 33, in read
    return await self._stream.receive(max_bytes=max_bytes)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/anyio/streams/tls.py", line 195, in receive
    data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/anyio/streams/tls.py", line 137, in _call_sslobject_method
    data = await self.transport_stream.receive()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
    await self._protocol.read_event.wait()
  File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/locks.py", line 226, in wait
    await fut
RuntimeError: Task <Task pending name='Task-13' coro=<Call._run_async() running at /Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py:218> cb=[_run_until_complete_cb() at /opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop

Versions

Version:             2.8.6+28.g1897139b7
API version:         0.8.4
Python version:      3.9.16
Git commit:          1897139b
Built:               Wed, Mar 22, 2023 9:58 AM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         cloud

Additional context

Works correctly with 2.8.6:

(default) ➜  examples prefect version                                              
Version:             2.8.6
API version:         0.8.4
Python version:      3.9.16
Git commit:          061d877b
Built:               Thu, Mar 16, 2023 2:58 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         cloud

(default) ➜  examples python pause.py                                              
10:48:23.639 | INFO    | prefect.engine - Created flow run 'magic-pillbug' for flow 'pause-test'
10:48:24.766 | INFO    | Flow run 'magic-pillbug' - Created task run 'my_task-0' for task 'my_task'
10:48:24.767 | INFO    | Flow run 'magic-pillbug' - Executing 'my_task-0' immediately...
10:48:25.278 | INFO    | Task run 'my_task-0' - Finished in state Completed()
10:48:25.279 | INFO    | Flow run 'magic-pillbug' - Pausing flow, execution will continue when this flow run is resumed.
10:48:35.904 | INFO    | Flow run 'magic-pillbug' - Resuming flow run execution!
10:48:36.094 | INFO    | Flow run 'magic-pillbug' - Created task run 'my_task-1' for task 'my_task'
10:48:36.096 | INFO    | Flow run 'magic-pillbug' - Executing 'my_task-1' immediately...
10:48:36.564 | INFO    | Task run 'my_task-1' - Finished in state Completed()
10:48:36.708 | INFO    | Flow run 'magic-pillbug' - Finished in state Completed('All states completed.')
zanieb commented 1 year ago

Thank you! @EmilRex digging into this ASAP.