Open bill-poole opened 2 years ago
I spent some time looking into this. The proposed solution to use CancellationToken.Register is also flawed, because that API does not support async callbacks, and GrainCancellationTokenSource.Cancel returns a Task.
My vote to solve this problem goes to GrainCancellationTokenSource.CreateLinkedTokenSource(CancellationToken)
It follows the already established patterns, and still maintains grain specific types. I do not like the implicit conversion idea.
For now, we are working around this problem by creating a task from the incoming CancellationToken and combining that with the grain method call using Task.WhenAny.
Then, when we see the cancellation task finished first, we call GrainCancellationTokenSource.Cancel.
This is what the helper class looks like:
public static class CancellableGrainMethodInvoker
{
/// <summary>
/// The retry policy to propagate cancellation to the grain
/// </summary>
private static readonly Lazy<AsyncRetryPolicy> CancellationRetryPolicy =
new Lazy<AsyncRetryPolicy>(() => Policy.Handle<Exception>().RetryAsync(3));
/// <summary>
/// Invokes a method on a grain (<see cref="global::Orleans.IGrain"/>) with support for cancellation
/// Because Orleans uses a special type called <see cref="global::Orleans.GrainCancellationToken"/>, we cannot simply use a <see cref="CancellationToken"/>.
/// Grain cancellation tokens are special because they can send a cancellation signal to the (possibly remote) grain.
/// This helper method accepts an existing cancellation token and links it to a new grain cancellation token which you can then use to invoke a grain method.
/// </summary>
/// <param name="grain">The grain that will be invoked</param>
/// <param name="grainMethod">The grain method that will be invoked</param>
/// <param name="cancellationToken">The cancellation token that should cancel the grain method if cancelled</param>
/// <typeparam name="TGrain">The type of grain</typeparam>
/// <typeparam name="TResult">The type of result</typeparam>
/// <returns>The result of the grain method</returns>
/// <exception cref="OperationCanceledException">When the cancellation token was triggered</exception>
public static Task<TResult> InvokeAsync<TGrain, TResult>(
TGrain grain,
Func<TGrain, GrainCancellationToken, Task<TResult>> grainMethod,
CancellationToken cancellationToken) where TGrain : IGrain =>
InvokeAsync(
grain,
grainMethod,
static (grain, grainMethod, grainCancellationToken) => grainMethod(grain, grainCancellationToken),
cancellationToken);
/// <summary>
/// Because Orleans uses a special type called <see cref="global::Orleans.GrainCancellationToken"/>, we cannot simply use a <see cref="CancellationToken"/>.
/// Grain cancellation tokens are special because they can send a cancellation signal to the (possibly remote) grain.
/// This helper method accepts an existing cancellation token and links it to a new grain cancellation token which you can then use to invoke a grain method.
/// </summary>
/// <param name="grain">The grain that will be invoked</param>
/// <param name="grainMethod">The grain method that will be invoked</param>
/// <param name="cancellationToken">The cancellation token that should cancel the grain method if cancelled</param>
/// <typeparam name="TGrain">The type of grain</typeparam>
/// <exception cref="OperationCanceledException">When the cancellation token was triggered</exception>
public static Task InvokeAsync<TGrain>(
TGrain grain,
Func<TGrain, GrainCancellationToken, Task> grainMethod,
CancellationToken cancellationToken)
where TGrain: IGrain =>
InvokeAsync(
grain,
grainMethod,
static (grain, grainMethod, grainCancellationToken) => grainMethod(grain, grainCancellationToken),
cancellationToken);
/// <summary>
/// Invokes a method on a grain (<see cref="global::Orleans.IGrain"/>) with support for cancellation
/// Because Orleans uses a special type called <see cref="global::Orleans.GrainCancellationToken"/>, we cannot simply use a <see cref="CancellationToken"/>.
/// Grain cancellation tokens are special because they can send a cancellation signal to the (possibly remote) grain.
/// This helper method accepts an existing cancellation token and links it to a new grain cancellation token which you can then use to invoke a grain method.
/// </summary>
/// <param name="grain">The grain that will be invoked</param>
/// <param name="state">The state to pass in to avoid closures (this allows you to make your callback static)</param>
/// <param name="grainMethod">The grain method that will be invoked</param>
/// <param name="cancellationToken">The cancellation token that should cancel the grain method if cancelled</param>
/// <typeparam name="TGrain">The type of grain</typeparam>
/// <typeparam name="TState">The type of state</typeparam>
/// <typeparam name="TResult">The type of result</typeparam>
/// <returns>The result of the grain method</returns>
/// <exception cref="OperationCanceledException">When the cancellation token was triggered</exception>
public static async Task<TResult> InvokeAsync<TGrain, TState, TResult>(
TGrain grain,
TState state,
Func<TGrain, TState, GrainCancellationToken, Task<TResult>> grainMethod,
CancellationToken cancellationToken) where TGrain: IGrain
{
// Immediately throw an OperationCanceledException and do not even invoke the grain if the cancellation token is already cancelled
cancellationToken.ThrowIfCancellationRequested();
// Create a task that will throw an OperationCanceledException when the cancellationToken is canceled
await using var cancellationTokenTaskSource = new CancellationTokenTaskSource(cancellationToken);
var cancellationTask = cancellationTokenTaskSource.Task;
// Create a grain cancellation token to communicate a possible cancellation event to the grain
using var grainCancellationTokenSource = new GrainCancellationTokenSource();
var grainCancellationToken = grainCancellationTokenSource.Token;
var grainMethodTask = grainMethod.Invoke(grain, state, grainCancellationToken);
if (await Task.WhenAny(grainMethodTask, cancellationTask) == grainMethodTask)
{
// The grain method completed first, await it to propagate inner exceptions or cancellations
return await grainMethodTask;
}
// This will propagate the cancellation to the grain, which can be running on a different machine
// Since this is a possible over-the-wire operation, we use a retry policy to subdue transient errors
await CancellationRetryPolicy.Value.ExecuteAsync(() => grainCancellationTokenSource.Cancel());
// This will propagate the OperationCanceledException up the call stack
await cancellationTask;
throw new UnreachableException("At this point, the OperationCanceledException should have been thrown already");
}
/// <summary>
/// Because Orleans uses a special type called <see cref="global::Orleans.GrainCancellationToken"/>, we cannot simply use a <see cref="CancellationToken"/>.
/// Grain cancellation tokens are special because they can send a cancellation signal to the (possibly remote) grain.
/// This helper method accepts an existing cancellation token and links it to a new grain cancellation token which you can then use to invoke a grain method.
/// </summary>
/// <param name="grain">The grain that will be invoked</param>
/// <param name="state">The state to pass in to avoid closures (this allows you to make your callback static)</param>
/// <param name="grainMethod">The grain method that will be invoked</param>
/// <param name="cancellationToken">The cancellation token that should cancel the grain method if cancelled</param>
/// <typeparam name="TGrain">The type of grain</typeparam>
/// <typeparam name="TState">The type of state</typeparam>
/// <exception cref="OperationCanceledException">When the cancellation token was triggered</exception>
public static async Task InvokeAsync<TGrain, TState>(
TGrain grain,
TState state,
Func<TGrain, TState, GrainCancellationToken, Task> grainMethod,
CancellationToken cancellationToken)
where TGrain: IGrain
{
// Immediately throw an OperationCanceledException and do not even invoke the grain if the cancellation token is already cancelled
cancellationToken.ThrowIfCancellationRequested();
// Create a task that will throw an OperationCanceledException when the cancellationToken is canceled
await using var cancellationTokenTaskSource = new CancellationTokenTaskSource(cancellationToken);
var cancellationTask = cancellationTokenTaskSource.Task;
// Create a grain cancellation token to communicate a possible cancellation event to the grain
using var grainCancellationTokenSource = new GrainCancellationTokenSource();
var grainCancellationToken = grainCancellationTokenSource.Token;
var grainMethodTask = grainMethod.Invoke(grain, state, grainCancellationToken);
if (await Task.WhenAny(grainMethodTask, cancellationTask) == grainMethodTask)
{
// The grain method completed first, await it to propagate inner exceptions or cancellations
await grainMethodTask;
return;
}
// This will propagate the cancellation to the grain, which can be running on a different machine
// Since this is a possible over-the-wire operation, we use a retry policy to subdue transient errors
await CancellationRetryPolicy.Value.ExecuteAsync(() => grainCancellationTokenSource.Cancel());
// This will propagate the OperationCanceledException up the call stack
await cancellationTask;
}
}
And this is what CancellationTokenTaskSource looks like (there are various ways to do this, but you need to be able to turn a CancellationToken into an awaitable task; the ugly part is that this Task doesn't always complete):
/// <summary>
/// Holds the task for a cancellation token, as well as the token registration. The registration is disposed when this instance is disposed.
/// </summary>
public sealed class CancellationTokenTaskSource : IDisposable, IAsyncDisposable
{
/// <summary>
/// The cancellation token registration, if any. This is <c>null</c> if the registration was not necessary.
/// </summary>
private readonly CancellationTokenRegistration? _cancellationTokenRegistration;
/// <summary>
/// Creates a task for the specified cancellation token, registering with the token if necessary.
/// </summary>
/// <param name="cancellationToken">The cancellation token to observe.</param>
public CancellationTokenTaskSource(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
Task = Task.FromCanceled(cancellationToken);
return;
}
var tcs = new TaskCompletionSource();
_cancellationTokenRegistration = cancellationToken.Register(
static (state, innerCancellationToken) => ((TaskCompletionSource?)state)!.SetCanceled(innerCancellationToken),
tcs
);
Task = tcs.Task;
}
/// <summary>
/// Gets the task for the source cancellation token.
/// </summary>
public Task Task { get; }
/// <summary>
/// Disposes the cancellation token registration, if any.
/// Note that this may cause <see cref="Task"/> to never complete.
/// </summary>
public void Dispose() => _cancellationTokenRegistration?.Dispose();
/// <summary>
/// Disposes the cancellation token registration, if any.
/// Note that this may cause <see cref="Task"/> to never complete.
/// </summary>
public async ValueTask DisposeAsync()
{
if (_cancellationTokenRegistration != null)
{
await _cancellationTokenRegistration.Value.DisposeAsync();
}
}
}
Finally, this is how the helper class can be used:
interface IMyGrain : IGrain
{
Task DoSomethingAsync(GrainCancellationToken grainCancellationToken);
}
class MyGrain : Grain, IMyGrain
{
public Task DoSomethingAsync(GrainCancellationToken grainCancellationToken)
{
// TODO
}
}
class HelloWorldController
{
private readonly IGrainFactory _grainFactory;
public HelloWorldController(IGrainFactory grainFactory)
{
_grainFactory = grainFactory;
}
public async Task<IActionResult> Index(CancellationToken cancellationToken)
{
var myGrain = _grainFactory.GetGrain<IMyGrain>(0);
await CancellableGrainMethodInvoker.InvokeAsync(
myGrain,
static (grain, grainCancellationToken) => grain.DoSomethingAsync(grainCancellationToken),
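cancellationToken);
// Return a result so the method satisfies its Task<IActionResult> signature
return new OkResult();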
}
}
The overloads of CancellableGrainMethodInvoker.InvokeAsync that take a state parameter are designed to avoid allocations: the state is passed into the lambda, so you don't need closures. This should allow you to make your lambdas static.
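To illustrate the closure-versus-state point, here is a minimal sketch in plain .NET, without Orleans. InvokeWithStateAsync and orderId are hypothetical names made up for this example; they only mimic the shape of the state-passing overloads above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a state-passing overload: the caller's state is
// forwarded to the callback, so the callback never has to capture it.
static Task<TResult> InvokeWithStateAsync<TState, TResult>(
    TState state,
    Func<TState, CancellationToken, Task<TResult>> callback,
    CancellationToken cancellationToken) =>
    callback(state, cancellationToken);

int orderId = 42;

// Capturing lambda: the compiler generates a closure object to hold orderId.
Task<string> withClosure = InvokeWithStateAsync(
    0,
    (unused, ct) => Task.FromResult($"order-{orderId}"),
    CancellationToken.None);

// Static lambda: orderId travels through the state parameter instead, and the
// 'static' modifier turns any accidental capture into a compile-time error.
Task<string> withState = InvokeWithStateAsync(
    orderId,
    static (id, ct) => Task.FromResult($"order-{id}"),
    CancellationToken.None);

Console.WriteLine(await withClosure);
Console.WriteLine(await withState);
```

Both calls produce the same string; the difference is only in whether a closure has to be allocated to carry orderId.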
The proposed solution to use CancellationToken.Register is also flawed, because that API does not support async callbacks, and GrainCancellationTokenSource.Cancel returns a Task
Good point. I had somehow missed that. CancellationTokenSource has both sync and async cancellation operations, and the async operation is CancelAsync, not Cancel. But GrainCancellationTokenSource only has an async cancellation operation, and it's named Cancel, not CancelAsync.
I guess then it should actually be:
using var cts = new GrainCancellationTokenSource();
using (cancellationToken.Register(
static cts => ((GrainCancellationTokenSource)cts!).Cancel().Wait(), cts))
{
await grain.Method(..., cts.Token);
}
This is unfortunately however "sync-over-async".
We don't generally expect the CancellationTokenSource.Cancel method to fail. However, if any delegate registered by the CancellationToken.Register method fails, then that exception is thrown by the CancellationTokenSource.Cancel method. I likewise therefore assume that if any delegate registered by the GrainCancellationToken.Register method fails, then that exception is thrown by the GrainCancellationTokenSource.Cancel method; meaning, the Task returned by the GrainCancellationTokenSource.Cancel method will complete in a failed state.
The only way we can get that exception to propagate to where the CancellationTokenSource.Cancel method is invoked is for the exception to propagate from the lambda passed to the CancellationToken.Register method, which means the lambda has to wait for that operation to complete, which means "sync-over-async".
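The behavior of the plain CancellationTokenSource described here can be checked with a small sketch. It only covers the standard .NET type; whether GrainCancellationTokenSource behaves the same way remains the assumption stated above.

```csharp
using System;
using System.Threading;

using var source = new CancellationTokenSource();

// A registered callback that fails when the token is cancelled.
source.Token.Register(() => throw new InvalidOperationException("callback failed"));

Exception? observed = null;
try
{
    // Cancel() runs the registered callbacks synchronously on the calling thread.
    source.Cancel();
}
catch (AggregateException ex)
{
    // Callback exceptions are collected and rethrown to the caller of Cancel().
    observed = ex.InnerException;
}

Console.WriteLine(observed?.Message);
```

Because the callback runs synchronously inside Cancel(), the only way for an asynchronous failure to surface at that call site is for the callback to block on it, which is the "sync-over-async" problem.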
If I understand @amoerie's suggestion, the exception would instead propagate to where the grain method is invoked. If that is acceptable, then that could also be achieved as follows:
Task? cancellationTask = null;
using var cts = new GrainCancellationTokenSource();
using (cancellationToken.Register(
cts => cancellationTask = ((GrainCancellationTokenSource)cts!).Cancel(), cts))
{
await grain.Method(..., cts.Token);
if (cancellationTask is not null)
{
await cancellationTask;
}
}
The above approach does however require a heap allocation for the lambda passed to the Register method to capture the cancellationTask variable.
Another approach (based on @amoerie's suggestion) would be:
using var cts = new GrainCancellationTokenSource();
var task = grain.Method(..., cts.Token); // Invoke the grain method
try
{
// Wait for the grain method to complete or cancellationToken to be cancelled.
await task.WaitAsync(cancellationToken);
}
catch (OperationCanceledException ex) when (ex.CancellationToken == cancellationToken)
{
// cancellationToken was cancelled, so cancel the GrainCancellationToken and then await the
// grain method to propagate the exception.
await cts.Cancel();
await task;
}
I think this shows that a solution like a GrainCancellationTokenSource.CreateLinkedTokenSource(CancellationToken) method is desperately needed.
This is unfortunately however "sync-over-async".
This is unacceptable for us.
We don't generally expect the CancellationTokenSource.Cancel method to fail.
Could it fail if the remote grain has since been deactivated, or gone offline unexpectedly due to network issues?
I likewise therefore assume that if any delegate registered by the GrainCancellationToken.Register method fails, then that exception is thrown by the GrainCancellationTokenSource.Cancel method
Are exceptions thrown on the remote grain propagated back to the client if you call GrainCancellationToken.Cancel?
If I understand @amoerie's suggestion, the exception would instead propagate to where the grain method is invoked. If that is acceptable
You understand correctly, and this is definitely acceptable for us. This behavior feels like idiomatic C#: an async method that is canceled throws an OperationCanceledException.
Another approach (based on @amoerie's suggestion) would be:
Ah, I like how you used Task.WaitAsync to combine the task and the cancellation token. This looks like a good equivalent to my helper class, yes.
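For anyone unsure what Task.WaitAsync does to the underlying task, here is a minimal sketch in plain .NET 6+, without Orleans. The TaskCompletionSource is a stand-in for a grain call that is still in flight; the names are made up for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();

// Stand-in for a grain call that is still running when cancellation arrives.
var pending = new TaskCompletionSource<int>();
Task<int> grainCall = pending.Task;

bool waitWasCancelled = false;
Task<int> wait = grainCall.WaitAsync(cts.Token);
cts.Cancel();
try
{
    await wait;
}
catch (OperationCanceledException ex) when (ex.CancellationToken == cts.Token)
{
    // Only the wait was abandoned; the underlying task has not completed.
    waitWasCancelled = true;
}

Console.WriteLine($"wait cancelled: {waitWasCancelled}, call completed: {grainCall.IsCompleted}");

// The original task can still finish later and be awaited for its outcome.
pending.SetResult(7);
Console.WriteLine(await grainCall);
```

This is why the approach above still has to call Cancel on the GrainCancellationTokenSource and await the original task: WaitAsync stops the waiting, not the work.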
Could it fail if the remote grain has since been deactivated, or gone offline unexpectedly due to network issues?
Maybe, I'm not sure. @ReubenBond? I certainly think that it shouldn't fail. i.e., I don't think any delegate registered with the CancellationToken.Register method should ever throw an exception. A cancellation token should be able to be passed to any method at any time and the token cancelled at any time without any error occurring.
A method accepting a cancellation token may ignore the cancellation; but it should never cause the CancellationTokenSource.Cancel method to throw an exception by registering a delegate with the CancellationToken.Register method that then throws an exception when the delegate is invoked.
If a remote grain has been deactivated, then we expect any method invoked on that grain to fail; but we don't expect the GrainCancellationTokenSource.Cancel method to fail if any corresponding GrainCancellationToken has been passed to a method call on a grain that has been deactivated. i.e., if a method is invoked on a deactivated grain, I'd expect the GrainCancellationToken passed to that method to be ignored.
A grain method can end prematurely due to either an error occurring (e.g., a network error) or the given GrainCancellationToken being canceled. If an error occurs and then the cancellation token is cancelled, it shouldn't matter because the grain method will have terminated before the cancellation token was cancelled. If the error happens at the same time as a cancellation, then it's a race as to which is effected first. But either way, the delegate given to the GrainCancellationToken.Register method by the Orleans runtime should not ever fail in my opinion, which means the GrainCancellationTokenSource.Cancel method should never throw.
But, I don't know whether this is true of the Orleans runtime and GrainCancellationTokens.
Are exceptions thrown on the remote grain propagated back to the client if you call GrainCancellationToken.Cancel?
I assume you mean if you call GrainCancellationTokenSource.Cancel?
If an exception has been thrown by the remote grain before you call GrainCancellationTokenSource.Cancel, then that exception is propagated to where the method was invoked locally. i.e., the grain method completes before GrainCancellationTokenSource.Cancel is called.
If you invoke GrainCancellationTokenSource.Cancel before the remote grain throws an exception, then the operation is cancelled before the exception is thrown. Of course, this actually just sends a cancellation message to the remote grain, and the grain could throw an exception before that cancellation message is received or recognized/respected by the in-progress grain method. So it's possible for the grain method to be cancelled and throw an exception.
I suspect in that case the local grain method invocation will be cancelled when the GrainCancellationToken is cancelled and the exception thrown by the remote grain ignored (although it may be logged by the silo hosting the remote grain).
Perhaps the best solution for this issue would be for Orleans to permit a cancelable grain method to accept a CancellationToken parameter rather than requiring a GrainCancellationToken parameter and have the Orleans runtime wire up and handle the cancellation signal. i.e., allow a grain method to accept either a CancellationToken parameter or a GrainCancellationToken parameter.
That would arguably obviate the need for GrainCancellationTokenSource/GrainCancellationToken at all, making both obsolete and only needed for backwards compatibility (although still used internally by the Orleans runtime).
GrainCancellationToken has different semantics from CancellationToken since it's potentially sending a message over the wire and networks aren't reliable.
An implementation may want to repeatedly retry cancellations if there is a failure (for example, if it's a longer running or expensive operation that would really benefit from being cancelled) or it may want to send one retry, or perhaps just attempt to cancel it once and log (or ignore) any failure to send the cancellation.
One potential, simple implementation from the Discord, though a more resilient implementation with retries would also be an option if a use case merits that:
On the subject of GrainCancellationToken, I've got this I've been using for a while and seems fine but are there any obvious issues I should consider? I'm looking to just do a best effort one-time ping when cancellation is triggered
public static class GrainCancellationTokenExtensions
{
public static GrainCancellationToken SimpleGrainCancellationToken(this CancellationToken cancellationToken)
{
var gcts = new GrainCancellationTokenSource();
// Best effort notification of cancellation to down stream participants
cancellationToken.Register(() => gcts.Cancel());
return gcts.Token;
}
}
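For the retrying variants mentioned above, a bounded retry loop could look roughly like the sketch below. It is not an Orleans API: TryCancelWithRetriesAsync is a hypothetical helper, the cancel operation is modeled as a plain Func<Task>, and flakyCancel only simulates a transient network failure.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: retries an async cancel operation a bounded number of times.
// Returns true if an attempt succeeded, false if every attempt failed.
static async Task<bool> TryCancelWithRetriesAsync(Func<Task> cancelAsync, int maxAttempts, TimeSpan delay)
{
    for (int attempt = 1; attempt <= maxAttempts; attempt++)
    {
        try
        {
            await cancelAsync();
            return true;
        }
        catch (Exception) when (attempt < maxAttempts)
        {
            // Assumed to be a transient failure: wait briefly, then try again.
            await Task.Delay(delay);
        }
        catch (Exception)
        {
            // Final attempt failed: report it instead of throwing (best effort).
            return false;
        }
    }
    return false;
}

// Simulated cancel operation that fails twice before succeeding.
int calls = 0;
Func<Task> flakyCancel = () =>
{
    calls++;
    return calls < 3
        ? Task.FromException(new TimeoutException("simulated network failure"))
        : Task.CompletedTask;
};

bool delivered = await TryCancelWithRetriesAsync(flakyCancel, maxAttempts: 3, delay: TimeSpan.FromMilliseconds(10));
Console.WriteLine($"delivered: {delivered} after {calls} attempts");
```

With Orleans, the delegate would presumably be something like () => gcts.Cancel(). Setting maxAttempts to 1 gives the "attempt once and ignore failure" behavior, since the helper returns false rather than throwing.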
GrainCancellationToken has different semantics from CancellationToken since it's potentially sending a message over the wire and networks aren't reliable. An implementation may want to repeatedly retry cancellations if there is a failure
Okay yes, I just noticed that is also mentioned in the docs.
But the fact remains that invoking an Orleans grain method from a non-Orleans service (e.g., from a gRPC or HTTP/REST service in ASP.NET) means we must wire up a GrainCancellationTokenSource to a CancellationToken because cancellation of the gRPC or HTTP request is triggered from a CancellationTokenSource created/owned/managed by the underlying framework (i.e., gRPC/ASP.NET).
i.e., we must wire up an asynchronous cancellation to a synchronous trigger, and that is completely inescapable, i.e., outside our control.
We definitely should not be doing "sync-over-async" (as explained here), so that makes our options somewhat limited.
I also think it is problematic that the GrainCancellationTokenSource.Cancel method does not accept a cancellation token. All async operations that could take a long time to complete should be cancelable. I've added a new issue (#8634) proposing an overload be added that accepts a cancellation token.
I can see two problems with the proposed GrainCancellationTokenExtensions.SimpleGrainCancellationToken extension method:
- The CancellationTokenRegistration returned from the CancellationToken.Register method is never disposed (although I'm not sure how problematic that really is).
- The Task returned from GrainCancellationTokenSource.Cancel is not observed, which means if it fails, it will trigger an unobserved task exception.
I think the correct way to do "fire and forget" cancellations would be:
using var cts = new GrainCancellationTokenSource();
using (context.CancellationToken.Register(
static cts => Task.Run(async () =>
{
try
{
await ((GrainCancellationTokenSource)cts!).Cancel();
}
catch
{
}
}), cts))
{
await grain.Method(..., cts.Token);
}
We need the delegate to swallow all exceptions so we don't get an unobserved task exception if/when the GrainCancellationTokenSource.Cancel operation fails.
I don't think retrying GrainCancellationTokenSource.Cancel on failure is the best approach, although Orleans doesn't give us any alternatives to retry cancellations. I assume that if two grain methods are invoked concurrently using GrainCancellationTokens attached to the same GrainCancellationTokenSource and the GrainCancellationTokenSource.Cancel method is invoked and one cancellation fails, then the entire cancellation operation is abandoned and the GrainCancellationTokenSource.Cancel method immediately throws an exception. If so, then retrying the GrainCancellationTokenSource.Cancel method would cause a second cancellation message to be sent to both grains, even though only one of the two actually failed.
I think it would be better if each grain cancellation operation were to be independently retried if/when necessary. However, that would require an enhancement to Orleans.
Note that the only reason not to do "fire and forget" cancellations is if you want/need to retry failed cancellations, and I suspect that isn't a common requirement. I'd actually be happy to just immediately cancel all client-side grain methods and not bother sending cancellation messages at all, which would be possible with the enhancement I proposed in #8634. Note that this is how REST and gRPC remote calls are cancelled because the HTTP protocol has no way to cancel a request.
@bill-poole Just as an add-on to your answer, you can also simply use Task.Ignore() to suppress unobserved exceptions.
Thanks @amoerie. I didn't realize Orleans provided an Ignore extension method for Task. So, we can now do "fire and forget" cancellation as:
using var cts = new GrainCancellationTokenSource();
using (cancellationToken.Register(
static cts => ((GrainCancellationTokenSource)cts!).Cancel().Ignore(), cts))
{
await grain.Method(..., cts.Token);
}
It's good practice to make grain methods cancelable by passing a GrainCancellationToken to each grain method. However, if we are calling a grain method via an Orleans client, then we have a CancellationToken, not a GrainCancellationToken.
Ideally, grain methods would just accept a CancellationToken rather than a GrainCancellationToken, which would make grains more "POCO". However, in the absence of that, either or both of the following two additions would be helpful:
- A GrainCancellationTokenSource.CreateLinkedTokenSource(CancellationToken) method like what exists on the CancellationTokenSource class. This would allow us to more easily create a GrainCancellationTokenSource linked to an existing CancellationToken.
- An implicit conversion from CancellationToken to a GrainCancellationToken so we can pass an existing CancellationToken directly to a grain method accepting a GrainCancellationToken.
In the absence of either of the above, we are forced to write code like below.