Concurrency can be extremely complicated, and causes problems that will
haunt you in your dreams.
The classical libraries in C/C++ don't protect you from this horror in
any way, and you will have to figure it out yourself. Parallelism is
supposed to be a lot easier, but classical C/C++ (before C++11 added std::thread) does not have standard libraries
like, for instance, Python's parallelism modules. The boost libraries, however, provide an extensive interface for concurrency and parallelism.
If you don't want to use boost, don't panic. There are other options. The POSIX pthread library provides a down-to-the-metal concurrency model. It took me a while to find out what it is all about, and I haven't successfully applied all its possibilities. What I have managed to apply is a so-called "worker-pool" model. This is one of the easier concurrency applications of the pthread library (it falls under the parallelism category), but it can be quite useful. Here I will demonstrate a "wrapper" that C++ classes can inherit.
Suppose that you have a class Fun that needs to do some computations. We declare Fun in, say, Fun.hpp:
Fun inherits the worker pool functionality from the class "WorkerPoolWrapper" (imported from WorkerPoolWrapper.hpp).
The class WorkerPoolWrapper has a "pure virtual" member executeJob(void* ).
You, the user, must specify what you want to compute in your own definition of the executeJob method.
Besides implementing executeJob, you must also initialize the worker pool, and somewhere before Fun's destructor returns, the threads must be "joined", i.e., all worker threads must finish their execution.
In this case, I use the constructor and destructor of Fun to accomplish these things:
The methods initWorkerPool and waitForWorkerPoolToExit are inherited from WorkerPoolWrapper. Let's use Fun to compute pi(n), the number of primes not exceeding a number n. We overload operator() as follows:
Notice that this implementation of pi(n) is not the most efficient one. It checks for every integer i from 1 to n whether i is prime or not. This prime test is performed by executeJob.
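A serial reference is handy for checking the parallel result on small inputs. The following is only a minimal sketch and not part of the original code: the names isPrimeNaive and primePiSerial are hypothetical, and it assumes that 0 and 1 count as non-prime.

```cpp
#include <cassert>

// Hypothetical helper: naive trial division by every d in 2,...,i-1.
bool isPrimeNaive(int i) {
    if (i < 2) return false;            // assumption: 0 and 1 are not prime
    for (int d = 2; d < i; ++d) {
        if (i % d == 0) return false;   // found a proper divisor
    }
    return true;
}

// Hypothetical serial reference for pi(n): count the primes among 1,...,n.
int primePiSerial(int n) {
    assert(n >= 0);
    int count = 0;
    for (int i = 1; i <= n; ++i) {
        if (isPrimeNaive(i)) ++count;
    }
    return count;
}
```

For instance, primePiSerial(100) gives 25, and a parallel run with the same n should reproduce that number.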
In the background, WorkerPoolWrapper has a list of jobs that have to be executed. Jobs can be added to this job list using addNewJob(void* ). Once executed, the result of a job must somehow be stored in the job again. Above, the number pi is the sum of the array ns, which makes sense when we look at the implementation of executeJob:
Hence, executeJob transforms the number i pointed to by job into 0 if i is composite, or 1 if i is prime, such that the sum of the transformed i's equals pi(n).
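The convention of passing a job as a void* and writing the result back into the same memory can be tried out without any threads. The sketch below is an illustration under assumptions: squareJob and sumOfSquares are hypothetical names, the std::list only stands in for the job list of the wrapper, and the jobs are executed serially.

```cpp
#include <cassert>
#include <list>
#include <vector>

// Hypothetical job: square the int that the job pointer refers to, in place.
void squareJob(void* job) {
    int* x = static_cast<int*>(job);   // cast the job back to int*
    *x = (*x) * (*x);                  // overwrite the input with the result
}

// Queue one job per element, run the jobs one by one, then sum the results.
int sumOfSquares(int n) {
    assert(n >= 0);
    std::vector<int> xs(n);
    for (int k = 0; k < n; ++k) xs[k] = k + 1;            // the integers 1,...,n

    std::list<void*> jobs;                                // stand-in for the job list
    for (int k = 0; k < n; ++k) jobs.push_back(&xs[k]);   // int* converts to void*

    while (!jobs.empty()) {                               // a worker thread would do this part
        squareJob(jobs.front());
        jobs.pop_front();
    }

    int sum = 0;                                          // gather the transformed data
    for (int k = 0; k < n; ++k) sum += xs[k];
    return sum;
}
```

Because each job only touches its own element, the jobs do not need any locking among themselves; only the job list itself has to be protected once several threads take jobs from it.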
Before we gathered the results in Fun::operator(), we called syncWorkerThreads(). This method lets the program halt until every job in the job list has been executed.
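The waiting itself is a counter guarded by a mutex and a condition variable. The sketch below isolates that idea. It is not the wrapper's code: PendingCounter, the counter* functions and runCounterDemo are hypothetical names, and as a small variation it only broadcasts when the counter reaches zero.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-alone "pending jobs" counter.
struct PendingCounter {
    pthread_mutex_t mutex;
    pthread_cond_t  allDone;
    int             pending;
};

void counterInit(PendingCounter* c) {
    pthread_mutex_init(&c->mutex, NULL);
    pthread_cond_init(&c->allDone, NULL);
    c->pending = 0;
}

void counterDestroy(PendingCounter* c) {
    pthread_cond_destroy(&c->allDone);
    pthread_mutex_destroy(&c->mutex);
}

void counterAdd(PendingCounter* c) {            // a job was submitted
    pthread_mutex_lock(&c->mutex);
    ++c->pending;
    pthread_mutex_unlock(&c->mutex);
}

void counterDone(PendingCounter* c) {           // a job was finished
    pthread_mutex_lock(&c->mutex);
    assert(c->pending > 0);
    if (--c->pending == 0) pthread_cond_broadcast(&c->allDone);
    pthread_mutex_unlock(&c->mutex);
}

void counterWait(PendingCounter* c) {           // block until nothing is pending
    pthread_mutex_lock(&c->mutex);
    while (c->pending > 0) pthread_cond_wait(&c->allDone, &c->mutex);
    pthread_mutex_unlock(&c->mutex);
}

void* finishOneJob(void* arg) {                 // thread body: report one finished job
    counterDone(static_cast<PendingCounter*>(arg));
    return NULL;
}

// Register k jobs, let k threads finish them, wait, and return what is left.
int runCounterDemo(int k) {
    assert(k >= 0);
    PendingCounter c;
    counterInit(&c);
    for (int i = 0; i < k; ++i) counterAdd(&c);
    std::vector<pthread_t> threads(k);
    int started = 0;
    for (int i = 0; i < k; ++i) {
        if (pthread_create(&threads[started], NULL, finishOneJob, &c) == 0) ++started;
    }
    for (int i = started; i < k; ++i) counterDone(&c);   // jobs whose thread failed to start
    counterWait(&c);
    for (int i = 0; i < started; ++i) pthread_join(threads[i], NULL);
    int left = c.pending;
    counterDestroy(&c);
    return left;
}
```

The while loop around pthread_cond_wait matters: a condition wait may return spuriously, so the counter is checked again after every wake-up.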
Using the functionoid Fun now works as follows:
The class WorkerPoolWrapper is declared here:
And the members are defined here:
The credits for combining pthread with C++ classes go to Jeremy Friesner.
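The trick for combining pthread with a C++ class is a static member function that receives the object as a void* and forwards the call to an ordinary method. The sketch below shows this in isolation. The class Counter and the function runTrampolineDemo are hypothetical, and passing a static member function to pthread_create is assumed to be accepted by your compiler, which is the case for the common ones.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>

// Hypothetical class whose method entry() must run in a separate thread.
class Counter {
public:
    Counter() : count(0) {}

    // Start one thread that runs this->entry(), then join it.
    bool runInThread() {
        pthread_t thread;
        if (pthread_create(&thread, NULL, &Counter::entryFunc, this) != 0) return false;
        return pthread_join(thread, NULL) == 0;
    }

    int getCount() const { return count; }

private:
    void entry() { ++count; }                        // the actual work

    // pthread only knows plain functions, so this static member acts as a
    // bridge: it casts the void* back to the object and calls the method.
    static void* entryFunc(void* thisObject) {
        static_cast<Counter*>(thisObject)->entry();
        return NULL;
    }

    int count;
};

// Run the method once in a thread and report how often it was executed.
int runTrampolineDemo() {
    Counter c;
    bool ok = c.runInThread();
    assert(ok);
    return c.getCount();
}
```

Since the thread is joined before getCount is called, reading count afterwards needs no extra locking.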
// Fun.hpp
#include "WorkerPoolWrapper.hpp"

class Fun : public WorkerPoolWrapper {
public:
  Fun(int ); // constructor, pass the number of threads that you want to use
  ~Fun(); // destructor
  int operator()(int ); // suppose that this class is used for a functionoid
  // other public members and methods...
protected:
  void executeJob(void* ); // pure virtual function in the base class
  // other protected members and methods...
};

// Fun.cpp
#include "Fun.hpp"

Fun::Fun(int w) { initWorkerPool(w); } // upon construction, Fun initiates the worker pool with w workers
Fun::~Fun() { waitForWorkerPoolToExit(); } // upon destruction, Fun joins the threads
// ...

// ... Fun.cpp continued
int Fun::operator()(int n) {
  int* ns = new int[n]; // make array with integers 1,...,n
  for ( int i = 0; i < n; ++i ) ns[i] = i+1;
  for ( int* job = ns; job < ns+n; ++job ) addNewJob((void*) job); // add the integers as jobs to the job queue
  syncWorkerThreads(); // wait for all jobs to be completed
  int pi = 0; // gather (transformed) data and compute final result
  for ( int i = 0; i < n; ++i ) pi += ns[i];
  delete[] ns; // free allocated space
  return pi; // return answer
}
// ...

// ... Fun.cpp continued
void Fun::executeJob(void* job) {
  int* i = (int*) job; // cast the job back to int*
  bool prime = (*i) > 1; // 0 and 1 are not prime (and the loop below would not stop for them)
  int d = 2;
  while ( prime && d < (*i) ) prime = ( (*i) % (d++) != 0 ); // naive prime test
  *i = prime; // store the result in the job
}

// main.cpp
// ...
Fun primePi(10); // uses 10 worker threads!
int pi = primePi(144169); // equals 13355
// ... when primePi goes out of scope, the threads are joined

// WorkerPoolWrapper.hpp
#ifndef WORKERPOOLWRAPPER_HPP
#define WORKERPOOLWRAPPER_HPP

#include <pthread.h>
#include <list>

class WorkerPoolWrapper {
public:
  /* constructor */
  WorkerPoolWrapper();
  /* destructor */
  virtual ~WorkerPoolWrapper();
  /* initiate private members and set the number of workers.
   * Returns true if the threads were successfully started,
   * false if there was an error starting a thread
   */
  bool initWorkerPool(int );
  /* wait for all workers to finish. First calls sendNoMoreJobsSignal
   * to make sure that they do.
   */
  void waitForWorkerPoolToExit();
  /* Wait for all Workers to finish their jobs. The Workers
   * will then be waiting for new jobs. (So no joining...)
   */
  void syncWorkerThreads();
  /* Send the worker pool a signal that there will be no more jobs.
   * This will make the workers exit their workerThreadEntry function.
   */
  void sendNoMoreJobsSignal();
  /* add a new job to the list of jobs.
   */
  void addNewJob(void* );
protected:
  /* the member executeJob needs to be implemented by the class that
   * inherits this class. It should do the actual work that needs
   * to be done in parallel.
   */
  virtual void executeJob(void* )=0;
private:
  /* pthread is a C library, and does not know about classes. Therefore
   * we need the following construction
   */
  void workerThreadEntry();
  static void* workerThreadEntryFunc(void* );
  /* mutex and conditional variable for the job list */
  pthread_mutex_t jobsMutex;
  pthread_cond_t jobsConditionalVar;
  /* mutex and conditional variable for synchronizing */
  pthread_mutex_t jobsyncMutex;
  pthread_cond_t jobsyncConditionalVar;
  /* the number of unfinished jobs and the job list */
  volatile int jobsyncCounter;
  std::list<void*> jobs;
  /* a flag for terminating the threads */
  volatile bool noMoreJobsFlag;
  /* an array of worker threads */
  pthread_t* workerThreads;
  pthread_attr_t attr;
  int numOfWorkerThreads;
};

#endif

// WorkerPoolWrapper.cpp
#include "WorkerPoolWrapper.hpp"
#include <cstddef> // NULL

WorkerPoolWrapper::WorkerPoolWrapper() {
  numOfWorkerThreads = 0;
  noMoreJobsFlag = true;
  jobsyncCounter = 0;
  workerThreads = NULL;
}

bool WorkerPoolWrapper::initWorkerPool(int numOfWorkerThreads) {
  this->numOfWorkerThreads = 0; // counts the threads that were actually started
  noMoreJobsFlag = false;
  jobsyncCounter = 0;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  pthread_mutex_init(&jobsMutex,NULL);
  pthread_mutex_init(&jobsyncMutex,NULL);
  pthread_cond_init(&jobsConditionalVar,NULL);
  pthread_cond_init(&jobsyncConditionalVar,NULL);
  if ( numOfWorkerThreads < 1 ) return false; // nothing to allocate or start
  workerThreads = new pthread_t[numOfWorkerThreads];
  for ( int i = 0; i < numOfWorkerThreads; i++ ) {
    // stop at the first failure, so that only started threads are joined later
    if ( pthread_create(workerThreads+i, &attr, workerThreadEntryFunc, this) != 0 ) return false;
    this->numOfWorkerThreads++;
  }
  return true;
}

WorkerPoolWrapper::~WorkerPoolWrapper() {
  delete[] workerThreads; // assumes that the threads have been joined!
  pthread_attr_destroy(&attr); // assumes that initWorkerPool has been called!
  pthread_mutex_destroy(&jobsMutex);
  pthread_mutex_destroy(&jobsyncMutex);
  pthread_cond_destroy(&jobsConditionalVar);
  pthread_cond_destroy(&jobsyncConditionalVar);
}

void WorkerPoolWrapper::waitForWorkerPoolToExit() {
  sendNoMoreJobsSignal(); // the user could have given this signal manually
  for ( int i = 0; i < numOfWorkerThreads; i++ ) {
    pthread_join(workerThreads[i], NULL);
  }
  numOfWorkerThreads = 0; // a second call must not join the same threads again
}

void WorkerPoolWrapper::syncWorkerThreads() {
  pthread_mutex_lock(&jobsyncMutex);
  while ( jobsyncCounter > 0 ) { // loop guards against spurious wake-ups
    pthread_cond_wait(&jobsyncConditionalVar, &jobsyncMutex);
  }
  pthread_mutex_unlock(&jobsyncMutex);
}

void WorkerPoolWrapper::sendNoMoreJobsSignal() {
  pthread_mutex_lock(&jobsMutex);
  noMoreJobsFlag = true;
  pthread_cond_broadcast(&jobsConditionalVar); // wake all waiting threads
  pthread_mutex_unlock(&jobsMutex);
}

void WorkerPoolWrapper::addNewJob(void* job) {
  pthread_mutex_lock(&jobsyncMutex);
  jobsyncCounter++; // one more unfinished job
  pthread_mutex_unlock(&jobsyncMutex);
  pthread_mutex_lock(&jobsMutex);
  jobs.push_back(job);
  pthread_cond_signal(&jobsConditionalVar); // wake a waiting thread
  pthread_mutex_unlock(&jobsMutex);
}

void* WorkerPoolWrapper::workerThreadEntryFunc(void* thisObject) {
  ((WorkerPoolWrapper*) thisObject)->workerThreadEntry();
  return NULL;
}

void WorkerPoolWrapper::workerThreadEntry() {
  void* myJob = NULL;
  bool lookForAJob = true;
  while ( lookForAJob ) {
    if ( myJob != NULL ) {
      executeJob(myJob); // execute the job
      myJob = NULL; // reset my job
      pthread_mutex_lock(&jobsyncMutex);
      jobsyncCounter--; // one unfinished job less
      pthread_cond_signal(&jobsyncConditionalVar);
      pthread_mutex_unlock(&jobsyncMutex);
    }
    else {
      pthread_mutex_lock(&jobsMutex);
      if ( jobs.empty() ) {
        if ( noMoreJobsFlag ) lookForAJob = false;
        else {
          pthread_cond_wait(&jobsConditionalVar, &jobsMutex);
          if ( !jobs.empty() ) { // may still be empty after a wake-up; then we loop again
            myJob = jobs.front();
            jobs.pop_front();
          }
        }
      }
      else {
        myJob = jobs.front();
        jobs.pop_front();
      }
      pthread_mutex_unlock(&jobsMutex);
    }
  } // while ( lookForAJob )
}