前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Rust中channel的使用

Rust中channel的使用

作者头像
fliter
发布于 2024-03-07 05:28:45
发布于 2024-03-07 05:28:45
34900
代码可运行
举报
文章被收录于专栏:旅途散记旅途散记
运行总次数:0
代码可运行

关于Rust中的channel

Rust的channel是一种用于在不同线程间传递信息的通信机制,它实现了线程间的消息传递。

Channel允许在Rust中创建一个消息传递渠道,它返回一个元组结构体,其中包含发送和接收端。发送端用于向通道发送数据,而接收端则用于从通道接收数据。

每个channel由两部分组成:发送端(Sender)和接收端(Receiver)。

发送端用于向channel发送消息,而接收端则用于接收这些消息。这种机制允许线程之间的安全通信,避免了共享内存的复杂性和潜在的数据竞争问题。 (通过通信来共享内存,而非通过共享内存来通信)

Rust的channel为线程间通信提供了一种安全、简单的方式,是构建并发应用的基础工具之一。

channel是Rust标准库的一部分,自Rust 1.0版本以来就包含了这个功能。随着Rust语言和标准库的发展,channel的实现和API可能会有所改进,但其基本概念和用法保持一致。

使用方式

基本步骤如下:

  1. 创建: 使用std::sync::mpsc::channel()函数创建一个新的channel,这个函数返回一个包含发送端(Sender)和接收端(Receiver)的元组。
  2. 发送: 使用发送端的send方法发送消息。send方法接受一个消息值,如果接收端已经被丢弃,会返回一个错误。
  3. 接收: 使用接收端的recv方法接收消息。recv会阻塞当前线程直到一个消息可用,或者channel被关闭。

示例

以下是一个使用channel在两个线程间发送和接收消息的简单例子:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
use std::sync::mpsc;
use std::thread;

fn main() {
    // 创建一个channel
    let (tx, rx) = mpsc::channel();

    // 创建一个新线程,并向其中发送一个消息
    thread::spawn(move || {
        let msg = "Hello from the thread";
        tx.send(msg).unwrap();
        println!("Sent message: {}", msg);
    });

    // 在主线程中接收消息
    let received = rx.recv().unwrap();
    println!("Received message: {}", received);
}

上面例子展示了channel的基本方法:先创建一个channel,然后在一个新线程中发送一个字符串消息,并在主线程中接收这个消息。

注意: 发送端tx通过move关键字移动到新线程中,这是因为Rust的所有权规则要求确保使用数据的线程拥有该数据的所有权。

关于MPSC

其中mpsc是Multi producer, Single consumer FIFO queue的缩写,即多生产者单消费者先入先出队列

Rust标准库提供的channel是MPSC(多生产者,单消费者)模型,这意味着可以有多个发送端(Sender)向同一个接收端(Receiver)发送消息。这种模式非常适用于工作队列模型,其中多个生产者线程生成任务,而单个消费者线程处理这些任务。

除了MPSC之外,还有如下几种模型:

  • SPSC(Single Producer Single Consumer):单生产者单消费者。
  • SPMC(Single Producer Multiple Consumer):单生产者多消费者。
  • MPSC(Multi Producer Single Consumer):多生产者单消费者, Rust中标准的mpsc模型。
  • MPMC(Multi Producer Multi Consumer)*:多生产者多消费者。

MPSC是标准库中使用的模型

不需要阻塞吗?

主线程是否会立马结束退出程序?

在上面的示例中,如果主线程执行得太快,有可能在接收到 子线程发送消息之前就结束了,没打印出接收到的内容程序就退出了.

但事实上,并没有发生这种现象. 即便在新进程段添加休眠3s的代码,thread::sleep(std::time::Duration::from_secs(3));, 程序也不会提早退出.

关于Rust中程序的休眠,可参考Rust中程序休眠的几种方式

这是因为,recv方法是阻塞的,即 它会阻塞当前线程, 直到从通道中接收到消息。

因此,在上面例子中,主线程在调用rx.recv().unwrap()时会阻塞 等待消息的到来。一旦子线程通过tx.send(msg).unwrap();发送了消息,主线程会接收到这个消息并继续执行,之后程序才会正常退出。

探索更多阻塞方式

可以使用join方法,来确保主线程等待一个或多个子线程完成执行。这在处理多个线程时特别有用。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // 创建一个新线程,并保存其句柄
    let handle = thread::spawn(move || {
        let msg = "Hello from the thread";
        tx.send(msg).unwrap();
        println!("Sent message: {}", msg);
    });

    // 在主线程中接收消息
    let received = rx.recv().unwrap();
    println!("Received message: {}", received);

    // 使用join等待子线程完成
    handle.join().unwrap();
}

