relud opened this issue 5 years ago
For the record, this is my current implementation of an asyncio batch: https://github.com/mozilla/gcp-ingestion/blob/24c1cea/ingestion-edge/ingestion_edge/util.py#L7-L95
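The linked implementation isn't reproduced here. As a rough illustration of what an asyncio batch generally involves, here is a minimal sketch assuming a simple design. Messages are collected, each caller gets an awaitable asyncio future, and the batch is committed when it's full or after a maximum latency. A failed commit is propagated to every waiting caller. `AsyncBatch`, `fake_commit`, `max_messages` and `max_latency` are hypothetical names, not part of google-cloud-pubsub.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Set


class AsyncBatch:
    """Hypothetical asyncio batcher (not the linked implementation).

    Collects messages, hands each caller an awaitable future, and commits the
    batch when it is full or when max_latency seconds have passed.
    """

    def __init__(
        self,
        commit: Callable[[List[bytes]], Awaitable[List[str]]],
        max_messages: int = 3,
        max_latency: float = 0.05,
    ) -> None:
        self._commit = commit
        self._max_messages = max_messages
        self._max_latency = max_latency
        self._messages: List[bytes] = []
        self._futures: List[asyncio.Future] = []
        self._timer: Optional[asyncio.TimerHandle] = None
        self._tasks: Set[asyncio.Task] = set()  # strong refs to running commits

    def publish(self, data: bytes) -> "asyncio.Future[str]":
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        self._messages.append(data)
        self._futures.append(future)
        if len(self._messages) >= self._max_messages:
            self._flush()  # batch is full: commit now
        elif self._timer is None:
            # First message of a new batch: commit after max_latency at most.
            self._timer = loop.call_later(self._max_latency, self._flush)
        return future

    def _flush(self) -> None:
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        messages, futures = self._messages, self._futures
        self._messages, self._futures = [], []
        if messages:
            task = asyncio.get_running_loop().create_task(self._send(messages, futures))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def _send(self, messages: List[bytes], futures: List[asyncio.Future]) -> None:
        try:
            message_ids = await self._commit(messages)
        except Exception as exc:  # propagate the failure to every awaiting caller
            for future in futures:
                if not future.done():
                    future.set_exception(exc)
        else:
            for future, message_id in zip(futures, message_ids):
                if not future.done():
                    future.set_result(message_id)


async def fake_commit(messages: List[bytes]) -> List[str]:
    # Hypothetical commit: pretend each message gets an ID based on its length.
    await asyncio.sleep(0)
    return [f"id-{len(m)}" for m in messages]


async def main() -> List[str]:
    batch = AsyncBatch(fake_commit)
    futures = [batch.publish(b"x" * n) for n in range(1, 5)]
    return await asyncio.gather(*futures)


print(asyncio.run(main()))  # ['id-1', 'id-2', 'id-3', 'id-4']
```

The first three messages fill a batch and are committed immediately. The fourth starts a new batch that is flushed by the latency timer.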
I haven't tried this myself, but there's a proposed solution to make the future returned by publish behave like a concurrent.futures.Future: https://github.com/googleapis/google-cloud-python/issues/6201#issuecomment-472155433
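The details of the linked comment aren't reproduced here. The general idea can be sketched under one assumption: if the publish future simply subclasses concurrent.futures.Future, asyncio.wrap_future can turn it into something awaitable. `PublishFuture` and `fake_publish` are hypothetical stand-ins, not the Pub/Sub classes.

```python
import asyncio
import concurrent.futures
import threading


class PublishFuture(concurrent.futures.Future):
    """Hypothetical publish future that subclasses concurrent.futures.Future."""


def fake_publish(data: bytes) -> PublishFuture:
    """Hypothetical publisher: a background thread resolves the future,
    much like a batch commit thread would."""
    future = PublishFuture()

    def _commit() -> None:
        if data:
            future.set_result(f"message-id-{len(data)}")
        else:
            future.set_exception(ValueError("empty message"))

    threading.Thread(target=_commit, daemon=True).start()
    return future


async def main() -> None:
    # wrap_future accepts any concurrent.futures.Future (including subclasses)
    # and returns an asyncio future that can be awaited on the event loop.
    message_id = await asyncio.wrap_future(fake_publish(b"hello"))
    print(message_id)  # message-id-5

    # An exception set from the background thread is re-raised by await.
    try:
        await asyncio.wrap_future(fake_publish(b""))
    except ValueError as exc:
        print("publish failed:", exc)  # publish failed: empty message


asyncio.run(main())
```

This only addresses awaitability. On its own it doesn't cap the number of threads or surface exceptions that escape the commit thread itself.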
Python 2 support is deprecated, but it still needs to be preserved until the end of the year. Any support for asyncio will thus have to wait for at least another 6 months or so.
Do we have any update on this?
Now that we're past the date when Python 2 support was officially dropped, can we get an update on this? Is there a timeline for official asyncio support?
I'm sure we're not alone in trying to use google cloud pubsub with asyncio-based libraries.
About a week ago, we released a new version of Pub/Sub that drops Python 2.7 (and 3.5) support: https://github.com/googleapis/python-pubsub/releases/tag/v2.0.0
Is your feature request related to a problem? Please describe.
I have an asyncio application that needs to publish messages to PubSub, but I'm having issues because the future returned by google.cloud.pubsub.PublisherClient.publish can't be used with await or asyncio.wrap_future, and Batch._commit throws an uncaught exception (like in googleapis/google-cloud-python#7103 and googleapis/google-cloud-python#7071).
Describe the solution you'd like
I wrote a new google.cloud.pubsub_v1.publisher._batch.async.Batch that implements google.cloud.pubsub_v1.publisher._batch.base.Batch. It uses asyncio to provide awaitable futures that automatically propagate exceptions. It uses a shared concurrent.futures.ThreadPoolExecutor in conjunction with asyncio.wrap_future to call Batch.client.publish asynchronously while enforcing a maximum number of workers. I only wrapped Batch.client.publish in a thread because (if I understand correctly) it only blocks on exclusive access to the gRPC channel, so it shouldn't create the performance issues seen in the first alternative below.
I would like to submit this as a pull request, but only if it would be useful.
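The executor-based part of that design can be sketched with the standard library alone. It uses a shared, bounded concurrent.futures.ThreadPoolExecutor, with asyncio.wrap_future around only the blocking publish call. `EXECUTOR`, `blocking_publish`, `commit` and the `peak` counter are hypothetical, and the sleep merely stands in for waiting on the gRPC channel. This is not the proposed Batch class itself.

```python
import asyncio
import concurrent.futures
import threading
import time
from typing import List

# One shared, bounded pool: at most 4 blocking publish calls run at a time.
EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=4)

_lock = threading.Lock()
_active = 0
peak = 0  # highest number of simultaneous blocking calls observed


def blocking_publish(messages: List[bytes]) -> List[str]:
    """Hypothetical stand-in for the blocking Batch.client.publish call."""
    global _active, peak
    with _lock:
        _active += 1
        peak = max(peak, _active)
    try:
        if not messages:
            raise ValueError("nothing to publish")
        time.sleep(0.05)  # simulate waiting for the gRPC channel
        return [f"id-{i}" for i in range(len(messages))]
    finally:
        with _lock:
            _active -= 1


async def commit(messages: List[bytes]) -> List[str]:
    # Only the blocking call runs in the pool; wrap_future makes it awaitable,
    # so an exception raised in the worker thread is re-raised right here.
    return await asyncio.wrap_future(EXECUTOR.submit(blocking_publish, messages))


async def main() -> list:
    return await asyncio.gather(
        *(commit([b"a", b"b"]) for _ in range(10)),
        commit([]),  # fails inside the worker thread
        return_exceptions=True,
    )


outcomes = asyncio.run(main())
print(outcomes[0])         # ['id-0', 'id-1']
print(repr(outcomes[-1]))  # ValueError('nothing to publish')
print(peak <= 4)           # True: the pool never exceeds max_workers
```

`loop.run_in_executor(EXECUTOR, blocking_publish, messages)` would be an equivalent way to get the same awaitable without calling submit and wrap_future by hand.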
Describe alternatives you've considered
Changing google.cloud.pubsub_v1.publisher._batch.thread.Batch to use concurrent.futures.ThreadPoolExecutor. Unfortunately, this had performance issues: once all workers reached a time.sleep, no worker was left to check whether tasks that hadn't been submitted yet were ready.
Changing google.cloud.pubsub_v1.futures.Future to inherit from concurrent.futures.Future. This fixed compatibility with asyncio.wrap_future, but not the uncaught exceptions or the unlimited thread spawning.
Changing google.cloud.pubsub_v1.publisher._batch.thread.Batch to join spawned threads, which would propagate uncaught exceptions, but I was unable to figure out a working solution.
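The starvation problem from the first alternative can be reproduced with a small standard-library sketch. With a bounded pool whose workers are all blocked in time.sleep, a queued task can't start until one of them wakes up. `sleepy_worker` and `quick_task` are hypothetical stand-ins; the sketch doesn't reproduce the Pub/Sub thread batch, just the general effect.

```python
import concurrent.futures
import time


def sleepy_worker(delay: float) -> None:
    # Hypothetical batch worker that blocks while waiting out a batch latency.
    time.sleep(delay)


def quick_task(submitted_at: float) -> float:
    # Reports how long this task sat in the queue before a worker picked it up.
    return time.monotonic() - submitted_at


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    for _ in range(2):
        pool.submit(sleepy_worker, 0.5)  # occupies both workers
    queued = pool.submit(quick_task, time.monotonic())
    wait = queued.result()

print(f"quick task waited {wait:.2f}s")  # roughly 0.5s, not ~0s
```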