googleapis / python-pubsub

Apache License 2.0
392 stars 206 forks source link

PublisherClient creation isn't thread safe and hangs if called by concurrent threads #1101

Open matt-mcallister opened 8 months ago

matt-mcallister commented 8 months ago

Environment details

Steps to reproduce

Calling PublisherClient.from_service_account_info() from multiple concurrent threads causes the program to hang.

Code example

import concurrent.futures
from google.cloud.pubsub_v1 import PublisherClient

# Service-account info handed to PublisherClient.from_service_account_info().
# The identifying and secret fields are blank here (presumably redacted by the
# reporter); real values would be needed for the client to authenticate.
creds = dict(
    auth_provider_x509_cert_url="https://www.googleapis.com/oauth2/v1/certs",
    auth_uri="https://accounts.google.com/o/oauth2/auth",
    client_email="",
    client_id="",
    client_x509_cert_url="",
    private_key_id="",
    project_id="",
    token_uri="https://oauth2.googleapis.com/token",
    type="service_account",
    private_key="",
)

def run(publisher_number):
    """Construct one PublisherClient from ``creds`` and report progress.

    This is the unit of work submitted concurrently to the thread pool to
    reproduce the hang. The client is never stored, so it is discarded as soon
    as construction returns; ``publisher_number`` only labels the message.
    """
    PublisherClient.from_service_account_info(creds)
    print(f'Created publisher {publisher_number}')

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Queue ten client constructions on a 2-worker pool, so at most two
    # PublisherClient constructors run at the same time.
    total_publishers = 10
    futures = [executor.submit(run, i) for i in range(total_publishers)]

    print(f"Submitted all {total_publishers} to thread executor")
    # Wait on each job in submission order; in the reported hang the main
    # thread is stuck here, inside Future.result().
    for future in futures:
        future.result()

Output

$ python test.py
Submitted all 10 to thread executor
^C^\Quit (core dumped)

Stack trace

The program hangs, so there is no Python traceback; below is the pystack analysis of the core dump:

$ pystack core core.72129
Using executable found in the core file: /home/temp/bin/python

Core file information:
state: R zombie: True niceness: 0
pid: 72129 ppid: 71086 sid: 71086
uid: 10001 gid: 30001 pgrp: 72129
executable: python arguments: python test.py

