TimelyDataflow / timely-dataflow

A modular implementation of timely dataflow in Rust
MIT License

Roadmap? #128

Open JosephWagner opened 6 years ago

JosephWagner commented 6 years ago

Hi,

I was wondering if there was a list of improvements/features/fixes that you were considering implementing. I'm interested in contributing but I don't have a good sense of all the possible areas of work.

I did see #48 and #49, and also it seems like there's logging infrastructure work underway.

frankmcsherry commented 6 years ago

This is a good question, and I don't have a good answer at the moment. I do have a list of things to do, though all the things at the top of my brain right now are fairly delicate re-wirings. If you don't mind, let me round up some folks on Monday and have a chat about what we might be able to break out as starter projects.

The docs and doc_tests are totally good options, though I think you've gotten enough of a feel for the system at this point that perhaps their purpose (onboarding people to timely) isn't as well served in your case.

There is a todo list at the bottom of README.md, but it is a bit stale at this point (two of the items got pushed up to user-level code in differential, and two are a bit open-ended and speculative). I think refreshing it makes sense, and we can try to break out some easier or more independent parts!

frankmcsherry commented 6 years ago

Ok, we just pushed a 0.5 crates version out, which I think means it is time to start to break things again. Now we will actually try to sort out what some solid todo items are. We have a bit of a list already (of things we need done, and will probably just do), but ideally the thinking stirs up some other ideas.

