Applied-Duality / ReactiveRemote

Observables in the Cloud
Apache License 2.0

Batching, buffering, aggregates #1

Open mariusae opened 10 years ago

mariusae commented 10 years ago

(For the purposes of conversation, I’m going to call the thing we’re working on “Channel” — substitute “Observable”, “Pipe”, whatever.)

To handle I/O efficiently, byte buffers need first-class treatment. A naive channel

Channel[Byte]

would allow you to pull only one byte off at a time. This is untenable for obvious reasons. You could imagine asking for, and receiving, a batch of bytes. That is, you might say

val ch: Channel[Byte]
val f: Future[IndexedSeq[Byte]] = ch.read(1024)

but that is still inefficient: just as with mbufs/skbuffs in kernel networking stacks, you want to make it easy to do zero-copy IO and buffering. That is, if I get a ByteBuffer from my IO stack, I want to be able to represent that as an aggregate of Bytes.
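
A minimal sketch of the batch read described above, for concreteness. The names `Channel`, `InMemoryChannel`, and the "empty result means end of stream" convention are assumptions made for illustration, not an agreed API. Note that every read materializes a fresh IndexedSeq, which is the copying this comment objects to.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ChannelSketch {
  // Hypothetical interface: the names come from the discussion, the contract is assumed.
  trait Channel[A] {
    // Completes with at most `n` elements; an empty result signals end of stream.
    def read(n: Int): Future[IndexedSeq[A]]
  }

  // In-memory channel over a fixed sequence, so the batch read can be exercised.
  final class InMemoryChannel[A](source: IndexedSeq[A]) extends Channel[A] {
    private var pos = 0

    def read(n: Int): Future[IndexedSeq[A]] = synchronized {
      require(n >= 0, "n must be non-negative")
      // The batch is a new collection: reads are batched, but not zero-copy.
      val batch = source.drop(pos).take(n)
      pos += batch.length
      Future.successful(batch)
    }
  }
}
```

Usage: `Await.result(new ChannelSketch.InMemoryChannel[Byte](Vector[Byte](1, 2, 3)).read(1024), 1.second)` yields however many bytes are available, up to the requested 1024.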

At Twitter, we use Bufs to represent byte buffers. (They are defined as being concatenatable and sliceable.) It is possible to view Bufs as being a custom buffering strategy for Byte.

You might ask: why would I do this at all? In the above scenario, why wouldn’t I represent my IO channel as

Channel[Buf]

and just ship individual bufs (of varying size) instead? The chief reason is that you want to be able to control buffering in terms of bytes, not chunks. Your buffering policies (for back-pressure and resource management) are always in terms of bytes. If I’m the reader of such a channel, I want to read a certain number of bytes, not a certain number of Bufs (with unknown size). Conversely, the producer shouldn’t be free to push arbitrary Bufs: they must comply with a size restriction.
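
To make the bytes-versus-chunks distinction concrete, here is a rough sketch of a buffer whose capacity and reads are both counted in bytes. `ByteBoundedBuffer`, `offer`, and `read` are hypothetical names; Vector[Byte] stands in for a real Buf (so slicing here copies), and the class is not thread-safe.

```scala
import scala.collection.immutable.Queue

object ByteBudgetSketch {
  // Hypothetical buffer: capacity is a byte budget, not a chunk count.
  final class ByteBoundedBuffer(capacityBytes: Int) {
    private var chunks = Queue.empty[Vector[Byte]]
    private var buffered = 0

    // Number of bytes currently held.
    def size: Int = buffered

    // Producer side: refuse any chunk that would exceed the byte budget.
    def offer(chunk: Vector[Byte]): Boolean =
      if (buffered + chunk.length > capacityBytes) false
      else {
        if (chunk.nonEmpty) chunks = chunks.enqueue(chunk)
        buffered += chunk.length
        true
      }

    // Consumer side: take up to `n` bytes, splitting a chunk that straddles the limit.
    def read(n: Int): Vector[Byte] = {
      var out = Vector.empty[Byte]
      while (out.length < n && chunks.nonEmpty) {
        val (head, rest) = chunks.dequeue
        val (taken, left) = head.splitAt(n - out.length)
        out = out ++ taken
        // Put the unread remainder back at the front of the queue.
        chunks = if (left.isEmpty) rest else left +: rest
      }
      buffered -= out.length
      out
    }
  }
}
```

The reader asks for a byte count and never sees chunk boundaries; the producer is rejected by size, regardless of how many chunks are queued.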

An alternative view is this: low level IO like this is simply too different. We’re only going to tackle higher level “object” streams.

But it’s worth a thought.

headinthebox commented 10 years ago

I spent the day looking at Node Streams in more detail, since the Node community is an echo chamber of "back-pressure, sure, ...". The Streams and EventEmitter APIs http://nodejs.org/api/stream.html are classic examples of floor wax + dessert topping design. The API is really a low-level IO API for pushing around byte arrays + string encodings, comparable to http://msdn.microsoft.com/en-us/library/system.io.stream.aspx, that has push, pull, stop, go; you can even write on a readable stream and put items back.

The big elephant in the room with Node is that really they are in desperate need of Futures and async await (https://developer.mozilla.org/en-US/docs/Mozilla/JavaScript_code_modules/Task.jsm) at which point I think they could simplify things a lot.

But ultimately, Node streams are about low-level byte manipulation (parsing HTTP messages and TCP packets).

viktorklang commented 10 years ago

It's interesting to think about the low-level performance implications (allocations, cache-locality, branch-prediction, morphicity(?) of calls etc). Also, I think performance a lot of the time has to do with bulk operations.

Could you expose a ReadOnlyByteBuffer to the user instead of the mutable one? At least that would avoid the risk of concurrent modifications.

Wdyt?

mariusae commented 10 years ago

That’s what Bufs are. It certainly helps, but this only solves part of the problem. For example, you still wouldn’t be able to control buffering in terms of byte sizes.

viktorklang commented 10 years ago

So the issue is the source->sink scenario you talked about earlier (if I understand you correctly), where you basically just want to pipeline bytes. I guess one question is if it makes sense to have a generic Buf-like feature (Batch?) which could be sent when one requests X elements, but the receiver only sees the individual elements in the batch. Would that work?

viktorklang commented 10 years ago

If one has Chunks built into the protocol, there's an interesting capability to chunk and de-chunk streams with maintained async backpressure.
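
A rough sketch of what chunking and de-chunking could look like. This is a synchronous, pull-based stand-in: Iterator laziness plays the role of demand, so upstream is only pulled when the consumer asks for more, whereas the real thing would be asynchronous. `rechunk` and `dechunk` are hypothetical names, and Vector[Byte] stands in for a real chunk type.

```scala
object RechunkSketch {
  // Regroup chunks of arbitrary size into chunks of `size` bytes (the last one may be shorter).
  // Upstream chunks are pulled only when the consumer requests the next output chunk.
  def rechunk(in: Iterator[Vector[Byte]], size: Int): Iterator[Vector[Byte]] = {
    require(size > 0, "size must be positive")
    new Iterator[Vector[Byte]] {
      private var pending = Vector.empty[Byte]

      def hasNext: Boolean = {
        // Skip over empty upstream chunks until data or end of stream.
        while (pending.isEmpty && in.hasNext) pending = in.next()
        pending.nonEmpty
      }

      def next(): Vector[Byte] = {
        // Pull just enough upstream chunks to fill one output chunk.
        while (pending.length < size && in.hasNext) pending = pending ++ in.next()
        if (pending.isEmpty) throw new NoSuchElementException("rechunk: no more data")
        val (out, rest) = pending.splitAt(size)
        pending = rest
        out
      }
    }
  }

  // De-chunk: present the same stream one element at a time.
  def dechunk(in: Iterator[Vector[Byte]]): Iterator[Byte] = in.flatMap(_.iterator)
}
```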

mariusae commented 10 years ago

@viktorklang yes, absolutely. One possibility is to represent a “chunk” of something as a Seq (or even Traversable), but allow for optimized implementations for certain datatypes.

Edited: (Actually, I take that back — Traversable or Seq doesn’t quite capture it. I think we want roughly something that’s “sliceable” and “concatenatable” like Bufs — and of course Traversable so you can actually get something out of it.)
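
Roughly, the "sliceable, concatenatable, traversable" shape could be sketched as below. `Chunk`, `ArrayChunk`, `Concat`, and `wrap` are hypothetical names; this is not Twitter's Buf API, only an illustration of the three operations. Slicing narrows a window over the backing array and concatenation builds a rope, so neither copies bytes.

```scala
object ChunkSketch {
  // Hypothetical type: sliceable, concatenatable, and traversable.
  sealed trait Chunk {
    def length: Int
    def apply(i: Int): Byte
    // Out-of-range bounds are clamped rather than rejected.
    def slice(from: Int, until: Int): Chunk
    def concat(that: Chunk): Chunk =
      if (length == 0) that else if (that.length == 0) this else Concat(this, that)
    def iterator: Iterator[Byte] = Iterator.range(0, length).map(i => apply(i))
  }

  // A window over a backing array; the caller must not mutate `bytes` afterwards.
  final case class ArrayChunk(bytes: Array[Byte], offset: Int, length: Int) extends Chunk {
    def apply(i: Int): Byte = {
      if (i < 0 || i >= length) throw new IndexOutOfBoundsException(i.toString)
      bytes(offset + i)
    }
    def slice(from: Int, until: Int): Chunk = {
      val lo = math.max(0, math.min(from, length))
      val hi = math.max(lo, math.min(until, length))
      ArrayChunk(bytes, offset + lo, hi - lo) // same array, narrower window
    }
  }

  // Concatenation as a rope node: no bytes are moved.
  final case class Concat(left: Chunk, right: Chunk) extends Chunk {
    val length: Int = left.length + right.length
    def apply(i: Int): Byte =
      if (i < left.length) left(i) else right(i - left.length)
    def slice(from: Int, until: Int): Chunk = {
      val lo = math.max(0, math.min(from, length))
      val hi = math.max(lo, math.min(until, length))
      left.slice(lo, hi).concat(right.slice(lo - left.length, hi - left.length))
    }
  }

  def wrap(bytes: Array[Byte]): Chunk = ArrayChunk(bytes, 0, bytes.length)
}
```

An optimized implementation for a particular datatype (a ByteBuffer-backed one, say) would be another subclass of the same trait.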

mariusae commented 10 years ago

(My main concern is this: would this make the whole thing more complicated to handle just one (though important) use case?)

viktorklang commented 10 years ago

Yes, it would indeed complicate matters, question is if it should be reified in the model or just be done at the "transport level". What's your take?

mariusae commented 10 years ago

If we wanted to, through this API, support streaming byte streams, then I think it’s required.

viktorklang commented 10 years ago

The alternative would be to push the responsibility to the transport layer and do the classical Nagle-ish solution. What do you think are the Pros/Cons with each solution?

mariusae commented 10 years ago

Yeah. The problem (though maybe it’s not a bad problem) is that this forces you to discretize your byte stream: the transport layer has to have some policy that makes a decision about chunk sizes, etc. Higher layers then operate over these chunks. This has a few drawbacks.

(Again: my mind is not at all made up here. This is mostly for thinking through.)

rkuhn commented 10 years ago

Sorry for being late to the party, I was stuck in a traffic jam. Thinking about exposing streams of bytes at byte-level granularity but executing them efficiently raises some interesting questions: while backpressure formulation obviously benefits, consumers of these streams will want to have chunks of data to operate on. Take for example an encryption stage using SSLEngine. It would not do to feed every single byte into the machinery, instead you want to consume as much as you currently have in one go (up to a limit). For me this means that expressing it as Channel[Byte] with magic to make it run in batches would not really cut it, the code which consumes the stream wants to be aware of the batches.

One possible solution is to make the transport layer obey certain granularity rules and then allow the backpressure to be formulated using a weight function such that it can be expressed in bytes. I have a feeling that the resulting API might be challenging to design nicely, but throwing it out there as food for thought.
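
One way the weight-function idea might look, as a sketch only: demand is a budget, and a caller-supplied weight function converts elements (here, chunks) into the unit the budget is expressed in (here, bytes). `takeWeighted` is a hypothetical name, and the policy of always releasing at least one element, so that an oversized chunk cannot stall the stream, is an assumption rather than anything proposed in this thread.

```scala
object WeightedDemandSketch {
  // Pull elements from `in` until the next one would push the total weight past `budget`.
  // Assumed policy: the first available element is always released, even if it alone exceeds the budget.
  def takeWeighted[A](in: BufferedIterator[A], budget: Long)(weight: A => Long): Vector[A] = {
    var out = Vector.empty[A]
    var used = 0L
    while (in.hasNext && (out.isEmpty || used + weight(in.head) <= budget)) {
      val a = in.next()
      used += weight(a)
      out = out :+ a
    }
    out
  }
}
```

With `weight = _.length.toLong` over a stream of byte arrays, the consumer still receives whole chunks (which is what a stage like SSLEngine wants), while the demand it signals is measured in bytes.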

headinthebox commented 10 years ago

Bought http://shop.oreilly.com/product/0636920028970.do to learn more about control systems. Found 50% off coupon on http://www.retailmenot.com/

(wow that sounds like spam, but isn’t ;-)

viktorklang commented 10 years ago

Perhaps the view of a batched stream should be layered on top, so if you want to view ByteStrings as Bytes that's done inside a Subject?