#include "workqueue.hpp"

namespace SceneUtil
{

/// Blocks the calling thread until signalDone() has been called on this ticket.
/// Returns immediately if the ticket is already marked as done.
void WorkTicket::waitTillDone()
{
    // Fast path: skip taking the mutex when the work has already completed.
    if (mDone > 0)
        return;

    OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
    // Re-check the flag in a loop so a wake-up without a state change simply waits again.
    while (mDone == 0)
    {
        mCondition.wait(&mMutex);
    }
}

/// Marks the ticket as done and wakes every thread blocked in waitTillDone().
void WorkTicket::signalDone()
{
    {
        // The flag is set while holding the mutex so a waiter cannot test the flag
        // and then start waiting after the broadcast below has already been sent.
        OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
        mDone.exchange(1);
    }
    mCondition.broadcast();
}

/// Creates a work item together with a fresh ticket that tracks its completion.
WorkItem::WorkItem()
    : mTicket(new WorkTicket)
{
    // The ticket is handed out to other threads, so its reference counting must be thread safe.
    mTicket->setThreadSafeRefUnref(true);
}

WorkItem::~WorkItem()
{
}

/// Marks this item's ticket as done.
void WorkItem::doWork()
{
    mTicket->signalDone();
}

/// @return The ticket that is signalled once doWork() has run for this item.
osg::ref_ptr<WorkTicket> WorkItem::getTicket()
{
    return mTicket;
}

/// Creates the queue and starts the worker threads that will service it.
/// @param workerThreads Number of worker threads to start.
WorkQueue::WorkQueue(int workerThreads)
    : mIsReleased(false)
{
    // NOTE(review): this loop was reconstructed from a damaged source line (only
    // "startThread()" survived); it assumes mThreads is a sequence of WorkThread*
    // supporting push_back, consistent with the destructor below - confirm against workqueue.hpp.
    for (int i=0; i<workerThreads; ++i)
    {
        WorkThread* thread = new WorkThread(this);
        mThreads.push_back(thread);
        thread->startThread();
    }
}

/// Discards all pending work items, releases the worker threads and waits for them to exit.
WorkQueue::~WorkQueue()
{
    {
        OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
        // Pending items are deleted without being executed; their tickets are not signalled here.
        while (mQueue.size())
        {
            WorkItem* item = mQueue.front();
            delete item;
            mQueue.pop();
        }
        // Wake every worker blocked in removeWorkItem() so it can observe the release and return.
        mIsReleased = true;
        mCondition.broadcast();
    }

    for (unsigned int i=0; i<mThreads.size(); ++i)
    {
        mThreads[i]->join();
        delete mThreads[i];
    }
}

/// Appends a work item to the queue and wakes one waiting worker thread.
/// The worker thread deletes the item after running it (see WorkThread::run).
/// @param item The work item to enqueue.
/// @return The item's ticket, which can be used to wait for its completion.
osg::ref_ptr<WorkTicket> WorkQueue::addWorkItem(WorkItem *item)
{
    // Fetch the ticket before publishing the item: once queued, a worker may run
    // and delete the item at any moment, so it must not be touched afterwards.
    osg::ref_ptr<WorkTicket> ticket = item->getTicket();

    OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
    mQueue.push(item);
    mCondition.signal();
    return ticket;
}

/// Removes the item at the front of the queue, blocking while the queue is empty
/// and has not been released.
/// @return The removed item, or NULL if the queue is empty after being released.
WorkItem *WorkQueue::removeWorkItem()
{
    OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
    while (!mQueue.size() && !mIsReleased)
    {
        mCondition.wait(&mMutex);
    }
    if (mQueue.size())
    {
        WorkItem* item = mQueue.front();
        mQueue.pop();
        return item;
    }
    else
        return NULL;
}

/// @param workQueue The queue this thread pulls its work items from.
WorkThread::WorkThread(WorkQueue *workQueue)
    : mWorkQueue(workQueue)
{
}

/// Thread body: repeatedly takes an item from the queue, runs it and deletes it.
/// Returns once removeWorkItem() yields NULL.
void WorkThread::run()
{
    while (true)
    {
        WorkItem* item = mWorkQueue->removeWorkItem();
        if (!item)
            return;
        item->doWork();
        delete item;
    }
}

}