BurntSushi / walkdir

Rust library for walking directories recursively.
The Unlicense
1.24k stars 107 forks source link

Rayon support for walkdir iterators #21

Closed joshtriplett closed 6 years ago

joshtriplett commented 7 years ago

I'd love to use Rayon parallel iterators with walkdir. Walkdir's iterators don't implement the necessary traits to do so. @nikomatsakis provided a different pattern that provided some thread-pool-based parallelism, but it would be nice to directly use par_iter / into_par_iter with walkdir. walkdir itself could actually run the walk in parallel in that case, processing multiple directories in parallel.

BurntSushi commented 7 years ago

Is there documentation that will help me write such an implementation? Which traits do I need to implement?

I wouldn't expect this to be an easy task. In fact, I'd expect this to require a complete rewrite of the current implementation. (If my expectations turn out to be true, then perhaps there should be a separate crate for it.)

nikomatsakis commented 7 years ago

If you made the iterator Send, it'd be a start -- that's probably not too hard?

nikomatsakis commented 7 years ago

Ah, I see the problem now. It's the "sorter" closure. You could probably just require that it be Send, though.

We don't have a good "So You Want To Implement Parallel Iterator" tutorial yet. The closest thing is the README describing the internals.

BurntSushi commented 7 years ago

@nikomatsakis Awesome! Perhaps my expectations were wrong. :-) I will give the internal docs a read soon. (I also secretly hope that I can use this in ripgrep, which uses its own home-grown parallel directory traversal... The part I've struggled most with is when to terminate.)

nikomatsakis commented 7 years ago

@BurntSushi glancing through the code it does seem like it'd require something of a rewrite. I'd expect this code to probably act as its own "producer", that is, to drive a consumer directly. Basically you'd get a callback Consumer. Probably the most straightforward way to do this would be to do more of a "top-down" loop.

Basically, parallel iterators generally require a rewrite to work in "push mode", versus sequential iterators, that act in pull mode.

nikomatsakis commented 7 years ago

But I've been playing with various "adapters" that will take sequential iterators and drive parallel code with them; such things get easier if the iterator is Send.

BurntSushi commented 7 years ago

I'd expect this code to probably act as its own "producer", that is, to drive a consumer directly.

@nikomatsakis Yup. This is exactly why I (personally) found it difficult to write a parallel directory traversal.

vandenoever commented 7 years ago

Running in parallel on a spinning disk should give a speedup because it allows the io scheduler to group reads that are close together on the disk.

I've tried to confirm this with a simple program. It has runs serially (s), uses rayon (r) or threads (t). To my amazement, there is no notable difference in speed. Using a different io scheduler (deadline or noop) also makes no difference. The speed is the same as it is with find. (I ran sync && echo 3 > /proc/sys/vm/drop_caches between runs).

Perhaps the data that I'm testing on is not very fragmented.

extern crate walkdir;
extern crate rayon;
use walkdir::WalkDir;
use rayon::prelude::*; 
use std::env;
use std::thread;

fn walk(path: &String) {
    for entry in WalkDir::new(path.as_str()) {
        let entry = entry.unwrap();
        println!("{}", entry.path().display());
    }
}

fn r(paths: Vec<String>) {
    paths.par_iter().map(|path| {
        walk(path);
    }).collect::<Vec<_>>();
}

fn t(paths: Vec<String>) {
    let mut handles = Vec::new();
    for path in paths {
        handles.push(thread::spawn(move || {
            walk(&path);
        }));
    }
    for handle in handles {
        handle.join().expect("Panic waiting for thread.");
    }
}

fn s(paths: Vec<String>) {
    for path in paths {
        walk(&path);
    }
}

fn main() {
    let mut args = env::args();
    args.next();
    let mode = args.next().unwrap();
    let mut paths = Vec::new();
    for path in args {
        paths.push(path);
    }
    if mode == "s" {
        s(paths);
    } else if mode == "t" {
        t(paths);
    } else {
        r(paths);
    }
}
vandenoever commented 7 years ago

Some more measuring on 1M files does show a difference. The results stay within a few seconds when repeated.

echo threads
sync && echo 3 > /proc/sys/vm/drop_caches
time target/release/wd t $dir/*/* | wc -l

sleep 5

echo serial
sync && echo 3 > /proc/sys/vm/drop_caches
time target/release/wd s $dir/*/* | wc -l

sleep 5

echo rayon
sync && echo 3 > /proc/sys/vm/drop_caches
time target/release/wd r $dir/*/* | wc -l

sleep 5

echo find
sync && echo 3 > /proc/sys/vm/drop_caches
time find $dir/*/* |wc -l
threads
1005929

real    4m23.439s
user    0m1.765s
sys     0m4.451s

serial
1005929

real    4m43.735s
user    0m2.135s
sys     0m5.529s

rayon
1005929

real    5m37.135s
user    0m2.074s
sys     0m5.361s

find
1005929

real    4m41.133s
user    0m0.691s
sys     0m3.445s

Running in parallel with threads is a few % faster than find. Running serially is the same as find and using rayon is slower.

joshtriplett commented 7 years ago

Running in parallel on a spinning disk should give a speedup because it allows the io scheduler to group reads that are close together on the disk.

Running in parallel also helps many SSDs, which benefit from having many transactions in flight at once ("queue depth").

vandenoever commented 7 years ago

Doing a naive parallel implementation does give a nice speedup. The code below runs 32 threads and handles the same case as from the timings above in 3m9 which is 28% less time than the fastest so far (threads). The speedup comes at a cost of CPU use. CPU use was 3% during this run.

fn u(paths: Vec<String>) {
    let mut vec = Vec::new();
    for v in paths {
        vec.push(PathBuf::from(v))
    }
    let data = Arc::new(Mutex::new(vec));
    let (tx, rx) = channel();
    let n = 32;
    for _ in 0..n {
        let (data, tx) = (data.clone(), tx.clone());
        thread::spawn(move || {
            let mut buf = Vec::new();
            loop {
                let p: Option<PathBuf> = {
                    let mut data = data.lock().unwrap();
                    data.pop()
                };
                match p.map(|p| p.read_dir()) {
                    Some(Ok(p)) => {
                        for f in p {
                            let entry = f.unwrap();
                            let path = entry.path();
                            if entry.file_type().unwrap().is_dir() {
                                buf.push(path.clone());
                            }
                            tx.send(Some(path)).unwrap();
                        }
                        if buf.len() > 0 {
                            let mut data = data.lock().unwrap();
                            for v in buf.iter() {
                                data.push(v.clone());
                            }
                            buf.clear();
                        }
                    },
                    Some(Err(_)) => {},
                    None => {
                        tx.send(None).unwrap();
                        break;
                    }
                }
            }
        });
        let ten_millis = time::Duration::from_millis(10);
        thread::sleep(ten_millis); // hack to let stack populate
    }
    let mut nclosed = 0;
    while nclosed < n {
        match rx.recv() {
            Ok(Some(p)) => println!("{}", p.to_string_lossy()),
            _ => nclosed += 1
        }
    }
}
real    3m9.369s
user    0m2.129s
sys     0m5.912s

with 1024 threads:

real    2m47.662s
user    0m2.679s
sys     0m6.655s
vandenoever commented 7 years ago

Here's another version. This one only adds threads when the io is slow. With a warm cache it is slower than find (0m4.3s vs 0m2.3s) but with a cold cache it is much faster (3m1s vs 4m41s).

Threads are used to wait for the blocking function read_dir to return. More waiting threads means more information for the io scheduler to come up with an efficient route for the disk head. The number of threads should not be derived from the number of cpu cores because the threads are mostly sleeping. The number should probably be lower than /sys/block/sda/queue/nr_requests which is typically 128.

For the warm cache case, most of the extra time is spent in channel communication: with one waiter, the time is 0m3.6.

So using threads certainly has advantages. It would be nice if the cost of communication could be brought down to make the warm cache case faster.

fn waiter(irx: spmc::Receiver<Option<PathBuf>>, otx: mpsc::Sender<Option<DirEntry>>) {
    thread::spawn(move || {
        loop {
            match irx.recv().unwrap().map(|p| p.read_dir()) {
                Some(Ok(p)) => {
                    for f in p {
                        otx.send(Some(f.unwrap())).unwrap();
                    }
                    otx.send(None).unwrap();
                }
                Some(Err(_)) => {
                    otx.send(None).unwrap();
                },
                None => { break; }
            }
        }
    });
}

fn o(paths: Vec<String>) {
    let mut nwaiters = 1;
    let max_waiters = 100;
    let wait_time = time::Duration::from_millis(20);
    let (itx, irx) = spmc::channel();
    let (otx, orx) = mpsc::channel();
    waiter(irx.clone(), otx.clone());
    let mut jobs = paths.len();
    for v in paths {
        itx.send(Some(PathBuf::from(v))).unwrap();
    }
    loop {
        match orx.try_recv() {
            Ok(Some(entry)) => {
                let path = entry.path();
                println!("{}", path.to_string_lossy());
                if entry.file_type().unwrap().is_dir() {
                    jobs += 1;
                    itx.send(Some(path)).unwrap();
                }
            },
            Ok(None) => {
                jobs -= 1;
                if jobs == 0 {
                    break;
                }
            },
            Err(_) => {
                // slow io, add a waiter
                if nwaiters < max_waiters {
                    nwaiters += 1;
                    waiter(irx.clone(), otx.clone());
                }
                thread::sleep(wait_time);
            }
        }
    }
    for _ in 0..nwaiters {
        itx.send(None).unwrap();
    }
    println!("used waiters: {}", nwaiters);
}
anderejd commented 7 years ago

I found this thread through https://www.reddit.com/r/rust/comments/6eif7r/walkdir_users_we_need_you/

This is interesting to me because trying to use rayon with walkdir was my first toy project with Rust :) But I failed due to the complexity of trying something way beyond my hello world-level Rust skills at the time. I ended up writing a workaround using channels instead (also a BurntSushi crate):

