dotnet / dotNext

Next generation API for .NET
https://dotnet.github.io/dotNext/
MIT License

Possible improvement for PersistentChannel #104

fernandozago closed this issue 2 years ago

fernandozago commented 2 years ago

Hello guys.

Great work here in this repo.

I have a question about PersistentChannel: is there a way to do a "transactional" read with PersistentChannelReader?

If an exception is thrown while the item received in the foreach loop is being processed, that item is lost:

await foreach (var item in channel.Reader.ReadAllAsync(cts.Token))
{
    cts.Token.ThrowIfCancellationRequested(); // <-- If an exception is thrown here
    await ProcessMessageAsync(item.Message, cts.Token); // <-- or here, this message is lost
}

Maybe add something to commit the "transaction", for example:

await foreach (var item in channel.Reader.ReadAllAsync(cts.Token))
{
    cts.Token.ThrowIfCancellationRequested(); // <-- If an exception is thrown here
    await ProcessMessageAsync(item.Message, cts.Token); // <-- or here, the message is not consumed
    channel.Reader.Advance(); // <-- Manual advance of the stream (proposed API, does not exist)
}

sakno commented 2 years ago

Hi @fernandozago. PersistentChannel follows the common API design provided by the ChannelReader class from the .NET library. I can't add an Advance method to a class that lives outside the .NEXT library.

However, since .NET 6, ChannelReader offers an optional TryPeek method that returns a message without removing it from the queue. Does that make sense for you? If so, I can consider implementing this method; currently, PersistentChannel doesn't support it.
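To make the peek semantics concrete, here is a minimal sketch that uses only the BCL's System.Threading.Channels (an unbounded in-memory channel), not PersistentChannel, which did not support TryPeek at this point in the thread. It assumes a runtime where ChannelReader exposes CanPeek/TryPeek: peeking returns the head item but leaves it queued until TryRead removes it.

```csharp
using System;
using System.Threading.Channels;

// BCL channel, not PersistentChannel: unbounded channels support peeking
var channel = Channel.CreateUnbounded<string>();
channel.Writer.TryWrite("first");

// CanPeek tells whether this ChannelReader implementation supports TryPeek
bool canPeek = channel.Reader.CanPeek;

// TryPeek returns the head item but leaves it in the queue...
bool peekedOk = channel.Reader.TryPeek(out var peeked);

// ...so a later TryRead still gets the same item, and only then is it removed
bool readOk = channel.Reader.TryRead(out var read);

Console.WriteLine($"canPeek={canPeek}, peeked={peeked}, read={read}");
```

Readers that don't support peeking report CanPeek as false, which is why the method is described as optional.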

sakno commented 2 years ago

TryPeek is definitely the right direction, as per dotnet/runtime#31182.

fernandozago commented 2 years ago

Hi @sakno ! Thanks for the response.

Yes, that makes sense. But it will require replacing this kind of loop with a different one, right?

await foreach (...)
{ ... }

However, WaitToReadAsync already advances the cursor (it performs AdvanceAsync), so I won't be able to take advantage of asynchronous programming:

while (await channel.Reader.WaitToReadAsync(cts.Token)) // <-- This advances the cursor either way
{
    if (channel.Reader.TryPeek(out var item)) // <-- TryPeek is a synchronous method
    {
        cts.Token.ThrowIfCancellationRequested();
        await ProcessMessageAsync(item, cts.Token);
        await channel.Reader.ReadAsync(cts.Token); // <-- Consume from the queue and advance the stream cursor
    }
}

WaitToReadAsync should not advance the stream cursor.

Thank you very much sir.

sakno commented 2 years ago

@fernandozago, yeah. I'm thinking about a new API for this purpose. However, I can't add new methods to the ChannelReader<T> class. Probably, it will be a separate method or property in the PersistentChannel<T> class.

fernandozago commented 2 years ago

@sakno Great!

Thank you very much.

Let me know if I can help with testing or examples.

=)

sakno commented 2 years ago

@fernandozago, an early implementation has been pushed to the develop branch. You need to create PersistentChannelOptions as follows:

new PersistentChannelOptions { ReliableEnumeration = true }

If the ReliableEnumeration option is enabled, ReadAllAsync behaves exactly as you expected. Under the hood, each call to MoveNextAsync confirms the previous read. Note that the reliable async enumerator is a bit slower than the normal one.
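The "MoveNextAsync confirms the previous read" idea can be sketched on top of a peekable BCL channel. This is a hypothetical illustration, not the .NEXT implementation: ReadAllReliablyAsync is an invented name, and the sketch assumes a single consumer, since with several readers TryRead could remove an item other than the one that was peeked.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

// Hypothetical helper, NOT the .NEXT implementation. Assumes a single consumer:
// with several readers, TryRead could remove an item other than the peeked one.
static async IAsyncEnumerable<T> ReadAllReliablyAsync<T>(
    ChannelReader<T> reader,
    [EnumeratorCancellation] CancellationToken token = default)
{
    while (await reader.WaitToReadAsync(token))
    {
        // Hand out the head item without removing it from the queue
        while (reader.TryPeek(out var item))
        {
            yield return item;

            // Reached only when the consumer calls MoveNextAsync again, i.e. after
            // the loop body completed normally: now confirm (remove) the item.
            reader.TryRead(out _);
        }
    }
}

// Usage: processing of item 2 fails
var channel = Channel.CreateUnbounded<int>();
channel.Writer.TryWrite(1);
channel.Writer.TryWrite(2);
channel.Writer.Complete();

try
{
    await foreach (var value in ReadAllReliablyAsync(channel.Reader))
    {
        if (value == 2)
            throw new InvalidOperationException("simulated processing failure");
    }
}
catch (InvalidOperationException)
{
    // The enumerator was disposed without confirming item 2
}

channel.Reader.TryPeek(out var remaining);
Console.WriteLine($"Still queued: {remaining}");
```

In this sketch, an item whose loop body throws (or is left via break) is never confirmed, because the code after yield return only runs when MoveNextAsync is called again; the extra peek-then-read step per item is also one plausible reason such an enumerator is slower.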

fernandozago commented 2 years ago

@sakno Fantastic!

I'll check it out now!

fernandozago commented 2 years ago

@sakno I did some tests; it looks awesome.

But, related to dotnet/runtime#31182, maybe it would be a good idea to force SingleReader = true when ReliableEnumeration = true is used.

Otherwise, using ReliableEnumeration = true without SingleReader = true may cause unpredictable bugs when reading concurrently.

Thanks. This change is exactly what I need.

Another thing: I'm experimenting with protobuf serialization in PersistentChannel and came up with the code below.

First, install the protobuf package: protobuf-net

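using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using DotNext.Threading.Channels; // PersistentChannel<TInput, TOutput>, PersistentChannelOptions
using ProtoBuf;                   // protobuf-net: ProtoContract, ProtoMember, Serializer, PrefixStyle

// Message type; a top-level class can't be private, so it is public here
[ProtoContract]
public class Model
{
    [ProtoMember(1)]
    public Guid Data { get; set; }
}

// Persistent channel that stores each message as a length-prefixed protobuf record
public sealed class ProtobufSerializationChannel<T> : PersistentChannel<T, T>
    where T : notnull
{
    private readonly PrefixStyle _prefixStyle;

    public ProtobufSerializationChannel(PersistentChannelOptions options, PrefixStyle protobufPrefixStyle) : base(options)
    {
        _prefixStyle = protobufPrefixStyle;
    }

    // Synchronous protobuf-net call wrapped in an already completed ValueTask
    protected override ValueTask<T> DeserializeAsync(Stream input, CancellationToken token) =>
        ValueTask.FromResult(Serializer.DeserializeWithLengthPrefix<T>(input, _prefixStyle));

    // Synchronous write to the stream; the token is not observed
    protected override ValueTask SerializeAsync(T input, Stream output, CancellationToken token)
    {
        Serializer.SerializeWithLengthPrefix(output, input, _prefixStyle);
        return ValueTask.CompletedTask;
    }
}
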
sakno commented 2 years ago

@fernandozago, ReliableEnumeration affects only the ReadAllAsync method; ReadAsync and TryRead are not affected. ReliableEnumeration is not tightly coupled with the SingleReader property, so you can set SingleReader to false. However, only one reliable enumerator can exist at a time, even if multiple readers are allowed. This is enforced by an internal lock, which means that any other reader waits until the enumeration ends.
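One way to get the "only one enumerator at a time, other readers wait" behavior is to acquire an async lock when enumeration starts and release it when the enumerator is disposed. This is a hypothetical sketch using SemaphoreSlim over a plain sequence; the .NEXT internal lock may work differently, and EnumerateExclusivelyAsync is an invented name.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical gate shared by all enumerations (stands in for the internal lock)
var gate = new SemaphoreSlim(1, 1);

async IAsyncEnumerable<int> EnumerateExclusivelyAsync(
    IEnumerable<int> source,
    [EnumeratorCancellation] CancellationToken token = default)
{
    // A second enumeration waits here until the first enumerator is disposed
    await gate.WaitAsync(token);
    try
    {
        foreach (var item in source)
            yield return item;
    }
    finally
    {
        // Runs on disposal: normal end, break, or exception in the consumer
        gate.Release();
    }
}

var first = EnumerateExclusivelyAsync(new[] { 1, 2 }).GetAsyncEnumerator();
await first.MoveNextAsync(); // the first enumerator now holds the gate

var second = EnumerateExclusivelyAsync(new[] { 3 }).GetAsyncEnumerator();
Task<bool> secondMove = second.MoveNextAsync().AsTask();
bool blockedWhileHeld = !secondMove.IsCompleted; // still waiting for the gate

await first.DisposeAsync(); // ends the first enumeration and releases the gate
bool secondProceeded = await secondMove;
int secondValue = second.Current;
await second.DisposeAsync();

Console.WriteLine($"blocked: {blockedWhileHeld}, proceeded: {secondProceeded}, value: {secondValue}");
```

Releasing the lock in a finally block ties it to enumerator disposal, so an exception or break in one consumer's loop still lets the next reader proceed.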

sakno commented 2 years ago

Regarding the serialization logic: the code you showed is not optimal because it doesn't use async stream operations, so all serialization and deserialization passes through an async-to-sync transition.
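One common way to avoid that transition is to encode the message synchronously into an in-memory buffer (pure CPU work) and use the async Stream API only for the actual I/O. The sketch below is an assumption, not the approach prescribed in this thread: it uses a hypothetical framing (4-byte little-endian length prefix plus a UTF-8 string payload) in place of protobuf-net, and WriteFramedAsync, ReadFramedAsync and FillAsync are invented names. With protobuf-net, the synchronous step would serialize into a MemoryStream instead.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical framing: 4-byte little-endian length prefix + UTF-8 payload
static async ValueTask WriteFramedAsync(string value, Stream output, CancellationToken token)
{
    // CPU-only encoding into memory; no I/O happens here
    byte[] payload = Encoding.UTF8.GetBytes(value);
    byte[] prefix = new byte[4];
    BinaryPrimitives.WriteInt32LittleEndian(prefix, payload.Length);

    // Only the actual I/O uses the async Stream API
    await output.WriteAsync(prefix, token);
    await output.WriteAsync(payload, token);
}

static async ValueTask<string> ReadFramedAsync(Stream input, CancellationToken token)
{
    byte[] prefix = new byte[4];
    await FillAsync(input, prefix, token);
    byte[] payload = new byte[BinaryPrimitives.ReadInt32LittleEndian(prefix)];
    await FillAsync(input, payload, token);

    // Decoding works on the in-memory buffer, again without I/O
    return Encoding.UTF8.GetString(payload);
}

// Stream.ReadAsync may return fewer bytes than requested, so loop until the buffer is full
static async ValueTask FillAsync(Stream input, Memory<byte> buffer, CancellationToken token)
{
    while (!buffer.IsEmpty)
    {
        int read = await input.ReadAsync(buffer, token);
        if (read == 0)
            throw new EndOfStreamException();
        buffer = buffer.Slice(read);
    }
}

using var stream = new MemoryStream();
await WriteFramedAsync("hello", stream, CancellationToken.None);
stream.Position = 0;
string roundTripped = await ReadFramedAsync(stream, CancellationToken.None);
Console.WriteLine(roundTripped);
```

The trade-off is an extra in-memory copy per message in exchange for never blocking a thread on stream I/O.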

fernandozago commented 2 years ago

@sakno Sounds good about ReliableEnumeration!

The protobuf serialization logic is meant to be synchronous because there are no I/O calls in DeserializeWithLengthPrefix and SerializeWithLengthPrefix.

I'm benchmarking it with BenchmarkDotNet to compare Marshal-based and protobuf serialization.

sakno commented 2 years ago

Closing the issue. Fixed in 4.4.0; the release has been published.