Closed EvanSchiewe-Entegra closed 4 years ago
Thank you very much for the detailed information. Give me a chance to review the information and I'll get back to you.
Hi @EvanSchiewe-Entegra,
So, after some study, I think the issue arises because the CancellationToken
is used twice. See below:
    async void RunChanges(CancellationToken ct)
    {
        Console.WriteLine("RunChanges: called");
        Cursor<RethinkDb.Driver.Model.Change<JObject>> changes = null;
        try
        {
            changes = await R.Db("rethinkdb").Table("jobs")
                .Changes().OptArg("include_initial", "true")
                .RunChangesAsync<JObject>(conn, ct);     // <-- ct used ONCE here
            while (await changes.MoveNextAsync(ct))      // <-- ct used TWICE here
            {
                Console.WriteLine("RunChanges: got a change");
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("RunChanges: op canceled");
        }
        finally
        {
            Console.WriteLine("RunChanges: finally");
            changes?.Close();
            Console.WriteLine("RunChanges: changes cursor closed");
        }
        Console.WriteLine("RunChanges: returning");
    }
As far as I can tell, here's what happens:
The Main() thread (TID:M) starts up.
A connection to the server is established. A thread (TID:B) is created and blocks on the receive side of the network socket.
TID:M calls RunChanges.
TID:M creates and sends the query over the network:
    R.Db("rethinkdb").Table("jobs")
        .Changes().OptArg("include_initial", "true")
        .RunChangesAsync<JObject>(conn, ct);
Just before TID:M sends the query, an Awaiter (TaskCompletionSource), Awaiter:HashCode:52638671, is created to process the response. Creating it also registers a callback on ct (this is where the problem might be, more on this later): https://github.com/bchavez/RethinkDb.Driver/blob/044749812e51b918c09724cbe54a7d99e281cfc1/Source/RethinkDb.Driver/Utils/CancellableTask.cs#L19
There's nothing left to do, so TID:M bottoms out: TID:M returns from RunChanges(), then awaits Task.Delay(500).
TID:M is returned to the thread pool.
TID:B detects that network data has arrived. A server response is waiting to be read.
TID:B reads the JSON protocol string, and a Response object is created.
TID:B identifies the Awaiter (TaskCompletionSource) that is currently awaiting the response.
TID:B creates a task, TID:1, to further process the raw Response object: https://github.com/bchavez/RethinkDb.Driver/blob/044749812e51b918c09724cbe54a7d99e281cfc1/Source/RethinkDb.Driver/Net/SocketWrapper.cs#L230-L243
TID:1 calls TrySetResult(response) and resolves the awaiting task. The Awaiter:HashCode:52638671 task is now complete with a response object.
TID:1 picks up where TID:M left off, at the first await continuation here: https://github.com/bchavez/RethinkDb.Driver/blob/044749812e51b918c09724cbe54a7d99e281cfc1/Source/RethinkDb.Driver/Net/Connection.cs#L269-L273
TID:1 creates a new Cursor<T>.
In new Cursor<T>(), a Query.Continue continuation is sent over the network. This continuation also registers a new Awaiter to process the response from the cursor continuation.
TID:1 returns the new Cursor<T> fully constructed.
TID:1 resumes at the next await continuation here:
    changes = await R.Db("rethinkdb").Table("jobs")
        .Changes().OptArg("include_initial", "true")
        .RunChangesAsync<JObject>(conn, ct);
    // TID:1 resumes here with a newly constructed Cursor<T> in 'changes'
    while (await changes.MoveNextAsync(ct))
    {
        Console.WriteLine("RunChanges: got a change");
    }
TID:1 loops over MoveNextAsync(ct) the first time. Data already exists from the initial response, so the call doesn't await the underlying cursor's in-flight continuation and immediately returns data. "RunChanges: got a change" is printed.
TID:1 loops over MoveNextAsync(ct) a second time. This time no data is available, so TID:1 creates a CancellableTask using ct and awaits whichever event occurs first (either data arrives from the in-flight continuation or the cancellation token is canceled): https://github.com/bchavez/RethinkDb.Driver/blob/044749812e51b918c09724cbe54a7d99e281cfc1/Source/RethinkDb.Driver/Net/Cursor.cs#L160-L168
TID:1 is returned to the thread pool because there's nothing left to do.
await Task.Delay(500) wakes up; the thread resuming execution after the await is now TID:2.
TID:2 prints "Cancelling task".
TID:2 invokes cts.Cancel(). The .Cancel() call causes the cancellationToken.Register registrations to execute. Remember that we registered a cleanup callback back when we issued the initial .RunChangesAsync<JObject>(conn, ct); query here: https://github.com/bchavez/RethinkDb.Driver/blob/044749812e51b918c09724cbe54a7d99e281cfc1/Source/RethinkDb.Driver/Utils/CancellableTask.cs#L19
This causes TID:2 to invoke Awaiter:HashCode:52638671:OnCancellation(). However, the Awaiter:HashCode:52638671 task was already completed by the initial .TrySetResult(response), and invoking .SetCanceled() on the already completed task from .RunChangesAsync<JObject>(conn, ct); won't fly: https://github.com/bchavez/RethinkDb.Driver/blob/044749812e51b918c09724cbe54a7d99e281cfc1/Source/RethinkDb.Driver/Utils/CancellableTask.cs#L22-L25
I think this is the reason for the exception:
System.InvalidOperationException: An attempt was made to transition a task to a final state when it had already completed.
System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: An attempt was made to transition a task to a final state when it had already completed.
at System.Threading.Tasks.TaskCompletionSource`1.SetCanceled()
at RethinkDb.Driver.Utils.CancellableTask.OnCancellation() in C:\Code\Projects\Public\RethinkDb.Driver\Source\RethinkDb.Driver\Utils\CancellableTask.cs:line 26
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.CancellationCallbackInfo.ExecuteCallback()
at System.Threading.CancellationTokenSource.ExecuteCallbackHandlers(Boolean throwOnFirstException)
--- End of inner exception stack trace ---
at System.Threading.CancellationTokenSource.ExecuteCallbackHandlers(Boolean throwOnFirstException)
at System.Threading.CancellationTokenSource.NotifyCancellation(Boolean throwOnFirstException)
at System.Threading.CancellationTokenSource.Cancel()
at RethinkDb.Driver.Tests.GitHubIssues.Issue146.<Test>d__1.MoveNext() in C:\Code\Projects\Public\RethinkDb.Driver\Source\RethinkDb.Driver.Tests\GitHubIssues\Issue146.cs:line 34
---> (Inner Exception #0) System.InvalidOperationException: An attempt was made to transition a task to a final state when it had already completed.
at System.Threading.Tasks.TaskCompletionSource`1.SetCanceled()
at RethinkDb.Driver.Utils.CancellableTask.OnCancellation() in C:\Code\Projects\Public\RethinkDb.Driver\Source\RethinkDb.Driver\Utils\CancellableTask.cs:line 26
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.CancellationCallbackInfo.ExecuteCallback()
at System.Threading.CancellationTokenSource.ExecuteCallbackHandlers(Boolean throwOnFirstException)<---
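The core of the trace above can be reproduced without the driver at all. The following is a minimal sketch using only a plain TaskCompletionSource; the class name SetCanceledDemo and the "response" string are placeholders for illustration, not driver code. It completes the task first, as TrySetResult(response) does in the walkthrough, and then attempts both cancellation calls:

```csharp
using System;
using System.Threading.Tasks;

// Illustration only: SetCanceledDemo is a made-up name, not part of the driver.
public static class SetCanceledDemo
{
    // Completes a task, then tries to cancel it both ways and reports what happened.
    public static string Run()
    {
        var tcs = new TaskCompletionSource<string>();

        // The response arrives first and moves the task to a final state.
        tcs.TrySetResult("response");

        // TrySetCanceled reports failure by returning false instead of throwing.
        bool tryResult = tcs.TrySetCanceled();

        // SetCanceled throws because the task has already completed.
        bool threw = false;
        try
        {
            tcs.SetCanceled();
        }
        catch (InvalidOperationException)
        {
            threw = true;
        }

        return $"TrySetCanceled={tryResult}, SetCanceled threw={threw}";
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints "TrySetCanceled=False, SetCanceled threw=True"
    }
}
```

When SetCanceled() runs inside a callback registered on the token, as in the trace, the InvalidOperationException surfaces from cts.Cancel() wrapped in an AggregateException.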
Suffice it to say, if you just use ct once, the problem should go away. The code below removes the ct parameter from .RunChangesAsync(conn):
    changes = await R.Db("rethinkdb").Table("jobs")
        .Changes().OptArg("include_initial", "true")
        .RunChangesAsync<JObject>(conn);          // <-- REMOVE ct HERE
    while (await changes.MoveNextAsync(ct))       // <-- KEEP ct HERE
    {
        Console.WriteLine("RunChanges: got a change");
    }
I'm not sure what the best practice should be here, but my gut feeling is you should be able to pass ct around without being signaled in as many places as you need. We'll probably have to figure out how to clean up those already completed tasks in the Awaiters, and how to unregister the cancelToken.Register(OnCancellation) registrations once .TrySetResult(response) has been called.
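One possible shape for that cleanup, as a rough sketch and not the driver's actual code: keep the CancellationTokenRegistration returned by Register and dispose it once the result is set, so a later Cancel() never reaches the completed awaiter. HypotheticalAwaiter, Completion, and RegistrationDemo are names invented for this example; the real CancellableTask may be structured differently.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the driver's awaiter; not the real CancellableTask.
public sealed class HypotheticalAwaiter<T>
{
    private readonly TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
    private CancellationTokenRegistration registration;

    public HypotheticalAwaiter(CancellationToken ct)
    {
        // TrySetCanceled does nothing (returns false) if the task already completed.
        registration = ct.Register(() => tcs.TrySetCanceled());
    }

    public Task<T> Completion => tcs.Task;

    public void SetResult(T value)
    {
        tcs.TrySetResult(value);
        // Unregister the callback so a later Cancel() skips this awaiter.
        registration.Dispose();
    }
}

public static class RegistrationDemo
{
    public static string Run()
    {
        using (var cts = new CancellationTokenSource())
        {
            // Two awaiters share one token, like RunChangesAsync and MoveNextAsync.
            var first = new HypotheticalAwaiter<string>(cts.Token);
            var second = new HypotheticalAwaiter<string>(cts.Token);

            first.SetResult("response"); // the first awaiter completes normally
            cts.Cancel();                // only the second awaiter is still registered

            return $"first={first.Completion.Status}, second={second.Completion.Status}";
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints "first=RanToCompletion, second=Canceled"
    }
}
```

Either measure alone (TrySetCanceled, or disposing the registration) would avoid the exception in this sketch. Disposing the registration also stops completed awaiters from piling up as callbacks on a long-lived token.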
If you have any feedback, suggestions or ideas, please let me know your thoughts.
Thanks, Brian
Thanks for the detailed response! That is a very easy to follow analysis.
my gut feeling is you should be able to pass ct around without being signaled in as many places as you need
I do agree with you here. It seems like Microsoft does too, at least based on their code samples.
I can't say how often this use case is truly necessary in Rethink client code. In my test case, it doesn't make a big difference if I have to wait for the relatively short RunChangesAsync to finish before the indefinitely looping changefeed receives the cancellation. I don't think this issue will cause any pain now that I know how to avoid it.
I'm afraid I won't be of much help actually coming up with a clean solution for this, but please do let me know if there's anything I can do to help.
Thanks again for your time!
No problem. Thank you for reporting the issue. After some time sleeping on the problem, I think I have a decent solution that will fix the issue.
I should have a new release later today. However, I just want to take a little more time today to look at Microsoft's CoreFX sources (dotnet/corefx) and see if I can find better patterns for disposable CancellationToken registrations. I skimmed through a few books yesterday and they weren't that much help.
Hi @EvanSchiewe-Entegra,
A new version, RethinkDb.Driver v2.3.150, is now released that contains the fix.
Let me know if it works for you.
Thanks, Brian
Looks great to me!
Thanks, Evan
Version Information
What is the expected behavior?
I expect to be able to pass a CancellationToken to any async method, cancel it, handle the OperationCanceledException, and continue without issues.
What is the actual behavior?
Instead, I am seeing an internal exception from RethinkDb.Driver.Utils.CancellableTask.OnCancellation() bubble up where it is trying to call SetCanceled() after it has already been canceled.
Any possible solutions?
Using TrySetCanceled instead of SetCanceled in CancellableTask.OnCancellation() would stop the exception from being thrown, but I'm not sure why it's being thrown in the first place.
How do you reproduce the issue?
This should be a fairly simple console app that reproduces the issue I'm experiencing. If you see anything strange or wrong that I'm doing please let me know!
Once the token is cancelled, RunChanges catches the OperationCanceledException and exits normally. However, I end up catching an exception from RethinkDb.Driver.Utils.CancellableTask that I don't believe should be raised under normal usage.
Can you identify the location in the driver source code where the problem exists?
Unclear - see above
If the bug is confirmed, would you be willing to submit a PR?
Yes