Are there broad areas that motivate you? There are a few levels here, from low-level communication gunk (tracking down copies and removing them; tracking down allocs/deallocs that shouldn't exist) to higher-level ergonomic issues in operator definitions. I'm up for mentoring some of these, but it would be best if it lined up with your interests. If no specific areas come to mind right now, no worries; I'll try to whip up a list, some elements of which might be more tempting.

JosephWagner commented 6 years ago

Networking and low level programming are areas I want to learn more about, so low level communication gunk sounds awesome!

frankmcsherry commented 6 years ago

I've pushed a few concrete options as part of the README.md, in the contributing section. It's a good question how easy it would be to bite off various pieces here, and I don't know that any are easily extracted and worked on. Probably the easiest wrt the communication plane is tracking down spurious copies and grokking how best to evade them.

There is another unlisted outstanding issue that the timely_communication crate should probably have a "broadcast" channel implementation. Right now this is emulated using point-to-point channels, but it makes for a bit of a waste. One could instead add a new channel construction method, some new metadata to the message framing struct, and rig the communicator to deposit received messages in each worker's mailboxes (rather than extracting a worker id and using that).

JosephWagner commented 6 years ago

Thanks so much for listing out these options! I'm going to poke around in the codebase more and see if I can find an easy issue to get my feet wet. At first glance, using the bytes crate to avoid bullet point 2 in https://github.com/frankmcsherry/timely-dataflow/issues/111 seems relatively straightforward.

It seems like many areas of work involve sorting out various performance issues -- do you think adding benchmarking is worthwhile? It feels weird to work on performance without first establishing a baseline. We could even track it over time.

frankmcsherry commented 6 years ago

Benchmarking is totally a good thing (and something that they are trying to get going at ETH Zürich), but it has some complications.

The main issue here is that timely can pretty easily saturate weak network connections (1Gb, and we've saturated 10Gb), and this becomes more true the more worker cores you have in place. This means that it can be a bit tricky to tease out communication performance issues without the right set-up. The "right set-up" could mean either i. a fancy network rig, or perhaps ii. a computer with enough cores that one could do loopback TCP (no network involved). It seems reasonable that one could do this, but it is more than just cargo bench, I think.

We could also put more effort into mocking for the components, so that perhaps we could extract the internals of the communication infrastructure and benchmark it without actually having data coming over the wire. That seems sane, but I don't have anything on the tip of my brain about how easy that would be. Happy to think about it though (either out loud or privately).

Regarding the bytes crate thing: this sounds like a good thing, but will have a few surprising issues I bet. One is that timely does some tricks with serialized data, currently using owned Vec<u8> that might need to become bytes owned slices, and you are going to run into these (and perhaps squeal with dismay when you do).

A second "issue", perhaps not to worry about yet, is that there are currently some mutterings about re-thinking the communication architecture to be a bit more data-driven. Right now the communication threads peel off bytes from the incoming network connections and drop them into queues for operators, which the operators are then expected to find when they next run. There is an interest (mine, maybe others too) in promoting information about the communication to the root of the worker, so that

  1. the worker knows more about which operators may need to run (rather than polling all of them), and
  2. the worker knows that once it has run all of these operators, the bytes memory backing the slices that were handed out should be available for re-use (alternately, that there is some discipline that causes this memory to become available).

I suspect that whatever happens here, the candidate bytes modifications will be relevant (but may need to be ported). There is probably a symmetric need on the send side, where what is currently serialization into freshly created Vec<u8> buffers should become "getting a handle to a &mut [u8] from the worker" so that serialized data all lands in one (or few) allocations and can be efficiently handed off.

Maybe as step 1 we could try and get an example where we can see timely behaving badly due to copies. There is an examples/exchange.rs that intends to demonstrate the throughput of the exchange channel, which can be run with multiple processes to exercise the communication infrastructure (or, we should verify that it does, by contrasting it with the same number of workers in the same process, which avoids serialization->network->deserialization).

Edit: also, the workload we initially saw overheads due to copies (and got improvements by removing them) was the pagerank project. It moves a bunch of (u32, f32) data around and does a ranks[x.0] += x.1; for each tuple, so things like copies stood out clearly enough. I'm not sure if there is more perf to gain from removing even more copies (could be, but it could be bottlenecked elsewhere now), but this was a "real world" problem where we saw some gains (example gains).

frankmcsherry commented 6 years ago

I was pondering the proposed bytes changes, and wanted to leave some breadcrumbs about which parts of the code would most likely be touched (I'm not sure it is obvious, and there is a bit of "cleverness" associated with it).

Right now timely_communication::networking::BinaryReceiver::recv_loop is where bytes get peeled off of content returned from the kernel. This is currently done by creating a new Vec<u8>, but it seems like it could be done by swapping the populated Vec<u8> into a BytesMut and peeling off parts of it rather than the new Vec<u8>s.

Ideally, the networking threads would maintain references to these large-ish allocations and recover them when the number of outstanding BytesMuts hits zero; I'm not sure if the bytes crate supports that (perhaps capturing an initial empty BytesMut to get the pointer, and then hoping that the crate supports methods to recover allocations, I guess?).

This would also mean that the channels we set up would move BytesMut rather than Vec<u8>, which is a fine change to make. Also, the Serialize trait which currently has a method

    fn from_bytes(&mut Vec<u8>) -> Self;

probably wants to take an owned BytesMut instead. This is going to immediately explode a few things, including i. the derived implementation for T: Abomonation+Clone, which might just be a syntactic tweak (it should work for implementors of DerefMut<Target=[u8]>), and ii. the Serialize implementation of timely::dataflow::channels::Message, which is going to need some love.

I'm happy to help out with the Message rework, but the intuition is that it represents an enum that is either owned data (a (T, Vec<D>)) or serialized data (a Vec<u8> or BytesMut, with the intent that it should be de-abomonated). There are various methods that could use a clean-up that roughly behave like Rust's Cow type, allowing you to make an enum instance into its owned form if you want, but also allowing you to borrow it in either case. This was done in pre-BytesMut days, where it was annoying to split a Vec<u8> up into disjoint parts, which may have complicated the design somewhat. Because this is Rust, it is hard to know if a simpler design works without trying it and seeing what Rust says about it... =/

frankmcsherry commented 6 years ago

Reading on the bytes repo, it seems there is discussion of an into_vec() method that would let you reclaim allocations (in some cases), but the associated PR has been sitting since last August. It's not clear that this will happen in a hurry.

Also (potential footgun) small slices are stored in-line, so stashing a length zero BytesMut wouldn't allow for recovery; would need to be at least 32 bytes because that is the point at which the inline storage breaks. grumble...

frankmcsherry commented 6 years ago

I threw up a lightweight replacement for the bytes crate as #133. It is blocked on me having no idea whether the code is in fact safe, nor any clue how to test it. But it does what I think is the minimal set of things we would need, and allows resource recovery (as well as some generality in the backing memory store, in case that ends up being useful, e.g. communicating between processes with shared memory).

JosephWagner commented 6 years ago

Wow, between #133 and #135 it seems like a lot of the stepping stones are in place. I'm happy to take this conversation over to issue #111, but it sounds like the next step is to, as you suggested, use exchange.rs to see how much additional allocation occurs due to inter-process communication. I could use #135 for this.

I thought I recognized collectl plots in some of your blog posts -- I could use that to track memory usage over time. Hmm... perhaps the ideal metric is the fraction of runtime spent allocating memory. What do you think?

I'll poke around with collectl and maybe eBPF and see if I can make a helpful graph or two.

frankmcsherry commented 6 years ago

#111 sounds like a good place! I'll drop a note there explaining what #133 and #135 are about.