C++11 实现线程池

本文的实现参考了祁宇《深入应用C++11》一书中的线程池实现,并已在实验室项目中实际应用。

#include<thread>
#include<iostream>
#include<list>
#include<mutex>
#include<condition_variable>
#include<functional>
#include<memory>
#include<atomic>

// Bounded, blocking FIFO queue shared between producer and consumer threads.
//
// Put() blocks while the queue holds max_size elements; Take() blocks while it
// is empty. Stop() wakes every blocked caller; from then on Put() drops its
// argument and Take() returns without touching its output parameter.
// Elements still queued when Stop() is called are never handed out.
template<class T>
class SyncQueue
{
public:
    // max_size: number of elements the queue may hold before Put() blocks.
    SyncQueue(int max_size) : max_size_(static_cast<size_t>(max_size)), need_stop_(false)
    {
    }

    // Appends a copy of x, blocking while the queue is full.
    // Does nothing if the queue has been stopped.
    void Put(const T& x)
    {
        Add(x);
    }

    // Appends x by move, blocking while the queue is full.
    // Does nothing if the queue has been stopped.
    void Put(T&& x)
    {
        // x is a named rvalue reference of the non-deduced type T, so
        // std::move (not std::forward) is the right cast here.
        Add(std::move(x));
    }

    // Moves the oldest element into t, blocking while the queue is empty.
    // If the queue has been stopped, returns immediately and leaves t unchanged.
    void Take(T& t)
    {
        std::unique_lock<std::mutex> locker(mutex_);
        not_empty_.wait(locker, [this] { return need_stop_ || NotEmpty(); });

        if (need_stop_)
            return;
        // Move instead of copy: the element is popped on the next line anyway,
        // and this also makes the queue usable with move-only element types.
        t = std::move(queue_.front());
        queue_.pop_front();
        not_full_.notify_one();
    }

    // Moves every queued element into list, blocking while the queue is empty.
    // If the queue has been stopped, returns immediately and leaves list unchanged.
    void Take(std::list<T>& list)
    {
        std::unique_lock<std::mutex> locker(mutex_);
        not_empty_.wait(locker, [this] { return need_stop_ || NotEmpty(); });

        if (need_stop_)
            return;
        list = std::move(queue_);
        // A moved-from container is only "valid but unspecified"; make the
        // empty state explicit before other threads look at it.
        queue_.clear();
        // The whole buffer was freed at once, so every blocked producer may
        // be able to proceed, not just one.
        not_full_.notify_all();
    }

    // Marks the queue as stopped and wakes all blocked producers and consumers.
    void Stop()
    {
        {
            std::lock_guard<std::mutex> locker(mutex_);
            need_stop_ = true;
        }
        // Notify after releasing the lock so woken threads can acquire it at once.
        not_empty_.notify_all();
        not_full_.notify_all();
    }

    // True if no element is currently queued.
    bool Empty() const
    {
        std::lock_guard<std::mutex> locker(mutex_);
        return queue_.empty();
    }

    // True if the queue currently holds max_size (or more) elements.
    bool Full() const
    {
        std::lock_guard<std::mutex> locker(mutex_);
        return queue_.size() >= max_size_;
    }

    // Number of elements currently queued.
    size_t Size() const
    {
        std::lock_guard<std::mutex> locker(mutex_);
        return queue_.size();
    }

private:
    // Wait predicate for producers. Must be called with mutex_ already held,
    // which is why it does not lock itself.
    bool NotFull() const
    {
        bool full = queue_.size() >= max_size_;
        if (full)
            std::cout << "缓冲区满了,需要等待。。。" << std::endl;
        return !full;
    }

    // Wait predicate for consumers. Must be called with mutex_ already held.
    bool NotEmpty() const
    {
        bool empty = queue_.empty();
        if (empty)
            std::cout << "缓冲区空了,需要等待。。。异步线程ID:" << std::this_thread::get_id() << std::endl;
        return !empty;
    }

    // Shared implementation of both Put() overloads: waits for free space and
    // appends x, copying or moving according to the value category it was
    // passed with. Drops x if the queue has been stopped.
    template<class U>
    void Add(U&& x)
    {
        std::unique_lock<std::mutex> locker(mutex_);
        not_full_.wait(locker, [this] { return need_stop_ || NotFull(); });

        if (need_stop_)
            return;
        queue_.push_back(std::forward<U>(x));
        not_empty_.notify_one();
    }

private:
    std::list<T> queue_;                 // buffered elements, oldest at the front
    mutable std::mutex mutex_;           // guards queue_ and need_stop_; mutable so const observers can lock
    std::condition_variable not_empty_;  // signalled when an element is added or on Stop()
    std::condition_variable not_full_;   // signalled when space is freed or on Stop()
    size_t max_size_;                    // capacity at which Put() starts blocking
    std::atomic_bool need_stop_;         // set once by Stop()
};


