JuliaFolds / Transducers.jl

Efficient transducers for Julia
https://juliafolds.github.io/Transducers.jl/dev/
MIT License
433 stars 24 forks source link

Scoping error (?) with ThreadsX and DataFrames #555

Closed yuvalwas closed 1 year ago

yuvalwas commented 1 year ago

I'm not even sure this has to do with Transducers.jl. It took some effort to provide an MRE, as I really had no idea what was going on:

using DataFrames
using ThreadsX

## this one errors, description in the end
# Failing reproducer: build one small DataFrame per input in parallel and
# concatenate the results. `df` is assigned both in `main` and inside the nested
# `fun`, so it is a single variable captured by the closure rather than a fresh
# local per call; concurrent calls of `fun` then race on it.
function main() 

    # Build a two-row DataFrame by pushing rows one at a time. `n` is unused.
    function fun(n) 
        # Assigns to the `df` of the enclosing `main` (shared by every task).
        df = DataFrame(a = Int[], b = Int[])
        for i in 1:2
            # This is the call that fails in the reported stack trace with
            # `AssertionError: length(col2) == nrows`.
            push!(df, (i,i))
        end
        df
    end

    # This assignment to `df` in the outer scope is what makes `df` shared.
    df = vcat(ThreadsX.map(fun, 1:2)...)
    df
end

df = main()

## This is fine, the `push!` is apparently important to the error
# Variant without `push!`: each call of `fun` constructs the complete DataFrame
# in one expression and returns it. Reported to run without error.
# NOTE(review): `df` is still assigned in both `main2` and `fun`, so the binding
# is presumably still shared between tasks; the variant only avoids mutating a
# DataFrame through it — confirm before treating this form as safe.
function main2()

    # Return a fixed two-row DataFrame. `n` is unused.
    function fun(n) 
        df = DataFrame(a = 1:2, b = 1:2)
        df
    end

    df = vcat(ThreadsX.map(fun, 1:2)...)
    df
end

df = main2()

## This is also fine, using `map` instead of `ThreadsX.map`
# Variant that keeps the `push!` loop and the shared name `df`, but uses Base
# `map` instead of `ThreadsX.map`. Reported to run without error: the calls of
# `fun` are not run concurrently, so nothing races on `df`.
function main3()

    # Build a two-row DataFrame by pushing rows one at a time. `n` is unused.
    function fun(n) 
        df = DataFrame(a = Int[], b = Int[])
        for i in 1:2
            push!(df, (i,i))
        end
        df
    end

    df = vcat(map(fun, 1:2)...)
    df
end

df = main3()

## This is also fine, when I don't refer in main to `df` but call it `df_` instead
# Variant that keeps `ThreadsX.map` and the `push!` loop but names the outer
# result `df_`. Because `main4` never assigns a variable called `df`, the `df`
# inside `fun` is an ordinary local of `fun`, created anew on every call.
# Reported to run without error.
function main4()

    # Build a two-row DataFrame by pushing rows one at a time. `n` is unused.
    function fun(n) 
        # Local to `fun`: no variable named `df` exists in the enclosing scope.
        df = DataFrame(a = Int[], b = Int[])
        for i in 1:2
            push!(df, (i,i))
        end
        df
    end

    df_ = vcat(ThreadsX.map(fun, 1:2)...)
    df_
end

df = main4()

The error:

ERROR: TaskFailedException
Stacktrace:
  [1] wait
    @ .\task.jl:349 [inlined]
  [2] fetch
    @ .\task.jl:369 [inlined]
  [3] _reduce(ctx::Transducers.CancellableDACContext, rf::Transducers.Reduction{Transducers.Map{Transducers.var"#193#194"{Vector, Transducers.Map{var"#fun#7"}}}, Transducers.BottomRF{Transducers.Completing{typeof(BangBang.append!!)}}}, init::BangBang.NoBang.Empty{Vector}, reducible::Transducers.SizedReducible{Vector{UnitRange{Int64}}, Int64})
    @ Transducers C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\reduce.jl:160
  [4] _transduce_assoc_nocomplete
    @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\reduce.jl:139 [inlined]
  [5] transduce_assoc(xform::Transducers.Map{Transducers.var"#193#194"{Vector, Transducers.Map{var"#fun#7"}}}, step::Transducers.Completing{typeof(BangBang.append!!)}, init::BangBang.NoBang.Empty{Vector}, coll0::Vector{UnitRange{Int64}}; simd::Val{false}, basesize::Nothing, stoppable::Nothing, nestlevel::Nothing)
    @ Transducers C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\reduce.jl:108
  [6] transduce_assoc
    @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\reduce.jl:84 [inlined]
  [7] foldxt(step::typeof(BangBang.append!!), xform::Transducers.Map{Transducers.var"#193#194"{Vector, Transducers.Map{var"#fun#7"}}}, itr::Vector{UnitRange{Int64}}; init::BangBang.NoBang.Empty{Vector},
 kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Transducers C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\reduce.jl:246
  [8] foldxt
    @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\reduce.jl:246 [inlined]
  [9] #_tcopy#192
    @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\reduce.jl:359 [inlined]
 [10] _tcopy
    @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\reduce.jl:356 [inlined]
 [11] #tcopy#190
    @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\reduce.jl:354 [inlined]
 [12] tcopy
    @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\reduce.jl:354 [inlined]
 [13] #tcollect#200
    @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\reduce.jl:434 [inlined]
 [14] __map(f::Function, itr::UnitRange{Int64}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ ThreadsX.Implementations C:\Users\Optimus Prime\.julia\packages\ThreadsX\vXfYv\src\map.jl:1
 [15] __map
    @ C:\Users\Optimus Prime\.julia\packages\ThreadsX\vXfYv\src\map.jl:1 [inlined]
 [16] _map(::Function, ::UnitRange{Int64}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ ThreadsX.Implementations C:\Users\Optimus Prime\.julia\packages\ThreadsX\vXfYv\src\map.jl:13
 [17] _map
    @ C:\Users\Optimus Prime\.julia\packages\ThreadsX\vXfYv\src\map.jl:12 [inlined]
 [18] map(::Function, ::UnitRange{Int64}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ ThreadsX.Implementations C:\Users\Optimus Prime\.julia\packages\ThreadsX\vXfYv\src\map.jl:30
 [19] map
    @ C:\Users\Optimus Prime\.julia\packages\ThreadsX\vXfYv\src\map.jl:20 [inlined]
 [20] main()
    @ Main c:\Users\Optimus Prime\Projects\clusterless\bug.jl:15
 [21] top-level scope
    @ c:\Users\Optimus Prime\Projects\clusterless\bug.jl:19

    nested task error: AssertionError: length(col2) == nrows
    Stacktrace:
      [1] _row_inserter!(df::DataFrame, loc::Int64, row::Tuple{Int64, Int64}, mode::Val{:push}, promote::Bool)
        @ DataFrames C:\Users\Optimus Prime\.julia\packages\DataFrames\LteEl\src\dataframe\insertion.jl:690
      [2] #push!#342
        @ C:\Users\Optimus Prime\.julia\packages\DataFrames\LteEl\src\dataframe\insertion.jl:455 [inlined]
      [3] push!(df::DataFrame, row::Tuple{Int64, Int64})
        @ DataFrames C:\Users\Optimus Prime\.julia\packages\DataFrames\LteEl\src\dataframe\insertion.jl:455
      [4] (::var"#fun#7")(n::Int64)
        @ Main c:\Users\Optimus Prime\Projects\clusterless\bug.jl:10
      [5] next
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\library.jl:54 [inlined]
      [6] macro expansion
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\core.jl:181 [inlined]
      [7] _foldl_array
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:187 [inlined]
      [8] __foldl__
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:182 [inlined]
      [9] #transduce#142
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:523 [inlined]
     [10] transduce
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:512 [inlined]
     [11] transduce(xform::Transducers.Composition{Transducers.Map{var"#fun#7"}, Transducers.Enumerate{Int64}}, f::Transducers.AdHocRF{Transducers.var"#151#154"{MicroCollections.UndefVector{Union{}, typeof(MicroCollections.default_factory)}}, typeof(identity), Transducers.var"#rf#153", typeof(identity), typeof(identity), Nothing}, init::MicroCollections.UndefVector{Union{}, typeof(MicroCollections.default_factory)}, coll::UnitRange{Int64}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
        @ Transducers C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:506
     [12] transduce
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:504 [inlined]
     [13] _copy(xf::Transducers.Map{var"#fun#7"}, #unused#::Type{Vector}, arr::UnitRange{Int64}, #unused#::Transducers.SizeStable, #unused#::Base.HasShape{1})
        @ Transducers C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:924
     [14] copy
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:884 [inlined]
     [15] #193
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\reduce.jl:359 [inlined]
     [16] next
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\library.jl:54 [inlined]
     [17] next(rf::Transducers.Reduction{Transducers.NoComplete, Transducers.Reduction{Transducers.Map{Transducers.var"#193#194"{Vector, Transducers.Map{var"#fun#7"}}}, Transducers.BottomRF{Transducers.Completing{typeof(BangBang.append!!)}}}}, result::BangBang.NoBang.Empty{Vector}, input::UnitRange{Int64})
        @ Transducers C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\core.jl:787
     [18] macro expansion
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\core.jl:181 [inlined]
     [19] _foldl_array
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:187 [inlined]
     [20] __foldl__
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:182 [inlined]
     [21] foldl_basecase
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:365 [inlined]
     [22] _reduce_basecase(rf::Transducers.Reduction{Transducers.Map{Transducers.var"#193#194"{Vector, Transducers.Map{var"#fun#7"}}}, Transducers.BottomRF{Transducers.Completing{typeof(BangBang.append!!)}}}, init::BangBang.NoBang.Empty{Vector}, reducible::Transducers.SizedReducible{SubArray{UnitRange{Int64}, 1, Vector{UnitRange{Int64}}, Tuple{UnitRange{Int64}}, true}, Int64})
        @ Transducers C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\threading_utils.jl:58
     [23] _reduce(ctx::Transducers.CancellableDACContext, rf::Transducers.Reduction{Transducers.Map{Transducers.var"#193#194"{Vector, Transducers.Map{var"#fun#7"}}}, Transducers.BottomRF{Transducers.Completing{typeof(BangBang.append!!)}}}, init::BangBang.NoBang.Empty{Vector}, reducible::Transducers.SizedReducible{SubArray{UnitRange{Int64}, 1, Vector{UnitRange{Int64}}, Tuple{UnitRange{Int64}}, true}, Int64})
        @ Transducers C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\reduce.jl:150
     [24] (::Transducers.var"#178#179"{Transducers.Reduction{Transducers.Map{Transducers.var"#193#194"{Vector, Transducers.Map{var"#fun#7"}}}, Transducers.BottomRF{Transducers.Completing{typeof(BangBang.append!!)}}}, BangBang.NoBang.Empty{Vector}, Transducers.CancellableDACContext, Transducers.SizedReducible{SubArray{UnitRange{Int64}, 1, Vector{UnitRange{Int64}}, Tuple{UnitRange{Int64}}, true}, Int64}})()
        @ Transducers .\threadingconstructs.jl:373

    caused by: AssertionError: length(col) == nrows
    Stacktrace:
      [1] _row_inserter!(df::DataFrame, loc::Int64, row::Tuple{Int64, Int64}, mode::Val{:push}, promote::Bool)
        @ DataFrames C:\Users\Optimus Prime\.julia\packages\DataFrames\LteEl\src\dataframe\insertion.jl:657
      [2] #push!#342
        @ C:\Users\Optimus Prime\.julia\packages\DataFrames\LteEl\src\dataframe\insertion.jl:455 [inlined]
      [3] push!(df::DataFrame, row::Tuple{Int64, Int64})
        @ DataFrames C:\Users\Optimus Prime\.julia\packages\DataFrames\LteEl\src\dataframe\insertion.jl:455
      [4] (::var"#fun#7")(n::Int64)
        @ Main c:\Users\Optimus Prime\Projects\clusterless\bug.jl:10
      [5] next
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\library.jl:54 [inlined]
      [6] macro expansion
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\core.jl:181 [inlined]
      [7] _foldl_array
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:187 [inlined]
      [8] __foldl__
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:182 [inlined]
      [9] #transduce#142
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:523 [inlined]
     [10] transduce
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:512 [inlined]
     [11] transduce(xform::Transducers.Composition{Transducers.Map{var"#fun#7"}, Transducers.Enumerate{Int64}}, f::Transducers.AdHocRF{Transducers.var"#151#154"{MicroCollections.UndefVector{Union{}, typeof(MicroCollections.default_factory)}}, typeof(identity), Transducers.var"#rf#153", typeof(identity), typeof(identity), Nothing}, init::MicroCollections.UndefVector{Union{}, typeof(MicroCollections.default_factory)}, coll::UnitRange{Int64}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
        @ Transducers C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:506
     [12] transduce
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:504 [inlined]
     [13] _copy(xf::Transducers.Map{var"#fun#7"}, #unused#::Type{Vector}, arr::UnitRange{Int64}, #unused#::Transducers.SizeStable, #unused#::Base.HasShape{1})
        @ Transducers C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:924
     [14] copy
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:884 [inlined]
     [15] #193
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\reduce.jl:359 [inlined]
     [16] next
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\library.jl:54 [inlined]
     [17] next(rf::Transducers.Reduction{Transducers.NoComplete, Transducers.Reduction{Transducers.Map{Transducers.var"#193#194"{Vector, Transducers.Map{var"#fun#7"}}}, Transducers.BottomRF{Transducers.Completing{typeof(BangBang.append!!)}}}}, result::BangBang.NoBang.Empty{Vector}, input::UnitRange{Int64})
        @ Transducers C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\core.jl:787
     [18] macro expansion
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\core.jl:181 [inlined]
     [19] _foldl_array
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:187 [inlined]
     [20] __foldl__
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:182 [inlined]
     [21] foldl_basecase
        @ C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\processes.jl:365 [inlined]
     [22] _reduce_basecase(rf::Transducers.Reduction{Transducers.Map{Transducers.var"#193#194"{Vector, Transducers.Map{var"#fun#7"}}}, Transducers.BottomRF{Transducers.Completing{typeof(BangBang.append!!)}}}, init::BangBang.NoBang.Empty{Vector}, reducible::Transducers.SizedReducible{SubArray{UnitRange{Int64}, 1, Vector{UnitRange{Int64}}, Tuple{UnitRange{Int64}}, true}, Int64})
        @ Transducers C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\threading_utils.jl:58
     [23] _reduce(ctx::Transducers.CancellableDACContext, rf::Transducers.Reduction{Transducers.Map{Transducers.var"#193#194"{Vector, Transducers.Map{var"#fun#7"}}}, Transducers.BottomRF{Transducers.Completing{typeof(BangBang.append!!)}}}, init::BangBang.NoBang.Empty{Vector}, reducible::Transducers.SizedReducible{SubArray{UnitRange{Int64}, 1, Vector{UnitRange{Int64}}, Tuple{UnitRange{Int64}}, true}, Int64})
        @ Transducers C:\Users\Optimus Prime\.julia\packages\Transducers\IWhZW\src\reduce.jl:150
     [24] (::Transducers.var"#178#179"{Transducers.Reduction{Transducers.Map{Transducers.var"#193#194"{Vector, Transducers.Map{var"#fun#7"}}}, Transducers.BottomRF{Transducers.Completing{typeof(BangBang.append!!)}}}, BangBang.NoBang.Empty{Vector}, Transducers.CancellableDACContext, Transducers.SizedReducible{SubArray{UnitRange{Int64}, 1, Vector{UnitRange{Int64}}, Tuple{UnitRange{Int64}}, true}, Int64}})()
        @ Transducers .\threadingconstructs.jl:373
MasonProtter commented 1 year ago

Yeah, this isn't a problem with Transducers.jl per se, but really just a consequence of Julia's own scoping rules. I can also reproduce the problem using only Base's `Threads.@threads` instead of ThreadsX.jl.

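Because you assign to `df` in the function `main` as well as inside the inner function `fun`, both names refer to the same variable: `df` becomes a captured variable in the closure instead of a fresh local for each call of `fun`. When `fun` runs on several tasks at once, they all reassign and mutate through that one shared binding, and you get a race condition.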

For example, if we instead write:

# Fixed version of the reproducer: identical to the failing `main` except that
# the `df` inside `fun` is declared `local`, so each call of `fun` gets its own
# variable instead of sharing the `df` of the enclosing `main`.
function main() 
    # Build a two-row DataFrame by pushing rows one at a time. `n` is unused.
    function fun(n) 
        local df = DataFrame(a = Int[], b = Int[]) # <------ Note the use of `local` here
        for i in 1:2
            push!(df, (i,i))
        end
        df
    end
    # The outer `df` can keep its name: it no longer aliases the one in `fun`.
    df = vcat(ThreadsX.map(fun, 1:2)...)
    df
end

With `local` in place, there's no problem. There's also no problem if we simply use a different name for the outer variable than for the inner one, which is why the `main4` variant above works.