Add WorkQueue class

c++11
scrawl 10 years ago
parent 15453e3d90
commit cc71e894e1

@ -41,7 +41,7 @@ add_component_dir (resource
)
add_component_dir (sceneutil
clone attach lightmanager visitor util statesetupdater controller skeleton riggeometry lightcontroller
clone attach lightmanager visitor util statesetupdater controller skeleton riggeometry lightcontroller workqueue
)
add_component_dir (nif

@ -0,0 +1,119 @@
#include "workqueue.hpp"
namespace SceneUtil
{
void WorkTicket::waitTillDone()
{
if (mDone > 0)
return;
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
while (mDone == 0)
{
mCondition.wait(&mMutex);
}
}
void WorkTicket::signalDone()
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
mDone.exchange(1);
mCondition.broadcast();
}
WorkItem::WorkItem()
: mTicket(new WorkTicket)
{
}
WorkItem::~WorkItem()
{
}
void WorkItem::doWork()
{
mTicket->signalDone();
}
osg::ref_ptr<WorkTicket> WorkItem::getTicket()
{
return mTicket;
}
WorkQueue::WorkQueue(int workerThreads)
: mIsReleased(false)
{
for (int i=0; i<workerThreads; ++i)
{
WorkThread* thread = new WorkThread(this);
mThreads.push_back(thread);
thread->startThread();
}
}
WorkQueue::~WorkQueue()
{
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
while (mQueue.size())
{
WorkItem* item = mQueue.front();
delete item;
mQueue.pop();
}
mIsReleased = true;
mCondition.broadcast();
}
for (unsigned int i=0; i<mThreads.size(); ++i)
{
mThreads[i]->join();
delete mThreads[i];
}
}
WorkTicket* WorkQueue::addWorkItem(WorkItem *item)
{
WorkTicket* ticket = item->getTicket().get();
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
mQueue.push(item);
mCondition.signal();
return ticket;
}
WorkItem *WorkQueue::removeWorkItem()
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
while (!mQueue.size() && !mIsReleased)
{
mCondition.wait(&mMutex);
}
if (mQueue.size())
{
WorkItem* item = mQueue.front();
mQueue.pop();
return item;
}
else
return NULL;
}
WorkThread::WorkThread(WorkQueue *workQueue)
: mWorkQueue(workQueue)
{
}
void WorkThread::run()
{
while (true)
{
WorkItem* item = mWorkQueue->removeWorkItem();
if (!item)
return;
item->doWork();
delete item;
}
}
}

@ -0,0 +1,91 @@
#ifndef OPENMW_COMPONENTS_SCENEUTIL_WORKQUEUE_H
#define OPENMW_COMPONENTS_SCENEUTIL_WORKQUEUE_H
#include <OpenThreads/Atomic>
#include <OpenThreads/Mutex>
#include <OpenThreads/Condition>
#include <OpenThreads/Thread>
#include <osg/Referenced>
#include <osg/ref_ptr>
#include <queue>
namespace SceneUtil
{
class WorkTicket : public osg::Referenced
{
public:
void waitTillDone();
void signalDone();
private:
OpenThreads::Atomic mDone;
OpenThreads::Mutex mMutex;
OpenThreads::Condition mCondition;
};
class WorkItem
{
public:
WorkItem();
virtual ~WorkItem();
/// Override in a derived WorkItem to perform actual work.
/// By default, just signals the ticket that the work is done.
virtual void doWork();
osg::ref_ptr<WorkTicket> getTicket();
protected:
osg::ref_ptr<WorkTicket> mTicket;
};
class WorkQueue;
class WorkThread : public OpenThreads::Thread
{
public:
WorkThread(WorkQueue* workQueue);
virtual void run();
private:
WorkQueue* mWorkQueue;
};
/// @brief A work queue that users can push work items onto, to be completed by one or more background threads.
class WorkQueue
{
public:
WorkQueue(int numWorkerThreads=1);
~WorkQueue();
/// Add a new work item to the back of the queue.
/// @par The returned WorkTicket may be used by the caller to wait until the work is complete.
WorkTicket* addWorkItem(WorkItem* item);
/// Get the next work item from the front of the queue. If the queue is empty, waits until a new item is added.
/// If the workqueue is in the process of being destroyed, may return NULL.
/// @note The caller must free the returned WorkItem
WorkItem* removeWorkItem();
void runThread();
private:
bool mIsReleased;
std::queue<WorkItem*> mQueue;
OpenThreads::Mutex mMutex;
OpenThreads::Condition mCondition;
std::vector<WorkThread*> mThreads;
};
}
#endif
Loading…
Cancel
Save