hniksic opened 3 years ago
I think I would prefer a name like `into_serial_iter()` or something, but I could see the concept being useful. Your implementation looks like what I would expect.
@nikomatsakis Thanks for the review. I am of course open to a different name. I would propose `into_ser_iter` for brevity and analogy with `into_par_iter`. Also, we could even implement `IntoIterator` for `ParallelIterator`, though that might make it too easy to introduce serialization by accident (e.g. end a par_iter chain with `.into_iter().sum()` instead of just `sum()`, etc.)
> Your implementation looks like what I would expect.
Could the implementation use the Rayon thread pool? I'm really uncomfortable with each `into_ser_iter()` unconditionally creating a new thread, but I'm not sure it's safe to use a thread from the Rayon thread pool either, due to the possibility of deadlocks. E.g. if all threads in the pool are waiting in `Sender::send()`, will there be anyone left to make progress? I don't know enough about Rayon's architecture to predict whether this is a problem in practice, but I wouldn't want to introduce a deadlock that can occur through natural usage of the API.
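To illustrate the worry, here is a contrived sketch (not the proposed implementation) where the producer is driven on the pool itself over a rendezvous channel. With a small pool it will typically deadlock: both workers park in `send()`, and the pool has no thread left for the consumer's own rayon call:

```rust
use crossbeam_channel::bounded;
use rayon::prelude::*;

fn main() {
    let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    let (tx, rx) = bounded(0); // rendezvous channel, as a hypothetical bridge might use

    // Drive the parallel producer on the pool itself (instead of a dedicated thread).
    pool.spawn(move || {
        (0..100).into_par_iter().for_each(|i| {
            // Both worker threads can end up parked here, waiting for recv().
            tx.send(i).unwrap();
        });
    });

    // If the consumer also needs the pool before draining the channel,
    // nothing can make progress: the pool is fully blocked in send().
    let _sum: i32 = pool.install(|| (0..10).into_par_iter().sum()); // typically never returns
    for i in rx {
        println!("{i}");
    }
}
```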
Hmm, I admit I hadn't read the code that closely. I'm thinking a bit more about this now. I had expected to use a channel for this, yes, but I'm wondering if there isn't a kind of hazard where using this on a Rayon worker thread would compose poorly and create deadlock hazards.
(As far as naming, I would definitely not want to implement `IntoIterator`, too footgun-prone.)
It would be cleaner to have block-handling algorithms in rayon. It needs some discussion, though.
See also #210. I would like to solve this issue, but I'm really wary of how this will compose. It's not too hard to write something that will work with specific assumptions, but we need to be generally robust with what we add to Rayon's API. Even `par_bridge` already struggles there, e.g. people have gotten stuck with a "serial" iterator that does some parallel processing internally, and then work-stealing inverts the dependency.
@cuviper I agree that we should be very careful not to introduce deadlocks. This is why the proof-of-concept implementation unconditionally spawns a new thread, which guarantees deadlock-free operation at the cost of, well, unconditionally spawning a new thread. I would hope that a better implementation is possible without significant changes to Rayon's internals, but it's far from obvious (to me) how to write one. But I'm willing to invest time into it, so if you have some pointers where to look at, they'd be appreciated!
Thanks for the pointer to #210, I missed that issue previously. The discussion there is mostly about whether the implementation should preserve order. I think it shouldn't because an order-preserving parallel-to-serial bridge is both harder to implement and requires performance tradeoffs (if it can even be made to avoid both unbounded allocation and possibility of deadlock). I think the proof-of-concept implementation in this issue is better than the brief one given in a comment there because this one terminates iteration when the sequential iterator is dropped.
This issue is technically a duplicate of #210. If you want, we can keep this one open and make it specifically about the implementation strategy for the non-order-preserving sequential iterator, keeping the semantics pegged to that of the proof-of-concept (and fixing the name). On the other hand, if you'd prefer to keep the design possibilities open, then this issue should be closed as duplicate of #210.
> This is why the proof-of-concept implementation unconditionally spawns a new thread, which guarantees deadlock-free operation at the cost of, well, unconditionally spawning a new thread.
Well...
> - uses rendezvous channel for backpressure and to avoid values getting stuck in an intermediate buffer;
This leaves a deadlock hazard that the serial side must not call into rayon in any other way. If the entire threadpool is blocked on the channel, waiting for the `Iterator` to consume something, then other rayon calls will get queued up and blocked.
> - correctly handles the returned iterator not getting exhausted to the end - dropping `SerBridgeImpl` will also drop the receiver and cause `ParallelIterator::try_for_each` to return an error, terminating the parallel iteration.
Rust can't generally rely on drop for safety, which is why `mem::forget` is a safe function. As long as that iterator lives at all, leaked or even just neglected, the pool will be stuck. I guess here it's not really a safety concern per se, "just" a deadlock, but still.
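As a tiny illustration of that point with a plain crossbeam channel (nothing Rayon-specific): leaking the receiver with the safe `mem::forget` keeps the channel connected, so the sender blocks forever instead of getting a disconnect error. This program intentionally hangs:

```rust
use crossbeam_channel::bounded;

fn main() {
    let (tx, rx) = bounded::<i32>(0);
    // The receiver is leaked, not dropped: the channel never disconnects...
    std::mem::forget(rx);
    // ...so this send blocks forever rather than returning Err(SendError).
    tx.send(1).unwrap();
}
```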
> I would hope that a better implementation is possible without significant changes to Rayon's internals, but it's far from obvious (to me) how to write one. But I'm willing to invest time into it, so if you have some pointers where to look at, they'd be appreciated!
It's not obvious to me either, or I probably would have attempted it already. :wink:
> This leaves a deadlock hazard that the serial side must not call into rayon in any other way [...]
Good point, I didn't think of that. Perhaps we could avoid it by replacing the blocking `send()` with a loop that calls `try_send()`:
```rust
use crossbeam_channel::TrySendError;

let par_iter = par_iterable.into_par_iter();
let (tx, rx) = crossbeam_channel::bounded(0);
std::thread::spawn(move || {
    let _ = par_iter.try_for_each(|mut item| {
        loop {
            match tx.try_send(item) {
                Err(TrySendError::Full(returned_item)) => item = returned_item,
                other => return other, // success or disconnect
            }
            rayon::yield(); // hypothetical -- do other Rayon business here
        }
    });
});
```
Something like `rayon::yield()` doesn't seem to exist in the public API, but might exist internally?
> > - correctly handles the returned iterator not getting exhausted to the end - dropping `SerBridgeImpl` will also drop the receiver and cause `ParallelIterator::try_for_each` to return an error, terminating the parallel iteration.
>
> Rust can't generally rely on drop for safety,
That's clear, though the linked article is mostly about memory safety. Here by "correctly" I meant the code handles an explicit drop by actually noticing when the channel is dropped and terminating the iteration by returning an error from `try_for_each()`. (Some toy implementations of `into_ser_iter()` don't even do that much.)

If the serial iterator is neither dropped nor exhausted, our options are somewhat limited. It would be nice to be able to apply backpressure in a clean way, but barring that, the option of calling `rayon::yield()` to avoid deadlock seems workable.
Do you think the above approach could work? In that case it might actually be ok to offload the work using `rayon::spawn()` instead of `thread::spawn()`, which would resolve the other concern I had with the proof-of-concept impl.
> Something like `rayon::yield()` doesn't seem to exist in the public API, but might exist internally?
It doesn't, but there's a note in `par_bridge()` that we might want it, #548 -- and oh hmm, I even had a working branch there. I'm going to have to think about why I never pursued that further...

It's not a total panacea though. If there's no other work available, then it becomes a busy-loop burning CPU. Or else we could try some sleep heuristic, since we don't have a precise way to wake up between both external (crossbeam) events and internal work-available events.

I also fear that work-stealing might end up quickly stealing from the same set of jobs from this iterator, getting them all recursively stacked on one thread instead of spreading out on all threads. That's also a hazard if we yielded in `par_bridge()`.
> In that case it might actually be ok to offload the work using `rayon::spawn()` instead of `thread::spawn()`,
Oh, both of those unfortunately require `'static` too; I glossed over that before. Maybe there's a way to restructure this with `in_place_scope` to `spawn` and then callback to some `FnMut(&mut SerBridgeImpl)`, but that's getting messy.

Compromise upon compromise -- but I'm glad to have you exploring this!
> It's not a total panacea though. If there's no other work available, then it becomes a busy-loop burning CPU.
True, and such an issue might be lurking behind #795 in `par_bridge()`, which I reported previously. Busy-looping is certainly better than deadlock, but to support this properly there should be built-in support for backpressure, probably both in `par_bridge()` and here.

I can't tell how big an issue recursive stealing is.
> Oh, both of those unfortunately require `'static` too
It never occurred to me that it might not be acceptable to require `'static`. I'm not sure if that can be avoided while keeping the current interface. As specified, the adapter completely moves the processing to the background, leaving the current thread to interact with the serial iterator. Since the current thread must remain active, `into_ser_iter()` itself cannot wait for the parallel processing to finish, as I understand the scope-based interfaces imply.
Requiring `'static` rules out a lot of borrowed iterators, especially any `par_iter()` or `par_iter_mut()` -- unless those started with `'static` data, which would be unusual. And yes, that does have real implications for the interface. Maybe there could be one "easy" conversion for `'static` iterators, and another callback-oriented mode for the borrowed case, hopefully sharing implementation details underneath.
> Maybe there could be one "easy" conversion for `'static` iterators, and another callback-oriented mode for the borrowed case
That makes sense. We already have a callback-oriented interface of sorts, `ParallelIterator::for_each()` - but it requires an `Fn` callback. The conversion to a full iterator allows great freedom with using the iterator, but requires `'static` on the original parallel iterator. There could also be a middle ground, a callback-based interface like `for_each`, that doesn't require a `'static` bound on the parallel iterator, but still allows an `FnMut` non-`Send` closure (because it executes the callback in the current thread). There are already crates that define such interfaces, sometimes called internal iterators.

Maybe this option should be offered separately, after issues with the ordinary iterator (requiring `'static`) are worked out.
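For illustration, here is a rough sketch of what such a callback-based bridge could look like, using `in_place_scope` so the parallel iterator doesn't need `'static` and the consumer can be a non-`Send` `FnMut`. The function name and signature are made up, and the caveat above about calling this from inside the pool still applies:

```rust
use crossbeam_channel::bounded;
use rayon::prelude::*;

// Hypothetical callback-oriented bridge: drives `par_iter` on the Rayon pool
// while `consume` runs on the calling thread.
fn ser_bridge_scoped<P, F>(par_iter: P, mut consume: F)
where
    P: ParallelIterator,
    F: FnMut(P::Item),
{
    // Rendezvous channel for backpressure, as in the proof of concept.
    let (tx, rx) = bounded(0);
    rayon::in_place_scope(|scope| {
        scope.spawn(move |_| {
            // If the receiver goes away, send() fails and the parallel loop stops.
            let _ = par_iter.try_for_each(|item| tx.send(item));
        });
        // Consume items on the current thread until the producer is done.
        for item in rx {
            consume(item);
        }
    });
}
```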
My inclination right now is not to support this -- I suspect that gathering into a `Vec` will typically be faster.
> [...] I suspect that gathering into a Vec will typically be faster.
@nikomatsakis The trouble is some iterators produce a huge number of items that don't fit into available memory, and that you have to process as they are being generated. Of course, batching results into intermediate `Vec`s of fixed size works, but it is not as convenient to express.
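For comparison, the batching workaround might look roughly like this (the chunk size, `expensive` and `consume` are just placeholders):

```rust
use rayon::prelude::*;

fn expensive(x: &u64) -> u64 { x * 2 }    // placeholder for the parallel work
fn consume(y: u64) { println!("{y}"); }   // placeholder single-threaded sink

fn main() {
    let inputs: Vec<u64> = (0..1_000_000).collect();
    // Process a bounded chunk in parallel, then drain it serially,
    // so memory stays proportional to the chunk size.
    for chunk in inputs.chunks(10_000) {
        let results: Vec<u64> = chunk.par_iter().map(expensive).collect();
        for r in results {
            consume(r);
        }
    }
}
```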
Can you `fold` them at all into partial results?
> Can you `fold` them at all into partial results?
I don't think so. In one case we `for_each()` them on the parallel iterator, passing it a closure with side effects (still `Fn` because it uses fine-grained locks as described here). In other cases the items get filtered, turned into fixed-size records containing just numbers, and dumped to a file. The disk-saving part is an example where something like `ser_bridge()` would come in useful; currently we just use channels manually.
I can concur with @hniksic's assertions here. I'm currently using Rayon in a PR to Ruffle's scanner binary (which you may know about because GitHub decided my commit message should get tagged in here three times). The proposed serial bridge impl (which I copied into my PR, assuming that's OK) makes it very easy to report partial results in case, say, the scan is cancelled halfway through; or something irrecoverably panics the host process.
I eventually implemented multiprocess scanning to deal with unrecoverable panics, so that alone isn't as much of a concern, but being able to terminate the scan and still get partial data out of it is still useful. Batching everything up into a `Vec` wouldn't work for this use case. Keep in mind that this is a process intended to run on ~170,000 Flash files, to check for bugs in Ruffle, and it takes 5 hours to complete even with 16 cores.
My untested opinion about performance/thread-scalability, at least in my use case, is that the internal channel lock this uses will probably be less contended than, say, if I had `Arc<Mutex>`'d the CSV writer I'm using so that I could share it across all threads. If I had done it the latter way, then each processing thread would have to wait for file I/O to finish, which would be a bottleneck at high core counts. The internal channel in `SerBridge` can presumably buffer some number of results for the main thread to write at a later time.
However, if this use case is considered too niche, it might make more sense to package this code up as a library crate.
Similar use case. I currently need to process 400-500 GB of blockchain data. All the file reading and decoding can be done in parallel, but certain steps of computation must follow chronological order.
So, if I can do something like:
```rust
(0..=end)
    .into_par_iter()
    .map(|h| read_block_number(h))    // <- parallel
    .into_sync_ser_iter()
    .map(|block| do_something(block)) // <~ sequential
```
Or even something like (chained):
```rust
(0..=end)
    .into_par_iter()
    .map(|h| read_block_number(h))    // <- parallel
    .into_sync_ser_iter()
    .map(|block| do_something(block)) // <~ sequential
    .par_bridge()
    .map(|h| read_block_number(h))    // <- parallel
    .into_sync_ser_iter()
    .map(|block| do_something(block)) // <~ sequential
    .par_bridge()
    .map(|h| read_block_number(h))    // <- parallel
    .into_sync_ser_iter()
    .map(|block| do_something(block)) // <~ sequential
```
I somehow implemented a tiny crate for my own usage: https://github.com/Congyuwang/Synced-Parallel-Iterator. I assign each worker a synced channel to buffer its output, and a `registry` channel to record thread order when each worker thread is polling a new task. So, when I consume this iterator, I can consult this `registry` channel to know which worker buffer to read from. I somehow cannot get rid of a Mutex lock guarding the upstream iterator, needed to ensure the `registry` records the correct task-polling order, which seems a bit expensive though:
worker -> lock upstream iterator -> call `next()` -> `send` its own thread ID to `registry` -> release lock (of upstream iterator) -> process its task -> store result in its own buffer
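A rough sketch of that scheme (illustrative only, not the actual crate code; all names are made up):

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use crossbeam_channel::{bounded, unbounded, Receiver};

// Each worker owns a small bounded output buffer; a shared `registry` channel
// records which worker pulled which upstream item, so the consumer can read
// the per-worker buffers back in the original upstream order.
fn synced_parallel_map<I, T, U, F>(input: I, n_workers: usize, f: F) -> impl Iterator<Item = U>
where
    I: Iterator<Item = T> + Send + 'static,
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    let f = Arc::new(f);
    let upstream = Arc::new(Mutex::new(input));
    let (reg_tx, reg_rx) = unbounded::<usize>();
    let mut buffers: Vec<Receiver<U>> = Vec::with_capacity(n_workers);

    for id in 0..n_workers {
        let (out_tx, out_rx) = bounded::<U>(2);
        buffers.push(out_rx);
        let upstream = Arc::clone(&upstream);
        let reg_tx = reg_tx.clone();
        let f = Arc::clone(&f);
        thread::spawn(move || loop {
            // Hold the lock across next() *and* the registry send, so the
            // recorded order matches the order items were pulled upstream.
            let item = {
                let mut it = upstream.lock().unwrap();
                match it.next() {
                    Some(item) => {
                        if reg_tx.send(id).is_err() {
                            return;
                        }
                        item
                    }
                    None => return,
                }
            };
            let out = (f.as_ref())(item);
            if out_tx.send(out).is_err() {
                return; // consumer went away
            }
        });
    }
    drop(reg_tx);

    // The registry tells the consumer which worker's buffer holds the next item.
    reg_rx.into_iter().map(move |id| buffers[id].recv().unwrap())
}
```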
I described various solutions to this problem here: https://github.com/rayon-rs/rayon/issues/1070
Here is my solution: https://github.com/rayon-rs/rayon/pull/1071
@safinaskar FWIW I don't think #1071 resolves this. This issue is about funneling a parallel iterator into a serial one, the inverse of `par_bridge()`. While the example given uses `.into_par_iter()` on a sequence followed by a `.map()` and `.ser_bridge()`, that's just an example. The idea of `ser_bridge()` is that it works on `ParallelIterator`, so one can use iterator combinators such as `fold()` and `flat_map()` before it. Your solution operates outside the parallel combinator ecosystem, and only supports `.map()`.
Now I think the solution is https://crates.io/crates/pariter ( https://dpc.pw/adding-parallelism-to-your-rust-iterators )
In summary, I would like to propose adding `ser_bridge()`, a bridge that converts `ParallelIterator` into an ordinary `Iterator`, sort of the opposite of `par_bridge()`. In this issue I present a toy implementation and would like to inquire about the feasibility of providing a production-ready one.

Rayon already provides `par_bridge()`, an extension to `Iterator` that converts any `Iterator` into a `ParallelIterator`. In most cases you can then proceed to transform the parallel iterator, ending with a consuming operation like `fold()`, `sum()` or `for_each()`. But in some cases you want to funnel the values produced by a parallel iterator back into an ordinary iterator, e.g. in order to feed them to a single-threaded resource. This is expressed in this StackOverflow question (which also proposes the name `ser_bridge()`), but also in my own usage of Rayon.

The goal would be to enable writing code like this:
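Something along these lines, assuming the proposed `ser_bridge()` adapter existed (the sink and the parallel work here are just placeholders):

```rust
use rayon::prelude::*;
use std::fs::File;
use std::io::{BufWriter, Write};

fn main() -> std::io::Result<()> {
    let mut sink = BufWriter::new(File::create("output.txt")?);
    (0..1_000_000u64)
        .into_par_iter()
        .map(|n| n.wrapping_mul(n))  // expensive work, done in parallel
        .ser_bridge()                // proposed adapter: ParallelIterator -> Iterator
        .for_each(|v| {
            // sink.write() needs &mut access, so consumption stays single-threaded
            writeln!(sink, "{v}").unwrap();
        });
    Ok(())
}
```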
Note that `sink.write()` requires mutable, and therefore single-threaded, access. Although one could imagine this particular case being solved by putting the sink in an `Arc<Mutex>` or a channel (the latter being the resolution of the SO question), in many cases it is much more convenient to obtain an iterator. If nothing else, it allows us to pass the result of the processing to a function that expects an iterator, such as those from the `itertools` crate.

If this is considered useful, I would like to submit this toy implementation for review:
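A rough sketch of it, with names and exact signatures approximate:

```rust
use crossbeam_channel::{bounded, Receiver};
use rayon::prelude::*;

pub fn ser_bridge<P>(par_iter: P) -> SerBridgeImpl<P::Item>
where
    P: ParallelIterator + 'static,
    P::Item: Send + 'static,
{
    // Rendezvous channel: the parallel side blocks until the serial side is
    // ready to take the next item, so nothing piles up in a buffer.
    let (tx, rx) = bounded(0);
    std::thread::spawn(move || {
        // When SerBridgeImpl (and thus the receiver) is dropped, send() fails
        // and try_for_each returns an error, stopping the parallel iteration.
        let _ = par_iter.try_for_each(|item| tx.send(item));
    });
    SerBridgeImpl { rx }
}

pub struct SerBridgeImpl<T> {
    rx: Receiver<T>,
}

impl<T> Iterator for SerBridgeImpl<T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        self.rx.recv().ok()
    }
}
```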
This implementation has the following nice properties:

- uses rendezvous channel for backpressure and to avoid values getting stuck in an intermediate buffer;
- correctly handles the returned iterator not getting exhausted to the end - dropping `SerBridgeImpl` will also drop the receiver and cause `ParallelIterator::try_for_each` to return an error, terminating the parallel iteration.

The downside, and the reason I consider it a toy implementation, is that it creates a whole new thread for the bridging.
My questions are: is this functionality desirable in Rayon, can it be implemented without unconditionally spawning a thread, and is it acceptable to rely on `crossbeam_channel`?