Refactor WorkQueue, merge WorkTicket and WorkItem

Allow the caller to hold on to the WorkItem. This makes it possible for a derived WorkItem to store the result of the work within the WorkItem itself.
coverity_scan
scrawl 9 years ago
parent df57d4bfba
commit b7e69cbc64

@ -1,9 +1,11 @@
#include "workqueue.hpp" #include "workqueue.hpp"
#include <iostream>
namespace SceneUtil namespace SceneUtil
{ {
void WorkTicket::waitTillDone() void WorkItem::waitTillDone()
{ {
if (mDone > 0) if (mDone > 0)
return; return;
@ -15,7 +17,7 @@ void WorkTicket::waitTillDone()
} }
} }
void WorkTicket::signalDone() void WorkItem::signalDone()
{ {
{ {
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
@ -25,23 +27,16 @@ void WorkTicket::signalDone()
} }
WorkItem::WorkItem() WorkItem::WorkItem()
: mTicket(new WorkTicket)
{ {
mTicket->setThreadSafeRefUnref(true);
} }
WorkItem::~WorkItem() WorkItem::~WorkItem()
{ {
} }
void WorkItem::doWork() bool WorkItem::isDone() const
{ {
mTicket->signalDone(); return (mDone > 0);
}
osg::ref_ptr<WorkTicket> WorkItem::getTicket()
{
return mTicket;
} }
WorkQueue::WorkQueue(int workerThreads) WorkQueue::WorkQueue(int workerThreads)
@ -60,11 +55,7 @@ WorkQueue::~WorkQueue()
{ {
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
while (!mQueue.empty()) while (!mQueue.empty())
{
WorkItem* item = mQueue.front();
delete item;
mQueue.pop(); mQueue.pop();
}
mIsReleased = true; mIsReleased = true;
mCondition.broadcast(); mCondition.broadcast();
} }
@ -76,16 +67,20 @@ WorkQueue::~WorkQueue()
} }
} }
osg::ref_ptr<WorkTicket> WorkQueue::addWorkItem(WorkItem *item) void WorkQueue::addWorkItem(osg::ref_ptr<WorkItem> item)
{ {
osg::ref_ptr<WorkTicket> ticket = item->getTicket(); if (item->isDone())
{
std::cerr << "warning, trying to add a work item that is already completed" << std::endl;
return;
}
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
mQueue.push(item); mQueue.push(item);
mCondition.signal(); mCondition.signal();
return ticket;
} }
WorkItem *WorkQueue::removeWorkItem() osg::ref_ptr<WorkItem> WorkQueue::removeWorkItem()
{ {
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
while (mQueue.empty() && !mIsReleased) while (mQueue.empty() && !mIsReleased)
@ -94,7 +89,7 @@ WorkItem *WorkQueue::removeWorkItem()
} }
if (mQueue.size()) if (mQueue.size())
{ {
WorkItem* item = mQueue.front(); osg::ref_ptr<WorkItem> item = mQueue.front();
mQueue.pop(); mQueue.pop();
return item; return item;
} }
@ -111,11 +106,11 @@ void WorkThread::run()
{ {
while (true) while (true)
{ {
WorkItem* item = mWorkQueue->removeWorkItem(); osg::ref_ptr<WorkItem> item = mWorkQueue->removeWorkItem();
if (!item) if (!item)
return; return;
item->doWork(); item->doWork();
delete item; item->signalDone();
} }
} }

@ -14,69 +14,53 @@
namespace SceneUtil namespace SceneUtil
{ {
class WorkTicket : public osg::Referenced class WorkItem : public osg::Referenced
{
public:
void waitTillDone();
void signalDone();
private:
OpenThreads::Atomic mDone;
OpenThreads::Mutex mMutex;
OpenThreads::Condition mCondition;
};
class WorkItem
{ {
public: public:
WorkItem(); WorkItem();
virtual ~WorkItem(); virtual ~WorkItem();
/// Override in a derived WorkItem to perform actual work. /// Override in a derived WorkItem to perform actual work.
/// By default, just signals the ticket that the work is done. virtual void doWork() {}
virtual void doWork();
bool isDone() const;
osg::ref_ptr<WorkTicket> getTicket(); /// Wait until the work is completed. Usually called from the main thread.
void waitTillDone();
/// Internal use by the WorkQueue.
void signalDone();
protected: protected:
osg::ref_ptr<WorkTicket> mTicket; OpenThreads::Atomic mDone;
OpenThreads::Mutex mMutex;
OpenThreads::Condition mCondition;
}; };
class WorkQueue; class WorkQueue;
class WorkThread;
class WorkThread : public OpenThreads::Thread
{
public:
WorkThread(WorkQueue* workQueue);
virtual void run();
private:
WorkQueue* mWorkQueue;
};
/// @brief A work queue that users can push work items onto, to be completed by one or more background threads. /// @brief A work queue that users can push work items onto, to be completed by one or more background threads.
class WorkQueue /// @note Work items will be processed in the order that they were given in, however
/// if multiple work threads are involved then it is possible for a later item to complete before earlier items.
class WorkQueue : public osg::Referenced
{ {
public: public:
WorkQueue(int numWorkerThreads=1); WorkQueue(int numWorkerThreads=1);
~WorkQueue(); ~WorkQueue();
/// Add a new work item to the back of the queue. /// Add a new work item to the back of the queue.
/// @par The returned WorkTicket may be used by the caller to wait until the work is complete. /// @par The work item's waitTillDone() method may be used by the caller to wait until the work is complete.
osg::ref_ptr<WorkTicket> addWorkItem(WorkItem* item); void addWorkItem(osg::ref_ptr<WorkItem> item);
/// Get the next work item from the front of the queue. If the queue is empty, waits until a new item is added. /// Get the next work item from the front of the queue. If the queue is empty, waits until a new item is added.
/// If the workqueue is in the process of being destroyed, may return NULL. /// If the workqueue is in the process of being destroyed, may return NULL.
/// @note The caller must free the returned WorkItem /// @par Used internally by the WorkThread.
WorkItem* removeWorkItem(); osg::ref_ptr<WorkItem> removeWorkItem();
void runThread();
private: private:
bool mIsReleased; bool mIsReleased;
std::queue<WorkItem*> mQueue; std::queue<osg::ref_ptr<WorkItem> > mQueue;
OpenThreads::Mutex mMutex; OpenThreads::Mutex mMutex;
OpenThreads::Condition mCondition; OpenThreads::Condition mCondition;
@ -84,6 +68,17 @@ namespace SceneUtil
std::vector<WorkThread*> mThreads; std::vector<WorkThread*> mThreads;
}; };
/// Internally used by WorkQueue.
class WorkThread : public OpenThreads::Thread
{
public:
WorkThread(WorkQueue* workQueue);
virtual void run();
private:
WorkQueue* mWorkQueue;
};
} }

Loading…
Cancel
Save