googleapis / python-pubsub

Apache License 2.0
392 stars 206 forks source link

Publishing hangs with fresh PublisherClient instance #977

Open acolavin opened 1 year ago

acolavin commented 1 year ago

Howdy folks,

We ran across undesirable behavior stemming from this python pubsub library. We found several unintuitive workarounds (described below), and we're sharing the behavior here for the developers' benefit, and for others who run into this behavior.

Briefly, we found that a submitting many jobs of intermediate message size to a nascent PublisherClient instance causes publishing to hang. Smaller or larger messages don't seem to cause hanging. The problematic message size threshold can be altered by changing the Publisher batch settings. Submitting a single job to the PublisherClient to completion also suppresses this behavior, as does checking credentials before submitting the bolus of jobs.

We do see a stack trace that suggests an authentication issue reminiscent of similar issues reported in other related libraries, such as this one, but we believe the issue stems from how pubsub handles credentials and job batching.

All this and more is enumerated in the code for reproducing the behavior.

Also of note: we could only reproduce this behavior on ubuntu. I could not figure out how to reproduce this on a mac.

Environment details

Steps to reproduce

from google.cloud import pubsub

# error prevention methods. Set either of these to True and the hanging does not occur.
suppress_hang_method_1 = False # Set to True to check permissions before sending jobs
suppress_hang_method_2 = False # Set to True to wait for the first job future to resolve before submitting other jobs

# replace with real topic
topic = 'projects/<project-name>/topics/<topic-name>' 

# With default batch settings, we find that a job size between 0.5-1.5MB triggers the behavior. 
# With smaller `max_bytes` batching, the problematic job size is between 0.01-2MB
# See the bottom of this script for other combinations of batch and message sizes that cause issues.
max_bytes = 1_000_000 # This is default value
message_size = 1_000_000 # in bytes, roughly

message = b'a' * message_size

print(f"Message size is {message.__sizeof__()/1000./1000.}MB")
print(f"Batch size is {max_bytes}")

# If you change max_bytes to 80, the problematic job size is smaller. 
publisher = pubsub.PublisherClient(
    pubsub.types.BatchSettings(
        max_bytes = max_bytes,
    ))

if suppress_hang_method_1:
    # checking permissions using the publisher instance somehow prevents the job publishing from hanging
    permissions_to_check = ["pubsub.topics.publish", "pubsub.topics.update"]

    allowed_permissions = publisher.test_iam_permissions(
        request={"resource": topic, "permissions": permissions_to_check}
    )

futures = list()
for _ in range(100): # submit 100 messages

    if suppress_hang_method_2 and len(futures)==1:
        # allowing first message future to resolve prevents hanging
        futures[0].result()

    futures.append(publisher.publish(topic=topic, data=message))

print(f"The future result is: {futures[0].result()}") # this is what hangs indefinitely

# batch/message size combinations that cause or don't cause hanging:
nonproblematic_batch_message_combinations = [(1_000_000, 10), # (max_bytes, message_size)
                                             (1_000_000, 100),
                                             (1_000_000, 1_000),
                                             (1_000_000, 300_000),
                                             (1_000_000, 3_000_000),
                                             (1_000_000, 6_000_000),
                                             (100, 1),
                                             (100, 5_000_000)] # notice that very large messages work fine

problematic_batch_message_combinations = [(1_000_000, 500_000), 
                                          (1_000_000, 1_000_000),
                                          (1_000_000, 1_500_000),
                                          (100, 100),
                                          (100, 1_000_000)]

Code example

# example

Stack trace

