PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Dask task scheduling hangs, then eventually flow fails with PoolTimeout #12877

Open arneyjfs opened 5 months ago

arneyjfs commented 5 months ago

Bug summary

I have a flow run that triggers about 2,000 tasks. I am using the DaskTaskRunner to handle running this many tasks at scale. After about 200 tasks are scheduled, scheduling hangs. From that point the flow continues for about 5 minutes: the already-scheduled tasks run fine, but no more are scheduled. All tasks that managed to be scheduled eventually complete, and the flow then hangs in a Running state, never finishing and showing no new tasks starting. After a seemingly variable amount of time (0-5 minutes?), the flow crashes with a PoolTimeout while trying to reach <Private-Prefect-Server-IP>/api/task_runs/.

Detailed sequential walkthrough of error

All logs look fine, apart from eventually reporting the crash in various ways. The Docker logs on the prefect-worker are the only logs with a bit more detail, as shown in the stack trace provided below.

There is clearly a bottleneck here but I am struggling to find where it is. CPU and memory stay at normal levels on all VMs. Suspecting the bottleneck might be the prefect-server database (after reading similar issues), I moved to a managed Cloud SQL PostgreSQL database. That has not fixed it: the database shows low load and the issue still happens.

Reproduction

# I managed to reproduce the problem in a simpler way with the code below (run via prefect serve). It results in a similar error, though about 900 tasks are scheduled before everything grinds to a halt. Maybe that is because the code is simpler and not run through Docker, or because my laptop is handling the scheduling and is higher spec, but there is still no out-of-the-ordinary resource usage at the point it hangs.

from prefect import task, flow, get_run_logger, serve
from prefect.runtime import task_run
from prefect_dask import DaskTaskRunner

def generate_task_name():
    # Name each task run after the task_number parameter it was mapped over
    parameters = task_run.parameters
    task_number = parameters["task_number"]
    return str(task_number)

@task(name="example_task",
      task_run_name=generate_task_name,
      log_prints=True)
def example_task(task_number):
    logger = get_run_logger()
    logger.info(f'I am task {task_number}')

@flow(name="example_flow",
      log_prints=True,
      task_runner=DaskTaskRunner(address="tcp://10.128.15.193:8786"))
def example_flow():
    logger = get_run_logger()

    args = list(range(2000))
    logger.info(f'Triggering {len(args)} tasks')

    # Submit one task run per element; the DaskTaskRunner distributes them to the cluster
    example_task.map(args)

if __name__ == '__main__':
    repro_deploy = example_flow.to_deployment(
        name='reproduce_error',
        work_pool_name='default-agent-pool')
    serve(repro_deploy)

Error

15:38:42.341 | INFO    | Flow run 'flow-1234' - Submitted task run 'exampleDaskTask-201' for execution.
15:38:42.343 | INFO    | Flow run 'flow-1234' - Created task run 'exampleDaskTask-202' for task 'exampleDaskTask'
15:38:42.350 | INFO    | Flow run 'flow-1234' - Submitted task run 'exampleDaskTask-202' for execution.
15:38:42.352 | INFO    | Flow run 'flow-1234' - Created task run 'exampleDaskTask-203' for task 'exampleDaskTask'
15:38:42.359 | INFO    | Flow run 'flow-1234' - Submitted task run 'exampleDaskTask-203' for execution.
15:43:46.511 | INFO    | httpx - HTTP Request: POST http://<Prefect-Server-IP>:4200/api/logs/ "HTTP/1.1 201 Created"
15:45:03.206 | ERROR   | distributed.client -
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/distributed/client.py", line 1715, in _close
    await self.scheduler_comm.close()
asyncio.exceptions.CancelledError
15:45:03.210 | ERROR   | Flow run 'amazing-mastodon' - Crash detected! Request to <Prefect-Server-IP>:4200/api/task_runs/ failed: PoolTimeout: .
15:45:04.944 | INFO    | httpx - HTTP Request: POST <Prefect-Server-IP>/api/logs/ "HTTP/1.1 201 Created"
15:51:59.305 | ERROR   | prefect.engine - Engine execution of flow run '0ee09f65-828c-43a3-9112-77f2e84cbcea' exited with unexpected exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py", line 69, in map_httpcore_exceptions
    yield
  File "/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py", line 373, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request
    raise exc from None
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_async/connection_pool.py", line 192, in handle_async_request
    connection = await pool_request.wait_for_connection(timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_async/connection_pool.py", line 35, in wait_for_connection
    await self._connection_acquired.wait(timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_synchronization.py", line 148, in wait
    with map_exceptions(anyio_exc_map):
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/dist-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc) from exc
httpcore.PoolTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine.py", line 2997, in <module>
    enter_flow_run_engine_from_subprocess(flow_run_id)
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine.py", line 322, in enter_flow_run_engine_from_subprocess
    state = from_sync.wait_for_call_in_loop_thread(
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/dist-packages/prefect/client/utilities.py", line 78, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine.py", line 477, in retrieve_flow_then_begin_flow_run
    return await begin_flow_run(
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine.py", line 514, in begin_flow_run
    async with AsyncExitStack() as stack:
  File "/usr/lib/python3.10/contextlib.py", line 714, in __aexit__
    raise exc_details[1]
  File "/usr/lib/python3.10/contextlib.py", line 697, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/usr/lib/python3.10/contextlib.py", line 217, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine.py", line 2344, in report_flow_run_crashes
    flow_run_state = await propose_state(client, state, flow_run_id=flow_run.id)
  File "/usr/local/lib/python3.10/dist-packages/prefect/client/orchestration.py", line 2117, in set_flow_run_state
    response = await self._client.post(
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1892, in post
    return await self.request(
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1574, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/dist-packages/prefect/client/base.py", line 311, in send
    response = await self._send_with_retry(
  File "/usr/local/lib/python3.10/dist-packages/prefect/client/base.py", line 235, in _send_with_retry
    response = await send(request, *send_args, **send_kwargs)
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1661, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1689, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1726, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.10/dist-packages/httpx/_client.py", line 1763, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py", line 372, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py", line 86, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.PoolTimeout
15:52:01.736 | ERROR   | prefect.flow_runs.runner - Process for flow run 'amazing-mastodon' exited with status code: 1
15:52:01.802 | INFO    | prefect.flow_runs.runner - Reported flow run '0ee09f65-828c-43a3-9112-77f2e84cbcea' as crashed: Flow run process exited with non-zero status code 1.

Versions

# for server and worker the output is the same
Version:             2.16.6
API version:         0.8.4
Python version:      3.10.12
Git commit:          3fecd435
Built:               Sat, Mar 23, 2024 4:06 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         server

Additional context

This may be a terrible architecture, so if the solution is to use Prefect in a different way I would also welcome advice here. A post to the Prefect Discourse on how to do something like this did not lead to any advice on good architecture patterns. For context, I tried another approach of using Prefect workers to distribute tasks out to other VMs (i.e. instead of 1 prefect-server => 1 prefect-worker => 1 Dask cluster => many Dask workers, I tried 1 prefect-server => many prefect-workers), but this crashed in a similar way (failed to reach the API). I could also simply use Prefect to trigger the flow and call Dask directly from within the flow. Since it seems to be Prefect struggling to keep track of all the tasks, this would likely help, but I would lose the greatly improved visibility and logging that Prefect provides at the task level.

arneyjfs commented 5 months ago

For some additional context, I removed the Prefect @task decorator from the task to let Dask handle scaling the tasks natively, and I was easily able to scale to 1,000,000+ tasks. I think I can therefore also rule out the Dask cluster as the bottleneck here.
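
For comparison, "removing the @task decorator" amounts to something like the sketch below (the scheduler address is the same placeholder as in the repro; the function name is made up), so only Dask is tracking the work:

from dask.distributed import Client

def plain_task(task_number: int) -> int:
    # No Prefect @task decorator: Dask alone tracks these futures.
    return task_number

if __name__ == "__main__":
    client = Client("tcp://10.128.15.193:8786")        # same cluster address as the repro
    futures = client.map(plain_task, range(1_000_000))
    results = client.gather(futures)                   # scales without touching the Prefect API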

arneyjfs commented 5 months ago

Has anyone been able to reproduce this? @desertaxle? Perhaps there is some more information I can provide

tkramer-motion commented 1 month ago

I get the same error when submitting a large number of tasks using the ConcurrentTaskRunner

peakv-dersek commented 1 month ago

We occasionally see this on regular tasks/flows as well:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 872, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/opt/prefect/flows/extraction/tiktok/tiktok_e_accounts.py", line 122, in extract_adgroups
    await asyncio.gather(*tasks)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 150, in wait_for_call_in_loop_thread
    return call.result()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1600, in get_task_call_return_value
    return await future._result()
  File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.10/site-packages/prefect/task_runners.py", line 231, in submit
    result = await call()
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1851, in begin_task_run
    state = await orchestrate_task_run(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 2033, in orchestrate_task_run
    state = await propose_state(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 2576, in propose_state
    response = await set_state_and_handle_waits(set_state)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 2563, in set_state_and_handle_waits
    response = await set_state_func()
  File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration.py", line 2317, in set_task_run_state
    response = await self._client.post(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1892, in post
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1574, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 311, in send
    response = await self._send_with_retry(
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 235, in _send_with_retry
    response = await send(request, *send_args, **send_kwargs)
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1661, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1689, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1726, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1763, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 372, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 86, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.PoolTimeout

We're using prefect 2.16.9 in our jobs

Based on https://github.com/PrefectHQ/prefect/blob/2.16.9/src/prefect/client/base.py#L325 it looks like there is a retry for the httpx.PoolTimeout. By default the send will retry 5 times, so it's odd that it's still having issues by then unless the API is seriously overloaded or there are too many connections being initiated simultaneously on the process/container/host.
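
For anyone following along, the behaviour being referenced has roughly this shape (a simplified sketch, not Prefect's actual client code; the exception set and backoff values are assumptions taken from the comments here):

import asyncio
import httpx

async def send_with_retry(client: httpx.AsyncClient, request: httpx.Request,
                          max_retries: int = 5) -> httpx.Response:
    # Sketch only: retry pool timeouts with exponential backoff, as described above.
    for attempt in range(max_retries + 1):
        try:
            return await client.send(request)
        except httpx.PoolTimeout:
            if attempt == max_retries:
                raise
            await asyncio.sleep(2 ** attempt)  # 1s, 2s, 4s, 8s, 16s between attempts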

We use a Kubernetes backend with a fair number of asyncio coroutines (and each coroutine may make calls to the Prefect API for blocks or rate limiting). It's common for us to have 100-200 simultaneously running flows. The flow runs are distributed across ~25 small (1-2 CPU) hosts. Since our workflows are IO-bound, we never see CPU maxed out.

samdchamberlain commented 1 month ago

Is there any known solution to get around this issue? We are facing similar problems when spinning up many mapped tasks with the DaskTaskRunner and are unable to make progress past this point.

peakv-dersek commented 1 month ago

I was considering changing the PREFECT_API_REQUEST_TIMEOUT and PREFECT_CLIENT_MAX_RETRIES, but those default to 60.0 and 5, which means it should theoretically try for > 300 seconds (with 2**{retry_num} backoff on _send_with_retry). I'd be surprised if it needs that much time if it's just a transient issue.
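
Back-of-the-envelope, those defaults imply roughly the following worst case (assuming each attempt can itself wait out the full request timeout):

# Worst case before giving up, assuming each attempt can wait the full 60 s request
# timeout and the client sleeps 2**attempt between the 5 retries.
timeout_s, retries = 60.0, 5
backoff_s = sum(2 ** n for n in range(retries))    # 1 + 2 + 4 + 8 + 16 = 31 s
total_s = (retries + 1) * timeout_s + backoff_s    # initial attempt + 5 retries
print(total_s)                                     # 391.0 -> comfortably > 300 s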

That said, maybe it's possible that the retries aren't actually going through, or the pool gets into a bad state. We would probably need to work in some debug logs to figure out what is going on (at least within the _send_with_retry code). If I can get some free time, I'll try to patch things into our dev image and run the reproduction case provided above.

desertaxle commented 1 month ago

I've seen improvement in httpx.PoolTimeout errors by disabling HTTP2 in the Prefect client. You can disable HTTP2 by setting PREFECT_API_ENABLE_HTTP2=False in your profile.

There's also been a big reduction in the number of API calls made during task orchestration in 3.0, so it might be worthwhile to try out 3.0 and see if there's improvement for your use case!

peakv-dersek commented 1 month ago

We're already using PREFECT_API_ENABLE_HTTP2=False and still see this a fair amount (though I don't remember if it's an improvement from having it set to True)

samdchamberlain commented 1 month ago

Similarly, we've tried setting PREFECT_API_ENABLE_HTTP2=False and are still seeing failures with runs across ~1,500 mapped tasks. Looking directly at the DEBUG logs, we can see Prefect worker threads shutting down before everything goes quiet (logs below). Unfortunately we are unable to upgrade to Prefect 3, as our organization is on Prefect 2.

 ERROR 2024-08-22T18:00:54.010352253Z [resource.labels.taskName: workerpool0-0] 18:00:53.254 | DEBUG | APILogWorkerThread | prefect._internal.concurrency - Running call get(timeout=1.8705078409999487) in thread 'APILogWorkerThread'
 ERROR 2024-08-22T18:00:54.010356674Z [resource.labels.taskName: workerpool0-0] 18:00:53.254 | DEBUG | APILogWorkerThread | prefect._internal.concurrency - Running call get(timeout=1.8681604759999573) in thread 'APILogWorkerThread'
 ERROR 2024-08-22T18:00:54.010360610Z [resource.labels.taskName: workerpool0-0] 18:00:53.296 | DEBUG | APILogWorkerThread | prefect._internal.concurrency - Running call get(timeout=1.9999872829999958) in thread 'APILogWorkerThread'
 ERROR 2024-08-22T18:00:54.010364465Z [resource.labels.taskName: workerpool0-0] 18:00:53.394 | INFO | WorkerThread-23 | prefect._internal.concurrency - Exiting worker thread 'WorkerThread-23'
 ERROR 2024-08-22T18:00:54.010371081Z [resource.labels.taskName: workerpool0-0] 18:00:53.413 | INFO | WorkerThread-30 | prefect._internal.concurrency - Exiting worker thread 'WorkerThread-30'
 ERROR 2024-08-22T18:00:54.010386549Z [resource.labels.taskName: workerpool0-0] 18:00:53.545 | DEBUG | APILogWorkerThread | prefect._internal.concurrency - Running call get(timeout=1.5788529749999611) in thread 'APILogWorkerThread'
 ERROR 2024-08-22T18:00:54.010390523Z [resource.labels.taskName: workerpool0-0] 18:00:53.546 | DEBUG | EventsWorkerThread | prefect._internal.concurrency - Running call get() in thread 'EventsWorkerThread'
 ERROR 2024-08-22T18:00:54.010394558Z [resource.labels.taskName: workerpool0-0] 18:00:53.553 | DEBUG | APILogWorkerThread | prefect._internal.concurrency - Running call get(timeout=1.743925106000006) in thread 'APILogWorkerThread'
 ERROR 2024-08-22T18:00:54.010399764Z [resource.labels.taskName: workerpool0-0] 18:00:53.554 | DEBUG | EventsWorkerThread | prefect._internal.concurrency - Running call get() in thread 'EventsWorkerThread'
 ERROR 2024-08-22T18:00:54.010404003Z [resource.labels.taskName: workerpool0-0] 18:00:53.634 | INFO | APILogWorkerThread | prefect._internal.concurrency - Exiting worker thread 'APILogWorkerThread'

Finally, when it fails, the PoolTimeout crash details are as follows:

Crash details:

peakv-dersek commented 1 month ago

Looking into the nodes seeing PoolTimeouts, I noticed we had a flow run that can run up to 100-150 tasks concurrently through asyncio. Each task uses Prefect's async rate limiting to throttle requests for data extraction from an API, on top of any connections used to manage Prefect task/flow logs or state, so there may be a fair number of active connections.

Since I reduced the parallelism to 50-75, I haven't seen any more pool timeouts, whereas we saw them regularly before.

If we can confirm that the retries are happening as expected (5 times over at least 300 seconds), then I'm inclined to believe it's more of an infrastructure connection limitation issue per process/pod/host/NAT/prefect workspace/prefect API. I'm not familiar with how the connections work at the task level, but if any tasks have their own connections, maybe having a common manager through which all tasks interface would help? Otherwise there may be a physical limit to how many tasks can start/run/finish simultaneously.
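
For what it's worth, a "common manager" that just caps in-flight work could be as small as the sketch below (names are made up; it only bounds concurrency and is not Prefect-specific):

import asyncio

async def run_all(coros, max_in_flight: int = 50):
    # Run coroutines while keeping at most max_in_flight active at any time.
    sem = asyncio.Semaphore(max_in_flight)

    async def bounded(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(bounded(c) for c in coros))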

desertaxle commented 1 month ago

@peakv-dersek thanks for sharing! It makes sense that the pool timeout errors decreased after you reduced the number of parallel tasks, based on what I'm seeing in the httpx docs about pool timeouts. It seems like increasing the max connections on the httpx client could help with these issues at higher concurrency/parallelism (as long as the machine has enough resources).

Looks like the client sets the max_connections here: https://github.com/PrefectHQ/prefect/blob/cb2874fea36e5ec8789a1cdfdf9955238e75093e/src/prefect/client/orchestration.py#L325. A limit of 16 is quite a bit lower than the httpx default. I think it's reasonable to make that limit configurable via a setting (something like PREFECT_API_MAX_CONNECTIONS).
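
For context, this is roughly how those limits are expressed on an httpx client (values here are illustrative; httpx's own default for max_connections is 100):

import httpx

# Illustrative only: widen the pool and make the pool-acquire timeout explicit.
limits = httpx.Limits(max_connections=100, max_keepalive_connections=20)
timeout = httpx.Timeout(60.0, pool=60.0)  # pool= is how long a request waits for a free connection
client = httpx.AsyncClient(limits=limits, timeout=timeout)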

peakv-dersek commented 1 month ago

It's the comment above it that concerns me: https://github.com/PrefectHQ/prefect/blob/cb2874fea36e5ec8789a1cdfdf9955238e75093e/src/prefect/client/orchestration.py#L323-L324

Seems like it might be an issue either way. Alternatively, maybe we could try lowering it further if the task parallelism is increasing.

The next step, I think, is to do some exploration of the PrefectClient within each task: are they independent instances? Are their connections independent?
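
One quick way to poke at that from inside a run (a throwaway sketch; inspect_client is a made-up task, and _client is the private httpx client visible in the tracebacks above):

from prefect import task, get_client

@task
async def inspect_client():
    # Log which PrefectClient instance (and underlying httpx client) this task run sees.
    async with get_client() as client:
        print(f"PrefectClient id={id(client)}, httpx client id={id(client._client)}")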

https://github.com/encode/httpx/discussions/1633#discussioncomment-717658

Yes. HTTPX is intended to be thread-safe, and yes, a single client-instance across all threads will do better in terms of connection pooling, than using an instance-per-thread.

Makes me think that we should be setting up a client (or at least a single connection pool) per process and sharing it among the tasks.
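
A minimal sketch of that idea (hypothetical helper, not anything Prefect exposes today): build one client per process and reuse its pool everywhere.

import httpx

# Hypothetical per-process shared client: one connection pool reused by all tasks.
_shared_client: httpx.AsyncClient | None = None

def get_shared_client() -> httpx.AsyncClient:
    global _shared_client
    if _shared_client is None:
        _shared_client = httpx.AsyncClient(
            limits=httpx.Limits(max_connections=50),
            timeout=httpx.Timeout(60.0, pool=60.0),
        )
    return _shared_client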

peakv-dersek commented 1 month ago

TL;DR - the issue doesn't seem to be client-side. A potential workaround is increasing PREFECT_API_REQUEST_TIMEOUT to 300 or more for the infra spec of work pools that are expected to run flows with heavily parallelized tasks or other entities that hit the Prefect API.

I tried out a few things within a running flow using an asyncio repro case (similar to the original repro, but using async def on the task and flow and asyncio.gather to schedule the tasks and wait for them to complete), and I see the pool timeouts as well.
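
For reference, the asyncio variant looks roughly like this (a sketch assuming Prefect 2.x semantics, where calling an async task inside an async flow returns an awaitable that executes it):

import asyncio
from prefect import flow, task, get_run_logger

@task
async def example_task(task_number: int):
    logger = get_run_logger()
    logger.info(f"I am task {task_number}")

@flow(name="example_flow_async", log_prints=True)
async def example_flow(n: int = 2000):
    logger = get_run_logger()
    logger.info(f"Triggering {n} tasks")
    # Direct calls to the async task return awaitables; gather waits for all of them.
    await asyncio.gather(*(example_task(i) for i in range(n)))

if __name__ == "__main__":
    asyncio.run(example_flow())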

Interestingly, it looks like all task creation is happening on the same Client and it isn't able to create all the tasks in time.

# python -m repro
Thread: 140084441921216 Created client <prefect.client.base.PrefectHttpxClient object at 0x7f67f367a650>
22:24:26.122 | INFO    | prefect.engine - Created flow run 'determined-elk' for flow 'example_flow'
22:24:26.124 | INFO    | Flow run 'determined-elk' - View at https://app.prefect.cloud/account/<redacted>/workspace/<redacted>/flow-runs/flow-run/<redacted>
22:24:26.364 | INFO    | Flow run 'determined-elk' - Triggering 2000 tasks
22:24:27.264 | INFO    | Flow run 'determined-elk' - Thread: 140084441921216 Created client <prefect.client.base.PrefectHttpxClient object at 0x7f67f01ebf40>
22:27:00.463 | INFO    | Flow run 'determined-elk' - Retryable exception in thread 140084441921216: <class 'httpx.PoolTimeout'>, PoolTimeout('')
22:27:00.572 | INFO    | Flow run 'determined-elk' - Retryable exception in thread 140084441921216: <class 'httpx.PoolTimeout'>, PoolTimeout('')
22:27:00.764 | INFO    | Flow run 'determined-elk' - Retryable exception in thread 140084441921216: <class 'httpx.PoolTimeout'>, PoolTimeout('')
22:27:00.868 | INFO    | Flow run 'determined-elk' - Retryable exception in thread 140084441921216: <class 'httpx.PoolTimeout'>, PoolTimeout('')
22:27:00.972 | INFO    | Flow run 'determined-elk' - Retryable exception in thread 140084441921216: <class 'httpx.PoolTimeout'>, PoolTimeout('')
22:27:01.164 | INFO    | Flow run 'determined-elk' - Retryable exception in thread 140084441921216: <class 'httpx.PoolTimeout'>, PoolTimeout('')
22:27:01.267 | INFO    | Flow run 'determined-elk' - Retryable exception in thread 140084441921216: <class 'httpx.PoolTimeout'>, PoolTimeout('')
...

I tried increasing the timeout to 600 seconds (PREFECT_API_REQUEST_TIMEOUT) and saw it work for the n=2000 case:

22:29:01.107 | INFO    | prefect.engine - Created flow run 'opalescent-shellfish' for flow 'example_flow'
22:29:01.109 | INFO    | Flow run 'opalescent-shellfish' - View at https://app.prefect.cloud/account/<redacted>/workspace/<redacted>/flow-runs/flow-run/<redacted>
22:29:01.360 | INFO    | Flow run 'opalescent-shellfish' - Triggering 2000 tasks
22:29:02.374 | INFO    | Flow run 'opalescent-shellfish' - Thread: 140693059696320 Created client <prefect.client.base.PrefectHttpxClient object at 0x7ff59c043c40>
22:31:25.574 | INFO    | Flow run 'opalescent-shellfish' - Created task run 'example_task-10' for task 'example_task'
22:31:25.575 | INFO    | Flow run 'opalescent-shellfish' - Executing 'example_task-10' immediately...
22:31:25.775 | INFO    | Flow run 'opalescent-shellfish' - Created task run 'example_task-9' for task 'example_task'
22:31:25.776 | INFO    | Flow run 'opalescent-shellfish' - Executing 'example_task-9' immediately...
22:31:26.471 | INFO    | Flow run 'opalescent-shellfish' - Created task run 'example_task-11' for task 'example_task'
22:31:26.472 | INFO    | Flow run 'opalescent-shellfish' - Executing 'example_task-11' immediately...
22:31:26.764 | INFO    | Flow run 'opalescent-shellfish' - Created task run 'example_task-12' for task 'example_task'
22:31:26.764 | INFO    | Flow run 'opalescent-shellfish' - Executing 'example_task-12' immediately...
...

It takes the client 2.5 minutes to ~start~ finish getting all the tasks actually created in the API.

Increasing the max connections to 1024 while keeping the default timeout: it sends all the requests but doesn't seem to make any progress, even after 10 minutes (not even a pool timeout error). Logging the send requests, after submitting the initial 1024 very quickly, it's only able to do about 1 task per second for the rest. (The previous approach maintains much faster throughput.)

In terms of flow, all the task-creation POSTs are handled by a single Prefect client (in the flow), so if too many are created at once, the pool gets backed up. There are also POSTs to the task run's /set_state and GETs to the task ID, all done within the flow (same client) around the time of task creation. From there the task takes over and starts making its own calls to the API. I didn't test the parallelism of the tasks themselves in my setup, but it seemed like the initial creation was the problem anyway.

Side note: I'm not exactly sure why the task creation/execution doesn't seem very asynchronous (all the /task_runs POST requests go out before individual tasks start running and trying to set their state), but that's something for another day...

I'll probably need to look at the Dask version of this, since it seems like that one does run tasks as they get created rather than waiting for all of them, so I'd want to make sure it's still an issue with the task-creation pool.