googleapis / python-pubsublite

Apache License 2.0
19 stars 12 forks source link

Make client work with gevent monkey patching #328

Open ns-dzhang opened 2 years ago

ns-dzhang commented 2 years ago

Hello,

Steps to reproduce

  1. using tutorial codes : https://cloud.google.com/pubsub/lite/docs/publishing
  2. add two more lines at the beginning:
    from gevent import monkey
    monkey.patch_all()

Code example

from gevent import monkey
monkey.patch_all()

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    for message in range(num_messages):
        data = f"{message}"
        # Messages of the same ordering key will always get published to the same partition.
        # When ordering_key is unset, messsages can get published ot different partitions if
        # more than one partition exists for the topic.
        api_future = publisher_client.publish(
            topic_path, data.encode("utf-8"), ordering_key="testing"
        )
        # result() blocks. To resolve api futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Finished publishing {num_messages} messages with an ordering key to {str(topic_path)}."
)

When running the code with the monkey.patch_all(), publish() hangs.

I debugged a little bit, looks "thread" and "select" patch affect pubsublite. If I didn't patch these two as below, it worked as expected. Not sure if pubsublite is gevent friendly? My case is I need patch all.

from gevent import monkey
monkey.patch_all(thread=False, select=False)

Thanks!

ns-dzhang commented 2 years ago

cc: @ns-xliu @ns-zquan