JuliaParallel / DTables.jl

Distributed table structures and data manipulation operations built on top of Dagger.jl
MIT License

How to ensure `DTable`s stay on one process #58

Open StevenWhitaker opened 11 months ago

StevenWhitaker commented 11 months ago

I would like to use DTables.jl where each `DTable` or `GDTable` exists on exactly one process and does not migrate (so, effectively using DTables.jl just for its out-of-core processing capabilities). It looks like `Dagger.@spawn` has some support for ensuring a task executes on a given process. Does DTables.jl support the `scope` kwarg of `Dagger.@spawn`? And is what I want to do possible/feasible?

StevenWhitaker commented 11 months ago

I tried using `Dagger.with_options` to keep all `GDTable` chunks on a single process, but it doesn't look like it's working quite right:

julia> using Distributed; addprocs(1); @everywhere using Dagger, DTables, DataFrames

julia> Dagger.with_options(; scope = ProcessScope(myid())) do
           dt = DTable(DataFrame(a = rand(1:5, 100), b = 1:100))
           gdt = groupby(dt, :a)
           map(c -> (c.scope, c.processor), gdt.dtable.chunks)
       end
5-element Vector{Tuple{AnyScope, OSProc}}:
 (AnyScope(), OSProc(1))
 (AnyScope(), OSProc(1))
 (AnyScope(), OSProc(1))
 (AnyScope(), OSProc(1))
 (AnyScope(), OSProc(1))

Any tips? All the chunks do appear to be on the correct process, but since each chunk's scope is `AnyScope()` rather than the `ProcessScope` I set, could the chunks be migrated to another process?

krynju commented 11 months ago

@jpsamaroo is this expected, or is it a bug? It seems the result chunk doesn't inherit the options, even though the options are passed into the task scope properly. I haven't seen the options of results tested anywhere in the tests.

julia> t = Dagger.with_options(; scope=ProcessScope(1)) do
           Dagger.spawn(Dagger.get_options)
       end
EagerThunk (finished)

julia> fetch(t.future.future)[2].scope
AnyScope()

julia> fetch(t)
(scope = ProcessScope: worker == 1,)