当前位置: 首页>后端>正文

如何设计一个线程池?

  • 为什么需要线程池
  • 如何设计一个线程池
  • 用C++11实现一个线程池

为什么需要线程池

线程的频繁创建和销毁,不仅会消耗系统资源,还会降低系统的稳定性。

线程池预先创建空闲的线程,程序将一个任务传给线程池,线程池就会启动空闲线程来执行这个任务,执行结束以后,该线程并不会销毁,而是再次返回线程池中成为空闲状态,等待执行下一个任务。

使用线程池好处:

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,- 降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池,就允许任务延期执行或定期执行。

设计线程池

线程池设计的思路都大同小异:将任务写入到阻塞队列中,然后线程池中的空闲线程从队列中获取任务执行,执行完成后再从队列获取新的任务执行。

设计线程池需要考虑的几个特性

  • 任务队列:按照功能分为优先级队列,先进后出队列,同步队列等等
  • 线程数量的控制
  • 线程数量增加或回收

任务队列

线程池中任务队列都是阻塞型的,线程从中获取任务,没有任务则阻塞。
一般有几种特性的队列

  • 优先级队列:支持有优先级的任务
  • 延迟队列:支持任务可以在某个时间点执行
  • 先进后出队列:队列类似于栈一样,任务先进后出
  • 同步队列:不存储任务的队列,将任务push到队列中时阻塞,直到将其分配给idle线程

线程数量

多线程编程中,经常会被问到:该设置多少个线程才合理?

如果线程数设置太多,则对线程调度,上下文切换,缓存等产生较大的影响;如果设置太小,则造成吞吐不够。
而到底设置多少线程数,其实和机器资源的额度和任务特征有关。
我们一般将任务分为几种

  • 计算密集型
  • IO密集型
  • 混合型:计算与IO密集型

引用《Java Concurrency in Practice》中的公式来估算合适的线程数。

计算密集型

线程数 = CPU数 + 1

如果一个线程产生缺页或者其他原因停止运行,则还有一个idle线程会占用CPU,不会导致CPU资源浪费。但是这只是一个相对合理的经验值,并不科学。

IO密集型和混合型

如何设计一个线程池?,第1张
图片.png

如果希望将CPU达到指定的利用率,则按照以下公式来计算线程数


如何设计一个线程池?,第2张
图片.png

所以,最佳线程数并不一定是固定不变的,可以随着任务的差异及资源利用率进行动态调整,这就需要线程池有动态调整线程的能力。

线程的创建与回收

线程池一般会在初始化时创建空闲线程,或者是等到需要的时候再延迟创建。

线程池扩容
当任务较多,已有线程都处于运行状态,而已有线程数没有达到最大线程数限制时,则需要创建新的线程来执行任务。
也会有一些触发条件,如任务等待多久仍然没有被调度则创建新线程等。

线程池缩容

当线程处于空闲状态一段时间后,则需要回收此空闲线程以节约资源。

线程池监控指标

线程池需要能够导出一些指标以观测其健康状况,如

  • 列队中任务数量
  • 运行中线程数量

C++实现线程池

需求

在开发一个C++线程池前,我们先明确几点需求

  • 线程数
    • 能够指定最小和最大线程数
    • 任务等待多久才开始创建新线程
    • 线程空闲多久才开始回收线程
  • 阻塞队列
    • 用来存储任务
    • 当队列为空时,从中获取任务的操作会被阻塞直到有其他线程写入新任务
    • 当队列已满时,将任务写入队列的操作会被阻塞直到有其他线程从队列中获取任务
  • 任务
    • 能够获取任务执行后的返回值
    • 不固化任务的函数签名
    • 任务能够延迟执行

明确需求后,线程池的设计就比较清晰了

如何设计一个线程池?,第3张
截图_选择区域_20220414081321.png

完整代码

https://github.com/ikenchina/thread_pool

阻塞队列

先定义队列的接口,用来存储任务

class Queue
{
public:
    virtual void Push(const Task& task) = 0;
    virtual void Pop() = 0;
    virtual Task Front() = 0;
    virtual bool Empty() = 0;
    virtual size_t Size() = 0;
    virtual void Clear() = 0;
    virtual bool Full() = 0;
};

实现普通队列和优先级队列

class FifoQueue : public Queue
class PriorityQueue : public Queue

再定义阻塞队列的接口

class BlockingQueue
{
public:
    BlockingQueue() {Open();}
    virtual StatusCode Push(const Context& ctx, const Task& task) = 0;
    virtual StatusCode Pop(const Context& ctx, Task* t) = 0;
    virtual size_t QueueSize() = 0;
    virtual bool Empty() = 0;
    virtual void Clear() = 0;
    virtual void Close();
    virtual void Open();
};

实现普通阻塞队列,默认使用FifoQueue 来存储任务。

class BaseBlockingQueue : public BlockingQueue
{
public:
    BaseBlockingQueue(std::shared_ptr<Queue> q = std::make_shared<FifoQueue>()) : BlockingQueue()
    {
        queue_ = q;
    }

    virtual StatusCode Push(const Context& ctx, const Task& task)
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        // 当队列已满,且没有关闭,则等待有线程从队列中获取任务
        while (queue_->Full() && !shutdown_)
        {
            if (out_queue_cond_.wait_until(lock, ctx.Deadline()) == std::cv_status::timeout)
            {
                // 如果等待超时且超过deadline,则返回Timeout
                if (std::chrono::system_clock::now() >= ctx.Deadline())
                    return StatusCode::Timeout;
            }
        }
        if (shutdown_)
            return StatusCode::Closed;

        queue_->Push(task);
        in_queue_cond_.notify_one();  // 唤醒一个线程
        return StatusCode::Ok;
    }
    
    virtual StatusCode Pop(const Context& ctx, Task* t)
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);        
        std::cv_status cv_status = std::cv_status::no_timeout;  
        // 如果队列为空且没有关闭,则等待(有任务写入队列)
        if (queue_->Empty() && !shutdown_)
        {
            cv_status = in_queue_cond_.wait_until(lock, ctx.Deadline());
        }
        if (shutdown_ && queue_->Empty())
            return StatusCode::Closed;
        if (cv_status == std::cv_status::timeout)
            return StatusCode::Timeout;

        auto task = queue_->Front();
        queue_->Pop();
        out_queue_cond_.notify_one();   // 唤醒一个被阻塞线程
        
        *t = task;
        return StatusCode::Ok;
    }
    
    ......     
}

混合队列继承于BlockingQueue,其分别使用FifoQueue和PriorityQueue来存储普通任务和延迟执行任务。

class MixedBlockingQueue : public BlockingQueue
{
public:
    MixedBlockingQueue(size_t fifo_queue_size = 1000, size_t delay_queue_size=1000) :
    BlockingQueue(),
    fifo_queue_(std::make_shared<FifoQueue>(fifo_queue_size)), 
    delay_queue_(std::make_shared<PriorityQueue>(delay_queue_size))
    {
    }
    
    virtual StatusCode Push(const Context& ctx, const Task& task){
        // 和BaseBlockingQueue几乎一样
        ......
    }
    
    virtual StatusCode Pop(const Context& ctx, Task* t)
    {
        while(true)
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);        
            std::cv_status cv_status = std::cv_status::no_timeout;
            if (isEmpty() && !shutdown_)
            {
                cv_status = in_queue_cond_.wait_until(lock, ctx.Deadline());
            }
            if (shutdown_ && isEmpty())
                return StatusCode::Closed;
            if (cv_status == std::cv_status::timeout)
                return StatusCode::Timeout;

            // 先从delay queue获取,
            // delay queue 对执行时间更为敏感
            if (!delay_queue_->Empty())
            {
                auto task = delay_queue_->Front();
                if (task.exec_time < std::chrono::system_clock::now())
                {      
                    delay_queue_->Pop();
                    *t = task;
                    break;
                }
                // 若task的执行时间还早,则检查fifo queue
                if (!fifo_queue_->Empty())
                {
                    *t = fifo_queue_->Front();
                    fifo_queue_->Pop();
                    break;
                }
                
                // 等待,直到到达task执行时间或者被其他线程唤醒
                auto stat = in_queue_cond_.wait_until(lock, std::min(task.exec_time, ctx.Deadline()));
                if (stat == std::cv_status::timeout)
                {
                    if (ctx.Deadline() < std::chrono::system_clock::now())
                        return StatusCode::Timeout;
                }
                continue;
            }
            if (!fifo_queue_->Empty())
            {
                *t = fifo_queue_->Front();
                fifo_queue_->Pop();
                break;
            }
        }
        out_queue_cond_.notify_one();
        return StatusCode::Ok;
    }
    ......
}

线程池配置

class ThreadPoolSettings
{
    // 最小线程数
    std::atomic<size_t> min_pool_size_;
    
    // 最大线程数
    std::atomic<size_t> max_pool_size_;
    
    // 空闲线程时间:当超过此阈值线程仍然无法获取到任务,则销毁此线程
    std::chrono::nanoseconds keepalive_time_ = std::chrono::seconds(10);
    
    // 线程扩容时间:当Push任务到队列且队列已满时,等待超过此阈值则进行新建一个新线程
    std::chrono::nanoseconds scaleout_time_ = std::chrono::milliseconds(300);
};

线程池

class ThreadPool {
public:
    // 构造函数
    // 传入配置和阻塞队列(默认使用混合队列)
    ThreadPool(ThreadPoolSettings settings, 
    std::shared_ptr<BlockingQueue> queue = std::make_shared<MixedBlockingQueue>());、
    
    // 禁止拷贝
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool(ThreadPool &&) = delete;
    ThreadPool & operator=(const ThreadPool &) = delete;
    ThreadPool & operator=(ThreadPool &&) = delete;
    
    
    // 用于动态调整线程池配置
    ThreadPoolSettings& GetSettings() {return settings_;}
}

启动与停止

// 启动线程池,创建最少idle 线程
void Start();

// 停止线程池,回收所有线程
// 如果force=false:等待所有任务执行完成,再回收线程
// 如果force=true:等待正在执行的任务执行完,再回收线程
void Stop(bool force = false);

提交任务

template<class F, class... Args>
auto ScheduleTask(const Context& ctx, F&& f, Args&&... args) 
    -> std::tuple<StatusCode, std::future<typename std::result_of<F(Args...)>::type>>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared<std::packaged_task<return_type()> >(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );
        
    std::future<return_type> res = task->get_future();
    auto tt = Task([task](){(*task)();});
    auto ret = scheduleTask(ctx, tt);
    return std::make_tuple(ret, std::move(res));
}
  • 函数签名
    • 参数:利用可变模板来支持任意形参
    • 返回值:函数返回一个std::tuple
      • 第一个元素是状态码StatusCode,
      • 第二个元素是任务返回值的std::future
StatusCode ThreadPool::scheduleTask(const Context& ctx, Task& task)
{
    StatusCode res = StatusCode::Ok;

    while(true)
    {
        // Context的Deadline 和 IncreaseThreadDuration 中较小者作为 Push队列的超时时间
        Context ctx2 = Context::WithDeadline(ctx, TimeAddDuration(std::chrono::system_clock::now(), settings_.IncreaseThreadDuration()));
        if (ctx.Deadline() < ctx2.Deadline())
        {
            ctx2.SetDeadline(ctx.Deadline());
        }
        
        res = queue_->Push(ctx2, task);

        // 如果需要回收线程
        if (needGcIdleWorkers())
            GcIdleWorkers();

        // 如果线程池还没有线程,则新建一个
        if (available_workers_size_ == 0)
        {
            uint expected = 0; 
            std::unique_lock<std::mutex> lock(thread_mutex_);
            if (available_workers_size_.compare_exchange_strong(expected, 1))
                CreateThread();
        }

        if (res != StatusCode::Timeout)
            break;

        // 如果超过Context的Deadline
        if (std::chrono::system_clock::now() >= ctx.Deadline())
            break;
        
        // 尝试新建一个线程
        TryIncreaseThreads();
    }

    return res;
}

使用

int main(int argc, char* argv[])
{
    ThreadPoolSettings settings;
    auto increase_duration = std::chrono::milliseconds(50);
    settings.SetIdleThreadDuration(std::chrono::milliseconds(200)).SetMaxThreadSize(4).SetMinThreadSize(0);
    settings.SetIncreaseThreadDuration(increase_duration);

    ThreadPool tp(settings);
    tp.Start();
    
    auto ctx = Context::WithTimeout(Context(), std::chrono::seconds(1));

    auto resp = tp.ScheduleTask(ctx, [](int a) {return a*a;}, 2);
    if (std::get<0>(resp) != StatusCode::Ok)
        return 1;
    
    auto delay_resp = tp.ScheduleDelayTask(Context(), std::chrono::seconds(1), test_func, 3);
    if (std::get<0>(resp) != StatusCode::Ok)
        return 1;

    cout << "2 * 2 = " << std::get<1>(resp).get() << endl;
    cout << "3 * 3 = " << std::get<1>(delay_resp).get() << endl;

    tp.Stop();
}

https://www.xamrdz.com/backend/36f1940126.html

相关文章: