Use std types for WorkQueue

pull/578/head
elsid 5 years ago
parent da65a3b8d4
commit 3251687a3d
No known key found for this signature in database
GPG Key ID: B845CB9FEE18AB40

@ -2,69 +2,55 @@
#include <components/debug/debuglog.hpp> #include <components/debug/debuglog.hpp>
#include <numeric>
namespace SceneUtil namespace SceneUtil
{ {
void WorkItem::waitTillDone() void WorkItem::waitTillDone()
{ {
if (mDone > 0) if (mDone)
return; return;
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex); std::unique_lock<std::mutex> lock(mMutex);
while (mDone == 0) while (!mDone)
{ {
mCondition.wait(&mMutex); mCondition.wait(lock);
} }
} }
void WorkItem::signalDone() void WorkItem::signalDone()
{ {
{ {
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex); std::unique_lock<std::mutex> lock(mMutex);
mDone.exchange(1); mDone = true;
} }
mCondition.broadcast(); mCondition.notify_all();
}
WorkItem::WorkItem()
{
}
WorkItem::~WorkItem()
{
} }
bool WorkItem::isDone() const bool WorkItem::isDone() const
{ {
return (mDone > 0); return mDone;
} }
WorkQueue::WorkQueue(int workerThreads) WorkQueue::WorkQueue(int workerThreads)
: mIsReleased(false) : mIsReleased(false)
{ {
for (int i=0; i<workerThreads; ++i) for (int i=0; i<workerThreads; ++i)
{ mThreads.emplace_back(std::make_unique<WorkThread>(*this));
WorkThread* thread = new WorkThread(this);
mThreads.push_back(thread);
thread->startThread();
}
} }
WorkQueue::~WorkQueue() WorkQueue::~WorkQueue()
{ {
{ {
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex); std::unique_lock<std::mutex> lock(mMutex);
while (!mQueue.empty()) while (!mQueue.empty())
mQueue.pop_back(); mQueue.pop_back();
mIsReleased = true; mIsReleased = true;
mCondition.broadcast(); mCondition.notify_all();
} }
for (unsigned int i=0; i<mThreads.size(); ++i) mThreads.clear();
{
mThreads[i]->join();
delete mThreads[i];
}
} }
void WorkQueue::addWorkItem(osg::ref_ptr<WorkItem> item, bool front) void WorkQueue::addWorkItem(osg::ref_ptr<WorkItem> item, bool front)
@ -75,20 +61,20 @@ void WorkQueue::addWorkItem(osg::ref_ptr<WorkItem> item, bool front)
return; return;
} }
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex); std::unique_lock<std::mutex> lock(mMutex);
if (front) if (front)
mQueue.push_front(item); mQueue.push_front(item);
else else
mQueue.push_back(item); mQueue.push_back(item);
mCondition.signal(); mCondition.notify_one();
} }
osg::ref_ptr<WorkItem> WorkQueue::removeWorkItem() osg::ref_ptr<WorkItem> WorkQueue::removeWorkItem()
{ {
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex); std::unique_lock<std::mutex> lock(mMutex);
while (mQueue.empty() && !mIsReleased) while (mQueue.empty() && !mIsReleased)
{ {
mCondition.wait(&mMutex); mCondition.wait(lock);
} }
if (!mQueue.empty()) if (!mQueue.empty())
{ {
@ -102,25 +88,26 @@ osg::ref_ptr<WorkItem> WorkQueue::removeWorkItem()
unsigned int WorkQueue::getNumItems() const unsigned int WorkQueue::getNumItems() const
{ {
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex); std::unique_lock<std::mutex> lock(mMutex);
return mQueue.size(); return mQueue.size();
} }
unsigned int WorkQueue::getNumActiveThreads() const unsigned int WorkQueue::getNumActiveThreads() const
{ {
unsigned int count = 0; return std::accumulate(mThreads.begin(), mThreads.end(), 0u,
for (unsigned int i=0; i<mThreads.size(); ++i) [] (auto r, const auto& t) { return r + t->isActive(); });
{
if (mThreads[i]->isActive())
++count;
}
return count;
} }
WorkThread::WorkThread(WorkQueue *workQueue) WorkThread::WorkThread(WorkQueue& workQueue)
: mWorkQueue(workQueue) : mWorkQueue(&workQueue)
, mActive(false) , mActive(false)
, mThread([this] { run(); })
{
}
WorkThread::~WorkThread()
{ {
mThread.join();
} }
void WorkThread::run() void WorkThread::run()

@ -1,16 +1,14 @@
#ifndef OPENMW_COMPONENTS_SCENEUTIL_WORKQUEUE_H #ifndef OPENMW_COMPONENTS_SCENEUTIL_WORKQUEUE_H
#define OPENMW_COMPONENTS_SCENEUTIL_WORKQUEUE_H #define OPENMW_COMPONENTS_SCENEUTIL_WORKQUEUE_H
#include <OpenThreads/Atomic>
#include <OpenThreads/Mutex>
#include <OpenThreads/Condition>
#include <OpenThreads/Thread>
#include <osg/Referenced> #include <osg/Referenced>
#include <osg/ref_ptr> #include <osg/ref_ptr>
#include <atomic> #include <atomic>
#include <queue> #include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
namespace SceneUtil namespace SceneUtil
{ {
@ -18,9 +16,6 @@ namespace SceneUtil
class WorkItem : public osg::Referenced class WorkItem : public osg::Referenced
{ {
public: public:
WorkItem();
virtual ~WorkItem();
/// Override in a derived WorkItem to perform actual work. /// Override in a derived WorkItem to perform actual work.
virtual void doWork() {} virtual void doWork() {}
@ -35,10 +30,10 @@ namespace SceneUtil
/// Set abort flag in order to return from doWork() as soon as possible. May not be respected by all WorkItems. /// Set abort flag in order to return from doWork() as soon as possible. May not be respected by all WorkItems.
virtual void abort() {} virtual void abort() {}
protected: private:
OpenThreads::Atomic mDone; std::atomic_bool mDone {false};
OpenThreads::Mutex mMutex; std::mutex mMutex;
OpenThreads::Condition mCondition; std::condition_variable mCondition;
}; };
class WorkThread; class WorkThread;
@ -70,25 +65,28 @@ namespace SceneUtil
bool mIsReleased; bool mIsReleased;
std::deque<osg::ref_ptr<WorkItem> > mQueue; std::deque<osg::ref_ptr<WorkItem> > mQueue;
mutable OpenThreads::Mutex mMutex; mutable std::mutex mMutex;
OpenThreads::Condition mCondition; std::condition_variable mCondition;
std::vector<WorkThread*> mThreads; std::vector<std::unique_ptr<WorkThread>> mThreads;
}; };
/// Internally used by WorkQueue. /// Internally used by WorkQueue.
class WorkThread : public OpenThreads::Thread class WorkThread
{ {
public: public:
WorkThread(WorkQueue* workQueue); WorkThread(WorkQueue& workQueue);
virtual void run(); ~WorkThread();
bool isActive() const; bool isActive() const;
private: private:
WorkQueue* mWorkQueue; WorkQueue* mWorkQueue;
std::atomic<bool> mActive; std::atomic<bool> mActive;
std::thread mThread;
void run();
}; };

Loading…
Cancel
Save