googleapis / python-pubsub

Apache License 2.0

Possible race condition between `_on_response` and `close` #997

Open hson2 opened 10 months ago

hson2 commented 10 months ago

Environment details

Steps to reproduce

  1. Run the code sample below indefinitely.
  2. Occasionally, an `AssertionError` is raised in the `_on_response` function.

Code example

import functools
import logging
from concurrent import futures
from datetime import datetime
from threading import Event

from google.cloud import pubsub_v1


def on_subscribe(subscription, until=None):
    """Decorator factory that delivers subscribed messages to a function.

    The decorated function is used as the subscriber callback, so each message
    must be acked or nacked inside the decorated function.

    Args:
        subscription (str): Subscription path, in the form
            `projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_ID}`.
        until (datetime.datetime): Only messages published up to this
            timestamp are delivered to the decorated function.
    """

    def _callback_factory(func, finished, subscribe_until, **kwargs):

        def _callback(message):
            """Callback function.

            Sets `finished` once a message published after `subscribe_until`
            is received.
            """
            publish_time = datetime.fromtimestamp(
                message.publish_time.timestamp())
            if subscribe_until is None or publish_time <= subscribe_until:
                return func(message, **kwargs)
            # The message is newer than the cutoff: hand it back to Pub/Sub
            # and signal that everything up to the cutoff has been received.
            message.nack()
            if not finished.is_set():
                logging.info('Subscribed all messages published until %s',
                             subscribe_until)
                finished.set()

        return _callback

    def _wrapper(func):

        @functools.wraps(func)
        def _inner_wrapper(**kwargs):
            # Event that is set once all messages up to `until` are received.
            all_subscribed = Event()

            callback = _callback_factory(func=func,
                                         finished=all_subscribed,
                                         subscribe_until=until,
                                         **kwargs)

            # Use the client as a context manager so it is closed on exit
            # (prevents leaking resources).
            with pubsub_v1.SubscriberClient() as subscriber:
                future = subscriber.subscribe(
                    subscription=subscription,
                    callback=callback,
                    await_callbacks_on_shutdown=True,
                    flow_control=pubsub_v1.types.FlowControl(max_messages=5000),
                )

                all_subscribed.wait(timeout=60)

                # Cancel the future, then wait for the shutdown to finish.
                # A timeout or keyboard interrupt during shutdown is ignored.
                try:
                    future.cancel()
                    future.result(timeout=60)
                except (KeyboardInterrupt, futures.TimeoutError):
                    pass
                except Exception as e:
                    logging.error("Error occurred during subscription to %s: %s",
                                  subscription, str(e))

        return _inner_wrapper

    return _wrapper


@on_subscribe(subscription="SUBSCRIPTION")
def callback(message):
    # Do something with the message, then acknowledge it.
    message.ack()

Stack trace

Traceback (most recent call last):
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/api_core/bidi.py", line 657, in _thread_main
    self._on_response(response)
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 1107, in _on_response
    assert self._scheduler is not None

Explanation

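This happens because `future.cancel()` calls `manager.close()`, which sets `_scheduler` to `None`. If the background consumer thread delivers a response after that point, `_on_response` reaches `assert self._scheduler is not None` and raises `AssertionError`.

Access to `_scheduler` may need to be protected by a threading lock.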
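A minimal sketch of the kind of lock suggested above. `Manager` and `Scheduler` here are hypothetical stand-ins, not the library's `StreamingPullManager` or its scheduler; only the attribute name `_scheduler` mirrors the traceback. `close()` and `_on_response()` take the same lock, and `_on_response()` treats a missing scheduler as "already closed" and drops the batch instead of asserting.

```python
import threading
from typing import Callable, List, Optional


class Scheduler:
    """Hypothetical stand-in for the subscriber's scheduler."""

    def schedule(self, callback: Callable[[str], None], message: str) -> None:
        # A real scheduler would submit the work to a thread pool and return
        # immediately; here the callback runs inline to keep things simple.
        callback(message)


class Manager:
    """Hypothetical manager in which close() can race with _on_response()."""

    def __init__(self, callback: Callable[[str], None]) -> None:
        self._callback = callback
        self._scheduler: Optional[Scheduler] = Scheduler()
        self._lock = threading.Lock()
        self.dropped: List[str] = []

    def close(self) -> None:
        # Called from the thread that cancels the future.
        with self._lock:
            self._scheduler = None

    def _on_response(self, messages: List[str]) -> None:
        # Called from the background consumer thread.
        with self._lock:
            if self._scheduler is None:
                # Already closed: record the batch instead of asserting.
                # (A real implementation would presumably nack these.)
                self.dropped.extend(messages)
                return
            for message in messages:
                self._scheduler.schedule(self._callback, message)


received: List[str] = []
manager = Manager(received.append)
manager._on_response(["a", "b"])  # Delivered normally.
manager.close()
manager._on_response(["c"])  # Arrives after close(): dropped, no AssertionError.
print(received, manager.dropped)  # ['a', 'b'] ['c']
```

Because the check and the use of `_scheduler` happen under the same lock that `close()` takes, a response can no longer observe the scheduler disappearing halfway through.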

liuyunnnn commented 10 months ago

Thank you for the detailed report. I'll try to reproduce this and fix it accordingly. In the meantime, I'm wondering why you chose to cancel first and then wait for the result here:

future.cancel()
future.result(timeout=60)

instead of following the common usage pattern shown in the sample. That pattern could mitigate the issue you are experiencing.
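The sample itself is not reproduced in this thread; the pattern commonly shown in the Pub/Sub Python samples waits on `result(timeout=...)` first and only cancels once that times out. Below is a self-contained sketch of that ordering. `FakeStreamingPullFuture` and `run_for` are hypothetical names, used so the sketch runs without a Pub/Sub project; the fake only imitates the relevant behaviour (`result()` blocks until the stream closes, `cancel()` triggers the close).

```python
import threading
from concurrent import futures
from typing import List, Optional


class FakeStreamingPullFuture:
    """Hypothetical stand-in for the future returned by subscribe()."""

    def __init__(self) -> None:
        self._closed = threading.Event()
        self.events: List[str] = []

    def cancel(self) -> None:
        # The real future asks its manager to shut down; here the "stream"
        # is closed from another thread to mimic an asynchronous shutdown.
        self.events.append("cancel")
        threading.Thread(target=self._closed.set).start()

    def result(self, timeout: Optional[float] = None) -> None:
        # Block until the stream is closed, or raise after `timeout` seconds.
        if not self._closed.wait(timeout):
            raise futures.TimeoutError()
        self.events.append("closed")


def run_for(future: FakeStreamingPullFuture, timeout: float) -> None:
    """Stream for up to `timeout` seconds, then shut down cleanly."""
    try:
        # Returns early only if the stream ends on its own.
        future.result(timeout=timeout)
    except futures.TimeoutError:
        future.cancel()  # Trigger the shutdown.
        future.result()  # Block until the shutdown is complete.


future = FakeStreamingPullFuture()
run_for(future, timeout=0.1)
print(future.events)  # ['cancel', 'closed']
```

Note that `cancel()` is still called in this pattern; the ordering only changes when it is called, not what it does.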

hson2 commented 10 months ago


Because I wanted to receive messages only up to a certain timestamp (which turned out to be a wrong idea 😅), and to have the callback signal a `threading.Event` once all of those messages had been received.

If I call `result()` first without a timeout, it waits indefinitely; if I set a timeout, the second call to `result()` raises `TimeoutError`. I cancel first simply to avoid raising that error.

chase-peach commented 6 months ago

We are also seeing this issue.

- Python version: 3.11
- pip version: 23.3.1
- google-cloud-pubsub version: 2.14.1

We are running in Google Kubernetes Engine (GKE) and listening to the following two signals in our worker for scaling:

import signal
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)

With `exit_gracefully` implemented as follows:

def exit_gracefully(self, signum, frame) -> None:
    """Stop the async worker (signal handlers receive the signal number and frame)."""
    self.future.cancel()  # Request shutdown.
    try:
        self.future.result(timeout=60 * 5)  # Block until the shutdown is complete (or up to 5 min).
    except TimeoutError:
        self.logger.warning("Stop timeout reached, Pod will die.")
    except Exception as e:
        self.logger.error("Error while shutting down worker: %s", e)
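Python calls a handler registered with `signal.signal()` with two positional arguments, the signal number and the current stack frame, so `exit_gracefully` has to accept them. A minimal sketch that also keeps the handler itself short by only setting a `threading.Event`; the `Worker` class and its `stop_requested` and `received_signal` attributes are hypothetical, and the actual `cancel()`/`result()` calls would then run in ordinary code that waits on the event.

```python
import signal
import threading
from types import FrameType
from typing import Optional


class Worker:
    """Hypothetical worker; only the shutdown signalling is shown."""

    def __init__(self) -> None:
        self.stop_requested = threading.Event()
        self.received_signal: Optional[int] = None

    def exit_gracefully(self, signum: int, frame: Optional[FrameType]) -> None:
        # Signal handlers run on the main thread, interleaved with whatever
        # it was doing, so keep them short: record the signal and wake up
        # whoever is waiting to perform the actual shutdown.
        self.received_signal = signum
        self.stop_requested.set()

    def install_handlers(self) -> None:
        # signal.signal() may only be called from the main thread.
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)


worker = Worker()
# Simulate delivery of SIGTERM without actually sending a signal.
worker.exit_gracefully(signal.SIGTERM, None)
print(worker.stop_requested.is_set())  # True
```

In a real process, `install_handlers()` would be called once at startup, and the code waiting on `stop_requested` would perform the `cancel()` followed by a bounded `result()`.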

with the following setup:

from concurrent import futures

from google.cloud import pubsub_v1

self.subscriber = pubsub_v1.SubscriberClient()

flow_control = pubsub_v1.types.FlowControl(max_messages=self.pub_sub_flow_control_max_messages)
executor = futures.ThreadPoolExecutor(
    max_workers=self.pub_sub_thread_pool_executor_max_workers, thread_name_prefix="TPE-PeachWorker"
)
scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)

self.future = self.subscriber.subscribe(
    self.subscription_path,
    callback=self.pubsub_callback,
    flow_control=flow_control,
    scheduler=scheduler,
    await_callbacks_on_shutdown=True,
)

We do our main work in a thread separate from the main thread so that Python can still receive signals:

from threading import Thread

# Execute the wait on the future for streaming pull in a different thread
# because Python's main thread is the only one that will receive signals such as
# SIGTERM for when a pod is requested to die so it can be replaced with a new one.
thread = Thread(target=self.process, args=(), daemon=True)
thread.start()

With the process method implemented as follows:

def process(self):
    with self.subscriber:
        try:
            self.future.result()
        except Cancelled:
            self.logger.info("Cancel request received.")
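The same thread layout can be tried without Pub/Sub in a small sketch, assuming a hypothetical `FakeFuture` in place of the streaming pull future and a `threading.Event` in place of the signal handler. The worker thread blocks on `result()`; the main thread waits for a stop request, calls `cancel()`, waits for the shutdown with a bounded `result()`, and then joins the worker. Joining matters here because the worker is a daemon thread: without it, the interpreter could exit before the worker leaves its `with self.subscriber:` block.

```python
import threading
from concurrent import futures
from typing import List, Optional


class FakeFuture:
    """Hypothetical stand-in for the streaming pull future."""

    def __init__(self) -> None:
        self._done = threading.Event()

    def cancel(self) -> None:
        # Mimic an asynchronous shutdown that finishes shortly after cancel().
        threading.Timer(0.05, self._done.set).start()

    def result(self, timeout: Optional[float] = None) -> None:
        if not self._done.wait(timeout):
            raise futures.TimeoutError()


def process(future: FakeFuture, log: List[str]) -> None:
    # Worker thread: block until the stream is shut down.
    # (In the issue, this is also where `with self.subscriber:` lives.)
    future.result()
    log.append("worker finished")


def main(stop_requested: threading.Event, log: List[str]) -> None:
    future = FakeFuture()
    worker = threading.Thread(target=process, args=(future, log), daemon=True)
    worker.start()

    stop_requested.wait()  # Set by the signal handler in the real program.
    future.cancel()  # Request shutdown.
    try:
        future.result(timeout=5)  # Block until the shutdown is complete.
    except futures.TimeoutError:
        log.append("shutdown timed out")
    # Join the daemon thread so its cleanup (e.g. closing the client)
    # runs before the interpreter exits.
    worker.join(timeout=5)
    log.append("main finished")


log: List[str] = []
stop = threading.Event()
stop.set()  # Pretend SIGTERM has already arrived.
main(stop, log)
print(log)  # ['worker finished', 'main finished']
```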

We are seeing the same scheduler assertion error that @hson2 saw:

AssertionError: null
  File "threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 921, in _shutdown
    assert self._scheduler is not None

However, we are also seeing another error immediately before:

AttributeError: 'NoneType' object has no attribute 'nack'
  File "threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 948, in _shutdown
    msg.nack()

@liuyunnnn Could you expand on why you prefer not to call `cancel()` first in a scenario like this?

Thank you for your help :)!