j-tr opened 10 months ago
Thanks for the issue @j-tr! Do you have an example of a flow we can use to reproduce this issue? Also, can you share the versions of httpx, httpcore, and h2 you have installed?
@desertaxle, thank you for looking into this.
I'm using h2==4.1.0, httpcore==1.0.2, and httpx==0.26.0.
So far we haven't come up with an MRE, as this is very flaky and seems to happen only for relatively long-running flows.
I tried to make sense of the stack trace and found that the stream_id that cannot be found in self.streams of the h2 connection is provided by get_next_available_stream_id (https://github.com/python-hyper/h2/blob/bc005afad8302549facf5afde389a16759b2ccdb/src/h2/connection.py#L625C17-L625C17).
The docstring of that method contains a warning:
The return value from this function does not change until
the stream ID has actually been used by sending or pushing
headers on that stream. For that reason, it should be
called as close as possible to the actual use of the
stream ID.
As this is all async code, could it be that under high load multiple connections get the same stream_id, and consequently ending the stream a second time fails because the stream has already been removed?
If so, would this be an h2 problem rather than a Prefect one?
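To make the hypothesis above concrete, here is a toy model of the check-then-use race that the docstring warns about. It is a hypothetical sketch, not h2's or httpcore's code: `ToyH2Connection`, `open_stream`, and `demo` are invented for illustration, and it does not show whether httpcore already serialises this step. Note that HTTP/2 stream IDs are scoped to a single connection, so the race it models is between concurrent requests sharing one connection.

```python
import asyncio
from typing import Dict, Optional


class ToyH2Connection:
    """Hypothetical stand-in for an HTTP/2 client connection (not h2's code).

    It mimics the documented behaviour: the ID returned by
    get_next_available_stream_id() is not reserved until headers are sent.
    """

    def __init__(self) -> None:
        self.highest_outbound_stream_id = 0
        self.streams: Dict[int, str] = {}

    def get_next_available_stream_id(self) -> int:
        # Client-initiated HTTP/2 streams use odd IDs: 1, 3, 5, ...
        if not self.highest_outbound_stream_id:
            return 1
        return self.highest_outbound_stream_id + 2

    def send_headers(self, stream_id: int, label: str) -> None:
        # Sending headers is what actually claims the stream ID.
        if stream_id in self.streams:
            raise RuntimeError(f"stream {stream_id} is already in use")
        self.streams[stream_id] = label
        self.highest_outbound_stream_id = max(self.highest_outbound_stream_id, stream_id)


async def open_stream(conn: ToyH2Connection, label: str,
                      lock: Optional[asyncio.Lock] = None) -> int:
    async def pick_and_use() -> int:
        stream_id = conn.get_next_available_stream_id()
        # Any await between picking and using the ID lets another task run.
        await asyncio.sleep(0)
        conn.send_headers(stream_id, label)
        return stream_id

    if lock is None:
        return await pick_and_use()
    # Holding one lock across "pick" and "use" makes them atomic for tasks
    # that share this connection.
    async with lock:
        return await pick_and_use()


async def demo(use_lock: bool) -> list:
    conn = ToyH2Connection()
    lock = asyncio.Lock() if use_lock else None
    return await asyncio.gather(
        open_stream(conn, "request-a", lock),
        open_stream(conn, "request-b", lock),
        return_exceptions=True,
    )


if __name__ == "__main__":
    print(asyncio.run(demo(use_lock=False)))  # second request fails: stream 1 reused
    print(asyncio.run(demo(use_lock=True)))   # [1, 3]
```

Without the lock, both tasks pick stream 1 before either has sent headers, so the second send fails; holding one lock across the pick and the send yields IDs 1 and 3.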
Over the past two weeks, we have consistently encountered similar issues. Our implementation primarily utilizes asynchronous code, and I've integrated retry mechanisms into all relevant functions for enhanced reliability. Our system operates within a Docker Pool environment.
The error predominantly arises during our longer workflows, which typically run for 30 to 50 minutes and are scheduled hourly. It also occurs in our shorter workflows, which execute every 15 minutes. Although it occurs sporadically, it has been happening frequently throughout the day.
As a potential fix, I am now setting PREFECT_API_ENABLE_HTTP2=False to see whether it resolves these issues.
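For reference, a minimal sketch of applying that setting from Python, assuming it runs before Prefect loads its settings (for example at the very top of the flow's entrypoint, before `import prefect`). Setting the same variable in the Docker work pool's environment, or persisting it with `prefect config set PREFECT_API_ENABLE_HTTP2=false`, avoids depending on import order. The helper name `disable_prefect_http2` is hypothetical.

```python
import os


def disable_prefect_http2() -> str:
    """Ask Prefect's API client to use HTTP/1.1 instead of HTTP/2.

    Assumption: this only takes effect if it runs before Prefect reads its
    settings, so call it before importing prefect.
    """
    os.environ["PREFECT_API_ENABLE_HTTP2"] = "false"
    return os.environ["PREFECT_API_ENABLE_HTTP2"]


if __name__ == "__main__":
    print(disable_prefect_http2())  # false
```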
Here is the list of installed packages: pip_list.xlsx
Attached below is the stack trace from a recent incident for further analysis:
Crash detected! Execution was cancelled by the runtime environment.
01:19:52 PM
prefect.flow_runs
Crash details:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
result = await coro
^^^^^^^^^^
File "/code/libs/flows.py", line 101, in fl_process_ingest_to_silver
await write_delta_table_to_silver(
File "/code/libs/deltalake/function.py", line 124, in write_delta_table_to_silver
storage_options = await get_delta_table_storage_options()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/_asyncio.py", line 88, in async_wrapped
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/_asyncio.py", line 47, in __call__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 314, in iter
return fut.result()
^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/tenacity/_asyncio.py", line 50, in __call__
result = await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/code/libs/deltalake/function.py", line 59, in get_delta_table_storage_options
access_key = await fetch_secret.fn(
^^^^^^^^^^^^^^^^^^^^^^
File "/code/libs/core/prefect_utils.py", line 119, in fetch_secret
secret = await Secret.load(key.value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/blocks/core.py", line 838, in load
block_document, block_document_name = await cls._get_block_document(name)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/blocks/core.py", line 743, in _get_block_document
block_document = await client.read_block_document_by_name(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/orchestration.py", line 1434, in read_block_document_by_name
response = await self._client.get(
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1786, in get
return await self.request(
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1559, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 282, in send
response = await self._send_with_retry(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 216, in _send_with_retry
response = await request()
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1646, in send
response = await self._send_handling_auth(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1674, in _send_handling_auth
response = await self._send_handling_redirects(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1711, in _send_handling_redirects
response = await self._send_single_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1748, in _send_single_request
response = await transport.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 371, in handle_async_request
resp = await self._pool.handle_async_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 268, in handle_async_request
raise exc
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 251, in handle_async_request
response = await connection.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 76, in handle_async_request
stream = await self._connect(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 124, in _connect
stream = await self._network_backend.connect_tcp(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_backends/auto.py", line 30, in connect_tcp
return await self._backend.connect_tcp(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 114, in connect_tcp
stream: anyio.abc.ByteStream = await anyio.connect_tcp(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_core/_sockets.py", line 221, in connect_tcp
await event.wait()
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 1778, in wait
if await self._event.wait():
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/asyncio/locks.py", line 213, in wait
await fut
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 2289, in report_flow_run_crashes
yield
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 702, in create_and_begin_subflow_run
terminal_state = await orchestrate_flow_run(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 851, in orchestrate_flow_run
result = await flow_call.aresult()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 295, in aresult
raise CancelledError() from exc
prefect._internal.concurrency.cancellation.CancelledError
Update: after setting PREFECT_API_ENABLE_HTTP2=False, we are getting more errors, on different pipelines.
Crash details:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 2326, in report_task_run_crashes
yield
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1808, in begin_task_run
state = await orchestrate_task_run(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 2067, in orchestrate_task_run
task_run = await client.read_task_run(task_run.id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/orchestration.py", line 2032, in read_task_run
response = await self._client.get(f"/task_runs/{task_run_id}")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1786, in get
return await self.request(
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1559, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 282, in send
response = await self._send_with_retry(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 216, in _send_with_retry
response = await request()
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1646, in send
response = await self._send_handling_auth(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1674, in _send_handling_auth
response = await self._send_handling_redirects(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1711, in _send_handling_redirects
response = await self._send_single_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1748, in _send_single_request
response = await transport.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 371, in handle_async_request
resp = await self._pool.handle_async_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 268, in handle_async_request
raise exc
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 251, in handle_async_request
response = await connection.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 99, in handle_async_request
raise exc
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 76, in handle_async_request
stream = await self._connect(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 156, in _connect
stream = await stream.start_tls(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 78, in start_tls
raise exc
File "/usr/local/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 69, in start_tls
ssl_stream = await anyio.streams.tls.TLSStream.wrap(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/streams/tls.py", line 123, in wrap
await wrapper._call_sslobject_method(ssl_object.do_handshake)
File "/usr/local/lib/python3.11/site-packages/anyio/streams/tls.py", line 163, in _call_sslobject_method
raise EndOfStream from None
anyio.EndOfStream
Hey, it's really difficult to say what is going on without a reproduction.
Can you try pinning h2 < 4.0.0 and see if that helps? It looks like 4.0.0 was released 2 weeks ago, which lines up with the timeline for your errors.
(Edit: clicked on the wrong h2 repo 🤦♂️)
Yes, since it happens randomly it is very hard to troubleshoot. Do you know if there is a way to get more detailed logging? Would it be helpful to jump on a call?
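One way to get more detail on the connection layer is to turn on debug logging for httpx and httpcore. This is a sketch assuming those libraries log under the `httpx` and `httpcore` logger names (httpcore emits DEBUG messages for steps such as the TCP connect and TLS handshake seen in the traces); `enable_http_debug_logging` is a hypothetical helper. Prefect's own loggers can be made more verbose separately with `PREFECT_LOGGING_LEVEL=DEBUG`.

```python
import logging


def enable_http_debug_logging() -> None:
    """Send httpx/httpcore DEBUG records to stderr (call once per process)."""
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    for name in ("httpx", "httpcore"):
        logger = logging.getLogger(name)
        # Child loggers such as "httpcore.connection" inherit this level
        # and propagate their records to this handler.
        logger.setLevel(logging.DEBUG)
        logger.addHandler(handler)


if __name__ == "__main__":
    enable_http_debug_logging()
    logging.getLogger("httpcore.connection").debug("connect_tcp example message")
```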
In my requirements.txt I am still pinning anyio<4.0.0. I just tried it and it is working locally now. Should we keep that pin, or is this fixed in Prefect now?
Additionally, I've attempted downgrading Prefect to prefect==2.14.10, but unfortunately, it didn't resolve the issue.
I have re-pulled our pip dependencies and confirmed that we are on the latest versions, as detailed below. Notably, h2 has not had an update since October 2021.
Here is the current status of the related packages:
aiohttp: 3.9.1 (last updated Nov 26, 2023)
httpcore: 1.0.2 (last updated Nov 10, 2023)
httptools: 0.6.1 (last updated Oct 16, 2023)
httpx: 0.26.0 (last updated Dec 20, 2023)
h11: 0.14.0 (last updated Sep 25, 2022)
h2: 4.1.0 (last updated Oct 5, 2021)
h3: 3.7.6 (last updated Nov 24, 2022)
hpack: 4.0.0 (last updated Aug 30, 2020)
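Since the versions that matter are the ones inside the image the flows actually run in, a small script run in that environment can report them directly. This is a sketch using only `importlib.metadata`; the `PACKAGES` tuple and the `installed_versions` helper are illustrative choices, and it reports installed versions only, not PyPI release dates.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional

# Packages discussed in this thread; adjust as needed.
PACKAGES = ("prefect", "httpx", "httpcore", "h11", "h2", "hpack", "anyio")


def installed_versions(names: Iterable[str] = PACKAGES) -> Dict[str, Optional[str]]:
    """Return the installed version of each distribution, or None if missing."""
    found: Dict[str, Optional[str]] = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions().items():
        print(f"{name}: {ver or 'not installed'}")
```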
I unfortunately clicked on the wrong h2 repo and gave you some bad info, my apologies! Prefect still has anyio<4.0.0 pinned at the moment.
Was everything working before on a different prefect version and after an update you started to see the errors?
No worries! We always update to the latest Prefect version on deployment, and we deploy at least once a day. The flow we are seeing the most issues with went fully live on Dec 20th, and looking back at the logs, the errors started around Dec 25th, but only once a day. Now they happen many times a day.
Here is the stack trace from the 25th. If I could handle the exception, I could still let the rest of the script finish. But even though I have added try/except blocks to every function, the run is cancelled and I am not able to handle it.
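A likely reason the try/except blocks do not help: since Python 3.8, `asyncio.CancelledError` derives from `BaseException`, not `Exception`, so `except Exception:` lets it pass straight through. The sketch below demonstrates this with plain asyncio; the function names are hypothetical, and it does not show how Prefect's own `prefect._internal.concurrency.cancellation.CancelledError` is structured. In general, swallowing a cancellation is discouraged; if you catch it for cleanup, re-raise it afterwards.

```python
import asyncio


async def fetch_with_broad_except() -> str:
    try:
        await asyncio.sleep(10)  # stands in for an HTTP call that gets cancelled
        return "done"
    except Exception:
        # Never reached on cancellation: asyncio.CancelledError is a
        # BaseException subclass, so this clause does not match it.
        return "handled"


async def main() -> str:
    task = asyncio.create_task(fetch_with_broad_except())
    await asyncio.sleep(0)  # let the task start and block in sleep()
    task.cancel()
    try:
        return await task
    except asyncio.CancelledError:
        # The cancellation escaped the task's `except Exception` block.
        return "cancelled"


if __name__ == "__main__":
    print(asyncio.run(main()))  # cancelled
```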
Crash detected! Execution was cancelled by the runtime environment.
09:21:11 PM
prefect.flow_runs
Crash details:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
result = await coro
^^^^^^^^^^
File "/code/libs/ifs/flows.py", line 103, in fl_process_ifs_ingest_to_silver
await write_delta_table_to_silver(
File "/code/libs/deltalake/function.py", line 71, in write_delta_table_to_silver
storage_options = await get_delta_table_storage_options()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/code/libs/deltalake/function.py", line 39, in get_delta_table_storage_options
storage_account_access_key = await get_account_key(
^^^^^^^^^^^^^^^^^^^^^^
File "/code/libs/azure_blob_storage/functions.py", line 26, in get_account_key
) = await AzureBlobStorageCredentials.load(storage_account.value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/blocks/core.py", line 838, in load
block_document, block_document_name = await cls._get_block_document(name)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/blocks/core.py", line 743, in _get_block_document
block_document = await client.read_block_document_by_name(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/orchestration.py", line 1403, in read_block_document_by_name
response = await self._client.get(
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1757, in get
return await self.request(
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1530, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 282, in send
response = await self._send_with_retry(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 216, in _send_with_retry
response = await request()
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1617, in send
response = await self._send_handling_auth(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1645, in _send_handling_auth
response = await self._send_handling_redirects(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1682, in _send_handling_redirects
response = await self._send_single_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1719, in _send_single_request
response = await transport.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 366, in handle_async_request
resp = await self._pool.handle_async_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 268, in handle_async_request
raise exc
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 251, in handle_async_request
response = await connection.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 76, in handle_async_request
stream = await self._connect(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 124, in _connect
stream = await self._network_backend.connect_tcp(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_backends/auto.py", line 30, in connect_tcp
return await self._backend.connect_tcp(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 114, in connect_tcp
stream: anyio.abc.ByteStream = await anyio.connect_tcp(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_core/_sockets.py", line 221, in connect_tcp
await event.wait()
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 1779, in wait
await checkpoint()
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 447, in checkpoint
await sleep(0)
File "/usr/local/lib/python3.11/asyncio/tasks.py", line 640, in sleep
await __sleep0()
File "/usr/local/lib/python3.11/asyncio/tasks.py", line 634, in __sleep0
yield
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 2221, in report_flow_run_crashes
yield
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 700, in create_and_begin_subflow_run
terminal_state = await orchestrate_flow_run(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 849, in orchestrate_flow_run
result = await flow_call.aresult()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 295, in aresult
raise CancelledError() from exc
prefect._internal.concurrency.cancellation.CancelledError
Do you build images for your deployments? Any way to find out exactly what your dependencies looked like before this started and roll back to that?
Unfortunately, no, I do not. I am looking at migrating us from pip to Poetry to help with dependency management going forward. Thank you for your support!!!!!
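Until a lock file is in place, one stopgap for the "what did our dependencies look like before?" question is to record an exact snapshot of every installed distribution at image build time. This is a minimal sketch using `importlib.metadata`, similar in spirit to `pip freeze` but not identical (it does not special-case editable or VCS installs); `frozen_requirements` is a hypothetical helper.

```python
from importlib.metadata import distributions
from typing import List


def frozen_requirements() -> List[str]:
    """Return sorted 'name==version' pins for every distribution in this environment."""
    pins = {
        f"{dist.metadata.get('Name')}=={dist.version}"
        for dist in distributions()
        if dist.metadata.get("Name")  # skip entries with broken metadata
    }
    return sorted(pins, key=str.lower)


if __name__ == "__main__":
    # e.g. save this output next to each image build so it can be diffed later
    print("\n".join(frozen_requirements()))
```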
Update: I was able to reduce how often we see this error by manually caching the function's results (not a solution, but it is allowing us to operate). The error occurred again today; according to my logging, it happened after the subflow had completed. Is there a way in Prefect to catch this error? Here's the stack trace for more details.
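The manual caching described above might look something like this: a process-local async cache, so each block or secret is fetched from the API once per process instead of on every call, which means fewer requests (and fewer new connections) to Prefect Cloud. This is a hypothetical sketch (`AsyncLoadCache` is invented here); `loader` would be, for example, a coroutine function that awaits `Secret.load(name)`. Failed loads are not cached, and a rotated secret is not picked up until the process restarts.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional


class AsyncLoadCache:
    """Hypothetical process-local cache: each key is loaded at most once per process."""

    def __init__(self) -> None:
        self._values: Dict[str, Any] = {}
        self._lock: Optional[asyncio.Lock] = None

    async def get(self, key: str, loader: Callable[[str], Awaitable[Any]]) -> Any:
        # Create the lock lazily so it belongs to the running event loop.
        if self._lock is None:
            self._lock = asyncio.Lock()
        async with self._lock:
            if key not in self._values:
                # Only the first caller for a key reaches the API; concurrent
                # callers wait on the lock and then reuse the stored value.
                # If the loader raises, nothing is stored.
                self._values[key] = await loader(key)
            return self._values[key]
```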
Crash detected! Execution was interrupted by an unexpected exception: EndOfStream:
12:01:44 PM
prefect.flow_runs
Crash details:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 2299, in report_flow_run_crashes
yield
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 712, in create_and_begin_subflow_run
terminal_state = await orchestrate_flow_run(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 932, in orchestrate_flow_run
state = await propose_state(
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 2544, in propose_state
response = await set_state_and_handle_waits(set_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 2528, in set_state_and_handle_waits
response = await set_state_func()
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/orchestration.py", line 1924, in set_flow_run_state
response = await self._client.post(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1877, in post
return await self.request(
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1559, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 282, in send
response = await self._send_with_retry(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 216, in _send_with_retry
response = await request()
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1646, in send
response = await self._send_handling_auth(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1674, in _send_handling_auth
response = await self._send_handling_redirects(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1711, in _send_handling_redirects
response = await self._send_single_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1748, in _send_single_request
response = await transport.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 371, in handle_async_request
resp = await self._pool.handle_async_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 268, in handle_async_request
raise exc
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 251, in handle_async_request
response = await connection.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 99, in handle_async_request
raise exc
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 76, in handle_async_request
stream = await self._connect(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 156, in _connect
stream = await stream.start_tls(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 78, in start_tls
raise exc
File "/usr/local/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 69, in start_tls
ssl_stream = await anyio.streams.tls.TLSStream.wrap(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/streams/tls.py", line 123, in wrap
await wrapper._call_sslobject_method(ssl_object.do_handshake)
File "/usr/local/lib/python3.11/site-packages/anyio/streams/tls.py", line 163, in _call_sslobject_method
raise EndOfStream from None
anyio.EndOfStream
Not a new issue for sure, but possibly related: https://github.com/encode/httpcore/issues/808
@jakekaplan, That does look like the same stack trace we are seeing. Thank you!!
Hello,
Here is another stack trace, from this morning's failure.
Looking at this, it appears to be failing when pushing logs and states to Prefect Cloud. Would it be helpful to add some retry logic around these Prefect engine functions?
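The engine calls in these traces (`propose_state`, `set_flow_run_state`) are Prefect internals and are not easy to wrap from user code, and the traces show the exception escaping Prefect's own client retry layer (`_send_with_retry`). For calls that user code makes directly, such as `Secret.load` or `AzureBlobStorageCredentials.load`, a generic retry with exponential backoff is one option (tenacity, already used in the first trace, offers the same pattern). This is a sketch under assumptions: `retry_async` is a hypothetical helper, and which exception types to retry on (for example `anyio.EndOfStream`, as in the traces) is left to the caller. The `Type[Exception]` annotation is deliberate: retrying `asyncio.CancelledError` would defeat the cancellation.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    func: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[Exception], ...],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Await func(), retrying on `retry_on` with exponential backoff and jitter."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await func()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Delays grow as base_delay * 1, 2, 4, ... plus up to 10% jitter.
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay / 10))
    raise RuntimeError("unreachable")
```

Usage would look like `await retry_async(lambda: Secret.load("my-secret"), (anyio.EndOfStream,))`, where the block name is a placeholder.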
Crash detected! Execution was interrupted by an unexpected exception: EndOfStream:
08:19:37 AM
prefect.flow_runs
Crash details:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 2296, in report_flow_run_crashes
yield
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 715, in create_and_begin_subflow_run
terminal_state = await orchestrate_flow_run(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 935, in orchestrate_flow_run
state = await propose_state(
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 2541, in propose_state
response = await set_state_and_handle_waits(set_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 2525, in set_state_and_handle_waits
response = await set_state_func()
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/orchestration.py", line 1924, in set_flow_run_state
response = await self._client.post(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1877, in post
return await self.request(
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1559, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 282, in send
response = await self._send_with_retry(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 216, in _send_with_retry
response = await request()
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1646, in send
response = await self._send_handling_auth(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1674, in _send_handling_auth
response = await self._send_handling_redirects(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1711, in _send_handling_redirects
response = await self._send_single_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1748, in _send_single_request
response = await transport.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 371, in handle_async_request
resp = await self._pool.handle_async_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 268, in handle_async_request
raise exc
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 251, in handle_async_request
response = await connection.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 99, in handle_async_request
raise exc
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 76, in handle_async_request
stream = await self._connect(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 156, in _connect
stream = await stream.start_tls(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 78, in start_tls
raise exc
File "/usr/local/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 69, in start_tls
ssl_stream = await anyio.streams.tls.TLSStream.wrap(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/streams/tls.py", line 123, in wrap
await wrapper._call_sslobject_method(ssl_object.do_handshake)
File "/usr/local/lib/python3.11/site-packages/anyio/streams/tls.py", line 163, in _call_sslobject_method
raise EndOfStream from None
anyio.EndOfStream
Hello,
I wanted to give an update. I have removed all the Prefect tasks and moved all the logic into a single Prefect flow. We are no longer seeing the issue on that specific deployment/flow. We are still seeing the issue on the other flows, so we are going to rewrite each of them as a single flow as well.
Thanks!
Hi @bnewman-tech sorry to hear you're still seeing the issue.
The issue I linked above (https://github.com/encode/httpcore/issues/808), which I believe to be the cause, seems to have been fixed ~5 days ago. It will still be a little while before the fix lands in their next release, but I will try to respond here once I see it merged so we can pin prefect to that new version of httpcore.
@jakekaplan, it looks like they released a new version. We'll be on the lookout for when it gets pinned in prefect.
We have seen a similar error in our Prefect 2 Agent in GCP Cloud Run (launches Vertex jobs). The agent falls over at the start of the hour when ~50 flow runs are scheduled. We recently scaled up the number of instances and don't seem to be hitting resource limits on the instances.
Prefect version: 2.14.10
Error:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
return call()
File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 398, in __call__
return self.result()
File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 284, in result
return self.future.result(timeout=timeout)
File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
return self.__get_result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
result = await coro
File "/usr/local/lib/python3.9/site-packages/prefect/cli/agent.py", line 209, in start
tg.start_soon(
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
raise exceptions[0]
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/services.py", line 57, in critical_service_loop
await workload()
File "/usr/local/lib/python3.9/site-packages/prefect/agent.py", line 298, in check_for_cancelled_flow_runs
typed_cancelling_flow_runs = await self.client.read_flow_runs(
File "/usr/local/lib/python3.9/site-packages/prefect/client/orchestration.py", line 1862, in read_flow_runs
response = await self._client.post("/flow_runs/filter", json=body)
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1848, in post
return await self.request(
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1530, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/usr/local/lib/python3.9/site-packages/prefect/client/base.py", line 282, in send
response = await self._send_with_retry(
File "/usr/local/lib/python3.9/site-packages/prefect/client/base.py", line 216, in _send_with_retry
response = await request()
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1617, in send
response = await self._send_handling_auth(
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1645, in _send_handling_auth
response = await self._send_handling_redirects(
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1682, in _send_handling_redirects
response = await self._send_single_request(request)
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1719, in _send_single_request
response = await transport.handle_async_request(request)
File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 366, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 268, in handle_async_request
raise exc
File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 251, in handle_async_request
response = await connection.handle_async_request(request)
File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection.py", line 103, in handle_async_request
return await self._connection.handle_async_request(request)
File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http2.py", line 185, in handle_async_request
raise exc
File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http2.py", line 144, in handle_async_request
await self._send_request_body(request=request, stream_id=stream_id)
File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http2.py", line 261, in _send_request_body
await self._send_end_stream(request, stream_id)
File "/usr/local/lib/python3.9/site-packages/httpcore/_async/http2.py", line 280, in _send_end_stream
self._h2_state.end_stream(stream_id)
File "/usr/local/lib/python3.9/site-packages/h2/connection.py", line 883, in end_stream
frames = self.streams[stream_id].end_stream()
KeyError: 199
The upstream fix should be merged and released at this point, so you should be able to upgrade `httpcore` to avoid this specific error.
@zzstoatzz We've upgraded our `httpcore` library to 1.0.5 but we're still seeing this. We are on Prefect 2.16.4. Was anything changed on the Prefect side in tandem with this fix? I'm not sure whether we need to upgrade to 2.16.9 to see an improvement or whether the `httpcore` update alone should resolve it.
I've been able to reproduce this bug by rapidly calling `task.submit()` with a `ConcurrentTaskRunner`:
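```python
import time

from prefect import flow, task
from prefect.futures import PrefectFuture
from prefect.task_runners import ConcurrentTaskRunner


@task()
def one_second_task():
    time.sleep(1)
    return 1


@flow(
    log_prints=True,
    task_runner=ConcurrentTaskRunner(),
)
def no_op_flow(
    num_tasks: int,
    task_submit_delay: float,
):
    no_op_futures: list[PrefectFuture] = []
    for _ in range(num_tasks):
        no_op_futures.append(one_second_task.submit())
        # Delay between submissions; 0 submits as fast as possible.
        time.sleep(task_submit_delay)
    print("Submitted all tasks, now waiting for futures")
    for future in no_op_futures:
        future.result(timeout=240)
    # log_prints=True sends this print to the flow run logs.
    print("Finished all tasks")
```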
Deploying this flow, then repeatedly running the deployment with `task_submit_delay=0` and `num_tasks=300`, gave me an error rate of ~2%. I bet you could get a higher error rate with a higher `num_tasks`. These runs execute in Kubernetes pods that a worker spins up, using a `python:3.11-slim-buster` Docker base image.
Good news is, throttling task submission with a `time.sleep(0.25)` completely eliminated the error during my testing. If anyone else is struggling with this bug, consider adding a little throttling to your task submissions!
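The throttling workaround amounts to enforcing a minimum interval between submissions. Below is a minimal, framework-agnostic sketch of that idea. `submit_throttled` is a hypothetical helper, and `submit` is any zero-argument callable standing in for something like `one_second_task.submit`. Unlike a fixed `time.sleep` after every call, it sleeps only for the part of the interval that has not already elapsed, and it does not sleep after the last submission.

```python
import time
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")


def submit_throttled(submit: Callable[[], T], count: int, min_interval: float) -> List[T]:
    """Call `submit` `count` times, with at least `min_interval` seconds
    between the starts of consecutive calls. Returns results in call order."""
    results: List[T] = []
    last_start: Optional[float] = None
    for _ in range(count):
        if last_start is not None:
            # Sleep only for whatever part of the interval is still left.
            remaining = min_interval - (time.monotonic() - last_start)
            if remaining > 0:
                time.sleep(remaining)
        last_start = time.monotonic()
        results.append(submit())
    return results
```

In the reproduction flow, the submission loop could then become `no_op_futures = submit_throttled(one_second_task.submit, num_tasks, 0.25)`, with 0.25 s being the delay reported above rather than a tuned value.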
First check
Bug summary
Occasionally, flows crash with a connection-related exception that seems to originate from h2. So far this has only been observed in longer flow runs (>2h), and it does not seem to be related to any specific workload.
Possibly related to https://github.com/PrefectHQ/prefect/issues/7442, https://github.com/PrefectHQ/prefect/pull/9429
Reproduction
Error
Versions
Additional context
The stream_id from the final KeyError is different for each crash.
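The last frame of the tracebacks in this thread is `self.streams[stream_id].end_stream()` inside h2's `end_stream`, which raises `KeyError` when the connection state no longer holds an entry for that stream ID. The toy class below is a deliberately simplified, hypothetical model, not h2's actual code. It shows only this failure mode: once a stream's entry is gone, a later attempt to end that stream fails with whatever ID it happened to have, which fits the ID varying from crash to crash. It says nothing about *why* the entry disappears in the real bug.

```python
class ToyH2Connection:
    """Hypothetical, heavily simplified stand-in for HTTP/2 connection state."""

    def __init__(self):
        self.streams = {}
        self._next_id = 1  # client-initiated HTTP/2 streams use odd IDs

    def open_stream(self):
        stream_id = self._next_id
        self._next_id += 2
        self.streams[stream_id] = {"ended": False}
        return stream_id

    def drop_stream(self, stream_id):
        # Stand-in for whatever removes a stream's entry (reset, cleanup, ...).
        del self.streams[stream_id]

    def end_stream(self, stream_id):
        # Same shape as the failing line: a plain dict lookup by stream ID.
        self.streams[stream_id]["ended"] = True


if __name__ == "__main__":
    conn = ToyH2Connection()
    ids = [conn.open_stream() for _ in range(100)]
    conn.drop_stream(ids[-1])
    try:
        conn.end_stream(ids[-1])
    except KeyError as exc:
        print(f"KeyError: {exc}")  # prints: KeyError: 199
```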