DotNETWeekly-io / DotNetWeekly

DotNet weekly newsletter
MIT License
207 stars 3 forks source link

【文章推荐】Channel 类型使用 #721

Closed LessIsMoreInSZ closed 1 month ago

LessIsMoreInSZ commented 2 months ago

https://dotnetcoretutorials.com/using-channels-in-net-core-part-2-advanced-channels/

gaufung commented 1 month ago

image

Channel 是 C# 类型,它可以帮助我们实现类似生产者(Producer)和消费者(Consumer)的设计模式,比如

var myChannel = Channel.CreateUnbounded();
for(int i=0; i < 10; i++){
    await myChannel.Writer.WriteAsync(i);
}
while(true)
{
     var item = await myChannel.Reader.ReadAsync();
     Console.WriteLine(item);
}

那么它和其它使用 Queue 实现有什么区别呢?

如果使用 Queue 的话,实现的大概如下:

class MyProducer {
    private readonly Queue<int> _queue;
    public MyProducer(Queue<int> queue) {
        _queue = queue;
    }
}
class MyConsumer {
    private readonly Queue<int> _queue;
    public MyConsumer(Queue<int> queue) {
        _queue = queue;
    }
}

我们需要将 Queue 的实例作为 ProducerConsumer 的构造函数,这个要求我们只能在 Producer 中使用 Enqueue 而且 Consumer 中使用 Dequeue, 但是这个只能要求开发者保持这个约束。如果使用 Channel 的话,可以规避这个问题

class MyProducer{
    private readonly ChannelWriter<int> _channelWriter;
    public MyProducer(ChannelWriter<int> channelWriter){
        _channelWriter = channelWriter;
    }
}

class MyConsumer{
    private readonly ChannelReader<int> _channelReader;
    public MyConsumer(ChannelReader<int> channelReader){
        _channelReader = channelReader;
    }
}

我们可以传递不同的 Channel 中的实现给不同的 ProducerConsumer

Channel 中的 Writer 可以通知该 Channel 已经关闭,这样 Consumer 可以释放响应的资源

_ = Task.Factory.StartNew(async () => {
    for (int i = 0; i < 10; i++) {
        await channel.Writer.WriteAsync(i);
        await Task.Delay(10);
    }

    channel.Writer.Complete();
});

try
{
    while(true) {
    var item = await channel.Reader.ReadAsync();
    Console.WriteLine(item);
    }
}
catch (ChannelClosedException)
{
    Console.WriteLine("Channel is closed");
}

Writer.Complete() 方法可以让 Reader.ReadAsync() 方法抛出 ChannelClosedException。当然也可以使用 await foreach 方法避免捕获异常

await foreach (var item in channel.Reader.ReadAllAsync()) {
    Console.WriteLine(item);
    await Task.Delay(100);
}

除了 UnboundedChannel, 也可以使用 BoundedChannel, 它可以配置 Channel 在写入的行为

var channelOptions = new BoundedChannelOptions(10) {
    FullMode = BoundedChannelFullMode.Wait
};

var myChannel = Channel.CreateBounded<int>(channelOptions);

比如这里我们创建了只有 10 大小的 Channel, 而且在当 Channel 满了,后续的写入处于等待状态。