#include "workqueue.hpp"

#include <iostream>

namespace SceneUtil
{

/// Block the calling thread until signalDone() has been called on this item.
/// Returns immediately if the item is already marked as done.
void WorkItem::waitTillDone()
{
    // Fast path: skip taking the mutex when the item has already completed.
    // NOTE(review): this unlocked read relies on mDone being an atomic type
    // (it is used with exchange() below) - confirm in workqueue.hpp.
    if (mDone > 0)
        return;

    OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
    // Re-check under the lock and loop, so a wake-up that happens before the
    // flag is set does not end the wait prematurely.
    while (mDone == 0)
    {
        mCondition.wait(&mMutex);
    }
}

/// Mark this item as done and wake every thread blocked in waitTillDone().
void WorkItem::signalDone()
{
    {
        // Set the flag while holding the mutex so a waiter that has just
        // checked mDone under the lock cannot miss the broadcast that follows.
        OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
        mDone.exchange(1);
    }
    mCondition.broadcast();
}

WorkItem::WorkItem()
{
}

WorkItem::~WorkItem()
{
}

/// @return true once signalDone() has been called on this item.
bool WorkItem::isDone() const
{
    return (mDone > 0);
}

/// Create the queue and start its worker threads.
/// @param workerThreads Number of WorkThread instances to create and start.
WorkQueue::WorkQueue(int workerThreads)
    : mIsReleased(false)
{
    for (int i=0; i<workerThreads; ++i)
    {
        WorkThread* thread = new WorkThread(this);
        mThreads.push_back(thread);
        thread->startThread();
    }
}

/// Discard all pending items, release the worker threads, then join and
/// delete them.
WorkQueue::~WorkQueue()
{
    {
        OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
        // Pending items are dropped here without being executed.
        // NOTE(review): nothing in this file calls signalDone() on a dropped
        // item, so a caller blocked in waitTillDone() on one would not be
        // woken by this class - confirm callers never wait across destruction.
        while (!mQueue.empty())
            mQueue.pop();
        // With the queue empty and mIsReleased set, removeWorkItem() returns
        // NULL, which makes WorkThread::run() return.
        mIsReleased = true;
        mCondition.broadcast();
    }

    for (unsigned int i=0; i<mThreads.size(); ++i)
    {
        mThreads[i]->join();
        delete mThreads[i];
    }
}

/// Append an item to the queue and wake one waiting worker.
/// An item that is already done is rejected with a warning on std::cerr.
/// @param item The work item to enqueue.
void WorkQueue::addWorkItem(osg::ref_ptr<WorkItem> item)
{
    if (item->isDone())
    {
        std::cerr << "warning, trying to add a work item that is already completed" << std::endl;
        return;
    }

    OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
    mQueue.push(item);
    mCondition.signal();
}

/// Take the item at the front of the queue, blocking while the queue is empty
/// and has not been released.
/// @return The front item, or NULL if the queue is empty after being released.
osg::ref_ptr<WorkItem> WorkQueue::removeWorkItem()
{
    OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
    while (mQueue.empty() && !mIsReleased)
    {
        mCondition.wait(&mMutex);
    }
    if (!mQueue.empty())
    {
        osg::ref_ptr<WorkItem> item = mQueue.front();
        mQueue.pop();
        return item;
    }
    else
        return NULL;
}

/// @param workQueue The queue this thread pulls its work items from.
WorkThread::WorkThread(WorkQueue *workQueue)
    : mWorkQueue(workQueue)
{
}

/// Thread body: repeatedly take an item from the queue, run it and signal its
/// completion. Returns when removeWorkItem() yields a null item.
void WorkThread::run()
{
    while (true)
    {
        osg::ref_ptr<WorkItem> item = mWorkQueue->removeWorkItem();
        if (!item)
            return;
        item->doWork();
        item->signalDone();
    }
}

}