翻了一下知乎的吃灰收藏夹,看到一个简易线程池项目,名字叫CTPL拿来学习一下。
https://github.com/vit-vit/ctpl
线程池工作原理:
线程池初始化时会创建一定数量的线程,这些线程会不断从任务队列中取任务并执行。
每个任务都是一个函数,push 函数放入队列并分配给空闲线程执行。
线程池支持动态调整线程数量。
当线程池被停止时,所有线程会安全地停止工作并退出。
点击github链接,创建codespaces,免去了配置环境的烦恼。
首先安装boost
1
|
sudo apt-get install libboost-all-dev
|
然后编译运行
1
|
g++ -I. example.cpp -o example && ./example
|
非常顺利,一遍编译通过。这个项目非常短小,只有一个头文件。
下面,我们具体分析一下这个头文件里写了啥子。
初始化传来线程数量,执行resize,创建线程,以及每个线程的flag,置为false,注意他的操作:this->flags[i] = std::make_shared<std::atomic<bool>>(false);
。思考一下为什么这么写?
set_thread
为每一个线程set_thread()初始化一下,以从队列中取出任务运行。
我们来仔细看一下set_thread 的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
void set_thread(int i) {
std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
std::atomic<bool> & _flag = *flag;
std::function<void(int id)> * _f;
bool isPop = this->q.pop(_f);
while (true) {
while (isPop) { // if there is anything in the queue
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
(*_f)(i);
if (_flag)
return; // the thread is wanted to stop, return even if the queue is not empty yet
else
isPop = this->q.pop(_f);
}
// the queue is empty here, wait for the next command
std::unique_lock<std::mutex> lock(this->mutex);
++this->nWaiting;
this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
--this->nWaiting;
if (!isPop)
return; // if the queue is empty and this->isDone == true or *flag then return
}
};
this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
}
|
首先定义一个function f,f里有什么?
从队列中取出任务,运行。
一个很自然的问题就出现了,队列为空怎么办?python的queue.get是一个阻塞方法,获得不到会一直停在那里,那么c++呢?
那你只能用cv.wait()了,如果队列为空,线程会进入等待状态,直到以下条件之一满足:
- 有新的任务加入队列(isPop 为 true),
- 线程池被标记为完成(this->isDone 为 true),
- 或者该线程的停止标志已经被设置(_flag 为 true)。
notify_all() 会唤醒所有等待 cv 的线程。
如果有多个线程在 wait() 上等待,当 notify_all() 被调用时,它们都会被唤醒,然后继续执行。
push
将任务放到队列中:使用push方法。
说实话,看push确实很吃力。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
template<typename F>
auto push(F && f) ->std::future<decltype(f(0))> {
auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});
this->q.push(_f);
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_one();
return pck->get_future();
}
|
首先是一个模版,
先找我能看懂的部分:f是一个可调用对象(如函数、lambda 或者函数指针),表示用户传入的函数。ok
返回值是一个future,也可以理解。
第一步:创建一个packaged_task<decltype(f(0))(int)>(std::forward(f)),我看不懂了。
gpt说std::packaged_task:这是一个封装了可调用对象(如函数、lambda 或者其他可调用对象)的模板类。它的目的是提供一个异步执行的任务,并且能够通过 std::future 来获取任务的结果。看起来封装了很多,那我们封装了什么呢?不知道,似懂非懂继续看。
std::forward(f) 用于完美转发 f 参数。这样可以确保传递给 std::packaged_task 的 f 函数类型和调用 push 时传入的类型一致。如果 f 是一个右值,它会被作为右值转发,如果是左值,则作为左值转发。
第二步:创建 std::function 并包装 pck,我不懂了,为什么还要包装pck。
第三步,放到队列里,ok这样空队列就会变成有一个元素的队列里,这样我们只需要通知一个监听cv的线程,让他把这个任务领走就好了。
最后,返回一个future。std::packaged_task 将任务的结果与 future 关联。
回到问题,pck和function为什么要包装两次,因为是两种不同的东西,pck为了获取future,使得异步执行后的结果可以通过 future 获取。而function为了将封装的任务与线程池的工作机制相结合,并传递线程id。
所以push造成的结果就是:队列里多了一个function,一个线程被唤醒 取出队列并执行这个function,很合理!
pop
再来看一下pop:
1
2
3
4
5
6
7
8
9
10
11
|
// pops a functional wraper to the original function
std::function<void(int)> pop() {
std::function<void(int id)> * _f = nullptr;
this->q.pop(_f);
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
std::function<void(int)> f;
if (_f)
f = *_f;
return f;
}
|
赋值构造函数复制一个function到f中,这里发生一次深拷贝。这里还利用 std::unique_ptr 管理 _f 指向的内存,确保内存的自动释放。
最后,我们来看看stop函数。
stop
立即结束,和运行完再结束两种选择。
区别在于给每一个线程发送一个结束指令,还是全部直接this->isDone = true;
之前我们不是说过cv条件变量,这就是停止的两种条件,都是会停止?!
那干嘛不用一种?
测试一下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
#include <cstdio>
#include <chrono>
#include <thread>
#include "ctpl.h"
void task(int id) {
printf("Task %d started\n", id);
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟一个耗时任务
printf("Task %d finished\n", id);
}
int main() {
// 创建一个包含 3 个线程的线程池
ctpl::thread_pool pool(3);
// 向线程池提交 5 个任务
for (int i = 0; i < 5; ++i) {
pool.push(task); // 添加任务时传递 id 参数
}
printf("Calling stop(false), waiting for tasks to finish...\n");
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟一个耗时任务
pool.stop(false); // stop(false) 表示不等待任务完成,直接停止线程池
printf("Thread pool stopped.\n");
return 0;
}
/*
Task 2 started
Calling stop(false), waiting for tasks to finish...
Task 0 started
Task 1 started
Task 2 finished
Task 0 finished
Task 1 finished
Thread pool stopped.
*/
|
stop(false)还是需要等待所有task运行完了再结束。