googleapis / google-cloud-cpp

C++ Client Libraries for Google Cloud Services
https://cloud.google.com/
Apache License 2.0

Sharing a pool of gRPC threads/connections with N topics for Pub/Sub? #13707

Open anthonyalayo opened 6 months ago

anthonyalayo commented 6 months ago

What component of google-cloud-cpp is this feature request for?
Pub/Sub

Is your feature request related to a problem? Please describe.
Thanks for a great product. I've been looking at your samples for using the Pub/Sub publisher class. One example in particular shows how to pass in your own thread pool: https://cloud.google.com/cpp/docs/reference/pubsub/latest/classgoogle_1_1cloud_1_1pubsub_1_1Publisher#thread-safety

I had some suspicions about gRPC connections and the like, so I dug into the source code.

Each call of MakePublisherConnection is still tied to a topic. It is unclear whether I can use the same thread pool for multiple calls to MakePublisherConnection. Inside MakePublisherConnection I see gRPC channels being connected per thread, so I'm assuming that it's not expected to use the same thread pool across multiple topics?

That leads me to my point of confusion. What is the expected usage for a scenario that I would assume is common:

  1. You have multiple topics you publish to
  2. You want a pool of gRPC connections that can be shared across all topics

Describe the solution you'd like
I would like a sample that documents this, or even for this issue to serve as the documentation.

Describe alternatives you've considered
I am currently iterating on ways to use the API to do what I expect, but it isn't ideal.

dbolduc commented 6 months ago

While not the Pub/Sub expert, let me jump in and confirm a few of your suspicions.

> Each call of MakePublisherConnection is still tied to a topic.

Yes.

> It is unclear whether I can use the same thread pool for multiple calls to MakePublisherConnection. Inside MakePublisherConnection I see gRPC channels being connected per thread, so I'm assuming that it's not expected to use the same thread pool across multiple topics?

So there are two things here:

  1. The thread pool servicing the CompletionQueue
  2. The pool of gRPC channels

For 1, the thread pool servicing the CompletionQueue can be shared across connections, by using the GrpcCompletionQueueOption, as you found in the documentation.

For 2, the pool of gRPC channels cannot be reused by Publishers that are tied to different topics, with the current API. Although this is a totally reasonable use case.


For the sake of completeness, I should point out that the BlockingPublisher lets you reuse the gRPC channel pool for different topics. You could make one BlockingPublisherConnection and share it across multiple clients, providing a different Topic in each call to Publish(...).

https://cloud.google.com/cpp/docs/reference/pubsub/latest/classgoogle_1_1cloud_1_1pubsub_1_1BlockingPublisher#classgoogle_1_1cloud_1_1pubsub_1_1BlockingPublisher_1a37e3aae927a3fb1fa2e83f1663d621a1

Of course, you would be using a blocking API. Maybe that is not good enough for your use case.

anthonyalayo commented 6 months ago

Solid reply, thanks as usual @dbolduc. I think an easy workaround for now would be

  1. Use the BlockingPublisher, setting up one BlockingPublisherConnection
  2. Have it called on a pool of background threads that I manage
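
The two steps above can be sketched with the standard library alone. This is a minimal model under stated assumptions, not library code: `FakeBlockingConnection` and `WorkerPool` are hypothetical stand-ins invented for illustration, and the fake `Publish` only records messages. It shows one shared connection object being called with a different topic per call from a small pool of caller-managed threads.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <map>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for one shared blocking connection. NOT the
// google-cloud-cpp API: it only records messages per topic.
class FakeBlockingConnection {
 public:
  // Blocks the calling thread until the (fake) publish is recorded.
  void Publish(std::string const& topic, std::string const& data) {
    std::lock_guard<std::mutex> lk(mu_);
    sent_[topic].push_back(data);
  }
  std::size_t CountFor(std::string const& topic) {
    std::lock_guard<std::mutex> lk(mu_);
    return sent_[topic].size();
  }

 private:
  std::mutex mu_;
  std::map<std::string, std::vector<std::string>> sent_;
};

// Minimal fixed-size worker pool (hypothetical helper). Tasks are taken
// from a FIFO queue by whichever worker is free.
class WorkerPool {
 public:
  explicit WorkerPool(std::size_t n) {
    for (std::size_t i = 0; i != n; ++i) {
      workers_.emplace_back([this] { Run(); });
    }
  }
  ~WorkerPool() { Shutdown(); }

  void Post(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      tasks_.push(std::move(task));
    }
    cv_.notify_one();
  }

  // Lets workers drain the queue, then joins them. Safe to call twice.
  void Shutdown() {
    {
      std::lock_guard<std::mutex> lk(mu_);
      stopping_ = true;
    }
    cv_.notify_all();
    for (auto& w : workers_) {
      if (w.joinable()) w.join();
    }
  }

 private:
  void Run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [this] { return stopping_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // stopping and nothing left to run
        task = std::move(tasks_.front());
        tasks_.pop();
      }
      task();  // the blocking publish runs outside the queue lock
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> tasks_;
  bool stopping_ = false;
  std::vector<std::thread> workers_;
};

// Publishes `per_topic` messages to each of two topics through ONE
// connection and returns the total number of recorded messages.
std::size_t DemoSharedBlockingConnection(std::size_t per_topic) {
  FakeBlockingConnection connection;
  WorkerPool pool(4);
  std::vector<std::string> const topics = {"orders", "audit"};
  for (std::size_t i = 0; i != per_topic; ++i) {
    for (auto const& topic : topics) {
      pool.Post([&connection, topic, i] {
        connection.Publish(topic, std::to_string(i));
      });
    }
  }
  pool.Shutdown();  // wait for every queued publish to finish
  return connection.CountFor("orders") + connection.CountFor("audit");
}
```

In a real program the fake connection would be replaced by a single shared blocking publisher, as described in the previous comment. The trade-off is that each in-flight publish occupies one worker thread, so the pool size caps concurrency.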

I'll go with that for now, but what do you think about this?

> For 2, the pool of gRPC channels cannot be reused by Publishers that are tied to different topics, with the current API. Although this is a totally reasonable use case.

Perhaps that could be discussed among maintainers for a future release?

dbolduc commented 6 months ago

> Perhaps that could be discussed among maintainers for a future release?

Definitely. I will start the brainstorming of ways for the fancy Publisher to support sharing gRPC channel pools...

Option A: override the topic via a call option

At some point we decided not to add per-call options for the Publisher. https://github.com/googleapis/google-cloud-cpp/issues/7689#issuecomment-1278214607

We could add the equivalent of pubsub::SubscriptionOption, except it would be pubsub::TopicOption. What I don't know is if an option to override the topic makes any sense in the case of something like a message ordering publisher. I would have to defer to the pubsub experts.

Note that Options are not the only way to do this. We could also add the equivalent of a bigtable::Table::WithNewTarget(...) to change the resource. Plumbing it through the many layers of pubsub might be tricky though.

Option B: offer a way to provide a channel pool to MakePublisherConnection

If changing the topic on the fly is something we want to disallow, we would need a way to inject an existing channel pool into MakePublisherConnection.

e.g. something like:

class GrpcChannelPool {
 private:
  // Only accessed through internal library functions
  std::vector<std::shared_ptr<grpc::Channel>> channels_;
  // I think we need to hold this too.
  std::shared_ptr<GrpcAuthenticationStrategy> auth_;
};

// Supply your own gRPC channel pool for the connection to use.
struct PubSubChannelPoolOption {
  using Type = GrpcChannelPool;
};

// Returns an object that holds a channel pool, to be given to a `PublisherConnection`.
GrpcChannelPool MakeChannelPool(Options options);

MakeChannelPool() would do something along the lines of this CreateGrpcChannel(): https://github.com/googleapis/google-cloud-cpp/blob/af29661d19edd95bab7a0db22d436219490f713b/google/cloud/pubsub/internal/publisher_stub_factory.cc#L71

The stub factory would look for the PubSubChannelPoolOption and, if it is present, use the channels therein instead of creating new ones.
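
To make the "use the injected pool if present, otherwise create channels" rule concrete, here is a small standard-library model. Everything in it is hypothetical and exists only for illustration: `FakeChannel`, `FakeOptions`, `FakeConnection`, and this `MakeChannelPool` are not google-cloud-cpp types, and an empty `injected_pool` is assumed to mean "option not set".

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins; none of these types exist in google-cloud-cpp.
struct FakeChannel {
  std::string endpoint;
};
using ChannelPool = std::vector<std::shared_ptr<FakeChannel>>;

struct FakeOptions {
  std::string endpoint = "pubsub.googleapis.com";
  std::size_t channel_count = 4;
  ChannelPool injected_pool;  // models the proposed option; empty = not set
};

// Creates a fresh pool of channels from the options.
ChannelPool MakeChannelPool(FakeOptions const& options) {
  ChannelPool pool;
  for (std::size_t i = 0; i != options.channel_count; ++i) {
    pool.push_back(
        std::make_shared<FakeChannel>(FakeChannel{options.endpoint}));
  }
  return pool;
}

struct FakeConnection {
  std::string topic;
  ChannelPool channels;
};

// Models the stub factory: reuse the injected pool when one is supplied,
// otherwise fall back to creating new channels for this connection.
FakeConnection MakeConnection(std::string topic, FakeOptions const& options) {
  ChannelPool channels = options.injected_pool.empty()
                             ? MakeChannelPool(options)
                             : options.injected_pool;
  return FakeConnection{std::move(topic), std::move(channels)};
}

// Returns true when two connections for different topics end up holding
// the very same channel objects.
bool SharesChannels(bool inject_pool) {
  FakeOptions options;
  if (inject_pool) options.injected_pool = MakeChannelPool(options);
  FakeConnection a = MakeConnection("orders", options);
  FakeConnection b = MakeConnection("audit", options);
  return a.channels.front() == b.channels.front();
}
```

With the pool injected, both connections hold pointers to the same channels. Without it, each connection builds its own, which matches the current behavior described earlier in the thread.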

anthonyalayo commented 6 months ago

Both sound reasonable in my opinion. When I first started fiddling with the API, I expected something like Option B (since the nuances around Option A are unclear to me too).

coryan commented 6 months ago

This document covers some of the questions here: https://cloud.google.com/cpp/docs/background-threads (if nothing else it may help the search engines find it in the future).

On the channels: keep in mind that gRPC shares sockets under the hood. That is, different channels use the same socket if (1) they use the same endpoint, (2) they use the same authentication / credentials, (3) they have the same channel attributes, and (4) they are configured to use the global pool of sockets (I may have forgotten some other condition, but you get the idea). There are some tradeoffs. The last condition requires some synchronization overhead.

anthonyalayo commented 6 months ago

> This document covers some of the questions here: https://cloud.google.com/cpp/docs/background-threads (if nothing else it may help the search engines find it in the future).

I was looking for something like this and couldn't find it. Appreciate this!

anthonyalayo commented 6 months ago

In case it helps, I ended up going with something like this in the interim:

auto PubSubPublisher::makePublisherConnection( const std::string & topic ) -> std::shared_ptr<g::pubsub::PublisherConnection>
{
    // use our own completion queue / thread pool for all pub/sub connections
    // in the future, we may be able to specify a gRPC connection pool as well
    // https://github.com/googleapis/google-cloud-cpp/issues/13707
    auto publisherConnection = g::pubsub::MakePublisherConnection(
        g::pubsub::Topic( kProjectId, topic ),
        g::Options{}
            .set<g::GrpcCompletionQueueOption>( mCq )
            .set<g::pubsub::MaxHoldTimeOption>( kMaxHoldTime )
            .set<g::pubsub::MaxBatchBytesOption>( kMaxBatchBytes )
            .set<g::pubsub::MaxBatchMessagesOption>( kMaxBatchMessages ) );

    return publisherConnection;
}

void PubSubPublisher::publish( const std::string & topic, const google::protobuf::Message & message )
{
    // store a map of topic connections
    // in the future, we may be able to use a single publisher connection for multiple topics
    // https://github.com/googleapis/google-cloud-cpp/issues/13707
    if ( !mTopicConnections.contains( topic ) )
    {
        mTopicConnections[topic] = makePublisherConnection( topic );
    }

    auto publisher = g::pubsub::Publisher( mTopicConnections[topic] );

    g::pubsub::MessageBuilder messageBuilder{};
    messageBuilder.SetData( message.SerializeAsString() );

    publisher
        .Publish( std::move( messageBuilder ).Build() )
        .then( [topic]( g::future<g::StatusOr<std::string>> future )
               { handlePublishResponse( std::move( future ), topic ); } );
}

Where I'm maintaining a map of publisher connections per topic in order to keep using the async API.
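
One caveat with a per-topic map like this: if `publish` can be called from several threads, the lookup-then-insert needs a lock. The following is a minimal standard-library sketch of such a cache, assuming the connection can be held by `std::shared_ptr`. `TopicConnectionCache` and `FakeTopicConnection` are hypothetical names introduced here, and the factory callback stands in for whatever actually creates the connection.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical helper: caches one connection per topic behind a mutex so
// that concurrent callers create each connection at most once.
template <typename Connection>
class TopicConnectionCache {
 public:
  using Factory =
      std::function<std::shared_ptr<Connection>(std::string const&)>;

  explicit TopicConnectionCache(Factory factory)
      : factory_(std::move(factory)) {}

  // Returns the cached connection for `topic`, creating it on first use.
  std::shared_ptr<Connection> Get(std::string const& topic) {
    std::lock_guard<std::mutex> lk(mu_);
    auto it = cache_.find(topic);
    if (it == cache_.end()) {
      it = cache_.emplace(topic, factory_(topic)).first;
    }
    return it->second;
  }

 private:
  std::mutex mu_;
  Factory factory_;
  std::unordered_map<std::string, std::shared_ptr<Connection>> cache_;
};

// Stand-in connection type used only by the demo below.
struct FakeTopicConnection {
  std::string topic;
};

// Looks up three connections (two for the same topic) and returns how
// many times the factory actually ran.
int DemoFactoryCalls() {
  int calls = 0;
  TopicConnectionCache<FakeTopicConnection> cache(
      [&calls](std::string const& topic) {
        ++calls;
        return std::make_shared<FakeTopicConnection>(
            FakeTopicConnection{topic});
      });
  auto a = cache.Get("orders");
  auto b = cache.Get("orders");  // served from the cache
  auto c = cache.Get("audit");
  assert(a == b);
  assert(a != c);
  return calls;
}
```

Holding the lock while the factory runs keeps the sketch simple but serializes first-time creation across topics. Whether that matters depends on how expensive creating a connection is in practice.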

anthonyalayo commented 3 months ago

@dbolduc @coryan how does it look for adding this to the road map?

coryan commented 3 months ago

Deferring to @scotthart, as I no longer define the roadmap on this project.

scotthart commented 3 months ago

We'll take this feature request into consideration in our upcoming planning, but I cannot commit to any time frame at present. If/when we decide to work on this feature, we'll update this issue accordingly.