PrefectHQ / prefect-airbyte

https://PrefectHQ.github.io/prefect-airbyte/
Apache License 2.0

Successful Airbyte syncs reporting as failed in Prefect, causing workflows to show as failed #64

Open mikebaldry-acu opened 9 months ago

mikebaldry-acu commented 9 months ago

We are experiencing intermittent "false negatives" (an Airbyte sync succeeds, but the Prefect server reports it as failed) when triggering Airbyte connections from Prefect workflows. These failures occur more often when multiple syncs run concurrently, but they are not limited to those times. The errors appear to be caused by timeouts: the Airbyte server does not respond to Prefect's sync trigger request within the client's timeout, so the task raises httpx.ReadTimeout and fails even though the sync itself goes on to succeed.
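To make the suspected race concrete (the server accepts the request, but its response arrives after the client has stopped waiting), here is a minimal, stdlib-only sketch. It swaps in urllib.request for httpx and a local http.server stand-in for the Airbyte API; the URL path, the 0.2 s client timeout and the 1 s server delay are illustrative assumptions, not values taken from Airbyte or prefect-airbyte.

```python
import http.server
import socket
import threading
import time
import urllib.request

# Set by the stand-in server once it has accepted the trigger request.
request_accepted = threading.Event()


class SlowTriggerHandler(http.server.BaseHTTPRequestHandler):
    """Stand-in for an API that accepts the request but answers slowly."""

    def do_POST(self):
        self.rfile.read(int(self.headers.get("Content-Length", 0)))
        request_accepted.set()  # the "sync" has been started server-side
        time.sleep(1.0)  # ...but the response is delayed past the client timeout
        try:
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(b'{"job": {"id": 1}}')
        except (BrokenPipeError, ConnectionResetError):
            pass  # the client has already given up and closed the socket

    def log_message(self, *args):
        pass  # keep the example's output quiet


server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), SlowTriggerHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
# Hypothetical path, used only to make the stand-in look like a sync trigger.
url = f"http://127.0.0.1:{server.server_address[1]}/api/v1/connections/sync"

request = urllib.request.Request(
    url, data=b"{}", method="POST", headers={"Content-Type": "application/json"}
)
try:
    with urllib.request.urlopen(request, timeout=0.2) as response:
        client_saw = f"HTTP {response.status}"
except (socket.timeout, TimeoutError):
    client_saw = "timeout"  # a caller would mark the run as failed here

server_started_job = request_accepted.wait(timeout=2)
print(client_saw, server_started_job)  # prints: timeout True

server.shutdown()
server.server_close()
```

The client reports a timeout while the server has already acted on the request, which is the same shape as a sync that Airbyte runs but Prefect marks as failed.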

Technical specs for the tools we're currently running:

Downloading flow code from storage at '/usr/local/src/data_engineering'   [08:12:11 AM | prefect.flow_runs]
Created task run 'Update <Redacted> Airbyte Connection-0' for task 'Update <Redacted> Airbyte Connection'   [08:13:00 AM | prefect.flow_runs]
Executing 'Update <Redacted> Airbyte Connection-0' immediately...   [08:13:00 AM | prefect.flow_runs]
Got source file <Redacted>   [08:13:01 AM | Update <Redacted> Airbyte Connection-0 | prefect.task_runs]
Successfully updated source file <Redacted>   [08:13:01 AM | Update <Redacted> Airbyte Connection-0 | prefect.task_runs]
Finished in state Completed()   [08:13:01 AM | Update <Redacted> Airbyte Connection-0 | prefect.task_runs]
Created task run 'Run Airbyte Sync-0' for task 'Run Airbyte Sync'   [08:13:01 AM | prefect.flow_runs]
Executing 'Run Airbyte Sync-0' immediately...   [08:13:01 AM | prefect.flow_runs]
Starting Airbyte sync for <Redacted>   [08:13:04 AM | Run Airbyte Sync-0 | prefect.task_runs]
Triggering Airbyte Connection <Redacted>, in workspace at '<Redacted>/api/v1'   [08:13:04 AM | Run Airbyte Sync-0 | prefect.task_runs]
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 34, in read
    return await self._stream.receive(max_bytes=max_bytes)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
    await self._protocol.read_event.wait()
  File "/usr/lib/python3.10/asyncio/locks.py", line 214, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 10, in map_exceptions
    yield
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 32, in read
    with anyio.fail_after(timeout):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
    raise TimeoutError
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 112, in handle_async_request
    raise exc
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 91, in handle_async_request
    ) = await self._receive_response_headers(**kwargs)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 155, in _receive_response_headers
    event = await self._receive_event(timeout=timeout)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 191, in _receive_event
    data = await self._network_stream.read(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 31, in read
    with map_exceptions(exc_map):
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc)
httpcore.ReadTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/tmp/tmp9st250atprefect/orchestration/flows/<Redacted>_archive/workflow.py", line 54, in run_airbyte_sync
    job_run = connection.trigger()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
    return call()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 383, in __call__
    return self.result()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect_airbyte/connections.py", line 367, in trigger
    (job_id, _,) = await airbyte_client.trigger_manual_sync_connection(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect_airbyte/client.py", line 151, in trigger_manual_sync_connection
    response = await self._client.post(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1533, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ReadTimeout
[08:13:09 AM | Run Airbyte Sync-0 | prefect.task_runs]
Finished in state Failed('Task run encountered an exception ReadTimeout: ')   [08:13:12 AM | Run Airbyte Sync-0 | prefect.task_runs]
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 34, in read
    return await self._stream.receive(max_bytes=max_bytes)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
    await self._protocol.read_event.wait()
  File "/usr/lib/python3.10/asyncio/locks.py", line 214, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 10, in map_exceptions
    yield
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 32, in read
    with anyio.fail_after(timeout):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
    raise TimeoutError
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 112, in handle_async_request
    raise exc
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 91, in handle_async_request
    ) = await self._receive_response_headers(**kwargs)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 155, in _receive_response_headers
    event = await self._receive_event(timeout=timeout)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 191, in _receive_event
    data = await self._network_stream.read(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 31, in read
    with map_exceptions(exc_map):
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc)
httpcore.ReadTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/engine.py", line 833, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/tmp/tmp9st250atprefect/orchestration/flows/<Redacted>_archive/workflow.py", line 73, in <Redacted>_sftp_sync
    res = run_airbyte_sync(file.connection_id)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/tasks.py", line 505, in __call__
    return enter_task_run_engine(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/engine.py", line 1137, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/engine.py", line 1302, in get_task_call_return_value
    return await future._result()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/engine.py", line 1719, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 316, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/tmp/tmp9st250atprefect/orchestration/flows/<Redacted>_archive/workflow.py", line 54, in run_airbyte_sync
    job_run = connection.trigger()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
    return call()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 383, in __call__
    return self.result()
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 283, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 169, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect_airbyte/connections.py", line 367, in trigger
    (job_id, _,) = await airbyte_client.trigger_manual_sync_connection(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/prefect_airbyte/client.py", line 151, in trigger_manual_sync_connection
    response = await self._client.post(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1533, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
    with map_httpcore_exceptions():
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/src/data_engineering/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ReadTimeout
[08:13:12 AM | prefect.flow_runs]
Finished in state Failed('Flow run encountered an exception. ReadTimeout: ')

davidfromtandym commented 9 months ago

Also running into this issue, seemingly out of nowhere. I've tried increasing the timeout parameter of trigger_sync well beyond the poll interval set for the Airbyte server. That helped a bit, but I'm still encountering frequent errors.
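Raising the timeout narrows the window but cannot close it, because the traceback shows the ReadTimeout is raised while waiting for the response to the trigger POST, which the server may already have accepted. One possible mitigation is to treat a timeout on the trigger call as ambiguous and look for the job before failing. The sketch below is a hypothetical wrapper, not part of prefect-airbyte: trigger and find_job_started_since are placeholders you would implement against your own Airbyte deployment (for example by listing the connection's recent jobs through the Airbyte API), and the attempt count and delay are arbitrary. Note that httpx.ReadTimeout does not subclass Python's built-in TimeoutError, so with httpx you would pass timeout_errors=(httpx.ReadTimeout,).

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional, Tuple, Type


@dataclass
class TriggerResult:
    job_id: int
    recovered_after_timeout: bool  # True if the job was found after a timeout


def trigger_with_reconcile(
    trigger: Callable[[], int],
    find_job_started_since: Callable[[float], Optional[int]],
    timeout_errors: Tuple[Type[BaseException], ...] = (TimeoutError,),
    lookup_attempts: int = 3,
    lookup_delay_s: float = 5.0,
    clock: Callable[[], float] = time.time,
    sleep: Callable[[float], None] = time.sleep,
) -> TriggerResult:
    """Trigger a sync; on a timeout, check whether the job started anyway.

    Both callables are hypothetical placeholders:
      trigger() sends the sync request and returns the new job id.
      find_job_started_since(ts) returns the id of a sync job for the same
      connection created at or after ts, or None if there is none yet.
    Comparing a client timestamp with server job times assumes the two
    clocks are roughly in sync.
    """
    started_at = clock()
    try:
        return TriggerResult(job_id=trigger(), recovered_after_timeout=False)
    except timeout_errors:
        # The server may have accepted the request before the client gave up,
        # so look for the job instead of failing immediately.
        for attempt in range(lookup_attempts):
            job_id = find_job_started_since(started_at)
            if job_id is not None:
                return TriggerResult(job_id=job_id, recovered_after_timeout=True)
            if attempt < lookup_attempts - 1:
                sleep(lookup_delay_s)
        raise  # no job found: re-raise the original timeout


if __name__ == "__main__":
    def timing_out() -> int:
        raise TimeoutError("read timed out")

    result = trigger_with_reconcile(timing_out, lambda ts: 42, sleep=lambda s: None)
    print(result)  # TriggerResult(job_id=42, recovered_after_timeout=True)
```

If no matching job turns up, the original timeout is re-raised, so a request that genuinely never reached Airbyte still fails the task as before.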

hawkaa commented 5 months ago

We are also seeing this issue on our end!

a-monteiro commented 4 months ago

Any updates here? This happens quite a lot and is especially an issue for larger syncs.

Ishankoradia commented 1 month ago

Any updates on this issue? We are also facing this.