grpc / grpc-java

The Java gRPC implementation. HTTP/2 based RPC
https://grpc.io/docs/languages/java/
Apache License 2.0

Allow for batching writes to the framer #502

Open louiscryan opened 9 years ago

louiscryan commented 9 years ago

Need to allow applications to perform a sequence of writes that cause a single flush in the framer to improve throughput.

The typical example for this would be an application that wants to write messages until isReady() is false and to do more writes when onReady() is called. Even if the application is not flow-control aware (i.e. is not using isReady()), it would still be useful to allow write batching for bursty streams.

One simple option for doing this would be to delay the outbound framer flush while executing onPayload/isReady callbacks though this would only help cases where sends are done inside these callbacks by the same thread. A more thorough API change is probably warranted.

To give some performance context, the change described above allows the FlowControlledMessagesPerSecondBenchmark to go from ~700kqps to ~4Mqps on my box.
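
As a rough illustration of the "delay the flush while a callback runs" option, here is a minimal sketch. CountingFramer, runCallback and the counters are hypothetical names invented for this example; this is not the grpc-java framer, and the flush is only simulated by a counter.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical framer that buffers messages and counts flushes; not the grpc-java framer. */
class CountingFramer {
  private final List<String> pending = new ArrayList<>();
  private boolean deferFlush;  // true while a callback is running
  int flushCount;              // number of flushes handed to the transport
  int messagesFlushed;         // total messages that have been flushed

  /** Buffers a message and flushes immediately unless a callback is in progress. */
  void write(String message) {
    pending.add(message);
    if (!deferFlush) {
      flush();
    }
  }

  /** Hands everything buffered so far to the (simulated) transport as one flush. */
  void flush() {
    if (pending.isEmpty()) {
      return;
    }
    messagesFlushed += pending.size();
    pending.clear();
    flushCount++;
  }

  /** Runs a callback (for example an onReady handler) with flushing deferred until it returns. */
  void runCallback(Runnable callback) {
    deferFlush = true;
    try {
      callback.run();
    } finally {
      deferFlush = false;
      flush();
    }
  }
}
```

Five writes made directly cause five flushes here, while the same five writes made inside runCallback cause one. As noted above, this only helps when the writes happen on the callback's own thread.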

ejona86 commented 9 years ago

Let's brainstorm various options.

Add flush to Call:

public abstract void setAutoFlush(boolean);
public abstract void flush();

Add an inhibit (a la TCP_CORK) to Call:

public abstract void inhibitFlush(boolean);

On the Channel, provide a way to inhibit. This could be an inhibit/allow pair of methods. It must be more powerful than a boolean, though, since more than one could be active at a time. Or it could be that we return a special object that is completed when the application no longer wants to inhibit:

public abstract Inhibit inhibit();

public abstract class Inhibit {
  public abstract void complete();
}

Note that adding an API to the Channel is not really generalizable to Server-side, unless we add a way to get the Server instance from a ServerCall.
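
To make the "more than one could be active at a time" point concrete, here is a minimal sketch of a counted inhibit. FlushGate, maybeFlush and the flushAction callback are hypothetical names for illustration only; nothing here is an existing grpc-java API, and the race between maybeFlush() and inhibit() is ignored.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical channel-level flush gate; all names are illustrative only. */
class FlushGate {
  private final AtomicInteger active = new AtomicInteger();
  private final Runnable flushAction;

  FlushGate(Runnable flushAction) {
    this.flushAction = flushAction;
  }

  /** Token handed to the application; complete() is idempotent. */
  final class Inhibit {
    private boolean completed;

    synchronized void complete() {
      if (completed) {
        return;
      }
      completed = true;
      // Flush only when the last outstanding inhibit has been completed.
      if (active.decrementAndGet() == 0) {
        flushAction.run();
      }
    }
  }

  /** Starts inhibiting flushes until the returned token is completed. */
  Inhibit inhibit() {
    active.incrementAndGet();
    return new Inhibit();
  }

  /** Called by the writer after each message; flushes only when nothing inhibits. */
  void maybeFlush() {
    if (active.get() == 0) {
      flushAction.run();
    }
  }
}
```

A plain boolean would let the first caller to finish re-enable flushing for everyone; the counter keeps flushes held back until every token is completed.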

louiscryan commented 9 years ago

Other options...

  1. Make Call a 'batch' factory ...
public abstract Batch batch();

interface Batch extends AutoCloseable {
  public boolean sendMessage(...);  // return value is equivalent to an isReady() call
}

The quirk here is the semantics of 'close', i.e. if a batch is not closed is it not written or just not flushed. This would generalize to the server side.
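
A minimal sketch of how such a Batch might be used with try-with-resources. It assumes one particular answer to the question above: messages are buffered as they are sent and close() only flushes. BatchingCall, readyLimit and the flushed list are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical call with a batch() factory; not part of grpc-java. */
class BatchingCall {
  private final List<String> buffered = new ArrayList<>();
  final List<List<String>> flushed = new ArrayList<>();  // one entry per flush
  private final int readyLimit;  // stand-in for a flow-control window, counted in messages

  BatchingCall(int readyLimit) {
    this.readyLimit = readyLimit;
  }

  boolean isReady() {
    return buffered.size() < readyLimit;
  }

  Batch batch() {
    return new Batch();
  }

  final class Batch implements AutoCloseable {
    /** Buffers the message; the return value mirrors isReady(). */
    boolean sendMessage(String message) {
      buffered.add(message);
      return isReady();
    }

    /** Assumed semantics: close() flushes whatever was written, in a single flush. */
    @Override
    public void close() {
      if (!buffered.isEmpty()) {
        flushed.add(new ArrayList<>(buffered));
        buffered.clear();
      }
    }
  }
}
```

With this shape, a caller can loop on sendMessage inside a try (BatchingCall.Batch batch = call.batch()) block and rely on the block exit for the single flush, including when an exception is thrown mid-batch.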


I like explicit flush calls better than inhibit. If we go with that model I suspect it would actually make the transport stream implementations a little simpler than they are today as they don't have to make their own flush decisions.

ejona86 commented 9 years ago

How would Batch interact with interceptors? It seems like it would be important to make batching orthogonal to the rest of the APIs; it seems important to be able to use the same interceptors, stubs, and reuse your own code independent of batching.

louiscryan commented 9 years ago

If batch is simply a deferred sequence of writes then it really has little impact on interceptors. The primary concern is to ensure a proper closure of message production so that flush can be sent reliably.

I don't really like

sendMessage(Collection messages)

though it's similar to Batch in some ways it doesn't allow for feedback from isReady().

sendMessage(Iterator messages)

would provide the closure and allow for the lazy production of messages and the isReady() check could be done implicitly inside our impl (I kinda like this one)

Then there's also

boolean sendMessage(message, flush)

where the return value is simply isReady() which would allow for writing loops like

do {

} while (sendMessage(msg, false));
stream.flush();

The use of onReady (and by implication onPayload etc.) to implement the closure over flush works reasonably well, though this interface pattern would need to propagate higher to be effective.

ejona86 commented 9 years ago

I don't like sendMessage(Collection messages) and sendMessage(Iterator messages). I agree using a Collection prevents any flow control feedback. Iterator turns message creation into a callback, which seems unnecessary and harder to use. It would also require having a different method on Call, and so would be a burden to interceptors.

boolean sendMessage(message, flush) could work, although it is little different than having a flush() method that is required to be called, with the exception of the possibility of less synchronization. Since there would be many callers over time, seeing the true/false could easily be a readability issue (not knowing if it is flushing or not). Granted, not having the parameter could be a readability issue by omission (not noticing it isn't flushing), but people are pretty familiar with the need to flush on things like OutputStreams. I guess we could have sendMessage(message) always call sendMessage(message, true), and so the second parameter is typically missing, explicit flushing unnecessary, and there would be few calls with the boolean argument. We could make an enum FLUSH/NO_FLUSH instead of boolean if we really wanted.

louiscryan commented 9 years ago

OK. I think we're largely in agreement on

void sendMessage(msg) --> which calls sendMessage(msg, true)
boolean sendMessage(msg, flush)
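
A minimal sketch of that pair of overloads and the write loop they enable. FlushableStream, its window field and writeBurst are hypothetical names for this example only; the real Call API may look different, and the window is counted in messages purely to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stream illustrating the two overloads; not the grpc-java Call API. */
class FlushableStream {
  private final List<String> buffer = new ArrayList<>();
  private final int window;  // stand-in for the flow-control window, counted in messages
  int flushCount;
  int delivered;

  FlushableStream(int window) {
    this.window = window;
  }

  boolean isReady() {
    return buffer.size() < window;
  }

  /** Convenience overload: always flushes. */
  void sendMessage(String msg) {
    sendMessage(msg, true);
  }

  /** Buffers msg, flushes if asked, and returns isReady() as seen after the write. */
  boolean sendMessage(String msg, boolean flush) {
    buffer.add(msg);
    if (flush) {
      flush();
    }
    return isReady();
  }

  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    delivered += buffer.size();
    buffer.clear();
    flushCount++;
  }

  /** Writes without flushing until the stream reports not ready, then flushes once. */
  static int writeBurst(FlushableStream stream, Iterator<String> messages) {
    int sent = 0;
    boolean ready = true;
    while (ready && messages.hasNext()) {
      ready = stream.sendMessage(messages.next(), false);
      sent++;
    }
    stream.flush();
    return sent;
  }
}
```

Existing callers of the one-argument overload keep flushing on every message; only code that opts into the two-argument form has to remember the final flush().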

I'll create a PR for that and see how it feels

@benjchristensen - Not sure if this has come up in discussions in reactive streams io land, i.e. how a variable rate message producer reacts to changes in transport flow-control when the writeable window is expressed as bytes not messages.

ejona86 commented 9 years ago

@louiscryan, earlier you were considering channel-wide flush reduction. Are you still interested in that?

louiscryan commented 9 years ago

Channel-wide flush reduction is largely handled by the shared write-queue. The proposed change here only affects transport flushes insofar as fewer flushes emanate from the framer.

This change ensures that message sequences are batched into single DATA frames so that the receiving side can parse as a unit and wake the application layer less often. An alternate way to achieve this effect is to have the transport layer flow-controller coalesce writes into a single DATA frame under flush. It also prevents the framer from having to allocate another buffer.

@nmittler

I can try to implement handling for that at the transport layer by:
(a) Essentially making the flow-controller writes deferred so that they pack into DATA frames
(b) Having the framer slice the buffer it reports to the transport so it can retain the unused portion

In the case of Netty this would largely mean making sure that we can properly write out a CompositeBuffer into the flow-controller and have its chunks be released properly.

I suspect this will probably not perform as well as explicit batching on an individual stream as the write queue would have to compose individual writes across streams.

ejona86 commented 9 years ago

Ah @louiscryan, that makes sense, that it is only for the framer's benefit. Gotcha.

louiscryan commented 9 years ago

I'll test out the DATA frame merging in Netty and see how it stacks up.

nmittler commented 9 years ago

@louiscryan I think having the outbound flow controller coalesce data frames makes sense in general. The only sticky bit that comes to mind is what to do about padding if it's used. For gRPC we're not using it, but a general solution would have to take padding into consideration. Maybe it's simple: we just coalesce adjacent frames if they have no padding.
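
A minimal sketch of the "coalesce adjacent frames if they have no padding" rule. DataFrame and FrameCoalescer are hypothetical types made up for this illustration; they are not Netty's HTTP/2 classes. The sketch assumes every input frame already fits within maxFrameSize.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical DATA frame model: a payload plus a padding length. */
class DataFrame {
  final byte[] payload;
  final int padding;

  DataFrame(byte[] payload, int padding) {
    this.payload = payload;
    this.padding = padding;
  }
}

class FrameCoalescer {
  /** Merges runs of adjacent unpadded frames; padded frames pass through unchanged. */
  static List<DataFrame> coalesce(List<DataFrame> frames, int maxFrameSize) {
    List<DataFrame> out = new ArrayList<>();
    ByteArrayOutputStream run = new ByteArrayOutputStream();
    for (DataFrame frame : frames) {
      boolean mergeable = frame.padding == 0;
      // Close the current run before a padded frame or when the merged frame would grow too large.
      if (!mergeable || run.size() + frame.payload.length > maxFrameSize) {
        emit(run, out);
      }
      if (mergeable) {
        run.write(frame.payload, 0, frame.payload.length);
      } else {
        out.add(frame);
      }
    }
    emit(run, out);
    return out;
  }

  /** Emits the accumulated run as one unpadded frame, if it holds any bytes. */
  private static void emit(ByteArrayOutputStream run, List<DataFrame> out) {
    if (run.size() > 0) {
      out.add(new DataFrame(run.toByteArray(), 0));
      run.reset();
    }
  }
}
```

Frame order is preserved, so a padded frame simply splits the stream into two separately merged runs.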

benjchristensen commented 9 years ago

Not sure if this has come up in discussions in reactive streams io land, i.e. how a variable rate message producer reacts to changes in transport flow-control when the writeable window is expressed as bytes not messages.

Yes the discussion came up a few times and was specifically discussed here: https://github.com/reactive-streams/reactive-streams-jvm/issues/47

We recently implemented a solution with Netty here: https://github.com/ReactiveX/RxNetty/blob/79cd68e01c8fc1139c71ec44b73d53f6b571ec3f/rxnetty/src/main/java/io/reactivex/netty/protocol/tcp/BackpressureManagingHandler.java This correctly stops the upstream Observable from producing and then resumes based on HTTP/TCP network backpressure.

Another example with Aeron is here: https://gist.github.com/benjchristensen/48f17210a674bf38329d#file-rxaeronexample-java-L114 This is somewhat of a toy example but shows how it stops production when backpressure from the network occurs and resumes upstream production when it is able to emit. It also shows the conversion from application object to bytes.

With Reactive Streams implementations over networks the general idea is that the application level requests n items, and the network level converts an item into bytes and then deals with the network level backpressure. This means we can have bounded buffers for the number of items, but need at least one unbounded buffer (or the ability to partially serialize the item with pause/resume etc) to store a single serialized item while writing and/or waiting to write over the network.

Another way of considering this is that the "async writer" is an asynchronous boundary which has its own internal bounded buffer of items and invokes request(n) upstream to allow production as it is capable of receiving. Then it writes out and deals with the conversion to bytes as the network flow-control permits it. As it drains its bounded buffer of items it requests more upstream. With this pattern it naturally bridges the two and correctly stops production if the network can't keep up.
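
A minimal sketch of that "async writer" boundary, using the JDK's java.util.concurrent.Flow interfaces. AsyncWriter, onWindowUpdate and bytesWritten are hypothetical names for this illustration. The sketch is single-threaded and only writes whole items; it does not attempt the partial serialization mentioned above.

```java
import java.util.ArrayDeque;
import java.util.concurrent.Flow;

/** Hypothetical bridge between item-based request(n) and a byte-based transport window. */
class AsyncWriter implements Flow.Subscriber<byte[]> {
  private final int capacity;  // bound on buffered items
  private final ArrayDeque<byte[]> items = new ArrayDeque<>();
  private Flow.Subscription upstream;
  private long windowBytes;    // bytes the transport currently allows
  long bytesWritten;

  AsyncWriter(int capacity) {
    this.capacity = capacity;
  }

  @Override
  public void onSubscribe(Flow.Subscription subscription) {
    upstream = subscription;
    subscription.request(capacity);  // fill the bounded buffer
  }

  @Override
  public void onNext(byte[] item) {
    items.add(item);
    drain();
  }

  /** Called by the transport when flow control grants more bytes. */
  void onWindowUpdate(long bytes) {
    windowBytes += bytes;
    drain();
  }

  /** Writes buffered items while the window allows, then requests replacements upstream. */
  private void drain() {
    int drained = 0;
    while (!items.isEmpty() && items.peek().length <= windowBytes) {
      byte[] next = items.poll();
      windowBytes -= next.length;
      bytesWritten += next.length;
      drained++;
    }
    if (drained > 0) {
      upstream.request(drained);
    }
  }

  int buffered() {
    return items.size();
  }

  @Override
  public void onError(Throwable throwable) {}

  @Override
  public void onComplete() {}
}
```

When the byte window is exhausted, no further request(n) is issued, so upstream production stops until the transport grants more window.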

louiscryan commented 9 years ago

@benjchristensen thanks for the pointers. I note the issue with an unbounded buffer vs partial serialization (which is probably rare for anything other than byte-sequence types). Or put another way, it's hard for the transport layer to do anything other than

request(1)

as it has no idea how large the next produced message will be (hinting and heuristics aside) e.g. https://gist.github.com/benjchristensen/48f17210a674bf38329d#file-rxaeronexample-java-L89

GRPC chose to model this as onReady() rather than request(n) for this reason. In either case you have a cycle which looks like

request(1) --> onValue --> transport write msg --> request(1) if more window available --> ...

which on the read side leads to

transport read msg --> onValue --> transport read next msg --> onValue --> ...

This can perform pretty well, but there's a problem if the transport can't coalesce message writes in such a way that the read side can take advantage of the fact that more than one message is coming off the wire at a time, so it can do

transport read many messages --> loop over onValue

In short, don't write in such a way that the read side can't dispatch batches. This is particularly important if the --> implies a thread boundary being crossed.
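
A minimal sketch of the "transport read many messages --> loop over onValue" step. It relies on gRPC's length-prefixed message framing (a 1-byte compressed flag followed by a 4-byte big-endian length). BatchDeframer is a hypothetical name for this illustration, not the grpc-java deframer, and it ignores compression and length limits.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical deframer that pulls every complete message out of one read. */
class BatchDeframer {
  private static final int HEADER_LENGTH = 5;  // 1-byte compressed flag + 4-byte length

  /** Returns all complete messages in buf and leaves any partial message unread. */
  static List<byte[]> readAll(ByteBuffer buf) {
    List<byte[]> messages = new ArrayList<>();
    while (buf.remaining() >= HEADER_LENGTH) {
      buf.mark();
      buf.get();                  // compressed flag, ignored in this sketch
      int length = buf.getInt();  // ByteBuffer reads big-endian by default
      if (buf.remaining() < length) {
        buf.reset();              // partial message: rewind to the start of its header
        break;
      }
      byte[] payload = new byte[length];
      buf.get(payload);
      messages.add(payload);
    }
    return messages;
  }
}
```

The returned list can then be handed to the application in one hop, with a single loop calling onValue for each element, instead of crossing a thread boundary once per message.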

See https://github.com/netty/netty/issues/3852 for just such an issue in Netty's HTTP2 stack.

ejona86 commented 8 years ago

@louiscryan, this issue is almost the same as #994. Do we want to close one?

dsethi commented 5 years ago

Is this implemented? Reposting the other comment here: my use case is sending a large number of small protobufs over RPC -- I need to batch them for throughput. Sample Python client code -- which does not work (i.e. no speedup from changing any of the option values) --

channel = grpc.insecure_channel('localhost:2020', options=[('grpc.max_concurrent_streams', 1),
                                                           ('grpc.max_send_message_length', 1 * 1024 * 1024),
                                                           ('grpc.max_receive_message_length', 1 * 1024 * 1024),
                                                           ('grpc.http2.max_frame_size', 16777215),
                                                           ('grpc.http2.write_buffer_size', 10 * 1024 * 1024),
                                                           ('WriteFlag.buffer_hint', True)])

Am I missing something?

ejona86 commented 5 years ago

@dsethi, no this is not implemented (main issue is API design). Also, this issue is only for Java, not Python.

dsethi commented 5 years ago

Thanks a lot @ejona86 -- so just to understand a bit better: for python, can I control the write buffer size to do batching? And for java, I can not. So -- the available way to do batching is at the application level by writing batch proto schemas?

ejona86 commented 5 years ago

I don't know the status for Python. You should ask on the grpc/grpc repository. I would suggest requesting the feature and then, yes, you can use a "batched" message with repeated messages inside.
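
A minimal sketch of that application-level workaround: group the small messages into chunks and send each chunk as the repeated field of one wrapper message. MessageBatcher and toBatches are hypothetical names for this example, and the wrapper proto itself is not shown.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that groups small messages before they are wrapped and sent. */
class MessageBatcher {
  /** Splits items into consecutive batches of at most batchSize elements, preserving order. */
  static <T> List<List<T>> toBatches(List<T> items, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < items.size(); start += batchSize) {
      int end = Math.min(start + batchSize, items.size());
      // Copy the view so each batch is independent of the source list.
      batches.add(new ArrayList<>(items.subList(start, end)));
    }
    return batches;
  }
}
```

Each inner list would become one RPC message, so the per-message costs are paid once per batch rather than once per small proto.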

lidizheng commented 5 years ago

@dsethi In C-Core, we won't pack multiple proto messages into one single message frame. But multiple message frames can be sent within a TCP packet. So, the throughput should already be optimized if you can utilize all of your bandwidth with your code.

PS. Since the Python layer is slower than C-Core, sending larger messages will reduce the time spent in the Python layer and result in better performance.

dsethi commented 5 years ago

@lidizheng -- so in some detail: in my experiment, I have the gRPC client and server on the same machine, and I transfer 1 GB of data in two ways: in case 1, I send 1,000 protos of 1 MB each, and in case 2, I send 1,000,000 protos of 1 KB each using RPCs. Case 2 is much slower -- it takes ~100 sec vs ~3 sec for case 1. Python vs C does not explain the 100 sec difference. I also tried increasing the frame size with ('grpc.http2.max_frame_size', 16777215) and ('grpc.http2.write_buffer_size', 10 * 1024 * 1024) when creating the channel. However, these didn't help with performance.

I am wondering if gRPC or the underlying TCP can combine 1,000 1 KB protos into one 1 MB frame automatically.

lidizheng commented 5 years ago

@dsethi If you are using C++ or C#, the performance gap between your cases will be smaller. Again, it might be a Python issue, not a gRPC issue. Can you profile the time spent in CPython for each of your two cases?

Underneath, gRPC has a write buffer for TCP packets. The write buffer can contain multiple HTTP/2 frames. Each message frame is one proto message. So, it can combine protos into TCP packets automatically.

We had a discussion of a similar problem in https://github.com/grpc/grpc/issues/19122.

dsethi commented 5 years ago

@lidizheng I can try to write a C++ client to check this. Also, generally, is there an easy way to change the frame size?

I profiled the Python code: I see 9.5 sec spent in "299793 9.531 0.000 9.531 0.000 {method 'acquire' of 'thread.lock' objects}". I have attached the profile as file temp.txt as well.

Not sure where/why this lock is getting acquired in the rpc implementation?

lidizheng commented 5 years ago

@dsethi In your snippet, I think you got all the related channel args. They do change the frame size and message length. The lock issue has bothered us for a long time, and with our recent work on the asyncio stack we might finally have a chance to solve it.

dsethi commented 5 years ago

thanks @lidizheng -- sure, if I understand you correctly, the lock contention happens if there are multiple RPC calls in flight and they are all async? Is this a python only issue? What is the workaround for this?

Also, just for curiosity: why do we acquire this lock in the implementation?

lidizheng commented 5 years ago

@dsethi It is used for two things: 1) synchronization between the polling thread and the application thread: when the polling thread receives a message, it notifies the application thread to wake up; 2) work distribution in the thread pool on the server side: underneath concurrent.futures.ThreadPoolExecutor there is a concurrent Queue, and almost every operation on it acquires the lock.

Case 1 applies to both client and server; case 2 applies only to the server. To boost the performance of the server, you can set the number of threads to 1 to reduce lock contention. Or, if you are familiar with gevent, you can use our experimental feature that uses gevent as the IO manager.

dsethi commented 5 years ago

Great, thanks @lidizheng -- this is really helpful to know (especially on the client side, as that's where I am currently seeing the problem). I assume that multiple application threads and polling threads share the same lock, which causes the overhead (as opposed to one lock per application thread/polling thread pair).

So, the workaround for now is to have an application layer message which batches? Also -- is this an issue for all languages?

lidizheng commented 5 years ago

@dsethi What is the gap between your ideal performance and current performance?

Batching messages into larger bulks will help, theoretically. The lock issue is unique to Python. If you care about the QPS number, C++, C#, Java and Golang are better choices. gRPC Python is still a great tool to use as a glue layer or for IO-intensive tasks.

dsethi commented 5 years ago

@lidizheng So -- if I batch messages and send 1,000 1 MB messages, it takes 3 sec. With 1,000,000 1 KB messages on the other hand (the same 1 GB of data), it takes ~90 sec. So the gap is on the order of 80+ sec of slowdown due to not batching.

I will try this in Java and report back.

ejona86 commented 5 years ago

In Java you will see a difference between the two because: 1) 1 MB messages will use larger buffers which requires less buffer allocation, 2) small buffers (like 1 KB) have more overhead for writev(), and 3) grpc-java has per-message synchronization between the sending thread and the network thread. All of those would be solved by a batching API.

If there's any more discussion, I'd really appreciate doing that in another issue. This issue (which only applies to Java) has become quite polluted with the recent discussion, which just makes it harder to figure out the current status later. Furthermore, we already know we want this in Java. The only issue is the API to expose and to have time to work on that API.

dsethi commented 5 years ago

Thanks @ejona86 -- very helpful answer -- don't expect much additional discussion on this -- I may have additional experimental results to quote -- is there an issue you can point me to where I can post?