PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Cannot start Orion `FastAPI` app: `Cannot assign requested address` #7860

Closed jborman-exos closed 1 year ago

jborman-exos commented 1 year ago

Bug summary

I am trying to stand up the Orion FastAPI app as a pytest fixture for internal integration testing. My fixture is based on the hosted_orion_api fixture from prefect.testing.fixtures, but it runs the app as a task in the event loop.

Reproduction

pytest test_prefect.py
# test_prefect.py
import asyncio
import os
from typing import AsyncGenerator, Callable

import pytest
import pytest_asyncio
import uvicorn
from prefect import OrionClient
from prefect.orion.api.server import create_app
from pytest import TempPathFactory

@pytest_asyncio.fixture(name='orion_api_url')
async def hosted_orion_api(
    unused_tcp_port: int,
    tmp_path_factory: TempPathFactory,
) -> AsyncGenerator[str, None]:  # pragma: no cover
    """
    Runs an instance of the Orion API at a dedicated URL instead of the
    ephemeral application.

    Uses the same database as the rest of the tests.

    Yields:
        The connection string
    """

    # Use an isolated PREFECT_HOME and disable Orion telemetry for tests
    prefect_home = str(tmp_path_factory.mktemp('prefect').absolute())
    os.environ['PREFECT_HOME'] = prefect_home
    os.environ['PREFECT_ORION_ANALYTICS_ENABLED'] = 'false'

    config = uvicorn.Config(
        app=create_app(ephemeral=True),
        host='0.0.0.0',
        port=unused_tcp_port,
    )
    server = uvicorn.Server(config)
    task = asyncio.create_task(server.serve())

    yield f'http://localhost:{unused_tcp_port}/api'

    await server.shutdown()
    task.cancel()

@pytest.mark.asyncio
async def test_prefect(orion_api_url: str) -> None:
    client = OrionClient(orion_api_url)
    response = await client.hello()
    assert response.status_code == 200

Error

================================================================================= FAILURES =================================================================================
_______________________________________________________________________________ test_prefect _______________________________________________________________________________
anyio._backends._asyncio.ExceptionGroup: 2 exceptions were raised in the task group:
----------------------------
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/anyio/_core/_sockets.py", line 164, in try_connect
    stream = await asynclib.connect_tcp(remote_host, remote_port, local_address)
  File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 1691, in connect_tcp
    await get_running_loop().create_connection(
  File "/usr/lib/python3.10/asyncio/base_events.py", line 1067, in create_connection
    raise exceptions[0]
  File "/usr/lib/python3.10/asyncio/base_events.py", line 1052, in create_connection
    sock = await self._connect_sock(
  File "/usr/lib/python3.10/asyncio/base_events.py", line 963, in _connect_sock
    await self.sock_connect(sock, address)
  File "/usr/lib/python3.10/asyncio/selector_events.py", line 500, in sock_connect
    return await fut
  File "/usr/lib/python3.10/asyncio/selector_events.py", line 505, in _sock_connect
    sock.connect(address)
OSError: [Errno 99] Cannot assign requested address
----------------------------
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/anyio/_core/_sockets.py", line 164, in try_connect
    stream = await asynclib.connect_tcp(remote_host, remote_port, local_address)
  File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 1691, in connect_tcp
    await get_running_loop().create_connection(
  File "/usr/lib/python3.10/asyncio/base_events.py", line 1067, in create_connection
    raise exceptions[0]
  File "/usr/lib/python3.10/asyncio/base_events.py", line 1052, in create_connection
    sock = await self._connect_sock(
  File "/usr/lib/python3.10/asyncio/base_events.py", line 963, in _connect_sock
    await self.sock_connect(sock, address)
  File "/usr/lib/python3.10/asyncio/selector_events.py", line 500, in sock_connect
    return await fut
  File "/usr/lib/python3.10/asyncio/selector_events.py", line 535, in _sock_connect_cb
    raise OSError(err, f'Connect call failed {address}')
ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 36691)

The above exception was the direct cause of the following exception:

map = {<class 'TimeoutError'>: <class 'httpcore.ConnectTimeout'>, <class 'OSError'>: <class 'httpcore.ConnectError'>, <class 'anyio.BrokenResourceError'>: <class 'httpcore.ConnectError'>}

    @contextlib.contextmanager
    def map_exceptions(map: ExceptionMapping) -> Iterator[None]:
        try:
>           yield

/usr/local/lib/python3.10/dist-packages/httpcore/_exceptions.py:10: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <httpcore.backends.asyncio.AsyncIOBackend object at 0x7ff3d2861870>, host = 'localhost', port = 36691, timeout = 30.0, local_address = None

    async def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: typing.Optional[float] = None,
        local_address: typing.Optional[str] = None,
    ) -> AsyncNetworkStream:
        exc_map = {
            TimeoutError: ConnectTimeout,
            OSError: ConnectError,
            anyio.BrokenResourceError: ConnectError,
        }
        with map_exceptions(exc_map):
            with anyio.fail_after(timeout):
>               stream: anyio.abc.ByteStream = await anyio.connect_tcp(
                    remote_host=host,
                    remote_port=port,
                    local_host=local_address,
                )

/usr/local/lib/python3.10/dist-packages/httpcore/backends/asyncio.py:109: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

remote_host = 'localhost', remote_port = 36691

    async def connect_tcp(
        remote_host: IPAddressType,
        remote_port: int,
        *,
        local_host: Optional[IPAddressType] = None,
        tls: bool = False,
        ssl_context: Optional[ssl.SSLContext] = None,
        tls_standard_compatible: bool = True,
        tls_hostname: Optional[str] = None,
        happy_eyeballs_delay: float = 0.25,
    ) -> Union[SocketStream, TLSStream]:
        """
        Connect to a host using the TCP protocol.

        This function implements the stateless version of the Happy Eyeballs algorithm (RFC 6555).
        If ``address`` is a host name that resolves to multiple IP addresses, each one is tried until
        one connection attempt succeeds. If the first attempt does not connected within 250
        milliseconds, a second attempt is started using the next address in the list, and so on.
        On IPv6 enabled systems, an IPv6 address (if available) is tried first.

        When the connection has been established, a TLS handshake will be done if either
        ``ssl_context`` or ``tls_hostname`` is not ``None``, or if ``tls`` is ``True``.

        :param remote_host: the IP address or host name to connect to
        :param remote_port: port on the target host to connect to
        :param local_host: the interface address or name to bind the socket to before connecting
        :param tls: ``True`` to do a TLS handshake with the connected stream and return a
            :class:`~anyio.streams.tls.TLSStream` instead
        :param ssl_context: the SSL context object to use (if omitted, a default context is created)
        :param tls_standard_compatible: If ``True``, performs the TLS shutdown handshake before closing
            the stream and requires that the server does this as well. Otherwise,
            :exc:`~ssl.SSLEOFError` may be raised during reads from the stream.
            Some protocols, such as HTTP, require this option to be ``False``.
            See :meth:`~ssl.SSLContext.wrap_socket` for details.
        :param tls_hostname: host name to check the server certificate against (defaults to the value
            of ``remote_host``)
        :param happy_eyeballs_delay: delay (in seconds) before starting the next connection attempt
        :return: a socket stream object if no TLS handshake was done, otherwise a TLS stream
        :raises OSError: if the connection attempt fails

        """
        # Placed here due to https://github.com/python/mypy/issues/7057
        connected_stream: Optional[SocketStream] = None

        async def try_connect(remote_host: str, event: Event) -> None:
            nonlocal connected_stream
            try:
                stream = await asynclib.connect_tcp(remote_host, remote_port, local_address)
            except OSError as exc:
                oserrors.append(exc)
                return
            else:
                if connected_stream is None:
                    connected_stream = stream
                    tg.cancel_scope.cancel()
                else:
                    await stream.aclose()
            finally:
                event.set()

        asynclib = get_asynclib()
        local_address: Optional[IPSockAddrType] = None
        family = socket.AF_UNSPEC
        if local_host:
            gai_res = await getaddrinfo(str(local_host), None)
            family, *_, local_address = gai_res[0]

        target_host = str(remote_host)
        try:
            addr_obj = ip_address(remote_host)
        except ValueError:
            # getaddrinfo() will raise an exception if name resolution fails
            gai_res = await getaddrinfo(
                target_host, remote_port, family=family, type=socket.SOCK_STREAM
            )

            # Organize the list so that the first address is an IPv6 address (if available) and the
            # second one is an IPv4 addresses. The rest can be in whatever order.
            v6_found = v4_found = False
            target_addrs: List[Tuple[socket.AddressFamily, str]] = []
            for af, *rest, sa in gai_res:
                if af == socket.AF_INET6 and not v6_found:
                    v6_found = True
                    target_addrs.insert(0, (af, sa[0]))
                elif af == socket.AF_INET and not v4_found and v6_found:
                    v4_found = True
                    target_addrs.insert(1, (af, sa[0]))
                else:
                    target_addrs.append((af, sa[0]))
        else:
            if isinstance(addr_obj, IPv6Address):
                target_addrs = [(socket.AF_INET6, addr_obj.compressed)]
            else:
                target_addrs = [(socket.AF_INET, addr_obj.compressed)]

        oserrors: List[OSError] = []
        async with create_task_group() as tg:
            for i, (af, addr) in enumerate(target_addrs):
                event = Event()
                tg.start_soon(try_connect, addr, event)
                with move_on_after(happy_eyeballs_delay):
                    await event.wait()

        if connected_stream is None:
            cause = oserrors[0] if len(oserrors) == 1 else asynclib.ExceptionGroup(oserrors)
>           raise OSError("All connection attempts failed") from cause
E           OSError: All connection attempts failed

/usr/local/lib/python3.10/dist-packages/anyio/_core/_sockets.py:222: OSError

During handling of the above exception, another exception occurred:

    @contextlib.contextmanager
    def map_httpcore_exceptions() -> typing.Iterator[None]:
        try:
>           yield

/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py:60: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <httpx.AsyncHTTPTransport object at 0x7ff3d2862350>, request = <Request('GET', 'http://localhost:36691/api/hello')>

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        assert isinstance(request.stream, AsyncByteStream)

        req = httpcore.Request(
            method=request.method,
            url=httpcore.URL(
                scheme=request.url.raw_scheme,
                host=request.url.raw_host,
                port=request.url.port,
                target=request.url.raw_path,
            ),
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )
        with map_httpcore_exceptions():
>           resp = await self._pool.handle_async_request(req)

/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py:353: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <httpcore.AsyncConnectionPool object at 0x7ff3d2862320>, request = <Request [b'GET']>

    async def handle_async_request(self, request: Request) -> Response:
        """
        Send an HTTP request, and return an HTTP response.

        This is the core implementation that is called into by `.request()` or `.stream()`.
        """
        scheme = request.url.scheme.decode()
        if scheme == "":
            raise UnsupportedProtocol(
                "Request URL is missing an 'http://' or 'https://' protocol."
            )
        if scheme not in ("http", "https"):
            raise UnsupportedProtocol(
                f"Request URL has an unsupported protocol '{scheme}://'."
            )

        status = RequestStatus(request)

        async with self._pool_lock:
            self._requests.append(status)
            await self._close_expired_connections()
            await self._attempt_to_acquire_connection(status)

        while True:
            timeouts = request.extensions.get("timeout", {})
            timeout = timeouts.get("pool", None)
            try:
                connection = await status.wait_for_connection(timeout=timeout)
            except BaseException as exc:
                # If we timeout here, or if the task is cancelled, then make
                # sure to remove the request from the queue before bubbling
                # up the exception.
                async with self._pool_lock:
                    self._requests.remove(status)
                    raise exc

            try:
                response = await connection.handle_async_request(request)
            except ConnectionNotAvailable:
                # The ConnectionNotAvailable exception is a special case, that
                # indicates we need to retry the request on a new connection.
                #
                # The most common case where this can occur is when multiple
                # requests are queued waiting for a single connection, which
                # might end up as an HTTP/2 connection, but which actually ends
                # up as HTTP/1.1.
                async with self._pool_lock:
                    # Maintain our position in the request queue, but reset the
                    # status so that the request becomes queued again.
                    status.unset_connection()
                    await self._attempt_to_acquire_connection(status)
            except BaseException as exc:
                await self.response_closed(status)
>               raise exc

/usr/local/lib/python3.10/dist-packages/httpcore/_async/connection_pool.py:253: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <httpcore.AsyncConnectionPool object at 0x7ff3d2862320>, request = <Request [b'GET']>

    async def handle_async_request(self, request: Request) -> Response:
        """
        Send an HTTP request, and return an HTTP response.

        This is the core implementation that is called into by `.request()` or `.stream()`.
        """
        scheme = request.url.scheme.decode()
        if scheme == "":
            raise UnsupportedProtocol(
                "Request URL is missing an 'http://' or 'https://' protocol."
            )
        if scheme not in ("http", "https"):
            raise UnsupportedProtocol(
                f"Request URL has an unsupported protocol '{scheme}://'."
            )

        status = RequestStatus(request)

        async with self._pool_lock:
            self._requests.append(status)
            await self._close_expired_connections()
            await self._attempt_to_acquire_connection(status)

        while True:
            timeouts = request.extensions.get("timeout", {})
            timeout = timeouts.get("pool", None)
            try:
                connection = await status.wait_for_connection(timeout=timeout)
            except BaseException as exc:
                # If we timeout here, or if the task is cancelled, then make
                # sure to remove the request from the queue before bubbling
                # up the exception.
                async with self._pool_lock:
                    self._requests.remove(status)
                    raise exc

            try:
>               response = await connection.handle_async_request(request)

/usr/local/lib/python3.10/dist-packages/httpcore/_async/connection_pool.py:237: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <AsyncHTTPConnection [CONNECTION FAILED]>, request = <Request [b'GET']>

    async def handle_async_request(self, request: Request) -> Response:
        if not self.can_handle_request(request.url.origin):
            raise RuntimeError(
                f"Attempted to send request to {request.url.origin} on connection to {self._origin}"
            )

        async with self._request_lock:
            if self._connection is None:
                try:
                    stream = await self._connect(request)

                    ssl_object = stream.get_extra_info("ssl_object")
                    http2_negotiated = (
                        ssl_object is not None
                        and ssl_object.selected_alpn_protocol() == "h2"
                    )
                    if http2_negotiated or (self._http2 and not self._http1):
                        from .http2 import AsyncHTTP2Connection

                        self._connection = AsyncHTTP2Connection(
                            origin=self._origin,
                            stream=stream,
                            keepalive_expiry=self._keepalive_expiry,
                        )
                    else:
                        self._connection = AsyncHTTP11Connection(
                            origin=self._origin,
                            stream=stream,
                            keepalive_expiry=self._keepalive_expiry,
                        )
                except Exception as exc:
                    self._connect_failed = True
>                   raise exc

/usr/local/lib/python3.10/dist-packages/httpcore/_async/connection.py:86: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <AsyncHTTPConnection [CONNECTION FAILED]>, request = <Request [b'GET']>

    async def handle_async_request(self, request: Request) -> Response:
        if not self.can_handle_request(request.url.origin):
            raise RuntimeError(
                f"Attempted to send request to {request.url.origin} on connection to {self._origin}"
            )

        async with self._request_lock:
            if self._connection is None:
                try:
>                   stream = await self._connect(request)

/usr/local/lib/python3.10/dist-packages/httpcore/_async/connection.py:63: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <AsyncHTTPConnection [CONNECTION FAILED]>, request = <Request [b'GET']>

    async def _connect(self, request: Request) -> AsyncNetworkStream:
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("connect", None)

        retries_left = self._retries
        delays = exponential_backoff(factor=RETRIES_BACKOFF_FACTOR)

        while True:
            try:
                if self._uds is None:
                    kwargs = {
                        "host": self._origin.host.decode("ascii"),
                        "port": self._origin.port,
                        "local_address": self._local_address,
                        "timeout": timeout,
                    }
                    async with Trace(
                        "connection.connect_tcp", request, kwargs
                    ) as trace:
>                       stream = await self._network_backend.connect_tcp(**kwargs)

/usr/local/lib/python3.10/dist-packages/httpcore/_async/connection.py:111: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <httpcore.backends.auto.AutoBackend object at 0x7ff3d2862290>, host = 'localhost', port = 36691, timeout = 30.0, local_address = None

    async def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: Optional[float] = None,
        local_address: Optional[str] = None,
    ) -> AsyncNetworkStream:
        await self._init_backend()
>       return await self._backend.connect_tcp(
            host, port, timeout=timeout, local_address=local_address
        )

/usr/local/lib/python3.10/dist-packages/httpcore/backends/auto.py:29: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <httpcore.backends.asyncio.AsyncIOBackend object at 0x7ff3d2861870>, host = 'localhost', port = 36691, timeout = 30.0, local_address = None

    async def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: typing.Optional[float] = None,
        local_address: typing.Optional[str] = None,
    ) -> AsyncNetworkStream:
        exc_map = {
            TimeoutError: ConnectTimeout,
            OSError: ConnectError,
            anyio.BrokenResourceError: ConnectError,
        }
>       with map_exceptions(exc_map):

/usr/local/lib/python3.10/dist-packages/httpcore/backends/asyncio.py:107: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <contextlib._GeneratorContextManager object at 0x7ff3d28ac0a0>, typ = <class 'OSError'>, value = OSError('All connection attempts failed')
traceback = <traceback object at 0x7ff3d0a6cc00>

    def __exit__(self, typ, value, traceback):
        if typ is None:
            try:
                next(self.gen)
            except StopIteration:
                return False
            else:
                raise RuntimeError("generator didn't stop")
        else:
            if value is None:
                # Need to force instantiation so we can reliably
                # tell if we get the same exception back
                value = typ()
            try:
>               self.gen.throw(typ, value, traceback)

/usr/lib/python3.10/contextlib.py:153: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

map = {<class 'TimeoutError'>: <class 'httpcore.ConnectTimeout'>, <class 'OSError'>: <class 'httpcore.ConnectError'>, <class 'anyio.BrokenResourceError'>: <class 'httpcore.ConnectError'>}

    @contextlib.contextmanager
    def map_exceptions(map: ExceptionMapping) -> Iterator[None]:
        try:
            yield
        except Exception as exc:  # noqa: PIE786
            for from_exc, to_exc in map.items():
                if isinstance(exc, from_exc):
>                   raise to_exc(exc)
E                   httpcore.ConnectError: All connection attempts failed

/usr/local/lib/python3.10/dist-packages/httpcore/_exceptions.py:14: ConnectError

The above exception was the direct cause of the following exception:

orion_api_url = 'http://localhost:36691/api'

    @pytest.mark.asyncio
    async def test_prefect(orion_api_url: str) -> None:
        client = OrionClient(orion_api_url)
>       response = await client.hello()

test_2/test_prefect.py:50: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/local/lib/python3.10/dist-packages/prefect/client/orion.py:213: in hello
    return await self._client.get("/hello")
/usr/local/lib/python3.10/dist-packages/httpx/_client.py:1757: in get
    return await self.request(
/usr/local/lib/python3.10/dist-packages/httpx/_client.py:1533: in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
/usr/local/lib/python3.10/dist-packages/prefect/client/base.py:160: in send
    await super().send(*args, **kwargs)
/usr/local/lib/python3.10/dist-packages/httpx/_client.py:1620: in send
    response = await self._send_handling_auth(
/usr/local/lib/python3.10/dist-packages/httpx/_client.py:1648: in _send_handling_auth
    response = await self._send_handling_redirects(
/usr/local/lib/python3.10/dist-packages/httpx/_client.py:1685: in _send_handling_redirects
    response = await self._send_single_request(request)
/usr/local/lib/python3.10/dist-packages/httpx/_client.py:1722: in _send_single_request
    response = await transport.handle_async_request(request)
/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py:352: in handle_async_request
    with map_httpcore_exceptions():
/usr/lib/python3.10/contextlib.py:153: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    @contextlib.contextmanager
    def map_httpcore_exceptions() -> typing.Iterator[None]:
        try:
            yield
        except Exception as exc:  # noqa: PIE-786
            mapped_exc = None

            for from_exc, to_exc in HTTPCORE_EXC_MAP.items():
                if not isinstance(exc, from_exc):
                    continue
                # We want to map to the most specific exception we can find.
                # Eg if `exc` is an `httpcore.ReadTimeout`, we want to map to
                # `httpx.ReadTimeout`, not just `httpx.TimeoutException`.
                if mapped_exc is None or issubclass(to_exc, mapped_exc):
                    mapped_exc = to_exc

            if mapped_exc is None:  # pragma: nocover
                raise

            message = str(exc)
>           raise mapped_exc(message) from exc
E           httpx.ConnectError: All connection attempts failed

/usr/local/lib/python3.10/dist-packages/httpx/_transports/default.py:77: ConnectError

Versions

Version:             2.6.9
API version:         0.8.3
Python version:      3.10.8
Git commit:          014d093e
Built:               Tue, Nov 22, 2022 2:14 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         <client error>

Additional context

Pytest session info:

platform linux -- Python 3.10.8, pytest-7.2.0, pluggy-1.0.0
rootdir: /exos, configfile: pyproject.toml
plugins: asyncio-0.20.2, anyio-3.6.2, mock-3.10.0, cov-4.0.0, typeguard-2.13.3, docker-1.0.1
asyncio: mode=strict
collected 1 item  

OS Info:

Distributor ID: Ubuntu
Description:    Ubuntu 20.04.5 LTS
Release:        20.04
Codename:       focal

zanieb commented 1 year ago

Thanks for the thorough report! I think only OSError: [Errno 99] Cannot assign requested address is relevant; the rest is just a failed connection after that. I'm not sure what's going on here.
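
The `[Errno 99]` frame belongs to one of the addresses that `localhost` resolved to, so it can help to see which addresses a client would try. The following is a minimal sketch using only the standard library; the helper name `resolve_candidates` is hypothetical and is not part of Prefect, httpx, or anyio. The result for `localhost` depends on the machine's resolver configuration, so no output is shown.

```python
import socket


def resolve_candidates(host: str, port: int) -> list[tuple[str, str]]:
    """Return (address family, IP address) pairs that `host` resolves to.

    Hypothetical helper for illustration; it only wraps socket.getaddrinfo.
    """
    infos = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    candidates: list[tuple[str, str]] = []
    for family, _type, _proto, _canonname, sockaddr in infos:
        entry = (socket.AddressFamily(family).name, sockaddr[0])
        if entry not in candidates:  # drop duplicates, keep resolver order
            candidates.append(entry)
    return candidates
```

Calling `resolve_candidates("localhost", 36691)` inside the failing container would show whether an IPv6 entry such as `::1` is tried alongside `127.0.0.1`.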

Is there a reason you're binding to 0.0.0.0 instead of localhost? We do a similar Uvicorn serve pattern for our login CLI: https://github.com/PrefectHQ/prefect/blob/main/src/prefect/cli/cloud.py#L101-L102

jborman-exos commented 1 year ago

Good point on the addresses in the fixture. I noticed that I was being inconsistent with my use of localhost and 0.0.0.0, which led me to update my fixture:

@pytest_asyncio.fixture(name='orion_api_url')
async def hosted_orion_api(
    unused_tcp_port: int,
    tmp_path_factory: TempPathFactory,
) -> AsyncGenerator[str, None]:  # pragma: no cover
    """
    Runs an instance of the Orion API at a dedicated URL instead of the
    ephemeral application.

    Uses the same database as the rest of the tests.

    Yields:
        The connection string
    """

    # Use an isolated PREFECT_HOME and disable Orion telemetry for tests
    prefect_home = str(tmp_path_factory.mktemp('prefect').absolute())
    os.environ['PREFECT_HOME'] = prefect_home
    os.environ['PREFECT_ORION_ANALYTICS_ENABLED'] = 'false'

    host = '0.0.0.0'        # <- Works
    # host = 'localhost'    # <- Does not work
    config = uvicorn.Config(
        app=create_app(ephemeral=True),
        host=host,
        port=unused_tcp_port,
    )
    server = uvicorn.Server(config)
    task = asyncio.create_task(server.serve())

    yield f'http://{host}:{unused_tcp_port}/api'

    await server.shutdown()
    task.cancel()

I'm seeing that using 'localhost' causes the error above, but '0.0.0.0' passes. This could be related to my environment (Ubuntu Docker container)?

zanieb commented 1 year ago

Have you tried "127.0.0.1"? I'm not sure "localhost" is supported.

> This could be related to my environment (Ubuntu Docker container)?

I'm not sure, it seems like it should behave fine in a container.

jborman-exos commented 1 year ago

> Have you tried "127.0.0.1"? I'm not sure "localhost" is supported.

Yep, that was it 😄 Thanks for the help @madkinsz
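
For readers adapting the fixture, the change that resolved this thread was to use the explicit IPv4 address `127.0.0.1` rather than `localhost`. The following is a minimal stdlib-only sketch of that idea, not the Prefect or Uvicorn API: `wait_until_listening` and `demo` are hypothetical names, and a plain `asyncio` TCP server stands in for the Orion app. The sketch also polls until the port accepts connections before using it, which is an added precaution that the thread does not discuss.

```python
import asyncio

HOST = "127.0.0.1"  # explicit IPv4 loopback, used for both bind and connect


async def wait_until_listening(host: str, port: int, timeout: float = 5.0) -> bool:
    """Poll until a TCP connection to (host, port) succeeds or time runs out."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        try:
            _reader, writer = await asyncio.open_connection(host, port)
        except OSError:
            await asyncio.sleep(0.05)  # not accepting yet; retry shortly
        else:
            writer.close()
            await writer.wait_closed()
            return True
    return False


async def demo() -> bool:
    async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        writer.close()  # stand-in server: accept and immediately close

    # Port 0 asks the OS for a free port, similar in spirit to unused_tcp_port.
    server = await asyncio.start_server(handle, HOST, 0)
    port = server.sockets[0].getsockname()[1]
    try:
        return await wait_until_listening(HOST, port)
    finally:
        server.close()
        await server.wait_closed()
```

In the fixture from this thread, the analogous change would be to pass `host='127.0.0.1'` to `uvicorn.Config` and to use the same address in the yielded URL.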