首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Rx.NET基础使用

Rx.NET基础使用

作者头像
JusterZhu
发布2023-09-18 17:25:02
发布2023-09-18 17:25:02
6990
举报
文章被收录于专栏:JusterZhuJusterZhu

1.概要

.NET Rx(Reactive Extensions)它提供了一种强大的数据流操作和组合方式,以便你可以更简单地处理异步数据流,如用户界面事件、异步请求、消息等。在许多情况下,这些数据流可能会很难用常规的迭代技术来表达。

Rx库提供了一种使用可观察序列进行异步编程的模型,它基于观察者设计模式并结合了迭代器模式和功能编程的概念。Rx使开发人员可以对这些数据流进行各种操作,如过滤、选择、转换、合并等。

以下是一些主要的特点:

  • 它将所有数据源视为可观察数据流(或被称为可观察对象)。
  • 它提供了丰富的API允许开发者对这些可观察对象进行转换、过滤、聚合、连接等操作。
  • 它提供了一种统一方式处理同步和异步数据源。
  • 它有助于管理和协调异步操作和事件,降低了代码复杂性。

2.详细内容

安装

代码语言:javascript
复制
Install-Package System.Reactive

使用

(1)基础使用

代码语言:javascript
复制
using System;
using System.Reactive.Linq;

class Program
{
    static void Main()
    {
        // 创建一个Observable序列
        var observable = Observable.Range(1, 5);

        // 订阅这个Observable序列,并指定对每个元素和结束时应执行的操作
        var subscription = observable.Subscribe(
            value => Console.WriteLine($"OnNext: {value}"),  // 当得到新值时执行的操作
            ex => Console.WriteLine($"OnError: {ex.Message}"),  // 当发生错误时执行的操作
            () => Console.WriteLine("OnCompleted")  // 当序列完成时执行的操作
        );

        Console.ReadKey();
        
        // 取消订阅
        subscription.Dispose();
    }
}

(2)处理实时数据

代码语言:javascript
复制
    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    class Program
    {
        static void Main()
        {
            IObservable<StockPrice> stockPrices = GetRealTimeStockPrices();

            IDisposable subscription = stockPrices
                .Where(price => price.Value > 100)
                .Subscribe(
                    price => Console.WriteLine($"High stock price detected: {price.Symbol} at {price.Value}"),
                    ex => Console.WriteLine($"OnError: {ex.Message}"),
                    () => Console.WriteLine("OnCompleted"));

            Console.Read( );

            // 在某个时刻,你可能想取消订阅
            subscription.Dispose();
        }

        class StockPrice
        {
            public string Symbol { get; set; }
            public double Value { get; set; }
        }

       static  IObservable<StockPrice> GetRealTimeStockPrices()
        {
            // 创建一个 Subject
            var subject = new Subject<StockPrice>();

            // 在后台线程上生成模拟数据
            Task.Run(() =>
            {
                var random = new Random();
                while (true)
                {
                    var price = new StockPrice
                    {
                        Symbol = "MSFT",
                        Value = 90 + (random.NextDouble() * 20)
                    };

                    // 向 Subject 发送新的价格
                    subject.OnNext(price);

                    // 暂停一段时间以模拟实时数据
                    Thread.Sleep(1000);
                }
            });

            // 返回 Observable
            return subject.AsObservable();
        }
    }

Rx还提供了大量的操作符,比如:

  • Filtering: 过滤序列中的元素。比如: Where, Distinct, Skip, Take 等。
  • Transforming: 转换序列中的元素。比如: Select, SelectMany, Scan, Buffer 等。
  • Combining: 组合多个序列。比如: Concat, Merge, Zip, CombineLatest 等。
  • Error Handling: 处理错误。比如: Catch, Retry, OnErrorResumeNext 等。
  • Utility: 其他功能。比如: Using, Delay, TimeInterval, Timeout 答等。

这些操作符可以让你更加方便地处理和操作数据流,满足不同场景需要。

Ref

https://learn.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242981(v=vs.103)

https://learn.microsoft.com/en-us/dotnet/api/system.iobservable-1?view=net-7.0&devlangs=csharp&f1url=%3FappId%3DDev16IDEF1%26l%3DEN-US%26k%3Dk(System.IObservable%25601)%3Bk(DevLang-csharp)%26rd%3Dtrue

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-09-16 23:24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 JusterZhu 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.概要
  • 2.详细内容
    • 安装
    • 使用
  • Ref
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档