staticfloat opened this issue 4 years ago (status: open)
Maybe something like
function wait_timeout(c, timeout)
    ch = Channel(2)
    @async begin
        res = wait(c)
        push!(ch, Some(res))
    end
    @async begin
        sleep(timeout)
        push!(ch, nothing)
    end
    res = take!(ch)
    return res
end
That way, if the return value is nothing, you know the operation timed out.
The two implementations suggested here seem to leak tasks. It might be possible to avoid this by using Timer, since it's close-able.
In general, it'd be nice if we could take the concurrency API seriously (#33248) and avoid piling up unstructured concurrency. Timeouts are one of the biggest success stories of structured concurrency, after all: Timeouts and cancellation for humans (njs blog)
The two implementations suggested here seem to leak tasks.
What does that mean?
It means that tasks spawned by the function are still running after the function call ends. This violates the black box rule, i.e., that all relevant computations end when the function returns (see the Julep and https://github.com/JuliaLang/julia/issues/33248#issuecomment-531139728). This is bad because errors that occur in the leaked tasks are silently ignored, and nothing provides backpressure against flooding the task scheduler.
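A minimal sketch of what "leaking" means here, assuming a variant of the Channel-based wait_timeout above that also returns its helper task so it can be inspected (naive_wait_timeout is a hypothetical name). After the call has already returned false, the task waiting on c is still alive:

```julia
# Hypothetical variant of the first wait_timeout that returns its helper task for inspection.
function naive_wait_timeout(c, timeout)
    ch = Channel{Bool}(2)
    waiter = @async begin
        wait(c)            # nothing ever cancels this wait
        put!(ch, true)
    end
    @async begin
        sleep(timeout)
        put!(ch, false)
    end
    return take!(ch), waiter
end

c = Channel{Int}(1)
result, waiter = naive_wait_timeout(c, 0.1)  # times out, so result is false
leaked = !istaskdone(waiter)                  # the helper task outlived the call
put!(c, 1)                                    # only now can the leaked task finish
wait(waiter)
```

Had the leaked task thrown in the meantime, nobody would have noticed unless something explicitly waited on it.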
Can't that be solved by wrapping the entire function in a @sync block?
Don't we need to cancel the other task for that? That's why the black box rule and task cancellation are the two must-have properties of structured concurrency.
(But it might be possible to solve this particular issue by just using Timer without implementing full-blown structured concurrency.)
Ah, you're right of course. I got confused and thought we were notifying c automatically here, but we're not.
@JeffBezanson is flooding the task scheduler with tasks all sitting and waiting on things an issue for the runtime? For instance, if I am timing out a lot, I could imagine adding a new task to the scheduler every couple of seconds, and for a long-running instance (like a StorageServer) this could add up to thousands of tasks.
notifying c
Yeah, notifying c acts as the cancellation in one direction. We can close the Timer for cancellation in the other direction.
Even without closing the timer, the timer will eventually elapse, and the task will go away. It may throw an error, but that's fine, because we're not catching them. ;)
Cleaner to close the timer, for sure.
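A sketch of the "close the timer" half of this, assuming we only swap the sleeping task for a Timer callback (wait_timeout_timer is a hypothetical name). Closing the timer cancels the timeout side when the wait wins; the waiting side is still an uncancelled task, so this alone does not fix the leak:

```julia
# Hypothetical variant: the timeout side is a Timer callback that can be cancelled.
function wait_timeout_timer(c, timeout)
    ch = Channel{Bool}(2)
    timer = Timer(_ -> put!(ch, false), timeout)  # runs only if the timer is not closed first
    @async begin                                  # event side: still never cancelled
        wait(c)
        put!(ch, true)
    end
    try
        return take!(ch)
    finally
        close(timer)                              # cancels the timeout side
    end
end
```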
What happens if you wait on the same thing twice and the first call didn't time out? If you don't clean up the timer in the first one, isn't there a chance that the first timeout calls notify during the second wait?
(Edit: edited twice; at first I thought it was a stupid question, but then I realized that it might happen for re-waitable objects like Channel.)
Concretely:
c :: Channel
wait_timeout(c, 0.1) === nothing && return
x = take!(c)
wait_timeout(c, 0.1) === nothing && return # the timer from the first `wait_timeout` can fire here
y = take!(c)
The timer within the first wait_timeout() is signaling on a separate Channel from the one in the second wait_timeout(), isn't it? I don't see how the second wait_timeout() can be affected by the first.
It's taking two things from the same channel c. I thought your plan was to notify one of the cond_ objects in the channel (maybe c.cond_wait)?
Oh, I understand now. Your concern is that if wait_timeout(c) does a notify(c.cond_wait) internally when its timeout has elapsed (in order to stop the task from its wait()), it will falsely notify someone else who is also waiting on it. That's a good concern.
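To illustrate the concern: notify on a Condition wakes every waiter by default (all=true), so a notify meant to cancel one wait also wakes unrelated tasks blocked on the same condition. A small sketch using a plain Condition in place of c.cond_wait (make_waiter is a hypothetical helper):

```julia
cond = Condition()
woken = Ref(0)
# Two independent waiters on the same condition (e.g. two callers waiting on one Channel).
make_waiter() = @async begin
    wait(cond)
    woken[] += 1
end
waiters = [make_waiter(), make_waiter()]
yield()              # let both tasks block in wait(cond)
n = notify(cond)     # meant as a "cancel" for one caller; returns how many tasks it woke
foreach(wait, waiters)
```

Both waiters wake up, so the second caller sees a spurious wakeup it has to tolerate.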
Perhaps instead we can throwto() in order to interrupt the other Task?
using Test
function wait_timeout(c, timeout)
    ch = Channel{Bool}(2)
    happy_task = @async begin
        wait(c)
        push!(ch, true)
    end
    timeout_task = @async begin
        sleep(timeout)
        push!(ch, false)
        # Stop the other task from waiting anymore
        Base.throwto(happy_task, InterruptException())
    end
    return take!(ch)
end
# First, test no wait, wait
@testset "no wait, wait" begin
    c = Channel(10)
    put!(c, 1)
    @test wait_timeout(c, 1.0) == true
    @test take!(c) == 1
    @test wait_timeout(c, 1.0) == false
end
# Next, test wait, no wait
@testset "wait, no wait" begin
    c = Channel(10)
    @test wait_timeout(c, 1.0) == false
    put!(c, 1)
    @test wait_timeout(c, 1.0) == true
    @test take!(c) == 1
end
# Next, test no wait, no wait, after sleeping for more than timeout:
@testset "no wait, no wait" begin
    c = Channel(10)
    put!(c, 1)
    @test wait_timeout(c, 1.0) == true
    @test take!(c) == 1
    sleep(2.0)
    put!(c, 2)
    @test wait_timeout(c, 1.0) == true
    @test take!(c) == 2
end
# Finally, test wait, wait
@testset "wait, wait" begin
    c = Channel(10)
    @test wait_timeout(c, 1.0) == false
    @test wait_timeout(c, 1.0) == false
end
I guess the question is whether we are absolutely positive that happy_task won't leave some shared global state (either c or ch, I guess?) corrupted if that exception is thrown. I don't know how throwto is implemented, but it looks somewhat evil and risky to me :)
Is it so bad to just continue waiting in the happy task? The downsides are that the task and the channel live longer than strictly necessary, I guess?
IIUC it's unsafe to call schedule(task, err, error=true) and throwto(task, err) with a task that is already scheduled. That's how I understand the comments from @vtjnash in https://discourse.julialang.org/t/stop-terminate-a-sub-task-started-with-async/32193/6
Reading the code briefly, my impression is that these functions assume ownership of the thread-local queue. So I guess it's unsafe to run them outside the scheduler code. But wait can use some internal assumptions, so maybe there is a way out?
OK, so here is what I meant by a Timer-based implementation (now I changed the return value to Bool):
using Base.Threads: Atomic
using Base: check_channel_state  # internal helper, not exported from Base
function wait_timeout(c::Channel, timeout::Real)
    cancel = Atomic{Bool}(false)
    isready(c) && return true
    @sync begin
        timer = Timer(timeout)
        try
            @async begin
                try
                    wait(timer)
                catch
                    return  # timer was closed early: the wait succeeded, nothing to cancel
                end
                cancel[] = true
                lock(c.cond_wait) do
                    notify(c.cond_wait)
                end
            end
            lock(c)
            try
                while !isready(c)
                    check_channel_state(c)
                    wait(c.cond_wait)
                    cancel[] && return false
                end
            finally
                unlock(c)
            end
        finally
            close(timer)
        end
    end
    return true
end
I don't think it leaks a task (or, more precisely, a leaked task would be cleaned up soon enough; it'd be nice if @sync were exception-safe so that we don't have this quirk).
It passes @staticfloat's tests (thanks for sharing those, BTW!).
Alright, this all inspired me to read up on the various links that @tkf posted.
All the structured concurrency stuff is interesting, but in terms of a short-term solution that can be added incrementally to packages, I liked .NET's cancellation framework best, by far. We also need something like that for the language server sooner rather than later, and it is the same framework that the VS Code team uses across VS Code. Sooo, I created https://github.com/davidanthoff/CancellationTokens.jl :) For now it is not thread-safe, so it can only be used with single-threaded tasks, but the .NET original is thread-safe and its source is MIT licensed, so we just need to copy their design over. But the public API of the package should be more or less done. Feedback and help welcome!
read up on the various links that @tkf posted.
Mission accomplished! :laughing:
So my initial response was "why not just use Atomic{Bool}?", but I guess you have the state machine to ensure that the cancel call ends when all the "receiver" sides have acknowledged the request?
There is also support for waiting on a cancel token, which I don't think would work with a pure atomic, right? And the .Net implementation supports callback handlers, which I haven't added, but I think also need more than just an atomic.
Ah, I see. That makes sense.
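To make the distinction concrete, here is a minimal sketch of a token that supports both polling and waiting: an atomic flag for the former plus a Base.Event for the latter. CancelToken, cancel!, and iscancelled are hypothetical names, not the CancellationTokens.jl API, and callbacks are not covered:

```julia
using Base.Threads: Atomic

# Hypothetical minimal token: an atomic flag for cheap polling plus an Event
# so that tasks can block until cancellation happens.
struct CancelToken
    flag::Atomic{Bool}
    ev::Base.Event
end
CancelToken() = CancelToken(Atomic{Bool}(false), Base.Event())

iscancelled(tok::CancelToken) = tok.flag[]
function cancel!(tok::CancelToken)
    tok.flag[] = true
    notify(tok.ev)          # releases every task blocked in wait(tok)
    return nothing
end
Base.wait(tok::CancelToken) = wait(tok.ev)
```

The Event is what a bare Atomic{Bool} lacks: a waiter can block on it instead of spinning on the flag.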
My implementation wait_timeout(c::Channel, timeout::Real) above (https://github.com/JuliaLang/julia/issues/36217#issuecomment-643719590) looks easily generalizable. In particular, I think we can implement waitfirst(waitables...) quite easily.
I wonder if it makes sense to define an internal interface like
wait′(waitable, cancel) -> success::Bool
_notify(waitable)
where cancel is a Ref{Bool}-like object that defines a thread-safe cancel[]. For normal wait, we can use
struct NoCancel end
Base.getindex(::NoCancel) = false
It's pretty straightforward to implement this for Channel and Task:
using Base.Threads: Atomic
using Base: check_channel_state
function _wait(t::Task, cancel)
    if !istaskdone(t)
        lock(t.donenotify)
        try
            while !istaskdone(t)
                wait(t.donenotify)
                cancel[] && return false # added this line
            end
        finally
            unlock(t.donenotify)
        end
    end
    return true # returning `true` instead
end
function _notify(t::Task)
    lock(t.donenotify) do
        notify(t.donenotify)
    end
end
function wait′(t::Task, cancel = NoCancel())
    t === current_task() && error("deadlock detected: cannot wait on current task")
    ok = _wait(t, cancel)
    if istaskfailed(t)
        throw(TaskFailedException(t))
    end
    return ok
end
function _notify(c::Channel)
    lock(c.cond_wait) do
        notify(c.cond_wait)
    end
end
function wait′(c::Channel, cancel = NoCancel())
    isready(c) && return true # returning `true` instead
    lock(c)
    try
        while !isready(c)
            check_channel_state(c)
            wait(c.cond_wait)
            cancel[] && return false # added this line
        end
    finally
        unlock(c)
    end
    return true # returning `true` instead
end
With this implementation, we can now define
function waitfirst(waitables...)
    winner = Ref{Any}()
    cancel = Atomic{Bool}(false)
    @sync for w in waitables
        @async begin
            if wait′($w, $cancel)
                $cancel[] = true
                foreach(_notify, $waitables)
                $winner[] = $w
            end
        end
    end
    return winner[]
end
I think it's possible to define wait′ and _notify for Timer. Once they are implemented, it's quite easy to derive wait_timeout from waitfirst:
function wait_timeout(w, timeout::Real)
    t = Timer(timeout)  # the timer must actually be given the timeout
    try
        return waitfirst(w, t) === w
    finally
        close(t)
    end
end
I don't think a function like one of these should be implemented by notifying the waited-for object. Others might be waiting for the same objects, and they would get spurious wakeups.
What are the other implementation strategies?
Maybe we can define waitfirst(waitables...) if we have waitfirst(::Condition, ::Condition) as a primitive? I'm guessing that the cancellable wait can then use "shared" (e.g., c.cond_wait for Channel) and "unshared" Conditions. If we notify the unshared condition for cancellation, we can avoid the spurious wakeups?
That seems rather complex, like you're trying to put a round peg (old multiplexing event systems) into a square hole (our Task-based ownership design). Our objects are intended to essentially already have both capabilities through the existing Task system, which is why (unlike Go and C#) we have been trying not to additionally bolt things like CancellationTokens and waitmultiple on afterwards (note that the implementation of CancellationTokens appears to itself mostly just be a thin wrapper around Timer and Future! 🙂). Here's how I intended the implementation of this to work:
function wait_until(c, timeout::Real) # `c` is any object that is both wait-able and cancel-able (e.g. any IO or a Channel, etc.)
    timer = Timer(timeout) do t
        isready(c) || close(c)
    end
    try
        return wait(c)
    finally
        close(timer)
    end
end
Short, sweet, simple, no minor resource leaks—and essentially how the timeouts are implemented for the various resources in the FileWatching module.
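A runnable adaptation of that pattern for a Channel, assuming we want a Bool instead of the "channel closed" exception that wait raises once the timer closes c. The behavior to note is that a timeout leaves c closed for everyone:

```julia
# Adapted sketch: wait_until specialised to a Channel, returning false on
# timeout instead of propagating the "channel closed" error.
function wait_until(c::Channel, timeout::Real)
    timer = Timer(timeout) do _
        isready(c) || close(c)   # cancel by closing the resource itself
    end
    try
        wait(c)                  # throws InvalidStateException if c is closed while empty
        return true
    catch err
        err isa InvalidStateException || rethrow()
        return false
    finally
        close(timer)
    end
end
```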
So essentially you close something that you can wait on to cancel it?
I don't see, though, how that would make something like a cancellation token framework entirely unnecessary, given that you can only really close a thing, not an operation?
Because you can also only wait on a thing, and not an operation. The expectation thus is that the only reason you want to be able to cancel something is that there's already a resource attached. Otherwise, why are you trying to waste compute cycles on a pure operation with no output?
Don't you need close(::Task) for wait_until to work in full generality? For example, wait_until(c::Channel, timeout) shouldn't close c, right? (Discussion above: https://github.com/JuliaLang/julia/issues/36217#issuecomment-643685832.) I can imagine wait_until would work if I do
function wait_until(c::Channel, timeout::Real)
    w = @async wait(c)
    timer = Timer(timeout) do t
        isready(c) || close(w)
    end
    try
        return wait(w)
    finally
        close(timer)
    end
end
I'd be very happy if we had close(::Task) :slightly_smiling_face:
Tasks are not resources—they are owners—ergo they should not have close
(cf. past discussion at #6283)
OK, I guess discussion for it can happen elsewhere.
Anyway, I'm still interested in your implementation strategy for wait_until(c::Channel, timeout) that does not close c.
I'd suggest that there's never a need to interrupt a wait to perform work in Julia, since the Task system already does that. Thus the only reason to interrupt work seems to be to terminate a resource, thereby causing these concepts to combine seamlessly!
I agree close-ing a resource is a very powerful construct. That's why I've been suggesting close(::Timer) since the beginning of this thread.
Anyway, what about your implementation strategy for wait_until(c::Channel, timeout) that does not close c?
Intentionally not directly possible
So there is no other way out than waitfirst(::Condition, ::Condition)?
This is obviously a very low-level primitive operation. But something like this might be required so that people can explore building more modern concurrency systems on top of it.
I agree close on a resource is a good way to handle this when applicable, but I don't find it 100% satisfying --- what if some clients are willing to wait longer than others for the same event? I.e., a timeout seems to be more a property of a specific request, such that one request for the same resource might time out when another doesn't. You could get around that by wrapping the resource in a proxy object of some kind, though, I suppose?
Yes, the idea is also that this discourages race-y code designs like the one you describe, with multiple tasks operating on the same resources simultaneously.
What about when I don't want to close() the resource? E.g., what if I have a protocol over UDP that requires me to request a retransmit if a packet goes missing? I need a way to express "wait for 10 seconds, and if nothing happens, perform some internal logic and then request another packet". I don't want to close the socket, because then I might miss a future packet coming in, and I don't want to leave the recv() hanging there forever, because that means I need to duplicate my receiver handling logic (for successful handling in one place, and unsuccessful handling in another).
That design sounds awfully full of race conditions, which is why I think it ought to be difficult in Julia. It's currently intended to force you to think about how to only have one (active) recv call site, while yours sounds like you'd need the same logic in triplicate (every recv call site always needs to handle all the cases from all recv call sites; there's no way to avoid that). It also misses the simple elegance of Julia's actual design. Instead, we're intentionally forcing you to model your send and receive state machines separately:
waiting = true # this is our finite-state-machine
on_recv(msg) = (global waiting = false) # trivial state update (`global`, else this only sets a local)
t = Timer(0; interval = 10) do t # fire now, then every 10 seconds
    waiting && on_send(socket)
end
while waiting
    on_recv(recv(socket))
end
close(t) # recv successful (future `on_recv` should ignore any duplicates)
close(socket) # don't care about possible in-flight packets anymore, discard them
(The initial version of this used @sync and @async for some parts, but that's not needed here since the FSM is so simple.)
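A self-contained simulation of that shape, under stated assumptions: a Channel stands in for the UDP socket, on_send is a hypothetical retransmit that pretends the third request gets answered, and the state lives in Refs inside a function (run_fsm is a hypothetical name) so the Timer callback and the receive loop share it without global-scope pitfalls:

```julia
# Hypothetical stand-ins: `inbox` plays the UDP socket, `on_send` the retransmit.
function run_fsm()
    inbox = Channel{Symbol}(10)
    sends = Ref(0)
    waiting = Ref(true)                 # the whole finite-state machine
    function on_send()
        sends[] += 1
        sends[] >= 3 && put!(inbox, :reply)   # pretend the 3rd request is answered
        return nothing
    end
    on_recv(msg) = (msg === :reply && (waiting[] = false))
    t = Timer(0; interval = 0.05) do _
        waiting[] && on_send()          # periodic (re)send while still waiting
    end
    while waiting[]
        on_recv(take!(inbox))           # the one and only receive call site
    end
    close(t)                            # done: stop retransmitting
    return sends[]
end
```

The send side and the receive side each stay linear; the only shared state is the single waiting flag.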
I disagree that forcing the user to split receive success and receive failure behavior across two separate tasks is elegant.
It's currently intended to force you to think about how to only have one (active) recv call site,
That is precisely what I'm arguing for: that a single recv() can signal both receive success and receive failure. To put my original question into code, it would look something like:
val = recv(sock; timeout=10.0)
if val === nothing
    retransmit(sock)
    # Either return nothing, raise error, reiterate recv call, or something else
end
(I'm also open to recv() throwing a TimeoutError or something, if we want to try/catch here instead.)
(every recv call site always needs to handle all the cases from all recv call sites; there's no way to avoid that)
That's exactly what I want: for the receive handling logic to be in one place, not split across different places. I believe you're saying that the fundamental pattern should be:
was_successful = false
@async begin
    sleep(timeout)
    if !was_successful
        handle_error()
    end
    close(sock)
end
recv(sock)
was_successful = true
This design makes debugging more difficult, errors do not propagate linearly (because some will be hidden in tasks that we never wait() upon), and it requires that I close() my socket for any issue that induces a timeout.
I do not think it is reasonable to require that users must close() and then re-connect() or re-bind() their sockets if something times out. In many applications (such as low-latency audio transport), it would be typical for a recv() to time out many times a second, as you want to react very quickly to a packet missing a deadline.
State machines are hard to write, use, and debug. Otherwise, many languages wouldn't have constructs like for loops and coroutines. I don't think forcing users to write state machines makes user code simple.
That code isn't the pattern that I posted for your use case: close(sock) might be the end state after many failures, and it works well for typical cases since it's a terminal state of the FSM (so you don't need an external channel to synchronize it). Those two code examples look like they'd end up having many interleaved state machines going on (this is UDP, so remember that events can happen in any order with any number of repetitions), which would then need additional external mechanisms to synchronize. That's why the code I gave has one Task handling all on_recv messages and one handling on_send messages together in a pair, until success, and then moves on to the next action (in my example, just a boring close call, but it could be a new pair of recv/send operations).
State machines are hard to write, use, and debug.
I'm going to need a reference for that. I tried google, but the titles of the results I got back for "why is a state machine bad" seemed to follow the general pattern "why I thought they were {bad | hard to write | maintain | complicated | use | debug | antiquated}, but came to realize they led to far better code"
Let's step away from the example of retransmitting on failure and think of the more general case: I want to recv() from a socket, or more generally, wait() upon a resource, get notified by a timeout and end the recv()/wait(), and not destroy the resource (via close()) in the event of a timeout. I assert that this basic functionality is fundamental to writing robust distributed processes. If we cannot express that in Julia, I argue that our event model is fundamentally broken. As weak evidence, I put forth the fact that every major distributed API integrates timeouts into its methods, including the C socket API, the Python threading/multiprocessing API, and basically any other API you want to look at.
That seems like poor evidence, given that we use a platform abstraction layer (libuv) to improve upon the timeout capabilities from those APIs. As we talked about offline, there are good design patterns from only using the few primitives we have already.
State machines are hard to write, use, and debug.
I'm going to need a reference for that.
These are two examples of the "structured programming" approach vs. state machines:
- yield in many languages (e.g., Python) vs. Julia's iterate
- async/await vs. callback hell
As a more specific example in concurrent programming, Deadlocks in non-hierarchical CSP - Roman Elizarov - Medium is a nice post explaining how tricky communicating finite state machines can be. There are a few links to relevant discussions on select for building state machines in https://github.com/python-trio/trio/issues/242#issuecomment-353760723.
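A small Julia illustration of the first pair, assuming a toy countdown (Countdown and countdown_task are hypothetical names): the same sequence written as an explicit iterate state machine and as a Channel-backed task whose state lives in a plain for loop.

```julia
# State-machine style: the caller threads `state` through iterate by hand.
struct Countdown
    n::Int
end
Base.iterate(c::Countdown, state = c.n) = state <= 0 ? nothing : (state, state - 1)
Base.length(c::Countdown) = c.n
Base.eltype(::Type{Countdown}) = Int

# Coroutine style: a task behind a Channel keeps its state in an ordinary loop.
countdown_task(n) = Channel{Int}() do ch
    for i in n:-1:1
        put!(ch, i)
    end
end
```

Both produce the same values; the difference is only where the "what comes next" bookkeeping lives.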
State machines are hard to write, use, and debug.
I'm going to need a reference for that.
Arguably, the entire point of both closures and coroutines is to express what is ultimately just a state machine in a more convenient, less error prone way. After going to that much language design trouble to not force users to explicitly write out state machines, it seems like a bit of weak sauce to say that here it's suddenly just fine.
Thanks for the links. The conclusion in the trio link in particular seems to be exactly the same as mine. That first post (Medium) includes a CFSM; it's not an attempt to avoid them, only to demonstrate problems with getting them wrong. And the second one (trio) shows ways to simplify most FSMs into linear form (by breaking them up into their independent parts), but hypothesizes that they can't be simplified if any cleanup, timeout, or retry actions are present aside from close. Both links talk about how basing your program on timeouts or select is a recipe for headaches, while using coroutines (as we do) to build your FSM is less error-prone, as it forces you to consider which parts are independent and isolate them.
I also appreciated this conclusion to the trio posts:
Well, no. The correct behaviour is such case [of reaching a limit such as time] is shutting the client down [by calling close] and possibly logging the event.
I had also asked about this on Discourse, and I agree that there should be a way to timeout on a wait/read. I would love to be able to use julia to run my instruments, but this has proven impossible without this feature.
In my particular case, I want to work with a SCPI instrument over TCP. This involves sending text commands and reading back the response. If I make a typo in the command, I would want the read to time out so that I can try again.
I find the given solution of needing to close the socket (to abort the read) and recreate everything over again to be pretty poor.
As an example (this could be interactive or not, but I might not be able to send an interrupt):
tcp = connect(host,port)
println(tcp, ":MEASURE:VOLT?")
voltage = readline(tcp) # all good here
println(tcp, ":MEASURE:VILT?") # oh no!
voltage2 = readline(tcp) # blocks forever
Note that I tried the following:
function read_timeout(tcp::TCPSocket, timeout=10)
    # read asynchronously
    ch = Channel{String}(c -> put!(c, readline(tcp)), 0)
    # set timeout timer
    tm = Timer(t -> isready(ch) || close(ch), timeout)
    # wait until value is read or timeout
    out = nothing
    try
        out = take!(ch)
        close(ch)
    catch
        @info "Command timed out!"
    finally
        close(tm)
    end
    return out
end
But it stops working once a timeout happens: I can't seem to read from the socket anymore, even though it is still open.
For what it's worth, inspired by PR #39027, I was able to write a function to read from a TCPSocket with a timeout, which doesn't require killing and re-establishing the whole connection. It seems to work well in the bit of testing I have done.
function read_timeout(tcp::TCPSocket, timeout=5)
    # create timeout timer
    status_before = tcp.status
    tm = Timer(timeout) do t
        lock(tcp.cond)
        tcp.status = Base.StatusEOF
        notify(tcp.cond)
        unlock(tcp.cond)
    end
    # read synchronously
    out = readline(tcp)
    # check timer
    if isopen(tm) # readline worked
        close(tm)
        return out
    else # got EOF because of timer
        tcp.status = status_before
        @info "Read timed out!"
        return nothing
    end
end
You should not mutate tcp.status on the way out; that is dangerous. Note that if you want unreliable connections, building those on top of reliable connections is rather awkward. It is more likely that you want UDP.
It's not that the connection is unreliable, it's that sometimes there is nothing to read and I want to get control back (being able to write again) without having to close the socket. If there is a better way to do this (having a timeout read), I would very much appreciate knowing how.
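One way to get read timeouts without closing the socket or touching its internals, sketched under assumptions: a single reader task is the only code that ever calls readline on the stream and forwards complete lines into a Channel, and callers poll that Channel with Base's timedwait. start_reader and readline_timeout are hypothetical helper names, Base.BufferStream stands in for the TCPSocket, and the approach polls, which the original issue notes costs latency and CPU.

```julia
# Hypothetical helpers. Only the reader task ever reads from `io`.
function start_reader(io::IO)
    lines = Channel{String}(32)
    @async begin
        for line in eachline(io)
            put!(lines, line)    # complete lines only; nothing is lost on a timeout
        end
        close(lines)             # EOF on `io`
    end
    return lines
end

# Returns the next line, or `nothing` if none arrives within `timeout` seconds.
# timedwait polls `isready` every `pollint` seconds rather than blocking.
function readline_timeout(lines::Channel{String}, timeout::Real)
    status = timedwait(() -> isready(lines), Float64(timeout); pollint = 0.01)
    status === :ok || return nothing
    return take!(lines)
end
```

Because a timed-out call leaves no readline pending on the socket, a late response stays in the Channel for the next call instead of being consumed by an orphaned task, which is likely what broke the Channel-based read_timeout above.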
It appears that we do not support timeouts in wait(). To motivate the need for this: it's critical basic functionality for writing resilient servers. Blocking forever due to a dropped packet is unacceptable, so servers must instead be written in an explicitly polling style, which is bad for latency and for CPU usage.

Implementation-wise, wait(c) blocks the current task on a condition until a notify() pushes it back onto the scheduler's work queue. It seems to me that timeouts can be implemented by starting a second task that sleep()s for a certain amount of time, then unconditionally notify()s that condition. The consumer of the wait() call would then need to disambiguate a timeout from an actual event.

Taking a look at the different things that can be wait()ed upon, most are implemented in terms of Condition objects, so a very simple @async (sleep(timeout); notify(c)) should work, and the _FDWatcher from FileWatching also notifies a condition in the end, so I believe a uniform API is possible here.