From cc71e894e1ec30782ebe9e6ef398f5b270f9ceb9 Mon Sep 17 00:00:00 2001 From: scrawl Date: Wed, 10 Jun 2015 18:15:31 +0200 Subject: [PATCH] Add WorkQueue class --- components/CMakeLists.txt | 2 +- components/sceneutil/workqueue.cpp | 119 +++++++++++++++++++++++++++++ components/sceneutil/workqueue.hpp | 91 ++++++++++++++++++++++ 3 files changed, 211 insertions(+), 1 deletion(-) create mode 100644 components/sceneutil/workqueue.cpp create mode 100644 components/sceneutil/workqueue.hpp diff --git a/components/CMakeLists.txt b/components/CMakeLists.txt index 5a4ec605b..8ce770657 100644 --- a/components/CMakeLists.txt +++ b/components/CMakeLists.txt @@ -41,7 +41,7 @@ add_component_dir (resource ) add_component_dir (sceneutil - clone attach lightmanager visitor util statesetupdater controller skeleton riggeometry lightcontroller + clone attach lightmanager visitor util statesetupdater controller skeleton riggeometry lightcontroller workqueue ) add_component_dir (nif diff --git a/components/sceneutil/workqueue.cpp b/components/sceneutil/workqueue.cpp new file mode 100644 index 000000000..a709eae85 --- /dev/null +++ b/components/sceneutil/workqueue.cpp @@ -0,0 +1,119 @@ +#include "workqueue.hpp" + +namespace SceneUtil +{ + +void WorkTicket::waitTillDone() +{ + if (mDone > 0) + return; + + OpenThreads::ScopedLock lock(mMutex); + while (mDone == 0) + { + mCondition.wait(&mMutex); + } +} + +void WorkTicket::signalDone() +{ + OpenThreads::ScopedLock lock(mMutex); + mDone.exchange(1); + mCondition.broadcast(); +} + +WorkItem::WorkItem() + : mTicket(new WorkTicket) +{ +} + +WorkItem::~WorkItem() +{ +} + +void WorkItem::doWork() +{ + mTicket->signalDone(); +} + +osg::ref_ptr WorkItem::getTicket() +{ + return mTicket; +} + +WorkQueue::WorkQueue(int workerThreads) + : mIsReleased(false) +{ + for (int i=0; istartThread(); + } +} + +WorkQueue::~WorkQueue() +{ + { + OpenThreads::ScopedLock lock(mMutex); + while (mQueue.size()) + { + WorkItem* item = mQueue.front(); + delete item; + mQueue.pop(); + } + mIsReleased = true; + mCondition.broadcast(); + } + + for (unsigned int i=0; ijoin(); + delete mThreads[i]; + } +} + +WorkTicket* WorkQueue::addWorkItem(WorkItem *item) +{ + WorkTicket* ticket = item->getTicket().get(); + OpenThreads::ScopedLock lock(mMutex); + mQueue.push(item); + mCondition.signal(); + return ticket; +} + +WorkItem *WorkQueue::removeWorkItem() +{ + OpenThreads::ScopedLock lock(mMutex); + while (!mQueue.size() && !mIsReleased) + { + mCondition.wait(&mMutex); + } + if (mQueue.size()) + { + WorkItem* item = mQueue.front(); + mQueue.pop(); + return item; + } + else + return NULL; +} + +WorkThread::WorkThread(WorkQueue *workQueue) + : mWorkQueue(workQueue) +{ +} + +void WorkThread::run() +{ + while (true) + { + WorkItem* item = mWorkQueue->removeWorkItem(); + if (!item) + return; + item->doWork(); + delete item; + } +} + +} diff --git a/components/sceneutil/workqueue.hpp b/components/sceneutil/workqueue.hpp new file mode 100644 index 000000000..720fbaa68 --- /dev/null +++ b/components/sceneutil/workqueue.hpp @@ -0,0 +1,91 @@ +#ifndef OPENMW_COMPONENTS_SCENEUTIL_WORKQUEUE_H +#define OPENMW_COMPONENTS_SCENEUTIL_WORKQUEUE_H + +#include +#include +#include +#include + +#include +#include + +#include + +namespace SceneUtil +{ + + class WorkTicket : public osg::Referenced + { + public: + void waitTillDone(); + + void signalDone(); + + private: + OpenThreads::Atomic mDone; + OpenThreads::Mutex mMutex; + OpenThreads::Condition mCondition; + }; + + class WorkItem + { + public: + WorkItem(); + virtual ~WorkItem(); + + /// Override in a derived WorkItem to perform actual work. + /// By default, just signals the ticket that the work is done. + virtual void doWork(); + + osg::ref_ptr getTicket(); + + protected: + osg::ref_ptr mTicket; + }; + + class WorkQueue; + + class WorkThread : public OpenThreads::Thread + { + public: + WorkThread(WorkQueue* workQueue); + + virtual void run(); + + private: + WorkQueue* mWorkQueue; + }; + + /// @brief A work queue that users can push work items onto, to be completed by one or more background threads. + class WorkQueue + { + public: + WorkQueue(int numWorkerThreads=1); + ~WorkQueue(); + + /// Add a new work item to the back of the queue. + /// @par The returned WorkTicket may be used by the caller to wait until the work is complete. + WorkTicket* addWorkItem(WorkItem* item); + + /// Get the next work item from the front of the queue. If the queue is empty, waits until a new item is added. + /// If the workqueue is in the process of being destroyed, may return NULL. + /// @note The caller must free the returned WorkItem + WorkItem* removeWorkItem(); + + void runThread(); + + private: + bool mIsReleased; + std::queue mQueue; + + OpenThreads::Mutex mMutex; + OpenThreads::Condition mCondition; + + std::vector mThreads; + }; + + + +} + +#endif