googleapis / google-cloud-go

Google Cloud Client Libraries for Go.
https://cloud.google.com/go/docs/reference
Apache License 2.0

pubsub: optionally provide a blocking receiver #566

Closed · rakyll closed this issue 7 years ago

rakyll commented 7 years ago

Defaulting to a high-performance, callback-oriented asynchronous API to receive messages is overkill for anyone who is not looking for performance but simply wants to receive a message.

We can optionally provide a blocking receiver, and document that users should use either Receive or the blocking receiver. ReceiveMsgs always returns an error if Receive has ever been called.

func (s *Subscription) ReceiveMsgs(ctx context.Context, msgs []*Message) (n int, err error)

ReceiveMsgs blocks until len(msgs) messages have been received or the call times out.

Receive is very open to erroneous usage (callbacks that don't run on the current goroutine are quite an anti-pattern in Go; our users are not well trained to deal with Receive), and should be used only by expert users who have very performance-critical apps.

/cc @jba

broady commented 7 years ago

What's the use case?

This decision was discussed extensively internally. We can publish some of the reasoning behind the recent changes.

rakyll commented 7 years ago

What's the use case?

The bug report explains the use case. If I have a low-rate subscription model where I don't care about performance, the new pubsub API is complicating my entire program.

Pubsub is a generic-use API with very diverse performance requirements across its users. The new API is a major benefit for high-rate subscribers but is very complex for low-rate users, and the new Receive achieves its performance at the cost of being less idiomatic.

Since the debut of this feature, https://github.com/GoogleCloudPlatform/google-cloud-go/issues/571 and https://github.com/GoogleCloudPlatform/google-cloud-go/issues/569 have been filed, which reflect the initial reason I filed this issue. Receive should be opt-in, not the default.

matthewvalimaki commented 7 years ago

As my ticket #571 was mentioned here, I feel I need to add my comments :) I personally want top-notch performance from the library. On this ticket's request for a blocking mechanism I have no opinion, as I have no need for it. I will handle any duplicate messages, errors, etc. on my own, and I am willing to accept the complexities that come with that. I do acknowledge that the documentation lacks complete examples, which makes understanding harder. Perhaps such examples could cover both blocking and non-blocking usage.

rakyll commented 7 years ago

+1 to working on examples and more comprehensive documentation first. Even if the blocking case is more complicated to achieve with the new API, a copy/paste-able example that addresses a real-world blocking requirement would be good to have. We can always come back to this issue if we keep hearing complaints that the blocking case is not well handled by the APIs.

bradleyfalzon commented 7 years ago

For my exact use case, I use this service as a continuous integration runner for gopherci.io; where I work, we'd also use it (though not currently) for video transcoding.

Essentially, we're a single instance handling one message at a time, as each message takes multiple minutes to process and is itself highly concurrent (uses multiple CPU cores). Receiving multiple messages concurrently would only saturate the CPU and potentially leave one instance processing multiple requests and other instances idle.

Ideally, each server should only take one message from the queue at a time (not taking multiple jobs and reserving them), allowing us to:

I hope that makes sense. If this is not the use case Pub/Sub is targeting, then that's fine too.

jba commented 7 years ago

We recommend that users who don't want the high-throughput features use the cloud.google.com/go/pubsub/apiv1 client, which gives them full control over the API.

bradleyfalzon commented 7 years ago

We recommend that users who don't want the high-throughput features use the cloud.google.com/go/pubsub/apiv1 client

Re-implementing the ancillary services such as keepalive and the iterator is non-trivial. I am trying this, but after a few seconds I also begin to receive rpc error: code = FailedPrecondition desc = grpc: the client connection is closing. I'm not certain using the underlying client is the best possible solution; it seems to have a number of edge cases that need to be handled perfectly. Example here: https://gist.github.com/bradleyfalzon/8fa44404bd4854f73ff46001c4870bf0

#569 contained a minimal example using the new Receive method to achieve blocking by setting ReceiveSettings.MaxOutstandingMessages = 1. You've mentioned "I'm afraid things will only get worse when the StreamingPull API becomes available.", so is the example program I provided likely to behave differently in the near future? If not, is there any reason I can't use the example program now that the bug has been resolved?

Perhaps this is just the bubble I live in, but I can't believe that the use case for a blocking message queue is unusual. The Pub/Sub page https://cloud.google.com/pubsub/ does list Message Queue as a feature, and using the underlying client is non-trivial.

pongad commented 7 years ago

@bradleyfalzon We're still working with the pubsub team on the API issue, but I can comment on your gist specifically.

I think the problem is that runtime.Goexit runs all deferred calls. So it closes the subscriber while the second goroutine is still using it to pull messages. If you just need the main goroutine to hang forever, maybe replace the call with select{}?
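To make the failure mode concrete, here is a minimal, self-contained sketch (not taken from the gist) of the difference: runtime.Goexit runs main's deferred calls, which would close the client while a background goroutine still needs it, whereas select{} parks the main goroutine without running its defers.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Stand-in for the goroutine that keeps pulling messages with the client.
	go func() {
		for {
			fmt.Println("worker: still pulling...")
			time.Sleep(time.Second)
		}
	}()

	// Calling runtime.Goexit() here would run main's deferred calls
	// (e.g. a deferred client.Close), closing the connection out from
	// under the worker and producing the FailedPrecondition error above.
	// select{} parks the main goroutine forever without running defers.
	select {}
}
```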

bradleyfalzon commented 7 years ago

You got it, thanks a tonne @pongad. Can you clarify exactly which API issue you were referring to?

pongad commented 7 years ago

@bradleyfalzon No problem!

can you clarify exactly which API issue you were referring to?

I was referring to the blocking receiver this thread is discussing. We're discussing it, not ignoring you :)

jba commented 7 years ago

@bradleyfalzon, I took your gist as a starting point and wrote this one (https://gist.github.com/jba/9a0d3e6aa2561274cf5d428ae0af2c97), which pulls one message at a time from a subscription. Since you're processing only one message at a time, keepalive is fairly simple: each time you get a message, just start a goroutine to extend the ack deadline. I found it easier to think about if that goroutine made all the RPCs for the message. Then you never have to worry about a nack and a keepalive en route at the same time, for instance.
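For readers who don't want to open the gist, the sketch below shows the general shape of that approach against the lower-level cloud.google.com/go/pubsub/apiv1 client. It is only an approximation, not the gist itself: the subscription name, deadlines, and processing step are placeholders, error handling is simplified, and the ack is issued from the handler rather than from the keepalive goroutine.

```go
package main

import (
	"context"
	"log"
	"time"

	subscriber "cloud.google.com/go/pubsub/apiv1"
	pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
)

const subName = "projects/my-project/subscriptions/my-sub" // placeholder

func main() {
	ctx := context.Background()
	client, err := subscriber.NewSubscriberClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	for {
		// Synchronous Pull for a single message at a time.
		resp, err := client.Pull(ctx, &pubsubpb.PullRequest{
			Subscription: subName,
			MaxMessages:  1,
		})
		if err != nil {
			log.Fatal(err)
		}
		for _, rm := range resp.ReceivedMessages {
			handle(ctx, client, rm)
		}
	}
}

// handle processes one message while a goroutine periodically extends
// its ack deadline, then acks it once the work is done.
func handle(ctx context.Context, client *subscriber.SubscriberClient, rm *pubsubpb.ReceivedMessage) {
	done := make(chan struct{})
	go func() {
		for {
			select {
			case <-done:
				return
			case <-time.After(30 * time.Second):
				err := client.ModifyAckDeadline(ctx, &pubsubpb.ModifyAckDeadlineRequest{
					Subscription:       subName,
					AckIds:             []string{rm.AckId},
					AckDeadlineSeconds: 60,
				})
				if err != nil {
					log.Printf("ModifyAckDeadline: %v", err)
				}
			}
		}
	}()

	log.Printf("processing message %s: %s", rm.Message.MessageId, rm.Message.Data)
	// ... long-running work goes here ...

	close(done)
	if err := client.Acknowledge(ctx, &pubsubpb.AcknowledgeRequest{
		Subscription: subName,
		AckIds:       []string{rm.AckId},
	}); err != nil {
		log.Printf("Acknowledge: %v", err)
	}
}
```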

bradleyfalzon commented 7 years ago

Thanks @jba, much appreciated.

Because you've provided that snippet, am I correct in understanding that this package won't provide helpers for this behaviour, as outlined by this issue? Or at least, not in the immediate future (and therefore I should adopt this snippet to ensure my use case continues to work)?

jba commented 7 years ago

@bradleyfalzon That's correct, the client at cloud.google.com/go/pubsub will continue to focus on high-throughput use cases.

By the way, I neglected error-handling in the first version of that gist, so please revisit to get the latest.

deckarep commented 7 years ago

I have a use case where @rakyll's original issue still stands. I am generating video transcoding Pub/Sub messages every time a new file is added to Google Cloud Storage... this means Google Pub/Sub will send these events as fast as it can to my Compute Engine fleet of instances, which want to work on the load in a bounded fashion (otherwise they are easily overwhelmed). Each instance is capable of processing, say, 5 transcoding messages in parallel, but the problem is that the current API doesn't seem to provide a way to consume from Google Pub/Sub as needed... this is where a proper blocking API for receiving from subscriptions would be nice.

I don't think one can claim that this service maps to Amazon's SQS when this type of behavior is not well supported.

jba commented 7 years ago

Try setting Subscription.ReceiveSettings.MaxOutstandingMessages to 5. Let us know if that isn't what you want.
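For reference, a minimal sketch of that suggestion with the high-level client (the project ID, subscription ID, and transcode function are placeholders): Receive still drives callbacks, but never more than 5 run at once, so the work on each instance stays bounded.

```go
package main

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "my-project") // placeholder project ID
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	sub := client.Subscription("transcode-jobs") // placeholder subscription ID
	// Never run more than 5 callbacks at once on this instance.
	sub.ReceiveSettings.MaxOutstandingMessages = 5

	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		transcode(m.Data) // placeholder for the long-running, CPU-heavy work
		m.Ack()
	})
	if err != nil {
		log.Fatal(err)
	}
}

func transcode(data []byte) {
	_ = data
}
```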

deckarep commented 7 years ago

Ah yes! I should have circled back on this thread. Yesterday I tried that setting and it's exactly what I needed to bound the work to something usable. My only comment is that this setting should be more prominent in the docs, which could also highlight an example of when it is appropriate. Thanks for getting back to me @jba!

jba commented 7 years ago

https://code-review.googlesource.com/12930 adds an example.

Not going to close this issue, since I think others might still be unhappy. @rakyll @bradleyfalzon have you guys made your peace with this, or is it still a source of pain?

rakyll commented 7 years ago

@jba, ReceiveSettings significantly improved my case. Thanks for the additional examples.

bradleyfalzon commented 7 years ago

I've also settled on ReceiveSettings for the moment. However, the comment "I'm afraid things will only get worse when the StreamingPull API becomes available." in https://github.com/GoogleCloudPlatform/google-cloud-go/issues/569#issuecomment-287763458 leads me to believe this isn't the correct solution for the long term. I tried to clarify this in my comment https://github.com/GoogleCloudPlatform/google-cloud-go/issues/566#issuecomment-288052361, which led to the recommended solution being the gist https://gist.github.com/jba/9a0d3e6aa2561274cf5d428ae0af2c97.

I still feel that having to implement all this ourselves, for what I thought was a relatively normal pattern, is a little strange, but the gist does provide a starting point for our own implementation, so I'm somewhat satisfied that I can implement this functionality myself now. But the last few messages seemed to suggest that using ReceiveSettings is OK - which earlier didn't seem like a good long-term solution?

pongad commented 7 years ago

@bradleyfalzon The answer is somewhat complex. Please bear with me.

We want to move the client library to streaming eventually, because it will be faster. Unfortunately, in streaming we don't have complete control over the number of messages we pull. For this reason, we cannot say "only pull 1".

If you set ReceiveSettings.MaxOutstandingMessages to 5, the client might pull more than 5 messages, but it will take care to never run more than 5 callbacks concurrently. Limiting the number of callbacks makes sure we don't overwhelm your machines. Since we can pull more messages than we can concurrently process, the latency of some messages will be higher: a pulled message may just wait around instead of being delivered to another machine. On the other hand, the overall throughput should improve, since your CPU should spend less time idle. If this trade-off works for you, ReceiveSettings is probably the right choice.

On the other hand, if you require that all pulled messages be immediately processed and that no message wait around, the high-performance client unfortunately doesn't support this use case. The gist you linked should work well enough, though.

Does this answer your question? Please let me know if I can help more!

bradleyfalzon commented 7 years ago

Thanks @pongad, the response helps clarify the subtle difference. In my case, I certainly only want to pull one message at a time, to leave other systems the ability to handle the remaining messages.

jba commented 7 years ago

Closing this. We've established that the cloud.google.com/go/pubsub package will not support certain use cases, by design, but there are alternatives using lower-level clients.