// Capacity handed to the ThreadPool's SyncQueue: the number of pending tasks
// the queue may hold before AddTask() blocks.
const int MaxTaskCount = 100;

// Fixed-size thread pool: worker threads repeatedly take tasks from a bounded
// SyncQueue (capacity MaxTaskCount) and run them.
//
// Stop() (also called by the destructor) shuts the pool down and joins the
// workers; tasks still queued at that point are not executed, and AddTask()
// calls made afterwards are silently dropped by the stopped queue.
// A task must not let an exception escape: it would leave the worker's thread
// function and terminate the program.
class ThreadPool
{
public:
    using Task = std::function<void()>;

    // threads_num: number of worker threads to start. Values <= 0 (which
    // includes hardware_concurrency() reporting 0) fall back to one worker.
    ThreadPool(int threads_num=std::thread::hardware_concurrency()):queue_(MaxTaskCount), running_(false)
    {
        Start(threads_num);
    }

    // Stops the pool if the caller has not already done so.
    ~ThreadPool()
    {
        Stop();
    }

    // Shuts the pool down and joins all workers. Safe to call repeatedly and
    // from several threads: the actual shutdown runs exactly once.
    void Stop()
    {
        std::call_once(flag_, [this] { StopThreadGroup(); });
    }

    // Queues a copy of task; blocks while the queue is full.
    void AddTask(const Task& task)
    {
        queue_.Put(task);
    }

    // Queues task by move; blocks while the queue is full.
    void AddTask(Task&& task)
    {
        // Task is not a deduced type here, so std::move is the correct cast.
        queue_.Put(std::move(task));
    }

private:
    // Spawns the worker threads.
    void Start(int threads_num)
    {
        // std::thread::hardware_concurrency() is allowed to return 0 when the
        // value cannot be determined; a pool without workers would never run
        // anything, so always start at least one.
        if (threads_num <= 0)
            threads_num = 1;

        running_ = true;

        for (int i = 0; i < threads_num; ++i)
        {
            thread_group_.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
        }
    }

    // Worker loop: take one task at a time and run it until the pool stops.
    void RunInThread()
    {
        while (running_)
        {
            Task t;
            queue_.Take(t);

            // Once the queue is stopped, Take() returns without filling t.
            // Calling an empty std::function throws std::bad_function_call,
            // so check the task itself in addition to the running flag.
            if (running_ && t)
                t();
        }
    }

    // Performs the one-time shutdown behind Stop().
    void StopThreadGroup()
    {
        // Clear the flag BEFORE waking the workers: a worker released by
        // queue_.Stop() must already see running_ == false, otherwise it
        // could try to run the empty task it was handed.
        running_ = false;
        queue_.Stop();

        for (const auto& thread : thread_group_)
        {
            if (thread && thread->joinable())
                thread->join();
        }
        thread_group_.clear();
    }

private:
    std::list<std::shared_ptr<std::thread>> thread_group_; // worker threads
    SyncQueue<Task> queue_;                                // pending tasks
    std::atomic_bool running_;                             // true between Start() and shutdown
    std::once_flag flag_;                                  // makes the shutdown run only once
};


// Demo driver: a producer thread submits ten tasks to a default-sized pool,
// each task printing the producer's thread id; the pool is stopped after two
// seconds.
int main()
{
    ThreadPool pool;

    // ---- adapt from here to use the pool in your own project ----
    auto produce = [&pool]
    {
        int remaining = 10;
        while (remaining-- > 0)
        {
            // Id of the submitting (producer) thread, captured by value so
            // the task can print it later from a worker thread.
            auto producer_id = std::this_thread::get_id();
            auto job = [producer_id]
            {
                std::cout << "同步线程1的线程ID:" << producer_id << std::endl;
            };
            pool.AddTask(job);
        }
    };
    std::thread producer(produce);

    // Give the workers time to drain the queue before shutting down.
    std::this_thread::sleep_for(std::chrono::seconds(2));
    pool.Stop();
    producer.join();
    // ---- adapt up to here ----

    return 0;
}