.NET Rx(Reactive Extensions)它提供了一种强大的数据流操作和组合方式,以便你可以更简单地处理异步数据流,如用户界面事件、异步请求、消息等。在许多情况下,这些数据流可能会很难用常规的迭代技术来表达。
Rx库提供了一种使用可观察序列进行异步编程的模型,它基于观察者设计模式并结合了迭代器模式和功能编程的概念。Rx使开发人员可以对这些数据流进行各种操作,如过滤、选择、转换、合并等。
以下是一些主要的特点:
Install-Package System.Reactive(1)基础使用
using System;
using System.Reactive.Linq;
class Program
{
static void Main()
{
// 创建一个Observable序列
var observable = Observable.Range(1, 5);
// 订阅这个Observable序列,并指定对每个元素和结束时应执行的操作
var subscription = observable.Subscribe(
value => Console.WriteLine($"OnNext: {value}"), // 当得到新值时执行的操作
ex => Console.WriteLine($"OnError: {ex.Message}"), // 当发生错误时执行的操作
() => Console.WriteLine("OnCompleted") // 当序列完成时执行的操作
);
Console.ReadKey();
// 取消订阅
subscription.Dispose();
}
}(2)处理实时数据
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
class Program
{
static void Main()
{
IObservable<StockPrice> stockPrices = GetRealTimeStockPrices();
IDisposable subscription = stockPrices
.Where(price => price.Value > 100)
.Subscribe(
price => Console.WriteLine($"High stock price detected: {price.Symbol} at {price.Value}"),
ex => Console.WriteLine($"OnError: {ex.Message}"),
() => Console.WriteLine("OnCompleted"));
Console.Read( );
// 在某个时刻,你可能想取消订阅
subscription.Dispose();
}
class StockPrice
{
public string Symbol { get; set; }
public double Value { get; set; }
}
static IObservable<StockPrice> GetRealTimeStockPrices()
{
// 创建一个 Subject
var subject = new Subject<StockPrice>();
// 在后台线程上生成模拟数据
Task.Run(() =>
{
var random = new Random();
while (true)
{
var price = new StockPrice
{
Symbol = "MSFT",
Value = 90 + (random.NextDouble() * 20)
};
// 向 Subject 发送新的价格
subject.OnNext(price);
// 暂停一段时间以模拟实时数据
Thread.Sleep(1000);
}
});
// 返回 Observable
return subject.AsObservable();
}
}
Rx还提供了大量的操作符,比如:
Where, Distinct, Skip, Take 等。Select, SelectMany, Scan, Buffer 等。Concat, Merge, Zip, CombineLatest 等。Catch, Retry, OnErrorResumeNext 等。Using, Delay, TimeInterval, Timeout 答等。这些操作符可以让你更加方便地处理和操作数据流,满足不同场景需要。
https://learn.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242981(v=vs.103)
https://learn.microsoft.com/en-us/dotnet/api/system.iobservable-1?view=net-7.0&devlangs=csharp&f1url=%3FappId%3DDev16IDEF1%26l%3DEN-US%26k%3Dk(System.IObservable%25601)%3Bk(DevLang-csharp)%26rd%3Dtrue