目录
- Rust 多线程基础
- 同步线程编程
- 基本线程创建
- 线程间通信
- 共享状态
- 线程返回值
- 线程池
- 异步线程编程
- Tokio 异步运行时
- 异步任务
- 异步通道
- 异步共享状态
- 线程安全
- 所有权与借用
- 同步原语
- Send 和 Sync trait
- 性能优化
- 线程数量
- 避免竞争
- 异步 vs 同步
- 最佳实践
- 完整代码示例
- 总结
Rust 多线程基础
Rust 的多线程编程建立在标准库的 std::thread 模块之上。与其他语言不同,Rust 通过其所有权系统和类型系统来保证线程安全,避免了常见的并发问题如数据竞争。
核心概念
- 线程:操作系统级别的执行单元
- 所有权:Rust 的核心概念,确保每个值只有一个所有者
- 借用:临时访问他人拥有的值
- 同步原语:如
Mutex、Arc等,用于线程间协调 - 通道:用于线程间安全通信
同步线程编程
基本线程创建
使用 thread::spawn 函数创建新线程:
1use std::thread; 2use std::time::Duration; 3 4fn main() { 5 println!("主线程开始"); 6 7 // 创建新线程 8 let handle = thread::spawn(|| { 9 for i in 1..=5 { 10 println!("子线程: {}", i); 11 thread::sleep(Duration::from_millis(100)); 12 } 13 }); 14 15 // 主线程继续执行 16 for i in 1..=3 { 17 println!("主线程: {}", i); 18 thread::sleep(Duration::from_millis(150)); 19 } 20 21 // 等待子线程完成 22 handle.join().unwrap(); 23 24 println!("主线程结束"); 25} 26
运行结果:
1主线程开始 2主线程: 1 3子线程: 1 4子线程: 2 5主线程: 2 6子线程: 3 7子线程: 4 8主线程: 3 9子线程: 5 10主线程结束 11
线程间通信
使用通道(channel)实现线程间通信:
1use std::thread; 2use std::sync::mpsc; 3use std::time::Duration; 4 5fn main() { 6 // 创建通道 7 let (tx, rx) = mpsc::channel(); 8 9 // 发送线程 10 thread::spawn(move || { 11 let messages = ["Hello", "from", "the", "spawned", "thread"]; 12 for msg in messages { 13 tx.send(msg).unwrap(); 14 thread::sleep(Duration::from_millis(200)); 15 } 16 }); 17 18 // 接收线程(主线程) 19 println!("等待接收消息..."); 20 for received in rx { 21 println!("接收到: {}", received); 22 } 23 println!("所有消息接收完毕"); 24} 25
运行结果:
1等待接收消息... 2接收到: Hello 3接收到: from 4接收到: the 5接收到: spawned 6接收到: thread 7所有消息接收完毕 8
共享状态
使用 Arc 和 Mutex 实现线程安全的共享状态:
1use std::thread; 2use std::sync::{Arc, Mutex}; 3use std::time::Duration; 4 5fn main() { 6 // 创建共享计数器 7 let counter = Arc::new(Mutex::new(0)); 8 let mut handles = vec![]; 9 10 // 创建 5 个线程 11 for i in 0..5 { 12 let counter = Arc::clone(&counter); 13 let handle = thread::spawn(move || { 14 for _ in 0..10 { 15 let mut num = counter.lock().unwrap(); 16 *num += 1; 17 println!("线程 {}: 计数器 = {}", i, *num); 18 thread::sleep(Duration::from_millis(50)); 19 } 20 }); 21 handles.push(handle); 22 } 23 24 // 等待所有线程完成 25 for handle in handles { 26 handle.join().unwrap(); 27 } 28 29 println!("最终计数器值: {}", *counter.lock().unwrap()); 30} 31
运行结果:
1线程 0: 计数器 = 1 2线程 0: 计数器 = 2 3... 4线程 4: 计数器 = 50 5最终计数器值: 50 6
线程返回值
从线程中返回计算结果:
1use std::thread; 2 3fn main() { 4 let handle = thread::spawn(|| { 5 println!("子线程开始计算"); 6 let mut sum = 0; 7 for i in 1..=100 { 8 sum += i; 9 } 10 println!("子线程计算完成"); 11 sum // 返回值 12 }); 13 14 println!("主线程等待结果..."); 15 let result = handle.join().unwrap(); 16 println!("1 到 100 的和: {}", result); 17} 18
运行结果:
1主线程等待结果... 2子线程开始计算 3子线程计算完成 41 到 100 的和: 5050 5
线程池
创建简单的线程池处理任务:
1use std::thread; 2use std::time::Duration; 3 4fn main() { 5 let jobs = vec![1, 2, 3, 4, 5]; 6 let mut handles = vec![]; 7 8 println!("开始处理任务..."); 9 for job in jobs { 10 let handle = thread::spawn(move || { 11 println!("处理任务 {}", job); 12 thread::sleep(Duration::from_millis(300)); 13 println!("任务 {} 完成", job); 14 job * 2 // 返回处理结果 15 }); 16 handles.push(handle); 17 } 18 19 // 收集所有线程的结果 20 let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect(); 21 println!("所有任务完成。结果: {:?}", results); 22} 23
运行结果:
1开始处理任务... 2处理任务 1 3处理任务 2 4处理任务 3 5处理任务 4 6处理任务 5 7任务 1 完成 8任务 2 完成 9任务 3 完成 10任务 4 完成 11任务 5 完成 12所有任务完成。结果: [2, 4, 6, 8, 10] 13
异步线程编程
Tokio 异步运行时
Rust 的异步编程需要一个异步运行时,最常用的是 Tokio:
添加依赖:
1[dependencies] 2tokio = { version = "1.36.0", features = ["full"] } 3
异步任务
创建和管理异步任务:
1use tokio; 2 3#[tokio::main] 4async fn main() { 5 println!("异步主线程开始"); 6 7 // 创建异步任务 8 let task1 = tokio::spawn(async { 9 println!("异步任务 1 开始"); 10 tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; 11 println!("异步任务 1 完成"); 12 100 13 }); 14 15 let task2 = tokio::spawn(async { 16 println!("异步任务 2 开始"); 17 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; 18 println!("异步任务 2 完成"); 19 200 20 }); 21 22 // 等待所有异步任务完成 23 let (result1, result2) = tokio::join!(task1, task2); 24 println!("任务结果: {}, {}", result1.unwrap(), result2.unwrap()); 25 26 println!("异步主线程结束"); 27} 28
运行结果:
1异步主线程开始 2异步任务 1 开始 3异步任务 2 开始 4异步任务 2 完成 5异步任务 1 完成 6任务结果: 100, 200 7异步主线程结束 8
异步通道
异步线程间的消息传递:
1use tokio; 2use tokio::sync::mpsc; 3 4#[tokio::main] 5async fn main() { 6 // 创建异步通道 7 let (tx, mut rx) = mpsc::channel(32); 8 9 // 发送任务 10 tokio::spawn(async move { 11 for i in 1..=5 { 12 tx.send(i).await.unwrap(); 13 println!("发送: {}", i); 14 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; 15 } 16 }); 17 18 // 接收任务 19 println!("等待接收消息..."); 20 while let Some(msg) = rx.recv().await { 21 println!("接收: {}", msg); 22 } 23 println!("所有消息接收完毕"); 24} 25
运行结果:
1等待接收消息... 2发送: 1 3接收: 1 4发送: 2 5接收: 2 6发送: 3 7接收: 3 8发送: 4 9接收: 4 10发送: 5 11接收: 5 12所有消息接收完毕 13
异步共享状态
使用 Tokio 的 Mutex 实现异步共享状态:
1use tokio; 2use std::sync::Arc; 3use tokio::sync::Mutex; 4 5#[tokio::main] 6async fn main() { 7 // 创建异步共享计数器 8 let counter = Arc::new(Mutex::new(0)); 9 let mut handles = vec![]; 10 11 // 创建 3 个异步任务 12 for i in 0..3 { 13 let counter = Arc::clone(&counter); 14 let handle = tokio::spawn(async move { 15 for _ in 0..5 { 16 let mut num = counter.lock().await; 17 *num += 1; 18 println!("异步任务 {}: 计数器 = {}", i, *num); 19 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; 20 } 21 }); 22 handles.push(handle); 23 } 24 25 // 等待所有异步任务完成 26 for handle in handles { 27 handle.await.unwrap(); 28 } 29 30 println!("最终计数器值: {}", *counter.lock().await); 31} 32
运行结果:
1异步任务 0: 计数器 = 1 2异步任务 1: 计数器 = 2 3异步任务 2: 计数器 = 3 4异步任务 0: 计数器 = 4 5异步任务 1: 计数器 = 5 6异步任务 2: 计数器 = 6 7异步任务 0: 计数器 = 7 8异步任务 1: 计数器 = 8 9异步任务 2: 计数器 = 9 10异步任务 0: 计数器 = 10 11异步任务 1: 计数器 = 11 12异步任务 2: 计数器 = 12 13异步任务 0: 计数器 = 13 14异步任务 1: 计数器 = 14 15异步任务 2: 计数器 = 15 16最终计数器值: 15 17
线程安全
所有权与借用
Rust 的所有权系统是线程安全的基础:
- 所有权:每个值只能有一个所有者
- 借用:可以有多个不可变借用或一个可变借用
- 生命周期:确保引用在所有者有效的期间内有效
同步原语
Rust 提供了多种同步原语:
- Mutex:互斥锁,确保同一时间只有一个线程可以访问数据
- RwLock:读写锁,允许多个读操作或一个写操作
- Arc:原子引用计数,用于在多个线程间共享所有权
- Condvar:条件变量,用于线程间的信号通知
- Barrier:屏障,用于同步多个线程的执行
Send 和 Sync trait
- Send:标记类型可以安全地在线程间转移所有权
- Sync:标记类型可以安全地在线程间共享引用
Rust 的大多数类型都自动实现了这些 trait,但有些类型(如 Rc、RefCell)没有实现,因为它们不是线程安全的。
性能优化
线程数量
- CPU 密集型任务:线程数通常设置为 CPU 核心数
- I/O 密集型任务:线程数可以大于 CPU 核心数
- 异步任务:可以创建大量轻量级的异步任务
避免竞争
- 减少锁的范围:只在必要时持有锁
- 使用无锁数据结构:如
AtomicUsize - 使用通道:优先使用通道进行线程间通信,而不是共享状态
异步 vs 同步
- 同步线程:适合 CPU 密集型任务,每个线程对应一个操作系统线程
- 异步任务:适合 I/O 密集型任务,多个任务可以在同一个操作系统线程上运行
最佳实践
- 优先使用通道:通过消息传递进行线程间通信
- 最小化共享状态:减少线程间的依赖
- 使用 Arc<Mutex>:需要共享可变状态时的标准做法
- 异步编程:I/O 密集型任务优先使用异步
- 错误处理:妥善处理线程中的错误
- 资源管理:确保线程正确释放资源
完整代码示例
以下是一个完整的 Rust 多线程示例,包含了本文介绍的所有功能:
1use std::thread; 2use std::time::Duration; 3use std::sync::{mpsc, Arc, Mutex}; 4use tokio; 5 6// 基本线程示例 7struct ThreadData { 8 name: String, 9} 10 11impl ThreadData { 12 fn new(name: String) -> Self { 13 println!("ThreadData created with name: {}", name); 14 ThreadData { name } 15 } 16 17 fn thread_function(&self) { 18 for i in 1..=5 { 19 println!("[Basic Thread] thread name: {} spawned thread: {}", self.name, i); 20 thread::sleep(Duration::from_millis(100)); 21 } 22 } 23} 24 25// 线程间通信示例 26fn channel_example() { 27 println!("\n=== Channel Example ==="); 28 let (tx, rx) = mpsc::channel(); 29 30 thread::spawn(move || { 31 let messages = ["Hello", "from", "the", "spawned", "thread"]; 32 for msg in messages { 33 tx.send(msg).unwrap(); 34 thread::sleep(Duration::from_millis(200)); 35 } 36 }); 37 38 for received in rx { 39 println!("[Channel] Received: {}", received); 40 } 41} 42 43// 共享状态示例 44fn shared_state_example() { 45 println!("\n=== Shared State Example ==="); 46 let counter = Arc::new(Mutex::new(0)); 47 let mut handles = vec![]; 48 49 for i in 0..5 { 50 let counter = Arc::clone(&counter); 51 let handle = thread::spawn(move || { 52 for _ in 0..10 { 53 let mut num = counter.lock().unwrap(); 54 *num += 1; 55 println!("[Shared State] Thread {}: Counter = {}", i, *num); 56 thread::sleep(Duration::from_millis(50)); 57 } 58 }); 59 handles.push(handle); 60 } 61 62 for handle in handles { 63 handle.join().unwrap(); 64 } 65 66 println!("[Shared State] Final counter value: {}", *counter.lock().unwrap()); 67} 68 69// 线程返回值示例 70fn thread_return_example() { 71 println!("\n=== Thread Return Example ==="); 72 let handle = thread::spawn(move || { 73 let mut sum = 0; 74 for i in 1..=100 { 75 sum += i; 76 } 77 sum 78 }); 79 80 let result = handle.join().unwrap(); 81 println!("[Thread Return] Sum from 1 to 100: {}", result); 82} 83 84// 线程池示例 85fn thread_pool_example() { 86 println!("\n=== Thread Pool Example ==="); 87 let jobs = vec![1, 2, 3, 4, 5]; 88 let mut handles = vec![]; 89 90 for job in jobs { 91 let handle = thread::spawn(move || { 92 println!("[Thread Pool] Processing job {}", job); 93 thread::sleep(Duration::from_millis(300)); 94 println!("[Thread Pool] Job {} completed", job); 95 job * 2 96 }); 97 handles.push(handle); 98 } 99 100 let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect(); 101 println!("[Thread Pool] All jobs completed. Results: {:?}", results); 102} 103 104// 异步线程示例 105async fn async_example() { 106 println!("\n=== Async Thread Example ==="); 107 108 let task1 = tokio::spawn(async { 109 println!("[Async] Task 1 started"); 110 tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; 111 println!("[Async] Task 1 completed"); 112 100 113 }); 114 115 let task2 = tokio::spawn(async { 116 println!("[Async] Task 2 started"); 117 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; 118 println!("[Async] Task 2 completed"); 119 200 120 }); 121 122 let (result1, result2) = tokio::join!(task1, task2); 123 println!("[Async] Task results: {}, {}", result1.unwrap(), result2.unwrap()); 124 125 // 异步通道示例 126 println!("\n=== Async Channel Example ==="); 127 let (tx, mut rx) = tokio::sync::mpsc::channel(32); 128 129 tokio::spawn(async move { 130 for i in 1..=5 { 131 tx.send(i).await.unwrap(); 132 println!("[Async Channel] Sent: {}", i); 133 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; 134 } 135 }); 136 137 while let Some(msg) = rx.recv().await { 138 println!("[Async Channel] Received: {}", msg); 139 } 140 141 // 异步共享状态示例 142 println!("\n=== Async Shared State Example ==="); 143 let counter = Arc::new(tokio::sync::Mutex::new(0)); 144 let mut handles = vec![]; 145 146 for i in 0..3 { 147 let counter = Arc::clone(&counter); 148 let handle = tokio::spawn(async move { 149 for _ in 0..5 { 150 let mut num = counter.lock().await; 151 *num += 1; 152 println!("[Async Shared] Task {}: Counter = {}", i, *num); 153 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; 154 } 155 }); 156 handles.push(handle); 157 } 158 159 for handle in handles { 160 handle.await.unwrap(); 161 } 162 163 println!("[Async Shared] Final counter value: {}", *counter.lock().await); 164} 165 166fn main() { 167 println!("=== Rust Multi-threading Examples ==="); 168 169 // 1. 基本线程示例 170 println!("\n=== Basic Thread Example ==="); 171 let x = 10; 172 173 println!("Creating ThreadData instances..."); 174 let thread_data1 = ThreadData::new("Thread 1".to_string()); 175 let thread_data2 = ThreadData::new("Thread 2".to_string()); 176 177 println!("Spawning threads..."); 178 let handler = thread::spawn(move || { 179 thread_data1.thread_function(); 180 }); 181 182 let handler2 = thread::spawn(move || { 183 thread_data2.thread_function(); 184 }); 185 186 println!("x = {}", x); 187 handler.join().unwrap(); 188 handler2.join().unwrap(); 189 190 // 2. 线程间通信示例 191 channel_example(); 192 193 // 3. 共享状态示例 194 shared_state_example(); 195 196 // 4. 线程返回值示例 197 thread_return_example(); 198 199 // 5. 线程池示例 200 thread_pool_example(); 201 202 // 6. 异步线程示例 203 println!("\n=== Running Async Examples ==="); 204 tokio::runtime::Builder::new_multi_thread() 205 .worker_threads(4) 206 .enable_all() 207 .build() 208 .unwrap() 209 .block_on(async_example()); 210 211 println!("\n=== All Examples Complete! ==="); 212} 213
总结
Rust 提供了强大而安全的多线程编程支持,通过其所有权系统和类型安全特性,使得多线程编程更加可靠。本文介绍了 Rust 中的各种线程使用方法,包括:
- 同步线程:基本线程创建、线程间通信、共享状态、线程返回值和线程池
- 异步线程:基于 Tokio 的异步任务、异步通道和异步共享状态
- 线程安全:所有权系统、同步原语和 Send/Sync trait
- 性能优化:线程数量选择、避免竞争和异步编程
通过本文的学习,你应该能够掌握 Rust 多线程编程的核心概念和实践技巧,编写安全、高效的并发程序。
Rust 的多线程编程虽然有一定的学习曲线,但其带来的安全性和性能优势是值得的。随着经验的积累,你会发现 Rust 的多线程编程变得越来越自然和直观。
希望本文对你有所帮助,祝你在 Rust 多线程编程的道路上越走越远!
《Rust多线程编程学习笔记》 是转载文章,点击查看原文。