Well laid out @alafanechere 👏
Some great benefits here.
So far I'm a tentative thumbs up!
One question: are there any other known challenges to "dagger in dagger"?
+1 from me too! The benefits are real, and the more airbyte code we can standardize, the better.
I'd suggest time-boxing this. If there are dagger-in-dagger problems, let's only spend a few days on it, as this is a nice-to-have, not a requirement.
Thank you for your input @bnchrch and @evantahler!
I asked the Dagger team a couple of months ago whether dagger-in-dagger is tricky; they told me it is not, because they follow this paradigm for their own tests.
I think it's indeed worth timeboxing / spiking this: no more than 1.5 days.
The hypothesis we came up with together with the Dagger team was the following: the `import_` operation is not cached across sessions, so since a new Dagger session is instantiated on each `ConnectorRunner` method call, we pay the cost of loading the tarball into a container every time.
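To make the hypothesis concrete, here is a minimal sketch (assuming the dagger Python SDK's `Connection` / `Container.import_` API; names like `TARBALL`, `run_per_call` and `run_shared_session` are illustrative, not the actual CAT code) contrasting a per-call session with a single shared session:

```python
# Sketch only: illustrates the hypothesized cost, not the real ConnectorRunner.
import sys

import anyio
import dagger

TARBALL = "connector_image.tar"  # hypothetical path to the connector image tarball


async def run_per_call(command: list[str]) -> str:
    # One session per call: the tarball import is (hypothetically) re-done every time,
    # because import_ results are not cached across sessions.
    async with dagger.Connection(dagger.Config(log_output=sys.stderr)) as client:
        tar = client.host().directory(".").file(TARBALL)
        return await client.container().import_(tar).with_exec(command).stdout()


async def run_shared_session(client: dagger.Client, command: list[str]) -> str:
    # Single shared session: the imported container layers can be reused,
    # so the tarball load should only be paid once per session.
    tar = client.host().directory(".").file(TARBALL)
    return await client.container().import_(tar).with_exec(command).stdout()


async def main() -> None:
    async with dagger.Connection(dagger.Config(log_output=sys.stderr)) as client:
        print(await run_shared_session(client, ["spec"]))


if __name__ == "__main__":
    anyio.run(main)
```

If `import_` is indeed only cached within a session, the first pattern pays the tarball load on every call, while the second should pay it once.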
I refactored the CAT code to use a single Dagger session, but the "hanging test" error remains and still ends in a timeout on source-postgres and source-openweather:
```text
> in Test Pipeline - source-postgres
============================= test session starts ==============================
platform linux -- Python 3.10.12, pytest-6.2.5, py-1.11.0, pluggy-1.2.0
rootdir: /test_input
plugins: hypothesis-6.82.0, anyio-3.7.1, requests-mock-1.9.3, timeout-1.4.2, sugar-0.9.7, mock-3.6.1, custom-exit-code-0.3.0, cov-3.0.0
collected 76 items / 1 skipped / 75 selected

test_core.py ........................................ss................. [ 77%]
.......ssssFFFF                                                          [ 97%]
test_full_refresh.py FFE                                                 [100%]


==================================== ERRORS ====================================
_____ ERROR at teardown of TestFullRefresh.test_sequential_reads[inputs1] ______

[... anyio socket stream read frames ...]

            try:
                chunk = self._protocol.read_queue.popleft()
            except IndexError:
                if self._closed:
>                   raise ClosedResourceError from None
E                   anyio.ClosedResourceError

/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:1210: ClosedResourceError

The above exception was the direct cause of the following exception:

[... httpcore connection pool / HTTP/1.1 connection frames ...]

    @contextlib.contextmanager
    def map_exceptions(map: ExceptionMapping) -> Iterator[None]:
        try:
            yield
        except Exception as exc:  # noqa: PIE786
            for from_exc, to_exc in map.items():
                if isinstance(exc, from_exc):
>                   raise to_exc(exc) from exc
E                   httpcore.ReadError

/usr/local/lib/python3.10/site-packages/httpcore/_exceptions.py:14: ReadError

The above exception was the direct cause of the following exception:

[... dagger HTTPXAsyncTransport / gql client / httpx AsyncClient frames ...]

        message = str(exc)
>       raise mapped_exc(message) from exc
E       httpx.ReadError

/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py:77: ReadError

The above exception was the direct cause of the following exception:

/usr/local/lib/python3.10/site-packages/anyio/pytest_plugin.py:68:
/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2098: in run_asyncgen_fixture
    self._raise_async_exceptions()
/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2054: in _raise_async_exceptions
    raise exceptions[0]
/usr/local/lib/python3.10/site-packages/connector_acceptance_test/tests/test_core.py:968: in test_read
    output = await docker_runner.call_read(connector_config, configured_catalog)
/usr/local/lib/python3.10/site-packages/connector_acceptance_test/utils/connector_runner.py:47: in call_read
    return await self._run(
/usr/local/lib/python3.10/site-packages/connector_acceptance_test/utils/connector_runner.py:104: in _run
    output = await container.stdout()
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:486: in async_wrapper
    return await bear(*args, **kwargs)
/usr/local/lib/python3.10/site-packages/dagger/api/gen.py:878: in stdout
    return await _ctx.execute(str)
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:156: in execute
    with self._handle_execute(query):
/usr/local/lib/python3.10/contextlib.py:153: in __exit__
    self.gen.throw(typ, value, traceback)

    @contextlib.contextmanager
    def _handle_execute(self, query: graphql.DocumentNode):
        # Reduces duplication when handling errors, between sync and async.
        try:
            yield

        except httpx.TimeoutException as e:
            msg = (
                "Request timed out. Try setting a higher value in 'execute_timeout' "
                "config for this `dagger.Connection()`."
            )
            raise ExecuteTimeoutError(msg) from e

        except httpx.RequestError as e:
            msg = f"Failed to make request: {e}"
>           raise TransportError(msg) from e
E           dagger.exceptions.TransportError: Failed to make request:

/usr/local/lib/python3.10/site-packages/dagger/api/base.py:255: TransportError
=================================== FAILURES ===================================
_______________________ TestBasicRead.test_read[inputs0] _______________________

[... anyio pytest plugin / asyncio event loop frames ...]

        ready = []
        try:
>           fd_event_list = self._selector.poll(timeout, max_ev)
E           Failed: Timeout >300.0s

/usr/local/lib/python3.10/selectors.py:469: Failed
----------------------------- Captured stderr call -----------------------------

+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++

~~~~~~~~~~~~~~~~ Stack of AnyIO worker thread (140280134059712) ~~~~~~~~~~~~~~~~
  File "/usr/local/lib/python3.10/threading.py", line 973, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 797, in run
    item = self.queue.get()
  File "/usr/local/lib/python3.10/queue.py", line 171, in get
    self.not_empty.wait()
  File "/usr/local/lib/python3.10/threading.py", line 320, in wait
    waiter.acquire()

+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
_______________________ TestBasicRead.test_read[inputs1] _______________________

[... traceback truncated ...]
```
[35m778:[0m [1253.2s] testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
[35m778:[0m [1253.2s] with get_runner(backend_name, backend_options) as runner:
[35m778:[0m [1253.2s] > runner.run_test(pyfuncitem.obj, testargs)
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/site-packages/anyio/pytest_plugin.py:117:
[35m778:[0m [1253.2s] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2113: in run_test
[35m778:[0m [1253.2s] self._loop.run_until_complete(test_func(**kwargs))
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/asyncio/base_events.py:636: in run_until_complete
[35m778:[0m [1253.2s] self.run_forever()
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/asyncio/base_events.py:603: in run_forever
[35m778:[0m [1253.2s] self._run_once()
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/asyncio/base_events.py:1871: in _run_once
[35m778:[0m [1253.2s] event_list = self._selector.select(timeout)
[35m778:[0m [1253.2s] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] self = <selectors.EpollSelector object at 0x7f95841e07f0>, timeout = -1
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] def select(self, timeout=None):
[35m778:[0m [1253.2s] if timeout is None:
[35m778:[0m [1253.2s] timeout = -1
[35m778:[0m [1253.2s] elif timeout <= 0:
[35m778:[0m [1253.2s] timeout = 0
[35m778:[0m [1253.2s] else:
[35m778:[0m [1253.2s] # epoll_wait() has a resolution of 1 millisecond, round away
[35m778:[0m [1253.2s] # from zero to wait *at least* timeout seconds.
[35m778:[0m [1253.2s] timeout = math.ceil(timeout * 1e3) * 1e-3
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] # epoll_wait() expects `maxevents` to be greater than zero;
[35m778:[0m [1253.2s] # we want to make sure that `select()` can be called when no
[35m778:[0m [1253.2s] # FD is registered.
[35m778:[0m [1253.2s] max_ev = max(len(self._fd_to_key), 1)
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] ready = []
[35m778:[0m [1253.2s] try:
[35m778:[0m [1253.2s] > fd_event_list = self._selector.poll(timeout, max_ev)
[35m778:[0m [1253.2s] E Failed: Timeout >300.0s
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/selectors.py:469: Failed
[35m778:[0m [1253.2s] ----------------------------- Captured stderr call -----------------------------
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] +++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] ~~~~~~~~~~~~~~~~ Stack of AnyIO worker thread (140280134059712) ~~~~~~~~~~~~~~~~
[35m778:[0m [1253.2s] File "/usr/local/lib/python3.10/threading.py", line 973, in _bootstrap
[35m778:[0m [1253.2s] self._bootstrap_inner()
[35m778:[0m [1253.2s] File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
[35m778:[0m [1253.2s] self.run()
[35m778:[0m [1253.2s] File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 797, in run
[35m778:[0m [1253.2s] item = self.queue.get()
[35m778:[0m [1253.2s] File "/usr/local/lib/python3.10/queue.py", line 171, in get
[35m778:[0m [1253.2s] self.not_empty.wait()
[35m778:[0m [1253.2s] File "/usr/local/lib/python3.10/threading.py", line 320, in wait
[35m778:[0m [1253.2s] waiter.acquire()
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] +++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
[35m778:[0m [1253.2s] _________ TestBasicRead.test_airbyte_trace_message_on_failure[inputs0] _________
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] pyfuncitem = <Function test_airbyte_trace_message_on_failure[inputs0]>
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] @pytest.hookimpl(tryfirst=True)
[35m778:[0m [1253.2s] def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None:
[35m778:[0m [1253.2s] def run_with_hypothesis(**kwargs: Any) -> None:
[35m778:[0m [1253.2s] with get_runner(backend_name, backend_options) as runner:
[35m778:[0m [1253.2s] runner.run_test(original_func, kwargs)
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] backend = pyfuncitem.funcargs.get("anyio_backend")
[35m778:[0m [1253.2s] if backend:
[35m778:[0m [1253.2s] backend_name, backend_options = extract_backend_and_options(backend)
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] if hasattr(pyfuncitem.obj, "hypothesis"):
[35m778:[0m [1253.2s] # Wrap the inner test function unless it's already wrapped
[35m778:[0m [1253.2s] original_func = pyfuncitem.obj.hypothesis.inner_test
[35m778:[0m [1253.2s] if original_func.__qualname__ != run_with_hypothesis.__qualname__:
[35m778:[0m [1253.2s] if iscoroutinefunction(original_func):
[35m778:[0m [1253.2s] pyfuncitem.obj.hypothesis.inner_test = run_with_hypothesis
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] return None
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] if iscoroutinefunction(pyfuncitem.obj):
[35m778:[0m [1253.2s] funcargs = pyfuncitem.funcargs
[35m778:[0m [1253.2s] testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
[35m778:[0m [1253.2s] with get_runner(backend_name, backend_options) as runner:
[35m778:[0m [1253.2s] > runner.run_test(pyfuncitem.obj, testargs)
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/site-packages/anyio/pytest_plugin.py:117:
[35m778:[0m [1253.2s] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2113: in run_test
[35m778:[0m [1253.2s] self._loop.run_until_complete(test_func(**kwargs))
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/asyncio/base_events.py:636: in run_until_complete
[35m778:[0m [1253.2s] self.run_forever()
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/asyncio/base_events.py:603: in run_forever
[35m778:[0m [1253.2s] self._run_once()
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/asyncio/base_events.py:1871: in _run_once
[35m778:[0m [1253.2s] event_list = self._selector.select(timeout)
[35m778:[0m [1253.2s] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] self = <selectors.EpollSelector object at 0x7f95841e07f0>, timeout = -1
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] def select(self, timeout=None):
[35m778:[0m [1253.2s] if timeout is None:
[35m778:[0m [1253.2s] timeout = -1
[35m778:[0m [1253.2s] elif timeout <= 0:
[35m778:[0m [1253.2s] timeout = 0
[35m778:[0m [1253.2s] else:
[35m778:[0m [1253.2s] # epoll_wait() has a resolution of 1 millisecond, round away
[35m778:[0m [1253.2s] # from zero to wait *at least* timeout seconds.
[35m778:[0m [1253.2s] timeout = math.ceil(timeout * 1e3) * 1e-3
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] # epoll_wait() expects `maxevents` to be greater than zero;
[35m778:[0m [1253.2s] # we want to make sure that `select()` can be called when no
[35m778:[0m [1253.2s] # FD is registered.
[35m778:[0m [1253.2s] max_ev = max(len(self._fd_to_key), 1)
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] ready = []
[35m778:[0m [1253.2s] try:
[35m778:[0m [1253.2s] > fd_event_list = self._selector.poll(timeout, max_ev)
[35m778:[0m [1253.2s] E Failed: Timeout >300.0s
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/selectors.py:469: Failed
[35m778:[0m [1253.2s] ----------------------------- Captured stderr call -----------------------------
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] +++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] ~~~~~~~~~~~~~~~~ Stack of AnyIO worker thread (140280134059712) ~~~~~~~~~~~~~~~~
[35m778:[0m [1253.2s] File "/usr/local/lib/python3.10/threading.py", line 973, in _bootstrap
[35m778:[0m [1253.2s] self._bootstrap_inner()
[35m778:[0m [1253.2s] File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
[35m778:[0m [1253.2s] self.run()
[35m778:[0m [1253.2s] File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 797, in run
[35m778:[0m [1253.2s] item = self.queue.get()
[35m778:[0m [1253.2s] File "/usr/local/lib/python3.10/queue.py", line 171, in get
[35m778:[0m [1253.2s] self.not_empty.wait()
[35m778:[0m [1253.2s] File "/usr/local/lib/python3.10/threading.py", line 320, in wait
[35m778:[0m [1253.2s] waiter.acquire()
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] +++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
[35m778:[0m [1253.2s] _________ TestBasicRead.test_airbyte_trace_message_on_failure[inputs1] _________
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] pyfuncitem = <Function test_airbyte_trace_message_on_failure[inputs1]>
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] @pytest.hookimpl(tryfirst=True)
[35m778:[0m [1253.2s] def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None:
[35m778:[0m [1253.2s] def run_with_hypothesis(**kwargs: Any) -> None:
[35m778:[0m [1253.2s] with get_runner(backend_name, backend_options) as runner:
[35m778:[0m [1253.2s] runner.run_test(original_func, kwargs)
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] backend = pyfuncitem.funcargs.get("anyio_backend")
[35m778:[0m [1253.2s] if backend:
[35m778:[0m [1253.2s] backend_name, backend_options = extract_backend_and_options(backend)
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] if hasattr(pyfuncitem.obj, "hypothesis"):
[35m778:[0m [1253.2s] # Wrap the inner test function unless it's already wrapped
[35m778:[0m [1253.2s] original_func = pyfuncitem.obj.hypothesis.inner_test
[35m778:[0m [1253.2s] if original_func.__qualname__ != run_with_hypothesis.__qualname__:
[35m778:[0m [1253.2s] if iscoroutinefunction(original_func):
[35m778:[0m [1253.2s] pyfuncitem.obj.hypothesis.inner_test = run_with_hypothesis
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] return None
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] if iscoroutinefunction(pyfuncitem.obj):
[35m778:[0m [1253.2s] funcargs = pyfuncitem.funcargs
[35m778:[0m [1253.2s] testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
[35m778:[0m [1253.2s] with get_runner(backend_name, backend_options) as runner:
[35m778:[0m [1253.2s] > runner.run_test(pyfuncitem.obj, testargs)
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/site-packages/anyio/pytest_plugin.py:117:
[35m778:[0m [1253.2s] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2113: in run_test
[35m778:[0m [1253.2s] self._loop.run_until_complete(test_func(**kwargs))
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/asyncio/base_events.py:636: in run_until_complete
[35m778:[0m [1253.2s] self.run_forever()
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/asyncio/base_events.py:603: in run_forever
[35m778:[0m [1253.2s] self._run_once()
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/asyncio/base_events.py:1871: in _run_once
[35m778:[0m [1253.2s] event_list = self._selector.select(timeout)
[35m778:[0m [1253.2s] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] self = <selectors.EpollSelector object at 0x7f95841e07f0>, timeout = -1
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] def select(self, timeout=None):
[35m778:[0m [1253.2s] if timeout is None:
[35m778:[0m [1253.2s] timeout = -1
[35m778:[0m [1253.2s] elif timeout <= 0:
[35m778:[0m [1253.2s] timeout = 0
[35m778:[0m [1253.2s] else:
[35m778:[0m [1253.2s] # epoll_wait() has a resolution of 1 millisecond, round away
[35m778:[0m [1253.2s] # from zero to wait *at least* timeout seconds.
[35m778:[0m [1253.2s] timeout = math.ceil(timeout * 1e3) * 1e-3
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] # epoll_wait() expects `maxevents` to be greater than zero;
[35m778:[0m [1253.2s] # we want to make sure that `select()` can be called when no
[35m778:[0m [1253.2s] # FD is registered.
[35m778:[0m [1253.2s] max_ev = max(len(self._fd_to_key), 1)
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] ready = []
[35m778:[0m [1253.2s] try:
[35m778:[0m [1253.2s] > fd_event_list = self._selector.poll(timeout, max_ev)
[35m778:[0m [1253.2s] E Failed: Timeout >300.0s
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/selectors.py:469: Failed
[35m778:[0m [1253.2s] ----------------------------- Captured stderr call -----------------------------
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] +++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] ~~~~~~~~~~~~~~~~ Stack of AnyIO worker thread (140280134059712) ~~~~~~~~~~~~~~~~
[35m778:[0m [1253.2s] File "/usr/local/lib/python3.10/threading.py", line 973, in _bootstrap
[35m778:[0m [1253.2s] self._bootstrap_inner()
[35m778:[0m [1253.2s] File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
[35m778:[0m [1253.2s] self.run()
[35m778:[0m [1253.2s] File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 797, in run
[35m778:[0m [1253.2s] item = self.queue.get()
[35m778:[0m [1253.2s] File "/usr/local/lib/python3.10/queue.py", line 171, in get
[35m778:[0m [1253.2s] self.not_empty.wait()
[35m778:[0m [1253.2s] File "/usr/local/lib/python3.10/threading.py", line 320, in wait
[35m778:[0m [1253.2s] waiter.acquire()
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] +++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
[35m778:[0m [1253.2s] ________________ TestFullRefresh.test_sequential_reads[inputs0] ________________
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] self = <connector_acceptance_test.tests.test_full_refresh.TestFullRefresh object at 0x7f9582b25c30>
[35m778:[0m [1253.2s] connector_config = SecretDict(******)
[35m778:[0m [1253.2s] configured_catalog = ConfiguredAirbyteCatalog(streams=[ConfiguredAirbyteStream(stream=AirbyteStream(name='id_and_name', json_schema={'type'...'full_refresh'>, cursor_field=[], destination_sync_mode=<DestinationSyncMode.append: 'append'>, primary_key=[['id']])])
[35m778:[0m [1253.2s] ignored_fields = {}
[35m778:[0m [1253.2s] docker_runner = <connector_acceptance_test.utils.connector_runner.ConnectorRunner object at 0x7f9582b25ff0>
[35m778:[0m [1253.2s] detailed_logger = <Logger detailed_logger /test_input/acceptance_tests_logs/test_full_refresh.py__TestFullRefresh__test_sequential_reads[inputs0].txt (DEBUG)>
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] async def test_sequential_reads(
[35m778:[0m [1253.2s] self,
[35m778:[0m [1253.2s] connector_config: SecretDict,
[35m778:[0m [1253.2s] configured_catalog: ConfiguredAirbyteCatalog,
[35m778:[0m [1253.2s] ignored_fields: Optional[Mapping[str, List[IgnoredFieldsConfiguration]]],
[35m778:[0m [1253.2s] docker_runner: ConnectorRunner,
[35m778:[0m [1253.2s] detailed_logger: Logger,
[35m778:[0m [1253.2s] ):
[35m778:[0m [1253.2s] > configured_catalog = await full_refresh_only_catalog(configured_catalog)
[35m778:[0m [1253.2s] E TypeError: object ConfiguredAirbyteCatalog can't be used in 'await' expression
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/site-packages/connector_acceptance_test/tests/test_full_refresh.py:95: TypeError
[35m778:[0m [1253.2s] ________________ TestFullRefresh.test_sequential_reads[inputs1] ________________
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] self = <connector_acceptance_test.tests.test_full_refresh.TestFullRefresh object at 0x7f9583a19600>
[35m778:[0m [1253.2s] connector_config = SecretDict(******)
[35m778:[0m [1253.2s] configured_catalog = ConfiguredAirbyteCatalog(streams=[ConfiguredAirbyteStream(stream=AirbyteStream(name='id_and_name', json_schema={'type'...'>, cursor_field=['_ab_cdc_lsn'], destination_sync_mode=<DestinationSyncMode.append: 'append'>, primary_key=[['id']])])
[35m778:[0m [1253.2s] ignored_fields = {}
[35m778:[0m [1253.2s] docker_runner = <connector_acceptance_test.utils.connector_runner.ConnectorRunner object at 0x7f9583a18d60>
[35m778:[0m [1253.2s] detailed_logger = <Logger detailed_logger /test_input/acceptance_tests_logs/test_full_refresh.py__TestFullRefresh__test_sequential_reads[inputs1].txt (DEBUG)>
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] async def test_sequential_reads(
[35m778:[0m [1253.2s] self,
[35m778:[0m [1253.2s] connector_config: SecretDict,
[35m778:[0m [1253.2s] configured_catalog: ConfiguredAirbyteCatalog,
[35m778:[0m [1253.2s] ignored_fields: Optional[Mapping[str, List[IgnoredFieldsConfiguration]]],
[35m778:[0m [1253.2s] docker_runner: ConnectorRunner,
[35m778:[0m [1253.2s] detailed_logger: Logger,
[35m778:[0m [1253.2s] ):
[35m778:[0m [1253.2s] > configured_catalog = await full_refresh_only_catalog(configured_catalog)
[35m778:[0m [1253.2s] E TypeError: object ConfiguredAirbyteCatalog can't be used in 'await' expression
[35m778:[0m [1253.2s]
[35m778:[0m [1253.2s] /usr/local/lib/python3.10/site-packages/connector_acceptance_test/tests/test_full_refresh.py:95: TypeError
[35m778:[0m [1253.3s] =============================== warnings summary ===============================
[35m778:[0m [1253.3s] ../usr/local/lib/python3.10/site-packages/hypothesis_jsonschema/_canonicalise.py:116
[35m778:[0m [1253.3s] /usr/local/lib/python3.10/site-packages/hypothesis_jsonschema/_canonicalise.py:116: DeprecationWarning: jsonschema.exceptions.RefResolutionError is deprecated as of version 4.18.0. If you wish to catch potential reference resolution errors, directly catch referencing.exceptions.Unresolvable.
[35m778:[0m [1253.3s] class HypothesisRefResolutionError(jsonschema.exceptions.RefResolutionError):
[35m778:[0m [1253.3s]
[35m778:[0m [1253.3s] ../usr/local/lib/python3.10/site-packages/hypothesis_jsonschema/_resolve.py:31
[35m778:[0m [1253.3s] /usr/local/lib/python3.10/site-packages/hypothesis_jsonschema/_resolve.py:31: DeprecationWarning: jsonschema.RefResolver is deprecated as of v4.18.0, in favor of the https://github.com/python-jsonschema/referencing library, which provides more compliant referencing behavior as well as more flexible APIs for customization. A future release will remove RefResolver. Please file a feature request (on referencing) if you are missing an API for the kind of customization you need.
[35m778:[0m [1253.3s] class LocalResolver(jsonschema.RefResolver):
[35m778:[0m [1253.3s]
[35m778:[0m [1253.3s] -- Docs: https://docs.pytest.org/en/stable/warnings.html
[35m778:[0m [1253.3s] =========================== short test summary info ============================
[35m778:[0m [1253.3s] FAILED test_core.py::TestBasicRead::test_read[inputs0] - Failed: Timeout >300.0s
[35m778:[0m [1253.3s] FAILED test_core.py::TestBasicRead::test_read[inputs1] - Failed: Timeout >300.0s
[35m778:[0m [1253.3s] FAILED test_core.py::TestBasicRead::test_airbyte_trace_message_on_failure[inputs0]
[35m778:[0m [1253.3s] FAILED test_core.py::TestBasicRead::test_airbyte_trace_message_on_failure[inputs1]
[35m778:[0m [1253.3s] FAILED test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0]
[35m778:[0m [1253.3s] FAILED test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs1]
[35m778:[0m [1253.3s] ERROR test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs1]
[35m778:[0m [1253.3s] == 6 failed, 64 passed, 7 skipped, 2 warnings, 1 error in 1252.06s (0:20:52) ===
[35m778:[0m exec python -m pytest -p connector_acceptance_test.plugin --suppress-tests-failed-exit-code --acceptance-test-config /test_input -p integration_tests.acceptance [32mDONE[0m
Running locally, the source-openweather tests run correctly.
(The TypeError: object ConfiguredAirbyteCatalog can't be used in 'await' expression was easily fixed.)
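For context, here is a minimal, hypothetical sketch of that kind of fix; full_refresh_only_catalog and the catalog below are stand-ins, not the real CAT code. The helper is a regular function, so the fix is simply to stop awaiting its return value.

```python
# Hypothetical illustration of the TypeError and its fix; the helper below is a
# stand-in plain (non-async) function, not the actual CAT implementation.
import anyio


def full_refresh_only_catalog(catalog: dict) -> dict:
    # A regular function: awaiting its return value raises
    # "TypeError: object dict can't be used in 'await' expression".
    return {stream: mode for stream, mode in catalog.items() if mode == "full_refresh"}


async def test_sequential_reads_sketch() -> dict:
    catalog = {"stream_a": "full_refresh", "stream_b": "incremental"}
    # Fix: call the helper without `await`, since it is not a coroutine.
    return full_refresh_only_catalog(catalog)


if __name__ == "__main__":
    print(anyio.run(test_sequential_reads_sketch))
```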
I provided the Dagger team with access to the source-openweather and source-postgres secrets for reproduction.
Currently testing with nightly builds... PR review will be big... so... thanks :D
Moving this back to the sprint backlog as I want to focus on other task force issues, and https://github.com/airbytehq/airbyte/pull/28656 fixed a lot of the transient CAT failures.
Context
Our Connector Acceptance Tests (CAT) project is a Python program that runs tests against our connector Docker images. These tests execute commands on the container and perform assertions on the output. We containerize CAT and have so far run it with airbyte-ci using a docker-in-docker pattern:
Problems
We'd love to not rely on the docker-in-docker pattern for the following reasons:
Solution
To get rid of the docker-in-docker pattern, we'd love to make CAT use Dagger. Docker interactions with containers in CAT are located in a single class: ConnectorRunner.
Implementation
In CAT I refactored the ConnectorRunner class to use Dagger instead of docker commands. In airbyte-ci I refactored our with_connector_acceptance_test function to build the CAT container and mount the connector image tarball to it.
How airbyte-ci shares the connector-under-test image / container with CAT:
We share the built connector image from airbyte-ci by mounting the tarball file to the CAT container and using dagger_client.container().import_(<tarball_file>) to create the connector container. I originally tried to share the connector container by passing its ContainerID to CAT and instantiating the container there with dagger_client.container(dagger.ContainerId(<container_id>)), but it didn't work: I occasionally faced errors like failed to load cache key: unable to get info about digest: NotFound: rpc error: code = NotFound desc = content sha256:7e9d2045e9891e8eef3c917af3a5246a292c2b2c8c7f8c0d0b76a357d5439ea8.
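For illustration, here is a minimal sketch of the tarball-import approach described above, assuming the Dagger Python SDK; the paths and the spec probe are hypothetical, not the actual airbyte-ci code.

```python
# Minimal, hypothetical sketch of loading a connector image tarball into a
# Dagger container and exec'ing a command on it (illustrative paths/args only).
import anyio
import dagger


async def run_spec_from_tarball(tarball_dir: str, tarball_name: str) -> str:
    async with dagger.Connection(dagger.Config()) as client:
        # The tarball exported by airbyte-ci and shared with the CAT container.
        tarball_file = client.host().directory(tarball_dir).file(tarball_name)
        # import_ loads the Docker/OCI tarball into a fresh Dagger container.
        connector = client.container().import_(tarball_file)
        # Any connector command can then be exec'd, e.g. `spec`.
        return await connector.with_exec(["spec"]).stdout()


if __name__ == "__main__":
    print(anyio.run(run_spec_from_tarball, "/tmp", "connector_under_test_image.tar"))
```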
Current blockers
It's not working for heavy images
The current implementation works for lightweight images (~50 MB). But when I try to run CAT on heavy images (~500 MB) the test execution hangs... My hypothesis is that import_ is a costly operation, and since we instantiate a new Dagger connection for each exec we want to run on a container, we pay the cost of the import on every exec. We instantiate a new Dagger connection for each exec because the public methods of ConnectorRunner are synchronous, so we run anyio.run inside them. I'd prefer not to change the public methods of ConnectorRunner to be async, as that would require making the whole CAT codebase async.
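To make that cost structure concrete, here is a minimal sketch (not the real ConnectorRunner) of the synchronous-facade pattern described above: each public method call opens its own Dagger connection through anyio.run, so the tarball import is repeated on every exec. The class, method names, and paths are hypothetical.

```python
# Sketch of the pattern described above, not the actual CAT ConnectorRunner:
# every synchronous public method opens a brand-new Dagger connection via
# anyio.run, so import_ has to reload the tarball on each call.
import anyio
import dagger


class SyncConnectorRunnerSketch:
    def __init__(self, tarball_dir: str, tarball_name: str) -> None:
        self._tarball_dir = tarball_dir
        self._tarball_name = tarball_name

    async def _exec(self, args: list[str]) -> str:
        # Fresh connection = fresh session: nothing from previous calls is
        # reused here, which is why the import cost is paid on every exec.
        async with dagger.Connection(dagger.Config()) as client:
            tarball = client.host().directory(self._tarball_dir).file(self._tarball_name)
            container = client.container().import_(tarball)
            return await container.with_exec(args).stdout()

    # Synchronous facade, matching the style of CAT's current public API.
    def call_spec(self) -> str:
        return anyio.run(self._exec, ["spec"])

    def call_check(self, config_path: str) -> str:
        return anyio.run(self._exec, ["check", "--config", config_path])
```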