In the header file workerpool.hpp, we declare the WorkerPool class. The constructor takes the number of workers as its argument. If the number of workers is zero, all tasks are executed serially (which can be convenient for debugging). A WorkerPool cannot be copied, so we delete the copy constructor and the copy assignment operator. A Task is defined as a function that takes no arguments and returns nothing. A Task can be passed to the worker pool using the push_back() method (whose name is inspired by std::list::push_back()). The final public method is sync(), which waits for all tasks to be completed.
The private members of WorkerPool are a vector of worker threads, a list of (unfinished) tasks, the worker routine, and some synchronization objects. The boolean abort_flag is used to get the workers to exit their worker routines. The integer count represents the number of unfinished tasks. The tasks list and abort_flag are guarded by the task_mut mutex, and count by the count_mut mutex. The task_cv condition variable is used to signal that a new Task has been added to the list of tasks, while count_cv is used to signal that another task has been completed.
#ifndef WORKERPOOL_HPP_
#define WORKERPOOL_HPP_

#include <cstddef>   // size_t
#include <thread>
#include <mutex>
#include <condition_variable>
#include <list>
#include <vector>
#include <functional>

class WorkerPool {
public:
    WorkerPool(size_t num_workers);
    ~WorkerPool();
    /// disable copy constructor and copy assignment
    WorkerPool(const WorkerPool & wp) = delete;
    WorkerPool& operator=(const WorkerPool & wp) = delete;

    typedef std::function<void(void)> Task;
    void push_back(Task task);
    void sync();

private:
    std::vector<std::thread> workers;
    std::list<Task> tasks;
    bool abort_flag;
    std::mutex task_mut;
    std::condition_variable task_cv;
    int count;
    std::mutex count_mut;
    std::condition_variable count_cv;
    void worker_routine();
};

#endif
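To make the Task type more concrete, here is a minimal sketch that does not use WorkerPool itself. The helper run_serially(), the function say_hello() and the driver task_demo() are hypothetical names introduced only for this illustration. The sketch shows that both a lambda with captures and an ordinary function convert to std::function<void(void)>, and it runs the tasks one after another on the calling thread, which is roughly what a pool with zero workers amounts to.

```cpp
#include <functional>
#include <list>

// Same alias as WorkerPool::Task: no arguments, no return value.
typedef std::function<void(void)> Task;

// Hypothetical helper (not part of WorkerPool): run every task in order
// on the calling thread.
void run_serially(const std::list<Task> & tasks) {
    for ( const auto & task : tasks ) {
        task();
    }
}

// An ordinary function with a matching signature also converts to a Task.
int hello_calls = 0;
void say_hello() { ++hello_calls; }

// Build three tasks, run them, and return the sum computed by the lambdas.
int task_demo() {
    int sum = 0;
    std::list<Task> tasks;
    tasks.push_back([&sum](){ sum += 40; }); // lambda capturing sum by reference
    tasks.push_back([&sum](){ sum += 2; });
    tasks.push_back(say_hello);              // ordinary function
    run_serially(tasks);
    return sum;
}
```

Because sum outlives the tasks in this sketch, capturing it by reference is safe here; with a real worker pool the same holds only if the captured object stays alive until the tasks have been completed.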
The source file contains the definitions of the methods of WorkerPool. The constructor creates the worker threads, each of which starts running worker_routine(). The destructor makes sure that all threads exit their worker routine and then joins the threads. This means that the WorkerPool uses the RAII technique. The push_back() method evaluates the Task object directly if there are no workers, and otherwise adds the task to the tasks list, after increasing count. We signal that a task has been added in case a worker is sleeping. The sync() method only returns when count == 0. Finally, the most important part of the code is the worker_routine(). When the list tasks is empty, a worker waits until a signal is given that a new task has been added, or that the abort_flag has been set. In the first case, the task is executed; in the second case, the worker exits the worker_routine(). Note that a worker only checks the abort_flag when the list is empty, so tasks that are still queued are executed before the destructor returns.
#include "workerpool.hpp"

WorkerPool::WorkerPool(size_t num_workers) {
    workers.resize(num_workers);
    abort_flag = false;
    count = 0;
    for ( auto & worker : workers ) {
        worker = std::thread([this](){worker_routine();});
    }
}

WorkerPool::~WorkerPool() {
    std::unique_lock<std::mutex> task_lock(task_mut);
    abort_flag = true;
    task_cv.notify_all();
    // release the mutex
    task_lock.unlock();
    // wait for all workers to exit
    for ( auto & worker : workers ) {
        worker.join();
    }
}

void WorkerPool::push_back(Task task) {
    // if there are ZERO workers, just execute the task and return.
    if ( workers.empty() ) {
        task(); return;
    } // else...
    // first increase the task count
    std::unique_lock<std::mutex> count_lock(count_mut);
    ++count;
    count_lock.unlock();
    // then add task to list
    std::unique_lock<std::mutex> task_lock(task_mut);
    tasks.push_back(task);
    // and wake up a potential sleeping worker
    task_cv.notify_one();
}

void WorkerPool::sync() {
    std::unique_lock<std::mutex> count_lock(count_mut);
    count_cv.wait(count_lock, [this](){return count == 0;});
}

void WorkerPool::worker_routine() {
    while ( true ) {
        // lock the task mutex (blocks until it is available)
        std::unique_lock<std::mutex> task_lock(task_mut);
        // task_mut is locked: try to get a new task
        std::list<Task> my_tasks;
        if ( !tasks.empty() ) {
            // move the first element of tasks to my_tasks
            my_tasks.splice(my_tasks.begin(), tasks, tasks.begin());
        } else if ( abort_flag ) {
            break; // breaks the while ( true ) loop
        } else { // tasks is empty, but abort_flag is false: wait for more tasks
            task_cv.wait(task_lock);
        }
        // first release the mutex
        task_lock.unlock();
        // then execute acquired tasks in my_tasks
        for ( auto & task : my_tasks ) {
            task();
            // signal that a task has been completed
            std::unique_lock<std::mutex> count_lock(count_mut);
            --count;
            count_cv.notify_all(); // sync() might be waiting
        }
    } // while ( true ) loop
}
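The interplay between count, count_mut and count_cv can also be looked at in isolation. The following is a minimal sketch under stated assumptions, not the WorkerPool code itself: PendingCounter and counter_demo() are hypothetical names introduced for illustration, and plain std::thread objects stand in for the workers. It shows the same pattern as push_back() and sync(): a job is counted before it is handed out, the counter is decreased when the job finishes, and the waiting thread sleeps on a condition variable with a predicate.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper class (not part of WorkerPool): a counter of
// unfinished jobs that a thread can wait on until it drops to zero.
class PendingCounter {
public:
    void add() {
        std::lock_guard<std::mutex> lock(mut);
        ++count;
    }
    void done() {
        std::lock_guard<std::mutex> lock(mut);
        --count;
        cv.notify_all(); // a thread might be blocked in wait_zero()
    }
    void wait_zero() {
        std::unique_lock<std::mutex> lock(mut);
        // the predicate is re-checked after every wake-up, so spurious
        // wake-ups do no harm
        cv.wait(lock, [this](){ return count == 0; });
    }
private:
    int count = 0;
    std::mutex mut;
    std::condition_variable cv;
};

// Start num_jobs threads that each add 1 to a shared total, and wait
// for all of them through the counter before reading the total.
int counter_demo(int num_jobs) {
    PendingCounter pending;
    std::mutex total_mut;
    int total = 0;
    std::vector<std::thread> threads;
    for ( int i = 0; i < num_jobs; ++i ) {
        pending.add(); // count the job before it can possibly finish
        threads.emplace_back([&](){
            {
                std::lock_guard<std::mutex> lock(total_mut);
                ++total;
            }
            pending.done();
        });
    }
    pending.wait_zero(); // returns only after every job has called done()
    int result;
    {
        std::lock_guard<std::mutex> lock(total_mut);
        result = total;
    }
    for ( auto & t : threads ) {
        t.join();
    }
    return result;
}
```

Increasing the counter before the job is started mirrors the order used in push_back(): if the job were counted afterwards, it could finish first and the waiting thread might briefly see a count of zero while work is still pending.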
Example: drawing the Mandelbrot set in parallel
To give a nice example of how the WorkerPool should be used, I modified some code from Farouk Ounane for drawing the Mandelbrot set in C++. Each Task determines if a point in the complex plane is part of the Mandelbrot set or not. We use a closure (lambda expression) to define these tasks, and pass them to the WorkerPool. The result is shown above. To compile the C++ program with gcc, put the files workerpool.cpp, workerpool.hpp and mandelbrot.cpp in the same directory, and execute from the terminal:
g++ workerpool.cpp mandelbrot.cpp -o mandelbrot -pthread -std=c++11
/* Credits for drawing the Mandelbrot set in C++ go to
 * Farouk Ounane (https://github.com/ChinksofLight/mandelbrot_cpp)
 */
#include <iostream>
#include <fstream>  // write to file
#include <complex>  // complex numbers
#include <cmath>    // pow
#include <vector>
#include "workerpool.hpp"

constexpr int width = 1200;
constexpr int height = 1200;
constexpr int max_iter = 1000;
constexpr int col_depth = 255;

int main() {
    // initiate a worker pool
    WorkerPool wp(8); // Use 8 threads
    // make an array of values
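    // allocated on the heap: width x height ints take several megabytes,
    // which can exceed the default stack size on some platforms
    std::vector<std::vector<int>> vals(width, std::vector<int>(height));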
    std::cout << "passing tasks to worker pool..." << std::endl;
    for ( int i = 0; i < width; ++i ) {
        for ( int j = 0; j < height; ++j ) {
            /* use a closure to define the WorkerPool::Task object
             * the vals array is captured by reference.
             * NB: we have to capture i and j by value!
             */
            wp.push_back([=,&vals](){
                double x = double(i)/width-1.5;
                double y = double(j)/height-0.5;
                std::complex<double> point(x, y);
                // determine if point is in the Mandelbrot set...
                std::complex<double> z(0, 0);
                int nb_iter = 0;
                while ( std::abs(z) < 2 && nb_iter <= max_iter) {
                    z = z * z + point;
                    nb_iter++;
                }
                if ( nb_iter < max_iter ) {
                    // determine color gradient
                    double frac = double(nb_iter) / max_iter;
                    vals[i][j] = int(col_depth * std::pow(frac, 0.25));
                } else {
                    vals[i][j] = 0;
                }
            });
        }
    }
    std::cout << "waiting for workers..." << std::endl;
    /* before we sync the worker pool, the values in vals
     * are invalid. accessing them would lead to data races.
     */
    wp.sync();
    /* now all values have been computed and stored in vals.
     * The WorkerPool is still waiting for new tasks.
     * For example, we could use it to write the data
     * to a file in the background.
     */
    wp.push_back([&](){
        std::ofstream image_file("mandelbrot.ppm");
        if ( !image_file.is_open() ) {
            std::cerr << "could not open the image file" << std::endl;
            return;
        } // else
        image_file << "P3\n" << width << " " << height << " "
                   << col_depth << "\n";
        for ( int j = 0; j < height; j++ ) {
            for ( int i = 0; i < width; i++ ) {
                int val = vals[i][j];
                image_file << 0 << " " << val << " " << val << "\n";
            }
        }
        image_file.close();
    });
    std::cout << "writing results to file..." << std::endl;
    // before we return, we have to wait for the writing task to be completed
    wp.sync();
    return 0;
}
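The remark in the code above that i and j have to be captured by value deserves a small illustration. The sketch below does not use the WorkerPool. Instead, it stores the closures in a std::vector and runs them after the loop has ended, which resembles tasks waiting in a queue until a worker picks them up. The function name squares_via_tasks() is a hypothetical one chosen for this example.

```cpp
#include <functional>
#include <vector>

// Create one deferred task per index; each task stores i*i in results[i].
// The tasks are only executed after the loop has finished.
std::vector<int> squares_via_tasks(int n) {
    std::vector<int> results(n, -1);
    std::vector<std::function<void(void)>> tasks;
    for ( int i = 0; i < n; ++i ) {
        // i is captured by value: every task keeps its own copy.
        // results is captured by reference: all tasks write to the same
        // vector, which outlives the tasks.
        // Capturing i by reference instead would leave each task with a
        // dangling reference once the loop variable goes out of scope.
        tasks.push_back([i, &results](){ results[i] = i * i; });
    }
    // run the deferred tasks
    for ( auto & task : tasks ) {
        task();
    }
    return results;
}
```

The same reasoning applies to the Mandelbrot program: by the time a worker executes a task, the loop has most likely moved on, so each closure needs its own copies of i and j, while vals must be shared and therefore captured by reference.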