SimpleAI
 All Classes Namespaces Files Functions Variables Typedefs Macros Groups Pages
ThreadPool.h
1 /*
2  Copyright (c) 2012 Jakob Progsch, Vaclav Zeman
3 
4  This software is provided 'as-is', without any express or implied
5  warranty. In no event will the authors be held liable for any damages
6  arising from the use of this software.
7 
8  Permission is granted to anyone to use this software for any purpose,
9  including commercial applications, and to alter it and redistribute it
10  freely, subject to the following restrictions:
11 
12  1. The origin of this software must not be misrepresented; you must not
13  claim that you wrote the original software. If you use this software
14  in a product, an acknowledgment in the product documentation would be
15  appreciated but is not required.
16 
17  2. Altered source versions must be plainly marked as such, and must not be
18  misrepresented as being the original software.
19 
20  3. This notice may not be removed or altered from any source
21  distribution.
22  */
23 
24 #pragma once
25 
26 #include <vector>
27 #include <queue>
28 #include <memory>
29 #include <thread>
30 #include <mutex>
31 #include <condition_variable>
32 #include <atomic>
33 #include <future>
34 #include <functional>
35 
36 namespace ai {
37 
38 class ThreadPool final {
39 public:
40  explicit ThreadPool(size_t);
41 
45  template<class F, class ... Args>
46  auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;
47 
48  ~ThreadPool();
49 private:
50  // need to keep track of threads so we can join them
51  std::vector<std::thread> _workers;
52  // the task queue
53  std::queue<std::function<void()> > _tasks;
54 
55  // synchronization
56  std::mutex _queueMutex;
57  std::condition_variable _condition;
58  std::atomic_bool _stop;
59 };
60 
61 // the constructor just launches some amount of workers
62 inline ThreadPool::ThreadPool(size_t threads) :
63  _stop(false) {
64  _workers.reserve(threads);
65  for (size_t i = 0; i < threads; ++i) {
66  _workers.emplace_back([this] {
67  for (;;) {
68  std::function<void()> task;
69  {
70  std::unique_lock<std::mutex> lock(this->_queueMutex);
71  this->_condition.wait(lock, [this] {
72  return this->_stop || !this->_tasks.empty();
73  });
74  if (this->_stop && this->_tasks.empty()) {
75  return;
76  }
77  task = std::move(this->_tasks.front());
78  this->_tasks.pop();
79  }
80 
81  task();
82  }
83  });
84  }
85 }
86 
87 // add new work item to the pool
88 template<class F, class ... Args>
89 auto ThreadPool::enqueue(F&& f, Args&&... args)
90 -> std::future<typename std::result_of<F(Args...)>::type> {
91  using return_type = typename std::result_of<F(Args...)>::type;
92 
93  auto task = std::make_shared<std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
94 
95  std::future<return_type> res = task->get_future();
96  {
97  std::unique_lock<std::mutex> lock(_queueMutex);
98  _tasks.emplace([task]() {(*task)();});
99  }
100  _condition.notify_one();
101  return res;
102 }
103 
104 // the destructor joins all threads
105 inline ThreadPool::~ThreadPool() {
106  _stop = true;
107  _condition.notify_all();
108  for (std::thread &worker : _workers)
109  worker.join();
110 }
111 
112 }
Definition: ThreadPool.h:38
auto enqueue(F &&f, Args &&...args) -> std::future< typename std::result_of< F(Args...)>::type >
Definition: ThreadPool.h:89