https://github.com/anderejd/treesum/blob/master/src/main.rs (the toy project) https://github.com/anderejd/sgiter/blob/master/src/lib.rs (the workaround)

I suspect the code linked above does not provide much value for this issue, but you never know.

vandenoever commented 7 years ago

Some more numbers: running on a 4TB hard disk with 60M files:

find         264m19.985s
walkdir      267m35.137s
100 waiters  153m58.516s
vandenoever commented 7 years ago

Turns out walkdir had a trick up its example: BufWriter. I've added that and now the '100 waiters' code outperforms everything.

On an SSD with 1.5M files:

method cold warm
find 0m9.692s 0m1.491s
walkdir 0m10.042s 0m1.855s
100 waiters 0m2.706s 0m1.362s
macro_rules! wout { ($($tt:tt)*) => { {writeln!($($tt)*)}.unwrap() } }

fn waiter(irx: spmc::Receiver<Option<PathBuf>>, otx: mpsc::Sender<Option<DirEntry>>) {
    thread::spawn(move || {
        loop {
            match irx.recv().unwrap().map(|p| p.read_dir()) {
                Some(Ok(p)) => {
                    for f in p {
                        otx.send(Some(f.unwrap())).unwrap();
                    }
                    otx.send(None).unwrap();
                }
                Some(Err(_)) => {
                    otx.send(None).unwrap();
                },
                None => { break; }
            }
        }
    });
}

