googleapis / google-cloud-python

Google Cloud Client Library for Python
https://googleapis.github.io/google-cloud-python/
Apache License 2.0

Gevent greenlet stuck when using stream recognize request in a separate thread. #12535

Open dongzeli95 opened 8 months ago

dongzeli95 commented 8 months ago

import logging
import gevent
from google.cloud import speech_v1
import queue
import sentry_sdk

from gevent import spawn

# https://github.com/googleapis/google-cloud-python/blob/main/packages/google-cloud-speech/samples/generated_samples/speech_v1_generated_speech_streaming_recognize_async.py

logger = logging.getLogger('streaming_session')
class StreamingSession:
    END_OF_STREAM = object()
    def __init__(self, sample_rate=16000):
        # Initialize the Google Speech-to-Text client
        self.client = speech_v1.SpeechClient()

        # Configure the streaming request
        self.streaming_config = speech_v1.StreamingRecognitionConfig(
            config=speech_v1.RecognitionConfig(
                encoding=speech_v1.RecognitionConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=sample_rate,
                language_code="en-US",
                enable_automatic_punctuation=True,
                enable_spoken_punctuation=True,
            ),
            interim_results=False
        )

        # Initialize the request generator
        self.__start_session()

    def reset(self):
        """Reset the streaming session."""
        self.__start_session()

    def __start_session(self):
        """Resets the streaming session to start a new audio stream."""
        self.queue = queue.Queue()
        self.transcription = ""
        self.closed = False
        # Initialize state before spawning so the greenlet never observes
        # uninitialized attributes if it happens to be scheduled early.
        self.stream_recognize_task = spawn(self.stream_recognize)
        # gevent.sleep(0)

    def request_generator(self):
        """Generator function that yields streaming requests."""
        # Then, continuously send audio chunks as they arrive
        while not self.closed:
            try:
                # Use a blocking get() to wait for the first chunk
                chunk = self.queue.get()
                if chunk is self.END_OF_STREAM:
                    print("return here: END OF STREAM")
                    return
                data = [chunk]

                # Drain the queue of any additional data
                while True:
                    try:
                        chunk = self.queue.get(block=False)
                        if chunk is self.END_OF_STREAM:
                            self.closed = True
                            print("set closed to true")
                            break
                        data.append(chunk)
                    except queue.Empty:
                        break

                yield speech_v1.StreamingRecognizeRequest(audio_content=b"".join(data))
            except Exception as e:
                sentry_sdk.capture_exception(e)
                break

    def add_chunk(self, chunk):
        """Send an audio chunk to the streaming session in a non-blocking manner."""
        self.queue.put(chunk, block=False)

    def stream_recognize(self):
        return self.client.streaming_recognize(config=self.streaming_config,
                                               requests=self.request_generator())

    def get_transcription(self):
        """Finalize the session and return the complete transcription."""
        self.queue.put(self.END_OF_STREAM)
        self.stream_recognize_task.join()
        responses = self.stream_recognize_task.get()

        # Process responses from the stream
        try:
            for response in responses:
                # Check for the presence of results in the response
                if not response.results:
                    continue

                # Iterate over the results in the response
                for result in response.results:
                    # Check if the result is a final result
                    if result.is_final:
                        # Get the best transcription from the final result
                        self.transcription += result.alternatives[0].transcript + ' '
                    else:
                        sentry_sdk.capture_message("GCS streaming result is not final")
        except Exception as e:
            self.client = speech_v1.SpeechClient()
            raise e

        # Capture sentry alert.
        if len(self.transcription) == 0:
            sentry_sdk.capture_message("GCS streaming transcription is empty, recreate speech client...")
            self.client = speech_v1.SpeechClient()

        return self.transcription
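
For context, the class above is driven roughly like this (a minimal sketch; chunk_source is a placeholder for wherever the audio bytes actually come from):

# Minimal usage sketch. chunk_source is hypothetical; in the real app,
# chunks arrive from the audio pipeline via add_chunk().
session = StreamingSession(sample_rate=16000)
for chunk in chunk_source():        # yields LINEAR16 audio bytes
    session.add_chunk(chunk)
# Sends END_OF_STREAM, joins the greenlet, and collects final results.
print(session.get_transcription())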

This is my code for running each recognize task in a separate gevent greenlet, and it causes the greenlet to get stuck:

+--- <Greenlet "Greenlet-2" at 0xffff6c59d860: spawn_greenlets>
 :          Parent: <Hub '' at 0xffff819ffd60 epoll default pending=0 ref=116 fileno=7 resolver=<gevent.resolver.thread.Resolver at 0xffff75a3ad90 pool=<ThreadPool at 0xffff819c86d0 tasks=11 size=10 maxsize=10 hub=<Hub at 0xffff819ffd60 thread_ident=0xffff83b5b020>>> threadpool=<ThreadPool at 0xffff819c86d0 tasks=11 size=10 maxsize=10 hub=<Hub at 0xffff819ffd60 thread_ident=0xffff83b5b020>> thread_ident=0xffff83b5b020>
 :          Running:
 :            File "/usr/local/lib/python3.8/site-packages/gevent/pool.py", line 161, in apply
 :              return self.spawn(func, *args, **kwds).get()
 :          Spawned at:
 :            File "/app/app/audiohub/audiohub_client.py", line 48, in add_chunk
 :              self.__init_states(session_id, sample_rate=sample_rate)
 :            File "/app/app/audiohub/audiohub_client.py", line 190, in __init_states
 :              self.streaming_sessions[session_id] = StreamingSession(sample_rate=sample_rate)
 :            File "/app/app/audiohub/gcp/streaming_session.py", line 15, in __init__
 :              self.client = speech_v1.SpeechClient()
 :            File "/usr/local/lib/python3.8/site-packages/google/cloud/speech_v1/services/speech/client.py", line 461, in __init__
 :              self._transport = Transport(
 :            File "/usr/local/lib/python3.8/site-packages/google/cloud/speech_v1/services/speech/transports/grpc.py", line 162, in __init__
 :              self._grpc_channel = type(self).create_channel(
 :            File "/usr/local/lib/python3.8/site-packages/google/cloud/speech_v1/services/speech/transports/grpc.py", line 217, in create_channel
 :              return grpc_helpers.create_channel(
 :            File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 386, in create_channel
 :              return grpc.secure_channel(
 :            File "/usr/local/lib/python3.8/site-packages/grpc/__init__.py", line 2119, in secure_channel
 :              return _channel.Channel(
 :            File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 2056, in __init__
 :              cygrpc.gevent_increment_channel_count()
 :            File "/usr/local/lib/python3.8/site-packages/gevent/pool.py", line 392, in spawn
 :              greenlet = self.greenlet_class(*args, **kwargs)

I've also applied the gRPC gevent patch in my application:

from gevent import monkey
monkey.patch_all()

import grpc.experimental.gevent as grpc_gevent
grpc_gevent.init_gevent()

After the above greenlet gets stuck, my whole application basically hangs.

Note that if I run the stream recognition synchronously, without a gevent greenlet, it works fine (a sketch of that path is below). However, I'd still prefer to run it in a separate greenlet to improve latency. I wonder if this is another gRPC incompatibility issue with gevent.
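
For comparison, the synchronous path that works looks roughly like this (a sketch; audio_chunks is illustrative, and streaming_config is the same StreamingRecognitionConfig built in the class above):

# Sketch of the synchronous variant: drive streaming_recognize directly
# on the calling greenlet instead of spawning a worker for it.
from google.cloud import speech_v1

client = speech_v1.SpeechClient()
requests = (
    speech_v1.StreamingRecognizeRequest(audio_content=chunk)
    for chunk in audio_chunks  # illustrative source of LINEAR16 bytes
)
responses = client.streaming_recognize(config=streaming_config, requests=requests)
for response in responses:
    for result in response.results:
        if result.is_final:
            print(result.alternatives[0].transcript)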

dongzeli95 commented 8 months ago

Upon load testing, I found that with 30 concurrent streaming recognize sessions, all threads get stuck. I have tried both gevent greenlets and a native thread pool; neither helps with this limit.

On the other hand, concurrency under 25 sessions seems to work fine. I've also seen this post arguing about gRPC max-concurrency settings. I assume this speech recognition package uses gRPC underneath. Is there a way to increase concurrency on the speech client, along the lines of the sketch below?
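
For illustration, this is the kind of knob I mean (a hypothetical sketch; I don't know whether these options actually lift the limit, and per-connection concurrency is ultimately capped by the server-advertised MAX_CONCURRENT_STREAMS, commonly 100, so a pool of clients, each with its own channel, may be the more reliable workaround):

# Hypothetical sketch: pass gRPC channel options to the speech client
# through a custom transport. The options list is an assumption, not a
# documented fix for this issue.
from google.cloud import speech_v1
from google.cloud.speech_v1.services.speech.transports import SpeechGrpcTransport

channel = SpeechGrpcTransport.create_channel(
    "speech.googleapis.com:443",
    options=[("grpc.max_concurrent_streams", 100)],
)
client = speech_v1.SpeechClient(transport=SpeechGrpcTransport(channel=channel))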

Could someone help take a look? Thank you so much!

parthea commented 7 months ago

Hi @dongzeli95 ,

Thanks for reporting this issue. This is potentially related to https://github.com/grpc/grpc/issues/36265, https://github.com/googleapis/python-bigtable/issues/949 and https://github.com/googleapis/google-cloud-python/issues/12423. To confirm, can you try downgrading to grpcio==1.58.0?
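
For example:

pip install grpcio==1.58.0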

dongzeli95 commented 7 months ago

@parthea Just confirmed that pinning the dependency to grpcio==1.58.0 didn't help with this issue. Hope that helps.

embray commented 3 days ago

I have seen a similar issue a number of times on my development server, which we believe might be related to deadlocks that have occurred in production too. When I have the gevent monitor thread running, I get this:

+--- <Greenlet "Greenlet-1" at 0x7f468274ec00: spawn_greenlets>
 :          Parent: <Hub '' at 0x7f468eaa4720 epoll default pending=0 ref=3 fileno=8 resolver=<gevent.resolver.thread.Resolver at 0x7f46831ad5d0 pool=<ThreadPool at 0x7f468e827610 tasks=1 size=1 maxsize=10 hub=<Hub at 0x7f468eaa4720 thread_ident=0x7f469d874740>>> threadpool=<ThreadPool at 0x7f468e827610 tasks=1 size=1 maxsize=10 hub=<Hub at 0x7f468eaa4720 thread_ident=0x7f469d874740>> thread_ident=0x7f469d874740>
 :          Spawned at:
 :            File ".../app/util/telemetry/cloud_trace.py", line 85, in batch_write_spans
 :              name = self.client.common_project_path(app_id)
 :            File ".../lib/python3.10/functools.py", line 981, in __get__
 :              val = self.func(instance)
 :            File ".../app/util/telemetry/cloud_trace.py", line 61, in client
 :              return google.cloud.trace.TraceServiceClient(credentials=self.credentials)
 :            File "/home/embray/src/talque/talque/tools/talque3/lib/python3.10/site-packages/google/cloud/trace_v2/services/trace_service/client.py", line 640, in __init__
 :              self._transport = transport_init(
 :            File ".../lib/python3.10/site-packages/google/cloud/trace_v2/services/trace_service/transports/grpc.py", line 174, in __init__
 :              self._grpc_channel = channel_init(
 :            File ".../lib/python3.10/site-packages/google/cloud/trace_v2/services/trace_service/transports/grpc.py", line 229, in create_channel
 :              return grpc_helpers.create_channel(
 :            File ".../lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 386, in create_channel
 :              return grpc.secure_channel(
 :            File ".../lib/python3.10/site-packages/grpc/__init__.py", line 2146, in secure_channel
 :              return _channel.Channel(
 :            File ".../lib/python3.10/site-packages/grpc/_channel.py", line 2084, in __init__
 :              cygrpc.gevent_increment_channel_count()
 :            File ".../lib/python3.10/site-packages/gevent/pool.py", line 392, in spawn
 :              greenlet = self.greenlet_class(*args, **kwargs)

A common thread I think I've seen in most of the issues related to this one is some Google API that uses gRPC for transport, called from a concurrent.futures.ThreadPoolExecutor (which, with gevent patching at least, is backed by greenlets, though I've seen some similar reports that didn't mention gevent at all).

In my case it happens to be coming from the Trace API, but others have reported a similar problem against other APIs, including this one.

I tried a reproduction similar to https://github.com/googleapis/python-bigtable/issues/949#issuecomment-2040332357 but so far couldn't get the problem to happen that way, even though it bears similarity to a simplified version of what's going on in the production server.
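
Roughly, that reproduction attempt had the shape of the following sketch (ports and worker counts are arbitrary; it creates gRPC channels concurrently from a monkey-patched thread pool, which is where cygrpc.gevent_increment_channel_count() appears in the stuck tracebacks):

# Reproduction sketch: create and close gRPC channels from many
# ThreadPoolExecutor workers (greenlets, after monkey-patching).
from gevent import monkey
monkey.patch_all()

import grpc.experimental.gevent as grpc_gevent
grpc_gevent.init_gevent()

import concurrent.futures
import grpc

def make_channel(i):
    # Channel creation does not connect eagerly, so the target is arbitrary.
    channel = grpc.insecure_channel("localhost:50051")
    channel.close()
    return i

with concurrent.futures.ThreadPoolExecutor(max_workers=30) as pool:
    for i in pool.map(make_channel, range(100)):
        print("channel", i, "created and closed")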

I have grpcio==1.68.0 for what it's worth.