JuliaLang / julia

The Julia Programming Language
https://julialang.org/
MIT License
44.97k stars 5.42k forks source link

@async tasks executes simultaneously with parent task if launched with @spawn #41324

Closed ancapdev closed 3 years ago

ancapdev commented 3 years ago

Commented on this being a potential issue in #40715, and have I believe verified it in 1.7.0-beta2.

Reproducible:

function test()
    println("start")
    root_task = Threads.@spawn begin
        tasks = []
        active = Ref(Threads.threadid())
        for i in 1:10000
            active[] = -1
            yield()
            push!(
                tasks,
                @async begin
                    for j in 1:1000
                        yield()
                        active_thread = $active[]
                        if active_thread != -1
                            @show active_thread, Threads.threadid()
                            break
                        end
                    end
                    nothing
                end
            )
            active[] = Threads.threadid()
        end
        active[] = -1
        wait.(tasks)
    end
    wait(root_task)
    println("finished")
end

Since this is a race issue output isn't deterministic, however running this on 2 threads, I get 1.6.0:

start
finished

1.7.0-beta2:

start
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (2, 1)
(active_thread, Threads.threadid()) = (2, 1)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (2, 1)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (2, 1)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (2, 1)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (2, 1)
(active_thread, Threads.threadid()) = (2, 1)
(active_thread, Threads.threadid()) = (1, 2)
(active_thread, Threads.threadid()) = (1, 2)
finished

Maybe I've messed something up in my test here, it's not trivial to check these scheduler behaviors, but I think this confirms what I was concerned about in this comment: https://github.com/JuliaLang/julia/pull/40715#issuecomment-864941120

My core concern is that there are two kinds of scheduler stickiness you might want, either to stick on the system thread, or to stick on the thread as your parent task is (at any moment) scheduled on. At present these are the same, however with task migration they will no longer be. The way I see it, you want to pin your task to system threads when you need access to thread local state or you're interacting with libraries that need to run on particular threads. For the more general Julia async use cases however, what you want is to guarantee non-simultaneous execution of tasks in some group (e.g., parent and sibling async tasks).

It would be nice to consider this requirement prior to making the scheduler more flexible, and potentially also cleaning up the task scheduling API. I may be wrong, but I believe the current Threads.@spawn and @async APIs are separate mostly for legacy reasons, and a cleaner more future proof approach may be to have a single scheduling API with a scheduling policy argument. Policies could include affinity (e.g., free, current-thread, parent-thread, initial-thread), and later would easily accommodate priority levels, NUMA based affinities, or other features that may be interesting as the scope and functionality grows.

Different @async tasks also execute simultaneously with one another (as they may have been launched on different threads), however I took out the detection of that to simplify the code.

vchuravy commented 3 years ago

For the more general Julia async use cases however, what you want is to guarantee non-simultaneous execution of tasks in some group (e.g., parent and sibling async tasks).

To my understanding this was never part of the contract. @async states that it will execute on the thread it has been launched on -- e.g. it is pinned -- but it says nothing about the execution of the parent task. If both of them are @async then by the virtue of both being pinned you are guaranteed to not see any parallel/simultaneous execution, but of course you can see any concurrent interleaving.

What is your use-case for forcing parent/child co-scheduling? In my opinion it is sufficient to state that both must be @async or there won't be any such scheduling guarantees.

What we are actually missing right now is a Threads.@spawn pin=true e.g. execute this code anywhere but pin it on the thread that picked it up.

vchuravy commented 3 years ago

Think about this a minute longer I think I understand your issue. Up until 1.7 @async implied a cooperative schedule with it's parent. Irrespective of the fact if that parent is @spawn or @async. So a user could have written code like:

function fib(x)
         x <= 1 && return x
         p = @async fib(x-1)
        fib(x-2) + fetch(p)
end

and expect @async fib(10) and @spawn fib(10) to be scheduled equivalently. The writer of code in fib is only aware of their own usage of @async and thus could expect the contract that it cooperatively scheduled with its parent.

ancapdev commented 3 years ago

I have a few use cases in my research flow, but one that comes high to mind is running trading simulations. I can run in parallel over different days, parameters, or multiple stochastic evolutions of the simulation, however within each of said instances I have async setup code. This setup code may be doing IO or for reasons of consistency with a realtime system run as async tasks. I was very much looking forward to seeing task migration, as I can often get suboptimal scheduling currently, but I really need a way to bind tasks together in a cooperative group.

I think there's opportunity to improve the API and clarity of the semantics around scheduling, and doing so now when these change are coming in would seem a good time. For everyone's safety and benefit the old APIs could maintain the old behavior, and we can move forward with a new and more expressive API that people can opt-in to get the benefits of a more flexible scheduler without being caught off guard with behavior changes.

vchuravy commented 3 years ago

So one very unsatisfying "fix" would be to have @async include a:

current_task().sticky = true
vtjnash commented 3 years ago