fn hundred_waiters(paths: Vec<String>) {
    let mut nwaiters = 1;
    let max_waiters = 100;
    let wait_time = time::Duration::from_millis(20);
    let (itx, irx) = spmc::channel();
    let (otx, orx) = mpsc::channel();
    waiter(irx.clone(), otx.clone());
    let mut jobs = paths.len();
    for v in paths {
        itx.send(Some(PathBuf::from(v))).unwrap();
    }
    let mut out = io::BufWriter::new(io::stdout());
    loop {
        match orx.try_recv() {
            Ok(Some(entry)) => {
                let path = entry.path();
                wout!(out, "{}", path.to_string_lossy());
                if entry.file_type().unwrap().is_dir() {
                    jobs += 1;
                    itx.send(Some(path)).unwrap();
                }
            },
            Ok(None) => {
                jobs -= 1;
                if jobs == 0 {
                    break;
                }
            },
            Err(_) => {
                if nwaiters < max_waiters {
                    nwaiters += 1;
                    waiter(irx.clone(), otx.clone());
                }
                thread::sleep(wait_time);
            }
        }
    }
    for _ in 0..nwaiters {
        itx.send(None).unwrap();
    }
    error!("waiters: {}", nwaiters);
}

The issue that remains to be solved is how to efficiently sort while working in parallel. A simple solution would be to only support sorting when not running in parallel. Another option is to 'fire and forget': do a readdir, but only collect and handle the result that is needed at the moment. That way, the filesystem cache is prepped with the right data when you need it.

kdar commented 7 years ago

Doesn't the ignore package of ripgrep accomplish this or am I missing something? Although I would find it strange to use the ignore package for parallel walkdir.

BurntSushi commented 7 years ago

The ignore package does have a recursive parallel directory iterator, but the API is strange and it doesn't use rayon.

BurntSushi commented 6 years ago

I am going to close this. I don't think anyone has any concrete ideas on how to use rayon for this, and even if someone did (or some other parallel implementation), I think it would be best prototyped in a separate crate.

the8472 commented 5 years ago

Parallel traversal wouldn't just benefit HDDs, on one NFS system it takes me ~16s just to traverse a mere 1000 directories with 40000 files. The main cost there is the opendir latency. The filesystem can sustain a decent amount of IOPS and throughput but it is terrible in latency compared to local NVMe drives. Running it with 32 threads allows me to do the same task in less than a second.

Also note that internally performing the work in parallel and exposing parallel result iteration via rayon APIs are somewhat orthogonal. In principle the former could even be done with zero API changes by coordinating filter_entry calls across threads, although thread-safe predicates would be more efficient.

dpc commented 2 years ago

The issue that remains to be solved is how to efficiently sort while working in parallel.

I hope I'm not completely out of context here and not saying something irrelevant, but I remember asking about having rayon parallel iterator preserving the order, and it wasn't available so I ended up implementing this manually myself in one of my projects.

The idea is to dispatch work to parallel workers with sequential ID, and then assemble it back when converting to a sorted iter, keeping out_of_order items around on the receiving end until they are needed. Theoretically this can lead to unbounded memory consumption, but in practice - if you dispatch stuff in order, and your processing time is even roughly uniform, the result will arrive roughly in order. It wouldn't be difficult to implement a gating of dispatching to workers based on some shared atomic, to completely prevent possibility of memory explosion, though I never bothered.

the8472 commented 2 years ago

For some uses only parallelizing when no sorting function is passed and operating in single-threaded mode when sorting is requested would also be acceptable.

dpc commented 2 years ago

Parallel processing of entries returned by walkdir preserving order should be possible with https://docs.rs/dpc-pariter/

MilesCranmer commented 4 months ago

Found this on Google and with some more effort I found an alternative. Pasting here for others: https://github.com/Byron/jwalk.