dfdx / Spark.jl

Julia binding for Apache Spark

Make reduce more resilient. #30

Closed: aviks closed this 7 years ago

aviks commented 7 years ago

In Julia, a reduce operation will usually throw an exception for empty collections. When a Spark reduce gets mapped onto the partitions, the Julia reduction may run on an empty collection for some partitions, even if the top-level RDD is not empty.
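A minimal local sketch of the failure mode, in plain Julia 1.x with no Spark involved: the RDD is modeled as a vector of partitions, one of them empty. The partition contents and the names `add` and `reduce_partition` are hypothetical, and `isempty` is only usable here because each partition is a materialized vector rather than the streaming iterator an executor actually sees.

```julia
# Hypothetical RDD modeled as three partitions; the middle one is empty.
partitions = [[1, 2, 3], Int[], [4, 5]]
add(a, b) = a + b   # a user-defined reducer with no known identity element

# A plain reduce over the empty partition throws instead of returning a value.
empty_partition_failed = try
    reduce(add, partitions[2])
    false
catch
    true
end

# Per-partition step: an empty partition contributes no partial result.
reduce_partition(f, p) = isempty(p) ? [] : [reduce(f, p)]

# Driver-side step: gather the partial results and reduce them once more.
partials = [x for p in partitions for x in reduce_partition(add, p)]
total = reduce(add, partials)
println("empty partition failed: ", empty_partition_failed, ", total: ", total)
```

The final driver-side reduce is safe in this example only because at least one partition is non-empty.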

aviks commented 7 years ago

Note that this change is required when running on a cluster. In local mode everything works fine, but on a 3-node cluster, a group_by_key followed by a reduce fails without this change.

dfdx commented 7 years ago

I see. Have you received an inline comment I posted?

(Replied by email to Avik Sengupta's comment of Mon, May 8, 2017, 11:47 AM.)

aviks commented 7 years ago

No, can't see any inline comments.

dfdx commented 7 years ago

Argh, seems like reviews are broken on GH :( This is definitely a good change, but I'm worried that the try-catch will also silently swallow other exceptions unrelated to empty iterators. So it would be nice to rethrow all such unrelated exceptions in the catch clause.
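The risk described here is easy to reproduce. In this hypothetical sketch (Julia 1.x; the names `reduce_partition_catch_all` and `buggy_add` are made up for illustration), a catch-all handler turns a genuine bug in the user's reducer into what looks like an empty partition, so the error never surfaces.

```julia
# Catch-all version: any exception at all is treated as "this partition was empty".
function reduce_partition_catch_all(f, it)
    try
        return [reduce(f, it)]
    catch
        return []
    end
end

# A reducer with a real bug, simulated here with an explicit error.
buggy_add(a, b) = error("bug in reducer")

# The partition is NOT empty, yet the bug is silently swallowed.
result = reduce_partition_catch_all(buggy_add, [1, 2, 3])
println("partial results: ", result)
```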

(Replied by email to Avik Sengupta's comment of Mon, May 8, 2017, 1:02 PM.)

aviks commented 7 years ago

That is certainly a valid concern, and I've been thinking about it, but I can't see a reasonable answer. I had to do this to move forward, but I'll see how to improve it.

dfdx commented 7 years ago

I believe we can start with something like:

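try
    return [reduce(f, it)]
catch ex
    # treat the MethodError raised by zero(...) as "this partition was empty"
    if isa(ex, MethodError) && ex.f === zero
        return []
    else
        rethrow()   # re-raise unrelated exceptions with their original backtrace
    end
end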

When the iterator is empty, reduce attempts to create a zero value of the underlying eltype, but since the eltype is unknown (i.e. Any), calling zero(Any) results in a MethodError that we can catch.
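A quick way to see this mechanism, sketched for Julia 1.x (the exact exception may differ in other releases, including the 0.5/0.6 versions discussed in this thread): with a concrete eltype such as `Int`, `reduce(+, Int[])` can fall back to `zero(Int)`, while with eltype `Any` there is no `zero(Any)` to fall back to. The helper `is_zero_method_error` is a hypothetical name.

```julia
# A concrete eltype gives reduce an identity element: zero(Int) == 0.
typed_result = reduce(+, Int[])

# Hypothetical helper: does this exception come from calling zero(...)?
is_zero_method_error(ex) = ex isa MethodError && ex.f === zero

# With eltype Any there is no zero(Any), so the empty reduction fails.
untyped_error = try
    reduce(+, Any[])
    nothing
catch ex
    ex
end

println("typed result: ", typed_result)
println("untyped error comes from zero: ", is_zero_method_error(untyped_error))
```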

An alternative way is to check if the iterator is empty:

if done(it, start(it))   # pre-1.0 start/done/next iteration protocol
    return []
else
    return [reduce(f, it)]
end

But I need to verify that start is idempotent for network iterators, i.e. that we can call it for testing without actually reading anything from the buffer.

Anyway, I'm going to merge this to let you go further, but keep the discussion open until we figure out the best way to fix it.

aviks commented 7 years ago

The check for the empty iterator does not work, since on the executors the iterator is a Task, and calls to done on a Task are not idempotent. Code like this is what I tried initially; unfortunately, it didn't work.
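The same pitfall can be reproduced in Julia 1.x, where `iterate` replaced the `start`/`done`/`next` protocol, using a `Channel` as a stand-in for the executor's one-shot stream (both substitutions are assumptions; the executors in this thread iterate over a Task on 0.5/0.6). `naive_reduce_or_empty` and `filled_channel` are hypothetical names: the emptiness check takes the first element off the channel, so the reduction silently drops it.

```julia
# Emptiness test followed by a second, separate pass over the same iterator.
function naive_reduce_or_empty(f, it)
    iterate(it) === nothing && return []   # on a one-shot iterator this consumes an element
    return [reduce(f, it)]
end

# Build a closed, pre-filled channel so that iteration terminates.
function filled_channel(xs)
    ch = Channel{Int}(length(xs))
    foreach(x -> put!(ch, x), xs)
    close(ch)
    return ch
end

from_vector  = naive_reduce_or_empty(+, [1, 2, 3, 4])                 # vectors can be re-iterated
from_channel = naive_reduce_or_empty(+, filled_channel([1, 2, 3, 4])) # the first element is lost
println(from_vector, " vs ", from_channel)
```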

The check for the exception is more likely to work, but the MethodError from zero is thrown only for a limited set of functions. For other functions, reduce throws a different error: https://github.com/JuliaLang/julia/blob/25f241ccf84f200be5cadc3af35b6c9eeef9504f/base/reduce.jl#L240-L252
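For illustration, a catch clause that recognizes both kinds of empty-reduction error might look like the following sketch (Julia 1.x assumed; `is_empty_reduce_error` and `reduce_partition` are hypothetical names). It accepts a `MethodError` raised by `zero` and an `ArgumentError` whose message mentions "empty", and rethrows everything else. Matching on message text is exactly the kind of fragility raised here, since the wording can change between Julia releases.

```julia
# Hypothetical classifier for the two empty-reduction failures.
function is_empty_reduce_error(ex)
    ex isa MethodError && ex.f === zero && return true               # e.g. reduce(+, Any[])
    ex isa ArgumentError && occursin("empty", ex.msg) && return true  # e.g. reduce(max, Int[])
    return false
end

function reduce_partition(f, it)
    try
        return [reduce(f, it)]
    catch ex
        is_empty_reduce_error(ex) || rethrow()   # unrelated errors keep their backtrace
        return []
    end
end
```

Note that a user reducer that itself throws an `ArgumentError` mentioning "empty" would still be misclassified, which is why an approach that avoids exceptions altogether is preferable once it is available.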

Checking for a handful of specific exceptions seems like a fragile option. But I agree the current situation is also not ideal. I'll keep thinking.

aviks commented 7 years ago

OK, so... the problem here is that the iterator on which reduce gets run is a Task, and it is not safe to call done() on a Task more than once.

In 0.6, the Channel iterator has been fixed to allow idempotent done() calls. Separately, iteration over Tasks has been deprecated in 0.6. As a result, we will have to move to Channels in 0.6 in any case, at which point we can solve this problem properly, without the try...catch.
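With an iterator that cannot be rewound, the emptiness check and the reduction have to share a single pass. A minimal sketch for Julia 1.x (an assumption: 1.x replaced `start`/`done`/`next` with `iterate`; `reduce_or_empty` is a hypothetical name) takes the first element with one `iterate` call and folds the remainder starting from that element, so nothing is read twice and no exception handling is needed.

```julia
# One pass: the first iterate call both tests for emptiness and yields the seed value.
function reduce_or_empty(f, it)
    step = iterate(it)
    step === nothing && return []          # empty partition: no partial result
    first_value, state = step
    rest = Iterators.rest(it, state)       # continue from where the first call stopped
    return [foldl(f, rest; init = first_value)]
end

# A closed, pre-filled Channel acts as a one-shot stream.
ch = Channel{Int}(4)
foreach(x -> put!(ch, x), 1:4)
close(ch)

println(reduce_or_empty(+, ch))        # uses all four elements
println(reduce_or_empty(+, Int[]))     # empty input gives an empty result
```

The result shape (`[x]` or `[]`) matches the snippets above, so partial results from all partitions can simply be concatenated.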

dfdx commented 7 years ago

Sounds promising! If I haven't forgotten how to use channels, the change should be as simple as:

function load_stream(io::IO)
    ch = Channel{Vector{UInt8}}(1024)
    code, _next = readobj(io)
    @async begin
        while code != END_OF_DATA_SECTION
            put!(ch, _next)
            code, _next = readobj(io)
        end
        close(ch)   # without this, consumers iterating over ch block forever after the last item
    end
    return ch
end
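To exercise a Channel-based loader outside Spark, here is a self-contained sketch with a made-up wire format: `mock_readobj` reads an `Int32` length in network byte order followed by that many bytes, any negative length is treated as a control code, and `MOCK_END_OF_DATA = -1` marks the end of the section. These names and the framing are hypothetical stand-ins, not Spark.jl's `readobj` or its actual protocol. In this sketch the `close(ch)` after the loop is what lets `collect` return once the data runs out.

```julia
const MOCK_END_OF_DATA = -1   # hypothetical end-of-section control code

# Hypothetical framing: Int32 length (big-endian), then that many payload bytes.
function mock_readobj(io::IO)
    len = ntoh(read(io, Int32))
    len < 0 && return (len, UInt8[])       # control code, no payload
    return (len, read(io, Int(len)))
end

function mock_load_stream(io::IO)
    ch = Channel{Vector{UInt8}}(1024)
    code, payload = mock_readobj(io)
    @async begin
        while code != MOCK_END_OF_DATA
            put!(ch, payload)
            code, payload = mock_readobj(io)
        end
        close(ch)                          # lets consumers' loops terminate
    end
    return ch
end

# Build a small stream: two messages followed by the end-of-data code.
io = IOBuffer()
write(io, hton(Int32(3)), UInt8[1, 2, 3], hton(Int32(2)), UInt8[4, 5], hton(Int32(MOCK_END_OF_DATA)))
seekstart(io)

messages = collect(mock_load_stream(io))
println(messages)
```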

Item added to my "Migrate to Julia-0.6" list :)