googleapis / python-pubsublite

Apache License 2.0

RequireStarted sets the _started flag to True on __enter__, but never clears it on __exit__. #468

Open kangelov opened 8 months ago

kangelov commented 8 months ago

Environment details

Steps to reproduce

  1. Write a method that opens the publisher as a context manager and publishes a message to a topic inside it.
  2. Call that method.
  3. Call that method again. This fails with an error saying we are already in a context manager: "enter called twice."

Code example


import os
from datetime import datetime

from google.cloud.pubsub_v1.types import BatchSettings
from google.cloud.pubsublite.types import CloudRegion, CloudZone, TopicPath, MessageMetadata
from google.cloud.pubsublite.cloudpubsub import AsyncPublisherClient

class AbstractPublisher:

    def __init__(self, configuration):
        self._configuration = configuration

        location = CloudRegion(self._configuration.region)
        if not self._configuration.regional:
            location = CloudZone(location, self._configuration.zone_id)

        # This topic obviously needs to exist on GCloud already: it needs to be defined beforehand.
        self.topic_path = TopicPath(os.getenv("GOOGLE_CLOUD_PROJECT"), location, self._configuration.topic_id)

        self._publisher = AsyncPublisherClient(
            per_partition_batching_settings=BatchSettings(
                # These are used for message flow control: messages being published within max_latency sec of each other
                # are batched and sent in with a single request, up to max_messages messages and max_bytes bytes.
                # This is to help limit rapid calls to the backend to once every max_latency in most circumstances.
                max_bytes=self._configuration.max_bytes_num,
                max_latency=self._configuration.max_latency_sec,
                max_messages=self._configuration.max_messages_num
            ),
            enable_idempotence=self._configuration.enable_idempotence
        )

    async def publish_bytes(self, message: bytes, key: str, **kwargs):
        async with self._publisher as publisher:
            message_id = await publisher.publish(self.topic_path, message,
                                                 publish_timestamp=str(datetime.utcnow()),
                                                 message_key=key,
                                                 **kwargs)

Stack trace

A FailedPrecondition exception is raised at RequireStarted:32 in google.cloud.pubsublite.internal.require_started.py.
This happens on every call to publish_bytes after the first.
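
The repeated failure can be modeled without the library. The following is a minimal sketch, assuming the flag behaves as the issue title describes. `StickyStarted`, `ResettingStarted`, and `enter_twice` are hypothetical stand-ins, not the library's code, and `RuntimeError` is used in place of `FailedPrecondition` to keep the example dependency-free. Whether clearing the flag on exit is the right fix for the real client is not established here.

```python
class StickyStarted:
    """Hypothetical stand-in (not the library's class): the flag is set on
    __enter__ and left set on __exit__, as the issue title describes."""

    def __init__(self):
        self._started = False

    def __enter__(self):
        if self._started:
            raise RuntimeError("enter called twice.")
        self._started = True
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # _started is never cleared here, so the object cannot be re-entered.
        return False


class ResettingStarted(StickyStarted):
    """Hypothetical variant that clears the flag on exit."""

    def __exit__(self, exc_type, exc_value, traceback):
        self._started = False
        return False


def enter_twice(manager):
    """Use the same manager in two consecutive with-blocks and report the outcome."""
    try:
        with manager:
            pass
        with manager:
            pass
    except RuntimeError as exc:
        return f"second entry failed: {exc}"
    return "second entry succeeded"


sticky_result = enter_twice(StickyStarted())
resetting_result = enter_twice(ResettingStarted())
print(sticky_result)     # second entry failed: enter called twice.
print(resetting_result)  # second entry succeeded
```

The stand-in mirrors the pattern in publish_bytes above, where one long-lived object is entered anew on each call.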

Please take a look at the RequireStarted class and verify whether the _started behavior is intended. Thanks!

Kamen