thread::spawn返回一个JoinHandle,通过调用这个句柄的join方法来确保主线程在子线程完成其执行之后才继续执行

但是因为recv方法本身就是阻塞的,已经确保了主线程会等待至少一个消息的到来,这时再使用join看起来没有太大必要。

但当有多个线程执行独立任务,且这些任务不一定涉及到主线程立即需要的通道通信时,join的作用就变得非常明显了, 如下示例展示了如何创建多个线程,并使用join确保它们都完成了工作:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
use std::thread;
use std::time::Duration;

fn main() {
    // 创建一个向量来存储子线程的句柄
    let mut handles = vec![];

    for i in 0..10 {
        // 创建10个子线程
        let handle = thread::spawn(move || {
            println!("Thread {} is starting", i);
            println!("--------------");
            // 模拟工作负载,耗时1s
            thread::sleep(Duration::from_secs(1));
            println!("Thread {} has finished", i);
            println!("~~~~~~~~~~~~~~");
        });
        handles.push(handle);
    }


    // 等待所有子线程完成
    for handle in handles {
        handle.join().unwrap();
    }

    println!("All threads have finished");
}

输出:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Thread 0 is starting
--------------
Thread 1 is starting
--------------
Thread 3 is starting
--------------
Thread 2 is starting
--------------
Thread 4 is starting
--------------
Thread 5 is starting
--------------
Thread 6 is starting
--------------
Thread 7 is starting
--------------
Thread 9 is starting
--------------
Thread 8 is starting
-------------- (到此都是立刻打印出来; 下面的输出等1s后一股脑打印出来)
Thread 0 has finished
~~~~~~~~~~~~~~
Thread 1 has finished
Thread 2 has finished
Thread 5 has finished
~~~~~~~~~~~~~~
~~~~~~~~~~~~~~
Thread 4 has finished
~~~~~~~~~~~~~~
Thread 6 has finished
~~~~~~~~~~~~~~
Thread 3 has finished
~~~~~~~~~~~~~~
~~~~~~~~~~~~~~
Thread 7 has finished
~~~~~~~~~~~~~~
Thread 8 has finished
~~~~~~~~~~~~~~
Thread 9 has finished
~~~~~~~~~~~~~~
All threads have finished

在这个例子中创建了10个子线程,每个子线程都模拟执行一些操作,然后在主线程中使用一个循环来join这些线程。

通过这种方式,即使这些子线程并没有向主线程发送任何消息,仍然能够确保它们都完成了各自的工作,然后程序才会退出。这就是join在处理多个线程时的优势所在。

使用join确保主线程等待所有子线程完成其任务,这在处理并行计算、执行多个独立任务时特别重要,因为这些任务可能不会立即或根本不会向主线程报告其完成状态。在这种情况下,如果没有使用join,主线程可能会在子线程完成它们的工作之前结束,导致程序提前退出,而且可能留下未完成的后台工作。

Rust channel的更多高阶用法

Rust中的channel不仅仅支持简单的消息传递,还可以用于实现更复杂的并发模式和高级用法。这些用法可以增加程序的灵活性和性能,特别是在处理大量数据、多线程任务或需要高度并行的场景中。

选择性接收(Select)

在处理多个channel时,可能希望能够选择性地接收多个来源的消息。

Rust的标准库目前并没有直接支持select机制,但是crossbeam-channel库提供了这样的功能,使得可以从多个channel中选择性地接收消息。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
use crossbeam_channel::{select, unbounded};
use std::thread;

fn main() {
    let (tx1, rx1) = unbounded();
    let (tx2, rx2) = unbounded();

    thread::spawn(move || {
        tx1.send(1).unwrap();
    });

    thread::spawn(move || {
        tx2.send(2).unwrap();
    });

    select! {
        recv(rx1) -> msg => println!("Received {} from rx1", msg.unwrap()),
        recv(rx2) -> msg => println!("Received {} from rx2", msg.unwrap()),
    }
}

cargo add crossbeam_channel 添加依赖库,

而后多次 cargo run, 可以发现,会在Received 1 from rx1和Received 2 from rx2中随机打印其中一个

如上代码演示了如何在Rust中使用crossbeam-channel库实现选择性接收(select)机制。该机制允许程序从多个不同的channel中接收消息,而不是被限制在单一的channel上等待。这是通过select!宏来实现的,它可以监听多个channel,并在任一channel接收到消息时立即响应。

