From 2c0e64510c02fd0c37a18bb8910bd4768ed54a2d Mon Sep 17 00:00:00 2001 From: elsid Date: Sun, 5 Mar 2023 16:11:10 +0100 Subject: [PATCH] Do not use std::shared_mutex to wait for job for async physics std::shared_mutex in combination with std::condition_variable_any may lead to a situation when notify_all does not wake up all waiting threads on Windows. Use separate std::mutex and std::condition_variable to notify about new job. Encapsulate all workers synchronization logic into a separate type. --- apps/openmw/mwphysics/mtphysics.cpp | 102 ++++++++++++++++++++-------- apps/openmw/mwphysics/mtphysics.hpp | 13 ++-- 2 files changed, 79 insertions(+), 36 deletions(-) diff --git a/apps/openmw/mwphysics/mtphysics.cpp b/apps/openmw/mwphysics/mtphysics.cpp index ea2060bfc6..dabd1fdcad 100644 --- a/apps/openmw/mwphysics/mtphysics.cpp +++ b/apps/openmw/mwphysics/mtphysics.cpp @@ -332,10 +332,63 @@ namespace MWPhysics + std::to_string(static_cast>(lockingPolicy))); } } -} -namespace MWPhysics -{ + class PhysicsTaskScheduler::WorkersSync + { + public: + void waitForWorkers() + { + std::unique_lock lock(mWorkersDoneMutex); + if (mFrameCounter != mWorkersFrameCounter) + mWorkersDone.wait(lock); + } + + void wakeUpWorkers() + { + const std::lock_guard lock(mHasJobMutex); + ++mFrameCounter; + mHasJob.notify_all(); + } + + void stopWorkers() + { + const std::lock_guard lock(mHasJobMutex); + mShouldStop = true; + mHasJob.notify_all(); + } + + void workIsDone() + { + const std::lock_guard lock(mWorkersDoneMutex); + ++mWorkersFrameCounter; + mWorkersDone.notify_all(); + } + + template + void runWorker(F&& f) noexcept + { + std::size_t lastFrame = 0; + std::unique_lock lock(mHasJobMutex); + while (!mShouldStop) + { + mHasJob.wait(lock, [&] { return mShouldStop || mFrameCounter != lastFrame; }); + lastFrame = mFrameCounter; + lock.unlock(); + f(); + lock.lock(); + } + } + + private: + std::size_t mWorkersFrameCounter = 0; + std::condition_variable mWorkersDone; + std::mutex mWorkersDoneMutex; + std::condition_variable mHasJob; + bool mShouldStop = false; + std::size_t mFrameCounter = 0; + std::mutex mHasJobMutex; + }; + PhysicsTaskScheduler::PhysicsTaskScheduler(float physicsDt, btCollisionWorld *collisionWorld, MWRender::DebugDrawer* debugDrawer) : mDefaultPhysicsDt(physicsDt) , mPhysicsDt(physicsDt) @@ -347,9 +400,7 @@ namespace MWPhysics , mNumJobs(0) , mRemainingSteps(0) , mLOSCacheExpiry(Settings::Manager::getInt("lineofsight keep inactive cache", "Physics")) - , mFrameCounter(0) , mAdvanceSimulation(false) - , mQuit(false) , mNextJob(0) , mNextLOS(0) , mFrameNumber(0) @@ -362,6 +413,7 @@ namespace MWPhysics , mTimeBegin(0) , mTimeEnd(0) , mFrameStart(0) + , mWorkersSync(mNumThreads >= 1 ? std::make_unique() : nullptr) { if (mNumThreads >= 1) { @@ -386,11 +438,11 @@ namespace MWPhysics waitForWorkers(); { MaybeExclusiveLock lock(mSimulationMutex, mLockingPolicy); - mQuit = true; mNumJobs = 0; mRemainingSteps = 0; - mHasJob.notify_all(); } + if (mWorkersSync != nullptr) + mWorkersSync->stopWorkers(); for (auto& thread : mThreads) thread.join(); } @@ -444,7 +496,14 @@ namespace MWPhysics void PhysicsTaskScheduler::applyQueuedMovements(float & timeAccum, std::vector&& simulations, osg::Timer_t frameStart, unsigned int frameNumber, osg::Stats& stats) { waitForWorkers(); + prepareWork(timeAccum, std::move(simulations), frameStart, frameNumber, stats); + if (mWorkersSync != nullptr) + mWorkersSync->wakeUpWorkers(); + } + void PhysicsTaskScheduler::prepareWork(float& timeAccum, std::vector&& simulations, + osg::Timer_t frameStart, unsigned int frameNumber, osg::Stats& stats) + { // This function run in the main thread. // While the mSimulationMutex is held, background physics threads can't run. @@ -477,7 +536,6 @@ namespace MWPhysics mPhysicsDt = newDelta; mSimulations = std::move(simulations); mAdvanceSimulation = (mRemainingSteps != 0); - ++mFrameCounter; mNumJobs = mSimulations.size(); mNextLOS.store(0, std::memory_order_relaxed); mNextJob.store(0, std::memory_order_release); @@ -498,7 +556,6 @@ namespace MWPhysics } mAsyncStartTime = mTimer->tick(); - mHasJob.notify_all(); if (mAdvanceSimulation) mBudget.update(mTimer->delta_s(timeStart, mTimer->tick()), 1, mBudgetCursor); } @@ -667,18 +724,10 @@ namespace MWPhysics void PhysicsTaskScheduler::worker() { - std::size_t lastFrame = 0; - std::shared_lock lock(mSimulationMutex); - while (!mQuit) - { - if (lastFrame == mFrameCounter) - { - mHasJob.wait(lock, [&] { return mQuit || lastFrame != mFrameCounter; }); - lastFrame = mFrameCounter; - } - + mWorkersSync->runWorker([this] { + std::shared_lock lock(mSimulationMutex); doSimulation(); - } + }); } void PhysicsTaskScheduler::updateActorsPositions() @@ -790,10 +839,8 @@ namespace MWPhysics mLOSCache.end()); } mTimeEnd = mTimer->tick(); - - std::unique_lock lock(mWorkersDoneMutex); - ++mWorkersFrameCounter; - mWorkersDone.notify_all(); + if (mWorkersSync != nullptr) + mWorkersSync->workIsDone(); } void PhysicsTaskScheduler::syncWithMainThread() @@ -811,10 +858,7 @@ namespace MWPhysics // https://docs.microsoft.com/en-us/windows/win32/sync/slim-reader-writer--srw--locks void PhysicsTaskScheduler::waitForWorkers() { - if (mNumThreads == 0) - return; - std::unique_lock lock(mWorkersDoneMutex); - if (mFrameCounter != mWorkersFrameCounter) - mWorkersDone.wait(lock); + if (mWorkersSync != nullptr) + mWorkersSync->waitForWorkers(); } } diff --git a/apps/openmw/mwphysics/mtphysics.hpp b/apps/openmw/mwphysics/mtphysics.hpp index e993471095..a072ad41ea 100644 --- a/apps/openmw/mwphysics/mtphysics.hpp +++ b/apps/openmw/mwphysics/mtphysics.hpp @@ -68,6 +68,8 @@ namespace MWPhysics void releaseSharedStates(); // destroy all objects whose destructor can't be safely called from ~PhysicsTaskScheduler() private: + class WorkersSync; + void doSimulation(); void worker(); void updateActorsPositions(); @@ -82,6 +84,8 @@ namespace MWPhysics void afterPostSim(); void syncWithMainThread(); void waitForWorkers(); + void prepareWork(float& timeAccum, std::vector&& simulations, osg::Timer_t frameStart, + unsigned int frameNumber, osg::Stats& stats); std::unique_ptr mWorldFrameData; std::vector mSimulations; @@ -104,22 +108,15 @@ namespace MWPhysics int mNumJobs; int mRemainingSteps; int mLOSCacheExpiry; - std::size_t mFrameCounter; bool mAdvanceSimulation; - bool mQuit; std::atomic mNextJob; std::atomic mNextLOS; std::vector mThreads; - std::size_t mWorkersFrameCounter = 0; - std::condition_variable mWorkersDone; - std::mutex mWorkersDoneMutex; - mutable std::shared_mutex mSimulationMutex; mutable std::shared_mutex mCollisionWorldMutex; mutable std::shared_mutex mLOSCacheMutex; mutable std::mutex mUpdateAabbMutex; - std::condition_variable_any mHasJob; unsigned int mFrameNumber; const osg::Timer* mTimer; @@ -132,6 +129,8 @@ namespace MWPhysics osg::Timer_t mTimeBegin; osg::Timer_t mTimeEnd; osg::Timer_t mFrameStart; + + std::unique_ptr mWorkersSync; }; }