Open-NET-Libraries / Open.ChannelExtensions

A set of extensions for optimizing/simplifying System.Threading.Channels usage.
https://open-net-libraries.github.io/Open.ChannelExtensions/api/Open.ChannelExtensions.Extensions.html#methods
MIT License
401 stars 25 forks source link

ReadAllConcurrently completes before last task completes #26

Closed JKamsker closed 1 year ago

JKamsker commented 1 year ago
var sourceFileChannel = ConsumeFilesToChannel(metaJson.Files);

var dlChannel = Channel.CreateBounded<FileMetaDataDto>(10);

var checkTask = sourceFileChannel.ReadAllConcurrently(maxConcurrency: 30, async file =>
{
    var filePath = Path.Combine(rootDirectory, file.Path);
    if (File.Exists(filePath))
    {
        var existingMd5 = await CalculateMd5Async(filePath);
        if (existingMd5 == file.Md5)
        {
            //Console.WriteLine($"Skipping {file.Path} because it already exists and has the same MD5");
            return;
        }
        else
        {
            // Write to DL channel
            await dlChannel.Writer.WriteAsync(file);
        }
    }
    else
    {
        // Write to to DL channel
        await dlChannel.Writer.WriteAsync(file);
    }
}).ContinueWith(async x =>
{

    // This is required since the ReadAllConcurrently completes before the last task completed
    await dlChannel.Writer.WaitToWriteAsync();
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    // End
    dlChannel.Writer.Complete();
});

awaiting ReadAllConcurrently is appearantly not enougth to ensure all tasks it executed, completed. Not doing this hack will result in the dlChannel complaining about already being closed when trying to write to it.

JKamsker commented 1 year ago

Ok lol used the wrong method, should have used ReadAllConcurrentlyAsync xD