在上一节中我们学习了构建一个简单的单线程Web服务器。它每次只能接收并处理一个请求,如果处理过程比较耗时,那么整个系统的吞吐率就很低。这一节我们学习用线程池来对其进行改进。
在我们的想象中,改进后的main函数应该是这样的:
fn main() { let listener = TcpListener::bind(“127.0.0.1:7878”).unwrap(); let tp = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); tp.execute(|| handle_connection(stream)); }}
应该有一个线程池ThreadPool,调用其关联函数new可以创建一个自定义线程数的线程池。ThreadPool上还应该有一个execute方法,它接收闭包并传递给线程池内的线程,由线程调用闭包。
线程池§
下一步就是完成一个基本的线程池。那线程池的数据结构是什么样的呢?首先线程池里应该有一系列的线程(可以存在Vec中)。为了便于管理这些线程,每个线程还应该带有一些额外的信息,比如id。 线程池(主线程)如何管理这些线程呢?我们之前在四十九节学习过Channel进行线程间通信,那么这里就可以使用Channel对线程进行管理:
- 线程池持有sender
- 线程内轮询receiver,若收到任务就立即执行
暂时先考虑到这里,下面先进行编码:
首先将线程和线程上的额外信息抽象为一个名为Worker的结构体
struct Worker { id: usize, thread: Option,}
这里使用Option来存放创建线程返回的JoinHandle,以便于后面停止线程池时可以取出JoinHandle调用上面的join方法等待现有的任务结束后再停止。下面要为其实现一个new方法,以便于构造Worker。 前面说过线程内要轮询Channel的receiver来接收并执行任务,所以new需要接收一个receiver。这个receiver是什么类型的呢? 首先channel在mpsc模块下,之前学过,mpsc是多个生产者单个消费者的缩写。但是这里是多个线程来消费同一个sender发送的消息,会造成数据竞争。所以需要使用Mutex来对receiver进行加锁。但是多个线程都持有同一个receiver的话,又涉及到多线程的多重所有权。之前第五十节学过的多线程的多重所有权的知识,使用Arc可以解决。所以这里receiver的类型是Arc<Mutex>。 这里的消息应该分为两类:
- 线程需要执行的正常任务
- 线程池要停止时的终止消息
可以使用枚举来定义两个Message的变体来实现。其中正常的任务应该是一个闭包,该闭包是传入thread::spawn的参数,通过查看thread::spawn函数的定义,发现该闭包的类型是FnOnce() -> T + Send + ‘static。那么就可以定义消息和消息中的任务了:
type Job = Box;//定义类型别名省略代码enum Message { NewJob(Job), Terminate,}
然后定义Worker和它的new方法,Worker中的线程是这样的:
- 通过loop循环不断接收消息
- 判断消息类型,正常任务则取出消息中的任务直接执行
- 若为终止消息则终止循环,使该线程结束
struct Worker { id: usize, thread: Option,}impl Worker { fn new(id: usize, receiver: Arc<Mutex>) -> Worker { let thread = thread::spawn(move || loop { let message = receiver.lock().unwrap().recv().unwrap(); match message { Message::NewJob(job) => { println!(“Worker {} got a job; executing.”, id); job(); } Message::Terminate => { println!(“Worker {} was told to terminate.”, id); break; } } }); Worker { id: id, thread: Option::Some(thread), } }}
此处的loop为何不能写成while let循环呢?下面是while let的代码:
while let Ok(job) = receiver.lock().unwrap().recv() {println!(“Worker {} got a job; executing.”, id);job();}
因为锁在循环块外获取,而while 表达式中的值在整个块一直处于作用域中,job() 调用的过程中其仍然持有锁,这意味着其他 worker 因无法获取到锁而不能接收任务。而loop循环时,我们在循环块内获取锁,lock 方法返回的 MutexGuard 在 let job 语句结束之后立刻就被丢弃了。这确保了 recv 调用过程中持有锁,而在 job() 调用前锁就被释放了,这就允许并发处理多个请求了。
Worker到此为止就实现完了,下面看ThreadPool的实现:
pub struct ThreadPool { workers: Vec, sender: mpsc::Sender,}impl ThreadPool {//接收线程数量并返回对应的线程池 pub fn new(size: usize) -> ThreadPool { assert!(size > 0);//先创建一个size大小的Vector let mut workers = Vec::with_capacity(size);//创建channel的sender和receiver let (sender, receiver) = mpsc::channel();//创建带锁的receiver的原子引用 let receiver = Arc::new(Mutex::new(receiver));//创建相应数量的Worker,并把对应的id和receiver传入其中 for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); }//返回线程池 ThreadPool { workers: workers, sender: sender, } }//有任务时直接把任务通过sender发送到对应的receiver pub fn execute(&self, f: F) where F: FnOnce() + Send + ‘static, { self.sender.send(Message::NewJob(Box::new(f))).unwrap(); }}
main.rs中的handle_connection函数与上一节中的一致。cargo run运行,然后同时发送多个请求(可用jmeter或其它工具),输出:
Worker 0 got a job; executing.Worker 2 got a job; executing.Worker 1 got a job; executing.Worker 3 got a job; executing.Worker 2 got a job; executing.Worker 0 got a job; executing.Worker 1 got a job; executing.Worker 3 got a job; executing.Worker 2 got a job; executing.Worker 1 got a job; executing.Worker 3 got a job; executing.Worker 0 got a job; executing.Worker 2 got a job; executing.Worker 1 got a job; executing.Worker 0 got a job; executing.Worker 3 got a job; executing.Worker 2 got a job; executing.Worker 1 got a job; executing.Worker 3 got a job; executing.Worker 0 got a job; executing.Worker 2 got a job; executing.
发现确实有4个线程在执行任务。
为线程池实现Drop trait§
前面的main函数:
fn main() { let listener = TcpListener::bind(“127.0.0.1:7878”).unwrap(); let tp = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); tp.execute(|| handle_connection(stream)); }}
如果main函数执行完毕,一些变量就走出作用域,其中包括我们的ThreadPool,所以我们需要为其实现Drop trait。走出作用域时,等待现有的任务执行完再清理。但是在此之前需要先向各线程发出终止消息,让其跳出死循环。因为如果不跳出死循环,线程池中的线程就会一直陷在死循环里,而主线程会一直等待其执行完。
实现Drop trait:
impl Drop for ThreadPool { fn drop(&mut self) { for _ in &mut self.workers { self.sender.send(Message::Terminate).unwrap(); } println!(“Shutting down all workers.”); for worker in &mut self.workers { println!(“Shutting down worker {}”, worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } }}
为了更好的理解为什么需要两个分开的循环,想象一下只有两个 worker 的场景。如果在一个单独的循环中遍历每个 worker,在第一次迭代中向通道发出终止消息并对第一个 worker 线程调用 join。如果此时第一个 worker 正忙于处理请求,那么第二个 worker 会收到终止消息并停止。我们会一直等待第一个 worker 结束,不过它永远也不会结束因为第二个线程接收了终止消息。死锁!