facebook / wangle

Wangle is a framework providing a set of common client/server abstractions for building services in a consistent, modular, and composable way.
Apache License 2.0

ClientDispatcherBase added asynchronously does not get added to read pipeline #103

Closed: enis closed this issue 1 month ago

enis commented 7 years ago

In Apache HBase, we are building a C++ client on top of wangle (HBASE-14850). We used to set up the pipeline and the ClientDispatcher in a blocking call, and we are now changing that to be async.

We create a ClientDispatcher (a subclass of wangle::ClientDispatcherBase) in the callback from ClientBootstrap::connect(), and then use this dispatcher to send RPCs. What I noticed is that the dispatcher can send requests, but it is not added to the pipeline for the responses: when the server replies, the last handler does not pass the message back to the dispatcher and instead logs a warning that it reached the end of the pipeline (https://github.com/facebook/wangle/blob/master/wangle/channel/HandlerContext-inl.h#L168).
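To make the symptom concrete, here is a toy model of the read path. It is only a sketch and not wangle's implementation: ToyPipeline, ToyDispatcher, and responseDelivered are hypothetical names invented for illustration, and the real pipeline and handler classes are considerably more involved. The sketch only shows the behavior described above, where an inbound message that no stage consumes falls off the end of the pipeline.

```cpp
#include <cassert>
#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Toy model only: these types are hypothetical and are NOT wangle's classes.
// An inbound handler returns true if it consumed the message, or false to
// forward the (possibly transformed) message to the next stage.
using InboundHandler = std::function<bool(std::string&)>;

class ToyPipeline {
 public:
  void addBack(InboundHandler h) {
    assert(h && "handler must be callable");
    handlers_.push_back(std::move(h));
  }

  // Push one inbound message through every stage in order.
  // Returns true if some stage consumed it, false if it fell off the end.
  bool read(std::string msg) {
    for (auto& h : handlers_) {
      if (h(msg)) {
        return true;
      }
    }
    std::cerr << "read() reached end of pipeline\n";
    return false;
  }

 private:
  std::vector<InboundHandler> handlers_;
};

// Hypothetical dispatcher: the final inbound stage that receives responses.
struct ToyDispatcher {
  std::string lastResponse;

  // Registers this dispatcher as the last stage of the read path.
  // The dispatcher must outlive any read() call on the pipeline.
  void attachTo(ToyPipeline& p) {
    p.addBack([this](std::string& msg) {
      lastResponse = msg;
      return true;
    });
  }
};

// Returns true when a response reaches the dispatcher.
inline bool responseDelivered(bool attachDispatcher) {
  ToyPipeline pipeline;
  // A decoder-like stage that transforms the message and forwards it.
  pipeline.addBack([](std::string& msg) {
    msg = "decoded:" + msg;
    return false;
  });

  ToyDispatcher dispatcher;
  if (attachDispatcher) {
    dispatcher.attachTo(pipeline);
  }
  return pipeline.read("resp") && dispatcher.lastResponse == "decoded:resp";
}
```

In this model, responseDelivered(true) delivers the response to the dispatcher, while responseDelivered(false) reproduces what we see: the message passes the decoder stage and is then dropped with an end-of-pipeline warning.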

Can you guys spot anything wrong with this logic? The example code (and our previous code) on the client side calls Future::get() before creating the ClientDispatcher, and that works.

folly::Future<std::shared_ptr<HBaseService>> ConnectionFactory::Connect(
    std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client,
    const std::string &hostname, uint16_t port) {
  auto promise = std::make_shared<folly::Promise<std::shared_ptr<HBaseService>>>();
  client
      ->connect(folly::SocketAddress(hostname, port, true),
                std::chrono::duration_cast<milliseconds>(connect_timeout_))
      .then([=](SerializePipeline *pipeline) mutable {
        auto dispatcher = std::make_shared<ClientDispatcher>(
            hostname + ":" + folly::to<std::string>(port));
        dispatcher->setPipeline(pipeline);
        pipeline->finalize();
        promise->setValue(dispatcher);
      });

  return promise->getFuture();
}

// Further RPCs are sent with something like this:
ConnectionFactory::Connect(...).then(/* send the RPC */);
// However, the RPC response from the server is not passed back to the ClientDispatcher.

Appreciate the help.