PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
"RuntimeError: is bound to a different event loop" when using prefect client from sync flow in kubernetes #13181

Open carlosjourdan opened 7 months ago

carlosjourdan commented 7 months ago

Bug summary

When trying to run the Prefect client's async methods from a sync context on Kubernetes, I get a "RuntimeError: is bound to a different event loop" error. I've tried using prefect.utilities.asyncutils.run_sync, asyncio.run and asyncio.get_event_loop().run_until_complete.

The code works fine locally, and calling a simple async function on the k8s deployment also works. I'm only able to reproduce the error in the k8s deployment, and only with the prefect.client methods.

Reproduction

from prefect.utilities.asyncutils import run_sync
from prefect.context import get_run_context
from prefect import flow
from prefect.deployments import Deployment
import asyncio

@flow()
def flow2():
    # This works fine locally, but crashes on k8s
    context = get_run_context()
    prefect_client = context.client
    flow_run_id = context.flow_run.id

    flow_run_state = run_sync(prefect_client.read_flow_run(flow_run_id))
    print(flow_run_state)

@flow()
def flow1():
    # This works fine both locally and on k8s
    run_sync(asyncio.sleep(10))

if __name__ == "__main__":
    Deployment.build_from_flow(flow=flow1, name="flow1", path='/source', work_pool_name="my-workpool", job_variables={"image": "my-image"}).apply()
    Deployment.build_from_flow(flow=flow2, name="flow2", path='/source', work_pool_name="my-workpool", job_variables={"image": "my-image"}).apply()

The image basically builds from python:3.11-buster, installs some packages and copies the source files to the /source dir.

This is the result of a pip freeze:

aiosqlite==0.20.0
alembic==1.13.1
altair==4.2.2
annotated-types==0.6.0
anyio==3.7.1
apprise==1.7.6
argon2-cffi==23.1.0
argon2-cffi-bindings==21.2.0
arrow==1.3.0
asgi-lifespan==2.1.0
asn1crypto==1.5.1
asttokens==2.4.1
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
azure-common==1.1.28
azure-core==1.30.1
azure-identity==1.14.1
azure-keyvault-secrets==4.7.0
beautifulsoup4==4.12.3
bleach==6.1.0
boto3==1.34.95
botocore==1.34.95
cachetools==5.3.3
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
cloudpickle==3.0.0
colorama==0.4.6
comm==0.2.2
coolname==2.2.0
croniter==2.0.5
cryptography==42.0.5
dateparser==1.2.0
debugpy==1.8.1
decorator==5.1.1
defusedxml==0.7.1
dependency-injector==4.41.0
dnspython==2.6.1
docker==6.1.3
elastic-transport==8.1.2
elasticsearch==8.2.3
email-validator==2.1.1
entrypoints==0.4
executing==2.0.1
fastjsonschema==2.19.1
filelock==3.14.0
fqdn==1.5.1
fsspec==2024.3.1
fuzzywuzzy==0.18.0
geographiclib==2.0
geopy==2.4.1
google-auth==2.29.0
graphviz==0.20.3
great-expectations==0.15.32
greenlet==3.0.3
griffe==0.44.0
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==1.0.5
httpx==0.27.0
humanize==4.9.0
hyperframe==6.0.1
idna==3.7
ijson==3.2.3
importlib-metadata==7.1.0
importlib-resources==6.1.3
ipykernel==6.29.4
ipython==8.24.0
ipython-genutils==0.2.0
ipywidgets==8.1.2
isodate==0.6.1
isoduration==20.11.0
itsdangerous==2.2.0
jedi==0.19.1
jinja2==3.1.3
jinja2-humanize-extension==0.4.0
jmespath==1.0.1
jsonpatch==1.33
jsonpointer==2.4
jsonschema==4.7.2
jupyter-client==7.4.9
jupyter-core==5.7.2
jupyter-events==0.6.3
jupyter-server==2.10.0
jupyter-server-terminals==0.5.3
jupyterlab-pygments==0.3.0
jupyterlab-widgets==3.0.10
kubernetes==29.0.0
lazy-import==0.2.2
mailbits==0.2.1
makefun==1.15.2
mako==1.3.3
markdown==3.6
markdown-it-py==3.0.0
markupsafe==2.1.5
marshmallow==3.21.1
matplotlib-inline==0.1.7
mdurl==0.1.2
mistune==3.0.2
msal==1.23.0
msal-extensions==1.1.0
nbclassic==1.0.0
nbclient==0.10.0
nbconvert==7.16.4
nbformat==5.10.4
nest-asyncio==1.6.0
notebook==6.5.6
notebook-shim==0.2.4
nslookup==1.7.0
numpy==1.24.4
oauthlib==3.2.2
orjson==3.10.1
overrides==7.7.0
packaging==24.0
pandas==2.1.4
pandocfilters==1.5.1
parso==0.8.4
pathspec==0.12.1
pendulum==2.1.2
platformdirs==4.2.1
portalocker==2.8.2
prefect==2.18.1
prometheus-client==0.20.0
prompt-toolkit==3.0.43
psutil==5.9.8
pure-eval==0.2.2
pyasn1==0.6.0
pyasn1-modules==0.4.0
pycparser==2.22
pydantic==2.7.1
pydantic-core==2.18.2
pygments==2.17.2
pyjwt==2.8.0
pyopenssl==20.0.1
pyparsing==3.1.2
pypi-simple==1.5.0
pypyodbc==1.3.6
pyrsistent==0.20.0
pysmbclient==0.1.5
pyspnego==0.10.2
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
python-json-logger==2.0.7
python-multipart==0.0.9
python-slugify==8.0.4
pytz==2024.1
pytzdata==2020.1
pywin32==306
pywinpty==2.0.13
pyyaml==6.0.1
pyzmq==24.0.1
readchar==4.0.6
regex==2024.4.28
requests==2.31.0
requests-oauthlib==2.0.0
rfc3339-validator==0.1.4
rfc3986-validator==0.1.1
rich==13.7.1
rsa==4.9
ruamel-yaml==0.17.17
s3transfer==0.10.1
scipy==1.13.0
selenium==3.141.0
send2trash==1.8.3
setuptools==69.5.1
shellingham==1.5.4
six==1.14.0
smbprotocol==1.6.2
sniffio==1.3.1
snowflake-connector-python==3.10.0
snowflake-sqlalchemy==1.5.3
sortedcontainers==2.4.0
soupsieve==2.5
sqlalchemy==1.4.52
sspilib==0.1.0
stack-data==0.6.3
termcolor==2.0.1
terminado==0.18.1
text-unidecode==1.3
tinycss2==1.3.0
toml==0.10.2
tomlkit==0.12.4
toolz==0.12.1
tornado==6.4
tqdm==4.66.2
traitlets==5.14.3
truststore==0.7.0
typer==0.12.3
types-python-dateutil==2.9.0.20240316
typing-extensions==4.11.0
tzdata==2024.1
tzlocal==5.2
ujson==5.9.0
unidecode==1.3.8
uri-template==1.3.0
urllib3==1.26.18
uvicorn==0.28.1
vertica-python==1.0.5
wcwidth==0.2.13
webcolors==1.13
webdriver-manager==4.0.1
webencodings==0.5.1
websocket-client==1.8.0
websockets==12.0
widgetsnbextension==4.0.10
zipp==3.18.1

### Error

```python3
Encountered exception during execution:
Traceback (most recent call last):
  File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 875, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/flows/my_flows.py", line 16, in flow2
    flow_run_state = run_sync(prefect_client.read_flow_run(flow_run_id))
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 115, in run_sync
    return asyncio.run(coroutine)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/client/orchestration.py", line 2007, in read_flow_run
    response = await self._client.get(f"/flow_runs/{flow_run_id}")
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1801, in get
    return await self.request(
           ^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1574, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/client/base.py", line 325, in send
    response = await self._send_with_retry(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/client/base.py", line 249, in _send_with_retry
    response = await send(request, *send_args, **send_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1661, in send
    response = await self._send_handling_auth(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1689, in _send_handling_auth
    response = await self._send_handling_redirects(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1726, in _send_handling_redirects
    response = await self._send_single_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 1763, in _send_single_request
    response = await transport.handle_async_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 373, in handle_async_request
    resp = await self._pool.handle_async_request(req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request
    raise exc from None
  File "/venv/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 196, in handle_async_request
    response = await connection.handle_async_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/connection.py", line 101, in handle_async_request
    return await self._connection.handle_async_request(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 185, in handle_async_request
    raise exc
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 148, in handle_async_request
    status, headers = await self._receive_response(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 292, in _receive_response
    event = await self._receive_stream_event(request, stream_id)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 333, in _receive_stream_event
    await self._receive_events(request, stream_id)
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 361, in _receive_events
    events = await self._read_incoming_data(request)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 452, in _read_incoming_data
    raise exc
  File "/venv/lib/python3.11/site-packages/httpcore/_async/http2.py", line 438, in _read_incoming_data
    data = await self._network_stream.read(self.READ_NUM_BYTES, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 35, in read
    return await self._stream.receive(max_bytes=max_bytes)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/anyio/streams/tls.py", line 196, in receive
    data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/anyio/streams/tls.py", line 138, in _call_sslobject_method
    data = await self.transport_stream.receive()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 1203, in receive
    await self._protocol.read_event.wait()
  File "/usr/local/lib/python3.11/asyncio/locks.py", line 210, in wait
    fut = self._get_loop().create_future()
          ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/mixins.py", line 20, in _get_loop
    raise RuntimeError(f'{self!r} is bound to a different event loop')
RuntimeError:  is bound to a different event loop
```

Versions

# Locally

Version:             2.18.1
API version:         0.8.4
Python version:      3.11.8
Git commit:          8cff545a
Built:               Thu, Apr 25, 2024 3:40 PM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         cloud

# On k8s

Version:             2.18.1
API version:         0.8.4
Python version:      3.11.4
Git commit:          8cff545a
Built:               Thu, Apr 25, 2024 3:40 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         cloud

Additional context

No response

zzstoatzz commented 7 months ago

hi @carlosjourdan - thanks for the issue! there are a couple things about this example code that you could try updating

carlosjourdan commented 7 months ago

Thanks @zzstoatzz.

To give some more context, I'm building a library that will be used by researchers at a financial institution. All their code is sync, hence the need to avoid exposing any async functions. I introduced the asyncio.sleep in the example just to show that run_sync works fine with it, but fails with the Prefect client.
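
A sync-only wrapper of the kind described here can avoid sharing loop-bound objects by creating the async client inside the coroutine that each sync call runs. Below is a minimal stdlib-only sketch under that assumption. `FakeAsyncClient` and `read_sync` are hypothetical names standing in for a real async client and a library function; they are not Prefect APIs.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for an async client that owns loop-bound state."""

    def __init__(self):
        self._ready = asyncio.Event()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        return False

    async def read(self, key):
        # Simulate a response arriving on the running loop, then wait for it.
        # Waiting binds the Event to whichever loop is currently running.
        asyncio.get_running_loop().call_soon(self._ready.set)
        await self._ready.wait()
        self._ready.clear()
        return {"id": key}


def read_sync(key):
    """Sync facade: the client lives and dies inside a single event loop."""

    async def _call():
        async with FakeAsyncClient() as client:
            return await client.read(key)

    return asyncio.run(_call())


first = read_sync("a")
second = read_sync("b")
print(first, second)
```

`asyncio.run` has to be called from a thread that has no running event loop. With Prefect 2.x, the analogous move would presumably be to open `async with get_client() as client:` inside `_call` rather than reuse the client from the run context.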

Following your suggestion to use get_client(), I was able to make the code work, but I still see some things that could be addressed from this issue: