PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Flows with "Crashed" subflows that exceed flow timeout stuck in "Running" state #7683

Open efranksrecroom opened 1 year ago

efranksrecroom commented 1 year ago

First check

Bug summary

We have a flow that triggers a subflow to run a Fivetran sync. The parent flow has a 55-minute timeout. We've found that if the subflow ends up in a Crashed state, the parent flow hits this timeout but is never marked as TimedOut: it stays in the "Running" state and cannot be cancelled. More importantly, it never shows up as Failed, Crashed, or Late, so it never triggers an alert.

Note in the screenshot that the Start Time for this flow run was 12:57:42 AM; the screenshot was taken at about 11:00 AM the same day. The run's "Duration" is 55m 1s, roughly the 55-minute timeout we set for this flow, yet the flow is still in the "Running" state.
[Screenshot of the parent flow run]

The subflow, meanwhile, is listed as "Crashed".
[Screenshot of the subflow run]
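Because a run stuck like this never reaches a Failed, Crashed, or Late state, state-based alerts never fire. One stopgap would be to periodically flag runs that still report Running after their timeout should have elapsed. The sketch below shows only that check and its arithmetic, using the timestamps from this report; the helper exceeded_timeout is hypothetical (not a Prefect API), and fetching each run's state name and start time is left out.

```python
from datetime import datetime, timedelta


def exceeded_timeout(state_name: str, start_time: datetime,
                     timeout_seconds: float, now: datetime) -> bool:
    """Return True if a run still reports Running after its timeout elapsed.

    Hypothetical helper, not a Prefect API: the inputs would come from
    whatever flow-run metadata you can read.
    """
    if state_name != "Running":
        return False  # only runs still marked Running can be "stuck"
    deadline = start_time + timedelta(seconds=timeout_seconds)
    return now > deadline


# Times from the report; the calendar date (2000-01-01) is an arbitrary placeholder.
start = datetime(2000, 1, 1, 0, 57, 42)        # Start Time 12:57:42 AM
deadline = start + timedelta(minutes=55)       # 55-minute flow timeout
screenshot_time = datetime(2000, 1, 1, 11, 0)  # roughly 11:00 AM

print(deadline.time())  # 01:52:42
print(exceeded_timeout("Running", start, 55 * 60, screenshot_time))  # True
```

The parent run should have left the Running state by about 1:52:42 AM, roughly nine hours before the screenshot was taken.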

Reproduction

I can't share our code here. To reproduce, run a flow that has a timeout and triggers a subflow that crashes.

Error

This is the error we typically see in the "Crashed" subflow:

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/anyio/streams/tls.py", line 130, in _call_sslobject_method
    result = func(*args)
  File "/usr/local/lib/python3.10/ssl.py", line 917, in read
    v = self._sslobj.read(len)
ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2548)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 33, in read
    return await self._stream.receive(max_bytes=max_bytes)
  File "/usr/local/lib/python3.10/site-packages/anyio/streams/tls.py", line 195, in receive
    data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
  File "/usr/local/lib/python3.10/site-packages/anyio/streams/tls.py", line 137, in _call_sslobject_method
    data = await self.transport_stream.receive()
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
    await self._protocol.read_event.wait()
  File "/usr/local/lib/python3.10/asyncio/locks.py", line 214, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
    yield
  File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 31, in read
    with anyio.fail_after(timeout):
  File "/usr/local/lib/python3.10/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
    raise TimeoutError
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 105, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 84, in handle_async_request
    ) = await self._receive_response_headers(**kwargs)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 148, in _receive_response_headers
    event = await self._receive_event(timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py", line 177, in _receive_event
    data = await self._network_stream.read(
  File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 30, in read
    with map_exceptions(exc_map):
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
    raise to_exc(exc)
httpcore.ReadTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1183, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_fivetran/connectors.py", line 202, in start_fivetran_connector_sync
    await fivetran_client.force_connector(connector_id=connector_id)
  File "/usr/local/lib/python3.10/site-packages/prefect_fivetran/clients.py", line 107, in force_connector
    await self.client.post(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1842, in post
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1614, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1716, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ReadTimeout
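The traceback above uses two phrasings that are easy to confuse. "During handling of the above exception, another exception occurred" means the new exception was raised inside an except block without from (implicit chaining, stored in __context__). "The above exception was the direct cause of the following exception" comes from raise ... from exc (explicit chaining, stored in __cause__), as in the raise mapped_exc(message) from exc line near the end. A minimal stdlib sketch of both styles, using hypothetical stand-in classes LowLevelTimeout and HighLevelTimeout rather than the real httpcore and httpx exceptions:

```python
from contextlib import contextmanager


class LowLevelTimeout(Exception):
    """Hypothetical stand-in for httpcore.ReadTimeout."""


class HighLevelTimeout(Exception):
    """Hypothetical stand-in for httpx.ReadTimeout."""


@contextmanager
def map_implicitly():
    try:
        yield
    except TimeoutError as exc:
        # No `from`: only __context__ is set, so the traceback reads
        # "During handling of the above exception, another exception occurred".
        raise LowLevelTimeout(str(exc))


@contextmanager
def map_explicitly():
    try:
        yield
    except LowLevelTimeout as exc:
        # `from exc` sets __cause__, so the traceback reads
        # "The above exception was the direct cause of the following exception".
        raise HighLevelTimeout(str(exc)) from exc


def read():
    with map_explicitly():
        with map_implicitly():
            raise TimeoutError("read timed out")


caught = None
try:
    read()
except HighLevelTimeout as err:
    caught = err

low = caught.__cause__
print(type(low).__name__)              # LowLevelTimeout
print(type(low.__context__).__name__)  # TimeoutError
print(low.__cause__)                   # None
```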

Versions

Version:             2.6.3
API version:         0.8.2
Python version:      3.10.8
Git commit:          9e7da96e
Built:               Tue, Oct 18, 2022 1:55 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.34.1

Additional context

No response

zanieb commented 1 year ago

@efranksrecroom can you please create a reproduction? You can simulate a crashed subflow run by having it raise a BaseException.
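Going by this suggestion, a run that raises a bare BaseException is reported as Crashed, whereas an ordinary Exception would typically end the run as Failed. The sketch below is a simplified, hypothetical model of that split (final_state_name is not Prefect's engine code), meant only to show which exception types land on which side.

```python
import asyncio


def final_state_name(exc: BaseException) -> str:
    """Simplified, hypothetical model of Failed vs Crashed labelling.

    Subclasses of Exception are treated as ordinary failures of user code;
    anything deriving only from BaseException (a bare BaseException,
    KeyboardInterrupt, SystemExit, asyncio.CancelledError) is a crash.
    """
    return "Failed" if isinstance(exc, Exception) else "Crashed"


examples = [ValueError("bad input"), BaseException(),
            KeyboardInterrupt(), asyncio.CancelledError()]
for exc in examples:
    print(f"{type(exc).__name__}: {final_state_name(exc)}")
```

Note that asyncio.CancelledError, which appears in the traceback above, has derived directly from BaseException since Python 3.8, so it falls on the crash side of this split.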

efranksrecroom commented 1 year ago

This seems to roughly repro:

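import time

from prefect import flow, get_run_logger


class GoBoom:
    def generate_subflow(self, logger):
        # Subflow defined inline; it retries up to 3 times, 10 s apart.
        @flow(name='Raise BaseException', retries=3, retry_delay_seconds=10)
        def sub_flow(logger):
            logger.info("Starting")
            time.sleep(30)  # runs as long as the parent's entire timeout
            # A bare BaseException (not an Exception subclass) crashes the run
            raise BaseException()
        sub_flow(logger)


# Parent flow: 30 s timeout, 1 retry after 25 s
@flow(timeout_seconds=30, retries=1, retry_delay_seconds=25)
def foo_bar():
    go_boom = GoBoom()
    logger = get_run_logger()
    go_boom.generate_subflow(logger)


if __name__ == "__main__":
    foo_bar()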

The net result of running this is a crashed subflow and a parent flow that is still "Running" after exceeding the timeout I specified.

[Three screenshots of the resulting flow runs]
zanieb commented 1 year ago

Thank you! We'll investigate this :)

jkz commented 1 day ago

@zanieb did you ever get to the bottom of this? We're still facing this issue in Prefect 3.