Sunday, 21 June 2020

A C++ worker pool using thread and functional

To celebrate the fifth anniversary of the TBZ533 blog, I decided to revisit the first post. That post was about a class that could be inherited from to provide worker pool functionality, implemented with the pthread library and function pointers. Using a bit of modern C++, the implementation of a simple worker pool becomes much more elegant.

In the header file workerpool.hpp, we declare the WorkerPool class. The argument of the constructor is the number of workers. If the number of workers is zero, all tasks are executed serially (which can be convenient for debugging). A WorkerPool cannot be copied, so we delete the copy constructor and the copy assignment operator. A Task is defined as a function that takes no arguments and returns nothing. A Task can be passed to the worker pool using the push_back() method (the name is inspired by std::list::push_back()). The final public method is sync(), which waits for all tasks to be completed.

The private methods and members of WorkerPool are a vector of worker threads, a list of (unfinished) tasks, the worker routine, and some synchronization objects. The boolean abort_flag is used to get the workers to exit their worker routines. The count integer represents the number of unfinished tasks. The tasks list and abort_flag are guarded by the task_mut mutex, and count by the count_mut mutex. The task_cv condition variable is used to signal that a new Task has been added to the list of tasks, while count_cv is used to signal that a task has been completed.

#ifndef WORKERPOOL_HPP_
#define WORKERPOOL_HPP_
#include <cstddef> // size_t
#include <thread>
#include <mutex>
#include <condition_variable>
#include <list>
#include <vector>
#include <functional>
class WorkerPool {
public:
    WorkerPool(size_t num_workers);
    ~WorkerPool();
    /// disable copy constructor and copy assignment
    WorkerPool(const WorkerPool & wp) = delete;
    WorkerPool& operator=(const WorkerPool & wp) = delete;
    typedef std::function<void(void)> Task;
    void push_back(Task task);
    void sync();
private:
    std::vector<std::thread> workers;
    std::list<Task> tasks;
    bool abort_flag;
    std::mutex task_mut;
    std::condition_variable task_cv;
    int count;
    std::mutex count_mut;
    std::condition_variable count_cv;
    void worker_routine();
};
#endif
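
Because Task is just std::function<void(void)>, anything that can be called without arguments can be handed to push_back(). The following is a minimal sketch of that idea on its own, without the pool. The names free_function_task, AddTo and demo_tasks are hypothetical and only exist for this illustration.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Same alias as in WorkerPool: callable without arguments, returns nothing.
typedef std::function<void(void)> Task;

// (hypothetical) a plain function converts to a Task
int free_function_calls = 0;
void free_function_task() { ++free_function_calls; }

// (hypothetical) a function object with operator() also converts to a Task
struct AddTo {
    int * target;
    int amount;
    void operator()() const { *target += amount; }
};

// (hypothetical) stores three kinds of callables and runs them serially,
// much like a WorkerPool with zero workers would.
int demo_tasks() {
    free_function_calls = 0;
    int total = 0;
    std::vector<Task> tasks;
    tasks.push_back(free_function_task);          // function pointer
    tasks.push_back([&total]() { total += 40; }); // closure
    tasks.push_back(AddTo{&total, 2});            // function object
    for ( auto & task : tasks ) {
        task();
    }
    assert(free_function_calls == 1);
    return total; // 40 + 2
}
```

Calling demo_tasks() from main() is enough to try it out. Note that std::function copies the callable it is given, so anything captured by reference (total in this sketch) must outlive the Task.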


The source file contains the definitions of the methods of WorkerPool. The constructor creates the worker threads, each of which runs worker_routine(). The destructor makes sure that all threads exit their worker routine and joins the threads. This means that the WorkerPool uses the RAII technique. The push_back() method evaluates the Task object directly if there are no workers, and otherwise adds the task to the tasks list, after increasing the count. We signal that a task has been added in case a worker is sleeping. The sync() method only returns when count == 0. Finally, the most important part of the code is the worker_routine(). When the tasks list is empty, a worker waits until a signal is given that a new task has been added, or that the abort_flag is set. In the first case, the task is executed; in the second case, the worker exits the worker_routine().
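
The counting scheme behind sync() can be tried in isolation. What follows is a minimal sketch, not the WorkerPool code itself: the class PendingCounter and the function demo_counter are hypothetical names, and plain std::thread objects stand in for the workers.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// (hypothetical) counts unfinished jobs and lets a caller wait for zero
class PendingCounter {
public:
    void add() {
        std::lock_guard<std::mutex> lock(mut);
        ++count;
    }
    void done() {
        std::lock_guard<std::mutex> lock(mut);
        --count;
        cv.notify_all(); // a waiter might be blocked in wait_for_zero()
    }
    void wait_for_zero() {
        std::unique_lock<std::mutex> lock(mut);
        // the predicate is re-checked after every wake-up
        cv.wait(lock, [this]() { return count == 0; });
    }
private:
    int count = 0;
    std::mutex mut;
    std::condition_variable cv;
};

// (hypothetical) four threads each fill one slot; the caller waits for all
int demo_counter() {
    PendingCounter pending;
    int results[4] = {0, 0, 0, 0};
    std::vector<std::thread> threads;
    for ( int i = 0; i < 4; ++i ) {
        pending.add(); // register the job before it can finish
        threads.emplace_back([&pending, &results, i]() {
            results[i] = i * i;
            pending.done();
        });
    }
    pending.wait_for_zero(); // from here on, results may be read safely
    assert(results[3] == 9);
    int sum = results[0] + results[1] + results[2] + results[3];
    for ( auto & t : threads ) {
        t.join();
    }
    return sum;
}
```

The important detail is the order: the counter is increased before the job can possibly complete, so the count cannot drop to zero while work is still outstanding. This mirrors why push_back() increases count before adding the task to the list.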

#include "workerpool.hpp"
WorkerPool::WorkerPool(size_t num_workers) {
    workers.resize(num_workers);
    abort_flag = false;
    count = 0;
    for ( auto & worker : workers ) {
        worker = std::thread([this](){worker_routine();});
    }
}
WorkerPool::~WorkerPool() {
    std::unique_lock<std::mutex> task_lock(task_mut);
    abort_flag = true;
    task_cv.notify_all();
    // release the mutex
    task_lock.unlock();
    // wait for all workers to exit
    for ( auto & worker : workers ) {
        worker.join();
    }
}
void WorkerPool::push_back(Task task) {
    // if there are ZERO workers, just execute the task and return.
    if ( workers.empty() ) {
        task(); return;
    } // else...
    // first increase the task count
    std::unique_lock<std::mutex> count_lock(count_mut);
    ++count;
    count_lock.unlock();
    // then add task to list
    std::unique_lock<std::mutex> task_lock(task_mut);
    tasks.push_back(task);
    // and wake up a potential sleeping worker
    task_cv.notify_one();
}
void WorkerPool::sync() {
    std::unique_lock<std::mutex> count_lock(count_mut);
    count_cv.wait(count_lock, [this](){return count == 0;});
}
void WorkerPool::worker_routine() {
    while ( true ) {
        // lock the task mutex (blocks until it is available)
        std::unique_lock<std::mutex> task_lock(task_mut);
        // task_mut is locked: try to get a new task
        std::list<Task> my_tasks;
        if ( !tasks.empty() ) {
            // move the first element of tasks to my_tasks
            my_tasks.splice(my_tasks.begin(), tasks, tasks.begin());
        } else if ( abort_flag ) {
            break; // breaks the while ( true ) loop
        } else { // tasks is empty, but abort_flag is false: wait for more tasks
            // a spurious wake-up is harmless: my_tasks stays empty and we loop again
            task_cv.wait(task_lock);
        }
        // first release the mutex
        task_lock.unlock();
        // then execute acquired tasks in my_tasks
        for ( auto & task : my_tasks ) {
            task();
            // signal that a task has been completed
            std::unique_lock<std::mutex> count_lock(count_mut);
            --count;
            count_cv.notify_all(); // sync() might be waiting
        }
    } // while ( true ) loop
}
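
The worker loop above handles wake-ups by going around the while loop once more. The same behaviour can also be expressed with the predicate overload of std::condition_variable::wait. The following is only a sketch of that alternative formulation, not a replacement for the code above: TaskQueue, its methods and demo_queue are hypothetical names, and a single std::thread plays the role of a worker.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <utility>

typedef std::function<void(void)> Task;

// (hypothetical) a blocking task queue, not part of WorkerPool
class TaskQueue {
public:
    void push(Task task) {
        {
            std::lock_guard<std::mutex> lock(mut);
            tasks.push_back(std::move(task));
        }
        cv.notify_one(); // wake up a potentially sleeping worker
    }
    void close() { // plays the role of setting abort_flag
        {
            std::lock_guard<std::mutex> lock(mut);
            closed = true;
        }
        cv.notify_all();
    }
    // Blocks until a task is available, or the queue is closed and empty.
    // Returns false if there is nothing left to do.
    bool pop(Task & out) {
        std::unique_lock<std::mutex> lock(mut);
        cv.wait(lock, [this]() { return closed || !tasks.empty(); });
        if ( tasks.empty() ) {
            return false; // closed and drained
        }
        out = std::move(tasks.front());
        tasks.pop_front();
        return true;
    }
private:
    std::list<Task> tasks;
    bool closed = false;
    std::mutex mut;
    std::condition_variable cv;
};

// (hypothetical) one worker drains 100 tasks, then exits after close()
int demo_queue() {
    TaskQueue queue;
    int executed = 0; // only modified by the single worker thread
    std::thread worker([&queue]() {
        Task task;
        while ( queue.pop(task) ) {
            task(); // executed without holding the mutex
        }
    });
    for ( int i = 0; i < 100; ++i ) {
        queue.push([&executed]() { ++executed; });
    }
    queue.close();
    worker.join(); // after join, reading executed is safe
    assert(executed == 100);
    return executed;
}
```

As in worker_routine(), remaining tasks take priority over the stop request, so closing the queue does not drop work that was already pushed.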


Example: drawing the Mandelbrot set in parallel



To give a nice example of how the WorkerPool can be used, I modified some code from Farouk Ounane for drawing the Mandelbrot set in C++. Each Task determines whether a point in the complex plane is part of the Mandelbrot set. We use a closure (lambda expression) to define these tasks, and pass them to the WorkerPool. The result is shown above. To compile the C++ program with gcc, put the files workerpool.cpp, workerpool.hpp and mandelbrot.cpp in the same directory, and execute from the terminal:
  g++ workerpool.cpp mandelbrot.cpp -o mandelbrot -pthread -std=c++11
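
A short aside on the closures used in this example: the loop indices have to be captured by value, because each task runs some time after the loop iteration that created it. The following is a minimal sketch of that point without any threads. The function demo_capture and the array squares are hypothetical, and a std::vector of std::function objects stands in for the pool's task list.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// (hypothetical) create closures in a loop, run them only afterwards
int demo_capture() {
    constexpr int n = 5;
    int squares[n] = {0};
    std::vector<std::function<void(void)>> tasks;
    for ( int i = 0; i < n; ++i ) {
        // i is copied into the closure, squares is shared by reference
        tasks.push_back([=, &squares]() { squares[i] = i * i; });
    }
    // the loop variable is gone here, but every closure kept its own copy of i
    for ( auto & task : tasks ) {
        task();
    }
    assert(squares[4] == 16);
    int sum = 0;
    for ( int s : squares ) {
        sum += s;
    }
    return sum; // 0 + 1 + 4 + 9 + 16
}
```

Had i been captured by reference instead, every closure would refer to a variable whose lifetime has already ended by the time the task runs, which is undefined behaviour.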
  

/* Credits for drawing the Mandelbrot set in C++ go to
 * Farouk Ounane (https://github.com/ChinksofLight/mandelbrot_cpp)
 */
#include <iostream>
#include <fstream> // write to file
#include <complex> // complex numbers
#include <cmath> // pow
#include "workerpool.hpp"
constexpr int width = 1200;
constexpr int height = 1200;
constexpr int max_iter = 1000;
constexpr int col_depth = 255;
int main() {
    // create a worker pool
    WorkerPool wp(8); // Use 8 threads
    // make an array of values
    // NB: roughly 5.8 MB on the stack with 4-byte ints, which may
    // exceed the default stack size on some platforms
    int vals[width][height];
    std::cout << "passing tasks to worker pool..." << std::endl;
    for ( int i = 0; i < width; ++i ) {
        for ( int j = 0; j < height; ++j ) {
            /* use a closure to define the WorkerPool::Task object
             * the vals array is captured by reference.
             * NB: we have to capture i and j by value!
             */
            wp.push_back([=,&vals](){
                double x = double(i)/width-1.5;
                double y = double(j)/height-0.5;
                std::complex<double> point(x, y);
                // determine if point is in the Mandelbrot set...
                std::complex<double> z(0, 0);
                int nb_iter = 0;
                while ( std::abs(z) < 2 && nb_iter <= max_iter) {
                    z = z * z + point;
                    nb_iter++;
                }
                if ( nb_iter < max_iter ) {
                    // determine color gradient
                    double frac = double(nb_iter) / max_iter;
                    vals[i][j] = int(col_depth * std::pow(frac, 0.25));
                } else {
                    vals[i][j] = 0;
                }
            });
        }
    }
    std::cout << "waiting for workers..." << std::endl;
    /* before we sync the worker pool, the values in vals
     * are invalid. accessing them would lead to data races.
     */
    wp.sync();
    /* now all values have been computed and stored in vals.
     * The WorkerPool is still waiting for new tasks.
     * For example, we could use it to write the data
     * to a file in the background.
     */
    wp.push_back([&](){
        std::ofstream image_file("mandelbrot.ppm");
        if ( !image_file.is_open() ) {
            std::cerr << "could not open the image file" << std::endl;
            return;
        } // else
        image_file << "P3\n" << width << " " << height << " "
                   << col_depth << "\n";
        for ( int j = 0; j < height; j++ ) {
            for ( int i = 0; i < width; i++ ) {
                int val = vals[i][j];
                image_file << 0 << " " << val << " " << val << "\n";
            }
        }
        image_file.close();
    });
    std::cout << "writing results to file..." << std::endl;
    // before we return, we have to wait for the writing task to be completed
    wp.sync();
    return 0;
}
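
The test that each task performs is the usual escape-time iteration z -> z*z + c, starting from z = 0. As a sketch, it can be pulled out into a small function that is easy to check by hand. The names escape_iterations and demo_escape are hypothetical. This sketch also differs from the listing in two ways that are my own choices here: it compares the squared magnitude std::norm(z) against 4 instead of std::abs(z) against 2 (which avoids a square root), and it caps the loop with n < max_iter.

```cpp
#include <cassert>
#include <complex>

// (hypothetical) number of iterations of z -> z*z + c performed
// before |z| >= 2, capped at max_iter
int escape_iterations(std::complex<double> c, int max_iter) {
    std::complex<double> z(0, 0);
    int n = 0;
    // std::norm(z) is |z|^2, so |z| < 2 is the same as norm(z) < 4
    while ( std::norm(z) < 4.0 && n < max_iter ) {
        z = z * z + c;
        ++n;
    }
    return n;
}

// (hypothetical) a few points that can be verified by hand
void demo_escape() {
    // c = 0 stays at z = 0 forever: never escapes
    assert(escape_iterations(std::complex<double>(0, 0), 1000) == 1000);
    // c = -1 cycles between -1 and 0: never escapes
    assert(escape_iterations(std::complex<double>(-1, 0), 1000) == 1000);
    // c = 1 gives z = 1, then z = 2: escapes after two iterations
    assert(escape_iterations(std::complex<double>(1, 0), 1000) == 2);
}
```

With this convention, a point is treated as part of the set exactly when the function returns max_iter.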