StevenWhitaker opened this issue 11 months ago (status: Open)
Do you have a reproducer for this one, just to help me debug it reliably?
I am working on a better reproducer, but I believe the behavior I pointed out in JuliaParallel/Dagger.jl#445 (related to JuliaParallel/Dagger.jl#438) is essentially the same as what I am reporting here: all those `CPURAMDevice`s needed to be evicted when Julia closed because they were not being evicted earlier. It's just that before, I wasn't running my code enough times to reach the point where the OOM manager killed one of my processes.
@jpsamaroo Here's an MWE that shows the ever-growing memory utilization. It took running `main` about 120 times to get the OOM manager to kill worker 2 (I have 32 GB RAM). Let me know if you need the data as well (i.e., `"file.csv"`).
```
using Distributed
addprocs(5 - nprocs(); exeflags = "--heap-size-hint=3G")

@everywhere using DTables, DataFrames, CSV

@everywhere const DT = Ref{DTable}()

@everywhere mutable struct DTableCols
    key_names
    value_names
    keys
    values
end

function main()
    remotecall_fetch(query, 2)
end

@everywhere function query()
    dt1 = load_dt()
    dt2 = add_value_col!(dt1)
    dt3 = update_value_col!(dt2)
    @info "" length(dt3)
    dt4 = calc_value_cols(dt3)
    dt5 = select(dt4, [6; 12; 103:113]...; copycols = false)
    dt_agg = aggregate_dt(dt5)
    return fetch(dt_agg)
end

@everywhere function load_dt()
    isassigned(DT) && return DT[]
    file = "file.csv"
    GC.enable(false)
    dt = DTable(x -> CSV.File(x), [file]; tabletype = DataFrame)
    GC.enable(true)
    DT[] = dt
    return dt
end

@everywhere function add_value_col!(dt)
    dt_cols = create_dt_cols(dt, 1:48, 49:102)
    dt_cols.value_names = [dt_cols.value_names; "RAND"]
    dt_cols.values = (dt_cols.values..., rand(length(dt_cols.values[1])))
    return create_dt_from_cols(dt_cols; is_sorted = true)
end

@everywhere function create_dt_cols(dt, key_cols, value_cols)
    df = fetch(dt)
    key_names = names(df)[key_cols]
    value_names = names(df)[value_cols]
    keys = [df[!, i] for i in key_cols]
    values = [df[!, i] for i in value_cols]
    return DTableCols(key_names, value_names, keys, values)
end

@everywhere function create_dt_from_cols(dt_cols; is_sorted = false)
    df = DataFrame(
        (dt_cols.key_names .=> dt_cols.keys)...,
        (dt_cols.value_names .=> dt_cols.values)...;
        copycols = false,
    )
    is_sorted || sort!(df)
    return DTable(df)
end

@everywhere function update_value_col!(dt)
    dt_cols = create_dt_cols(dt, 1:48, 49:103)
    dt_cols.values = (
        dt_cols.values[1:10]...,
        rand(length(dt_cols.values[1])),
        dt_cols.values[12:end]...,
    )
    return create_dt_from_cols(dt_cols; is_sorted = true)
end

@everywhere function calc_value_cols(dt)
    newvals = Vector{Float64}[]
    for i = 1:10
        v = calc_new_value(dt, i)
        push!(newvals, v)
    end
    return append_value_cols(dt, newvals)
end

@everywhere function calc_new_value(dt, i)
    dt_cols = create_dt_cols(dt, 1:48, 49:103)
    return abs.(dt_cols.values[i])
end

@everywhere function append_value_cols(dt, newvals)
    df = fetch(dt)
    for (i, v) in enumerate(newvals)
        setproperty!(df, "NEW$i", v)
    end
    return DTable(df)
end

@everywhere function aggregate_dt(dt)
    key_names = [Symbol("6"), Symbol("12")]
    gdt = groupby(fetch(dt), key_names)
    gkeys = sort!(collect(keys(gdt)))
    key_pairs = key_names .=> invert(gkeys)
    value_names = [[Symbol("RAND")]; Symbol.("NEW", 1:10)]
    sums = fetch(reduce(+, gdt; cols = value_names))
    sorted = sortperm(invert(sums[key_names]))
    value_pairs = map(value_names) do value
        value => sums[Symbol(:result_, value)][sorted]
    end
    return DTable(DataFrame(key_pairs..., value_pairs...))
end

@everywhere invert(x) = [[x[j][i] for j = 1:length(x)] for i = 1:length(x[1])]

@everywhere function Base.reduce(f, df::DataFrames.AbstractDataFrame; cols)
    NamedTuple(col => reduce(f, df[!, col]) for col in cols)
end

@everywhere function Base.reduce(f, gdt::DataFrames.GroupedDataFrame; cols)
    gkeys = keys(gdt)
    dims = keys(gkeys[1])
    merge(
        NamedTuple(dim => getproperty.(gkeys, dim) for dim in dims),
        NamedTuple(
            Symbol(:result_, col) => [reduce(f, gdt[k]; cols = [col])[col] for k in gkeys]
            for col in cols
        ),
    )
end
```
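For reference, here is a minimal, self-contained sketch of what the custom group-wise `reduce` above computes, using only DataFrames on a toy table. The functions are renamed (`colreduce`/`groupreduce` are hypothetical names) to avoid extending `Base` in a standalone snippet; the logic mirrors the two methods in the MWE.

```julia
using DataFrames

# Same logic as the MWE's custom `Base.reduce` methods, under local names.
colreduce(f, df::DataFrames.AbstractDataFrame; cols) =
    NamedTuple(col => reduce(f, df[!, col]) for col in cols)

function groupreduce(f, gdt::DataFrames.GroupedDataFrame; cols)
    gkeys = keys(gdt)
    dims = keys(gkeys[1])  # grouping column names
    merge(
        # one entry per grouping column, listing each group's key value
        NamedTuple(dim => getproperty.(gkeys, dim) for dim in dims),
        # one `result_<col>` entry per requested column, reduced within each group
        NamedTuple(
            Symbol(:result_, col) => [colreduce(f, gdt[k]; cols = [col])[col] for k in gkeys]
            for col in cols
        ),
    )
end

df = DataFrame(g = [1, 1, 2], v = [10, 20, 30])
groupreduce(+, groupby(df, :g); cols = [:v])
# → (g = [1, 2], result_v = [30, 30])
```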
One thing I remembered: when I was benchmarking DTables.jl around release time, I had a really bad time running it in WSL 2. I would barely get to a quarter of the table size that I could run successfully on Linux, due to the weird memory management WSL does.
Let's keep this in mind when looking at this; Linux will behave differently for sure. I'll try to have a look at it this week.
I just ran the exact same code as in #61 (which in turn is the same as the MWE above but with a call to `enable_disk_caching!(50, 2^10 * 20)`). This time, instead of a `BoundsError`, I got `AssertionError: Failed to migrate 183.839 MiB for ref 1624`:
julia> include("mwe.jl"); for i = 1:100 (i % 10 == 0 && @show(i)); main() end
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
i = 10
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
i = 20
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
i = 30
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
ERROR: On worker 2:
AssertionError: Failed to migrate 183.839 MiB for ref 1624
Stacktrace:
[1] #105
@ ~/.julia/packages/MemPool/l9nLj/src/storage.jl:887
[2] with_lock
@ ~/.julia/packages/MemPool/l9nLj/src/lock.jl:80
[3] #sra_migrate!#103
@ ~/.julia/packages/MemPool/l9nLj/src/storage.jl:849
[4] sra_migrate!
@ ~/.julia/packages/MemPool/l9nLj/src/storage.jl:826 [inlined]
[5] write_to_device!
@ ~/.julia/packages/MemPool/l9nLj/src/storage.jl:817
[6] #poolset#160
@ ~/.julia/packages/MemPool/l9nLj/src/datastore.jl:386
[7] #tochunk#139
@ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:267
[8] tochunk (repeats 2 times)
@ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:259 [inlined]
[9] #DTable#1
@ ~/.julia/packages/DTables/BjdY2/src/table/dtable.jl:38
[10] DTable
@ ~/.julia/packages/DTables/BjdY2/src/table/dtable.jl:28
[11] #create_dt_from_cols#9
@ ~/tmp/mwe.jl:76
[12] create_dt_from_cols
@ ~/tmp/mwe.jl:68 [inlined]
[13] add_value_col!
@ ~/tmp/mwe.jl:53
[14] query
@ ~/tmp/mwe.jl:26
[15] #invokelatest#2
@ ./essentials.jl:819 [inlined]
[16] invokelatest
@ ./essentials.jl:816
[17] #110
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
[18] run_work_thunk
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
[19] macro expansion
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
[20] #109
@ ./task.jl:514
Stacktrace:
[1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
[2] remotecall_fetch(::Function, ::Distributed.Worker)
@ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
[3] #remotecall_fetch#162
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
[4] remotecall_fetch
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
[5] main
@ ~/tmp/mwe.jl:19 [inlined]
[6] top-level scope
@ ./REPL[1]:1
I wonder if this is essentially the same issue as the OP, where data is being kept longer than it should. Just in this case, instead of a process getting killed, MemPool errors because we end up exceeding the 20 GB of disk space I said MemPool could use. If it is the same issue, then WSL 2 memory management shouldn't have anything to do with this.
I ran the MWE (with and without disk caching enabled) on Windows (not WSL 2).
- Without disk caching: I ran `main` about 200 times and never ran into any memory issues.
- With `enable_disk_caching!(50, 2^10 * 20)`: I could not replicate the `AssertionError: Failed to migrate`, but I did always get the `BoundsError` mentioned in #61.
So there definitely is a difference in behavior between WSL 2 and Windows.
I did get it reproduced twice with the MemPool fix from the other issue. Setup: 5 processes, no threads.
julia> d = DTable((a=rand(Int, N1),), N1 ÷ 100)
ERROR: AssertionError: Failed to migrate 10.240 MiB for ref 5646
Stacktrace:
[1] (::MemPool.var"#105#113"{Bool, MemPool.SimpleRecencyAllocator, MemPool.RefState, Int64})()
@ MemPool C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:920
[2] with_lock(f::MemPool.var"#105#113"{Bool, MemPool.SimpleRecencyAllocator, MemPool.RefState, Int64}, lock::MemPool.NonReentrantLock, cond::Bool)
@ MemPool C:\Users\krynjupc\.julia\dev\MemPool\src\lock.jl:80
[3] sra_migrate!(sra::MemPool.SimpleRecencyAllocator, state::MemPool.RefState, ref_id::Int64, to_mem::Missing; read::Bool, locked::Bool)
@ MemPool C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:882
[4] sra_migrate!(sra::MemPool.SimpleRecencyAllocator, state::MemPool.RefState, ref_id::Int64, to_mem::Missing)
@ MemPool C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:859 [inlined]
[5] write_to_device!(sra::MemPool.SimpleRecencyAllocator, state::MemPool.RefState, ref_id::Int64)
@ MemPool C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:850
[6]
@ MemPool C:\Users\krynjupc\.julia\dev\MemPool\src\datastore.jl:386
[7] tochunk(x::@NamedTuple{a::Vector{Int64}}, proc::OSProc, scope::AnyScope; persist::Bool, cache::Bool, device::Nothing, kwargs::@Kwargs{})
@ Dagger C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\chunks.jl:267
[8] tochunk
@ Dagger C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\chunks.jl:259 [inlined]
[9] tochunk(x::@NamedTuple{a::Vector{Int64}})
@ Dagger C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\chunks.jl:259
[10] DTable(table::@NamedTuple{a::Vector{Int64}}, chunksize::Int64; tabletype::Nothing, interpartition_merges::Bool)
@ DTables C:\Users\krynjupc\.julia\packages\DTables\BjdY2\src\table\dtable.jl:122
[11] DTable(table::@NamedTuple{a::Vector{Int64}}, chunksize::Int64)
@ DTables C:\Users\krynjupc\.julia\packages\DTables\BjdY2\src\table\dtable.jl:61
[12] top-level scope
@ REPL[56]:1
Some type information was truncated. Use `show(err)` to see complete types.
julia> map(x -> (r=x.a + 1,), d) |> fetch
ERROR: ThunkFailedException:
Root Exception Type: CapturedException
Root Exception:
AssertionError: Failed to migrate 10.240 MiB for ref 5051
Stacktrace:
[1] #105
@ C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:920
[2] with_lock
@ C:\Users\krynjupc\.julia\dev\MemPool\src\lock.jl:83
[3] #sra_migrate!#103
@ C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:882
[4] #120
@ C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:1001
[5] with_lock
@ C:\Users\krynjupc\.julia\dev\MemPool\src\lock.jl:80
[6] with_lock
@ C:\Users\krynjupc\.julia\dev\MemPool\src\lock.jl:78
[7] read_from_device
@ C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:991 [inlined]
[8] _getlocal
@ C:\Users\krynjupc\.julia\dev\MemPool\src\datastore.jl:433
[9] #174
@ C:\Users\krynjupc\.julia\dev\MemPool\src\datastore.jl:425
[10] #invokelatest#2
@ .\essentials.jl:899
[11] invokelatest
@ .\essentials.jl:896
[12] #110
@ C:\Users\krynjupc\AppData\Local\Programs\Julia-1.11.0-DEV\share\julia\stdlib\v1.11\Distributed\src\process_messages.jl:286
[13] run_work_thunk
@ C:\Users\krynjupc\AppData\Local\Programs\Julia-1.11.0-DEV\share\julia\stdlib\v1.11\Distributed\src\process_messages.jl:70
[14] #109
@ C:\Users\krynjupc\AppData\Local\Programs\Julia-1.11.0-DEV\share\julia\stdlib\v1.11\Distributed\src\process_messages.jl:286
Stacktrace:
[1] #remotecall_fetch#159
@ C:\Users\krynjupc\AppData\Local\Programs\Julia-1.11.0-DEV\share\julia\stdlib\v1.11\Distributed\src\remotecall.jl:465
[2] remotecall_fetch
@ C:\Users\krynjupc\AppData\Local\Programs\Julia-1.11.0-DEV\share\julia\stdlib\v1.11\Distributed\src\remotecall.jl:454
[3] remotecall_fetch
@ C:\Users\krynjupc\AppData\Local\Programs\Julia-1.11.0-DEV\share\julia\stdlib\v1.11\Distributed\src\remotecall.jl:492 [inlined]
[4] #173
@ C:\Users\krynjupc\.julia\dev\MemPool\src\datastore.jl:424 [inlined]
[5] forwardkeyerror
@ C:\Users\krynjupc\.julia\dev\MemPool\src\datastore.jl:409
[6] poolget
@ C:\Users\krynjupc\.julia\dev\MemPool\src\datastore.jl:423
[7] move
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\chunks.jl:98
[8] move
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\chunks.jl:96
[9] #invokelatest#2
@ .\essentials.jl:899 [inlined]
[10] invokelatest
@ .\essentials.jl:896 [inlined]
[11] #154
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\sch\Sch.jl:1475
Stacktrace:
[1] wait
@ .\task.jl:354 [inlined]
[2] fetch
@ .\task.jl:374 [inlined]
[3] fetch_report
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\sch\util.jl:241
[4] do_task
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\sch\Sch.jl:1502
[5] #132
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\sch\Sch.jl:1243
Root Thunk: Thunk(id=2224, #38(Dagger.WeakChunk(1, 5051, WeakRef(Dagger.Chunk{@NamedTuple{a::Vector{Int64}}, DRef, OSProc, AnyScope}(@NamedTuple{a::Vector{Int64}},
UnitDomain(), DRef(1, 5051, 0x0000000000a3d738), OSProc(1), AnyScope(), false))), #129))
Inner Thunk: Thunk(id=2325, isnonempty(Thunk[2224](#38, Any[Dagger.WeakChunk(1, 5051, WeakRef(Dagger.Chunk{@NamedTuple{a::Vector{Int64}}, DRef, OSProc, AnyScope}(@NamedTuple{a::Vector{Int64}}, UnitDomain(), DRef(1, 5051, 0x0000000000a3d738), OSProc(1), AnyScope(), false))), var"#129#130"()])))
This Thunk: Thunk(id=2325, isnonempty(Thunk[2224](#38, Any[Dagger.WeakChunk(1, 5051, WeakRef(Dagger.Chunk{@NamedTuple{a::Vector{Int64}}, DRef, OSProc, AnyScope}(@NamedTuple{a::Vector{Int64}}, UnitDomain(), DRef(1, 5051, 0x0000000000a3d738), OSProc(1), AnyScope(), false))), var"#129#130"()])))
Stacktrace:
[1] fetch(t::Dagger.ThunkFuture; proc::OSProc, raw::Bool)
@ Dagger C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\eager_thunk.jl:16
[2] fetch
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\eager_thunk.jl:11 [inlined]
[3] #fetch#75
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\eager_thunk.jl:58 [inlined]
[4] fetch
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\eager_thunk.jl:54 [inlined]
[5] #10
@ C:\Users\krynjupc\.julia\packages\DTables\BjdY2\src\table\dtable.jl:233 [inlined]
[6] filter(f::DTables.var"#10#13"{Vector{Dagger.EagerThunk}}, a::Vector{Tuple{Int64, Union{Dagger.EagerThunk, Dagger.Chunk}}})
@ Base .\array.jl:2673
[7] trim!(d::DTable)
@ DTables C:\Users\krynjupc\.julia\packages\DTables\BjdY2\src\table\dtable.jl:233
[8] trim(d::DTable)
@ DTables C:\Users\krynjupc\.julia\packages\DTables\BjdY2\src\table\dtable.jl:242
[9] retrieve_partitions
@ C:\Users\krynjupc\.julia\packages\DTables\BjdY2\src\table\dtable.jl:179 [inlined]
[10] fetch(d::DTable)
@ DTables C:\Users\krynjupc\.julia\packages\DTables\BjdY2\src\table\dtable.jl:167
[11] |>(x::DTable, f::typeof(fetch))
@ Base .\operators.jl:917
[12] top-level scope
@ REPL[57]:1
Reproducer, no files needed: just run `julia -p4` and run the last 5 lines over and over until the error appears (and then any further call will generate the error again).
```
ENV["JULIA_MEMPOOL_EXPERIMENTAL_FANCY_ALLOCATOR"] = "true"
ENV["JULIA_MEMPOOL_EXPERIMENTAL_MEMORY_BOUND"] = string(2 * (2^30)) # 2GB
# ENV["JULIA_MEMPOOL_EXPERIMENTAL_DISK_CACHE"] = "C:\\Users\\krynjupc\\.mempool\\demo_session_$(rand(Int))"

using Distributed

@info(
    "Execution environment details",
    julia_version=VERSION,
    n_workers=Distributed.nworkers(),
    n_procs=Distributed.nprocs(),
    n_threads=Threads.nthreads(),
)

function view_cache()
    !isdir(ENV["JULIA_MEMPOOL_EXPERIMENTAL_DISK_CACHE"]) && return []
    map(
        x -> (basename(x), round(filesize(x) / 2^20, digits=2)),
        readdir(ENV["JULIA_MEMPOOL_EXPERIMENTAL_DISK_CACHE"], join=true)
    )
end

using DTables
DTables.enable_disk_caching!()
using MemPool
using Dagger

N1 = 2^27 # 1GB
d = DTable((a=rand(Int, N1),), N1 ÷ 100)
map(x -> (r=x.a + 1,), d) |> fetch
MemPool.GLOBAL_DEVICE[]
view_cache()
```
Can't reproduce with the fix https://github.com/JuliaData/MemPool.jl/pull/74. Stressed it really hard and I didn't get any errors.
Will cut a release soon.
I just tested the new releases of DTables.jl/Dagger.jl/MemPool.jl using the reproducer I mentioned above.
- Without disk caching enabled: the growing memory utilization is still present, with the data apparently retained in `MemPool.datastore`.
- With `enable_disk_caching!(50, 2^10 * 20)`: `AssertionError: Failed to migrate` still occurs on both WSL 2 and Windows (though it does take many more calls to `main` than before to experience the error).
So, it looks like the issue is not entirely resolved yet.
@StevenWhitaker when this occurs, how much does `Base.format_bytes(MemPool.GLOBAL_DEVICE[].device_size)` report, and what amount of memory is reported by the OS as being used by Julia (out of the total system RAM)? If MemPool's LRU truly thinks it's running out of memory, then this assertion will trigger (I probably should make the error more informative).
@jpsamaroo I ran the following code to grab the info you requested. Let me know if you need any other info.
```
julia> include("mwe.jl"); for i = 1:200
           totalmem = Base.format_bytes(Sys.total_physical_memory())
           memusage = map(procs()) do id
               remotecall_fetch(id) do
                   parse(Int, split(read(`ps -p $(getpid()) -o rss`, String), "\n")[end-1]) * 1000
               end
           end
           totalmemusage = Base.format_bytes(sum(memusage))
           worker_memusage = Base.format_bytes(memusage[2])
           device_size = remotecall_fetch(2) do
               DTables.Dagger.MemPool.GLOBAL_DEVICE[].device_size[]
           end |> Base.format_bytes # Remove `device_size` when disk caching not enabled
           @info "" i device_size worker_memusage totalmemusage totalmem
           main()
       end
```
Without disk caching enabled on WSL 2:
┌ Info:
│ i = 131
│ worker_memusage = "26.100 GiB"
│ totalmemusage = "29.153 GiB"
└ totalmem = "30.994 GiB"
With `enable_disk_caching!(50, 2^10 * 20)` on WSL 2, the last two reports before the `AssertionError`:
┌ Info:
│ i = 117
│ device_size = "19.417 GiB"
│ worker_memusage = "7.348 GiB"
│ totalmemusage = "13.088 GiB"
└ totalmem = "30.994 GiB"
┌ Info:
│ device_size = "19.590 GiB"
│ worker_memusage = "7.283 GiB"
│ totalmemusage = "13.040 GiB"
└ totalmem = "30.994 GiB"
So, it looks like the device is running out of memory, but why?
Can you try throwing in some `@everywhere GC.gc()` calls every iteration and see if that delays or eliminates the OOM situation? If so, then it means that we should start automatically calling the GC to reduce memory usage when we start hitting RAM limits.
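Concretely, the suggestion amounts to something like the following tweak to the driver loop (a sketch, not runnable on its own: it assumes `main` and the worker setup from the MWE above are already loaded):

```julia
using Distributed

# Sketch: force a full collection on the driver and every worker before
# each iteration, to see whether eager GC delays or prevents the OOM kill.
for i = 1:200
    @everywhere GC.gc()  # runs on all processes
    main()               # `main` as defined in mwe.jl
end
```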
I added `@everywhere GC.gc()` at the start of each iteration. It delayed the OOM issue until iteration 300, but the Julia process still got killed.
Any updates yet on this front?
@StevenWhitaker can you try out https://github.com/JuliaData/MemPool.jl/pull/75 and see if it delays the OOM further or eliminates it? You'll probably want to fiddle with the `MemPool.MEM_RESERVED[]` value to see which setting avoids the OOM best while minimizing overhead. It's set to 512 MB to be conservative, but 2 GB works better on my machine (since I run `earlyoom`, which is more eager than the Linux kernel's OOM killer).
@jpsamaroo I tried using the default value of `MemPool.MEM_RESERVED[]`, as well as 2 GB and 10 GB, and in all cases I got the same error (which looks different from what I reported earlier):
```
      From worker 4:	┌ Error: Error on 4 while connecting to peer 3, exiting
      From worker 4:	│   exception =
      From worker 4:	│    ConcurrencyViolationError("lock must be held")
      From worker 4:	│    Stacktrace:
      From worker 4:	│      [1] concurrency_violation()
      From worker 4:	│        @ Base ./condition.jl:8
      From worker 4:	│      [2] assert_havelock
      From worker 4:	│        @ ./condition.jl:25 [inlined]
      From worker 4:	│      [3] assert_havelock
      From worker 4:	│        @ ./condition.jl:48 [inlined]
      From worker 4:	│      [4] assert_havelock
      From worker 4:	│        @ ./condition.jl:72 [inlined]
      From worker 4:	│      [5] notify(c::Condition, arg::Any, all::Bool, error::Bool)
      From worker 4:	│        @ Base ./condition.jl:150
      From worker 4:	│      [6] #notify#622
      From worker 4:	│        @ ./condition.jl:148 [inlined]
      From worker 4:	│      [7] notify (repeats 2 times)
      From worker 4:	│        @ ./condition.jl:148 [inlined]
      From worker 4:	│      [8] set_worker_state
      From worker 4:	│        @ ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:148 [inlined]
      From worker 4:	│      [9] Distributed.Worker(id::Int64, r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, manager::Distributed.DefaultClusterManager; version::Nothing, config::WorkerConfig)
      From worker 4:	│        @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:126
      From worker 4:	│     [10] Worker
      From worker 4:	│        @ ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:116 [inlined]
      From worker 4:	│     [11] connect_to_peer(manager::Distributed.DefaultClusterManager, rpid::Int64, wconfig::WorkerConfig)
      From worker 4:	│        @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:364
      From worker 4:	│     [12] (::Distributed.var"#121#123"{Int64, WorkerConfig})()
      From worker 4:	│        @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:350
      From worker 4:	│     [13] exec_conn_func(w::Distributed.Worker)
      From worker 4:	│        @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:181
      From worker 4:	│     [14] (::Distributed.var"#21#24"{Distributed.Worker})()
      From worker 4:	│        @ Distributed ./task.jl:514
      From worker 4:	└ @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:370
┌ Error: Error when tearing down scheduler
│   exception =
│    TaskFailedException
│
│        nested task error: no process with id 4 exists
│        Stacktrace:
│          [1] error(s::String)
│            @ Base ./error.jl:35
│          [2] worker_from_id(pg::Distributed.ProcessGroup, i::Int64)
│            @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:1098
│          [3] worker_from_id
│            @ ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:1090 [inlined]
│          [4] #remote_do#170
│            @ ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:557 [inlined]
│          [5] remote_do
│            @ ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:557 [inlined]
│          [6] cleanup_proc(state::Dagger.Sch.ComputeState, p::Dagger.OSProc, log_sink::TimespanLogging.NoOpLog)
│            @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:409
│          [7] (::Dagger.Sch.var"#105#108"{Dagger.Context, Dagger.Sch.ComputeState, Dagger.OSProc})()
│            @ Dagger.Sch ./task.jl:514
│    Stacktrace:
│      [1] sync_end(c::Channel{Any})
│        @ Base ./task.jl:445
│      [2] macro expansion
│        @ ./task.jl:477 [inlined]
│      [3] scheduler_exit(ctx::Dagger.Context, state::Dagger.Sch.ComputeState, options::Dagger.Sch.SchedulerOptions)
│        @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:615
│      [4] compute_dag(ctx::Dagger.Context, d::Dagger.Thunk; options::Dagger.Sch.SchedulerOptions)
│        @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:471
│      [5] compute_dag
│        @ ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:431 [inlined]
│      [6] compute(ctx::Dagger.Context, d::Dagger.Thunk; options::Dagger.Sch.SchedulerOptions)
│        @ Dagger ~/.julia/packages/Dagger/lhyAj/src/compute.jl:23
│      [7] compute
│        @ ~/.julia/packages/Dagger/lhyAj/src/compute.jl:22 [inlined]
│      [8] macro expansion
│        @ ~/.julia/packages/Dagger/lhyAj/src/sch/eager.jl:27 [inlined]
│      [9] (::Dagger.Sch.var"#58#60"{Dagger.Context})()
│        @ Dagger.Sch ./threadingconstructs.jl:416
└ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:473
Error in eager scheduler:
Cannot serialize a WeakChunk
Stacktrace:
  [1] error(s::String)
    @ Base ./error.jl:35
  [2] serialize(io::Distributed.ClusterSerializer{Sockets.TCPSocket}, wc::Dagger.WeakChunk)
    @ Dagger ~/.julia/packages/Dagger/lhyAj/src/chunks.jl:298
  [3] serialize_any(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:678
  [4] serialize
    @ ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:657 [inlined]
  [5] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, a::Vector{Pair{Union{Nothing, Symbol}, Any}})
    @ Serialization ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:277
  [6] serialize_any(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:678
  [7] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:657
  [8] serialize_any(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:678
  [9] serialize
    @ ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:657 [inlined]
 [10] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::Tuple{Bool, Dagger.ThunkFailedException{RemoteException}})
    @ Serialization ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:205
 [11] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::Tuple{Distributed.RRID, Tuple{Bool, Dagger.ThunkFailedException{RemoteException}}, Int64})
    @ Serialization ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:205
 [12] serialize_msg(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, o::Distributed.CallMsg{:call_fetch})
    @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/messages.jl:78
 [13] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [14] invokelatest
    @ ./essentials.jl:816 [inlined]
 [15] send_msg_(w::Distributed.Worker, header::Distributed.MsgHeader, msg::Distributed.CallMsg{:call_fetch}, now::Bool)
    @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/messages.jl:181
 [16] send_msg
    @ ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/messages.jl:122 [inlined]
 [17] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:460
 [18] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any})
    @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [19] remotecall_fetch(::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [20] remotecall_fetch(::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any})
    @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [21] call_on_owner(::Function, ::Future, ::Tuple{Bool, Dagger.ThunkFailedException{RemoteException}}, ::Vararg{Any})
    @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:565
 [22] macro expansion
    @ ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:681 [inlined]
 [23] macro expansion
    @ ./lock.jl:267 [inlined]
 [24] put!(r::Future, v::Tuple{Bool, Dagger.ThunkFailedException{RemoteException}})
    @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:680
 [25] #put!#71
    @ ~/.julia/packages/Dagger/lhyAj/src/eager_thunk.jl:24 [inlined]
 [26] fill_registered_futures!(state::Dagger.Sch.ComputeState, node::Dagger.Thunk, failed::Bool)
    @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/util.jl:59
 [27] finish_failed!(state::Dagger.Sch.ComputeState, thunk::Dagger.Thunk, origin::Dagger.Thunk)
    @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/util.jl:183
 [28] set_failed!(state::Dagger.Sch.ComputeState, origin::Dagger.Thunk, thunk::Dagger.Thunk)
    @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/util.jl:180
 [29] set_failed!
    @ ~/.julia/packages/Dagger/lhyAj/src/sch/util.jl:177 [inlined]
 [30] finish_task!(ctx::Dagger.Context, state::Dagger.Sch.ComputeState, node::Dagger.Thunk, thunk_failed::Bool)
    @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:915
 [31] (::Dagger.Sch.var"#103#104"{Dagger.Context, Dagger.Sch.ComputeState, Dagger.OSProc, NamedTuple{(:time_pressure, :storage_pressure, :storage_capacity, :loadavg, :threadtime, :gc_allocd, :transfer_rate), Tuple{UInt64, UInt64, UInt64, Tuple{Float64, Float64, Float64}, UInt64, Int64, UInt64}}, RemoteException, Int64, Dagger.ThreadProc, Int64})()
    @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:591
 [32] lock(f::Dagger.Sch.var"#103#104"{Dagger.Context, Dagger.Sch.ComputeState, Dagger.OSProc, NamedTuple{(:time_pressure, :storage_pressure, :storage_capacity, :loadavg, :threadtime, :gc_allocd, :transfer_rate), Tuple{UInt64, UInt64, UInt64, Tuple{Float64, Float64, Float64}, UInt64, Int64, UInt64}}, RemoteException, Int64, Dagger.ThreadProc, Int64}, l::ReentrantLock)
    @ Base ./lock.jl:229
 [33] scheduler_run(ctx::Dagger.Context, state::Dagger.Sch.ComputeState, d::Dagger.Thunk, options::Dagger.Sch.SchedulerOptions)
    @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:542
 [34] compute_dag(ctx::Dagger.Context, d::Dagger.Thunk; options::Dagger.Sch.SchedulerOptions)
    @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:466
 [35] compute_dag
    @ ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:431 [inlined]
 [36] compute(ctx::Dagger.Context, d::Dagger.Thunk; options::Dagger.Sch.SchedulerOptions)
    @ Dagger ~/.julia/packages/Dagger/lhyAj/src/compute.jl:23
 [37] compute
    @ ~/.julia/packages/Dagger/lhyAj/src/compute.jl:22 [inlined]
 [38] macro expansion
    @ ~/.julia/packages/Dagger/lhyAj/src/sch/eager.jl:27 [inlined]
 [39] (::Dagger.Sch.var"#58#60"{Dagger.Context})()
    @ Dagger.Sch ./threadingconstructs.jl:416
      From worker 3:	UNHANDLED TASK ERROR: AssertionError: id > 0
Worker 4 terminated.
      From worker 3:	Stacktrace:
Unhandled Task ERROR: EOFError: read end of file
Stacktrace:
 [1] (::Base.var"#wait_locked#715")(s::Sockets.TCPSocket, buf::IOBuffer, nb::Int64)
   @ Base ./stream.jl:947
 [2] unsafe_read(s::Sockets.TCPSocket, p::Ptr{UInt8}, nb::UInt64)
   @ Base ./stream.jl:955
 [3] unsafe_read
   @ ./io.jl:761 [inlined]
 [4] unsafe_read(s::Sockets.TCPSocket, p::Base.RefValue{NTuple{4, Int64}}, n::Int64)
   @ Base ./io.jl:760
 [5] read!
   @ ./io.jl:762 [inlined]
 [6] deserialize_hdr_raw
   @ ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/messages.jl:167 [inlined]
 [7] message_handler_loop(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool)
   @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:172
 [8] process_tcp_streams(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool)
   @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:133
 [9] (::Distributed.var"#103#104"{Sockets.TCPSocket, Sockets.TCPSocket, Bool})()
   @ Distributed ./task.jl:514
      From worker 3:	 [1] Distributed.Worker(id::Int64, conn_func::Nothing)
      From worker 3:	   @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:133
      From worker 3:	 [2] Worker
      From worker 3:	   @ ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:131 [inlined]
      From worker 3:	 [3] worker_from_id(pg::Distributed.ProcessGroup, i::Int64)
      From worker 3:	   @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:1100
      From worker 3:	 [4] worker_from_id
      From worker 3:	   @ ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:1090 [inlined]
      From worker 3:	 [5] message_handler_loop(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool)
      From worker 3:	   @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:213
      From worker 3:	 [6] process_tcp_streams(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool)
      From worker 3:	   @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:133
      From worker 3:	 [7] (::Distributed.var"#103#104"{Sockets.TCPSocket, Sockets.TCPSocket, Bool})()
      From worker 3:	   @ Distributed ./task.jl:514
      From worker 3:
      From worker 3:	caused by: Cookie read failed. Connection closed by peer.
      From worker 3:	Stacktrace:
      From worker 3:	 [1] error(s::String)
      From worker 3:	   @ Base ./error.jl:35
      From worker 3:	 [2] process_hdr(s::Sockets.TCPSocket, validate_cookie::Bool)
      From worker 3:	   @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:259
      From worker 3:	 [3] message_handler_loop(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool)
      From worker 3:	   @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:158
      From worker 3:	 [4] process_tcp_streams(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool)
      From worker 3:	   @ Distributed ~/programs/julia/julia-1.9.4/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:133
      From worker 3:	 [5] (::Distributed.var"#103#104"{Sockets.TCPSocket, Sockets.TCPSocket, Bool})()
      From worker 3:	   @ Distributed ./task.jl:514
```
The error looks different, but it is probably the same failure as before (i.e., the OOM manager killing one of my Julia processes), because I notice one process is missing after the error. Oh, and now I see the `Worker 4 terminated.` message in the stacktrace.
Note that the error occurred sooner with 10 GB reserved.
This is with disk caching disabled; I'm assuming JuliaData/MemPool.jl#75 does nothing about the disk caching errors I've also been experiencing.
The error shown should be fixed by https://github.com/JuliaParallel/Dagger.jl/issues/450#issuecomment-1812894331; you need to be using Julia 1.11 with my Distributed PR at all times for reliable distributed computing.
And yes, it doesn't yet handle disk caching, but I can add support for that (we'll nicely ask the LRU to swap more stuff to disk if it can).
Oh, I thought JuliaLang/Distributed.jl#4 was only relevant if running code with multiple threads, but I'm running `julia` with `-t1`. But I will try again tomorrow with that fix.
With disk caching enabled, as far as I can tell, my issue isn't running out of RAM, it's running out of allotted disk space. So the LRU needs to delete stuff that is no longer needed, but it seems like that isn't happening.
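For reference, the disk-cache limits I mean are the ones configured through MemPool's disk-cache setup; a minimal sketch (the 512 MiB / 20 GiB bounds here are illustrative, matching what I use elsewhere):

```julia
using MemPool

# Illustrative bounds: spill refs to disk past ~512 MiB of RAM,
# and never use more than 20 GiB of on-disk space.
config = MemPool.DiskCacheConfig(;
    toggle = true,
    membound = 512 * 2^20,   # per-process RAM bound before spilling
    diskbound = 20 * 2^30,   # total on-disk allotment
)
MemPool.setup_global_device!(config)
```

My point is that entries whose refs are no longer live don't seem to be reclaimed from that `diskbound` allotment.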
My reproducer still fails.
With JuliaLang/Distributed.jl#4 and JuliaData/MemPool.jl#75 and the most recent nightly build of Julia:
``` $ ~/programs/julia/julia-9fc1b653c4/bin/julia --project -t1 --heap-size-hint=3G _ _ _ _(_)_ | Documentation: https://docs.julialang.org (_) | (_) (_) | _ _ _| |_ __ _ | Type "?" for help, "]?" for Pkg help. | | | | | | |/ _` | | | | |_| | | | (_| | | Version 1.11.0-DEV.1105 (2023-12-15) _/ |\__'_|_|_|\__'_| | Commit 9fc1b653c43 (0 days old master) |__/ | julia> include("mwe.jl"); @show(Base.format_bytes(DTables.Dagger.MemPool.MEM_RESERVED[])); for i = 1:300 (i % 10 == 0 && @show(i)); main() end Base.format_bytes(DTables.Dagger.MemPool.MEM_RESERVED[]) = "512.000 MiB" From worker 2: ┌ Info: From worker 2: └ length(dt3) = 233930 From worker 2: ┌ Info: From worker 2: └ length(dt3) = 233930 From worker 2: ┌ Info: From worker 2: └ length(dt3) = 233930 From worker 2: ┌ Info: From worker 2: └ length(dt3) = 233930 From worker 2: GC error (probable corruption) From worker 2: Allocations: 191540514 (Pool: 191538428; Big: 2086); GC: 3793 From worker 2: #0x7fcfcafd02e0::Memory{ From worker 2: !!! ERROR in jl_ -- ABORTING !!! 
From worker 2: From worker 2: [6894] signal 6 (-6): Aborted From worker 2: in expression starting at none:0 From worker 2: Allocations: 191540514 (Pool: 191538428; Big: 2086); GC: 3793 ERROR: ┌ Warning: Worker 2 died, rescheduling work └ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:546 Worker 2 terminated.ProcessExitedException (┌ Error: Error assigning workers │ exception = │ ProcessExitedException(2) │ Stacktrace: │ [1] worker_from_id(pg::Distributed.ProcessGroup, i::Int64) │ @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/cluster.jl:1121 │ [2] worker_from_id(pg::Distributed.ProcessGroup, i::Int64) │ @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/cluster.jl:1118 [inlined] │ [3] remote_do │ @ ~/tmp/distributed_fix/Distributed.jl/src/remotecall.jl:557 [inlined] │ [4] cleanup_proc(state::Dagger.Sch.ComputeState, p::Dagger.OSProc, log_sink::TimespanLogging.NoOpLog) │ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:409 │ [5] monitor_procs_changed!(ctx::Dagger.Context, state::Dagger.Sch.ComputeState) │ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:887 │ [6] (::Dagger.Sch.var"#100#102"{Dagger.Context, Dagger.Sch.ComputeState})() │ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:509 └ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:511 2Unhandled Task ERROR: EOFError: read end of file Stacktrace: [1] (::Base.var"#wait_locked#797")(s::Sockets.TCPSocket, buf::IOBuffer, nb::Int64) @ Base ./stream.jl:950 [2] unsafe_read(s::Sockets.TCPSocket, p::Ptr{UInt8}, nb::UInt64) @ Base ./stream.jl:958 [3] unsafe_read @ ./io.jl:882 [inlined] [4] unsafe_read(s::Sockets.TCPSocket, p::Base.RefValue{NTuple{4, Int64}}, n::Int64) @ Base ./io.jl:881 [5] read! 
@ ./io.jl:886 [inlined] [6] deserialize_hdr_raw @ ~/tmp/distributed_fix/Distributed.jl/src/messages.jl:167 [inlined] [7] message_handler_loop(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool) @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/process_messages.jl:172 [8] process_tcp_streams(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool) @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/process_messages.jl:133 [9] (::Distributed.var"#113#114"{Sockets.TCPSocket, Sockets.TCPSocket, Bool})() @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/process_messages.jl:121 ) Stacktrace: [1] try_yieldto(undo::typeof(Base.ensure_rescheduled)) @ Base ./task.jl:944 [2] wait() @ Base ./task.jl:1008 [3] wait(c::Base.GenericCondition{ReentrantLock}; first::Bool) @ Base ./condition.jl:130 [4] wait @ Base ./condition.jl:125 [inlined] [5] take_buffered(c::Channel{Any}) @ Base ./channels.jl:477 [6] take!(c::Channel{Any}) @ Base ./channels.jl:471 [7] take!(::Distributed.RemoteValue) @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/remotecall.jl:726 [8] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::@Kwargs{}) @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/remotecall.jl:461 [9] remotecall_fetch(::Function, ::Distributed.Worker) @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/remotecall.jl:454 [10] remotecall_fetch @ ~/tmp/distributed_fix/Distributed.jl/src/remotecall.jl:492 [inlined] [11] main @ ~/tmp/distributed_fix/mwe.jl:19 [inlined] [12] top-level scope @ ./REPL[1]:1 ```
``` $ JULIA_MEMPOOL_MEMORY_RESERVED=2000000000 ~/programs/julia/julia-9fc1b653c4/bin/julia --project -t 1 --heap-size-hint=3G _ _ _ _(_)_ | Documentation: https://docs.julialang.org (_) | (_) (_) | _ _ _| |_ __ _ | Type "?" for help, "]?" for Pkg help. | | | | | | |/ _` | | | | |_| | | | (_| | | Version 1.11.0-DEV.1105 (2023-12-15) _/ |\__'_|_|_|\__'_| | Commit 9fc1b653c43 (0 days old master) |__/ | julia> include("mwe.jl"); @show(Base.format_bytes(DTables.Dagger.MemPool.MEM_RESERVED[])); for i = 1:300 (i % 10 == 0 && @show(i)); main() end Base.format_bytes(DTables.Dagger.MemPool.MEM_RESERVED[]) = "1.863 GiB" From worker 2: ┌ Info: From worker 2: └ length(dt3) = 233930 From worker 2: ┌ Info: From worker 2: └ length(dt3) = 233930 From worker 2: ┌ Info: From worker 2: └ length(dt3) = 233930 From worker 2: ┌ Info: From worker 2: └ length(dt3) = 233930 From worker 2: GC error (probable corruption) From worker 2: Allocations: 191545919 (Pool: 191543833; Big: 2086); GC: 3751 From worker 2: #0x7f7c13df0180::Memory{ From worker 2: !!! ERROR in jl_ -- ABORTING !!! 
From worker 2: From worker 2: [6675] signal 6 (-6): Aborted From worker 2: in expression starting at none:0 From worker 2: Allocations: 191545919 (Pool: 191543833; Big: 2086); GC: 3751 ┌ Warning: Worker 2 died, rescheduling work └ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:546 ERROR: Worker 2 terminated.ProcessExitedException ┌ Error: Error assigning workers │ exception = │ ProcessExitedException(2) │ Stacktrace: │ [1] worker_from_id(pg::Distributed.ProcessGroup, i::Int64) │ @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/cluster.jl:1121 │ [2] worker_from_id(pg::Distributed.ProcessGroup, i::Int64) │ @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/cluster.jl:1118 [inlined] │ [3] remote_do │ @ ~/tmp/distributed_fix/Distributed.jl/src/remotecall.jl:557 [inlined] │ [4] cleanup_proc(state::Dagger.Sch.ComputeState, p::Dagger.OSProc, log_sink::TimespanLogging.NoOpLog) │ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:409 │ [5] monitor_procs_changed!(ctx::Dagger.Context, state::Dagger.Sch.ComputeState) │ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:887 │ [6] (::Dagger.Sch.var"#100#102"{Dagger.Context, Dagger.Sch.ComputeState})() │ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:509 └ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:511 (2Unhandled Task ERROR: EOFError: read end of file Stacktrace: [1] (::Base.var"#wait_locked#797")(s::Sockets.TCPSocket, buf::IOBuffer, nb::Int64) @ Base ./stream.jl:950 [2] unsafe_read(s::Sockets.TCPSocket, p::Ptr{UInt8}, nb::UInt64) @ Base ./stream.jl:958 [3] unsafe_read @ ./io.jl:882 [inlined] [4] unsafe_read(s::Sockets.TCPSocket, p::Base.RefValue{NTuple{4, Int64}}, n::Int64) @ Base ./io.jl:881 [5] read! 
@ ./io.jl:886 [inlined] [6] deserialize_hdr_raw @ ~/tmp/distributed_fix/Distributed.jl/src/messages.jl:167 [inlined] [7] message_handler_loop(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool) @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/process_messages.jl:172 [8] process_tcp_streams(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool) @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/process_messages.jl:133 [9] (::Distributed.var"#113#114"{Sockets.TCPSocket, Sockets.TCPSocket, Bool})() @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/process_messages.jl:121 ) Stacktrace: [1] try_yieldto(undo::typeof(Base.ensure_rescheduled)) @ Base ./task.jl:944 [2] wait() @ Base ./task.jl:1008 [3] wait(c::Base.GenericCondition{ReentrantLock}; first::Bool) @ Base ./condition.jl:130 [4] wait @ Base ./condition.jl:125 [inlined] [5] take_buffered(c::Channel{Any}) @ Base ./channels.jl:477 [6] take!(c::Channel{Any}) @ Base ./channels.jl:471 [7] take!(::Distributed.RemoteValue) @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/remotecall.jl:726 [8] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::@Kwargs{}) @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/remotecall.jl:461 [9] remotecall_fetch(::Function, ::Distributed.Worker) @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/remotecall.jl:454 [10] remotecall_fetch @ ~/tmp/distributed_fix/Distributed.jl/src/remotecall.jl:492 [inlined] [11] main @ ~/tmp/distributed_fix/mwe.jl:19 [inlined] [12] top-level scope @ ./REPL[1]:1 ```
``` $ JULIA_MEMPOOL_MEMORY_RESERVED=10000000000 ~/programs/julia/julia-9fc1b653c4/bin/julia --project -t1 --heap-size-hint=3G _ _ _ _(_)_ | Documentation: https://docs.julialang.org (_) | (_) (_) | _ _ _| |_ __ _ | Type "?" for help, "]?" for Pkg help. | | | | | | |/ _` | | | | |_| | | | (_| | | Version 1.11.0-DEV.1105 (2023-12-15) _/ |\__'_|_|_|\__'_| | Commit 9fc1b653c43 (0 days old master) |__/ | julia> include("mwe.jl"); @show(Base.format_bytes(DTables.Dagger.MemPool.MEM_RESERVED[])); for i = 1:300 (i % 10 == 0 && @show(i)); main() end Base.format_bytes(DTables.Dagger.MemPool.MEM_RESERVED[]) = "9.313 GiB" From worker 2: ┌ Info: From worker 2: └ length(dt3) = 233930 From worker 2: ┌ Info: From worker 2: └ length(dt3) = 233930 From worker 2: ┌ Info: From worker 2: └ length(dt3) = 233930 From worker 2: ┌ Info: From worker 2: └ length(dt3) = 233930 From worker 2: From worker 2: [5814] signal 11 (1): Segmentation fault From worker 2: in expression starting at none:0 From worker 2: Allocations: 191516083 (Pool: 191513997; Big: 2086); GC: 6403 ERROR: ┌ Warning: Worker 2 died, rescheduling work └ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:546 Worker 2 terminated.ProcessExitedException┌ Warning: Worker 2 died, rescheduling work └ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:546 (┌ Error: Error assigning workers │ exception = │ ProcessExitedException(2) │ Stacktrace: │ [1] worker_from_id(pg::Distributed.ProcessGroup, i::Int64) │ @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/cluster.jl:1121 │ [2] worker_from_id(pg::Distributed.ProcessGroup, i::Int64) │ @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/cluster.jl:1118 [inlined] │ [3] remote_do │ @ ~/tmp/distributed_fix/Distributed.jl/src/remotecall.jl:557 [inlined] │ [4] cleanup_proc(state::Dagger.Sch.ComputeState, p::Dagger.OSProc, log_sink::TimespanLogging.NoOpLog) │ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:409 │ [5] monitor_procs_changed!(ctx::Dagger.Context, 
state::Dagger.Sch.ComputeState) │ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:887 │ [6] (::Dagger.Sch.var"#100#102"{Dagger.Context, Dagger.Sch.ComputeState})() │ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:509 └ @ Dagger.Sch ~/.julia/packages/Dagger/lhyAj/src/sch/Sch.jl:511 2Unhandled Task ERROR: EOFError: read end of file Stacktrace: [1] (::Base.var"#wait_locked#797")(s::Sockets.TCPSocket, buf::IOBuffer, nb::Int64) @ Base ./stream.jl:950 [2] unsafe_read(s::Sockets.TCPSocket, p::Ptr{UInt8}, nb::UInt64) @ Base ./stream.jl:958 [3] unsafe_read @ ./io.jl:882 [inlined] [4] unsafe_read(s::Sockets.TCPSocket, p::Base.RefValue{NTuple{4, Int64}}, n::Int64) @ Base ./io.jl:881 [5] read! @ ./io.jl:886 [inlined] [6] deserialize_hdr_raw @ ~/tmp/distributed_fix/Distributed.jl/src/messages.jl:167 [inlined] [7] message_handler_loop(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool) @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/process_messages.jl:172 [8] process_tcp_streams(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool) @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/process_messages.jl:133 [9] (::Distributed.var"#113#114"{Sockets.TCPSocket, Sockets.TCPSocket, Bool})() @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/process_messages.jl:121 ) Stacktrace: [1] try_yieldto(undo::typeof(Base.ensure_rescheduled)) @ Base ./task.jl:944 [2] wait() @ Base ./task.jl:1008 [3] wait(c::Base.GenericCondition{ReentrantLock}; first::Bool) @ Base ./condition.jl:130 [4] wait @ Base ./condition.jl:125 [inlined] [5] take_buffered(c::Channel{Any}) @ Base ./channels.jl:477 [6] take!(c::Channel{Any}) @ Base ./channels.jl:471 [7] take!(::Distributed.RemoteValue) @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/remotecall.jl:726 [8] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::@Kwargs{}) @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/remotecall.jl:461 [9] 
remotecall_fetch(::Function, ::Distributed.Worker) @ Distributed ~/tmp/distributed_fix/Distributed.jl/src/remotecall.jl:454 [10] remotecall_fetch @ ~/tmp/distributed_fix/Distributed.jl/src/remotecall.jl:492 [inlined] [11] main @ ~/tmp/distributed_fix/mwe.jl:19 [inlined] [12] top-level scope @ ./REPL[1]:1 ```
I can try with a different Julia commit if you think the issue here is the particular commit I used. (And let me know if you have a commit in mind.)
@jpsamaroo Happy Holidays! Any updates on this front?
And to reiterate some questions I had:
Oh, I thought https://github.com/JuliaLang/Distributed.jl/pull/4 was only relevant if running code with multiple threads, but I'm running `julia` with `-t1`.
Is my assumption wrong, i.e., even with one thread we need that Distributed.jl fix?
With disk caching enabled, as far as I can tell, my issue isn't running out of RAM, it's running out of allotted disk space. So the LRU needs to delete stuff that is no longer needed, but it seems like that isn't happening.
Any thoughts on this?
I tried a couple of variations of my reproducer to collect a few more data points in case it helps with this issue:

I realized that I accidentally had been using multiple threads on the worker processes (because I didn't realize the `-t 1` option doesn't automatically propagate to workers added with `addprocs`). So, I updated my reproducer to include `"-t 1"` in the `exeflags` kwarg of `addprocs`. I then ran the reproducer both with and without disk caching, as before:

- Without disk caching: a worker process was eventually killed after repeated calls to `main` due to running out of memory and swap.
- With disk caching: the same `AssertionError: Failed to migrate` as before occurred after 303 calls to `main`.

I then decided to remove the two `GC.enable` lines in `load_dt`:

- Without disk caching: no process was killed during my calls to `main`, but memory and swap usage was steadily rising.
- With disk caching: the same `AssertionError: Failed to migrate` as before occurred after 239 calls to `main`.

I then tried wrapping the call to `query` in `main` with `Dagger.with_options(query; scope = ProcessScope(myid()))`:

- Without disk caching: the failure again occurred after repeated calls to `main`.
- With disk caching: the same `AssertionError: Failed to migrate` as before occurred after 39 calls to `main`.

There are no memory or disk caching problems when I don't add any processes, regardless of whether or not multiple threads are used.
TL;DR Everything I tried still resulted in failure, except removing processes altogether.
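For concreteness, the `exeflags` change looks like this (a minimal sketch; the worker count and flag values are illustrative):

```julia
using Distributed

# The parent's -t option does not propagate to workers;
# it has to be passed explicitly via exeflags when adding processes.
p = addprocs(1; exeflags = ["--heap-size-hint=3G", "-t", "1"])

# Each worker now runs single-threaded.
@assert remotecall_fetch(Threads.nthreads, first(p)) == 1
rmprocs(p)
```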
Happy Holidays!
Is my assumption wrong, i.e., even with one thread we need that Distributed.jl fix?
Probably yes, as it may still be a race with multiple async tasks. I haven't really experienced that, but it can probably still occur.
Any thoughts on this?
I still need to implement this in https://github.com/JuliaData/MemPool.jl/pull/75 - I have a long TODO list from before the holidays, so I'm slowly working down it. Thanks for your patience :smile:
W.r.t. the non-disk-caching OOMs, I've put together https://github.com/JuliaData/MemPool.jl/pull/76, which together with https://github.com/JuliaData/MemPool.jl/pull/75 and https://github.com/JuliaParallel/Dagger.jl/tree/jps/chained-dtors significantly reduces the amount of memory that Dagger keeps around, and also forces GC calls when we're running out of memory (tunable with the new `JULIA_MEMPOOL_MEMORY_RESERVED` env var, or `MemPool.MEM_RESERVED[]`). I haven't tested it heavily yet with DTables, but I've seen some solid improvements with `DArray` operations. At least for single-worker tests, I see very consistent, much lower memory usage, rather than the wild sawtooth pattern that we're used to seeing.
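Tuning the new knob looks roughly like this (a sketch; the 2 GiB value is just an example):

```julia
# Reserve ~2 GiB of RAM (value in bytes) for the OS and other processes;
# MemPool reads this env var at initialization, so set it before workers start.
ENV["JULIA_MEMPOOL_MEMORY_RESERVED"] = string(2 * 2^30)

# Or adjust at runtime once MemPool is loaded:
# MemPool.MEM_RESERVED[] = 2 * 2^30
```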
I'll keep at this, but thank you for the detailed updates and patience while I work through these issues!
Thanks for your work, and no worries about having a long TODO list!
I just tried adding JuliaData/MemPool.jl#75 and https://github.com/JuliaParallel/Dagger.jl/tree/jps/chained-dtors to my environment, in conjunction with DTables v0.4.3, and DTables failed to precompile (segfault). I saw this on Julia 1.9.4 and 1.10.0. (But it precompiled fine on 1.11 (2024-01-11 nightly), even without your Distributed fix.) Any thoughts on why DTables would fail to precompile?
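For reproducibility, I added the branches roughly like this (a sketch; I'm eliding the exact rev of the MemPool PR branch):

```julia
using Pkg
Pkg.add(url = "https://github.com/JuliaParallel/Dagger.jl", rev = "jps/chained-dtors")
# plus the branch behind JuliaData/MemPool.jl#75 (rev elided here)
```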
Odd, can you provide a stacktrace of the segfault?
This was in Julia 1.10.0, but it looked the same in Julia 1.9.4.
This is the log when precompiling after updating packages (it's very long, so I had to truncate):
```
Precompiling project...
✗ DTables
10 dependencies successfully precompiled in 24 seconds. 43 already precompiled.
1 dependency had output during precompilation:
┌ Dagger
│ Task
│ Task(next=Task(next=Task(next=Task(next=Task(next=Task(next=Task(next=Task(next=Task(next=Task(next=nothing, queue=Base.IntrusiveLinkedList{Task}(head=
```
And here's the segfault that comes when calling `precompile`:
``` (chained-dtors) pkg> precompile Precompiling project... ✗ DTables 0 dependencies successfully precompiled in 1 seconds. 53 already precompiled. ERROR: The following 1 direct dependency failed to precompile: DTables [20c56dc6-594c-4682-91cf-1d46875b1eba] Failed to precompile DTables [20c56dc6-594c-4682-91cf-1d46875b1eba] to "/home/steven/.julia/compiled/v1.10/DTables/jl_f6HpNj". [1629] signal (6.-6): Aborted in expression starting at /home/steven/.julia/packages/DTables/EiSy4/src/DTables.jl:7 pthread_kill at /lib/x86_64-linux-gnu/libc.so.6 (unknown line) raise at /lib/x86_64-linux-gnu/libc.so.6 (unknown line) abort at /lib/x86_64-linux-gnu/libc.so.6 (unknown line) get_item_for_reloc at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/staticdata.c:1798 [inlined] jl_read_reloclist at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/staticdata.c:1874 jl_restore_system_image_from_stream_ at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/staticdata.c:2996 jl_restore_package_image_from_stream at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/staticdata.c:3418 jl_restore_incremental_from_buf at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/staticdata.c:3465 ijl_restore_package_image_from_file at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/staticdata.c:3549 _include_from_serialized at ./loading.jl:1052 _require_search_from_serialized at ./loading.jl:1575 _require at ./loading.jl:1932 __require_prelocked at ./loading.jl:1806 jfptr___require_prelocked_80742.1 at /home/steven/programs/julia/julia-1.10.0/lib/julia/sys.so (unknown line) _jl_invoke at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/gf.c:2894 [inlined] ijl_apply_generic at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/gf.c:3076 jl_apply at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/julia.h:1982 [inlined] jl_f__call_in_world at 
/cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/builtins.c:831 #invoke_in_world#3 at ./essentials.jl:921 [inlined] invoke_in_world at ./essentials.jl:918 [inlined] _require_prelocked at ./loading.jl:1797 macro expansion at ./loading.jl:1784 [inlined] macro expansion at ./lock.jl:267 [inlined] __require at ./loading.jl:1747 jfptr___require_80707.1 at /home/steven/programs/julia/julia-1.10.0/lib/julia/sys.so (unknown line) _jl_invoke at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/gf.c:2894 [inlined] ijl_apply_generic at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/gf.c:3076 jl_apply at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/julia.h:1982 [inlined] jl_f__call_in_world at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/builtins.c:831 #invoke_in_world#3 at ./essentials.jl:921 [inlined] invoke_in_world at ./essentials.jl:918 [inlined] require at ./loading.jl:1740 jfptr_require_80704.1 at /home/steven/programs/julia/julia-1.10.0/lib/julia/sys.so (unknown line) _jl_invoke at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/gf.c:2894 [inlined] ijl_apply_generic at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/gf.c:3076 jl_apply at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/julia.h:1982 [inlined] call_require at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/toplevel.c:481 [inlined] eval_import_path at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/toplevel.c:518 eval_import_from at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/toplevel.c:635 [inlined] eval_import_from at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/toplevel.c:626 jl_toplevel_eval_flex at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/toplevel.c:742 jl_eval_module_expr at 
/cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/toplevel.c:215 [inlined] jl_toplevel_eval_flex at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/toplevel.c:736 jl_toplevel_eval_flex at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/toplevel.c:877 ijl_toplevel_eval_in at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/toplevel.c:985 eval at ./boot.jl:385 [inlined] include_string at ./loading.jl:2070 _jl_invoke at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/gf.c:2894 [inlined] ijl_apply_generic at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/gf.c:3076 _include at ./loading.jl:2130 include at ./Base.jl:495 [inlined] include_package_for_output at ./loading.jl:2216 jfptr_include_package_for_output_80987.1 at /home/steven/programs/julia/julia-1.10.0/lib/julia/sys.so (unknown line) _jl_invoke at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/gf.c:2894 [inlined] ijl_apply_generic at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/gf.c:3076 jl_apply at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/julia.h:1982 [inlined] do_call at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/interpreter.c:126 eval_value at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/interpreter.c:223 eval_stmt_value at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/interpreter.c:174 [inlined] eval_body at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/interpreter.c:617 jl_interpret_toplevel_thunk at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/interpreter.c:775 jl_toplevel_eval_flex at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/toplevel.c:934 jl_toplevel_eval_flex at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/toplevel.c:877 ijl_toplevel_eval_in at 
/cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/toplevel.c:985 eval at ./boot.jl:385 [inlined] include_string at ./loading.jl:2070 include_string at ./loading.jl:2080 [inlined] exec_options at ./client.jl:316 _start at ./client.jl:552 jfptr__start_82703.1 at /home/steven/programs/julia/julia-1.10.0/lib/julia/sys.so (unknown line) _jl_invoke at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/gf.c:2894 [inlined] ijl_apply_generic at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/gf.c:3076 jl_apply at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/julia.h:1982 [inlined] true_main at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/jlapi.c:582 jl_repl_entrypoint at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/src/jlapi.c:731 main at /cache/build/builder-amdci4-6/julialang/julia-release-1-dot-10/cli/loader_exe.c:58 unknown function (ip: 0x7f524eabcd8f) __libc_start_main at /lib/x86_64-linux-gnu/libc.so.6 (unknown line) unknown function (ip: 0x4010b8) Allocations: 2906 (Pool: 2897; Big: 9); GC: ```
Yeah I'm dealing with these errors too now that I've opened the PR. I'll get those fixed and then let you know when you can try it.
@jpsamaroo I just saw JuliaLang/julia#40626. Does this issue affect Dagger.jl at all, i.e., could it be the cause of what I'm seeing, where memory seems like it never gets freed?
Huh, I hadn't thought of that, but now that you mention it, every Dagger task does rely on `fetch`ing the result of a Julia `Task` to get the Dagger task's return value (https://github.com/JuliaParallel/Dagger.jl/blob/2110d621212591b6652cfeb8d6ba668255358d10/src/processor.jl#L164-L172). That could certainly cause over-preservation of task results. I'll investigate this week and get back to you!
Also, the `jps/chained-dtors` branch has been merged into Dagger master (with fixes for the precompile errors), so please give that a try when you get a chance!
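To illustrate the retention concern (a minimal sketch; `t.result` is an internal field, accessed here only to make the behavior visible):

```julia
# A finished Task keeps a reference to its return value in its internal
# `result` field, so a still-reachable Task object (e.g. one cached by a
# scheduler) pins its possibly-large result.
t = Threads.@spawn zeros(Float64, 10^6)
buf = fetch(t)              # returns the ~8 MB array
@assert t.result === buf    # the Task itself still holds it
# Only once the Task object is unreachable can the array be collected.
buf = nothing; t = nothing
GC.gc()
```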
@jpsamaroo I've been trying several different things, so here's a report of my findings.

- `chained-dtors`: Without disk caching, this did seem to improve memory utilization (I set `MEM_RESERVED[]` to 4 GiB), meaning I don't think I ever saw OOM killing in this case. I did notice some strange memory reporting, though (see below). With disk caching I still experienced the `AssertionError` (I know that wasn't the focus for this change though).
- Strange memory reporting: this was with the `chained-dtors` change (but I didn't specifically test without it). I called `main` from my MWE 50 times, ran `@everywhere GC.gc()` five times, and then looked at memory utilization. Calling `Base.summarysize` on `MemPool.datastore` on worker 2 reported 1.618 GiB, while the `top` command reported 13.1 GiB for that process. And I couldn't find any other Julia variable(s) that could account for the additional ~11 GiB `top` thought Julia was using. I'm not really sure what to make of this, but I thought it might be useful for you to know.
- Using a `DTable` vs creating my own `DCTable` that just wraps a `DRef` (that refers to a `DataFrame`). I'll post the actual code below, but the upshot is that using my custom `DCTable`
is significantly faster and uses much less memory. Is that to be expected, i.e., does DTables.jl/Dagger.jl really come with that much overhead?

```julia
@everywhere using DTables, DataFrames, CSV

if @isdefined(USE_DISK_CACHING) && USE_DISK_CACHING
    @info "disk caching enabled"
    enable_disk_caching!(8, 20 * 2^10) # 8% max memory to lead to ~512 MiB per process to match custom code
else
    @info "no disk caching"
end

@everywhere const DT = Ref{DTable}()

@everywhere mutable struct DTableCols
    key_names
    value_names
    keys
    values
end

function main()
    remotecall_fetch(query, 2)
end

@everywhere function query()
    dt1 = load_dt()
    dt2 = add_value_col!(dt1)
    dt3 = update_value_col!(dt2)
    @info "" length(dt3)
    dt4 = calc_value_cols(dt3)
    dt5 = select(dt4, [6; 12; 103:113]...; copycols = false)
    dt_agg = aggregate_dt(dt5)
    return fetch(dt_agg)
end

@everywhere function load_dt()
    isassigned(DT) && return DT[]
    file = "file.csv"
    dt = DTable(x -> CSV.File(x), [file]; tabletype = DataFrame)
    DT[] = dt
    return dt
end

@everywhere function add_value_col!(dt)
    dt_cols = create_dt_cols(dt, 1:48, 49:102)
    dt_cols.value_names = [dt_cols.value_names; "RAND"]
    dt_cols.values = (dt_cols.values..., rand(length(dt_cols.values[1])))
    return create_dt_from_cols(dt_cols; is_sorted = true)
end

@everywhere function create_dt_cols(dt, key_cols, value_cols)
    df = fetch(dt)
    key_names = names(df)[key_cols]
    value_names = names(df)[value_cols]
    keys = [df[!, i] for i in key_cols]
    values = [df[!, i] for i in value_cols]
    return DTableCols(key_names, value_names, keys, values)
end

@everywhere function create_dt_from_cols(dt_cols; is_sorted = false)
    df = DataFrame(
        (dt_cols.key_names .=> dt_cols.keys)...,
        (dt_cols.value_names .=> dt_cols.values)...;
        copycols = false,
    )
    is_sorted || sort!(df)
    return DTable(df)
end

@everywhere function update_value_col!(dt)
    dt_cols = create_dt_cols(dt, 1:48, 49:103)
    dt_cols.values = (
        dt_cols.values[1:10]...,
        rand(length(dt_cols.values[1])),
        dt_cols.values[12:end]...,
    )
    return create_dt_from_cols(dt_cols; is_sorted = true)
end

@everywhere function calc_value_cols(dt)
    newvals = Vector{Float64}[]
    for i = 1:10
        v = calc_new_value(dt, i)
        push!(newvals, v)
    end
    return append_value_cols(dt, newvals)
end

@everywhere function calc_new_value(dt, i)
    dt_cols = create_dt_cols(dt, 1:48, 49:103)
    return abs.(dt_cols.values[i])
end

@everywhere function append_value_cols(dt, newvals)
    df = fetch(dt)
    for (i, v) in enumerate(newvals)
        setproperty!(df, "NEW$i", v)
    end
    return DTable(df)
end

@everywhere function aggregate_dt(dt)
    key_names = [Symbol("6"), Symbol("12")]
    gdt = groupby(fetch(dt), key_names)
    gkeys = sort!(collect(keys(gdt)))
    key_pairs = key_names .=> invert(gkeys)
    value_names = [[Symbol("RAND")]; Symbol.("NEW", 1:10)]
    sums = fetch(reduce(+, gdt; cols = value_names))
    sorted = sortperm(invert(sums[key_names]))
    value_pairs = map(value_names) do value
        value => sums[Symbol(:result_, value)][sorted]
    end
    return DTable(DataFrame(key_pairs..., value_pairs...))
end

@everywhere invert(x) = [[x[j][i] for j = 1:length(x)] for i = 1:length(x[1])]

@everywhere function Base.reduce(f, df::DataFrames.AbstractDataFrame; cols)
    NamedTuple(col => reduce(f, df[!, col]) for col in cols)
end

@everywhere function Base.reduce(f, gdt::DataFrames.GroupedDataFrame; cols)
    gkeys = keys(gdt)
    dims = keys(gkeys[1])
    merge(
        NamedTuple(dim => getproperty.(gkeys, dim) for dim in dims),
        NamedTuple(
            Symbol(:result_, col) => [reduce(f, gdt[k]; cols = [col])[col] for k in gkeys]
            for col in cols
        ),
    )
end
```
`custom.jl`:

```julia
@everywhere using MemPool, DataFrames, CSV

if @isdefined(USE_DISK_CACHING) && USE_DISK_CACHING
    @info "disk caching enabled"
    @everywhere let total_mem = Sys.total_memory() ÷ 2
        # mem_per_proc = Int(total_mem ÷ nprocs()) # This is too much memory for testing!
        mem_per_proc = 512 * 2^20
        config = MemPool.DiskCacheConfig(; toggle = true, membound = mem_per_proc, diskbound = 20 * 2^30)
        MemPool.setup_global_device!(config)
    end
else
    @info "no disk caching"
end

@everywhere struct DCTable
    ref::DRef
    DCTable(df::DataFrame) = new(poolset(df))
end

# Call `copy` to be a fairer comparison to DTables.jl's `fetch`.
# Code is much faster without `copy`!
@everywhere Base.fetch(dt::DCTable) = copy(poolget(dt.ref))

@everywhere Base.length(dt::DCTable) = nrow(fetch(dt))

@everywhere function DataFrames.select(dt::DCTable, args...; kwargs...)
    df = fetch(dt)
    selected = select(df, args...; kwargs...)
    DCTable(selected)
end

@everywhere const DT = Ref{DCTable}()

@everywhere mutable struct DCTableCols
    key_names
    value_names
    keys
    values
end

function main()
    remotecall_fetch(query, 2)
end

@everywhere function query()
    dt1 = load_dt()
    dt2 = add_value_col!(dt1)
    dt3 = update_value_col!(dt2)
    @info "" length(dt3)
    dt4 = calc_value_cols(dt3)
    dt5 = select(dt4, [6; 12; 103:113]...; copycols = false)
    dt_agg = aggregate_dt(dt5)
    return fetch(dt_agg)
end

@everywhere function load_dt()
    isassigned(DT) && return DT[]
    file = "file.csv"
    df = CSV.read(file, DataFrame)
    dt = DCTable(df)
    DT[] = dt
    return dt
end

@everywhere function add_value_col!(dt)
    dt_cols = create_dt_cols(dt, 1:48, 49:102)
    dt_cols.value_names = [dt_cols.value_names; "RAND"]
    dt_cols.values = (dt_cols.values..., rand(length(dt_cols.values[1])))
    return create_dt_from_cols(dt_cols; is_sorted = true)
end

@everywhere function create_dt_cols(dt, key_cols, value_cols)
    df = fetch(dt)
    key_names = names(df)[key_cols]
    value_names = names(df)[value_cols]
    keys = [df[!, i] for i in key_cols]
    values = [df[!, i] for i in value_cols]
    return DCTableCols(key_names, value_names, keys, values)
end

@everywhere function create_dt_from_cols(dt_cols; is_sorted = false)
    df = DataFrame(
        (dt_cols.key_names .=> dt_cols.keys)...,
        (dt_cols.value_names .=> dt_cols.values)...;
        copycols = false,
    )
    is_sorted || sort!(df)
    return DCTable(df)
end

@everywhere function update_value_col!(dt)
    dt_cols = create_dt_cols(dt, 1:48, 49:103)
    dt_cols.values = (
        dt_cols.values[1:10]...,
        rand(length(dt_cols.values[1])),
        dt_cols.values[12:end]...,
    )
    return create_dt_from_cols(dt_cols; is_sorted = true)
end

@everywhere function calc_value_cols(dt)
    newvals = Vector{Float64}[]
    for i = 1:10
        v = calc_new_value(dt, i)
        push!(newvals, v)
    end
    return append_value_cols(dt, newvals)
end

@everywhere function calc_new_value(dt, i)
    dt_cols = create_dt_cols(dt, 1:48, 49:103)
    return abs.(dt_cols.values[i])
end

@everywhere function append_value_cols(dt, newvals)
    df = fetch(dt)
    for (i, v) in enumerate(newvals)
        setproperty!(df, "NEW$i", v)
    end
    return DCTable(df)
end

@everywhere function aggregate_dt(dt)
    key_names = [Symbol("6"), Symbol("12")]
    gdt = groupby(fetch(dt), key_names)
    gkeys = sort!(collect(keys(gdt)))
    key_pairs = key_names .=> invert(gkeys)
    value_names = [[Symbol("RAND")]; Symbol.("NEW", 1:10)]
    sums = fetch(reduce(+, gdt; cols = value_names))
    sorted = sortperm(invert(sums[key_names]))
    value_pairs = map(value_names) do value
        value => sums[Symbol(:result_, value)][sorted]
    end
    return DCTable(DataFrame(key_pairs..., value_pairs...))
end

@everywhere invert(x) = [[x[j][i] for j = 1:length(x)] for i = 1:length(x[1])]

@everywhere function Base.reduce(f, df::DataFrames.AbstractDataFrame; cols)
    NamedTuple(col => reduce(f, df[!, col]) for col in cols)
end

@everywhere function Base.reduce(f, gdt::DataFrames.GroupedDataFrame; cols)
    gkeys = keys(gdt)
    dims = keys(gkeys[1])
    merge(
        NamedTuple(dim => getproperty.(gkeys, dim) for dim in dims),
        NamedTuple(
            Symbol(:result_, col) => [reduce(f, gdt[k]; cols = [col])[col] for k in gkeys]
            for col in cols
        ),
    )
end
```
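Both scripts end with `Base.reduce` overrides that reduce each requested column and gather the results into a `NamedTuple`. A package-free sketch of the same column-wise pattern, using a plain `NamedTuple` of vectors in place of a `DataFrame` (`columnwise_reduce` and `tbl` are hypothetical names, not from the issue):

```julia
# Reduce each requested column of a column-table with `f`, collecting the
# per-column results into a NamedTuple keyed by column name.
columnwise_reduce(f, tbl; cols) = NamedTuple(col => reduce(f, tbl[col]) for col in cols)

tbl = (a = [1, 2, 3], b = [4.0, 5.0, 6.0])
sums = columnwise_reduce(+, tbl; cols = (:a, :b))
@assert sums == (a = 6, b = 15.0)
```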
Results: For each of the following I started Julia 1.9.4 with `julia --project -p 4 -t 1 --heap-size-hint=3G`.

`dtables.jl`:
```julia
julia> include("dtables.jl"); @time for i = 1:50
           memusage = map(procs()) do id
               remotecall_fetch(id) do
                   parse(Int, split(read(`ps -p $(getpid()) -o rss`, String), "\n")[end-1]) * 1000
               end
           end
           totalmemusage = Base.format_bytes(sum(memusage))
           worker_memusage = Base.format_bytes(memusage[2])
           @info "$i" worker_memusage totalmemusage
           main()
       end
⋮
┌ Info: 50
│ worker_memusage = "24.741 GiB"
└ totalmemusage = "28.720 GiB"
129.672974 seconds (9.29 M allocations: 590.653 MiB, 0.12% gc time, 2.99% compilation time: 22% of which was recompilation)
```
`custom.jl`:
```julia
julia> include("custom.jl"); @time for i = 1:50
           memusage = map(procs()) do id
               remotecall_fetch(id) do
                   parse(Int, split(read(`ps -p $(getpid()) -o rss`, String), "\n")[end-1]) * 1000
               end
           end
           totalmemusage = Base.format_bytes(sum(memusage))
           worker_memusage = Base.format_bytes(memusage[2])
           @info "$i" worker_memusage totalmemusage
           main()
       end
⋮
┌ Info: 50
│ worker_memusage = "9.517 GiB"
└ totalmemusage = "11.166 GiB"
19.154547 seconds (882.60 k allocations: 61.428 MiB, 2.22% compilation time)
```
`dtables.jl` with disk caching:
```julia
julia> USE_DISK_CACHING = true; include("dtables.jl"); @time for i = 1:50
           memusage = map(procs()) do id
               remotecall_fetch(id) do
                   parse(Int, split(read(`ps -p $(getpid()) -o rss`, String), "\n")[end-1]) * 1000
               end
           end
           totalmemusage = Base.format_bytes(sum(memusage))
           worker_memusage = Base.format_bytes(memusage[2])
           device_size = remotecall_fetch(2) do
               DTables.Dagger.MemPool.GLOBAL_DEVICE[].device_size[]
           end |> Base.format_bytes
           @info "$i" device_size worker_memusage totalmemusage
           main()
       end
⋮
┌ Info: 50
│ device_size = "10.812 GiB"
│ worker_memusage = "8.309 GiB"
└ totalmemusage = "12.879 GiB"
130.644326 seconds (9.75 M allocations: 621.888 MiB, 0.16% gc time, 3.08% compilation time: 21% of which was recompilation)
```
`custom.jl` with disk caching:
```julia
julia> USE_DISK_CACHING = true; include("custom.jl"); @time for i = 1:50
           memusage = map(procs()) do id
               remotecall_fetch(id) do
                   parse(Int, split(read(`ps -p $(getpid()) -o rss`, String), "\n")[end-1]) * 1000
               end
           end
           totalmemusage = Base.format_bytes(sum(memusage))
           worker_memusage = Base.format_bytes(memusage[2])
           device_size = remotecall_fetch(2) do
               MemPool.GLOBAL_DEVICE[].device_size[]
           end |> Base.format_bytes
           @info "$i" device_size worker_memusage totalmemusage
           main()
       end
⋮
┌ Info: 50
│ device_size = "8.436 GiB"
│ worker_memusage = "6.284 GiB"
└ totalmemusage = "7.925 GiB"
39.369563 seconds (921.19 k allocations: 63.999 MiB, 0.03% gc time, 1.27% compilation time)
```
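Aside on the measurement itself: each iteration above shells out to `ps -o rss` and multiplies the reported KiB figure by 1000 rather than 1024, so it slightly underestimates RSS. On Linux, the same number can be read without spawning a subprocess; a hedged sketch (`rss_bytes` is a hypothetical helper, not from the issue, and assumes procfs is available):

```julia
# Read this process's resident set size from /proc/self/status (Linux-only).
# The VmRSS field is reported in kB, which the kernel treats as KiB.
function rss_bytes()
    for line in eachline("/proc/self/status")
        startswith(line, "VmRSS:") && return parse(Int, split(line)[2]) * 1024
    end
    return -1  # field not found (e.g., not running on Linux procfs)
end
```

On workers this could be called via `remotecall_fetch(rss_bytes, id)` just like the `ps`-based version.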
```
  [336ed68f] CSV v0.10.12
  [20c56dc6] DTables v0.4.3
  [a93c6f00] DataFrames v1.6.1
  [f9f48841] MemPool v0.4.6
  [8ba89e20] Distributed
```
I'd like to try to reproduce this locally so I can figure out where Dagger is adding overwhelming overhead (which in general it should not, when working with sufficiently large files). Can you post your `file.csv` somewhere I can download it, or provide a script for generating a compatible equivalent? The one I have locally is apparently not in the correct format.
Here's a script that generates a .csv file that can reproduce the above behavior:
```julia
using CSV, DataFrames, InlineStrings, Random

const NROWS = 233930
const NCOLS = 102
const ELTYPE = [Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, InlineStrings.String1, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, InlineStrings.String15, Int64, Int64, Int64, Int64, InlineStrings.String1, Int64, Int64, Int64, Int64, Int64]
const NUM_UNIQUE = [12, 1, 1, 1, 1, 3, 12, 2, 4, 1, 12, 9, 13, 32, 1292, 13, 32, 493, 2, 3, 3, 3, 3, 2, 3, 2, 367, 462, 8, 369, 192, 28, 28, 193, 43, 243, 243, 48871, 4, 8, 10, 2, 3, 3, 5, 3, 3, 5]

function generate()
    Random.seed!(0)
    input = map(1:NCOLS) do i
        name = string(i)
        if i <= length(ELTYPE)
            if ELTYPE[i] isa AbstractString
                # Oops, this branch is never taken, but the resulting file still
                # reproduces the issue.
                col = string.(rand(1:NUM_UNIQUE[i]-1, NROWS))
                col[1:12] .= " "
                return name => ELTYPE[i].(col)
            else
                col = rand(1:NUM_UNIQUE[i], NROWS)
                return name => col
            end
        else
            col = rand(NROWS)
            return name => col
        end
    end
    df = DataFrame(input)
    sort!(@view(df[13:end, :]), 1:length(ELTYPE))
    CSV.write("file.csv", df)
    return df
end

generate()
```
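The "this branch is never taken" comment above comes down to the difference between `isa` and `<:`: each `ELTYPE[i]` is itself a type object, so it is never an *instance* of `AbstractString`. A small Base-only illustration (the subtype check is presumably what the branch intended):

```julia
# `isa` tests whether a value is an instance of a type; `<:` tests whether
# one type is a subtype of another. A type object like String is an instance
# of DataType, not of AbstractString.
@assert !(String isa AbstractString)   # why the branch is never taken
@assert String <: AbstractString       # the check the branch likely intended
@assert "hello" isa AbstractString     # instances, by contrast, pass `isa`
```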
I have the following setup: Julia is started with `julia --project -t1 --heap-size-hint=3G`, and workers are added with `addprocs(4; exeflags = "--heap-size-hint=3G")`.
The actual query includes loading a table from a .csv file into a `DTable` (with a `DataFrame` table type). Operations include `select`ing columns, `fetch`ing the table into a `DataFrame` for adding/removing rows/columns and other processing as needed, and re-wrapping the table in a `DTable` to later be processed further. At the end of processing, the result is returned as a `DataFrame`.

The .csv file contains a table with 233930 rows and 102 columns: 1 column of `InlineStrings.String15`, 2 columns of `InlineStrings.String1`, 45 columns of `Int64`, and 54 columns of `Float64`.
The issue: I noticed that if I keep running the same query repeatedly, the `MemPool.datastore` on worker 2 consumes more and more memory. Eventually, the memory usage grows enough to cause my WSL 2 Linux OOM manager to kill worker 2, crashing my program.
Notably, I do not observe this growth in memory usage in the following scenarios:

- running in a single process (i.e., without `addprocs`), or
- using `DataFrame`s exclusively (i.e., not using DTables.jl at all).

I do observe this growth in memory usage in the following additional scenarios:

- using `NamedTuple` as the table type for the `DTable`s, or
- running the query on worker 1 (in this case, the `MemPool.datastore` on worker 1, not worker 2, is what consumes more and more memory; however, I never ran into any issues with the OOM manager killing my processes).

I'm posting this issue in DTables.jl in case there's something DTables.jl is doing that somehow causes the MemPool.jl data store to keep references around longer than expected, but of course please transfer this issue to Dagger.jl or MemPool.jl as needed.
Please let me know if there is any other information that would help with finding the root cause of this issue.