Friday, 26 June 2015

A simple class wrapper for parallelism in C++

Concurrency can be extremely complicated, and it causes problems that will haunt you in your dreams. The classical C/C++ libraries don't protect you from this horror in any way; you have to figure it out yourself. Parallelism is supposed to be a lot easier, but C/C++ has no standard library comparable to, for instance, Python's multiprocessing module. The Boost libraries, however, provide an extensive interface for concurrency and parallelism.
If you don't want to use Boost, don't panic: there are other options. The POSIX pthread library provides a down-to-the-metal concurrency model. It took me a while to find out what it is all about, and I haven't successfully applied all its possibilities. What I have managed to apply is a so-called "worker pool" model. This is one of the easier applications of the pthread library (it falls under the parallelism category), but it can be quite useful. Here I will demonstrate a "wrapper" class that other C++ classes can inherit from.

Suppose that you have a class Fun that needs to do some computations. We declare Fun in, say, Fun.hpp:
// Fun.hpp
#ifndef FUN_HPP
#define FUN_HPP
#include "WorkerPoolWrapper.hpp"
class Fun : public WorkerPoolWrapper {
public:
explicit Fun(int ); // constructor, pass the number of threads that you want to use
~Fun(); // destructor
int operator()(int ); // the class is used as a functionoid
// other public members and methods...
protected:
void executeJob(void* ); // implements the pure virtual function of the base class
// other protected members and methods...
};
#endif
Fun inherits the worker pool functionality from the class WorkerPoolWrapper (declared in WorkerPoolWrapper.hpp). WorkerPoolWrapper has a pure virtual member function executeJob(void* ). You, the user, must specify what you want to compute in your own definition of executeJob. Besides implementing executeJob, you must also start the worker pool, and before Fun's destructor returns, the threads must be "joined", i.e., the program must wait until every worker thread has finished its execution. Here, I use the constructor and destructor of Fun to accomplish this:
// Fun.cpp
#include "Fun.hpp"
Fun::Fun(int w) { initWorkerPool(w); } // upon construction, Fun initiates the worker pool with w workers
Fun::~Fun() { waitForWorkerPoolToExit(); } // upon destruction, Fun joins the threads
// ...
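Why must the threads be joined before Fun's destructor returns, rather than in WorkerPoolWrapper's destructor? During destruction, C++ dispatches virtual calls according to the part of the object that still exists. Once ~Fun() has finished, the object is only a WorkerPoolWrapper, so a worker that calls executeJob at that moment would reach a pure virtual function, which is undefined behavior. The self-contained sketch below (Base, Derived and destructionTrace are hypothetical names, not part of the wrapper) shows this rule with an ordinary virtual function:

```cpp
#include <cassert>
#include <string>

// Hypothetical classes: each destructor records which override of whoAmI()
// a virtual call reaches at that moment of destruction.
struct Base {
  explicit Base(std::string* trace) : trace(trace) { assert(trace != NULL); }
  virtual ~Base() { *trace += whoAmI(); } // Derived part is gone: Base::whoAmI
  virtual std::string whoAmI() const { return "Base"; }
  std::string* trace;
};

struct Derived : public Base {
  explicit Derived(std::string* trace) : Base(trace) {}
  ~Derived() { *trace += whoAmI() + ","; } // still a Derived: Derived::whoAmI
  std::string whoAmI() const { return "Derived"; }
};

// Destroy a Derived and return the trace its destructors left behind.
std::string destructionTrace() {
  std::string trace;
  { Derived d(&trace); } // ~Derived() runs first, then ~Base()
  return trace;
}
```

Calling destructionTrace() returns "Derived,Base": inside ~Derived() the call still reaches Derived::whoAmI, but inside ~Base() it reaches Base::whoAmI. That is why the joining happens in Fun's destructor, while executeJob can still be called safely.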
The methods initWorkerPool and waitForWorkerPoolToExit are inherited from WorkerPoolWrapper. Let's use Fun to compute the number pi(n) of primes less than or equal to a number n. We overload operator() as follows:
// ... Fun.cpp continued
int Fun::operator()(int n) {
int* ns = new int[n]; // make array with integers 1,...,n
for ( int i = 0; i < n; ++i ) ns[i] = i+1;
for ( int* job = ns; job < ns+n; ++job ) addNewJob(job); // add pointers to the integers as jobs to the job queue
syncWorkerThreads(); // wait for all jobs to be completed
int pi = 0; // gather (transformed) data and compute final result
for ( int i = 0; i < n; ++i ) pi += ns[i];
delete[] ns; // free allocated space
return pi; // return answer
}
// ...
Notice that this implementation of pi(n) is not the most efficient one: it checks for every integer i from 1 to n whether i is prime. This prime test is performed by executeJob. In the background, WorkerPoolWrapper keeps a list of jobs that have to be executed, and jobs are added to this list with addNewJob(void* ). Since a job is just a pointer, the result of a job must be stored in the memory that the job points to. Above, the number pi is the sum of the array ns, which makes sense when we look at the implementation of executeJob:
// ... Fun.cpp continued
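void Fun::executeJob(void* job) {
int* i = static_cast<int*>(job); // cast the job back to int*
bool prime = ( *i >= 2 ); // 1 (and anything smaller) is not prime
int d = 2;
while ( prime && d < (*i) ) prime = ( (*i) % (d++) != 0 ); // naive prime test: trial division by 2,...,i-1
*i = prime; // store the result in the job: 1 if prime, 0 otherwise
}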
Hence, executeJob transforms the number i pointed to by job into 0 if i is not prime, or into 1 if i is prime, so that the sum of the transformed array equals pi(n). Before gathering the results in Fun::operator(), we called syncWorkerThreads(). This method blocks the calling thread until every job in the job list has been executed; the worker threads stay alive and wait for new jobs.
Using the functionoid Fun now works as follows:
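// main.cpp
#include <iostream>
#include "Fun.hpp"
int main() {
Fun primePi(10); // starts 10 worker threads (not necessarily 10 cores)
int pi = primePi(144169); // equals 13355
std::cout << pi << std::endl;
return 0;
} // primePi goes out of scope here, and its destructor joins the threads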
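Note that Fun primePi(10) starts 10 worker threads regardless of the number of cores; the operating system decides how they are spread over the available cores. If you would rather match the pool size to the machine, C++11 provides std::thread::hardware_concurrency(), which returns a hint and may return 0 when the number is unknown. A minimal sketch of a hypothetical helper defaultWorkerCount:

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper: pick a pool size from the number of hardware threads.
// std::thread::hardware_concurrency() is only a hint and returns 0 when the
// value cannot be determined; in that case `fallback` is used.
int defaultWorkerCount(int fallback) {
  assert(fallback >= 1);
  unsigned hw = std::thread::hardware_concurrency();
  return hw > 0 ? static_cast<int>(hw) : fallback;
}
```

With this helper, Fun primePi(defaultWorkerCount(4)); uses one worker per hardware thread, and falls back to 4 workers when the number is unknown.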
The class WorkerPoolWrapper is declared here:
#ifndef WORKERPOOLWRAPPER_HPP
#define WORKERPOOLWRAPPER_HPP
#include <pthread.h>
#include <list>
class WorkerPoolWrapper {
public:
/* constructor */
WorkerPoolWrapper();
/* destructor */
virtual ~WorkerPoolWrapper();
/* start the worker pool with the given number of worker threads.
* Returns true if all threads were successfully started,
* false if the number is smaller than 1 or a thread could not be started
*/
bool initWorkerPool(int );
/* send the no-more-jobs signal (see sendNoMoreJobsSignal) and wait for
* all workers to exit. Jobs that are still in the job list are executed first.
*/
void waitForWorkerPoolToExit();
/* wait until all jobs in the job list have been executed. The workers
* then wait for new jobs (so they are not joined).
*/
void syncWorkerThreads();
/* signal the workers that there will be no more jobs. As soon as
* the job list is empty, they exit workerThreadEntry.
*/
void sendNoMoreJobsSignal();
/* add a new job to the job list. The job is passed to executeJob; the
* memory it points to must remain valid until the job has been executed.
*/
void addNewJob(void* );
protected:
/* the member executeJob needs to be implemented by the class that
* inherits this class. It should do the actual work that needs
* to be done in parallel.
*/
virtual void executeJob(void* )=0;
private:
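/* pthread is a C library and does not know about classes: pthread_create
* needs a plain function. The static workerThreadEntryFunc receives the
* this pointer as its argument and calls the member function workerThreadEntry.
*/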
void workerThreadEntry();
static void* workerThreadEntryFunc(void* );
/* mutex and conditional variable for the job list */
pthread_mutex_t jobsMutex;
pthread_cond_t jobsConditionalVar;
/* mutex and conditional variable for synchronizing */
pthread_mutex_t jobsyncMutex;
pthread_cond_t jobsyncConditionalVar;
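/* the number of unfinished jobs (guarded by jobsyncMutex)
* and the job list (guarded by jobsMutex)
*/
int jobsyncCounter;
std::list<void*> jobs;
/* a flag for terminating the threads (guarded by jobsMutex) */
bool noMoreJobsFlag;
/* an array of worker threads */
pthread_t* workerThreads;
pthread_attr_t attr;
int numOfWorkerThreads;
/* a pool of running threads cannot be copied: declared, never defined */
WorkerPoolWrapper(const WorkerPoolWrapper& );
WorkerPoolWrapper& operator=(const WorkerPoolWrapper& );
};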
#endif
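The private members workerThreadEntry and workerThreadEntryFunc implement a common trick for using pthreads with classes: pthread_create only accepts a plain function of type void* (*)(void*), so a static member function receives the this pointer through the void* argument and forwards the call to an ordinary member function. A minimal, self-contained sketch of the same trick (Tally is a hypothetical class, not part of the wrapper):

```cpp
#include <cassert>
#include <pthread.h>

// Hypothetical class illustrating the static "trampoline" entry function.
class Tally {
public:
  Tally() : count(0), perThread(0) { pthread_mutex_init(&mutex, NULL); }
  ~Tally() { pthread_mutex_destroy(&mutex); }

  // Start up to n threads that each add perThread to count,
  // join the threads that were started, and return the total.
  int run(int n, int perThread) {
    assert(n >= 0);
    this->perThread = perThread;
    pthread_t* threads = new pthread_t[n];
    int started = 0;
    for (int i = 0; i < n; ++i)
      if (pthread_create(&threads[started], NULL, entryFunc, this) == 0) ++started;
    for (int i = 0; i < started; ++i) pthread_join(threads[i], NULL);
    delete[] threads;
    return count;
  }

private:
  static void* entryFunc(void* self) { // plain function pointer for pthread_create
    static_cast<Tally*>(self)->entry(); // back to the object
    return NULL;
  }
  void entry() { // ordinary member function: has access to all members
    for (int i = 0; i < perThread; ++i) {
      pthread_mutex_lock(&mutex);
      ++count;
      pthread_mutex_unlock(&mutex);
    }
  }
  pthread_mutex_t mutex;
  int count;
  int perThread;
};
```

Tally t; t.run(4, 1000) returns 4000 when all four threads start. All threads share the same object, which is why every update of count is protected by the mutex.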
And the members are defined here:
#include "WorkerPoolWrapper.hpp"
WorkerPoolWrapper::WorkerPoolWrapper() {
numOfWorkerThreads = 0;
noMoreJobsFlag = true;
jobsyncCounter = 0;
workerThreads = NULL;
// initialize the synchronization objects here, so that the destructor
// can safely destroy them even if initWorkerPool is never called
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_mutex_init(&jobsMutex,NULL);
pthread_mutex_init(&jobsyncMutex,NULL);
pthread_cond_init(&jobsConditionalVar,NULL);
pthread_cond_init(&jobsyncConditionalVar,NULL);
}
bool WorkerPoolWrapper::initWorkerPool(int numOfWorkers) {
if ( numOfWorkers < 1 || workerThreads != NULL ) return false; // invalid size, or pool already started
workerThreads = new pthread_t[numOfWorkers];
noMoreJobsFlag = false;
jobsyncCounter = 0;
numOfWorkerThreads = 0; // counts the threads that were actually started, so that only those are joined
for ( int i = 0; i < numOfWorkers; i++ ) {
if ( pthread_create(workerThreads+i, &attr, workerThreadEntryFunc, this) != 0 ) break;
numOfWorkerThreads++;
}
return numOfWorkerThreads == numOfWorkers;
}
WorkerPoolWrapper::~WorkerPoolWrapper() {
delete[] workerThreads; // assumes that the threads have been joined!
pthread_attr_destroy(&attr);
pthread_mutex_destroy(&jobsMutex);
pthread_mutex_destroy(&jobsyncMutex);
pthread_cond_destroy(&jobsConditionalVar);
pthread_cond_destroy(&jobsyncConditionalVar);
}
void WorkerPoolWrapper::waitForWorkerPoolToExit() {
sendNoMoreJobsSignal(); // the user could have given this signal manually
for ( int i = 0; i < numOfWorkerThreads; i++ ) {
pthread_join(workerThreads[i], NULL);
}
numOfWorkerThreads = 0; // all threads are joined, so a second call does not join them again
}
void WorkerPoolWrapper::syncWorkerThreads() {
pthread_mutex_lock(&jobsyncMutex);
while ( jobsyncCounter > 0 ) {
pthread_cond_wait(&jobsyncConditionalVar, &jobsyncMutex);
}
pthread_mutex_unlock(&jobsyncMutex);
}
void WorkerPoolWrapper::sendNoMoreJobsSignal() {
pthread_mutex_lock(&jobsMutex);
noMoreJobsFlag = true;
pthread_cond_broadcast(&jobsConditionalVar); // wake all waiting threads
pthread_mutex_unlock(&jobsMutex);
}
void WorkerPoolWrapper::addNewJob(void* job) {
pthread_mutex_lock(&jobsyncMutex); // count the job before any worker can take it
jobsyncCounter++;
pthread_mutex_unlock(&jobsyncMutex);
pthread_mutex_lock(&jobsMutex);
jobs.push_back(job);
pthread_cond_signal(&jobsConditionalVar); // wake a waiting thread
pthread_mutex_unlock(&jobsMutex);
}
void* WorkerPoolWrapper::workerThreadEntryFunc(void* thisObject) {
static_cast<WorkerPoolWrapper*>(thisObject)->workerThreadEntry();
return NULL;
}
void WorkerPoolWrapper::workerThreadEntry() {
void* myJob = NULL;
bool lookForAJob = true;
while ( lookForAJob ) {
if ( myJob != NULL ) {
executeJob(myJob); // execute the job
myJob = NULL; // reset my job
pthread_mutex_lock(&jobsyncMutex);
jobsyncCounter--;
pthread_cond_signal(&jobsyncConditionalVar);
pthread_mutex_unlock(&jobsyncMutex);
}
else {
pthread_mutex_lock(&jobsMutex);
if ( jobs.empty() ) {
if ( noMoreJobsFlag ) lookForAJob = false;
else {
pthread_cond_wait(&jobsConditionalVar, &jobsMutex); // the list may still be empty after waking up; the while loop then checks again
if ( !jobs.empty() ) {
myJob = jobs.front();
jobs.pop_front();
}
}
}
else {
myJob = jobs.front();
jobs.pop_front();
}
pthread_mutex_unlock(&jobsMutex);
}
} // while ( lookForAJob )
}
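syncWorkerThreads relies on a counter of unfinished jobs that is only modified with jobsyncMutex locked, plus a condition variable that the workers signal after finishing a job. The minimal sketch below isolates that bookkeeping. To stay short, it starts one thread per job instead of using a pool, and it broadcasts only when the counter reaches zero, whereas the wrapper signals after every job; both versions are correct because the waiting thread re-checks the counter in a loop. JobCounter, SquareJob, squareJobEntry and sumOfSquares are hypothetical names.

```cpp
#include <cassert>
#include <pthread.h>

// Hypothetical counter of unfinished jobs, only touched with `mutex` locked.
class JobCounter {
public:
  JobCounter() : pending(0) {
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&allDone, NULL);
  }
  ~JobCounter() {
    pthread_cond_destroy(&allDone);
    pthread_mutex_destroy(&mutex);
  }
  void add() { // call before handing a job to a thread
    pthread_mutex_lock(&mutex);
    ++pending;
    pthread_mutex_unlock(&mutex);
  }
  void finish() { // call after a job has been executed
    pthread_mutex_lock(&mutex);
    if (--pending == 0) pthread_cond_broadcast(&allDone); // wake every waiter
    pthread_mutex_unlock(&mutex);
  }
  void waitUntilDone() { // block until no jobs are pending
    pthread_mutex_lock(&mutex);
    while (pending > 0) pthread_cond_wait(&allDone, &mutex); // wake-ups may be spurious
    pthread_mutex_unlock(&mutex);
  }
private:
  pthread_mutex_t mutex;
  pthread_cond_t allDone;
  int pending;
};

// Hypothetical job: square one int in place, then report completion.
struct SquareJob { int* value; JobCounter* counter; };

static void* squareJobEntry(void* arg) {
  SquareJob* job = static_cast<SquareJob*>(arg);
  *job->value *= *job->value;
  job->counter->finish();
  return NULL;
}

// Square all n values, wait until every job is done, then read the results.
int sumOfSquares(int* values, int n) {
  assert(n >= 0);
  JobCounter counter;
  SquareJob* jobs = new SquareJob[n];
  pthread_t* threads = new pthread_t[n];
  int started = 0;
  for (int i = 0; i < n; ++i) {
    jobs[i].value = &values[i];
    jobs[i].counter = &counter;
    counter.add();
    if (pthread_create(&threads[started], NULL, squareJobEntry, &jobs[i]) == 0) ++started;
    else squareJobEntry(&jobs[i]); // thread not started: run the job here instead
  }
  counter.waitUntilDone(); // every job has now called finish()
  int sum = 0;
  for (int i = 0; i < n; ++i) sum += values[i];
  for (int i = 0; i < started; ++i) pthread_join(threads[i], NULL); // clean up the threads
  delete[] threads;
  delete[] jobs;
  return sum;
}
```

With int v[] = {1, 2, 3, 4}, sumOfSquares(v, 4) returns 30. Reading the values after waitUntilDone is safe: each thread writes its result before locking the mutex in finish, and the waiting thread locks the same mutex before it returns.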
Credit for the idea of combining pthreads with C++ classes goes to Jeremy Friesner.