JuliaML / MLUtils.jl

Utilities and abstractions for Machine Learning tasks
MIT License

`TaskPoolEx` leads to unreliable Dataloaders #142

Closed RomeoV closed 1 year ago

RomeoV commented 1 year ago

When a program is interrupted (e.g. by any error) while a DataLoader is being read, that DataLoader cannot be read from anymore, and the Julia session needs to be restarted (which can be very annoying). I've dug into the details and found that the reason is that the default parallel executor is set to

_default_executor() = TaskPoolEx(basesize=1, background=Threads.nthreads() > 1)

So it seems that the TaskPoolEx is globally defined (a sort of singleton?) and may hang, such that we can't execute in parallel anymore...

Here's an MWE:

# launch with `julia --threads auto`
using FastAI
using FastAI.Flux
using FastVision
using FastAI.Flux.MLUtils
using FastAI.MLUtils.Transducers: ThreadedEx

path = load(datasets()["CUB_200_2011"])
data = FastAI.Datasets.loadfolderdata(
    path,
    filterfn = FastVision.isimagefile,
    loadfn = loadfile,
)
dl = DataLoader(data; parallel=true)
collect(dl);  # <--- interrupt this with Ctrl-C
first(dl)     # <--- this will wait forever now

dl = DataLoader(data; parallel=true)  # <--- even redefining doesn't help
first(dl)     # <--- this will wait forever now

# however we can fix it!
import FastAI.Flux.MLUtils._default_executor
_default_executor() = ThreadedEx()
dl = DataLoader(data; parallel=true)
first(dl)  # now it works again

I therefore recommend that the default executor stay ThreadedEx, and that users opt in to TaskPoolEx if they want to squeeze out more performance. However, I would not expect thread pools to help much, since dataloaders are usually not created at such a high rate.

RomeoV commented 1 year ago

Also, the code comment claims that thread pools are not used anyway, so there seems to be some confusion one way or the other. Perhaps this is related to #33? https://github.com/JuliaML/MLUtils.jl/blob/ff2fcc1ba9e5690c0e393fe7e5003fcfabcd2d19/src/parallel.jl#L72-L76

lorenzoh commented 1 year ago

That comment must be outdated 😅 The issue with ThreadedEx was that it would utilize every thread, including the primary one, leading, among other things, to an SSH connection freezing because all the CPU was being used by Julia.

Before moving back to ThreadedEx, we would have to ascertain that

What have been your experiences, if any, running GPU training with ThreadedEx?

RomeoV commented 1 year ago

I would assume this is easily fixed by running e.g. julia --threads 14 if your machine has 16 threads etc, no? Also, in cases where you want to set it and forget it, the fact that it can utilize that much CPU actually seems like a good sign to me.
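As a rough illustration of the thread-count suggestion, here is a minimal sketch that computes how many threads to pass to `--threads` while leaving a few free for the rest of the system. The helper name `threads_to_request` and the default of two spare threads are assumptions made for this example; neither is part of Julia or MLUtils.

```julia
# Hypothetical helper: number of Julia threads to request while keeping
# `keep_free` hardware threads available for SSH, the desktop, etc.
# Always returns at least 1.
threads_to_request(total::Integer; keep_free::Integer = 2) = max(1, total - keep_free)

# Sys.CPU_THREADS is the number of logical CPU threads Julia detected.
n = threads_to_request(Sys.CPU_THREADS)
println("julia --threads ", n)
```

On a machine with 16 hardware threads this prints the `julia --threads 14` command line mentioned above.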

RomeoV commented 1 year ago

Note also what it says in the TaskPoolEx documentation:

Warning: It is highly discouraged to use this executor in Julia packages; especially those that are used as libraries rather than end-user applications. This is because the whole purpose of this executor is to prevent Julia runtime from doing the right thing for managing tasks. Ideally, the library user should be able to pass an executor as an argument so that your library function can be used with any executors including TaskPoolEx.
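To make the last sentence of that warning concrete, here is a minimal sketch of a library function that accepts the executor as an argument instead of hard-coding one. It uses only Base Julia: `serial_executor`, `threaded_executor`, and `map_batches` are hypothetical stand-ins invented for this example, not the Transducers.jl executor types or the MLUtils API.

```julia
# Hypothetical "executors": each one takes a function and a collection
# and decides how the work is scheduled.
serial_executor(f, xs) = map(f, xs)

function threaded_executor(f, xs)
    # One task per element; the Julia scheduler distributes them over threads.
    tasks = map(x -> Threads.@spawn(f(x)), xs)
    return map(fetch, tasks)
end

# Hypothetical library function: the caller picks the executor,
# the library only provides a default.
map_batches(f, xs; executor = threaded_executor) = executor(f, xs)

doubled_default = map_batches(x -> 2x, 1:4)
doubled_serial = map_batches(x -> 2x, 1:4; executor = serial_executor)
```

With this shape, a user who hits a problem with the default can switch executors at the call site without overriding an internal function.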

RomeoV commented 1 year ago

Regarding the GPU utilization question, my mental model tells me this:

  1. The data preprocessing is faster than the GPU processing. In that case, the cpu threads sit idle, waiting to push the results into a channel, and are free to do any data moving while they're waiting.
  2. The data preprocessing is slower than the GPU processing. In that case, the cpu should be running with all resources.

Either way, benchmarking it would probably be a good idea, but I would expect different results depending on the batch size, data type, GPU model, etc.
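The first case of that mental model can be sketched with a bounded `Channel` from Base: once the buffer is full, the producer blocks in `put!` until the consumer takes an item. This is only an illustration of the producer/consumer pattern, not the MLUtils implementation. The function name `run_pipeline`, the squaring "preprocessing" step, and the buffer size are assumptions for the example.

```julia
# Hypothetical pipeline: one producer task fills a bounded channel,
# the calling task consumes from it.
function run_pipeline(n_items::Int; buffersize::Int = 2)
    ch = Channel{Int}(buffersize)
    producer = Threads.@spawn begin
        try
            for i in 1:n_items
                # Stand-in for preprocessing. put! blocks while the buffer
                # is full, so a fast producer idles instead of burning CPU.
                put!(ch, i^2)
            end
        finally
            close(ch)  # lets the consumer loop terminate
        end
    end
    results = Int[]
    for item in ch            # ends once the channel is closed and drained
        push!(results, item)  # stand-in for the GPU step
    end
    wait(producer)
    return results
end

results = run_pipeline(5)
```

In the second case (preprocessing slower than the consumer) the buffer stays mostly empty and the consumer is the one waiting in `take!`.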

jochalek commented 1 year ago

I think I encountered this same problem. Attempting to use DataLoader(...; parallel=true) will in some cases result in it hanging. Reverting to ThreadedEx fixes the problem.

Modifying the example in docs results in this MWE:

using MLUtils

Xtrain = rand(10, 100);
array_loader = DataLoader(Xtrain; batchsize=2, parallel=true);
first(array_loader)     # <--- this will hang forever.

A smaller data size does work for me

using MLUtils

Xtrain = rand(10, 50);
array_loader = DataLoader(Xtrain; batchsize=2, parallel=true);
first(array_loader) 

Stacktrace from interrupting the hanging DataLoader

julia> first(array_loader)
^CERROR: InterruptException:
Stacktrace:
  [1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
    @ Base ./task.jl:871
  [2] wait()
    @ Base ./task.jl:931
  [3] wait(c::Base.GenericCondition{ReentrantLock})
    @ Base ./condition.jl:124
  [4] take_buffered(c::Channel{Any})
    @ Base ./channels.jl:416
  [5] take!(c::Channel{Any})
    @ Base ./channels.jl:410
  [6] iterate(#unused#::MLUtils.Loader, state::MLUtils.LoaderState)
    @ MLUtils ~/.julia/packages/MLUtils/R44Zf/src/parallel.jl:140
  [7] iterate(loader::MLUtils.Loader)
    @ MLUtils ~/.julia/packages/MLUtils/R44Zf/src/parallel.jl:132
  [8] iterate(e::DataLoader{Matrix{Float64}, Random._GLOBAL_RNG, Val{nothing}})
    @ MLUtils ~/.julia/packages/MLUtils/R44Zf/src/eachobs.jl:173
  [9] first(itr::DataLoader{Matrix{Float64}, Random._GLOBAL_RNG, Val{nothing}})
    @ Base ./abstractarray.jl:424
 [10] top-level scope
    @ REPL[5]:1

> What have been your experiences, if any, running GPU training with ThreadedEx?

I have had no problems with GPU training and ThreadedEx. Generally, for long-running tasks I'll SSH in, start training inside a tmux session, and then leave and let it run. SSHing back into the machine while training is running has been fine. This has been my experience playing with Kaggle datasets on FastAI.jl and Flux.

ToucheSir commented 1 year ago

How friendly is stopping a DataLoader using ThreadedEx using Ctrl+C? I think that would be a good test to run: create a dataset which doesn't require any network or disk access to load data and see if running a dataloader over it can be interrupted.

jochalek commented 1 year ago

After 'dev MLUtils' with ThreadedEx:

julia> using MLUtils

julia> Xtrain = rand(10, 100);

julia> Ytrain = rand('a':'z', 100);

julia> train_loader = DataLoader((data=Xtrain, label=Ytrain), batchsize=5, shuffle=true, parallel=true);

julia> for epoch in 1:10000
                for (x, y) in train_loader  # access via tuple destructuring
                  @assert size(x) == (10, 5)
                  @assert size(y) == (5,)
                  # loss += f(x, y) # etc, runs 100 * 20 times
                end
              end
^CERROR: InterruptException:
Stacktrace:
 [1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
   @ Base ./task.jl:871
 [2] wait()
   @ Base ./task.jl:931
 [3] wait(c::Base.GenericCondition{ReentrantLock})
   @ Base ./condition.jl:124
 [4] take_buffered(c::Channel{Any})
   @ Base ./channels.jl:416
 [5] take!(c::Channel{Any})
   @ Base ./channels.jl:410
 [6] iterate(#unused#::MLUtils.Loader, state::MLUtils.LoaderState)
   @ MLUtils ~/.julia/dev/MLUtils/src/parallel.jl:143
 [7] iterate(#unused#::DataLoader{NamedTuple{(:data, :label), Tuple{Matrix{Float64}, Vector{Char}}}, Random._GLOBAL_RNG, Val{nothing}}, ::Tuple{MLUtils.Loader, MLUtils.LoaderState})
   @ MLUtils ~/.julia/dev/MLUtils/src/eachobs.jl:179
 [8] top-level scope
   @ ./REPL[17]:6

julia> for epoch in 1:10000
                for (x, y) in train_loader  # access via tuple destructuring
                  @assert size(x) == (10, 5)
                  @assert size(y) == (5,)
                  # loss += f(x, y) # etc, runs 100 * 20 times
                end
              end

julia> train_loader = DataLoader((data=Xtrain, label=Ytrain), batchsize=10, shuffle=true, parallel=true); # Redefined DataLoader

julia> for epoch in 1:10000
                for (x, y) in train_loader  # access via tuple destructuring
                  @assert size(x) == (10, 5)
                  @assert size(y) == (5,)
                  # loss += f(x, y) # etc, runs 100 * 20 times
                end
              end
ERROR: AssertionError: size(x) == (10, 5)   # <--- expected with batchsize=10 in redefined DataLoader
Stacktrace:
 [1] top-level scope
   @ ./REPL[19]:3

julia>

Does the above example meet the criteria you intended to test? Interrupting DataLoader with ThreadedEx seems okay.

RomeoV commented 1 year ago

> How friendly is stopping a DataLoader using ThreadedEx using Ctrl+C? I think that would be a good test to run: create a dataset which doesn't require any network or disk access to load data and see if running a dataloader over it can be interrupted.

Usually, the situation is that I start a model training run, but after a few epochs I notice that the loss isn't behaving as I want it to, or I spot a bug in the code, etc., and so I interrupt the training with Ctrl+C. This leads to the situation where I cannot restart training without restarting the REPL, which takes several minutes for my application.

ToucheSir commented 1 year ago

Good to know. I'll let @lorenzoh make the final call about whether to switch over.

lorenzoh commented 1 year ago

Sorry for my late reply! I think with all the issues cropping up it seems like a good idea to revert to ThreadedEx. I'll leave some additional comments on the PR. Thanks everyone for the discussion!