JuliaParallel / Dagger.jl

A framework for out-of-core and parallel execution
Other
638 stars 67 forks source link

Various errors working with DTables.jl #438

Open StevenWhitaker opened 1 year ago

StevenWhitaker commented 1 year ago

I tried to create an MWE that was closer to the actual workflow I'm working with. I'm guessing the errors occurring here are related to #437 (one of the four reported errors below is the same as the linked issue). I hope this is helpful and not just extra noise!

Contents of mwe.jl:

using Distributed
nworkers = 1
addprocs(nworkers - nprocs() + 1)  # add processes until nprocs() == nworkers + 1

@everywhere using DTables, DataFrames, CSV  # load the packages on every process
@everywhere job_channel = Channel(100)  # every process defines its own global Channel (capacity 100)

remotecall(2) do  # start a job loop on worker 2; the returned Future is discarded, so this does not block
    while true
        job = take!(job_channel)  # blocks until a job arrives; nothing in this listing ever put!s one
        try
            func = job[1]  # first element of a job is the callable ...
            args = job[2:end]  # ... and the remaining elements are its arguments
            func(args...)
        catch ex  # log the failure and keep the loop alive
            @info "error $ex"
            @info "stacktrace: $(stacktrace(catch_backtrace()))"
        end
    end
end

remotecall_fetch(2) do  # run the actual workload on worker 2 and wait for it (mwe.jl:22 in the traces)
    dt = DTable(x -> CSV.File(x), ["file.csv"]; tabletype = DataFrame)  # load one CSV file into a DTable
    df = fetch(dt)  # materialize the DTable; indexed like a DataFrame below
    cols1 = [df[!, c] for c in 1:48]  # column vectors 1-48 (assumes file.csv has at least 102 columns)
    cols2 = [df[!, c] for c in 49:102]  # column vectors 49-102
    cols = (cols1, cols2)  # not used again in this listing
    cols_appended = (cols1, (cols2..., rand(length(cols2[1]))))  # second group gains one random column
    df = DataFrame(  # rebuild the table from name => column pairs (102 original columns + "appended")
        (names(df)[1:48] .=> cols_appended[1])...,
        ((names(df)[49:102]..., "appended") .=> cols_appended[2])...;
        copycols = false,  # reuse the column vectors instead of copying them
    )
    dt = DTable(df)
    @info "$(length(dt))"  # logged twice; these are the two "Info:" lines in the outputs below
    @info "$(length(dt))"
    df = fetch(dt)
    cols1 = [df[!, c] for c in 1:48]
    cols2 = [df[!, c] for c in 49:102]
    cols = (cols1, cols2)  # not used again in this listing
    df = fetch(dt)
    foreach((:new1, :new2), (rand(length(dt)), rand(length(dt)))) do name, val  # length(dt) is evaluated twice here
        setproperty!(df, name, val)  # add random columns :new1 and :new2 to df
    end
    dt = DTable(df)
    i = [6, 12, 48, 93, 94]  # positions of the columns to keep
    dt = select(dt, i...; copycols = false)
    gdt = groupby(dt, Symbol.(names(df)[[6, 12, 48]]))  # group by the columns at positions 6, 12 and 48
    gkeys = sort!(collect(keys(gdt)))
    sums = map(gkeys) do key  # one reduce per group key ...
        reduce(+, gdt[key]; cols = Symbol.(names(df)[[93, 94]]))  # ... over the columns at positions 93 and 94
    end .|> fetch  # fetch each reduction result
end

I included mwe.jl in a fresh Julia session multiple times (meaning each include occurred in its own fresh Julia session) and recorded the following errors. Note that nothing changed in mwe.jl from run to run.

Error 1:

