diff --git a/components/sceneutil/workqueue.cpp b/components/sceneutil/workqueue.cpp index 26a392be4..bb1d1f1f7 100644 --- a/components/sceneutil/workqueue.cpp +++ b/components/sceneutil/workqueue.cpp @@ -1,9 +1,11 @@ #include "workqueue.hpp" +#include + namespace SceneUtil { -void WorkTicket::waitTillDone() +void WorkItem::waitTillDone() { if (mDone > 0) return; @@ -15,7 +17,7 @@ void WorkTicket::waitTillDone() } } -void WorkTicket::signalDone() +void WorkItem::signalDone() { { OpenThreads::ScopedLock lock(mMutex); @@ -25,23 +27,16 @@ void WorkTicket::signalDone() } WorkItem::WorkItem() - : mTicket(new WorkTicket) { - mTicket->setThreadSafeRefUnref(true); } WorkItem::~WorkItem() { } -void WorkItem::doWork() -{ - mTicket->signalDone(); -} - -osg::ref_ptr WorkItem::getTicket() +bool WorkItem::isDone() const { - return mTicket; + return (mDone > 0); } WorkQueue::WorkQueue(int workerThreads) @@ -60,11 +55,7 @@ WorkQueue::~WorkQueue() { OpenThreads::ScopedLock lock(mMutex); while (!mQueue.empty()) - { - WorkItem* item = mQueue.front(); - delete item; mQueue.pop(); - } mIsReleased = true; mCondition.broadcast(); } @@ -76,16 +67,20 @@ WorkQueue::~WorkQueue() } } -osg::ref_ptr WorkQueue::addWorkItem(WorkItem *item) +void WorkQueue::addWorkItem(osg::ref_ptr item) { - osg::ref_ptr ticket = item->getTicket(); + if (item->isDone()) + { + std::cerr << "warning, trying to add a work item that is already completed" << std::endl; + return; + } + OpenThreads::ScopedLock lock(mMutex); mQueue.push(item); mCondition.signal(); - return ticket; } -WorkItem *WorkQueue::removeWorkItem() +osg::ref_ptr WorkQueue::removeWorkItem() { OpenThreads::ScopedLock lock(mMutex); while (mQueue.empty() && !mIsReleased) @@ -94,7 +89,7 @@ WorkItem *WorkQueue::removeWorkItem() } if (mQueue.size()) { - WorkItem* item = mQueue.front(); + osg::ref_ptr item = mQueue.front(); mQueue.pop(); return item; } @@ -111,11 +106,11 @@ void WorkThread::run() { while (true) { - WorkItem* item = mWorkQueue->removeWorkItem(); + osg::ref_ptr item = mWorkQueue->removeWorkItem(); if (!item) return; item->doWork(); - delete item; + item->signalDone(); } } diff --git a/components/sceneutil/workqueue.hpp b/components/sceneutil/workqueue.hpp index 492bbd090..06489dfbb 100644 --- a/components/sceneutil/workqueue.hpp +++ b/components/sceneutil/workqueue.hpp @@ -14,69 +14,53 @@ namespace SceneUtil { - class WorkTicket : public osg::Referenced - { - public: - void waitTillDone(); - - void signalDone(); - - private: - OpenThreads::Atomic mDone; - OpenThreads::Mutex mMutex; - OpenThreads::Condition mCondition; - }; - - class WorkItem + class WorkItem : public osg::Referenced { public: WorkItem(); virtual ~WorkItem(); /// Override in a derived WorkItem to perform actual work. - /// By default, just signals the ticket that the work is done. - virtual void doWork(); + virtual void doWork() {} + + bool isDone() const; - osg::ref_ptr getTicket(); + /// Wait until the work is completed. Usually called from the main thread. + void waitTillDone(); + + /// Internal use by the WorkQueue. + void signalDone(); protected: - osg::ref_ptr mTicket; + OpenThreads::Atomic mDone; + OpenThreads::Mutex mMutex; + OpenThreads::Condition mCondition; }; class WorkQueue; - - class WorkThread : public OpenThreads::Thread - { - public: - WorkThread(WorkQueue* workQueue); - - virtual void run(); - - private: - WorkQueue* mWorkQueue; - }; + class WorkThread; /// @brief A work queue that users can push work items onto, to be completed by one or more background threads. - class WorkQueue + /// @note Work items will be processed in the order that they were given in, however + /// if multiple work threads are involved then it is possible for a later item to complete before earlier items. + class WorkQueue : public osg::Referenced { public: WorkQueue(int numWorkerThreads=1); ~WorkQueue(); /// Add a new work item to the back of the queue. - /// @par The returned WorkTicket may be used by the caller to wait until the work is complete. - osg::ref_ptr addWorkItem(WorkItem* item); + /// @par The work item's waitTillDone() method may be used by the caller to wait until the work is complete. + void addWorkItem(osg::ref_ptr item); /// Get the next work item from the front of the queue. If the queue is empty, waits until a new item is added. /// If the workqueue is in the process of being destroyed, may return NULL. - /// @note The caller must free the returned WorkItem - WorkItem* removeWorkItem(); - - void runThread(); + /// @par Used internally by the WorkThread. + osg::ref_ptr removeWorkItem(); private: bool mIsReleased; - std::queue mQueue; + std::queue > mQueue; OpenThreads::Mutex mMutex; OpenThreads::Condition mCondition; @@ -84,6 +68,17 @@ namespace SceneUtil std::vector mThreads; }; + /// Internally used by WorkQueue. + class WorkThread : public OpenThreads::Thread + { + public: + WorkThread(WorkQueue* workQueue); + + virtual void run(); + + private: + WorkQueue* mWorkQueue; + }; }