googleapis / python-storage

Apache License 2.0

Issue: `ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))` #1294

Closed: yohann84L closed this issue 4 months ago

yohann84L commented 4 months ago

We are encountering a ConnectionError while using the Cloud Storage library in our application. It happens rarely: 1 to 5 times per month out of more than 20k requests.

Environment details

Steps to reproduce

The error occurs during the execution of an API endpoint responsible for uploading images or files to Google Cloud Storage.

Code example

Nothing special, just a simple call:

blob.upload_from_file(image.file)
# with image being a fastapi.UploadFile

Stack trace

The function I use:

WouldBlock: null
  File "anyio/streams/memory.py", line 98, in receive
    return self.receive_nowait()
  File "anyio/streams/memory.py", line 93, in receive_nowait
    raise WouldBlock
EndOfStream: null
  File "starlette/middleware/base.py", line 78, in call_next
    message = await recv_stream.receive()
  File "anyio/streams/memory.py", line 118, in receive
    raise EndOfStream
ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
  File "starlette/applications.py", line 122, in __call__
    await self.middleware_stack(scope, receive, send)
  File "starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "starlette/middleware/base.py", line 108, in __call__
    response = await self.dispatch_func(request, call_next)
  File "app/services/fastapi_sqlalchemy/middleware.py", line 65, in dispatch
    response = await call_next(request)
  File "starlette/middleware/base.py", line 84, in call_next
    raise app_exc
  File "starlette/middleware/base.py", line 70, in coro
    await self.app(scope, receive_or_disconnect, send_no_error)
  File "starlette/middleware/cors.py", line 83, in __call__
    await self.app(scope, receive, send)
  File "starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "fastapi/middleware/asyncexitstack.py", line 20, in __call__
    raise e
  File "fastapi/middleware/asyncexitstack.py", line 17, in __call__
    await self.app(scope, receive, send)
  File "starlette/routing.py", line 718, in __call__
    await route.handle(scope, receive, send)
  File "starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "starlette/routing.py", line 66, in app
    response = await func(request)
  File "app/api/routes/context.py", line 37, in custom_route_handler
    response: Response = await original_route_handler(request)
  File "fastapi/routing.py", line 273, in app
    raw_response = await run_endpoint_function(
  File "fastapi/routing.py", line 192, in run_endpoint_function
    return await run_in_threadpool(dependant.call, **values)
  File "starlette/concurrency.py", line 41, in run_in_threadpool
    return await anyio.to_thread.run_sync(func, *args)
  File "anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
  File "anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
  File "app/services/sentry_wrapper.py", line 45, in wrapper
    return fn(*args, **kwargs)
  File "app/api/security/authorizations.py", line 60, in wrapper
    return func(*args, **kwargs)
  File "app/api/routes/tray_process/analyze.py", line 213, in analyze_full_csi
    result = analyze_queries.analyze(photo, current_user, restaurant_id)
  File "app/db/queries/tray_process/analyze.py", line 208, in analyze
    bucket_filepath = cls.upload_image(
  File "app/db/queries/base.py", line 141, in upload_image
    blob.upload_from_file(image.file)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 2796, in upload_from_file
    self._prep_and_do_upload(
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 2637, in _prep_and_do_upload
    created_json = self._do_upload(
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 2460, in _do_upload
    response = self._do_resumable_upload(
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 2278, in _do_resumable_upload
    upload, transport = self._initiate_resumable_upload(
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 2152, in _initiate_resumable_upload
    upload.initiate(
  File "/usr/local/lib/python3.8/site-packages/google/resumable_media/requests/upload.py", line 420, in initiate
    return _request_helpers.wait_and_retry(
  File "/usr/local/lib/python3.8/site-packages/google/resumable_media/requests/_request_helpers.py", line 178, in wait_and_retry
    raise error
  File "/usr/local/lib/python3.8/site-packages/google/resumable_media/requests/_request_helpers.py", line 155, in wait_and_retry
    response = func()
  File "/usr/local/lib/python3.8/site-packages/google/resumable_media/requests/upload.py", line 412, in retriable_request
    result = transport.request(
  File "/usr/local/lib/python3.8/site-packages/google/auth/transport/requests.py", line 541, in request
    response = super(AuthorizedSession, self).request(
  File "requests/sessions.py", line 529, in request
    resp = self.send(prep, **send_kwargs)
  File "requests/sessions.py", line 645, in send
    r = adapter.send(request, **kwargs)
  File "requests/adapters.py", line 501, in send
    raise ConnectionError(err, request=request)

We appreciate any insights or suggestions on how to resolve this issue.

Thank you!

sassanuv commented 4 months ago

I'm also seeing the same issue when we are uploading files within a Cloud Composer task instance.

cojenco commented 4 months ago

Thanks for the question. The Python client library can retry a ConnectionError, but only if the request is made idempotent by including request preconditions. Are you using preconditions for the uploads?

In GCS, uploads are conditionally idempotent; that is, they are only safe to retry with a precondition. Could you try adding the precondition if_generation_match to your upload requests?
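To make the "conditionally idempotent" rule concrete, here is a simplified model of the retry decision. It is a sketch, not the google-cloud-storage source: `should_retry` and `is_retry_safe` are hypothetical names, and Python's builtin `ConnectionError` stands in for the `requests.exceptions.ConnectionError` seen in the trace.

```python
# Simplified model of a conditionally idempotent retry decision.
# Hypothetical names; not the google-cloud-storage implementation.

# Builtin ConnectionError (parent of ConnectionResetError) stands in for
# requests.exceptions.ConnectionError; TimeoutError is another transient example.
TRANSIENT_ERRORS = (ConnectionError, TimeoutError)


def is_retry_safe(upload_kwargs: dict) -> bool:
    """An upload counts as idempotent only when it carries a generation precondition."""
    # Test for None, not truthiness: if_generation_match=0 is a valid precondition.
    return upload_kwargs.get("if_generation_match") is not None


def should_retry(exc: BaseException, upload_kwargs: dict) -> bool:
    """Retry transient network errors, but only for requests that are safe to repeat."""
    return isinstance(exc, TRANSIENT_ERRORS) and is_retry_safe(upload_kwargs)
```

In this model, an upload without a precondition re-raises the connection reset immediately, which is consistent with the trace in the original report, where `wait_and_retry` raised the error instead of retrying.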

In your code sample, assuming the upload is for a new file, set if_generation_match=0 to enable retries. A value of 0 means the write succeeds only if no live object with that name exists yet:

blob.upload_from_file(image.file, if_generation_match=0)
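Why does the precondition make a retry safe? A toy, in-memory model of GCS object generations shows what happens when the first attempt reaches the server but the connection is reset before the response arrives: the retry fails the precondition instead of writing a second time. `FakeBucket`, `PreconditionFailed`, and `upload_then_retry` are hypothetical illustrations, not the real `Bucket`/`Blob` API, and the sequential generation counter is a simplification.

```python
class PreconditionFailed(Exception):
    """Stand-in for an HTTP 412 Precondition Failed response."""


class FakeBucket:
    """Toy in-memory store mimicking GCS object generations (hypothetical)."""

    def __init__(self):
        self._objects = {}  # name -> (generation, data)
        self._next_generation = 1  # sequential here; real generations are opaque integers

    def generation(self, name):
        # Generation 0 means "no live object", which is what if_generation_match=0 checks.
        return self._objects.get(name, (0, b""))[0]

    def upload(self, name, data, if_generation_match=None):
        current = self.generation(name)
        # Compare against None, not truthiness: 0 is a meaningful precondition.
        if if_generation_match is not None and current != if_generation_match:
            raise PreconditionFailed(f"{name}: live generation is {current}")
        generation = self._next_generation
        self._next_generation += 1
        self._objects[name] = (generation, data)
        return generation


def upload_then_retry(bucket, name, data):
    """Simulate a first attempt whose response is lost, followed by a retry."""
    bucket.upload(name, data, if_generation_match=0)  # lands, but the client sees a reset
    try:
        return bucket.upload(name, data, if_generation_match=0)  # the retry
    except PreconditionFailed:
        # The object already exists (our lost attempt or another writer): nothing is overwritten.
        return bucket.generation(name)
```

With the real client, we would expect the rejected retry to surface as `google.api_core.exceptions.PreconditionFailed` (HTTP 412), which the caller can decide how to handle.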

Here is a code sample that demonstrates adding the generation-match precondition to an upload. The retry strategy is outlined in https://cloud.google.com/storage/docs/retry-strategy#python. These docs go into detail about retryable exceptions, client retry configuration options, and which operations are conditionally idempotent.
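Retries of this kind are usually paired with exponential backoff bounded by an overall deadline. A minimal stdlib sketch of that loop follows. `retry_with_backoff` is hypothetical and its default delay values are illustrative, not necessarily the library's; in google-cloud-storage itself, retry behavior is configured through the `retry` argument (for example on `upload_from_file`) using `google.api_core.retry.Retry` objects.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    should_retry: Callable[[BaseException], bool],
    initial: float = 1.0,      # first backoff ceiling, in seconds (illustrative)
    maximum: float = 60.0,     # cap on the backoff ceiling (illustrative)
    multiplier: float = 2.0,   # growth factor per attempt
    deadline: float = 120.0,   # give up once total elapsed time would exceed this
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying retryable errors with capped, jittered exponential backoff."""
    start = time.monotonic()
    delay = initial
    while True:
        try:
            return func()
        except Exception as exc:
            if not should_retry(exc):
                raise  # non-retryable: surface immediately
            # Full jitter: wait a random amount up to the current ceiling.
            wait = random.uniform(0, delay)
            if time.monotonic() - start + wait > deadline:
                raise  # out of time budget: surface the last error
            sleep(wait)
            delay = min(delay * multiplier, maximum)
```

Injecting `sleep` keeps the loop testable; the jitter spreads out retries from many workers that hit the same reset at the same moment.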

cojenco commented 4 months ago

Hope this was helpful. Please open a new issue if you have further questions.