After verbal discussion in #multithreading (per the usual call schedule), we said that fix seems good, but that it belongs in schedule(), after we set the tid, if the tid was previously unset. That should get it working well for v1.7, and then we can think about designing some sort of actual task-group mechanism later

ancapdev commented 3 years ago

After verbal discussion in #multithreading (per the usual call schedule), we said that fix seems good, but that it belongs in schedule(), after we set the tid, if the tid was previously unset. That should get it working well for v1.7, and then we can think about designing some sort of actual task-group mechanism later

That's the slack channel, is it? This fi makes it no worse than before, but makes task migration useless for most of my use cases. Where is the best place to discuss this going forward? Are there other users with more heavy / complex task usage?

vchuravy commented 3 years ago

Part of the problem is that we are in the feature freeze for 1.7 and the options boiled down to: revert #40715 or do #41334.

We also consider the option that we could make @async tasks increment current_task().sticky, and then decrement it once they are finished, so that a task could be migrated once again, but there is no good location for the decrement.

That's the slack channel, is it?

Yes and a regular call on multithreading topics.

This fi makes it no worse than before, but makes task migration useless for most of my use cases.

It is rather unfortunate that a single @async can poison a parent task and prevent it from being migrated. Going forward we were thinking a Taskgroup construct that expresses the kind of co-scheduling you are thinking about would be useful.

Are there other users with more heavy / complex task usage?

Most use-cases I have seen a either fully @spawn or @async. Of course this issue arises at the composition of the both. I would like to see us migrate away from @async to @spawn + TaskGroup.

ancapdev commented 3 years ago

I'll be interested to see where that design discussion goes, so I'll join slack and try to stay in the loop.

Would it be correct to assume if I manually flip the sticky flag the task may later migrate? I have some control over this, in fact I don't use the standard @async as I have my own event loop and a few context variables to propagate and other state to manage. After flipping that flag, does it need to be seen by the current thread and moved into a global queue, or is there some way to force this? Apologies for all the questions, I'll get around to reading through the scheduling code.

vtjnash commented 3 years ago

If a task flips its own flag, then it will immediately be eligible for migration the next time it yields

ancapdev commented 3 years ago

However if an async child task flips the parent bit, then what's required to have it migrate? Presumably it's sitting in a thread local queue at this point, since it was sticky.

tkf commented 3 years ago

We also consider the option that we could make @async tasks increment current_task().sticky, and then decrement it once they are finished, so that a task could be migrated once again, but there is no good location for the decrement.

This sounds like a good idea to me. Did you guys consider decrementing at the end of child task? i.e. @async f() becomes

let p = current_task()
    p.sticky += 1
    @async try
        f()
    finally
        p.sticky -= 1
    end
end

?

vchuravy commented 3 years ago

The issue is that this must hold true for Tasks that are created without @async. Maybe doing it for @async would be a worthwhile optimization

ancapdev commented 3 years ago

If Task.sticky is made into a counter, might it more sense to expose it through an API like requiresticky(t) and releasesticky(t). This would permit the implementation to move tasks to/from thread local queues and make the more quickly available for scheduling, enforce thread safe access, check for underflow, etc. You could go a step further and return a token from the acquisition API to be provided on release for some extra guarding, and even release automatically when the token is finalized.

tkf commented 3 years ago

Tasks that are created without @async

How about doing this inside schedule?

if t.sticky > 0 || Threads.nthreads() == 1
    if tid == 0
        original_code = t.code
        parent_task = current_task()
        parent_task.sticky += 1
        t.code = function wrapper_code()
            try
                original_code()
            finally
                parent_task.sticky -= 1
            end
        end

        tid = Threads.threadid()
        ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid-1)
    end
    push!(Workqueues[tid], t)
else
    ...

If Task.sticky is made into a counter, might it more sense to expose it through an API like requiresticky(t) and releasesticky(t).

I think it'd be a good idea to minimize the API around stickiness. @async is already concurrent, can introduce races, and so many programs have been working "by accident" (in a sense). It also imposes extra complexity on the scheduler and makes it hard for julia to provide better parallelism. I think a better way forward is to make it easy to write safe concurrent programs (e.g., by developing more tooling) and slowly remove @async everywhere.

ancapdev commented 3 years ago

Agreed on minimizing API, although at present sticky is exposed as a field that can be peeked and poked, which isn't ideal when you want to attach behavior to it. If you can't attach behavior to the sticky flag then any scheduler changes are lazy, which provides little help if you're stuck with a bunch of tasks piled up on a thread whilst the rest of your system is idle -- this presently happens quite easily (e.g,. if you spawn a bunch of tasks that immediately take a lock, yield, and lets the scheduler pick up another task, forever binding it to the thread). More generally, even if you want to strive for a simple user facing API, I'd prefer a tiered layer of interfaces that serve as extension points for more complex use cases or API experimentation.