随着多核处理器的普及,如何高效地进行并发与并行编程,已经成为 C# 开发者不可回避的话题。虽然 .NET 提供了诸如 Task
、Parallel
等多种并发工具,但在处理流水线(Pipeline)、异步消息传递等场景时,很多开发者会发现这些工具还不够“顺手”。此时,Dataflow
就派上了用场。
Dataflow 是 .NET 提供的一套基于数据流的并发编程库,属于 System.Threading.Tasks.Dataflow
命名空间。Dataflow 支持将应用划分为多个独立的、可并发执行的“数据块”(Block),每个数据块通过异步的消息传递机制进行数据处理和通信,天然适合构建数据处理流水线、事件驱动架构和异步队列等高效并行系统。简单来说:Dataflow 是把程序拆解为数据经过的各个阶段,各阶段可并发执行,之间通过异步流传递数据。
Block 是 Dataflow 的基石,负责承载数据的接收、处理和输出。分为三类:
BroadcastBlock<T>
, BufferBlock<T>
ActionBlock<T>
TransformBlock<TInput, TOutput>
, TransformManyBlock<TInput, TOutput>
在 Block 之间流动的数据单元,异步、顺序、安全地传递。
通过 LinkTo()
方法,将多个 Block 连接起来,形成完整的数据流管道。
类型 | 作用 |
---|---|
BufferBlock<T> | 数据缓冲区(生产者-消费者队列) |
ActionBlock<T> | 执行异步操作的目标块(如处理消息) |
TransformBlock<T,U> | 将输入转换为新类型的处理块(类似Select) |
BroadcastBlock<T> | 向所有链接块广播数据 |
BatchBlock<T> | 将数据分组为批次 |
JoinBlock<T,U> | 合并多个来源的数据 |
数据流网络 | 通过LinkTo()连接多个块形成处理管道 |
Dataflow 非常适合如下场景:
Dataflow 的核心机制是异步消息传递和消费。简要流程如下:
数据入站
Post()
或 SendAsync()
等方式推入头部 Block(比如 BufferBlock
、TransformBlock
)。MaxDegreeOfParallelism
),由线程池分配线程异步进行处理。LinkTo()
传递到下一个 Block,实现数据流动。可以实现一对一或一对多(广播)连接。Complete()
表示不再有新消息,完成后可等待 Completion
任务,链路自动通告下游 Block 完成。Completion
捕获,保证链路安全关闭。dotnet add package System.Threading.Tasks.Dataflow
下面用一个简单的例子演示 Dataflow 如何串联数据处理流程:假如我们有图片路径列表,需要进行加载(读取图片内容)→ 缩放 → 保存,每一步并行处理。
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespaceDataflowDemo
{
classProgram
{
static async Task Main(string[] args)
{
// 1. 读取图片内容(异步 IO 阶段)
var loadBlock = new TransformBlock<string, byte[]>(async path =>
{
Console.WriteLine($"加载: {path}");
await Task.Delay(100); // 模拟IO
returnnewbyte[1024]; // 模拟图片内容
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
// 2. 缩放图片
var resizeBlock = new TransformBlock<byte[], byte[]>(imageData =>
{
Console.WriteLine("缩放图片");
// 模拟变换
returnnewbyte[512];
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
// 3. 保存图片
var saveBlock = new ActionBlock<byte[]>(imageData =>
{
Console.WriteLine("保存图片");
// 模拟保存
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
// 链接 block,构成数据流
loadBlock.LinkTo(resizeBlock, new DataflowLinkOptions { PropagateCompletion = true });
resizeBlock.LinkTo(saveBlock, new DataflowLinkOptions { PropagateCompletion = true });
// 发送图片路径到第一个 block
string[] imagePaths = { "img1.jpg", "img2.jpg", "img3.jpg", "img4.jpg" };
foreach (var path in imagePaths)
{
await loadBlock.SendAsync(path);
}
// 标记管道已完成,无更多输入
loadBlock.Complete();
// 等待全部处理结束
await saveBlock.Completion;
Console.WriteLine("所有图片处理完成!");
}
}
}
代码要点说明
PropagateCompletion = true
保证上游完成时自动通知下游。Complete()
,最终通过 await Completion
等待管道结束。using System.Threading.Tasks.Dataflow;
// 1. 创建缓冲块(最大容量10)
var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 10 });
// 2. 创建转换块(将数字转为字符串)
var transform = new TransformBlock<int, string>(n =>
$"ID-{n}", new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
// 3. 创建执行块(异步写入数据库)
var action = new ActionBlock<string>(async s =>
{
await Database.SaveAsync(s);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
// 构建管道
buffer.LinkTo(transform);
transform.LinkTo(action);
// 生产者推送数据
for (int i = 0; i < 100; i++)
{
await buffer.SendAsync(i); // 自动背压控制
}
// 标记完成并等待结束
buffer.Complete();
await action.Completion;
关键配置说明:
BoundedCapacity=10
:防止内存溢出MaxDegreeOfParallelism=4
:允许4个转换任务并行Completion.Wait()
:确保所有数据处理完毕Dataflow 是 .NET 平台上极为强大的并发与异步数据流处理库,非常适合用于构建多阶段数据处理、异步管道、后台流水线等场景,工业级ETL系统、实时交易引擎等复杂应用。通过模块化设计和自动资源管理,彻底解放了开发者对底层线程的操控负担。在需要处理高吞吐量数据流的场景中,它比手动管理Task
或ThreadPool
更高效可靠。它极大地简化了消息流、线程管理、同步与容错的复杂性,让开发者可以更关注于业务的分阶段处理。
参考链接: