0%

C++线程池

C++线程池的整理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#pragma once
#pragma once
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include<chrono>
#include<ctime>
#include<Windows.h>
class ThreadPool {

public:
ThreadPool() {
this->stop = false;
}
ThreadPool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)->std::future<typename std::result_of<F(Args...)>::type>;//任务入队
void init(size_t threads);
void wait();
~ThreadPool();

private:
std::vector< std::thread > workers; //线程队列
std::queue< std::function<void()> > tasks; //任务队列

std::mutex queue_mutex;
std::condition_variable condition;
std::condition_variable conditionWait;//等待条件
std::mutex wait_mutex;
bool stop;
};

// 构造函数,把线程插入线程队列,插入时调用embrace_back(),用匿名函数lambda初始化Thread对象
inline ThreadPool::ThreadPool(size_t threads) {
this->init(threads);

}

// 添加新的任务到任务队列
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
// 获取函数返回值类型
using return_type = typename std::result_of<F(Args...)>::type;

auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);

std::future<return_type> res = task->get_future();//任务执行完获得其返回值
{
std::unique_lock<std::mutex> lock(queue_mutex); //加锁
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");

tasks.emplace([task]() { (*task)(); }); //把任务加入队列
} //自动解锁
condition.notify_one(); //通知条件变量,唤醒一个线程
return res;
}

inline void ThreadPool::init(size_t threads)
{
this->stop = false;
for (size_t i = 0; i < threads; ++i)
workers.emplace_back(
[this]
{
for (;;)//每一个任何都是死循环,直到其它代码执行中断
{
// task是一个函数类型,从任务队列接收任务
std::function<void()> task;
{
if (this->stop || this->tasks.empty()) {//若有代码调用wait函数,则任务为空或中断时进行通知
this->conditionWait.notify_all();
}
//给互斥量加锁,锁对象生命周期结束后自动解锁
std::unique_lock<std::mutex> lock(this->queue_mutex);

this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });//获取资源锁,若未获取到则进行等待

if (this->stop && this->tasks.empty())//任何为空且有停止时进行返回
return;

//从任务队列取出一个任务
task = std::move(this->tasks.front());
this->tasks.pop();
} // 自动解锁
task();//执行任务
}
}
);
}

inline void ThreadPool::wait()
{
{
std::unique_lock<std::mutex>lock(this->wait_mutex);
this->conditionWait.wait(lock, [this]() {return this->stop || this->tasks.empty(); });//等待任务列表为空或有中断标志
}
}
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread &worker : workers)
worker.join();
}

#endif