JuliaParallel / Dagger.jl

A framework for out-of-core and parallel execution

What is the analogue of Distributed.pmap? #426

Closed: martinmestre closed this issue 1 year ago

martinmestre commented 1 year ago

Hi, thanks for this package. Besides my title question, could you explain the difference between Dagger's version of pmap and Dagger.@spawn? I want to run distributed, not multithreaded. Thanks!

jpsamaroo commented 1 year ago

There are a variety of ways to achieve the same goals as pmap. The first is to simply map over your collection, using fetch to materialize the results:

using Dagger

A = rand(100:200, 1000)
# Each element spawns two chained Dagger tasks; fetch materializes the results.
B = fetch.(map(A) do a
    arr = Dagger.@spawn rand(a, a)  # build an a×a random matrix as one task
    return Dagger.@spawn sum(arr)   # sum it in a second, dependent task
end)

This works best when you do a lot of work for each element of your collection (so the example above is a case where you'd want to take this approach). Another alternative, closer to how pmap works, is to partition your input manually:

A = rand(100_000)
partsize = 100  # number of elements handled by each task
B = collect(Iterators.flatten(fetch.(map(Iterators.partition(A, partsize)) do part
    Dagger.@spawn map(x -> x * 2, part)  # one task per partition
end)))

Here you want to do something cheap (x -> x * 2) for each element, so you can tune partsize to whatever metric you desire (maybe cld(length(A), nworkers()) to get one evenly-sized partition per worker; cld keeps partsize an integer, unlike plain /). In this way, you're fully in control of the amount of parallelism you get and the granularity of each partition.
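For concreteness, here is a minimal sketch of that one-partition-per-worker heuristic (assuming Distributed is loaded so nworkers() is available; cld rounds up, so the final partition simply absorbs any remainder):

using Distributed, Dagger

A = rand(100_000)
partsize = cld(length(A), nworkers())  # one evenly-sized partition per worker
B = collect(Iterators.flatten(fetch.(map(Iterators.partition(A, partsize)) do part
    Dagger.@spawn map(x -> x * 2, part)
end)))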

I want to run distributed, not multithreading.

This is more difficult. If you just want one CPU core per worker, then simply don't start your workers with multiple threads. If that isn't sufficient, you can tell Dagger to force all computations to happen on only one thread of each worker:

Dagger.with_options(; scope=Dagger.scope(thread=1)) do
    @sync for i in 1:10
        Dagger.spawn() do
            # Every task is now pinned to thread 1 of whichever worker runs it.
            println("Hello from worker ", myid(), ", thread ", Threads.threadid())
        end
    end
end

where I get:

      From worker 2:    Hello from worker 2, thread 1
      From worker 3:    Hello from worker 3, thread 1
      From worker 3:    Hello from worker 3, thread 1
      From worker 3:    Hello from worker 3, thread 1
      From worker 3:    Hello from worker 3, thread 1
      From worker 2:    Hello from worker 2, thread 1
      From worker 3:    Hello from worker 3, thread 1
      From worker 2:    Hello from worker 2, thread 1
      From worker 2:    Hello from worker 2, thread 1
      From worker 2:    Hello from worker 2, thread 1
jpsamaroo commented 1 year ago

Closing for now, but feel free to reopen if you don't feel this answer is sufficient for your needs 🙂

martinmestre commented 1 year ago

Thanks! My use case involves heavy tasks, so your first example, where the granularity is minimal, would suit me well. I do not understand where in this example you are telling it to use multithreading. What modification of the first example is needed to run it distributed? I am just starting with parallel computing: if I have a node with 4 cores, each with two hardware threads, should I start 8 threads or 8 distributed workers? If I have 2 nodes, can I combine distributed with multithreading, and will the code know how to perform? Thank you very much.

jpsamaroo commented 1 year ago

I do not understand where in this example you are telling to use multithread.

I wasn't recommending multithreading, as you requested when you said:

I want to run distributed, not multithreading.

The examples above run purely distributed if you don't start Julia with extra threads, but they may also use multithreading if you do start Julia with multiple threads (Dagger automatically parallelizes tasks, just like Julia does with Threads.@spawn).
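For concreteness, here is a minimal sketch of the two startup styles (the worker and thread counts here are illustrative, not recommendations):

using Distributed

# Purely distributed: 8 single-threaded workers.
addprocs(8)

# Combined distributed + multithreaded: 2 workers, each started with 4 threads.
# addprocs(2; exeflags="--threads=4")

@everywhere using Dagger  # load Dagger on every worker

After this, the examples above run unchanged; Dagger schedules tasks across whatever workers and threads exist.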

when I have a node with 4 cores with two threads each, should I send 8 workers in multithreading or 8 in distributed

Depends on your use case, but I would personally use 8 threads. The first two examples I provided should work fine for this.
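As a minimal sketch of that single-process setup (the thread count is illustrative):

# Launch with: julia --threads=8  (or set JULIA_NUM_THREADS=8)
using Dagger

println("Julia threads: ", Threads.nthreads())  # should print 8

# The first example then runs multithreaded with no code changes:
A = rand(100:200, 1000)
B = fetch.(map(A) do a
    arr = Dagger.@spawn rand(a, a)
    return Dagger.@spawn sum(arr)
end)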