dotnet / dotNext

Next generation API for .NET
https://dotnet.github.io/dotNext/
MIT License

Persistent channels cannot be reentrant for consumption #136

Closed gwhzh21 closed 1 year ago

gwhzh21 commented 1 year ago

I am using the sample from https://dotnet.github.io/dotNext/features/threading/channel.html , but with 1K items rather than the sample's 500M.

1. Comment out the consumer (// consumer).
2. Set Location="e:/testdata", then start the program, wait a minute until the files are created, and close the program. Repeat three times.
3. Re-enable the consumer.
4. Start the program.
5. When 1K items have been consumed, the program shows:
   System.Runtime.Serialization.SerializationException: Binary stream '0' does not contain a valid BinaryHeader. Possible causes are invalid stream or object version change between serialization and deserialization.
      at System.Runtime.Serialization.Formatters.Binary.BinaryParser.Run()
      at System.Runtime.Serialization.Formatters.Binary.ObjectReader.Deserialize(BinaryParser serParser)
      at System.Runtime.Serialization.Formatters.Binary.BinaryFormatter.Deserialize(Stream serializationStream)
6. Reopen the program. When 1K items have been consumed, the same exception is shown.

Each time, deserialization fails and the program has to be restarted. After restarting, consumption can continue. What configuration did I overlook?

sakno commented 1 year ago

@gwhzh21, could you provide the full repro code and steps? What do you mean by "// consumer"?

gwhzh21 commented 1 year ago

using (var channel = new MySerializationChannel<decimal>(new PersistentChannelOptions { Location = "e:/testdb", PartitionCapacity = 100 }))
{
    // var consumer = Consume(channel.Reader);
    var producer = Produce(channel.Writer);
    await Task.WhenAll(
        // consumer
        //,
        producer
    );
}

gwhzh21 commented 1 year ago

private static async Task Produce(ChannelWriter<decimal> writer)
{
    for (decimal i = 0M; i < 1000; i++)
    {
        await writer.WriteAsync(i);
    }
}

gwhzh21 commented 1 year ago

private static async Task Consume(ChannelReader<decimal> reader)
{
    try
    {
        while (await reader.WaitToReadAsync())
        {
            await Task.Delay(100);
            while (reader.TryRead(out var item))
            {
                Console.WriteLine(item);
            }
        }
        Console.WriteLine("f");
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
    }
}

gwhzh21 commented 1 year ago

"// consumer" means that the consumer is commented out.

sakno commented 1 year ago

When is the channel consumed if the consumer is commented out? Ideally, the repro code would be a unit test that reproduces the issue, like those in https://github.com/dotnet/dotNext/blob/master/src/DotNext.Tests/Threading/Channels/PersistentChannelTests.cs

gwhzh21 commented 1 year ago

My test code: https://github.com/gwhzh21/testcode

sakno commented 1 year ago

The bug is confirmed. It can be reproduced without BinaryFormatter.