首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >RxJava之连接操作符介绍

RxJava之连接操作符介绍

作者头像
103style
发布2022-12-19 13:28:35
发布2022-12-19 13:28:35
21400
代码可运行
举报
运行总次数:0
代码可运行

转载请以链接形式标明出处: 本文出自:103style的博客

连接相关的操作符 以及 官方介绍

RxJava 之 连接操作符 官方介绍 :Connectable Observable Operators

  • ConnectableObservable.connect( ) instructs a Connectable Observable to begin emitting items 指示Connectable Observable开始发出项目
  • Observable.publish( ) represents an Observable as a Connectable Observable 将Observable表示为可连接的Observable
  • Observable.replay( ) ensures that all Subscribers see the same sequence of emitted items, even if they subscribe after the Observable begins emitting the items 确保所有订阅者都看到相同的发射项目序列,即使他们在Observable开始发布项目后订阅
  • ConnectableObservable.refCount( ) makes a Connectable Observable behave like an ordinary Observable 使Connectable Observable的行为类似于普通的Observable

示例:

非连接操作
代码语言:javascript
代码运行次数:0
运行
复制
ConnectableObservable firstMillion = Observable.range(1, 1000000)
        .sample(7, TimeUnit.MILLISECONDS)
        .publish();

firstMillion.subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #1:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #1 complete");
    }
});

firstMillion.subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #2:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #2 complete");
    }
});

输出:

代码语言:javascript
代码运行次数:0
运行
复制
Subscriber #1:391999
Sequence #1 complete
Subscriber #2:556663
Sequence #2 complete

publish and connect

官方示例:

代码语言:javascript
代码运行次数:0
运行
复制
ConnectableObservable firstMillion = Observable.range(1, 1000000)
        .sample(7, TimeUnit.MILLISECONDS)
        .publish();

firstMillion.subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #1:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #1 complete");
    }
});

firstMillion.subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #2:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #2 complete");
    }
});

firstMillion.connect();

输出:

代码语言:javascript
代码运行次数:0
运行
复制
Subscriber #1:984513
Subscriber #2:984513
Sequence #1 complete
Sequence #2 complete

publish and refCount

官方示例:

代码语言:javascript
代码运行次数:0
运行
复制
ConnectableObservable firstMillion = Observable.range(1, 1000000)
        .sample(7, TimeUnit.MILLISECONDS)
        .publish();

firstMillion.refCount().subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #1:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #1 complete");
    }
});

firstMillion.refCount().subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #2:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #2 complete");
    }
});

输出:

代码语言:javascript
代码运行次数:0
运行
复制
Subscriber #1:438899
Sequence #1 complete
Subscriber #2:684698
Sequence #2 complete

以上

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-06-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 连接相关的操作符 以及 官方介绍
  • 示例:
    • 非连接操作
    • publish and connect
    • publish and refCount
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档