The process died due receiving signal SIGQUIT
Traceback for thread 72131 [Has the GIL] (most recent call last):
    (Python) File "/usr/local/lib/python3.10/threading.py", line 973, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.10/threading.py", line 953, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 83, in _worker
        work_item.run()
    (Python) File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 58, in run
        result = self.fn(*self.args, **self.kwargs)
    (Python) File "/home/test.py", line 48, in run
        PublisherClient.from_service_account_info(creds)
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/client.py", line 172, in from_service_account_info
        return cls(*args, **kwargs)
    (Python) File "/home/temp/lib/python3.10/site-packages/google/cloud/pubsub_v1/publisher/client.py", line 139, in __init__
        super().__init__(**kwargs)
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/client.py", line 712, in __init__
        self._transport = Transport(
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/transports/grpc.py", line 165, in __init__
        self._grpc_channel = type(self).create_channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/transports/grpc.py", line 222, in create_channel
        return grpc_helpers.create_channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 386, in create_channel
        return grpc.secure_channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/grpc/__init__.py", line 2119, in secure_channel
        return _channel.Channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/grpc/_channel.py", line 2046, in __init__
        self._channel = cygrpc.Channel(

Traceback for thread 72130 [] (most recent call last):
    (Python) File "/usr/local/lib/python3.10/threading.py", line 973, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.10/threading.py", line 953, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 83, in _worker
        work_item.run()
    (Python) File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 58, in run
        result = self.fn(*self.args, **self.kwargs)
    (Python) File "/home/test.py", line 48, in run
        PublisherClient.from_service_account_info(creds)
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/client.py", line 172, in from_service_account_info
        return cls(*args, **kwargs)
    (Python) File "/home/temp/lib/python3.10/site-packages/google/cloud/pubsub_v1/publisher/client.py", line 139, in __init__
        super().__init__(**kwargs)
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/client.py", line 712, in __init__
        self._transport = Transport(
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/transports/grpc.py", line 165, in __init__
        self._grpc_channel = type(self).create_channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/transports/grpc.py", line 222, in create_channel
        return grpc_helpers.create_channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 386, in create_channel
        return grpc.secure_channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/grpc/__init__.py", line 2119, in secure_channel
        return _channel.Channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/grpc/_channel.py", line 2046, in __init__
        self._channel = cygrpc.Channel(
    (Python) File "/usr/local/lib/python3.10/pkgutil.py", line 639, in get_data
        return loader.get_data(resource_name)
    (Python) File "<frozen importlib._bootstrap_external>", line 1073, in get_data

Traceback for thread 72129 [] (most recent call last):
    (Python) File "/home/test.py", line 60, in <module>
        f.result()
    (Python) File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 453, in result
        self._condition.wait(timeout)
    (Python) File "/usr/local/lib/python3.10/threading.py", line 320, in wait
        waiter.acquire()
pedrorjbr commented 6 months ago

I suspect I am hitting the same problem (see https://stackoverflow.com/questions/78337450/publishing-message-to-gcp-pubsub-from-a-fastapi-app). Is there any workaround?

Because I am using the PublisherClient from Cloud Run, I can't find a way to reuse the same PublisherClient. It seems like such a basic use case.

florianorpeliere commented 6 months ago

Hello,

I have exactly the same problem. I use the PublisherClient in an Apache Beam (Dataflow) job and noticed threads blocking each other.

I have a fairly simple way to reproduce it:

import threading
from google.cloud import pubsub_v1

def pubsub():
    """Print the numbers 1 through 5, building a throwaway PublisherClient after each.

    Run from two threads at once to reproduce the hang; each client is
    created and immediately discarded.
    """
    for attempt in range(1, 6):
        print(attempt)
        pubsub_v1.PublisherClient()

# Two threads, both running pubsub(); start both before waiting on either so
# their PublisherClient constructions overlap.
t1, t2 = (threading.Thread(target=pubsub) for _ in range(2))
for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()

Producing:

$ python block.py
1
1
^C^C^C^C
[1]    63732 terminated  python3 block.py

OS: macOS 14.4.1 (23E224), M2 Pro; Python: 3.12.1; google-cloud-pubsub: 2.21.1

I hope this helps. If I should open a separate issue instead, just let me know.

Thank you in advance for your help.

mukund-ananthu commented 4 days ago

Hi, is there a reason you want to create a separate client instance in each thread, instead of creating a single client instance and reusing it across multiple threads?

> As I am using the PublisherClient from CloudRun, I can't find way to reuse the same PublisherClient. It seems a so basic use case.

When the Cloud Run instance starts up, could you create a single client instance and reuse it for the lifetime of the Cloud Run container instance?

XuanWang-Amos commented 3 days ago

I don't have a setup to test PublisherClient with service-account credentials, so I changed `run` to the following:

def run(publisher_number):
    """Construct one PublisherClient without explicit credentials and report progress.

    Variant of the reporter's ``run`` used when no service-account key is
    available; the client presumably falls back to the environment's default
    credentials (not shown here — confirm).
    """
    # Original repro call, disabled because no service-account info is available:
    # PublisherClient.from_service_account_info(creds)
    PublisherClient()
    print(f'Created publisher {publisher_number}')

Using this code example, I'm not able to reproduce the issue in the python:3.10.13-bookworm Docker image. Maybe it's fixed in the latest version?

XuanWang-Amos commented 3 days ago

I tried again with just grpc.secure_channel: I changed our example so that it creates multiple secure channels in different threads using composite_channel_credentials, similar to what PublisherClient and python-api-core do:

import argparse
# Import the submodule explicitly: `import concurrent` alone does not load
# `concurrent.futures`, which main() relies on for ThreadPoolExecutor.
import concurrent.futures
import contextlib
import logging

import grpc

import _credentials

# Load the helloworld message and service modules from the .proto file at
# runtime (grpc.protos_and_services) instead of using pre-generated *_pb2 files.
helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services(
    "helloworld.proto"
)

_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.INFO)

# Server address; formatted with the port number taken from the command line.
_SERVER_ADDR_TEMPLATE = "localhost:%d"

@contextlib.contextmanager
def create_client_channel(addr):
    """Yield a secure gRPC channel to ``addr`` and close it when the block exits.

    The channel combines SSL channel credentials (trusting
    ``_credentials.ROOT_CERTIFICATE``) with an access-token call credential,
    intended to mimic how PublisherClient / python-api-core build channels.
    """
    # Call credential object will be invoked for every single RPC
    call_credentials = grpc.access_token_call_credentials(
        "example_oauth2_token"
    )
    # Channel credential will be valid for the entire channel
    channel_credential = grpc.ssl_channel_credentials(
        _credentials.ROOT_CERTIFICATE
    )
    # Combining channel credentials and call credentials together
    composite_credentials = grpc.composite_channel_credentials(
        channel_credential,
        call_credentials,
    )
    channel = grpc.secure_channel(addr, composite_credentials)
    try:
        yield channel
    finally:
        # Release the channel deterministically when the caller's `with` block
        # ends (including on error), instead of leaving it to garbage collection.
        channel.close()

def send_rpc(channel):
    """Send one SayHello request over ``channel``.

    Returns the reply on success. On failure the ``grpc.RpcError`` is logged
    at ERROR level and returned rather than raised; successes are logged at INFO.
    """
    greeter = helloworld_pb2_grpc.GreeterStub(channel)
    hello = helloworld_pb2.HelloRequest(name="you")
    try:
        reply = greeter.SayHello(hello)
    except grpc.RpcError as err:
        _LOGGER.error("Received error: %s", err)
        return err
    _LOGGER.info("Received message: %s", reply)
    return reply

def run(port):
    """Open a channel to the local server on ``port`` and send one RPC over it."""
    address = _SERVER_ADDR_TEMPLATE % port
    with create_client_channel(address) as channel:
        send_rpc(channel)

def main():
    """Run ten ``run`` jobs against one port on a two-thread pool.

    Parses ``--port`` from the command line, submits the jobs, then waits on
    each future in submission order so any worker exception is re-raised here.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--port",
        nargs="?",
        type=int,
        default=50051,
        help="the address of server",
    )
    port = parser.parse_args().port

    total_publishers = 10
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(run, port) for _ in range(total_publishers)]

        print(f"Submitted all {total_publishers} to thread executor")
        for future in futures:
            future.result()


if __name__ == "__main__":
    # Install a root handler first so the INFO records from send_rpc are printed.
    logging.basicConfig(level=logging.INFO)
    main()

I'm still not able to reproduce it.