Channels: C# 实现高效的线程间通信

在C#中,Channel是.NET Core 3.0及更高版本引入的一种新的集合类型,位于System.Threading.Channels命名空间下。主要用于实现生产者-消费者模式,支持异步编程、高性能和线程安全。

应用场景

  • o 生产者-消费者模式:最典型的应用场景。
  • o 流水线模式:多个步骤通过Channel传递数据。
  • o 发布-订阅模式:生产者发布消息,多个消费者订阅。

安装

在使用 .NET Core 或 .NET 5/6 时,默认包含的。如果项目中没有这个库,可以使用 NuGet 包管理器安装:

dotnet add package System.Threading.Channels

1. Channel的类型

  • o 无界通道(Unbounded Channel):可以容纳任意数量的元素。创建方式如下:
    var channel = Channel.CreateUnbounded<int>();
  • o 有界通道(Bounded Channel):具有最大容量限制。创建方式如下:
    var channel = Channel.CreateBounded<int>(10); // 最大容量为10

2. Channel的使用

  • o 生产者:通过channel.Writer.WriteAsync()方法写入数据。
    await channel.Writer.WriteAsync(data);
  • o 消费者:通过channel.Reader.ReadAllAsync()channel.Reader.WaitToReadAsync()读取数据。
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
    Console.WriteLine(item);
    }

3. Channel的配置

  • o 单生产者/单消费者模式:可以通过SingleWriterSingleReader属性设置。
  • o 满时策略(FullMode):当有界通道已满时,可以选择以下策略:
    • o Wait:等待直到有空间。
    • o DropNewest:丢弃最新数据。
    • o DropOldest:丢弃最旧数据。
    • o DropWrite:丢弃写入的数据。

4. 高级用法

  • o 批量处理:可以一次读取多个元素。
    IAsyncEnumerable<int> batch = channel.Reader.ReadBatchAsync(10);
  • o 取消令牌(CancellationToken):支持取消读取或写入操作。
    await channel.Reader.WaitToReadAsync(cts.Token);

示例:生产者消费者模式

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

classProgram
{
static async Task Main(string[] args)
{
var channel = Channel.CreateUnbounded<int>();

// 启动生产者和消费者
var producerTask = ProduceItems(channel);
var consumerTask = ConsumeItems(channel);

// 等待任务完成
await Task.WhenAll(producerTask, consumerTask);
}

static async Task ProduceItems(Channel<int> channel)
{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
Console.WriteLine($"Produced: {i}");
}
channel.Writer.Complete(); // 完成写入
}

static async Task ConsumeItems(Channel<int> channel)
{
awaitforeach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumed: {item}");
}
}
}

总结

C# Channels是一种强大的异步编程工具,适用于处理并发和流式数据。

原文链接:,转发请注明来源!