[Open] kamilglod opened this issue 1 year ago
I'm seeing a similar issue with google-cloud-bigquery-storage version 2.22.0.

In my case I'm not using an async iterator though. Instead, I'm calling the send method of an AppendRowsStream instance.

More context: I use a single shared AppendRowsStream. I do this by instantiating it inside a Python module, and then importing that instance where I need it. The code runs in a gevent loop, with multiple greenlets (each should see the same AppendRowsStream instance). From time to time the stream fails with 503 recvmsg:Connection reset by peer, which for some reason is not being handled correctly.

Let me know if I can be of any help (e.g. by providing additional details).
Thank you @kamilglod and @abannura. Could you provide a minimal code snippet that can reproduce the error?
@Linchin it's not easy to reproduce as it's happening occasionally. The easiest way to reproduce it is probably to set up the simplest streaming example, run it for longer than 1 day, and wait for ServiceUnavailable 503 recvmsg:Connection reset by peer or some other server-related error. From the code it should be retried, but what actually happens is that the error is raised immediately without any retry.
@kamilglod Could you tell me which exact class you are using? Still, a code snippet would help a lot; it removes a lot of ambiguity.
@abannura For the class AppendRowsStream, I think retry and timeout are not supported at this moment; see here.
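(Illustration, not library behavior: since AppendRowsStream.send() accepts no retry argument, one application-level workaround is to rebuild the stream and resend when a retriable error surfaces. A rough sketch, where make_stream is an assumed application-provided factory that constructs a fresh AppendRowsStream; it is not part of the library:)

from google.api_core import exceptions as core_exceptions

def send_with_reconnect(make_stream, request, attempts=3):
    # `make_stream()` is an assumed factory building a fresh AppendRowsStream
    # (e.g. from a BigQueryWriteClient and an initial request template).
    stream = make_stream()
    for attempt in range(attempts):
        try:
            # send() returns a future; result() surfaces the append response
            # or raises the error that terminated the connection.
            return stream.send(request).result()
        except (core_exceptions.ServiceUnavailable, core_exceptions.Aborted):
            if attempt == attempts - 1:
                raise
            stream.close()
            stream = make_stream()  # re-open and resend the failed request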
@Linchin sure, I can simplify my code to something like:
import logging
from typing import AsyncIterator, Sequence

from google.api_core import exceptions as core_exceptions
from google.api_core import retry as retries
from google.cloud.bigquery_storage_v1.services.big_query_write.async_client import (
    BigQueryWriteAsyncClient,
)
from google.cloud.bigquery_storage_v1.types import (
    AppendRowsRequest,
    CreateWriteStreamRequest,
    WriteStream,
)
from google.protobuf.message import Message
from google.rpc.code_pb2 import Code

logger = logging.getLogger(__name__)

# Same as the default retry, but with more exception types in the predicate.
DEFAULT_RETRY = retries.Retry(
    initial=0.1,
    maximum=60.0,
    multiplier=1.3,
    timeout=86400.0,
    predicate=retries.if_exception_type(
        core_exceptions.ServiceUnavailable,
        core_exceptions.Aborted,
        core_exceptions.InternalServerError,
        core_exceptions.BadGateway,
        core_exceptions.GatewayTimeout,
    ),
    on_error=lambda exc: logger.warning("BQ stream retriable error.", exc_info=exc),
)


async def stream(table_path: str, messages: AsyncIterator[Sequence[Message]]):
    client = BigQueryWriteAsyncClient()
    write_stream = await client.create_write_stream(
        CreateWriteStreamRequest(
            parent=table_path,
            write_stream=WriteStream(type_=WriteStream.Type.COMMITTED),
        )
    )
    stream = await client.append_rows(
        _inner_stream(messages, write_stream.name), retry=DEFAULT_RETRY
    )
    async for result in stream:
        if result.error.code != Code.OK:
            raise Exception(
                f"Unexpected result {result.error.code} {result.error.message}"
            )


async def _inner_stream(
    messages: AsyncIterator[Sequence[Message]], stream_name: str
) -> AsyncIterator[AppendRowsRequest]:
    ...
and when the messages stream is open for a long time, we get errors that are not retried. A good example is the Aborted exception after 10 minutes of inactivity: it should be retried based on the retry configuration, but it isn't. It's most likely connected to the issue you linked.
I am able to reproduce the issue by intentionally making the append row request invalid:
Traceback (most recent call last):
File "/usr/local/google/home/xxxxxxxxxxxx/micromamba/envs/dev-3.11/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 102, in _wrapped_aiter
async for response in self._call: # pragma: no branch
File "/usr/local/google/home/xxxxxxxxxxxx/micromamba/envs/dev-3.11/lib/python3.11/site-packages/grpc/aio/_call.py", line 326, in _fetch_stream_responses
await self._raise_for_status()
File "/usr/local/google/home/xxxxxxxxxxxx/micromamba/envs/dev-3.11/lib/python3.11/site-packages/grpc/aio/_call.py", line 236, in _raise_for_status
raise _create_rpc_error(await self.initial_metadata(), await
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
status = StatusCode.INVALID_ARGUMENT
details = "Invalid stream name. Entity: projects/xxxxxxxxxxxx-testing/datasets/created_dataset/tables/datetime_strings/streams/Cic2ZTQxYTIzZC0wMDAwLTI5MDctYWFjYi0wODllMDgzMjJjYzQ6czQasadaa"
debug_error_string = "UNKNOWN:Error received from peer ipv4:108.177.98.95:443 {grpc_message:"Invalid stream name. Entity: projects/xxxxxxxxxxxx-testing/datasets/created_dataset/tables/datetime_strings/streams/Cic2ZTQxYTIzZC0wMDAwLTI5MDctYWFjYi0wODllMDgzMjJjYzQ6czQasadaa", grpc_status:3, created_time:"2023-11-16T19:24:38.990469091+00:00"}"
>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/google/home/xxxxxxxxxxxx/github.com/googleapis/python-bigquery-storage/samples/snippets/append_rows_retry_committed.py", line 222, in <module>
asyncio.run(main())
File "/usr/local/google/home/xxxxxxxxxxxx/micromamba/envs/dev-3.11/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/usr/local/google/home/xxxxxxxxxxxx/micromamba/envs/dev-3.11/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/google/home/xxxxxxxxxxxx/micromamba/envs/dev-3.11/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/usr/local/google/home/xxxxxxxxxxxx/github.com/googleapis/python-bigquery-storage/samples/snippets/append_rows_retry_committed.py", line 218, in main
await stream(write_stream_value)
File "/usr/local/google/home/xxxxxxxxxxxx/github.com/googleapis/python-bigquery-storage/samples/snippets/append_rows_retry_committed.py", line 117, in stream
async for item in result:
File "/usr/local/google/home/xxxxxxxxxxxx/micromamba/envs/dev-3.11/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 105, in _wrapped_aiter
raise exceptions.from_grpc_error(rpc_error) from rpc_error
google.api_core.exceptions.InvalidArgument: 400 Invalid stream name. Entity: projects/xxxxxxxxxxxx-testing/datasets/created_dataset/tables/datetime_strings/streams/Cic2ZTQxYTIzZC0wMDAwLTI5MDctYWFjYi0wODllMDgzMjJjYzQ6czQasadaa [detail: "[ORIGINAL ERROR] generic::invalid_argument: Invalid stream name.; Cannot parse write_stream `projects/xxxxxxxxxxxx-testing/datasets/created_dataset/tables/datetime_strings/streams/Cic2ZTQxYTIzZC0wMDAwLTI5MDctYWFjYi0wODllMDgzMjJjYzQ6czQasadaa` from request while initializing GWS logs. [google.rpc.error_details_ext] { message: \"Invalid stream name. Entity: projects/xxxxxxxxxxxx-testing/datasets/created_dataset/tables/datetime_strings/streams/Cic2ZTQxYTIzZC0wMDAwLTI5MDctYWFjYi0wODllMDgzMjJjYzQ6czQasadaa\" }"
]
@Linchin please try to reproduce it by leaving the stream open but inactive for 10 minutes. This is what we get:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 102, in _wrapped_aiter
async for response in self._call: # pragma: no branch
File "/usr/local/lib/python3.11/site-packages/grpc/aio/_call.py", line 326, in _fetch_stream_responses
await self._raise_for_status()
File "/usr/local/lib/python3.11/site-packages/grpc/aio/_call.py", line 236, in _raise_for_status
raise _create_rpc_error(await self.initial_metadata(), await
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
status = StatusCode.ABORTED
details = "Closing the stream because it has been inactive for 600 seconds. Entity: projects/XXX/streams/Cic2ZTU2MjJhZi0wMDAwLTI2ODMtOTc0MS01ODI0MjlhNjE5MTQ6czk"
debug_error_string = "UNKNOWN:Error received from peer ipv4:64.233.164.95:443 {created_time:"2023-11-20T03:35:12.366249827+00:00", grpc_status:10, grpc_message:"Closing the stream because it has been inactive for 600 seconds. Entity: projects/XXX/streams/Cic2ZTU2MjJhZi0wMDAwLTI2ODMtOTc0MS01ODI0MjlhNjE5MTQ6czk"}"
>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/app/src/repositories/exceptions/bigquery.py", line 122, in _stream_exceptions_task
    async for attempt in tenacity.AsyncRetrying(
  File "/usr/local/lib/python3.11/site-packages/tenacity/_asyncio.py", line 71, in __anext__
    do = self.iter(retry_state=self._retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 314, in iter
    return fut.result()
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/app/src/repositories/exceptions/bigquery.py", line 148, in _stream_exceptions_task
    await exceptions_bq_stream.stream(self.exceptions_iterator)
  File "/usr/local/lib/python3.11/site-packages/XXX/clients/gcp_big_query/gcp_big_query_stream.py", line 137, in stream
    await self._validate_stream_result(stream)
  File "/usr/local/lib/python3.11/site-packages/XXX/clients/gcp_big_query/gcp_big_query_stream.py", line 186, in _validate_stream_result
    async for result in stream:
  File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 105, in _wrapped_aiter
    raise exceptions.from_grpc_error(rpc_error) from rpc_error
google.api_core.exceptions.Aborted: 409 Closing the stream because it has been inactive for 600 seconds. Entity: projects/XXX/streams/Cic2ZTU2MjJhZi0wMDAwLTI2ODMtOTc0MS01ODI0MjlhNjE5MTQ6czk
Here are some other errors that I logged in the last couple of days that should be retried based on the stream `DEFAULT_RETRY`:

ServiceUnavailable 503 Socket closed
ServiceUnavailable 503 recvmsg:Connection reset by peer
InternalServerError 500 Received RST_STREAM with error code 2
Closing the stream because server is restarted. This is expected and client is advised to reconnect.
After some digging, I think the retry is actually working as intended here. The retry we are setting up only applies to establishing the connection, not to sending the data. Right now we don't support retries at that level, so if you really need it, you will have to implement it yourself.
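(For illustration, a minimal application-level sketch of such a retry, re-opening the stream when a retriable error escapes the response iterator. It reuses the `stream` coroutine from the snippet earlier in the thread; the backoff numbers are arbitrary.)

import asyncio
import logging

from google.api_core import exceptions as core_exceptions

logger = logging.getLogger(__name__)

RETRIABLE = (
    core_exceptions.ServiceUnavailable,
    core_exceptions.Aborted,
    core_exceptions.InternalServerError,
)

async def stream_with_reconnect(table_path, messages, max_attempts=5):
    # Re-run the whole `stream()` coroutine from the snippet above whenever
    # a retriable error ends the bidirectional stream.
    delay = 0.1
    for attempt in range(1, max_attempts + 1):
        try:
            await stream(table_path, messages)
            return
        except RETRIABLE as exc:
            logger.warning("Stream attempt %d failed, reconnecting.", attempt, exc_info=exc)
            await asyncio.sleep(delay)
            delay = min(delay * 1.3, 60.0)
    raise RuntimeError("Giving up after repeated stream failures.")

One caveat: a batch that was in flight when the connection dropped may already have been consumed from messages, which is exactly the duplication/loss problem discussed further down the thread.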
There is some work going on in the core client library to support retries in streaming: https://github.com/googleapis/python-api-core/pull/495. However, this will only support server streaming, not the client streaming or bidirectional streaming.
Thanks to @daniel-sanche @leahecole @shollyman for helping me understand the situation.
@Linchin what do you mean by server streaming, client streaming and bidirectional streaming?
Here are the official definitions in gRPC: https://grpc.io/docs/what-is-grpc/core-concepts/#server-streaming-rpc
tl;dr: server streaming is when data is streamed from the server to the client (like YouTube), and client streaming is the other way around. In your case we are streaming data to the server, so it's client streaming.
@Linchin thanks, now it's clear.
Do you have any best practice for handling this server error? I handled it by simply restarting the stream (I'm using tenacity to retry in case of an error), but the problem is that I'm probably losing one row that should be inserted (it might already have been consumed from the iterator by the stream), and I'm not sure how I can check whether it was inserted into the table or not. The server might fail before or after handling a row, and it's not idempotent to send the same message to a different stream.
The read client handles reconnection to the server in the following way: https://github.com/googleapis/python-bigquery-storage/blob/1683879b44477ae849f68e350d38143f88b657aa/google/cloud/bigquery_storage_v1/reader.py#L165
I don't think we implemented anything similar for the write API yet.
Edit: Potentially, we might want to add "reopen" or something like that to our send method, since we can't do a plain loop in the way that we did for reads.
The complication with the write API is that we might have a bunch of requests that we're waiting for a response on. So we might need to resend that backlog queue of requests.
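(A rough sketch of that bookkeeping, assuming responses come back in the same order the requests were sent; `PendingRequestTracker` is a hypothetical name, not an existing class in this library:)

import collections

class PendingRequestTracker:
    """Hypothetical helper: remember in-flight AppendRowsRequests so that,
    after a reconnect, the unacknowledged backlog can be resent."""

    def __init__(self):
        self._inflight = collections.deque()

    def sent(self, request):
        # Called right after a request is written to the stream.
        self._inflight.append(request)

    def acked(self):
        # Called when the matching response arrives; responses are assumed
        # to arrive in request order, so drop the oldest entry.
        self._inflight.popleft()

    def backlog(self):
        # On reconnect, everything still here must be resent (and may be
        # duplicated if the server already applied it before failing).
        return list(self._inflight)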
The server might fail before or after handling a row, and it's not idempotent to send the same message to a different stream.
Perhaps @yirutang can comment on this? I see that https://github.com/googleapis/java-bigquerystorage/blob/main/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java#L707 always resends any in-flight requests.
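(One possible mitigation, sketched under the assumption that the client reconnects to the same committed write stream rather than a new one: AppendRowsRequest carries an offset field, and the server rejects an append whose offset it has already applied, so a duplicated resend becomes detectable instead of silently inserting rows twice. This does not help across different streams, as noted above. `_inner_stream_with_offsets` is a hypothetical variant of the earlier `_inner_stream`:)

from google.cloud.bigquery_storage_v1.types import AppendRowsRequest

async def _inner_stream_with_offsets(messages, stream_name):
    # Attach explicit offsets so that, after reconnecting to the SAME write
    # stream, a resent batch whose offset was already applied fails with
    # AlreadyExists instead of being silently duplicated.
    offset = 0
    async for batch in messages:
        request = AppendRowsRequest(write_stream=stream_name, offset=offset)
        # ... serialize `batch` into request.proto_rows here ...
        offset += len(batch)
        yield request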
Default configuration for the append_rows() call here and here sets that, by default, the request will be retried in case of a google.api_core.exceptions.ServiceUnavailable exception, with a timeout of 1 day. However, I observed that in case of this server response it raises this exception without retrying the call.

Environment details

Python version: 3.11.2
pip version: 22.3.1
google-cloud-bigquery-storage version: 2.19.0

Steps to reproduce

Call append_rows() and wait for a server error that matches the retry predicate.

Stack trace
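(For reference, the default configuration referred to above looks roughly like the following; this is a paraphrase of the generated transport defaults rather than a verbatim copy, so check the linked sources for the exact code. Note how the DEFAULT_RETRY snippet earlier in the thread extends exactly this, adding more exception types to the predicate.)

from google.api_core import exceptions as core_exceptions
from google.api_core import retry as retries

# Roughly the default retry the generated client applies to append_rows()
# (paraphrased, not verbatim): retry only ServiceUnavailable, for up to a day.
DEFAULT_APPEND_ROWS_RETRY = retries.Retry(
    initial=0.1,
    maximum=60.0,
    multiplier=1.3,
    predicate=retries.if_exception_type(
        core_exceptions.ServiceUnavailable,
    ),
    timeout=86400.0,  # 1 day
)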