8 #include <Sawyer/Exception.h>
9 #include <Sawyer/Graph.h>
10 #include <Sawyer/Map.h>
11 #include <Sawyer/Sawyer.h>
12 #include <Sawyer/Stack.h>
14 #include <boost/foreach.hpp>
15 #include <boost/thread/condition_variable.hpp>
16 #include <boost/thread/locks.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <boost/thread/thread.hpp>
19 #include <boost/version.hpp>
// Class template parameterized on the dependency graph type and on the functor type that is invoked
// for each work item.
// NOTE(review): this listing skips some original source lines (see the embedded line numbers), so the
// line naming the class and several members referenced below (mutex_, hasStarted_, hasWaited_,
// nWorkers_, workQueue_) are not visible here.
31 template<
class DependencyGraph,
class Functor>
// Private copy of the caller's graph (assigned in start()). Workers remove the in-edges of a vertex once
// its work item has run; a vertex with zero out-edges is treated as ready to run.
34 DependencyGraph dependencies_;
// Condition variable that idle workers wait on and that is notified after a work item completes.
38 boost::condition_variable workInserted_;
// Array of worker threads allocated with new[] in startWorkersNS().
40 boost::thread *workers_;
// Counters reported by the accessors and used by worker() to decide whether to wait or to stop.
41 size_t nItemsStarted_;
42 size_t nItemsFinished_;
// Returned as the second member of the pair produced by the worker-count accessor.
43 size_t nWorkersRunning_;
// Compared against nWorkers_ to decide whether all workers have finished.
44 size_t nWorkersFinished_;
// IDs of the work items whose functor call is currently in progress (inserted before the call and
// erased after it in worker()).
45 std::set<size_t> runningTasks_;
// Member-initializer list of a constructor whose signature line is not shown in this listing
// (presumably the default constructor): all flags false, all counters zero, and no worker array.
53 : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
54 nWorkersRunning_(0), nWorkersFinished_(0) {}
// Member-initializer list of a second constructor (signature line not shown). After the same
// zero-initialization it hands its dependencies, nWorkers, and functor arguments to run().
72 : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
73 nWorkersRunning_(0), nWorkersFinished_(0) {
75 run(dependencies, nWorkers, functor);
// Starts processing the work described by `dependencies` and hands `functor` to the worker threads.
//
// dependencies: graph that is copied into dependencies_; the caller's graph is only read.
// nWorkers:     requested number of worker threads; see the clamping below.
// functor:      callable passed on to startWorkersNS().
//
// Throws std::runtime_error("work can start only once per object").
// NOTE(review): several original lines are missing from this listing (the embedded line numbers skip),
// including the conditions that guard the throw and the hardware_concurrency() override.
103 void start(
const DependencyGraph &dependencies,
size_t nWorkers, Functor functor) {
// Hold the object's mutex for the whole setup.
104 boost::lock_guard<boost::mutex> lock(mutex_);
// The guard for this throw is not visible; presumably it tests hasStarted_ -- confirm against the full file.
106 throw std::runtime_error(
"work can start only once per object");
108 dependencies_ = dependencies;
// The condition for this override is not visible; presumably it applies when the caller passes 0 -- confirm.
110 nWorkers = boost::thread::hardware_concurrency();
// Use at least one worker, and no more workers than there are vertices in the graph.
111 nWorkers_ = std::max((
size_t)1, std::min(nWorkers, dependencies.nVertices()));
112 nItemsStarted_ = nWorkersFinished_ = 0;
113 runningTasks_.clear();
// The mutex is still held here (lock_guard above) while the worker threads are created.
115 startWorkersNS(functor);
// Body of a member function whose signature line is not shown in this listing; presumably wait(),
// judging by its position between start() and tryWaitFor() -- confirm against the full file.
123 boost::unique_lock<boost::mutex> lock(mutex_);
// Special case when work was never started or has already been waited for; the statement executed in
// that case is not visible here.
124 if (!hasStarted_ || hasWaited_)
// Visits every worker thread; the loop body is not visible here (presumably it joins workers_[i] -- confirm).
129 for (
size_t i=0; i<nWorkers_; ++i)
// Checks whether any dependency edges remain in the graph; the statement executed in that case is not
// visible here.
133 if (dependencies_.nEdges() != 0)
// Drop the private copy of the graph.
135 dependencies_.clear();
// Timed wait for the worker threads.
//
// relTime: maximum duration, measured on boost::chrono::steady_clock from the moment of the call.
// Returns bool; the return statements themselves are not visible in this listing.
// NOTE(review): several original lines are missing here (the embedded line numbers skip).
142 template<
class Rep,
class Period>
143 bool tryWaitFor(
const boost::chrono::duration<Rep, Period> &relTime) {
// Compute one absolute deadline up front so that it is shared by all the join attempts below.
144 const boost::chrono::steady_clock::time_point endAt = boost::chrono::steady_clock::now() + relTime;
145 boost::unique_lock<boost::mutex> lock(mutex_);
// Special case when work was never started or has already been waited for; the statement executed in
// that case is not visible here.
146 if (!hasStarted_ || hasWaited_)
150 for (
size_t i = 0; i < nWorkers_; ++i) {
// Try to join each worker until the shared deadline; the statement executed when a join times out is
// not visible here (presumably an early `return false` -- confirm).
151 if (!workers_[i].try_join_until(endAt))
// Checks whether any dependency edges remain in the graph; the statement executed in that case is not
// visible here.
157 if (dependencies_.nEdges() != 0)
// Drop the private copy of the graph.
159 dependencies_.clear();
// Convenience entry point that forwards all three arguments to start().
// NOTE(review): the remaining lines of this function are not visible in this listing; presumably it then
// waits for the work to complete -- confirm against the full file.
168 void run(
const DependencyGraph &dependencies,
size_t nWorkers, Functor functor) {
169 start(dependencies, nWorkers, functor);
// Bodies of five small accessors whose signature lines are not shown in this listing. Each takes the
// object's mutex for the duration of the read. The names given below are presumed from the return
// expressions -- confirm against the full file.

// Presumably isFinished(): true when work was never started or every worker has finished.
177 boost::lock_guard<boost::mutex> lock(mutex_);
178 return !hasStarted_ || nWorkersFinished_ == nWorkers_;
// Presumably nStarted(): returns the nItemsStarted_ counter.
186 boost::lock_guard<boost::mutex> lock(mutex_);
187 return nItemsStarted_;
// Presumably nFinished(): returns the nItemsFinished_ counter.
194 boost::lock_guard<boost::mutex> lock(mutex_);
195 return nItemsFinished_;
// Presumably runningTasks(): returns runningTasks_ (presumably by value, i.e. a snapshot -- confirm the
// return type).
203 boost::lock_guard<boost::mutex> lock(mutex_);
204 return runningTasks_;
// Presumably nWorkers(): pair of (workers that have not yet finished, nWorkersRunning_).
212 boost::lock_guard<boost::mutex> lock(mutex_);
213 return std::make_pair(nWorkers_-nWorkersFinished_, nWorkersRunning_);
// Seeds the work queue with the ID of every vertex that has no outgoing edges.
// NOTE(review): no lock is taken here; the "NS" suffix presumably means the caller must already hold
// mutex_ -- confirm against the full file.
218 void fillWorkQueueNS() {
// The queue is required to be empty on entry.
219 ASSERT_require(workQueue_.
isEmpty());
220 BOOST_FOREACH (
const typename DependencyGraph::Vertex &vertex, dependencies_.vertices()) {
// A vertex with zero out-edges is treated as ready to run.
221 if (vertex.nOutEdges() == 0)
222 workQueue_.
push(vertex.id());
// Allocates the worker array and launches nWorkers_ threads, each entering startWorker(this, functor).
// No lock is taken here; start() calls this function while holding mutex_.
227 void startWorkersNS(Functor functor) {
// Array of default-constructed boost::thread objects, stored in the raw pointer member workers_.
228 workers_ =
new boost::thread[nWorkers_];
229 for (
size_t i=0; i<nWorkers_; ++i)
// Each thread receives its own copy of the functor (passed by value) and a pointer to this object.
230 workers_[i] = boost::thread(startWorker,
this, functor);
// Static thread entry point: forwards to the worker() member function of `self`.
// self:    the ThreadWorkers object on whose behalf the thread runs.
// functor: callable handed through to worker().
234 static void startWorker(
ThreadWorkers *
self, Functor functor) {
235 self->worker(functor);
// Main routine of one worker thread: takes ready work item IDs from workQueue_, invokes `functor` on each,
// and then releases the items that were waiting on the completed one.
// NOTE(review): many original lines are missing from this listing (the embedded line numbers skip),
// including the enclosing loop, the counter updates, any unlock/relock around the functor call, and the
// exit path. Comments below describe only what is visible.
238 void worker(Functor functor) {
241 boost::unique_lock<boost::mutex> lock(mutex_);
// Sleep while other items are still in progress (finished < started) and nothing is queued: a running
// item may yet release new work.
242 while (nItemsFinished_ < nItemsStarted_ && workQueue_.
isEmpty())
243 workInserted_.wait(lock);
// Nothing is in progress and nothing is queued; the body of this branch is not visible here
// (presumably the worker stops -- confirm).
244 if (nItemsFinished_ == nItemsStarted_ && workQueue_.
isEmpty()) {
// Past this point the queue must hold at least one item.
248 ASSERT_forbid(workQueue_.
isEmpty());
249 size_t workItemId = workQueue_.
pop();
250 typename DependencyGraph::ConstVertexIterator workVertex = dependencies_.findVertex(workItemId);
// Only vertices with no outgoing edges may be taken from the queue.
251 ASSERT_require(workVertex->nOutEdges() == 0);
// Copy the vertex's value into a local before invoking the functor.
252 typename DependencyGraph::VertexValue workItem = workVertex->value();
// Record the item as running for the duration of the functor call.
257 runningTasks_.insert(workItemId);
// NOTE(review): presumably the mutex is released around this call; the unlock/relock lines are not
// visible in this listing -- confirm.
259 functor(workItemId, workItem);
263 runningTasks_.erase(workItemId);
// Collect the source vertex of every in-edge of the finished vertex, keyed by vertex ID so that each
// source appears once, before those edges are removed.
266 Container::Map<size_t, typename DependencyGraph::ConstVertexIterator> candidateWorkItems;
267 BOOST_FOREACH (
const typename DependencyGraph::Edge &edge, workVertex->inEdges())
268 candidateWorkItems.insert(edge.source()->id(), edge.source());
// Remove the in-edges of the finished vertex; this lowers the out-edge count of each candidate.
269 dependencies_.clearInEdges(workVertex);
270 size_t newWorkInserted = 0;
271 BOOST_FOREACH (
const typename DependencyGraph::ConstVertexIterator &candidate, candidateWorkItems.values()) {
// A candidate left with no outgoing edges is queued.
272 if (candidate->nOutEdges() == 0) {
273 workQueue_.
push(candidate->id());
// Wake other workers depending on how many items were just queued. The increments of newWorkInserted
// and parts of these branches are not visible in this listing.
279 if (0 == newWorkInserted) {
281 workInserted_.notify_all();
282 }
else if (1 == newWorkInserted) {
// Nothing visible in this branch: no notification is sent when exactly one item was queued
// (presumably this worker takes it itself -- confirm).
284 }
else if (2 == newWorkInserted) {
285 workInserted_.notify_one();
// Presumably the final `else` branch (three or more items queued); its opening line is not visible.
287 workInserted_.notify_all();
// Free-function template workInParallel(dependencies, nWorkers, functor).
// NOTE(review): the return-type line and the body are not visible in this listing, so nothing about its
// implementation is documented here.
314 template<
class DependencyGraph,
class Functor>
316 workInParallel(
const DependencyGraph &dependencies,
size_t nWorkers, Functor functor) {
// Overload of workInParallel that additionally takes a `monitor` and a `period` in milliseconds.
// NOTE(review): most of the body is missing from this listing, including the declaration of `workers` and
// every use of `monitor` and `period`; only the call that starts the work is visible.
321 template<
class DependencyGraph,
class Functor,
class Monitor>
323 workInParallel(
const DependencyGraph &dependencies,
size_t nWorkers, Functor functor,
324 Monitor monitor, boost::chrono::milliseconds period) {
// Start the work via start() on the `workers` object.
326 workers.
start(dependencies, nWorkers, functor);
~ThreadWorkers()
Destructor.
std::set< size_t > runningTasks()
Tasks currently running.
bool tryWaitFor(const boost::chrono::duration< Rep, Period > &relTime)
Wait for work to complete.
Namespace for the entire library.
Stack & push(const Value &value)
Push new item onto stack.
bool isFinished()
Test whether all possible work is finished.
Work list with dependencies.
void wait()
Wait for work to complete.
Value pop()
Pop existing item from stack.
void start(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Start workers and return.
size_t nFinished()
Number of tasks that have completed.
void run(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Synchronously processes tasks.
size_t nStarted()
Number of tasks that have started.
bool isEmpty() const
Determines if the stack is empty.
void workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Performs work in parallel.
Error when a cycle is detected.
ThreadWorkers()
Default constructor.
std::pair< size_t, size_t > nWorkers()
Number of worker threads.
ThreadWorkers(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Constructor that synchronously runs the work.