Closed oschulz closed 1 year ago
Sounds great - if that would "just work" in a transparent manner, I think we'd have something very powerful on our hands. Originally, I thought this would be only a high-level thing at the task and remote-call level - without type-stability, since fast code would always be behind some kind of dispatch, and information like "available workers" would obviously never even reach time-critical code.
But if this could reach through to deep levels of code, it would obviously enable completely different kinds of applications as well, as a fully-fledged resource-injection framework. Propagation of things like the default AD engine, etc., springs to mind. And we'd have a clean way to propagate loggers & friends.
We'll have to be careful to keep people from abusing it, though (replace kwargs by context too often, etc.). :-)
I'd also give up some generality for efficiency...
I agree that's good engineering in many cases. But I suspect it'd let us implement something like "nano-coroutines". It'd also help implement something like GeneratorsX.jl without a big gun like IRTools.jl. So all these possibilities are very tempting to me :)
@tkf has mentioned the HAMT and I think he had a persistent variant in mind?
Yes, I was thinking about the persistent/immutable data structure.
For remote tasks we'd need to copy the context vars and serialize them, sending them to the remote side as part of invoking the remote function. With strict nesting we might get some useful optimizations here in limiting the amount of state we serialize and send?
Unfortunately, it's very easy to invoke `with_context(x => a_1TB_array, y => open(temp_file), z => pointer(...)) do ... end`. I don't think automatic propagation to remote workers is reasonable. I don't think strictly nested scope or a persistent data structure would help with it.
In #35833, I tried as much as possible to make the API and implementation Distributed.jl-friendly so that some kind of opt-in/manual propagation is possible. The strategy I had in mind was to get the current snapshot of the backend dictionary by `snapshot_context`, send it to the remote side (maybe after filtering out unsafe contents), and then start the remote execution inside `with_context(f, snapshot)`. In a way, `snapshot_context` acts like `Base.@locals`. I suppose this is a compiler-friendly construct and we can keep it? So far we discussed "in-place mutation" and it seems to be a safe API to have as long as we don't mutate the context variable storage.
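To make the opt-in strategy concrete, here is a minimal, self-contained sketch. It uses a plain `Dict` as a stand-in for the task-local backend dictionary; `with_context`, `snapshot_context`, and `filter_serializable` are illustrative names modeled on the discussion, not the actual #35833 implementation:

```julia
const CONTEXT = Dict{Symbol,Any}()  # stand-in for the task-local backend dict

function with_context(f, pairs::Pair...)
    saved = copy(CONTEXT)           # save the enclosing scope
    try
        for (k, v) in pairs
            CONTEXT[k] = v
        end
        return f()
    finally
        empty!(CONTEXT)             # restore the enclosing scope on exit
        merge!(CONTEXT, saved)
    end
end

snapshot_context() = copy(CONTEXT)  # like Base.@locals, but for context vars

# Opt-in remote propagation: filter the snapshot, ship it, then run the
# remote function inside with_context(f, snapshot...). The `v isa IO` check
# is a placeholder for a real "unsafe contents" predicate.
filter_serializable(snap) = Dict(k => v for (k, v) in snap if !(v isa IO))

result = with_context(:x => 1, :y => 2) do
    snap = filter_serializable(snapshot_context())
    # a remotecall would ship `snap`; here we just re-enter it locally:
    with_context(() -> CONTEXT[:x] + CONTEXT[:y], snap...)
end
# result == 3, and CONTEXT is empty again afterwards
```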
I don't think automatic propagation to remote workers is reasonable. I don't think strictly nested scope or a persistent data structure would help with it.
But that pretty much precludes the use case behind my original proposal: a clean way to propagate information about available resources (like workers and logging) to child tasks and across remote calls. Do you think we should go for two separate mechanisms here?
But I suspect it'd let us implement something like "nano-coroutine". It'd also help implementing something like GeneratorsX.jl without a big gun like IRTools.jl. So all these possibilities are very tempting to me :)
I watched the nano-coroutine talk a while back. I agree it's very compelling but didn't realize we might address it this way. Sounds interesting for sure.
We'll have to be careful to keep people from abusing it, though
Yes dynamic scope is easy to abuse and can lead to confusing code. We discussed this a bit following the Go post mentioned in https://github.com/JuliaLang/julia/pull/35833#issuecomment-633754489.
Unfortunately, it's very easy to invoke `with_context(x => a_1TB_array, y => open(temp_file), z => pointer(...)) do ... end`. I don't think automatic propagation to remote workers is reasonable.
I wondered whether we could allow it by sending all keys and poisoning keys containing "unreasonable" state at the remote end (e.g., non-serializable state, or things which are huge). By "poisoning" I mean sending the key but making loading the value an error. But that does seem pretty arbitrary and kind of awful :grimacing: Maybe instead it's a per-contextvar Bool which indicates whether it's intended to be distributed? This might make sense because the library declaring the context var has a chance to know it's not going to put large or non-serializable data inside.
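The poisoning idea could be sketched like this (`PoisonedValue` and `load_value` are hypothetical names, and the `v isa IO` check is just a placeholder for a real "unreasonable state" predicate):

```julia
# Sketch: replace non-serializable values by a sentinel before sending,
# so the key still exists remotely but reading the value errors.
struct PoisonedValue
    reason::String
end

load_value(v) = v
load_value(p::PoisonedValue) =
    error("context value unavailable on this worker: $(p.reason)")

poison(ctx::Dict) =
    Dict(k => (v isa IO ? PoisonedValue("non-serializable IO") : v)
         for (k, v) in ctx)

remote_ctx = poison(Dict(:file => stdout, :retries => 3))
load_value(remote_ctx[:retries])   # normal values pass through unchanged
# load_value(remote_ctx[:file])    # would throw an ErrorException
```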
I don't think strictly nested scope or persitent data structure would help with it.
Yes, it doesn't help with those problems.
@oschulz I'm sorry if I gave you a misleading impression that I'm implementing exactly what was proposed in the OP. I've been trying to abstract out the requirements and trying to design a system that is useful for a wide range of use cases. We need to get the design correct and maximally extensible before actually implementing whatever possible in Distributed.jl. Please remember there are other important use cases as listed in the OP of #35833.
Having said that, I think it'd be reasonable to have (say) a `Shared` wrapper type s.t. inside `with_context(x => Shared(value), y => a_1TB_array) do ... end`, `value` would be automatically propagated to the remote calls while `a_1TB_array` stays local. This should be implementable purely inside Distributed.jl and without altering anything in the core implemented in #35833.
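A minimal sketch of such a `Shared` wrapper (the filtering helper `shared_subset` is a hypothetical name for what Distributed.jl would do before a remote call, with the context snapshot modeled as a `Dict`):

```julia
struct Shared{T}
    value::T
end

# Keep only the Shared entries from a context snapshot, unwrapping them
# for the remote side; everything else stays local.
shared_subset(ctx::Dict) = Dict(k => v.value for (k, v) in ctx if v isa Shared)

ctx = Dict(:x => Shared(42), :y => zeros(10^6))  # :y is large, stays local
remote_vars = shared_subset(ctx)
# remote_vars == Dict(:x => 42)
```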
Maybe instead it's a per-contextvar Bool which indicates whether it's intended to be distributed?
@c42f I don't think it's possible to do this with the UUID-key based implementation like #35833 because the task-local dictionary itself doesn't know about the contextvar properties. That's why I think moving this information to the value side (as in `Shared(value)` suggested above) would be easier to implement.
Please remember there are other important usecases as listed in the OP
Sure, of course! I was just wondering if we should split this into two different issues, or if all use cases can be covered by a common mechanism. I think that would be possible (and preferable) if we do propagate across task and remote-call boundaries.
We could always check if the information is "too big" to be forwarded via remote call.
I think that would be possible (and preferable) if we do propagate across task and remote-call boundaries.
It's not possible and not preferable because:

- It is not possible to know if a context variable holding a heap-allocated data structure is meant to capture the identity or the value. For example, given `x = Vector{Vector{Int}}`, is there some code expecting `x[1]` to be mutated in-place in a specific way (i.e., this context is propagating the identity)? Or is the program correct if I do `x[1] = copy(x[1])` at random points (i.e., this context is propagating the value)?
- Some values cannot reasonably be sent to a remote process at all (e.g., resources managed with a `finalizer`).

I have proposed various solutions to this problem. I think the discussion would be more productive if you explain why they don't work.
I have proposed various solutions to this problem. I think the discussion would be more productive if you explain why they don't work.
Uh, maybe we misunderstood each other: I'm with you for rejecting things that can't be forwarded, requiring big objects to be wrapped in a `Shared` wrapper or so, etc.
Maybe this was a misunderstanding on my side: I had the impression that you didn't want non-explicit propagation to tasks and remotes at all anymore, because you wrote "I don't think automatic propagation to remote workers is reasonable". And I was wondering how many use cases would still work if context had to be propagated explicitly - since the code that distributes work to tasks and remotes (say, Transducers :-) ) will often not know about the semantics of the whole context.
I would assume that in the future, we'll have more and more automatic/transparent multi-threaded and also multi-process code execution. So the code that "declares" the context, and the code that "consumes" the context will often not be aware of the task/remote-call barrier in between, and may not share stack. But the code that does the parallelization (say, a multi-threaded broadcast implementation) will not know/care about what's in the context - except for the parts of the context that control parallelization.
So that's why I think context must, in principle, always be automatically forwarded to spawned tasks and to remote calls. But of course we can reject/filter certain types of content, or require them to be wrapped appropriately - context should, in my opinion, not be abused as a data store for substantial amounts of data, and that should be discouraged.
It is not possible to know if a context variable holding a heap-allocated data structure is meant to capture the identity or the value
I guess a clean way around that is to only allow context to refer to immutable values (we do have an immutable array type somewhere, don't we?). If we do that, copies can be made as necessary, transparently, without affecting semantics.
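A sketch of that restriction (`check_context_value` is a hypothetical helper; `ismutable` has been in Base since Julia 1.5):

```julia
# Reject mutable values at context-assignment time, so that copying for
# task/remote propagation can never change program semantics.
function check_context_value(v)
    ismutable(v) &&
        throw(ArgumentError("context values must be immutable, got $(typeof(v))"))
    return v
end

check_context_value(1)                            # ok: Int is immutable
check_context_value((rng = :xoshiro, seed = 42))  # ok: NamedTuple of immutables
# check_context_value([1, 2])                     # would throw: Array is mutable
```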
I would assume that in the future, we'll have more and more automatic/transparent multi-threaded and also multi-process code execution. So the code that "declares" the context, and the code that "consumes" the context will often not be aware of the task/remote-call barrier in between, and may not share stack. But the code that does the parallelization (say, a multi-threaded broadcast implementation) will not know/care about what's in the context - except for the parts of the context that control parallelization.
Yes, this is exactly the reason that some portion of the context needs to be propagated automatically. Two pieces of code are involved: (1) the code which declares and uses the context variable, and (2) the code which calls `Distributed.@spawn` and exposes (distributed) parallelism. In general (2) is not the end user's top-level application code and there's a good chance that (1) might also not be. So they definitely need to be decoupled.
I think this is why @tkf was suggesting the `Shared` wrapper. I don't like the idea of manually wrapping and unwrapping, but I suppose the context var get/set interface itself could do that.
I guess in most (sane) cases, the entries of the context would be fairly small and immutable structures anyhow. I guess we can be fairly rigorous and filter out everything that can't be propagated automatically. Maybe we should actually reject everything we don't "like" during context creation/assignment already, to avoid surprises for the user later on?
@oschulz Thanks for the clarification. Indeed, it looks like we have a different notion of "automatic", "explicit", etc. To clarify, I've been using "explicit" to mean that the user does something beyond the standard context variable declaration. This is maybe some kind of declarative API to tell Distributed.jl to propagate certain context variables for every remote-call (opt-in). Or, maybe just using a low-level API to reset the remote context (manual). If what you mean by "automatic" is what I mean by "opt-in", yes, we are actually on the same page. Perhaps I should have mentioned "unconditional propagation is not reasonable" instead of "automatic propagation is not reasonable."
Concretely, I think it would be reasonable to have an API something like

```julia
@contextvar PROCESS_LOCAL_CONTEXT = 1   # not forwarded to remote
@shared_contextvar GLOBAL_CONTEXT = 1   # automatically forwarded to remote
```

(I think it kind of makes sense to call them `@contextobject` and `@contextvalue`.)
I guess we can be fairly rigorous and filter everything out that can't be propagated automatically.
This is the point I'm still strongly against as I explained in the last comment https://github.com/JuliaLang/julia/issues/35757#issuecomment-656461947. We should make API explicit and easy to understand and manipulate. Something implicit will cause trouble.
but I suppose the context var get/set interface itself could do that.
@c42f Yeah, I think that's possible. It can be an option to `@contextvar` or another macro like `@shared_contextvar`. We can totally hide that there is a wrapper like `Shared`. Or, there can be two dictionaries as backends. No one other than Distributed.jl and Base would notice this.
"unconditional propagation is not reasonable" instead of "automatic propagation is not reasonable."
I fully agree. I think it's perfectly fine to expect the user to declare, in a certain way, a context variable that is to be propagated automatically, and to restrict it to certain types of content. It's certainly good to give the user control over what should be restricted to the current process and what should propagate beyond.
The "local" `@contextvar` would still be propagated to tasks, though, right? I guess in the future, tasks will be used so much under the hood that the user will often really not even be aware of it. And it's all the same shared memory, so propagation of (almost arbitrary but immutable) content wouldn't be a problem?
Nice to know that we are on the same page!
The "local" `@contextvar`

I commented on this in the other issue https://github.com/JuliaLang/julia/pull/35833#issuecomment-657019038
It can be an option to `@contextvar` or another macro like `@shared_contextvar`. We can totally hide that there is a wrapper like `Shared`. Or, there can be two dictionaries as backends. No one other than Distributed.jl and Base would notice this.
Nice, I think this is the way to go.
My inclination is to have options to `@contextvar` rather than having `@shared_contextvar`, as I feel like there could be other fine-grained properties which define how to propagate or otherwise manage/define context variables in the future. (For example, what about GPUs? What about context vars which need code to run when new tasks spawn? Generally I feel like we should minimize the differences between in-process vs out-of-process tasks where possible — this binary distinction isn't the only way to categorize available compute.)
this binary distinction isn't the only way to categorize available compute
Right, it makes sense.
What about context vars which need code to run when new tasks spawn?
I'm glad that you are shooting for this! It'd make it possible to implement something like parallel RNG completely in the user space.
e.g. hashing and/or traversal in the Dict/HAMT/whatever storage backend.
BTW, regarding the lookup overhead, we can store `hash(uuid)` in the `ContextVar` object so that it's computed only once. Traversal is still a bottleneck, though. Anyway, it's a micro-optimization we can try at some point.
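That micro-optimization might look like this (a hypothetical `ContextVar` layout for illustration; the actual struct in #35833 may differ):

```julia
using UUIDs

# Precompute hash(uuid) once at construction, so each dictionary lookup
# can reuse the cached hash instead of rehashing the key.
struct ContextVar{T}
    name::Symbol
    key::UUID
    keyhash::UInt   # cached hash(key)
    default::T
end

ContextVar(name::Symbol, default::T) where {T} =
    (k = uuid4(); ContextVar{T}(name, k, hash(k), default))

v = ContextVar(:x, 1)
# v.keyhash == hash(v.key) holds by construction
```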
Speaking of parallel RNG, it'd require something like

```julia
@contextvar RNG_STATE = RNGState(...)
```

where `RNGState` is mutable, right? In general, the "no-`set!`" direction can simply be defeated by

```julia
@contextvar IMMUTABLE_POINTING_TO_MUTABLE = Ref(thing)
```

How does it interact with the optimization you have in mind? Is it that it's important to make the default get-only?
where RNGState is mutable [...] How does it interact with the optimization you have in mind?
I think the point here is that context should behave like implicit arguments to child functions from the point of view of the compiler. Then the user has a choice to make context immutable or not as necessary, and the compiler can reason about the values of these variables locally in the same way as normal function arguments.
For context like `Ref{Int}()`, the compiler still gets benefits in knowing that the address of the `Int` cannot change, even though it may need to emit a load of the `Int` after running any child functions.
For context like `Ref{Int}()`, the compiler still gets benefits in knowing that the address of the `Int` cannot change, even though it may need to emit a load of the `Int` after running any child functions.
Thanks for the explanation. It makes sense.
What about context vars which need code to run when new tasks spawn?
I'm glad that you are shooting for this! It'd make it possible to implement something like parallel RNG completely in the user space.
Yes exactly! I feel like whatever we come up with here should be able to support both https://github.com/JuliaLang/julia/pull/34852 and logger context in user space with excellent efficiency. If not, we won't really have solved two of the key use cases.
Regarding mutable context, it basically has to be cloned (at the very least) in the parent task prior to `@spawn`, otherwise we'll generate data races everywhere. This is a worry because it forces some code to run in `@spawn` even if the functions called in the new task never use that state.
Right, the `Ref(thing)` example does not really make sense.

I think it's also important to mutate the state in the current context upon `@spawn`. Otherwise, after

```julia
t1 = @spawn ...
t2 = @spawn ...
```

we have identical RNG state in `t1` and `t2`.
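The point about advancing the parent state can be illustrated with a toy splittable state (`split!` is a hypothetical helper; a real implementation would use a proper splittable or counter-based RNG rather than `hash`):

```julia
mutable struct RNGState
    seed::UInt64
end

# Derive a child stream and advance the parent, so that two consecutive
# spawns never hand out identical state.
function split!(s::RNGState)
    child = RNGState(hash(s.seed))
    s.seed += 1
    return child
end

parent = RNGState(UInt64(1234))
t1_state = split!(parent)   # what @spawn would hand to task 1
t2_state = split!(parent)   # what @spawn would hand to task 2
# t1_state.seed != t2_state.seed, because the parent state was
# mutated between the two spawns
```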
I'm glad that you are shooting for this! It'd make it possible to implement something like parallel RNG completely in the user space.
RNGs have actually been on my mind as a potentially very important use case for contexts. While we often forward an RNG via an explicit parameter (and should), sometimes (e.g. for a likelihood function that happens to need an RNG internally, but just takes the model parameters as its input) it would be great to pass it on via context.
For parallel applications, I usually use a counter-based RNG, so that I can use a common seed and partition the random space in a hierarchical fashion - that does require semantic knowledge, but should be easy to do using the proposed `with_context`.
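For illustration, hierarchical partitioning with a common seed can be modeled by deriving a stream key from the seed and the task-tree path (this uses `hash` as a stand-in for a real counter-based RNG such as Philox; `stream_key` is a hypothetical name):

```julia
# Each node in the task tree gets a key derived from the root seed and
# its path of child indices; the same path always reproduces the same key.
stream_key(seed::UInt64, path::Int...) =
    foldl((h, i) -> hash(i, h), path; init = seed % UInt)

root = UInt64(42)
k1  = stream_key(root, 1)      # first child
k11 = stream_key(root, 1, 1)   # its first grandchild
k2  = stream_key(root, 2)      # second child
# Distinct paths give distinct keys (with overwhelming probability),
# and every key is reproducible from the seed alone.
```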
@c42f I think the point here is that context should behave like implicit arguments to child functions from the point of view of the compiler.
I think that's the ideal way to define it, semantically.
EDIT: Oh, except i've just realized that this thread is about communication across distributed tasks, not necessarily about communication across multithreaded tasks? Or is it covering both?
We've wanted something like this for a while too! :) Thanks for opening the issue and discussing it! 👍
Since nothing like this exists right now, we've been toying with the (dirty) idea of (ab)using the logger to get a context that passes to child tasks, given that we currently do pass the logger through to child tasks.
For example, we were considering a thread-aware tracing framework that does something like this, even though it's clearly terrible:
```julia
using Logging

struct TraceLogger <: AbstractLogger
    span_name::String
    parent_logger::AbstractLogger
end

Logging.min_enabled_level(tl::TraceLogger) = Logging.min_enabled_level(tl.parent_logger)
Logging.shouldlog(tl::TraceLogger, args...) = Logging.shouldlog(tl.parent_logger, args...)
Logging.handle_message(tl::TraceLogger, args...) = Logging.handle_message(tl.parent_logger, args...)

ancestor_trace(::Any) = ""
ancestor_trace(tl::TraceLogger) = "$(tl.span_name), $(ancestor_trace(tl.parent_logger))"

function with_span(f, name)
    with_logger(TraceLogger(name, Logging.current_logger())) do
        info = @timed f()
        @info "Finished $(ancestor_trace(Logging.current_logger())): $(info.time)"
        return info.value
    end
end
```
```julia-repl
julia> with_span("a") do
           @sync begin
               @async begin
                   with_span("b") do
                       @info "HI"
                   end
               end
               @async begin
                   with_span("C") do
                       @info "BYE"
                   end
               end
           end
       end
[ Info: HI
[ Info: BYE
[ Info: Finished b, a, : 0.052175791
[ Info: Finished C, a, : 0.000226425
[ Info: Finished a, : 0.101278193
Task (done) @0x00000001136d58d0
```
:)
Anyway, yeah, sign us up as another interested party!
EDIT: Oh, except i've just realized that this thread is about communication across distributed tasks, not necessarily about communication across multithreaded tasks? Or is it covering both?
Definitely both, also across local tasks. And since my original proposal, @tkf, @c42f and @JeffBezanson have taken this idea even further than I had envisioned originally - something that could possibly also be used in time-critical code. I think this could potentially become an extremely powerful mechanism.
In #50958 I made the semi-intentional decision not to address the remote-call part of this proposal. But a distributed framework could choose to propagate a subset of scoped variables across a remote-call interface.
I have started a small prototype for snapshotting in https://github.com/vchuravy/ScopedValues.jl/pull/6, but I won't be including this into the Base proposal for now. The implementation thereof can live in a package.
(We might also be able to implement a "RemoteScope" on top of local scopes, but I haven't thought too hard about this.)
It would be great to have "remote-enabled" scopes eventually, I think (if possible), to avoid hard-to-predict behavior in cases where remote operation is transparent to the user.
The issue is that copying a scope is a very heavy operation. It's something I decidedly wouldn't want to do on every rpc.
Now a framework should be able to define that it wants to propagate scope with its RPC and snapshot only the relevant pieces. Right now the design uncertainty with Distributed.jl is large for me. It may be that we want something like a `RemoteScopedValue` akin to a `RemoteRef`, but the cost must be local and not global across the program.
I certainly wouldn't want to send a CuContext or CuDevice automatically across the wire.
The issue is that copying a scope is a very heavy operation.
Hm, yes, there is that ... hard to control how much people will put in there.
Seems covered by https://github.com/JuliaLang/julia/pull/50958 for the main case above of `@async` and `Threads.@spawn`.
@tkf and me have been discussing ways to propagate information about available workers (or resources in general) in distributed hierarchical computations:
https://discourse.julialang.org/t/propagation-of-available-assigned-worker-ids-in-hierarchical-computations
Adding resource arguments to every function call would be impractical, and using Cassette & friends to add context by rewriting the whole computation would be very heavy-handed, since it might touch large code stacks (and may also not be a complete solution when remote calls are involved?).
Could we add something like a `context` field to `Task`, in addition to the `storage` field - with the difference that `context` is automatically propagated to spawned tasks and remote calls? Adding the possibility to propagate a context through a computation in this fashion could be useful for other use cases too, I think.