#include "workqueue.hpp"

// NOTE(review): the targets of the next two includes were missing from the
// reviewed text and have been reconstructed from usage (Log / Debug::Error and
// std::accumulate). Confirm the logging header path against the project tree.
#include <components/debug/debuglog.hpp>

#include <numeric>

namespace SceneUtil
{

    /// Block the calling thread until signalDone() has been called on this item.
    /// Returns immediately when the item is already done.
    void WorkItem::waitTillDone()
    {
        // Fast path without taking the mutex.
        // NOTE(review): presumably mDone is an atomic flag; confirm in the header.
        if (mDone)
            return;

        std::unique_lock lock(mMutex);
        // Re-check the flag after every wake-up so that a wake-up without
        // mDone being set does not end the wait.
        while (!mDone)
        {
            mCondition.wait(lock);
        }
    }

    /// Mark the item as done and wake every thread blocked in waitTillDone().
    void WorkItem::signalDone()
    {
        {
            // The flag is flipped while holding the mutex so that a waiter which
            // has just tested mDone under the lock cannot miss the notification.
            std::unique_lock lock(mMutex);
            mDone = true;
        }
        mCondition.notify_all();
    }

    /// @return true once signalDone() has been called.
    bool WorkItem::isDone() const
    {
        return mDone;
    }

    /// Create the queue and start @p workerThreads worker threads.
    WorkQueue::WorkQueue(std::size_t workerThreads)
        : mIsReleased(false)
    {
        start(workerThreads);
    }

    /// Stops the queue; see stop().
    WorkQueue::~WorkQueue()
    {
        stop();
    }

    /// Clear the released flag and create worker threads until at least
    /// @p workerThreads exist. Existing threads are kept; this never reduces
    /// the number of threads.
    void WorkQueue::start(std::size_t workerThreads)
    {
        {
            const std::lock_guard lock(mMutex);
            mIsReleased = false;
        }
        while (mThreads.size() < workerThreads)
            mThreads.emplace_back(std::make_unique<WorkThread>(*this));
    }

    /// Discard every queued item, release all threads waiting in
    /// removeWorkItem() and destroy the worker threads.
    /// Discarded items are neither executed nor signalled as done here.
    void WorkQueue::stop()
    {
        {
            std::unique_lock lock(mMutex);
            mQueue.clear();
            mIsReleased = true;
            mCondition.notify_all();
        }

        // Destroying a WorkThread joins its thread, so this waits for any item
        // that is currently being processed to finish.
        mThreads.clear();
    }

    /// Enqueue @p item for execution by a worker thread.
    /// @param item  The work to schedule. Items that are already done are
    ///              rejected with an error log entry.
    /// @param front When true the item is placed at the head of the queue,
    ///              otherwise at the tail.
    void WorkQueue::addWorkItem(osg::ref_ptr<WorkItem> item, bool front)
    {
        if (item->isDone())
        {
            Log(Debug::Error) << "Error: trying to add a work item that is already completed";
            return;
        }

        std::unique_lock lock(mMutex);
        if (front)
            mQueue.push_front(std::move(item));
        else
            mQueue.push_back(std::move(item));
        mCondition.notify_one();
    }

    /// Take the item at the head of the queue, blocking while the queue is
    /// empty and has not been released.
    /// @return The next item, or nullptr when the queue was released while empty.
    osg::ref_ptr<WorkItem> WorkQueue::removeWorkItem()
    {
        std::unique_lock lock(mMutex);
        while (mQueue.empty() && !mIsReleased)
        {
            mCondition.wait(lock);
        }

        if (!mQueue.empty())
        {
            osg::ref_ptr<WorkItem> item = std::move(mQueue.front());
            mQueue.pop_front();
            return item;
        }
        return nullptr;
    }

    /// @return Number of items currently waiting in the queue.
    unsigned int WorkQueue::getNumItems() const
    {
        std::unique_lock lock(mMutex);
        // size() yields the container's size_type; make the narrowing explicit.
        return static_cast<unsigned int>(mQueue.size());
    }

    /// @return Number of worker threads that report isActive() at the time of
    ///         the call. mThreads is read without taking mMutex.
    unsigned int WorkQueue::getNumActiveThreads() const
    {
        return std::accumulate(
            mThreads.begin(), mThreads.end(), 0u, [](auto r, const auto& t) { return r + t->isActive(); });
    }

    /// Start a thread that serves @p workQueue by executing run().
    // NOTE(review): the thread starts inside the initializer list; this relies on
    // mThread being declared after mWorkQueue and mActive - confirm in the header.
    WorkThread::WorkThread(WorkQueue& workQueue)
        : mWorkQueue(&workQueue)
        , mActive(false)
        , mThread([this] { run(); })
    {
    }

    /// Wait for the worker thread to finish.
    WorkThread::~WorkThread()
    {
        mThread.join();
    }

    /// Thread body: repeatedly take an item from the queue, execute it and
    /// signal its completion. Returns when removeWorkItem() yields no item.
    void WorkThread::run()
    {
        while (true)
        {
            osg::ref_ptr<WorkItem> item = mWorkQueue->removeWorkItem();
            if (!item)
                return;

            mActive = true;
            item->doWork();
            item->signalDone();
            mActive = false;
        }
    }

    /// @return true while the thread is between picking up an item and having
    ///         signalled its completion.
    bool WorkThread::isActive() const
    {
        return mActive;
    }

}