beacon-biosignals / MockTableGenerators.jl

Generate realistic mock datasets as dependent Tables.jl tables
MIT License
1 stars 0 forks source link

errors are not surfaced unless channel is immediately `collect`ed #4

Open kleinschmidt opened 1 year ago

kleinschmidt commented 1 year ago

MWE:

               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.8.5 (2023-01-08)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> import Pkg; Pkg.activate("/Users/dkleinschmidt/.julia/dev/MockTableGenerators/")
  Activating project at `~/.julia/dev/MockTableGenerators`

julia> using MockTableGenerators

julia> struct Gen <: TableGenerator end

julia> g = Gen()
Gen()

julia> c = MockTableGenerators.generate(g)
Channel{Any}(10) (closed)

julia> collect(c)
Any[]

julia> c = collect(MockTableGenerators.generate(g))
ERROR: TaskFailedException
Stacktrace:
  [1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
    @ Base ./task.jl:871
  [2] wait()
    @ Base ./task.jl:931
  [3] wait(c::Base.GenericCondition{ReentrantLock})
    @ Base ./condition.jl:124
  [4] take_buffered(c::Channel{Any})
    @ Base ./channels.jl:416
  [5] take!
    @ ./channels.jl:410 [inlined]
  [6] iterate(c::Channel{Any}, state::Nothing)
    @ Base ./channels.jl:498
  [7] iterate
    @ ./channels.jl:496 [inlined]
  [8] _collect(cont::UnitRange{Int64}, itr::Channel{Any}, #unused#::Base.HasEltype, isz::Base.SizeUnknown)
    @ Base ./array.jl:723
  [9] collect(itr::Channel{Any})
    @ Base ./array.jl:712
 [10] top-level scope
    @ REPL[9]:1

    nested task error: MethodError: no method matching table_key(::Gen)
    Stacktrace:
     [1] _generate!(callback::MockTableGenerators.var"#4#6"{Channel{Any}}, rng::Random._GLOBAL_RNG, gen::Gen, deps::Dict{Any, Any})
       @ MockTableGenerators ~/.julia/dev/MockTableGenerators/src/generate.jl:105
     [2] generate(callback::Function, rng::Random._GLOBAL_RNG, dag::Gen)
       @ MockTableGenerators ~/.julia/dev/MockTableGenerators/src/generate.jl:71
     [3] (::MockTableGenerators.var"#3#5"{Random._GLOBAL_RNG, Gen})(ch::Channel{Any})
       @ MockTableGenerators ~/.julia/dev/MockTableGenerators/src/generate.jl:62
     [4] (::Base.var"#591#592"{MockTableGenerators.var"#3#5"{Random._GLOBAL_RNG, Gen}, Channel{Any}})()
       @ Base ./channels.jl:134
kleinschmidt commented 1 year ago

I think this might be a race condition with a buffered channel. bound task failure will be propagated to any task waiting on the channel, but if it's closed by task completion before the buffer is full, then there's no waiting. Here's an even more MWE of this behavior:

julia> c = Channel(1) do ch
           put!(ch, nothing)
           error("aHHHH")
       end
Channel{Any}(1) (closed)

julia> collect(c)
1-element Vector{Any}:
 nothing

julia> collect(Channel(1) do ch
           put!(ch, nothing)
           error("aHHHH")
       end)
ERROR: TaskFailedException
Stacktrace:
 [1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
   @ Base ./task.jl:871
 [2] wait()
   @ Base ./task.jl:931
 [3] wait(c::Base.GenericCondition{ReentrantLock})
   @ Base ./condition.jl:124
 [4] take_buffered(c::Channel{Any})
   @ Base ./channels.jl:416
 [5] take!
   @ ./channels.jl:410 [inlined]
 [6] iterate(c::Channel{Any}, state::Nothing)
   @ Base ./channels.jl:498
 [7] _collect(cont::UnitRange{Int64}, itr::Channel{Any}, #unused#::Base.HasEltype, isz::Base.SizeUnknown)
   @ Base ./array.jl:725
 [8] collect(itr::Channel{Any})
   @ Base ./array.jl:712
 [9] top-level scope
   @ REPL[20]:1

    nested task error: aHHHH
    Stacktrace:
     [1] error(s::String)
       @ Base ./error.jl:35
     [2] (::var"#9#10")(ch::Channel{Any})
       @ Main ./REPL[20]:3
     [3] (::Base.var"#591#592"{var"#9#10", Channel{Any}})()
       @ Base ./channels.jl:134

julia> c = Channel(0) do ch
           put!(ch, nothing)
           error("aHHHH")
       end
Channel{Any}(0) (1 item available)

julia> collect(c)
ERROR: TaskFailedException
Stacktrace:
 [1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
   @ Base ./task.jl:871
 [2] wait()
p   @ Base ./task.jl:931
 [3] wait(c::Base.GenericCondition{ReentrantLock})
   @ Base ./condition.jl:124
 [4] take_unbuffered(c::Channel{Any})
   @ Base ./channels.jl:433
 [5] take!
   @ ./channels.jl:410 [inlined]
 [6] iterate(c::Channel{Any}, state::Nothing)
   @ Base ./channels.jl:498
 [7] _collect(cont::UnitRange{Int64}, itr::Channel{Any}, #unused#::Base.HasEltype, isz::Base.SizeUnknown)
   @ Base ./array.jl:725
 [8] collect(itr::Channel{Any})
   @ Base ./array.jl:712
 [9] top-level scope
   @ REPL[22]:1

    nested task error: aHHHH
    Stacktrace:
     [1] error(s::String)
       @ Base ./error.jl:35
     [2] (::var"#11#12")(ch::Channel{Any})
       @ Main ./REPL[21]:3
     [3] (::Base.var"#591#592"{var"#11#12", Channel{Any}})()
       @ Base ./channels.jl:134
kleinschmidt commented 1 year ago

okay the underlying issue here is this: when we iterate a channel, it checks if the channel is open or ready; if it's neither of these, it returns nothing to indicate that iteration is complete. we can't wait on teh channel at that point because if the channel is closed, then wait will throw a different error (invalid state exception). so if we're going to be able to reliably surface errors encountered during table generation, we need to somehow be able to keep and surface a reference to the bound task.

Another possibility would be to have a "sync" mode which will immediately collect/wait on the channel adn return that value. then we're more or less guaranteed to be waiting when the error is reported...

kleinschmidt commented 1 year ago

A final possibility would be to use an unbuffered channel so that all iteration calls will involve a wait; that seems the safest to me actually since I don't think there's a huge benefit to using a buffered channel if there's nothing async happening on the consumer end. we could change the default for channel capacity to be 0 and allow users to set it higher if they need buffering for some reason.

ericphanson commented 1 year ago

This is probably out of scope for MockTableGenerators, but I think a useful primitive would be an object that wraps a channel but adds nicer semantics on top. Ideally it could be close to drop-in replacement for a channel, but it would store the bound task (using taskref kwarg in the channel to get it out) and it would implement iterate so that errors are propogated better and to avoid the need for stuff like https://github.com/beacon-biosignals/OndaBatches.jl/blob/fa7550a5f25cda2979017f8aa15ffc11d019758f/src/utils.jl#L37-L83 as much. Could also have maybetake! and tryput! from https://github.com/JuliaLang/julia/pull/41966.

kleinschmidt commented 1 year ago

Yeah I dunno, it's tricky because there's two things getting mixed in here: teh channel and the (bound) task. We could maybe create a TaskChannel though that would check the status of the task before every take!/fetch/wait/put!, but it gets a bit circular when considering what gets passed to the task (channel operations inside the task by definition can't benefit from the wrapping...)

iamed2 commented 3 months ago

Fixed by https://github.com/JuliaLang/julia/pull/52981?

iamed2 commented 3 months ago

To obtain similar behaviour as the fix, we could wrap the channel in an iterator that passes through iterate to the channel, but when it sees iterate(channel) === nothing, does:

try
    wait(channel)
catch ex
    ex isa InvalidStateException || rethrow(ex)
end