TimelyDataflow / timely-dataflow

A modular implementation of timely dataflow in Rust
MIT License

Broadcast message to all worker nodes while in closure of custom binary frontier operator #583

Open cygithub54 opened 2 months ago

cygithub54 commented 2 months ago

Hey, I'm new to timely and I have a question regarding broadcasting and binary frontiers. I have a custom binary_frontier stream operator that is in feedback with itself and runs for an arbitrary number of iterations. Currently, the ParallelizationContracts that I pass into this operator are Exchange pacts, which route feedback messages based on an exchange ID integer defined on the message payload. However, under some condition, I want these messages to be sent to all active worker nodes. I can implement this by checking the condition on each message, manually cloning messages intended to be global once per worker node, and then giving each copy to the session for the feedback stream. However, this feels a bit unwieldy, so I was wondering if there's a way to do this a bit more idiomatically. I've looked into creating a new stream for the global messages and calling the broadcast() operator on it, but there doesn't seem to be a way to create a new stream object within the binary frontier closure (I run into lifetime issues with the scope variable required by the to_stream function). Alternatively, is there a way to get this global messaging behavior by modifying the ParallelizationContracts passed in?

antiguru commented 2 months ago

> However, this feels a bit unwieldy so I was wondering if there's a way to do this a bit more idiomatically.

That is the best approach at the moment. Timely's progress tracking is based on counting messages, and currently each datum sent downstream must be consumed exactly once for the counts to add up. A broadcast channel would violate this constraint because it would receive one datum and produce multiple outputs. This is not a fundamental limitation of Timely, but a special implementation would offer no real benefit over cloning the data eagerly, since the data needs to be cloned either way.
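To make the counting argument concrete, here is a toy model in plain Rust. It is only an illustration of the bookkeeping idea, not Timely's actual progress-tracking protocol; `ChannelCounts`, `eager_clone`, and `channel_duplicates` are hypothetical names invented for this sketch.

```rust
/// Toy bookkeeping for a single channel: how many messages the sender
/// reported producing versus how many the receivers reported consuming.
/// This is NOT Timely's real progress protocol, only an illustration.
#[derive(Default)]
struct ChannelCounts {
    produced: i64,
    consumed: i64,
}

impl ChannelCounts {
    /// The sender reports `n` messages pushed into the channel.
    fn send(&mut self, n: i64) {
        self.produced += n;
    }

    /// A receiver reports `n` messages pulled from the channel.
    fn receive(&mut self, n: i64) {
        self.consumed += n;
    }

    /// The counts "add up" when every produced message was consumed exactly once.
    fn balanced(&self) -> bool {
        self.produced == self.consumed
    }
}

/// The sender clones eagerly: it reports one message per worker,
/// and each worker consumes exactly one of them.
fn eager_clone(peers: i64) -> ChannelCounts {
    let mut counts = ChannelCounts::default();
    counts.send(peers);
    for _ in 0..peers {
        counts.receive(1);
    }
    counts
}

/// A hypothetical duplicating channel: the sender reports a single message,
/// yet every worker consumes a copy of it.
fn channel_duplicates(peers: i64) -> ChannelCounts {
    let mut counts = ChannelCounts::default();
    counts.send(1);
    for _ in 0..peers {
        counts.receive(1);
    }
    counts
}
```

With four workers, `eager_clone(4)` ends up balanced, while `channel_duplicates(4)` reports one produced message against four consumed, which is the kind of mismatch described above.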

If you're interested, here's a commit that replaced a special broadcast operator with the current implementation: ca40906e0e90d2f39994e7ff8436406cf4f2c209
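For reference, the manual approach from the question can be kept tidy by isolating the fan-out in a small helper. The sketch below is plain Rust with no Timely dependency; `Routed` and `fan_out` are hypothetical names, and it assumes the feedback edge uses an Exchange pact keyed on the `target` field and that the worker count (for example from `scope.peers()`) is captured before the operator closure is built.

```rust
/// A feedback message tagged with the index of the worker that should receive it.
/// Assumption: the exchange function on the feedback edge returns `target` as its key.
#[derive(Clone, Debug, PartialEq)]
struct Routed<T> {
    target: usize,
    payload: T,
}

/// Expands one message into the copies that would be given to the output session.
///
/// * `home`   - the worker the message would normally be routed to
/// * `peers`  - total number of workers; must be greater than zero
/// * `global` - the caller's condition for sending the message to every worker
fn fan_out<T: Clone>(payload: T, home: usize, peers: usize, global: bool) -> Vec<Routed<T>> {
    assert!(peers > 0, "there must be at least one worker");
    if global {
        // One clone per worker, each addressed to a distinct target.
        (0..peers)
            .map(|target| Routed { target, payload: payload.clone() })
            .collect()
    } else {
        // Ordinary case: a single copy for the usual destination.
        vec![Routed { target: home % peers, payload }]
    }
}
```

Inside the operator closure, each element returned by `fan_out` would then be handed to the session for the feedback stream, so the operator logic only has to decide the `global` flag.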