julia> include("mwe.jl")
      From worker 2:    [ Info: 233930
      From worker 2:    [ Info: 233930
ERROR: LoadError: On worker 2:
MethodError: Cannot `convert` an object of type
  Vector{Any} to an object of type
  Union{Dagger.Thunk, Dagger.Chunk}

Closest candidates are:
  convert(::Type{T}, ::T) where T
   @ Base Base.jl:64

Stacktrace:
  [1] get!
    @ ./dict.jl:455
  [2] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:116
  [3] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:100 [inlined]
  [4] #eager_submit_internal!#96
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:93
  [5] eager_submit_internal!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:11 [inlined]
  [6] eager_submit_internal!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:9
  [7] #invokelatest#2
    @ ./essentials.jl:819
  [8] invokelatest
    @ ./essentials.jl:816
  [9] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [10] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [11] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [12] #109
    @ ./task.jl:514
Stacktrace:
  [1] #remotecall_fetch#159
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
  [2] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
  [3] #remotecall_fetch#162
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
  [4] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
  [5] eager_submit!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:128
  [6] eager_launch!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:195
  [7] enqueue!
    @ ~/.julia/packages/Dagger/ZOt9H/src/queue.jl:12 [inlined]
  [8] #spawn#88
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:322
  [9] spawn
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:286 [inlined]
 [10] #39
    @ ~/.julia/packages/DTables/bA4g3/src/operations/operations.jl:35 [inlined]
 [11] iterate
    @ ./generator.jl:47 [inlined]
 [12] _collect
    @ ./array.jl:802
 [13] collect_similar
    @ ./array.jl:711
 [14] map
    @ ./abstractarray.jl:3263
 [15] map
    @ ~/.julia/packages/DTables/bA4g3/src/operations/operations.jl:35
 [16] _manipulate
    @ ~/.julia/packages/DTables/bA4g3/src/operations/dataframes_interface.jl:89
 [17] #manipulate#247
    @ ~/.julia/packages/DTables/bA4g3/src/operations/dataframes_interface.jl:48
 [18] #select#258
    @ ~/.julia/packages/DTables/bA4g3/src/operations/dataframes_interface.jl:171
 [19] #7
    @ ~/tmp/mwe.jl:47
 [20] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [21] invokelatest
    @ ./essentials.jl:816
 [22] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [23] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [24] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [25] #109
    @ ./task.jl:514
Stacktrace:
 [1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
 [2] remotecall_fetch(::Function, ::Distributed.Worker)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [3] #remotecall_fetch#162
   @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [4] remotecall_fetch(::Function, ::Int64)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [5] top-level scope
   @ ~/tmp/mwe.jl:22
 [6] include(fname::String)
   @ Base.MainInclude ./client.jl:478
 [7] top-level scope
   @ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:22

Error 2:

julia> include("mwe.jl")
      From worker 2:    [ Info: 233930
      From worker 2:    [ Info: 233930
ERROR: LoadError: On worker 2:
UndefRefError: access to undefined reference
Stacktrace:
  [1] getindex
    @ ./essentials.jl:13 [inlined]
  [2] get!
    @ ./dict.jl:465
  [3] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:116
  [4] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:100 [inlined]
  [5] #eager_submit_internal!#96
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:93
  [6] eager_submit_internal!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:11 [inlined]
  [7] eager_submit_internal!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:9
  [8] #invokelatest#2
    @ ./essentials.jl:819
  [9] invokelatest
    @ ./essentials.jl:816
 [10] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [11] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [12] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [13] #109
    @ ./task.jl:514
Stacktrace:
  [1] #remotecall_fetch#159
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
  [2] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
  [3] #remotecall_fetch#162
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
  [4] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
  [5] eager_submit!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:128
  [6] eager_launch!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:195
  [7] enqueue!
    @ ~/.julia/packages/Dagger/ZOt9H/src/queue.jl:12 [inlined]
  [8] #spawn#88
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:322
  [9] spawn
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:286 [inlined]
 [10] #15
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:401 [inlined]
 [11] iterate
    @ ./generator.jl:47 [inlined]
 [12] collect
    @ ./array.jl:782
 [13] chunk_lengths
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable.jl:254
 [14] length
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable.jl:258
 [15] #7
    @ ~/tmp/mwe.jl:42
 [16] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [17] invokelatest
    @ ./essentials.jl:816
 [18] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [19] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [20] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [21] #109
    @ ./task.jl:514
Stacktrace:
 [1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
 [2] remotecall_fetch(::Function, ::Distributed.Worker)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [3] #remotecall_fetch#162
   @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [4] remotecall_fetch(::Function, ::Int64)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [5] top-level scope
   @ ~/tmp/mwe.jl:22
 [6] include(fname::String)
   @ Base.MainInclude ./client.jl:478
 [7] top-level scope
   @ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:22

Error 3:

julia> include("mwe.jl")
      From worker 2:    [ Info: 233930
      From worker 2:    [ Info: 233930

[7463] signal (11.1): Segmentation fault
in expression starting at /home/steven/tmp/mwe.jl:22
jl_object_id__cold at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/builtins.c:417
type_hash at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1332
typekey_hash at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1344
jl_precompute_memoized_dt at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1409
inst_datatype_inner at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1731
jl_inst_arg_tuple_type at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/jltypes.c:1826
arg_type_tuple at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2100 [inlined]
jl_lookup_generic_ at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2884 [inlined]
ijl_apply_generic at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2936
collect_task_inputs at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:392
signature at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:256
#99 at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:680
lock at ./lock.jl:229
schedule! at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:642 [inlined]
schedule! at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:642 [inlined]
scheduler_run at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:508
#compute_dag#82 at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:449
compute_dag at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:414 [inlined]
#compute#141 at /home/steven/.julia/packages/Dagger/ZOt9H/src/compute.jl:23
compute at /home/steven/.julia/packages/Dagger/ZOt9H/src/compute.jl:22 [inlined]
macro expansion at /home/steven/.julia/packages/Dagger/ZOt9H/src/sch/eager.jl:28 [inlined]
#50 at ./threadingconstructs.jl:410
unknown function (ip: 0x7efbf8213f8f)
_jl_invoke at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2758 [inlined]
ijl_apply_generic at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/gf.c:2940
jl_apply at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/julia.h:1880 [inlined]
start_task at /cache/build/default-amdci5-5/julialang/julia-release-1-dot-9/src/task.c:1092
Allocations: 34071935 (Pool: 34043867; Big: 28068); GC: 39
zsh: segmentation fault  julia --project

Error 3b: Occasionally the segfault was preceded by one or more occurrences of:

Unhandled Task ERROR: ArgumentError: destination has fewer elements than required
Stacktrace:
 [1] copyto!(dest::Vector{Dagger.Sch.ProcessorState}, src::Base.ValueIterator{Dict{Dagger.Processor, Dagger.Sch.ProcessorState}})
   @ Base ./abstractarray.jl:949
 [2] _collect
   @ ./array.jl:713 [inlined]
 [3] collect
   @ ./array.jl:707 [inlined]
 [4] macro expansion
   @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1189 [inlined]
 [5] (::Dagger.Sch.var"#126#133"{Dagger.Sch.ProcessorInternalState, UInt64, RemoteChannel{Channel{Any}}, Dagger.ThreadProc})()
   @ Dagger.Sch ./task.jl:134

Error 4:

julia> include("mwe.jl")
      From worker 2:    [ Info: 233930
      From worker 2:    [ Info: 233930
ERROR: LoadError: On worker 2:
AssertionError: Multiple concurrent writes to Dict detected!
Stacktrace:
  [1] rehash!
    @ ./dict.jl:208
  [2] _setindex!
    @ ./dict.jl:355 [inlined]
  [3] get!
    @ ./dict.jl:477
  [4] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:116
  [5] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:100 [inlined]
  [6] #eager_submit_internal!#96
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:93
  [7] eager_submit_internal!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:11 [inlined]
  [8] eager_submit_internal!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:9
  [9] #invokelatest#2
    @ ./essentials.jl:819
 [10] invokelatest
    @ ./essentials.jl:816
 [11] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [12] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [13] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [14] #109
    @ ./task.jl:514
Stacktrace:
  [1] #remotecall_fetch#159
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
  [2] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
  [3] #remotecall_fetch#162
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
  [4] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
  [5] eager_submit!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:128
  [6] eager_launch!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:195
  [7] enqueue!
    @ ~/.julia/packages/Dagger/ZOt9H/src/queue.jl:12 [inlined]
  [8] #spawn#88
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:322
  [9] spawn
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:286 [inlined]
 [10] #48
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:401 [inlined]
 [11] iterate
    @ ./generator.jl:47 [inlined]
 [12] collect_to!
    @ ./array.jl:840 [inlined]
 [13] collect_to_with_first!
    @ ./array.jl:818 [inlined]
 [14] collect
    @ ./array.jl:792
 [15] #reduce#42
    @ ~/.julia/packages/DTables/bA4g3/src/operations/operations.jl:111
 [16] #14
    @ ~/tmp/mwe.jl:51
 [17] iterate
    @ ./generator.jl:47 [inlined]
 [18] collect_to!
    @ ./array.jl:840 [inlined]
 [19] collect_to_with_first!
    @ ./array.jl:818 [inlined]
 [20] _collect
    @ ./array.jl:812
 [21] collect_similar
    @ ./array.jl:711
 [22] map
    @ ./abstractarray.jl:3263
 [23] #7
    @ ~/tmp/mwe.jl:50
 [24] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [25] invokelatest
    @ ./essentials.jl:816
 [26] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [27] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [28] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [29] #109
    @ ./task.jl:514
Stacktrace:
 [1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
 [2] remotecall_fetch(::Function, ::Distributed.Worker)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [3] #remotecall_fetch#162
   @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [4] remotecall_fetch(::Function, ::Int64)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [5] top-level scope
   @ ~/tmp/mwe.jl:22
 [6] include(fname::String)
   @ Base.MainInclude ./client.jl:478
 [7] top-level scope
   @ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:22

Comments:

jpsamaroo commented 1 year ago

@StevenWhitaker can you try reproducing these again on Dagger master?

StevenWhitaker commented 1 year ago

Thanks for getting a patch released!

The issues are different now, so that's something ;)

Now I observe the following behavior (EDIT: when running Julia with multiple threads):

I realized that I start Julia with multiple threads by default, so I also ran the code with a single thread (julia --project -t1). In this case, I saw the Unhandled Task ERROR once (incidentally, the first time), and every time I ran the code (including the first time) it ran to completion.

So, besides the one sporadic error, this issue seems to be addressed, assuming the issues I observed with multiple threads are due to the interplay between Distributed and Threads.

StevenWhitaker commented 1 year ago

Edit to my previous comment:

I'm running my actual code with a single thread now, and it also hangs, so there might be something else still at play.

jpsamaroo commented 1 year ago

I can reproduce the hangs - I'll keep investigating! Thanks for your patience 🙂

jpsamaroo commented 1 year ago

Running through your example with Dagger's logging enabled, I find that we spend a good bit of time (about 0.3-0.5 s for me) in the reduce calls at the end, which are running in serial over 233K keys - at this pace, I can see why it looks like it's hanging 😆

A large portion of the time is spent in the GC (about 40% time over ~80K allocations totaling ~500MB), so I suspect allocations are what's killing performance. If I can figure out how to reduce those allocations, it would also be reasonable to parallelize the reduce calls (by doing two maps, one to launch a task per key, and one to fetch the results), and that should give us much better runtimes.

Additionally, the other calls that took a while are select and groupby, so we could probably look into improving those a bit.

EDIT: Those timings and allocations are so high because of logging - they drop significantly when logging is disabled, although then I see a ton of long-lived allocations that threaten to crash Julia. I still need to see if some of those allocations can be reduced.

EDIT 2: Silly me, these reductions are already asynchronous 😄 I guess the task completes before we return from reduce anyway, since we're only running with 1 thread.

jpsamaroo commented 1 year ago

Ok, something that I would recommend is, instead of the map -> reduce pattern, just use a single reduce call: reduce(+, gdt; cols=Symbol.(names(df)[[93,94]])). This appears to be much more memory and time efficient, which makes sense because it can internally do more optimizations (it already knows that you intend to reduce over each key in the group).

Can you test that and confirm whether it speeds your script up sufficiently for it to complete in a reasonable amount of time?

StevenWhitaker commented 1 year ago

Thanks for the tip. I tried it out on my actual project (not the exact example in the OP), and it does seem to help, but I still see the code hang occasionally. I'm pretty sure it's not just taking forever, because when the code does complete, it doesn't take that long, and when it hangs the CPU utilization drops to 0.

It actually seems to be the case that my code hangs only when calling my main function again after a successful run. Or at least the chances of hanging are higher in that case. I'm not really sure why that would be the case, though.

I also saw a new error (when calling fetch on a DTable, with Dagger v0.18.4 and DTables v0.4.2):

Dagger.ThunkFailedException{Dagger.ThunkFailedException{CapturedException}}(Thunk[3](isnonempty, Any[Thunk[2](_file_load, Any["path/to/file.csv", NRBS.var"#1#2"(), DataFrames.DataFrame])]), Thunk[2](_file_load, Any["path/to/file.csv", NRBS.var"#1#2"(), DataFrames.DataFrame]), Dagger.ThunkFailedException{CapturedException}(Thunk[2](_file_load, Any["path/to/file.csv", NRBS.var"#1#2"(), DataFrames.DataFrame]), Thunk[2](_file_load, Any["path/to/file.csv", NRBS.var"#1#2"(), DataFrames.DataFrame]), CapturedException(UndefRefError(), Any[(getindex at essentials.jl:13 [inlined], 1), (get! at dict.jl:465, 1), (OSProc at processor.jl:109 [inlined], 2), (do_task at Sch.jl:1368, 1), (macro expansion at Sch.jl:1243 [inlined], 1), (#132 at task.jl:134, 1)])))

It looks like it has to do with file loading, so this is the code I use to load .csv files:

DTable(x -> CSV.File(x), [filepath]; tabletype = DataFrame)

I only saw the error once, though.

And another one-time error (in the function with the reduce call):

      From worker 4:    ┌ 2023-10-24T13:00:07.238 ] pid: 20516 proc: 4 Error:  Error on 4 while connecting to peer 3, exiting
      From worker 4:    │   exception =
      From worker 4:    │    ConcurrencyViolationError("lock must be held")
      From worker 4:    │    Stacktrace:
      From worker 4:    │      [1] concurrency_violation()
      From worker 4:    │        @ Base ./condition.jl:8
      From worker 4:    │      [2] assert_havelock
      From worker 4:    │        @ ./condition.jl:25 [inlined]
      From worker 4:    │      [3] assert_havelock
      From worker 4:    │        @ ./condition.jl:48 [inlined]
      From worker 4:    │      [4] assert_havelock
      From worker 4:    │        @ ./condition.jl:72 [inlined]
      From worker 4:    │      [5] notify(c::Condition, arg::Any, all::Bool, error::Bool)
      From worker 4:    │        @ Base ./condition.jl:150
      From worker 4:    │      [6] #notify#622
      From worker 4:    │        @ ./condition.jl:148 [inlined]
      From worker 4:    │      [7] notify (repeats 2 times)
      From worker 4:    │        @ ./condition.jl:148 [inlined]
      From worker 4:    │      [8] set_worker_state
      From worker 4:    │        @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:148 [inlined]
      From worker 4:    │      [9] Distributed.Worker(id::Int, r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, manager::Distributed.DefaultClusterManager; version::Nothing, config::WorkerConfig)
      From worker 4:    │        @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:126
      From worker 4:    │     [10] Worker
      From worker 4:    │        @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:116 [inlined]
      From worker 4:    │     [11] connect_to_peer(manager::Distributed.DefaultClusterManager, rpid::Int, wconfig::WorkerConfig)
      From worker 4:    │        @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:363
      From worker 4:    │     [12] (::Distributed.var"#121#123"{Int, WorkerConfig})()
      From worker 4:    │        @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:349
      From worker 4:    │     [13] exec_conn_func(w::Distributed.Worker)
      From worker 4:    │        @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:181
      From worker 4:    │     [14] (::Distributed.var"#21#24"{Distributed.Worker})()
      From worker 4:    └        @ Distributed ./task.jl:514

The above errors occurred when calling my main function the first time.