dotnet / runtime

.NET is a cross-platform runtime for cloud, mobile, desktop, and IoT apps.
https://docs.microsoft.com/dotnet/core/
MIT License

[API Proposal]: Add rendezvous channel without buffer #94046

Open Szer opened 11 months ago

Szer commented 11 months ago

Background and motivation

Problem

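open System
open System.Threading.Channels

let ch = Channel.CreateBounded<string>(1)
ch.Writer.TryWrite "hi" |> ignore // succeeds immediately: the item just sits in the buffer
Console.WriteLine "consumer received"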

The problem is that the last line is a lie: there is no guarantee that the consumer received anything, or even that a consumer exists in the first place, all because the channel has a BUFFER.
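A minimal C# sketch of the same point, using only the existing System.Threading.Channels API. The class and method names are hypothetical and chosen for illustration. It writes to a one-slot bounded channel that has no reader, and the write still succeeds:

```csharp
using System;
using System.Threading.Channels;

public static class BufferedWriteDemo
{
    // Writes one item to a bounded channel that has no consumer at all.
    // Returns whether the write succeeded and how many items remain buffered.
    public static (bool Written, int Buffered) WriteWithoutConsumer()
    {
        Channel<string> ch = Channel.CreateBounded<string>(1);

        // Succeeds immediately: the item is parked in the one-slot buffer.
        bool written = ch.Writer.TryWrite("hi");

        // Nobody has read anything, so the item is still counted as buffered.
        return (written, ch.Reader.Count);
    }
}
```

A successful write here only says that the buffer accepted the item, not that anyone consumed it.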

Reference

Channels are messaging primitives allowing processes/actors/threads/fiber/jobs/tasks to communicate and synchronize their work and data flow.

Synchronization is achieved by blocking both the producer and the consumer until each is ready to communicate and pass data to the other. This is the default behaviour of channels both in theory (pi-calculus) and in practice (Go, Kotlin, F# Hopac, CML).

The problem above is solved in Go precisely as described because default Go channels don't have any buffer and they will block the producer until the consumer is ready and vice versa.

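In Kotlin it is achieved by creating a Channel with capacity = 0.

In .NET you can't create a BoundedChannel with capacity=0: the capacity can't be lower than 1, and even if it could, it would be undefined what happens with capacity=0 and a BoundedChannelFullMode other than Wait.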
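The capacity restriction can be observed directly. The sketch below uses hypothetical names for illustration and relies on Channel.CreateBounded throwing ArgumentOutOfRangeException for a capacity below 1:

```csharp
using System;
using System.Threading.Channels;

public static class ZeroCapacityDemo
{
    // Returns true if asking for a zero-capacity bounded channel is rejected.
    public static bool ZeroCapacityIsRejected()
    {
        try
        {
            Channel.CreateBounded<int>(0);
            return false; // not reached with the current implementation
        }
        catch (ArgumentOutOfRangeException)
        {
            return true;
        }
    }
}
```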

So I propose adding Channel.CreateRendezvous(), which would create a channel without any buffer in order to achieve messaging synchronization and possibly some selective messaging (e.g. wait on multiple channels, read from one, and leave all the others intact).

API Proposal

namespace System.Threading.Channels;

public static class Channel
{
    public static Channel<T> CreateRendezvous<T>(RendezvousChannelOptions options);
}

API Usage

let fancyChannel = Channel.CreateRendezvous<int>()
do! fancyChannel.Writer.WriteAsync 42 // suspends until a consumer is ready
Console.WriteLine "consumer received our message!" // not a lie, as we know the consumer definitely received it

Alternative Designs

Make BoundedChannel work with capacity=0

Risks

N/A

ghost commented 11 months ago

Tagging subscribers to this area: @dotnet/area-system-threading-channels See info in area-owners.md if you want to be subscribed.

Issue Details
Author: Szer
Assignees: -
Labels: `api-suggestion`, `area-System.Threading.Channels`
Milestone: -

Szer commented 11 months ago

@eiriktsarpalis as a maintainer of Channels namespace, could you please take a look at this in your spare time? Thank you!

stephentoub commented 11 months ago

We've considered such an implementation in the past, e.g. with CreateBounded and a capacity of 0 rather than a dedicated new name. The initial prototype of the library actually had it. But some of the APIs in the abstraction would not play well with it. For example, consider the default implementation of ReadAllAsync: https://github.com/dotnet/runtime/blob/655b177da2e423a04a60a331bc6c11fe5ec427c3/src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.netcoreapp.cs#L20-L26 If a producer only ever uses TryWrite, this method will never be able to receive anything, as the only way it could is if the TryRead/TryWrite occurred at exactly the same time, and even then, most likely synchronization would prevent that from ever happening.

Szer commented 11 months ago

@stephentoub do you think it is a show stopper for making such a needed Channel?

Might there be a possibility to create a new abstract class ChannelSlim<T> without those methods?

Lanayx commented 11 months ago

while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
    while (TryRead(out T? item))
    {
        yield return item;
    }
}

This should be possible to rewrite as

 while (await Read(cancellationToken, out T? item)) 
 { 
     yield return item; 
 }

People who want to squeeze out every bit of performance just won't use that helper, but such a change would make the proposal feasible.

stephentoub commented 11 months ago

> do you think it is a show stopper for making such a needed Channel?

Showstopper, no, but the proposal needs to factor that in. Just a new implementation of the current abstraction could be a pit of failure.

> Might there be a possibility to create a new abstract class ChannelSlim<T> without those methods?

I'm not excited about implementing an entire new set of abstractions for this.

> This should be possible to rewrite as

Such a Read method doesn't exist (and can't with such an out parameter while also being an async method). The implementation of such a method in terms of the existing APIs would need to either be in terms of WaitToReadAsync, as in the previous example, or ReadAsync, which will incur an exception when the channel hits eof, e.g.

public static async ValueTask<(bool, T)> ReadAsync<T>(this ChannelReader<T> reader, CancellationToken cancellationToken)
{
    try
    {
        return (true, await reader.ReadAsync(cancellationToken).ConfigureAwait(false));
    }
    catch (ChannelClosedException)
    {
        return (false, default(T));
    }
}

Beyond the awkwardness of consuming that, it puts exceptions on the success path, which is something we try to avoid.

But even if that particular example gets rewritten, the problem isn't whether it can be rewritten, but that it needs to be. Any code out there written against a ChannelReader<T> might interact very poorly with such a channel implementation: if it uses WaitToReadAsync/TryRead (which is often the recommended pairing, both for efficiency and when the channel isn't infinite) rather than ReadAsync, it would never work with someone using TryWrite on the other side.
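For comparison, here is a sketch of the other option mentioned above, a helper written in terms of WaitToReadAsync and TryRead. The names ChannelReaderSketch and TryReadAsync are hypothetical. It reports end of stream without an exception, but it relies on TryRead eventually succeeding after WaitToReadAsync returns true, which is exactly the assumption a rendezvous channel could break:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelReaderSketch
{
    // Reads one item, or returns (false, default) once the channel is
    // completed and drained. No exception is involved at end of stream.
    public static async ValueTask<(bool Success, T Item)> TryReadAsync<T>(
        this ChannelReader<T> reader, CancellationToken cancellationToken = default)
    {
        while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            // Another reader may have taken the item first; if so, wait again.
            if (reader.TryRead(out T item))
            {
                return (true, item);
            }
        }

        return (false, default!);
    }
}
```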

Szer commented 11 months ago

> I'm not excited about implementing an entire new set of abstractions for this.

not an entirely new set, but a more "core" Channel without all methods.

e.g.

// new core abstraction
abstract class ChannelCore<T> {
  // all core methods go here
}

// new fancy channel
class RendezvousChannel<T>: ChannelCore<T> {
  ...
}

// old abstraction, having all the same methods, where most of them come from ChannelCore
abstract class Channel<T>: ChannelCore<T> {
  // TryRead, TryWrite goes here
}

// old implementation, no changes here
class BoundedChannel<T>: Channel<T> {
  ...
}

I'm not sure whether this is a binary-breaking change, but that could make RendezvousChannel possible

nazar554 commented 11 months ago

Is this similar to AsyncExchanger from dotNext?

Szer commented 11 months ago

> Is this similar to AsyncExchanger from dotNext?

kind of, but the channel we are discussing is one-way

xeromorph commented 11 months ago

> If a producer only ever uses TryWrite, this method will never be able to receive anything, as the only way it could is if the TryRead/TryWrite occurred at exactly the same time, and even then, most likely synchronization would prevent that from ever happening.

I don't think it's a valid point, because FWIW there's already an abstraction leak present in case of UnboundedChannel, where you have implicit guarantee for TryWrite to always succeed. In general, I'd assume that falling back to the async WriteAsync is appropriate when TryWrite returns false while the channel isn't closed, and false is what an unbuffered (rendezvous) channel would return unconditionally by design.

stephentoub commented 11 months ago

> there's already an abstraction leak present in case of UnboundedChannel, where you have implicit guarantee for TryWrite to always succeed

That's not leaky. It's fine for TryWrite to always succeed, and it's fine for it to sometimes succeed. The problem is if it never succeeds, which is the concern here.

TryWrite / WaitToWriteAsync are the core of the abstraction; they're abstract, while WriteAsync is virtual (with a default implementation built on top of TryWrite and WaitToWriteAsync); same on the read side, with TryRead / WaitToReadAsync being abstract, and ReadAsync being a virtual with a default implementation layered on top. The expectation is that you can use WaitToWriteAsync, and when it completes with true, a TryWrite will typically succeed (whether it's yours or another competing write, or in the rare case where it's "consumed" by the operation being canceled). The difficulty with such a rendezvous channel is this may not be the case, depending on how the read side is implemented.
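The layering described above can be sketched roughly as follows. This is an illustration of the pattern, not the library's actual source, and WriterLayeringSketch and WriteViaCoreAsync are hypothetical names:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class WriterLayeringSketch
{
    // A WriteAsync-like operation built only from the two abstract
    // primitives, TryWrite and WaitToWriteAsync.
    public static async ValueTask WriteViaCoreAsync<T>(
        this ChannelWriter<T> writer, T item, CancellationToken cancellationToken = default)
    {
        // Fast path: the write may succeed without waiting.
        if (writer.TryWrite(item))
        {
            return;
        }

        // Slow path: wait until writing may be possible, then try again.
        while (await writer.WaitToWriteAsync(cancellationToken).ConfigureAwait(false))
        {
            if (writer.TryWrite(item))
            {
                return;
            }
        }

        // WaitToWriteAsync returned false: the channel no longer accepts writes.
        throw new ChannelClosedException();
    }
}
```

The loop only makes progress if a TryWrite that follows a successful WaitToWriteAsync can eventually succeed.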

Imagine a method someone writes that will read one item from one of two channels, whichever has data first:

public static ValueTask<T> ReadAnyAsync<T>(ChannelReader<T> reader1, ChannelReader<T> reader2);

You can't just use ReadAsync on both reader1 and reader2, as doing so could end up consuming an item from both readers rather than just one of them, even if you cancel the second read the moment the first completes. So the typical implementation of such a method in terms of the abstraction would be to use WaitToReadAsync on both, effectively serving the same purpose as a "zero-byte read" common with networking and sockets, in order to be notified when data is available to read without actually consuming it. Only once you've been notified of data being available can you then consume it with TryRead. (And if you're dealing with concurrency and the possibility of someone else taking the item out from under you, you can loop.)
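One way such a ReadAnyAsync could be written against the existing abstraction is sketched below. It is an unoptimized illustration: the ChannelSelectSketch class name and the added cancellationToken parameter are assumptions, and throwing ChannelClosedException when both channels are complete is a design choice of this sketch:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelSelectSketch
{
    // Reads exactly one item from whichever reader has data first.
    public static async ValueTask<T> ReadAnyAsync<T>(
        ChannelReader<T> reader1, ChannelReader<T> reader2,
        CancellationToken cancellationToken = default)
    {
        while (true)
        {
            // Consume only via TryRead, so at most one item is ever taken.
            if (reader1.TryRead(out T item) || reader2.TryRead(out item))
            {
                return item;
            }

            // Wait on both readers without consuming anything.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            Task<bool> wait1 = reader1.WaitToReadAsync(cts.Token).AsTask();
            Task<bool> wait2 = reader2.WaitToReadAsync(cts.Token).AsTask();
            Task<bool> first = await Task.WhenAny(wait1, wait2).ConfigureAwait(false);

            if (!await first.ConfigureAwait(false))
            {
                // That channel is complete; only the other can still yield data.
                Task<bool> other = first == wait1 ? wait2 : wait1;
                if (!await other.ConfigureAwait(false))
                {
                    throw new ChannelClosedException();
                }
            }

            cts.Cancel(); // release whichever wait is still pending, then retry TryRead
        }
    }
}
```

Every item is taken through TryRead, so if the writers on the other side also use only TryWrite against a rendezvous channel, this loop never gets past its first check.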

Same goes for the write side, e.g.

public static ValueTask WriteAnyAsync<T>(T item, ChannelWriter<T> writer1, ChannelWriter<T> writer2);

where you want to write the item to one and only one of the specified writers. If you were to WriteAsync on both, you could potentially end up writing the item to both, which would be erroneous. So you use WaitToWriteAsync to be notified of when space is available, and then TryWrite.

With a rendezvous channel, that ReadAnyAsync will work just fine if the channels are being written to with WriteAsync. And such a WriteAnyAsync will work just fine if the channels are being read from with ReadAsync. But if you were to use ReadAnyAsync with WriteAnyAsync, you'll deadlock, because none of the reads or writes will ever succeed.

My original prototype for the channels library had such a rendezvous channel, except it was exposed via a CreateUnbuffered method rather than a CreateRendezvous method, simply because I felt the terminology was less academic and more broadly understandable. But we removed it prior to shipping the initial release of System.Threading.Channels because of this issue. We can of course reconsider, but the core concern remains. See https://github.com/dotnet/runtime/issues/24831 for reference.

Szer commented 10 months ago

Just for reference, the Hopac library has equivalents of both TryWrite and TryRead (Ch.Try.give and Ch.Try.take) on its channel type (which can only be unbuffered), documented as follows:

module Ch =
  module Try =
    /// Creates a job that attempts to give a value to another job waiting on
    /// the given channel.  The result indicates whether a value was given or
    /// not.  Note that the other side of the communication must be blocked on
    /// the channel for communication to happen.
    val inline give: Ch<'x> -> 'x -> Job<bool>

    /// Creates a job that attempts to take a value from another job waiting on
    /// the given channel.  Note that the other side of the communication must
    /// be blocked on the channel for communication to happen.
    val inline take: Ch<'x> -> Job<option<'x>>

Szer commented 10 months ago

@stephentoub do you have any thoughts on the future of this proposal? At least "approved in principle" or "closed by design"?

stephentoub commented 10 months ago

> @stephentoub do you have any thoughts on the future of this proposal? At least "approved in principle" or "closed by design"?

As I've noted, I'm fine with the concept and am simply concerned about possible ramifications.

stephentoub commented 9 months ago

One possible mitigation would be to give the unbuffered channel a delay option, such that TryRead/TryWrite would actually be more like e.g.

// pseudo-code for the logic; actual implementation would be more efficient
if (!TryWrite(item))
{
    using var cts = new CancellationTokenSource(_delay);
    try { await WriteAsync(item, cts.Token); }
    catch { return false; }
}
return true;

where _delay would be set by an argument passed to CreateUnbuffered and would default to something small, selected based on testing. The goal here would be to make it so that, by default, calls to TryRead/TryWrite could still be used in the expected patterns and would probabilistically allow for forward progress. A _delay of 0 could be selected to disable it.
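To make the pseudo-code concrete, here is a synchronous sketch of the same logic as an extension method over an existing ChannelWriter<T>. The names DelayedTryWriteSketch and TryWriteWithDelay are hypothetical, and the sync-over-async blocking is used only to keep the example short. A real implementation inside the channel type would presumably wait more efficiently:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class DelayedTryWriteSketch
{
    // Tries to write immediately; on failure, blocks for up to 'delay'
    // waiting for the write to go through. Returns false on timeout or close.
    public static bool TryWriteWithDelay<T>(this ChannelWriter<T> writer, T item, TimeSpan delay)
    {
        if (writer.TryWrite(item))
        {
            return true;
        }

        if (delay <= TimeSpan.Zero)
        {
            return false; // a delay of 0 disables the fallback
        }

        using var cts = new CancellationTokenSource(delay);
        try
        {
            // Blocks the calling thread until written, closed, or timed out.
            writer.WriteAsync(item, cts.Token).AsTask().GetAwaiter().GetResult();
            return true;
        }
        catch (OperationCanceledException)
        {
            return false; // the delay expired before the write could complete
        }
        catch (ChannelClosedException)
        {
            return false; // the channel was completed
        }
    }
}
```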

Thoughts?

We'd need to actually try it out, though.

xeromorph commented 9 months ago

@stephentoub what a clever idea to preserve overall API consistency, I concur. I wonder though how would sync TryWrite handle calling async WriteAsync as a fallback, some kind of spin-lock or just blatant sync-over-async there?

stephentoub commented 9 months ago

> @stephentoub what a clever idea to preserve overall API consistency, I concur. I wonder though how would sync TryWrite handle calling async WriteAsync as a fallback, some kind of spin-lock or just blatant sync-over-async there?

It would necessarily block in some way until the request was satisfied, it became clear that it would never be satisfied (e.g. the channel was closed), or the delay expired. How that blocking is achieved, whether via a kernel wait or spinning or some combination, would be an implementation detail. The TryWrite override and the WriteAsync override are part of the same implementation on a sealed internal type, so TryWrite wouldn't actually need to delegate to WriteAsync, but rather utilize a similar approach.