TimelyDataflow / timely-dataflow

A modular implementation of timely dataflow in Rust
MIT License

Broadcast data to all workers #23

Closed: sga001 closed this issue 8 years ago

sga001 commented 8 years ago

Is there an easy way to construct a broadcast operator (similar to exchange) that sends (i.e., copies) all stream data to all workers?

Right now I'm doing the following, which sort of works, but I wonder whether it's the right way to do this.

```rust
root.scoped(|builder| {
    let index = builder.index();
    let peers = builder.peers();
    let (input, stream) = builder.new_input();
    let mut streams = vec![];

    // route one copy of the stream to each worker
    for i in 0..peers {
        streams.push(stream.exchange(move |_| i as u64));
    }

    builder.concatenate(streams)
           .inspect(move |x| println!("{}, {:?}", index, x));
});
```

Ideally, one could build something like `stream.broadcast().inspect(move |x| println!("{}, {:?}", index, x));`. My guess is that this can be done provided one implements a new ParallelizationContract. I think the only difference between Exchange and this new Broadcast pact would be the implementation of the Push trait in pushers::exchange::Exchange, no?

Any thoughts?

frankmcsherry commented 8 years ago

This is a good issue to have open. The slightly complicated thing for a ParallelizationContract is that it isn't bright enough (without some help) to know how many copies of a record it would make, which is important information for the progress tracking logic (which needs to know how many records are at large on each channel).

In retrospect, this shouldn't be a problem because the channel has some number of targets, and makes some number of copies of each record, but something blocked it when I tried. Let me look into this this evening, and either reconstruct what the issue was (and report back) or pop it in for you.

If it helps, what I do, which is less good than what the channel would do, is a flat_map that puts a worker index on each record and then an exchange on that index, so you can broadcast even within a computation, rather than just at the inputs. But, less good for sure.

sga001 commented 8 years ago

Cool. Ideally one could implement something like Exchange but pass in a function that returns a collection of u64 values rather than a single identifier. That way one could implement both broadcast and multicast with the same channel code.

In particular, I imagine the same code as pushers/exchange.rs, but instead of self.hash_func() returning a single index, it returns a vector of indices; one then replaces the masking / modulo logic with a check that index < self.pushers.len() holds for all returned indices, and pushes the datum onto the buffers of all of those pushers. In fact, specifying the closure |_| vec![1, 1, 1, 1] would give an easy way to implement a "duplicate" operator that sends the same data stream multiple times to the same worker (the peer with index 1 in this case), while in the broadcast case one would just use |_| (0..peers).collect::<Vec<u64>>().
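The routing described above can be sketched in plain Rust, without timely's types. This is a hypothetical illustration, not the actual pushers/exchange.rs code: the `route` closure stands in for the vector-returning hash_func, and a `Vec<Vec<T>>` stands in for the per-peer pusher buffers.

```rust
// Hypothetical multicast routing: `route` returns a Vec of target indices
// per datum, and the datum is pushed into the buffer of every listed target.
fn multicast<T: Clone>(data: &[T], peers: usize, route: impl Fn(&T) -> Vec<u64>) -> Vec<Vec<T>> {
    let mut buffers: Vec<Vec<T>> = vec![Vec::new(); peers];
    for datum in data {
        for index in route(datum) {
            // a bounds check replaces the masking / modulo of the hashing case
            assert!((index as usize) < buffers.len());
            buffers[index as usize].push(datum.clone());
        }
    }
    buffers
}
```

With `|_| vec![1, 1, 1, 1]` this sends four copies to peer 1 (the "duplicate" case); with `|_| (0..peers as u64).collect()` every peer receives one copy (the broadcast case).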

Granted I have no idea how the progress tracking logic works or interacts with this, so perhaps my understanding is completely off.

frankmcsherry commented 8 years ago

I looked into this, and it isn't all so horrible.

The thing I couldn't do before when I tried was just add a new implementor of Push, as a drop-in replacement for the existing Exchange implementor. This is because the progress tracking math is agnostic to where records end up, and so exists outside the routing decisions.

While it wouldn't be too hard to jury-rig a few new implementors and create a new ParallelizationContract, it is a bit awkward because the progress tracking logic assumes/requires that each dataflow edge out of an operator output port carries the same set of records (and in particular, the same count of records). A broadcast ParallelizationContract would violate this, and probably screw up a bunch of logic elsewhere.

It might make more sense to implement broadcast as an operator; it would have the look and feel above, with .broadcast().stuff(), and make it clear to the progress tracking logic that something has caused the number of records in play to increase beyond the number of the source stream.
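The accounting distinction can be illustrated with a toy sketch (hypothetical, not timely's actual progress-tracking code): progress tracking balances records produced against records consumed on each channel, and an operator that openly declares its multiplied output keeps every channel's books consistent.

```rust
// Hypothetical bookkeeping: a broadcast *channel* that silently duplicated
// records would make consumed counts exceed the producer's declared counts;
// a broadcast *operator* instead declares the multiplied count on its own
// output, so each downstream channel still carries what was declared.
struct ChannelCounts {
    produced: u64, // declared at the upstream operator's output
    consumed: u64, // observed at the downstream operator's input
}

fn broadcast_operator_counts(input_records: u64, peers: u64) -> ChannelCounts {
    // the operator consumes `input_records` and declares
    // `input_records * peers` produced, rather than hiding copies in the channel
    ChannelCounts {
        produced: input_records * peers,
        consumed: input_records * peers,
    }
}
```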

I will investigate!

sga001 commented 8 years ago

I ended up just doing:

```rust
other_stuff().unary_stream(Pipeline, "duplicate", move |input, output| {
    input.for_each(|time, data| {
        for datum in data.drain(..) { // datum of type T
            for i in 0..peers {
                output.session(&time).give((i as u64, datum.clone()));
            }
        }
    });
})
.exchange(|x: &(u64, T)| x.0)
.more_stuff()
```

I'm not sure whether this is horribly inefficient, though. You mentioned I should do this with a flat_map, but I couldn't quite figure out how to express the above logic with flat_map given my limited Rust programming ability (though my guess is that performance would be similar to the code above).
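For reference, the tagging step of the flat_map formulation can be sketched with a plain iterator (a hedged sketch, sidestepping timely's stream API; in a dataflow this body would sit inside `stream.flat_map(...)` followed by the same `.exchange(|x| x.0)` as above):

```rust
// Hypothetical sketch: each datum becomes `peers` copies, each tagged with a
// target worker index; exchanging on the tag (x.0) would then route them.
fn tag_for_broadcast<T: Clone>(data: Vec<T>, peers: u64) -> Vec<(u64, T)> {
    data.into_iter()
        .flat_map(|datum| (0..peers).map(move |i| (i, datum.clone())))
        .collect()
}
```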

frankmcsherry commented 8 years ago

This approach seems totally fine. It should have a bit more overhead than Andrea's implementation, in that the exchange operator will actually think to route something, rather than just duplicating it. Both of them should be improvable to only transmit each datum once between each process, but let's leave the issue open to work towards that sort of goal.

Is your program performance-sensitive with respect to the broadcast? That is, if we try things out, would you notice the improvement and say "zomg, sweet", or maybe not? (Both are fine; just curious.)