rust 多线程
多线程并发编程
使用多线程
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
}
有几点值得注意:
- 线程内部的代码使用闭包来执行
- main 线程一旦结束,程序就立刻结束,因此需要保持它的存活,直到其它子线程完成自己的任务
- thread::sleep 会让当前线程休眠指定的时间,随后其它线程会被调度运行,因此就算你的电脑只有一个 CPU 核心,该程序也会表现的如同多 CPU 核心一般,这就是并发!
在线程闭包中使用 move
move 关键字在闭包中的使用可以让该闭包拿走环境中某个值的所有权,同样地,你可以使用 move 来将所有权从一个线程转移到另外一个线程。
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("Here's a vector: {:?}", v);
});
handle.join().unwrap();
// 下面代码会报错borrow of moved value: `v`
// println!("{:?}",v);
}
线程局部变量(Thread Local Variable)
标准库
use std::cell::RefCell;
use std::thread;
thread_local!(static FOO: RefCell<u32> = RefCell::new(1));
FOO.with(|f| {
assert_eq!(*f.borrow(), 1);
*f.borrow_mut() = 2;
});
// 每个线程开始时都会拿到线程局部变量的FOO的初始值
let t = thread::spawn(move|| {
FOO.with(|f| {
assert_eq!(*f.borrow(), 1);
*f.borrow_mut() = 3;
});
});
// 等待线程完成
t.join().unwrap();
// 尽管子线程中修改为了3,我们在这里依然拥有main线程中的局部值:2
FOO.with(|f| {
assert_eq!(*f.borrow(), 2);
});
第三方库
use thread_local::ThreadLocal;
use std::sync::Arc;
use std::cell::Cell;
use std::thread;
let tls = Arc::new(ThreadLocal::new());
// 创建多个线程
for _ in 0..5 {
let tls2 = tls.clone();
thread::spawn(move || {
// 将计数器加1
let cell = tls2.get_or(|| Cell::new(0));
cell.set(cell.get() + 1);
}).join().unwrap();
}
// 一旦所有子线程结束,收集它们的线程局部变量中的计数器值,然后进行求和
let tls = Arc::try_unwrap(tls).unwrap();
let total = tls.into_iter().fold(0, |x, y| x + y.get());
// 和为5
assert_eq!(total, 5);
线程间的消息传递
use std::sync::mpsc;
use std::thread;
fn main() {
// 创建一个消息通道, 返回一个元组:(发送者,接收者)
let (tx, rx) = mpsc::channel();
// 创建线程,并发送消息
thread::spawn(move || {
// 发送一个数字1, send方法返回Result<T,E>,通过unwrap进行快速错误处理
tx.send(1).unwrap();
// 下面代码将报错,因为编译器自动推导出通道传递的值是i32类型,那么Option<i32>类型将产生不匹配错误
// tx.send(Some(1)).unwrap()
});
// 在主线程中接收子线程发送的消息并输出
println!("receive {}", rx.recv().unwrap());
}
- 若值的类型实现了Copy特征,则直接复制一份该值,然后传输过去,例如之前的i32类型
- 若值没有实现Copy,则它的所有权会被转移给接收端,在发送端继续使用该值将报错
使用多发送者
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
tx.send(String::from("hi from raw tx")).unwrap();
});
thread::spawn(move || {
tx1.send(String::from("hi from cloned tx")).unwrap();
});
for received in rx {
println!("Got: {}", received);
}
}
以上代码并不复杂,但仍有几点需要注意:
- tx,rx对应发送者和接收者,它们的类型由编译器自动推导: tx.send(1)发送了整数,因此它们分别是mpsc::Sender<i32>和mpsc::Receiver<i32>类型,需要注意,由于内部是泛型实现,一旦类型被推导确定,该通道就只能传递对应类型的值, 例如此例中非i32类型的值将导致编译错误
- 接收消息的操作rx.recv()会阻塞当前线程,直到读取到值,或者通道被关闭
- 需要使用move将tx的所有权转移到子线程的闭包中
同步和异步通道
异步通道
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx)= mpsc::channel();
let handle = thread::spawn(move || {
println!("发送之前");
tx.send(1).unwrap();
println!("发送之后");
});
println!("睡眠之前");
thread::sleep(Duration::from_secs(3));
println!("睡眠之后");
println!("receive {}", rx.recv().unwrap());
handle.join().unwrap();
}
同步通道
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// 设置消息队列大小
let (tx, rx)= mpsc::sync_channel(0);
let handle = thread::spawn(move || {
println!("发送之前");
tx.send(1).unwrap();
println!("发送之后");
});
println!("睡眠之前");
thread::sleep(Duration::from_secs(3));
println!("睡眠之后");
println!("receive {}", rx.recv().unwrap());
handle.join().unwrap();
}
传输多种类型的数据
use std::sync::mpsc::{self, Receiver, Sender};
enum Fruit {
Apple(u8),
Orange(String)
}
fn main() {
let (tx, rx): (Sender<Fruit>, Receiver<Fruit>) = mpsc::channel();
tx.send(Fruit::Orange("sweet".to_string())).unwrap();
tx.send(Fruit::Apple(2)).unwrap();
for _ in 0..2 {
match rx.recv().unwrap() {
Fruit::Apple(count) => println!("received {} apples", count),
Fruit::Orange(flavor) => println!("received {} oranges", flavor),
}
}
}
线程同步:锁、Condvar 和信号量
共享内存可以说是同步的灵魂,因为消息传递的底层实际上也是通过共享内存来实现,两者的区别如下:
共享内存相对消息传递能节省多次内存拷贝的成本
共享内存的实现简洁的多
共享内存的锁竞争更多
消息传递适用的场景很多,我们下面列出了几个主要的使用场景:需要可靠和简单的(简单不等于简洁)实现时
需要模拟现实世界,例如用消息去通知某个目标执行相应的操作时
需要一个任务处理流水线(管道)时,等等
互斥锁 Mutex
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
读写锁 RwLock
use std::sync::RwLock;
fn main() {
let lock = RwLock::new(5);
// 同一时间允许多个读
{
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
assert_eq!(*r1, 5);
assert_eq!(*r2, 5);
} // 读锁在此处被drop
// 同一时间只允许一个写
{
let mut w = lock.write().unwrap();
*w += 1;
assert_eq!(*w, 6);
// 以下代码会panic,因为读和写不允许同时存在
// 写锁w直到该语句块结束才被释放,因此下面的读锁依然处于`w`的作用域中
// let r1 = lock.read();
// println!("{:?}",r1);
}// 写锁在此处被drop
}
同时允许多个读,但最多只能有一个写
读和写不能同时存在
读可以使用read、try_read,写write、try_write, 在实际项目中,try_xxx会安全的多
线程同步:Atomic 原子类型与内存顺序
原子指的是一系列不可被 CPU 上下文交换的机器指令,这些指令组合在一起就形成了原子操作。在多核 CPU 下,当某个 CPU 核心开始运行原子操作时,会先暂停其它 CPU 内核对内存的操作,以保证原子操作不会被其它 CPU 内核所干扰。
use std::ops::Sub;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Instant;
const N_TIMES: u64 = 10000000;
const N_THREADS: usize = 10;
static R: AtomicU64 = AtomicU64::new(0);
fn add_n_times(n: u64) -> JoinHandle<()> {
thread::spawn(move || {
for _ in 0..n {
R.fetch_add(1, Ordering::Relaxed);
}
})
}
fn main() {
let s = Instant::now();
let mut threads = Vec::with_capacity(N_THREADS);
for _ in 0..N_THREADS {
threads.push(add_n_times(N_TIMES));
}
for thread in threads {
thread.join().unwrap();
}
assert_eq!(N_TIMES * N_THREADS as u64, R.load(Ordering::Relaxed));
println!("{:?}",Instant::now().sub(s));
}
Atomic 能替代锁吗
对于复杂的场景下,锁的使用简单粗暴,不容易有坑
std::sync::atomic包中仅提供了数值类型的原子操作:AtomicBool, AtomicIsize, AtomicUsize, AtomicI8, AtomicU16等,而锁可以应用于各种类型
在有些情况下,必须使用锁来配合,例如上一章节中使用Mutex配合Condvar
基于 Send 和 Sync 的线程安全
- 点赞
- 收藏
- 关注作者
评论(0)