翻了一下知乎的吃灰收藏夹,看到一个简易线程池项目,名字叫CTPL拿来学习一下。 https://github.com/vit-vit/ctpl 线程池工作原理: 线程池初始化时会创建一定数量的线程,这些线程会不断从任务队列中取任务并执行。

每个任务都是一个函数,push 函数放入队列并分配给空闲线程执行。

线程池支持动态调整线程数量。

当线程池被停止时,所有线程会安全地停止工作并退出。

点击github链接,创建codespaces,免去了配置环境的烦恼。

首先安装boost

1
sudo apt-get install libboost-all-dev

然后编译运行

1
g++ -I. example.cpp -o example && ./example

非常顺利,一遍编译通过。这个项目非常短小,只有一个头文件。

下面,我们具体分析一下这个头文件里写了啥子。

初始化传来线程数量,执行resize,创建线程,以及每个线程的flag,置为false,注意他的操作:this->flags[i] = std::make_shared<std::atomic<bool>>(false);。思考一下为什么这么写?

set_thread

为每一个线程set_thread()初始化一下,以从队列中取出任务运行。 我们来仔细看一下set_thread 的实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void set_thread(int i) {
    std::shared_ptr<std::atomic<bool>> flag(this->flags[i]);  // a copy of the shared ptr to the flag
    auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
        std::atomic<bool> & _flag = *flag;
        std::function<void(int id)> * _f;
        bool isPop = this->q.pop(_f);
        while (true) {
            while (isPop) {  // if there is anything in the queue
                std::unique_ptr<std::function<void(int id)>> func(_f);  // at return, delete the function even if an exception occurred
                (*_f)(i);

                if (_flag)
                    return;  // the thread is wanted to stop, return even if the queue is not empty yet
                else
                    isPop = this->q.pop(_f);
            }

            // the queue is empty here, wait for the next command
            std::unique_lock<std::mutex> lock(this->mutex);
            ++this->nWaiting;
            this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
            --this->nWaiting;

            if (!isPop)
                return;  // if the queue is empty and this->isDone == true or *flag then return
        }
    };
    this->threads[i].reset(new std::thread(f));  // compiler may not support std::make_unique()
}

首先定义一个function f,f里有什么?

从队列中取出任务,运行。

一个很自然的问题就出现了,队列为空怎么办?python的queue.get是一个阻塞方法,获得不到会一直停在那里,那么c++呢?

那你只能用cv.wait()了,如果队列为空,线程会进入等待状态,直到以下条件之一满足:

  • 有新的任务加入队列(isPop 为 true),
  • 线程池被标记为完成(this->isDone 为 true),
  • 或者该线程的停止标志已经被设置(_flag 为 true)。

notify_all() 会唤醒所有等待 cv 的线程。

如果有多个线程在 wait() 上等待,当 notify_all() 被调用时,它们都会被唤醒,然后继续执行。

push

将任务放到队列中:使用push方法。 说实话,看push确实很吃力。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
template<typename F>
auto push(F && f) ->std::future<decltype(f(0))> {
    auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));

    auto _f = new std::function<void(int id)>([pck](int id) {
        (*pck)(id);
    });
    this->q.push(_f);

    std::unique_lock<std::mutex> lock(this->mutex);
    this->cv.notify_one();

    return pck->get_future();
}

首先是一个模版, 先找我能看懂的部分:f是一个可调用对象(如函数、lambda 或者函数指针),表示用户传入的函数。ok 返回值是一个future,也可以理解。 第一步:创建一个packaged_task<decltype(f(0))(int)>(std::forward(f)),我看不懂了。 gpt说std::packaged_task:这是一个封装了可调用对象(如函数、lambda 或者其他可调用对象)的模板类。它的目的是提供一个异步执行的任务,并且能够通过 std::future 来获取任务的结果。看起来封装了很多,那我们封装了什么呢?不知道,似懂非懂继续看。

std::forward(f) 用于完美转发 f 参数。这样可以确保传递给 std::packaged_task 的 f 函数类型和调用 push 时传入的类型一致。如果 f 是一个右值,它会被作为右值转发,如果是左值,则作为左值转发。

第二步:创建 std::function 并包装 pck,我不懂了,为什么还要包装pck。

第三步,放到队列里,ok这样空队列就会变成有一个元素的队列里,这样我们只需要通知一个监听cv的线程,让他把这个任务领走就好了。

最后,返回一个future。std::packaged_task 将任务的结果与 future 关联。

回到问题,pck和function为什么要包装两次,因为是两种不同的东西,pck为了获取future,使得异步执行后的结果可以通过 future 获取。而function为了将封装的任务与线程池的工作机制相结合,并传递线程id。

所以push造成的结果就是:队列里多了一个function,一个线程被唤醒 取出队列并执行这个function,很合理!

pop

再来看一下pop:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// pops a functional wraper to the original function
std::function<void(int)> pop() {
    std::function<void(int id)> * _f = nullptr;
    this->q.pop(_f);
    std::unique_ptr<std::function<void(int id)>> func(_f);  // at return, delete the function even if an exception occurred
    
    std::function<void(int)> f;
    if (_f)
        f = *_f;
    return f;
}

赋值构造函数复制一个function到f中,这里发生一次深拷贝。这里还利用 std::unique_ptr 管理 _f 指向的内存,确保内存的自动释放。

最后,我们来看看stop函数。

stop

立即结束,和运行完再结束两种选择。 区别在于给每一个线程发送一个结束指令,还是全部直接this->isDone = true; 之前我们不是说过cv条件变量,这就是停止的两种条件,都是会停止?! 那干嘛不用一种? 测试一下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <cstdio>
#include <chrono>
#include <thread>
#include "ctpl.h"

void task(int id) {
    printf("Task %d started\n", id);
    std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟一个耗时任务
    printf("Task %d finished\n", id);
}

int main() {
    // 创建一个包含 3 个线程的线程池
    ctpl::thread_pool pool(3);

    // 向线程池提交 5 个任务
    for (int i = 0; i < 5; ++i) {
        pool.push(task);  // 添加任务时传递 id 参数
    }

    printf("Calling stop(false), waiting for tasks to finish...\n");
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟一个耗时任务
    pool.stop(false); // stop(false) 表示不等待任务完成,直接停止线程池

    printf("Thread pool stopped.\n");

    return 0;
}
/*
Task 2 started
Calling stop(false), waiting for tasks to finish...
Task 0 started
Task 1 started
Task 2 finished
Task 0 finished
Task 1 finished
Thread pool stopped.
*/

stop(false)还是需要等待所有task运行完了再结束。