mercadona / rele

Easy to use Google Pub/Sub
https://mercadonarele.readthedocs.io/en/latest/index.html
Apache License 2.0

Consumer gets stuck after recovering from a loss of network connectivity #270

Open jBonoraW opened 10 months ago

jBonoraW commented 10 months ago

We have noticed that consumers get stuck in a "Re-established stream" loop when network connectivity is lost for more than 1 minute and then recovered. Here is an example:

{"levelname": "ERROR", "asctime": "2024-01-17 10:18:46,526", "module": "helper_threads", "process": 1, "thread": 281472426570208, "message": "Error in queue callback worker: Deadline of 60.0s exceeded while calling target function, last exception: 503 failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused", "lineno": 120, "pathname": "/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/helper_threads.py", "exc_info": "Traceback (most recent call last):\n  File \"/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers.py\", line 79, in error_remapped_callable\n    return callable_(*args, **kwargs)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/grpc/_channel.py\", line 1160, in __call__\n    return _end_unary_response_blocking(state, call, False, None)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/grpc/_channel.py\", line 1003, in _end_unary_response_blocking\n    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable\n    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\ngrpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:\n\tstatus = StatusCode.UNAVAILABLE\n\tdetails = \"failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer  {grpc_message:\"failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused\", grpc_status:14, created_time:\"2024-01-17T10:18:46.524909861+00:00\"}\"\n>\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.11/site-packages/google/api_core/retry.py\", line 207, in retry_target\n    result = target()\n             ^^^^^^^^\n  File 
\"/usr/local/lib/python3.11/site-packages/google/api_core/timeout.py\", line 120, in func_with_timeout\n    return func(*args, **kwargs)\n           ^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers.py\", line 81, in error_remapped_callable\n    raise exceptions.from_grpc_error(exc) from exc\ngoogle.api_core.exceptions.ServiceUnavailable: 503 failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/helper_threads.py\", line 118, in __call__\n    self._callback(items)\n  File \"/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py\", line 187, in dispatch_callback\n    self.ack(ack_requests)\n  File \"/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py\", line 241, in ack\n    requests_completed, requests_to_retry = self._manager.send_unary_ack(\n                                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py\", line 637, in send_unary_ack\n    self._client.acknowledge(subscription=self._subscription, ack_ids=ack_ids)\n  File \"/usr/local/lib/python3.11/site-packages/google/pubsub_v1/services/subscriber/client.py\", line 1359, in acknowledge\n    rpc(\n  File \"/usr/local/lib/python3.11/site-packages/google/api_core/gapic_v1/method.py\", line 131, in __call__\n    return wrapped_func(*args, **kwargs)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/usr/local/lib/python3.11/site-packages/google/api_core/retry.py\", line 372, in retry_wrapped_func\n    return retry_target(\n           ^^^^^^^^^^^^^\n  File 
\"/usr/local/lib/python3.11/site-packages/google/api_core/retry.py\", line 226, in retry_target\n    raise exceptions.RetryError(\ngoogle.api_core.exceptions.RetryError: Deadline of 60.0s exceeded while calling target function, last exception: 503 failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused"}
{"levelname": "WARNING", "asctime": "2024-01-17 10:18:47,528", "module": "bidi", "process": 1, "thread": 281472384606688, "message": "Background thread did not exit.", "lineno": 717, "pathname": "/usr/local/lib/python3.11/site-packages/google/api_core/bidi.py"}
Exception in thread Thread-LeaseMaintainer:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers.py", line 79, in error_remapped_callable
    return callable_(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/grpc/_channel.py", line 1160, in __call__
    return _end_unary_response_blocking(state, call, False, None)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/grpc/_channel.py", line 1003, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused"
    debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused", grpc_status:14, created_time:"2024-01-17T10:18:48.656946156+00:00"}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 207, in retry_target
    result = target()
             ^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/timeout.py", line 120, in func_with_timeout
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/grpc_helpers.py", line 81, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.ServiceUnavailable: 503 failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.11/threading.py", line 975, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py", line 201, in maintain_leases
    expired_ack_ids = self._manager._send_lease_modacks(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 1047, in _send_lease_modacks
    self._dispatcher.modify_ack_deadline(items, ack_deadline)
  File "/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py", line 358, in modify_ack_deadline
    _, requests_to_retry = self._manager.send_unary_modack(
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 726, in send_unary_modack
    self._client.modify_ack_deadline(
  File "/usr/local/lib/python3.11/site-packages/google/pubsub_v1/services/subscriber/client.py", line 1248, in modify_ack_deadline
    rpc(
  File "/usr/local/lib/python3.11/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
    return wrapped_func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 372, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 226, in retry_target
    raise exceptions.RetryError(
google.api_core.exceptions.RetryError: Deadline of 60.0s exceeded while calling target function, last exception: 503 failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection refused
{"levelname": "INFO", "asctime": "2024-01-17 10:18:55,184", "module": "bidi", "process": 1, "thread": 281472392999392, "message": "Re-established stream", "lineno": 494, "pathname": "/usr/local/lib/python3.11/site-packages/google/api_core/bidi.py"}
...
{"levelname": "INFO", "asctime": "2024-01-17 10:23:15,254", "module": "bidi", "process": 1, "thread": 281472392999392, "message": "Re-established stream", "lineno": 494, "pathname": "/usr/local/lib/python3.11/site-packages/google/api_core/bidi.py"}

We have dug into it. Even though we have a way to cancel the StreamingPullFuture and restart consumption, the futures go into a PENDING state after network connectivity recovers and do not consume anything. Once a consumer is in this inconsistent state, nothing can recover it. It looks like, after the first timeout is reached, Google refuses the connection. We found an issue that we think is the source of this: https://github.com/grpc/grpc/issues/20562.

So we think a workaround is to detect this problem and raise an exception, signaling that we have reached this unrecoverable error.
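A minimal sketch of what such a detection could look like, using only the standard library. It is not rele's or google-cloud-pubsub's API: `ConsumerStuckError`, `ReconnectWatchdog`, `max_reconnects`, `message_received` and `check` are hypothetical names. It also assumes that the "Re-established stream" records shown above are emitted on a logger named `google.api_core.bidi`, and that several such records in a row with no message consumed in between is a good enough signal of the stuck state.

```python
import logging
import threading


class ConsumerStuckError(RuntimeError):
    """Hypothetical exception signalling the unrecoverable state."""


class ReconnectWatchdog(logging.Handler):
    """Counts consecutive "Re-established stream" records.

    The count is reset whenever the application reports that a
    message was actually consumed.
    """

    def __init__(self, max_reconnects=5):
        super().__init__(level=logging.INFO)
        self.max_reconnects = max_reconnects
        self._count = 0
        self._count_lock = threading.Lock()

    def emit(self, record):
        # Called by the logging machinery for every record on the logger.
        if "Re-established stream" in record.getMessage():
            with self._count_lock:
                self._count += 1

    def message_received(self):
        # Call this from the subscription callback: the stream is healthy.
        with self._count_lock:
            self._count = 0

    def check(self):
        # Call this periodically from the main thread.
        with self._count_lock:
            count = self._count
        if count >= self.max_reconnects:
            raise ConsumerStuckError(
                f"{count} stream re-establishments without consuming a message"
            )


# Assumed logger name, derived from the "bidi" module in the logs above.
watchdog = ReconnectWatchdog(max_reconnects=5)
bidi_logger = logging.getLogger("google.api_core.bidi")
bidi_logger.setLevel(logging.INFO)
bidi_logger.addHandler(watchdog)
```

The main loop of the worker would call `watchdog.check()` on a timer and let `ConsumerStuckError` terminate the process, so that an orchestrator can restart it with a fresh gRPC channel. The threshold needs tuning, since an idle but healthy subscription might also re-establish its stream from time to time.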