Traceback (most recent call last):
  File "/mnt/jupyter-disk/.poetry-envs/***lib/python3.8/site-packages/requests/adapters.py", line 486, in send
    resp = conn.urlopen(
  File "/mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/urllib3/connectionpool.py", line 798, in urlopen
    retries = retries.increment(
  File "/mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/urllib3/util/retry.py", line 592, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='oauth2.googleapis.com', port=443): Max retries exceeded with url: /token (Caused by SSLError(SSLZeroReturnError(6, 'TLS/SSL connection has been closed (EOF) (_ssl.c:1131)')))

Subsequent attempts to submit jobs in the same python thread will eventually yield the following traceback:

The above exception was the direct cause of the following exception:

RetryError                                Traceback (most recent call last)
Cell In[12], line 1
----> 1 first_future.result() # this hangs

File /mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/google/cloud/pubsub_v1/publisher/futures.py:66, in Future.result(self, timeout)
     48 def result(self, timeout: Union[int, float] = None) -> str:
     49     """Return the message ID or raise an exception.
     50 
     51     This blocks until the message has been published successfully and
   (...)
     64             call execution.
     65     """
---> 66     return super().result(timeout=timeout)

File /usr/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
    435     raise CancelledError()
    436 elif self._state == FINISHED:
--> 437     return self.__get_result()
    439 self._condition.wait(timeout)
    441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File /usr/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
    387 if self._exception:
    388     try:
--> 389         raise self._exception
    390     finally:
    391         # Break a reference cycle with the exception in self._exception
    392         self = None

File /mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/google/cloud/pubsub_v1/publisher/_batch/thread.py:274, in Batch._commit(self)
    271 batch_transport_succeeded = True
    272 try:
    273     # Performs retries for errors defined by the retry configuration.
--> 274     response = self._client._gapic_publish(
    275         topic=self._topic,
    276         messages=self._messages,
    277         retry=self._commit_retry,
    278         timeout=self._commit_timeout,
    279     )
    280 except google.api_core.exceptions.GoogleAPIError as exc:
    281     # We failed to publish, even after retries, so set the exception on
    282     # all futures and exit.
    283     self._status = base.BatchStatus.ERROR

File /mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/google/cloud/pubsub_v1/publisher/client.py:267, in Client._gapic_publish(self, *args, **kwargs)
    265 def _gapic_publish(self, *args, **kwargs) -> "pubsub_types.PublishResponse":
    266     """Call the GAPIC public API directly."""
--> 267     return super().publish(*args, **kwargs)

File /mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/google/pubsub_v1/services/publisher/client.py:831, in PublisherClient.publish(self, request, topic, messages, retry, timeout, metadata)
    826 metadata = tuple(metadata) + (
    827     gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)),
    828 )
    830 # Send the request.
--> 831 response = rpc(
    832     request,
    833     retry=retry,
    834     timeout=timeout,
    835     metadata=metadata,
    836 )
    838 # Done; return the response.
    839 return response

File /mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py:113, in _GapicCallable.__call__(self, timeout, retry, *args, **kwargs)
    110     metadata.extend(self._metadata)
    111     kwargs["metadata"] = metadata
--> 113 return wrapped_func(*args, **kwargs)

File /mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/google/api_core/retry.py:349, in Retry.__call__.<locals>.retry_wrapped_func(*args, **kwargs)
    345 target = functools.partial(func, *args, **kwargs)
    346 sleep_generator = exponential_sleep_generator(
    347     self._initial, self._maximum, multiplier=self._multiplier
    348 )
--> 349 return retry_target(
    350     target,
    351     self._predicate,
    352     sleep_generator,
    353     self._timeout,
    354     on_error=on_error,
    355 )

File /mnt/jupyter-disk/.poetry-envs/***/lib/python3.8/site-packages/google/api_core/retry.py:207, in retry_target(target, predicate, sleep_generator, timeout, on_error, **kwargs)
    203     next_attempt_time = datetime_helpers.utcnow() + datetime.timedelta(
    204         seconds=sleep
    205     )
    206     if deadline < next_attempt_time:
--> 207         raise exceptions.RetryError(
    208             "Deadline of {:.1f}s exceeded while calling target function".format(
    209                 timeout
    210             ),
    211             last_exc,
    212         ) from last_exc
    214 _LOGGER.debug(
    215     "Retrying due to {}, sleeping {:.1f}s ...".format(last_exc, sleep)
    216 )
    217 time.sleep(sleep)

RetryError: Deadline of 600.0s exceeded while calling target function, last exception: 504 Deadline Exceeded
mukund-ananthu commented 1 day ago

acolavin

I ran the code provided. I was not able to reproduce the issue with the latest version of the client library:

python3 repro_batch.py
Message size is 1.000033MB
Batch size is 1000000
The future result is: 12919415560370439

I tried both from google.cloud import pubsub that you use in your code and and from google.cloud import pubsub_v1 , that is the recommended import for publishes. Could you please let me know if you're still facing the issue / consistently able to reproduce it?