Open gmkado opened 3 months ago
It's not AsyncRx.NET's goal to provide general-purpose asynchronous utilities, so I would only want to add features to AsyncGate
if they directly serve Rx-related purposes.
In fact, my first thought here is: should AsyncGate
be made internal
?
This is not an area of AsyncRx.Net that I've explored yet. (We inherited this code when we took over maintenance of Rx, and most of our focus so far has been elsewhere.) So I need to attempt to understand the thinking behind the fact that AsyncGate
exists.
As far as I can tell, it currently appears in just two public APIs: AsyncObservable.Synchronize
and AsyncObserver.Synchronize
. By providing overloads that accept an AsyncGate
, it becomes possible to create multiple synchronized observers and observables that are all mutually synchronized. System.Reactive
offers conceptually equivalent overloads:
public static class Observable
{
public static IObservable<TSource> Synchronize<TSource>(this IObservable<TSource> source, object gate)
}
public static class Observer
{
public static IObserver<T> Synchronize<T>(IObserver<T> observer, object gate)
}
Note that these take a plain object
as the 'gate'. They can do that because in .NET , any object can be used as a synchronization primitive. Rx.NET is simply using the C# lock
keyword on this object to implement the behaviour that Synchronize
promises.
So these Rx.NET methods offer two capabilities:
Synchronize
Synchronize
by writing application code that also calls lock
with the same gate objectAsyncRx.NET faces a challenge in implementing the same service: the synchronization capability available for any object (i.e. what we use through lock
) was designed long before async
and await
, so it just blocks the thread when you use lock
. That would conflict with the entire point of AsyncRx.NET (and of async
/await
in general) which is to enable non-blocking asynchronous work.
So to enable the two capabilities described above we can't use object
as the gate type.
What we would want to do is use the async equivalent of lock
. Unfortunately, no such thing really exists. And even the new System.Threading.Lock
that has appeared in recent .NET 9.0 previews doesn't add any async
support. This was discussed:
https://github.com/dotnet/runtime/issues/34812#issuecomment-1503911687
(The basic issue is that thread affinity is part of the design, which rules out async.)
There is a very long-standing request to add an AsyncLock
to the .NET runtime libraries (open for 5 and a half years as I write this). This suggests that we should not hold our breath waiting for .NET to grow such a feature...
So the original AsyncRx.NET implementors' decision to define their own AsyncGate
seems like the only option in practice.
That shuts down my initial thought: no, this type should not be internal
; it needs to be accessible to application code to support the use cases listed above. Since .NET offers no suitable type and is unlikely to any time soon, AsyncRx.NET would either need to take a dependency on some library that already does (but no well-supported library with a clearly defined future exists), or define its own. And it can't just be an opaque handle—that would work for 1 above, but not 2.
So, given that we need to provide AsyncGate
, and that it needs to be possible for application code to acquire it, cancellation is a reasonable feature request: even though it is very much not our goal to be providing synchronization primitives, we have no choice here, and so there's a case that the thing we provide should be usable in the ways people are likely to want.
However...
There's one notable feature of the discussion linked to above in the proposal for adding AsyncLock
to .NET: many people have attempted to add cancellation to Stephen Toub's original AsyncLock
, and they all seem to end up with buggy implementations. That makes me somewhat reluctant to tread this same treacherous path that has caught out many others in the past.
I'm wondering if we should in fact take a completely different approach. I'm wondering if we should define an IAsyncGate
interface that provides the minimal API that AsyncRx.NET requires (i.e., without async), and to make Synchronize
accept any implementation of that. Our AsyncGate
would continue to provide a minimal implementation, but if applications want to do 2 above with cancellation support, they are free to define their own implementations (and to run into the same bugs everyone else who attempts this seem to run into).
(AsyncRx.NET is still in preview, so we can make breaking changes like this.)
In fact I'd be inclined to make all of AsyncGate
's current public members either private or explicit interface implementations of this new IAsyncGate
, and to define a new AsyncGate.Create
. That way if .NET ever does implement its own async lock, we could move over to using that in place of our AsyncGate
.
So AsyncRx.NET would make no attempt to support cancellation, but it would become possible for application code to supply its own IAsyncGate
implementation that supports this or any other semantics the application wants.
This approach would remove the block that prevents you from doing what you want. It's not quite what you asked for, because we wouldn't be providing the solution we want, but we would a) make it possible for you to write such a solution, and b) leave the door open for using a future .NET-supplied async lock primitive if they ever add such a thing. Also, if you're already using a library that provide such a primitive (and I gather project Orleans does) you could use that. I prefer that to an API design that forces you to use our type.
So this would provide a way for you to do what you want. How does that sound?
@idg10 thanks for the thorough response, using an interface sounds like an appropriate fix for Rx. It's easy enough for me to copy over AsyncGate
and extend it to support cancellation.
I'll admit I'm using some (useful) exposed public classes outside of the scope of Rx. For example, I'm guessing I shouldn't rely on the implementation of RefCountAsyncDisposable
in this other discussion I posted
I'm guessing I shouldn't rely on the implementation of
RefCountAsyncDisposable
I apologise for not noticing that Q&A early. I thought GitHub would notify me when people posted in there, but I guess not!
You can totally rely on all our public IDisposable
implementations continuing to work exactly as they do today. But they are there to be what Rx needs them to be, and not to be the ultimate all-things-to-all-people IDisposable
implementation. We will not be removing them from the public API or making breaking changes. But feature requests will always be evaluated based on whether the feature is aligned to the goals of Rx.
I've prototyped an implementation of this IAsyncGate
concept. However, it turns out that to make that available as a NuGet package, I need to get some help from the .NET Foundation, because our code signing mechanisms are no longer working, and we can't publish NuGet packages (even preview ones) until that is fixed. Sorry!
I'll post again here once a usable preview is available.
(I am still trying to solve the code signing! I was off sick for a bit, and then on vacation for a while, and I now think it's possible that the person at the .NET Foundation whose help I need to get code signing reinstated may be on vacation... But I haven't forgotten this.)
OK, that took a lot longer than expected! (The .NET Foundation wanted to move people over to use of Managed Identities for code signing auth. This is a great idea, but since Rx.NET was the first project to attempt this, it took a while to work out how to get the config right.) Anyway, we are now able to sign packages again! 🎉
There is, finally, a preview build of System.Reactive.Async
with a prototype of this proposed feature up at https://dev.azure.com/dotnet/Rx.NET/_artifacts/feed/RxNet/NuGet/System.Reactive.Async/overview/6.0.0-alpha.44.g5ca1c61383
To use the preview package feed, you'll need to configure NuGet to use this endpoint:
https://pkgs.dev.azure.com/dotnet/Rx.NET/_packaging/RxNet/nuget/v3/index.json
This is how a reference to the specific preview that has this feature looks in a .csproj
:
<PackageReference Include="System.Reactive.Async" Version="6.0.0-alpha.44.g5ca1c61383" />
To test this out, I thought I'd see if I could write an adapter enabling us to use someone else's cancellable asynchronous lock with AsyncRx.NET's Synchronize
operator. I chose Stephen Cleary's AsyncEx library, which defines a Nito.AsyncEx.AsyncLock
type.
This is not an endorsement of that lock type. I have great admiration for Stephen Cleary's work, but I have done nothing to test this, or even to find out what state of maturity he considers this lock to be in. It was simply a readily usable async lock with cancellation support.
It has revealed a possible shortcoming with my design, which I'd be interested in your (or anyone else's) view on.
Here's my adapter:
using System.Reactive.Threading;
namespace TestRxAsyncPluggableGate;
/// <summary>
/// An adapter implementating of AsyncRx.NET's <see cref="IAsyncGate"/> around Nito.AsyncEx's
/// <see cref="Nito.AsyncEx.AsyncLock"/>.
/// </summary>
internal class NitoAsyncLockAsRxAsyncGate : IAsyncGate
{
private IDisposable? _releaser;
public Nito.AsyncEx.AsyncLock Lock { get; } = new();
public async ValueTask<AsyncGateReleaser> LockAsync()
{
// The Nito AsyncLock returns a disposable that we must use to release the lock.
// Since this gate is by design one-at-a-time (it does not support re-entrancy,
// unlike AsyncRx.NET's own AsyncGate) we can be confident that once this
// returns, we will have the lock, and the _releaser will be null.
IDisposable releaser = await this.Lock.LockAsync().ConfigureAwait(false);
// Locking purely to avoid any out-of-order memory issues. A more clever implementation
// could use memory barriers, but it's really easy to introduce bugs that way.
lock (this)
{
if (this._releaser is not null)
{
throw new InvalidOperationException("NitoAsyncLockAsRxAsyncGate in invalid state - _releaser not null even though nothing else should own this lock");
}
this._releaser = releaser;
}
return new AsyncGateReleaser(this);
}
public void Release()
{
// As above, we are using lock just to avoid out-of-order memory access issues.
// We shouldn't ever actually see concurrent access to _releaser here, because LockAsync
// can't proceed until the underlying lock becomes available, and that won't happen
// until this method reaches the point where it calls releaser.Dispose(), which happens
// after we've finished checking and then updating the field.
// But without some sort of memory barrier, it's possible in theory that the memory
// write that sets _releaser to null could become visible to another thread only after
// the call to releaser.Dispose(). Similarly, without memory barriers, the write that
// LockAsync makes to set _releaser might not be visible to this thread even though
// the lock has been acquired.
// (This is all vanishingly unlikely stuff, but that just means problems caused by
// bugs in this area are nearly impossible to reproduce and debug, but can still happen
// in production, especially under heavily load. The simplest way to be safe is just to
// use lock, and since we expect no contention in most cases, this should be fine.)
IDisposable releaser;
lock (this)
{
if (this._releaser is null)
{
throw new InvalidOperationException("Release called when NitoAsyncLockAsRxAsyncGate not held");
}
releaser = this._releaser;
this._releaser = null;
}
releaser.Dispose();
}
}
This was more complex than I had hoped. When I did the original design for IAsyncGate
, I hadn't thought about what it would take to write an adapter for a lock that also wanted to return you some sort of IDisposable
that you had to call Dispose
on. My design requires that IAsyncGate
provide a Release
method, and that this be able to release the lock without being given any additional context. But since Stephen's AsyncLock
requires me to call Dispose
on a specific object that it returns, it has been a bit more complex.
Now there is a YOLO version:
internal class NitoAsyncLockAsRxAsyncGate : IAsyncGate
{
private IDisposable? _releaser;
public Nito.AsyncEx.AsyncLock Lock { get; } = new();
public async ValueTask<AsyncGateReleaser> LockAsync()
{
IDisposable releaser = await this.Lock.LockAsync().ConfigureAwait(false);
this._releaser = releaser;
return new AsyncGateReleaser(this);
}
public void Release()
{
IDisposable releaser = this._releaser;
this._releaser = null;
releaser.Dispose();
}
}
That's actually the essence of it. And perhaps we might think there's no need for any sort of locking around access to the _releaser
field because first line in LockAsync
is inherently in a kind of rendez-vous with the Dispose
call in Release
. However, out of order memory accesses can sometimes really mess with this sort of reasoning. It would take quite a lot of work to be absolutely confident that this simpler version is actually OK.
Either way, this is more complex than I'd hoped. The design of IAsyncGate
was specifically to avoid forcing lock implementations to provide an IDisposable
if they didn't want to. But what I failed to anticipate is that it would make life harder for implementations that wrap something that does provide an IDisposable
for unlocking.
So I'm now wondering whether I should make a small addition to AsyncGateReleaser
. If we gave it two constructors, one taking an IAsyncGate
and the other an IDisposable
, and it would call either Release
or Dispose
when disposed depending on how it was initialized, this lock adapter would just become this:
internal class NitoAsyncLockAsRxAsyncGate : IAsyncGate
{
public Nito.AsyncEx.AsyncLock Lock { get; } = new();
public async ValueTask<AsyncGateReleaser> LockAsync()
{
IDisposable releaser = await this.Lock.LockAsync().ConfigureAwait(false);
return new AsyncGateReleaser(release);
}
public void Release()
{
// Nothing to do, because the IDisposable we return via LockAsync does all the work
}
}
That seems simpler, at the expense of a potentially more confusing design. (There are now two ways you can choose to implement gate release in IAsyncGate
: either you can return an IDisposable
wrapped in an AsyncGateReleaser
or, if that's not convenient, you can just have the AsyncGateRelaser
call Release
(which is the only option in the current design).
Any thoughts?
Tagging @stephencleary since this is based on his AsyncLock
, and he might have a view on the above.
What about:
public interface IAsyncGate
{
ValueTask<IAsyncGateReleaser> LockAsync();
}
public interface IAsyncGateReleaser
{
void Release();
}
Then you can implement like this:
internal class NitoAsyncGate : IAsyncGate
{
public Nito.AsyncEx.AsyncLock Lock { get; } = new();
public async ValueTask<IAsyncGateReleaser> LockAsync()
{
return new Releaser(await Lock.LockAsync().ConfigureAwait(false));
}
private class Releaser(IDisposable releaser) : IAsyncGateReleaser
{
public void Release() => releaser.Dispose();
}
}
Or like this:
internal class MyCustomAsyncGate : IAsyncGate, IAsyncGateReleaser
{
public MyCustomAsyncLock Lock { get; } = new();
public async ValueTask<IAsyncGateReleaser> LockAsync()
{
await Lock.EnterAsync();
return this;
}
public void Release() => Lock.Exit();
}
P.S. I implemented a custom AsyncLock that returns a disposable struct to avoid allocations. It's very efficient, and it would make me want to use public interface IAsyncGate<TAsyncGateReleaser> where TAsyncGateReleaser : IAsyncGateReleaser
, but that's a bit of a mouthful, and probably not a very good fit for this library. I could still implement it like your first example.
The underlying
SemaphoreSlim.WaitAsync
already supports cancellation. Can a parameter be exposed to pass a token down to this line?https://github.com/dotnet/reactive/blob/5f831de0bc70bc660d21c3b9e04e581269691a2a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs#L35