-
-
Save maxthinkthink-tech/8fc52d106525ce966651499946cd9f6f to your computer and use it in GitHub Desktop.
Revisions
-
jl2 revised this gist
Mar 27, 2011 . 1 changed file with 77 additions and 6 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -7,6 +7,9 @@ #include <stdlib.h> // Base task for Tasks // run() should be overloaded and expensive calculations done there // showTask() is for debugging and can be deleted if not used class Task { public: Task() {} @@ -15,47 +18,75 @@ class Task { virtual void showTask()=0; }; // Wrapper around std::queue with some mutex protection class WorkQueue { public: WorkQueue() { // Initialize the mutex protecting the queue pthread_mutex_init(&qmtx,0); // wcond is a condition variable that's signaled // when new work arrives pthread_cond_init(&wcond, 0); } ~WorkQueue() { // Cleanup pthreads pthread_mutex_destroy(&qmtx); pthread_cond_destroy(&wcond); } // Retrieves the next task from the queue Task *nextTask() { // The return value Task *nt = 0; // Lock the queue mutex pthread_mutex_lock(&qmtx); // Check if there's work if (finished && tasks.size() == 0) { // If not return null (0) nt = 0; } else { // Not finished, but there are no tasks, so wait for // wcond to be signalled if (tasks.size()==0) { pthread_cond_wait(&wcond, &qmtx); } // get the next task nt = tasks.front(); tasks.pop(); // For debugging if (nt) nt->showTask(); } // Unlock the mutex and return pthread_mutex_unlock(&qmtx); return nt; } // Add a task void addTask(Task *nt) { // Only add the task if the queue isn't marked finished if (!finished) { // Lock the queue pthread_mutex_lock(&qmtx); // Add the task tasks.push(nt); // signal there's new work pthread_cond_signal(&wcond); // Unlock the mutex pthread_mutex_unlock(&qmtx); } } // Mark the queue finished void finish() { pthread_mutex_lock(&qmtx); finished = true; // Signal the condition variable in case any threads are waiting pthread_cond_signal(&wcond); pthread_mutex_unlock(&qmtx); } // Check if there's work bool hasWork() { return (tasks.size()>0); } @@ -67,6 +98,7 @@ class WorkQueue { pthread_cond_t wcond; }; // Function that retrieves a task from a queue, runs it and deletes it static void *getWork(void* param) { Task *mw = 0; WorkQueue *wq = (WorkQueue*)param; @@ -79,14 +111,17 @@ static void *getWork(void* param) { class ThreadPool { public: // Allocate a thread pool and set them to work trying to get tasks ThreadPool(int n) : _numThreads(n) { printf("Creating a thread pool with %d threads\n", n); threads = new pthread_t[n]; for (int i=0; i< n; ++i) { pthread_create(&(threads[i]), 0, getWork, &workQueue); } } // Wait for the threads to finish, then delete them ~ThreadPool() { workQueue.finish(); waitForCompletion(); @@ -95,16 +130,21 @@ class ThreadPool { } delete [] threads; } // Add a task void addTask(Task *nt) { workQueue.addTask(nt); } // Tell the tasks to finish and return void finish() { workQueue.finish(); } // Checks if there is work to do bool hasWork() { return workQueue.hasWork(); } // Super inefficient way to wait for all tasks to finish void waitForCompletion() { while (workQueue.hasWork()) {} } @@ -115,33 +155,56 @@ class ThreadPool { WorkQueue workQueue; }; // stdout is a shared resource, so protected it with a mutex static pthread_mutex_t console_mutex = PTHREAD_MUTEX_INITIALIZER; // Debugging function void showTask(int n) { pthread_mutex_lock(&console_mutex); printf("Adding fib(%d)\n", n); pthread_mutex_unlock(&console_mutex); } // Task to compute fibonacci numbers // It's more efficient to use an iterative algorithm, but // the recursive algorithm takes longer and is more interesting // than sleeping for X seconds to show parrallelism class FibTask : public Task { public: FibTask(int n) : Task(), _n(n) {} ~FibTask() { // Debug prints pthread_mutex_lock(&console_mutex); printf("tid(%d) - fibd(%d) being deleted\n", pthread_self(), _n); pthread_mutex_unlock(&console_mutex); } virtual void run() { // Note: it's important that this isn't contained in the console mutex lock long long val = innerFib(_n); // Show results pthread_mutex_lock(&console_mutex); printf("Fibd %d = %lld\n",_n, val); pthread_mutex_unlock(&console_mutex); // The following won't work in parrallel: // pthread_mutex_lock(&console_mutex); // printf("Fibd %d = %lld\n",_n, innerFib(_n)); // pthread_mutex_unlock(&console_mutex); } virtual void showTask() { // More debug printing pthread_mutex_lock(&console_mutex); printf("thread %d computing fibonacci %d\n", pthread_self(), _n); pthread_mutex_unlock(&console_mutex); } private: // Slow computation of fibonacci sequence // To make things interesting, and perhaps imporove load balancing, these // inner computations could be added to the task queue // Ideally set a lower limit on when that's done // (i.e. don't create a task for fib(2)) because thread overhead makes it // not worth it long long innerFib(long long n) { if (n<=1) { return 1; } return innerFib(n-1) + innerFib(n-2); @@ -150,13 +213,21 @@ class FibTask : public Task { }; int main(int argc, char *argv[]) { // Create a thread pool ThreadPool *tp = new ThreadPool(12); // Create work for it for (int i=0;i<100; ++i) { int rv = rand() % 50 + 1; showTask(rv); tp->addTask(new FibTask(rv)); } // Finish up tp->finish(); // Delete it delete tp; printf("Done with all work!\n"); } -
jl2 revised this gist
Mar 27, 2011 . 1 changed file with 18 additions and 15 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,16 +1,18 @@ #include <stdio.h> #include <queue> #include <unistd.h> #include <pthread.h> #include <stdlib.h> class Task { public: Task() {} virtual ~Task() {} virtual void run()=0; virtual void showTask()=0; }; class WorkQueue { @@ -33,6 +35,7 @@ class WorkQueue { pthread_cond_wait(&wcond, &qmtx); } nt = tasks.front(); if (nt) nt->showTask(); tasks.pop(); } pthread_mutex_unlock(&qmtx); @@ -52,6 +55,7 @@ class WorkQueue { pthread_cond_signal(&wcond); pthread_mutex_unlock(&qmtx); } bool hasWork() { return (tasks.size()>0); } @@ -67,7 +71,6 @@ static void *getWork(void* param) { Task *mw = 0; WorkQueue *wq = (WorkQueue*)param; while (mw = wq->nextTask()) { mw->run(); delete mw; } @@ -86,6 +89,7 @@ class ThreadPool { ~ThreadPool() { workQueue.finish(); waitForCompletion(); for (int i=0; i<_numThreads; ++i) { pthread_join(threads[i], 0); } @@ -114,7 +118,7 @@ class ThreadPool { static pthread_mutex_t console_mutex = PTHREAD_MUTEX_INITIALIZER; void showTask(int n) { pthread_mutex_lock(&console_mutex); printf("Adding fib(%d)\n", n); pthread_mutex_unlock(&console_mutex); } @@ -123,37 +127,36 @@ class FibTask : public Task { FibTask(int n) : Task(), _n(n) {} ~FibTask() { pthread_mutex_lock(&console_mutex); printf("tid(%d) - fibd(%d) being deleted\n", pthread_self(), _n); pthread_mutex_unlock(&console_mutex); } virtual void run() { long long val = innerFib(_n); pthread_mutex_lock(&console_mutex); printf("Fibd %d = %lld\n",_n, val); pthread_mutex_unlock(&console_mutex); } virtual void showTask() { pthread_mutex_lock(&console_mutex); printf("thread %d computing fibonacci %d\n", pthread_self(), _n); pthread_mutex_unlock(&console_mutex); } private: long long innerFib(long long n) { if (n<=1) { return 1; } return innerFib(n-1) + innerFib(n-2); } long long _n; }; int main(int argc, char *argv[]) { ThreadPool *tp = new ThreadPool(4); for (int i=0;i<70; ++i) { int rv = rand() % 50 + 1; showTask(rv); tp->addTask(new FibTask(rv)); } tp->finish(); delete tp; printf("Done with all work!\n"); } -
jl2 created this gist
Mar 27, 2011 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,159 @@ #include <stdio.h> #include <queue> #include <pthread.h> #include <stdlib.h> class Task { public: Task() {} ~Task() {} virtual void run()=0; virtual void indicateTaken()=0; }; class WorkQueue { public: WorkQueue() { pthread_mutex_init(&qmtx,0); pthread_cond_init(&wcond, 0); } ~WorkQueue() { pthread_mutex_destroy(&qmtx); pthread_cond_destroy(&wcond); } Task *nextTask() { Task *nt; pthread_mutex_lock(&qmtx); if (finished && tasks.size() == 0) { nt = 0; } else { if (tasks.size()==0) { pthread_cond_wait(&wcond, &qmtx); } nt = tasks.front(); tasks.pop(); } pthread_mutex_unlock(&qmtx); return nt; } void addTask(Task *nt) { if (!finished) { pthread_mutex_lock(&qmtx); tasks.push(nt); pthread_cond_signal(&wcond); pthread_mutex_unlock(&qmtx); } } void finish() { pthread_mutex_lock(&qmtx); finished = true; pthread_cond_signal(&wcond); pthread_mutex_unlock(&qmtx); } bool hasWork() { return (tasks.size()>0); } private: std::queue<Task*> tasks; bool finished; pthread_mutex_t qmtx; pthread_cond_t wcond; }; static void *getWork(void* param) { Task *mw = 0; WorkQueue *wq = (WorkQueue*)param; while (mw = wq->nextTask()) { mw->indicateTaken(); mw->run(); delete mw; } return 0; } class ThreadPool { public: ThreadPool(int n) : _numThreads(n) { printf("Creating a thread pool with %d threads\n", n); threads = new pthread_t[n]; for (int i=0; i< n; ++i) { pthread_create(&(threads[i]), 0, getWork, &workQueue); } } ~ThreadPool() { workQueue.finish(); for (int i=0; i<_numThreads; ++i) { pthread_join(threads[i], 0); } delete [] threads; } void addTask(Task *nt) { workQueue.addTask(nt); } void finish() { workQueue.finish(); } bool hasWork() { return workQueue.hasWork(); } void waitForCompletion() { while (workQueue.hasWork()) {} } private: pthread_t *threads; int _numThreads; WorkQueue workQueue; }; static pthread_mutex_t console_mutex = PTHREAD_MUTEX_INITIALIZER; void showTask(int n) { pthread_mutex_lock(&console_mutex); printf("Adding fibonacci task %d\n", n); pthread_mutex_unlock(&console_mutex); } class FibTask : public Task { public: FibTask(int n) : Task(), _n(n) {} ~FibTask() { pthread_mutex_lock(&console_mutex); printf("Fibonacci task %d being deleted\n", _n); pthread_mutex_unlock(&console_mutex); } virtual void run() { int val = innerFib(_n); pthread_mutex_lock(&console_mutex); printf("Fibd %d = %d\n",_n, val); pthread_mutex_unlock(&console_mutex); } virtual void indicateTaken() { pthread_mutex_lock(&console_mutex); printf("Took fibonacci task %d\n", _n); pthread_mutex_unlock(&console_mutex); } private: int innerFib(int n) { if (n<=1) { return 1; } return innerFib(n-1) + innerFib(n-2); } int _n; }; int main(int argc, char *argv[]) { ThreadPool *tp = new ThreadPool(8); for (int i=0;i<30; ++i) { int rv = rand() % 30 + 9; showTask(rv); tp->addTask(new FibTask(rv)); } tp->finish(); tp->waitForCompletion(); delete tp; printf("Done with all work!\n"); }