具体来说,代码的功能如下:

  1. 引入库:首先,引入了crossbeam_channelselectunbounded,以及std::threadcrossbeam_channel是一个提供了高性能channel实现的外部库,包括了select机制。unbounded用于创建一个无界(unbounded)的channel,即没有容量限制的channel。
  2. 创建无界channel:通过调用unbounded()函数,创建了两个无界channel,分别是tx1/rx1tx2/rx2。这里,tx1tx2是发送端(Sender),而rx1rx2是接收端(Receiver)。
  3. 发送消息:接下来,创建了两个线程,每个线程向各自的channel发送一个整数消息,第一个线程通过tx1发送1,第二个线程通过tx2发送2。这两个线程是并行执行的,因此发送操作是异步的。
  4. 选择性接收消息select!宏用于同时监听rx1rx2这两个接收端。当任一channel接收到消息时,select!宏会立即匹配到相应的分支并执行。这里有两个recv调用,分别对应两个接收端。一旦任一接收端接收到消息,对应的代码块就会执行,并打印出接收到的消息及其来源。msg.unwrap()用于获取Result类型中的消息值,前提是没有发生错误。

代码中的select!宏使得程序不必在单一的channel上阻塞等待,而是可以灵活地处理来自多个源的消息。这种模式在需要处理多个异步事件源时非常有用,例如在网络服务器或并发系统中处理来自不同客户端或任务的输入。

有点类似Go的select语句

迭代器接收

Receiver实现了Iterator,这意味着可以使用迭代器的方式接收所有可用的消息,直到channel被关闭。这种方式简化了接收端的代码,特别是当需要处理所有消息而不必关心接收的具体时机时。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        for i in 1..=5 {
            tx.send(i).unwrap();
        }
    });

    // 通过迭代器接收消息
    for received in rx {
        println!("Received: {}", received);
    }
}

输出:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-03-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 旅途散记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
【crossbeam系列】4 crossbeam-channel:加强版channel
这一期的内容会轻松一些,讲讲crossbeam中的channel。可是有人就要问了在标准库里面已经有了std::sync::mpsc,为什么crossbeam又要搞出一套channel呢?首先我们来看看标准库中的channel有哪些不足吧
MikeLoveRust
2020/09/23
3K0
【Rust 基础篇】Rust 通道(Channel)
在 Rust 中,通道(Channel)是一种用于在多个线程之间传递数据的并发原语。通道提供了一种安全且高效的方式,允许线程之间进行通信和同步。本篇博客将详细介绍 Rust 中通道的使用方法,包含代码示例和对定义的详细解释。
繁依Fanyi
2023/10/12
4200
29.Rust-线程
thread::sleep() 会让线程睡眠一段时间,某个线程睡眠的时候会让出 CPU,可以让不同的线程交替执行,要看操作系统如何调度线程。
面向加薪学习
2022/06/30
2470
Rust入坑指南:齐头并进(下)
前文中我们聊了Rust如何管理线程以及如何利用Rust中的锁进行编程。今天我们继续学习并发编程。
Jackeyzhe
2020/03/25
8750
Rust异步编程之Future并发处理
上篇文章我们知道,Rust的Future是异步执行,await时是阻塞在当前的异步任务task上,直到完成。
newbmiao
2024/01/11
5190
Rust异步编程之Future并发处理
【Rust 基础篇】Rust 通道实现单个消费者多个生产者模式
在 Rust 中,我们可以使用通道(Channel)来实现单个消费者多个生产者模式,简称为 MPMC。MPMC 是一种常见的并发模式,适用于多个线程同时向一个通道发送数据,而另一个线程从通道中消费数据的场景。本篇博客将详细介绍 Rust 中单个消费者多个生产者模式的实现方法,包含代码示例和对定义的详细解释。
繁依Fanyi
2023/10/12
5510
C 和 Java 没那么香了,Serverless 时代 Rust 即将称王?
作者 | 马超       责编 | 张红月 出品 | CSDN博客 Serverless的核心理念就是函数式计算,开发者无须再关注具体的模块,云上部署的粒度变成了程序函数,自动伸缩、扩容等工作完全由云服务负责。 Serverless Computing,即”无服务器计算”,其实这一概念在刚刚提出的时候并没有获得太多的关注,直到2014年AWS Lambda这一里程碑式的产品出现。Serverless算是正式走进了云计算的舞台。2018年5月,Google在KubeCon+CloudNative 201
博文视点Broadview
2023/05/06
2470
C 和 Java 没那么香了,Serverless 时代 Rust 即将称王?
【翻译】从头实现Rust异步执行器
原文:https://stjepang.github.io/2020/01/31/build-your-own-executor.html 现在我们已经构建了block_on函数,是时候进一步将其转换为一个真正的执行器了。我们希望我们的遗执行器不只是一次运行一个future,而是同时运行多个future!
MikeLoveRust
2020/07/23
9210
Rust并发控制之Channel
Rust 官方sync包中提供了mpsc模式的 (多生产者,单消费者:multi-producer, single-consumer) channel,可以实现基于消息并发控制,而不是依赖控制内存共享(加锁)。这正是 go 语言作者 R. Pike 所推崇的方式:
newbmiao
2023/12/13
3590
Rust并发控制之Channel
透过 Rust 探索系统的本原:并发原语
几周前我写了篇关于并发的文章(透过 rust 探索系统的本原:并发篇),从使用者的角度介绍了常用的处理并发的工具:Mutex / RwLock / Channel,以及 async/await。今天我们讲讲这些并发手段背后的原语。这些原语,大家在操作系统课程时大多学过,但如果不是做一些底层的开发,估计大家都不记得了。今天,我们就来简单聊聊这些基础的并发原语,了解它们的差异,明白它们使用的场景,对撰写高性能的并发应用有很大的帮助。
tyrchen
2021/04/07
1.2K0
透过 Rust 探索系统的本原:并发原语
rust多线程
在rust中,多线程编程不算困难,但是也需要留心和别的编程语言中不同的地方。rust的标准库中提供的thread库来帮助我们进行多线程编程。在使用的时候需要使用use std::thread来引入thread库即可。
zy010101
2023/05/28
1K0
【Rust 日报】2021-03-25 linux-next的rust-next分支被合并了!
Github: https://github.com/zesterer/flume
MikeLoveRust
2021/04/22
6820
Rust Druid 之Selector选择器使用
Druid 内部也是基于事件循环的,当程序调用 AppLauncher::launch() 方法时,程序进入事件循环。在事件循环中,窗体间的消息传递是使用Selector来进行。
8菠萝
2021/05/24
1.1K0
rust的并发编程
并发的方式 多进程 多线程 协程 多线程遇到的问题 数据竞争 内存不安全和未定义的行为 常用的两种线程模型(rust都支持) 锁管理临界区 消息通信 rust的并发 通过后thread::spawn关键字 自定义线程通过Builder::new 线程从并发模型 数据共享 Rrc实现变量-可以读,但是没法修改 互斥mutex。 arc和mutex。共享变量 支持读写锁RwLock 通过消息通信 mpse模块 channel和sync_channel rust中的线程安全 parking_lot检查死锁 保证安
李子健
2022/05/14
4430
【Rust日报】2019-12-19 Writing BPF code in Rust
Includes new APIs, utilities, and fixes. Some highlights:
MikeLoveRust
2019/12/25
6630
【Rust日报】2019-12-19 Writing BPF code in Rust
RUST练习生如何在生产环境构建万亿流量|得物技术
在《得物新一代可观测性架构:海量数据下的存算分离设计与实践》一文中,我们探讨了存算分离架构如何通过解耦计算与存储资源,显著降低存储成本并提升系统扩展性。然而,仅优化存储成本不足以支撑高效可观测性系统的全局目标。在生产环境中,计算层作为可观测性体系的核心模块,需在处理日益复杂和动态的大流量数据时,保持高性能、强稳定性与优异的资源利用效率。
得物技术
2025/01/21
1630
RUST练习生如何在生产环境构建万亿流量|得物技术
Rust常用并发示例代码
如果method1()被多次调用,就会创建多个线程,如果希望不管调用多少次,只能有1个线程,在不使用线程池的前提下,有1个简单的办法:
菩提树下的杨过
2022/09/28
1K0
Rust常用并发示例代码
Rust学习笔记之并发
今天,我们继续「Rust学习笔记」的探索。我们来谈谈关于「Rust学习笔记之并发」的相关知识点。
前端柒八九
2023/08/01
2990
Rust学习笔记之并发
Rust高并发编程总结
Serverless的概念火了,业界已经不再讨论要不要用Serverless的问题了,而是高喊Serverless First的口号力求快速拥抱Serverless,无服务器并不是Serverless的本质,不需要关心服务器的情况就能高效工作,才是Serverless胜出的核心要义。互联网时代流量的大起大落,很多科技巨头在面对流量的冲击时也都败下阵来,针对前几个月B站的崩溃事件,笔者还曾写过《B站的前端崩了,后端的你别慌》来分析来龙去脉,而Serverless凭借快速伸缩的自动弹性特点,可以从容应对类似的冲击,这也让这种新技术出尽的风头。
beyondma
2021/10/01
1.3K0
Rust 中的 QUIC 实现 --- quinn
QUIC 是基于 UDP 的多路复用、安全传输协议。可以简单理解为在用户空间将 TCP 里的机制实现了一遍,比如拥塞控制、流量控制等。好处是升级比较方便,TCP 协议栈是内核中实现的,只能随内核升级,而 QUIC 可灵活升级。
谛听
2022/01/30
4.2K0
相关推荐
【crossbeam系列】4 crossbeam-channel:加强版channel
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档