1
0
Fork 0
mirror of https://github.com/OpenMW/openmw.git synced 2025-02-06 23:45:35 +00:00

Do not use std::shared_mutex to wait for job for async physics

std::shared_mutex in combination with std::condition_variable_any may
lead to a situation when notify_all does not wake up all waiting threads
on Windows. Use separate std::mutex and std::condition_variable to
notify about new job. Encapsulate all workers synchronization logic into
a separate type.
This commit is contained in:
elsid 2023-03-05 16:11:10 +01:00
parent 31ae1cd339
commit 0040da3497
No known key found for this signature in database
GPG key ID: 4DE04C198CBA7625
2 changed files with 79 additions and 36 deletions

View file

@ -340,10 +340,63 @@ namespace MWPhysics
+ std::to_string(static_cast<std::underlying_type_t<LockingPolicy>>(lockingPolicy))); + std::to_string(static_cast<std::underlying_type_t<LockingPolicy>>(lockingPolicy)));
} }
} }
class PhysicsTaskScheduler::WorkersSync
{
public:
void waitForWorkers()
{
std::unique_lock lock(mWorkersDoneMutex);
if (mFrameCounter != mWorkersFrameCounter)
mWorkersDone.wait(lock);
} }
namespace MWPhysics void wakeUpWorkers()
{ {
const std::lock_guard lock(mHasJobMutex);
++mFrameCounter;
mHasJob.notify_all();
}
void stopWorkers()
{
const std::lock_guard lock(mHasJobMutex);
mShouldStop = true;
mHasJob.notify_all();
}
void workIsDone()
{
const std::lock_guard lock(mWorkersDoneMutex);
++mWorkersFrameCounter;
mWorkersDone.notify_all();
}
template <class F>
void runWorker(F&& f) noexcept
{
std::size_t lastFrame = 0;
std::unique_lock lock(mHasJobMutex);
while (!mShouldStop)
{
mHasJob.wait(lock, [&] { return mShouldStop || mFrameCounter != lastFrame; });
lastFrame = mFrameCounter;
lock.unlock();
f();
lock.lock();
}
}
private:
std::size_t mWorkersFrameCounter = 0;
std::condition_variable mWorkersDone;
std::mutex mWorkersDoneMutex;
std::condition_variable mHasJob;
bool mShouldStop = false;
std::size_t mFrameCounter = 0;
std::mutex mHasJobMutex;
};
PhysicsTaskScheduler::PhysicsTaskScheduler( PhysicsTaskScheduler::PhysicsTaskScheduler(
float physicsDt, btCollisionWorld* collisionWorld, MWRender::DebugDrawer* debugDrawer) float physicsDt, btCollisionWorld* collisionWorld, MWRender::DebugDrawer* debugDrawer)
: mDefaultPhysicsDt(physicsDt) : mDefaultPhysicsDt(physicsDt)
@ -356,9 +409,7 @@ namespace MWPhysics
, mNumJobs(0) , mNumJobs(0)
, mRemainingSteps(0) , mRemainingSteps(0)
, mLOSCacheExpiry(Settings::Manager::getInt("lineofsight keep inactive cache", "Physics")) , mLOSCacheExpiry(Settings::Manager::getInt("lineofsight keep inactive cache", "Physics"))
, mFrameCounter(0)
, mAdvanceSimulation(false) , mAdvanceSimulation(false)
, mQuit(false)
, mNextJob(0) , mNextJob(0)
, mNextLOS(0) , mNextLOS(0)
, mFrameNumber(0) , mFrameNumber(0)
@ -371,6 +422,7 @@ namespace MWPhysics
, mTimeBegin(0) , mTimeBegin(0)
, mTimeEnd(0) , mTimeEnd(0)
, mFrameStart(0) , mFrameStart(0)
, mWorkersSync(mNumThreads >= 1 ? std::make_unique<WorkersSync>() : nullptr)
{ {
if (mNumThreads >= 1) if (mNumThreads >= 1)
{ {
@ -395,11 +447,11 @@ namespace MWPhysics
waitForWorkers(); waitForWorkers();
{ {
MaybeExclusiveLock lock(mSimulationMutex, mLockingPolicy); MaybeExclusiveLock lock(mSimulationMutex, mLockingPolicy);
mQuit = true;
mNumJobs = 0; mNumJobs = 0;
mRemainingSteps = 0; mRemainingSteps = 0;
mHasJob.notify_all();
} }
if (mWorkersSync != nullptr)
mWorkersSync->stopWorkers();
for (auto& thread : mThreads) for (auto& thread : mThreads)
thread.join(); thread.join();
} }
@ -456,7 +508,14 @@ namespace MWPhysics
assert(mSimulations != &simulations); assert(mSimulations != &simulations);
waitForWorkers(); waitForWorkers();
prepareWork(timeAccum, simulations, frameStart, frameNumber, stats);
if (mWorkersSync != nullptr)
mWorkersSync->wakeUpWorkers();
}
void PhysicsTaskScheduler::prepareWork(float& timeAccum, std::vector<Simulation>& simulations,
osg::Timer_t frameStart, unsigned int frameNumber, osg::Stats& stats)
{
// This function run in the main thread. // This function run in the main thread.
// While the mSimulationMutex is held, background physics threads can't run. // While the mSimulationMutex is held, background physics threads can't run.
@ -489,7 +548,6 @@ namespace MWPhysics
mPhysicsDt = newDelta; mPhysicsDt = newDelta;
mSimulations = &simulations; mSimulations = &simulations;
mAdvanceSimulation = (mRemainingSteps != 0); mAdvanceSimulation = (mRemainingSteps != 0);
++mFrameCounter;
mNumJobs = mSimulations->size(); mNumJobs = mSimulations->size();
mNextLOS.store(0, std::memory_order_relaxed); mNextLOS.store(0, std::memory_order_relaxed);
mNextJob.store(0, std::memory_order_release); mNextJob.store(0, std::memory_order_release);
@ -510,7 +568,6 @@ namespace MWPhysics
} }
mAsyncStartTime = mTimer->tick(); mAsyncStartTime = mTimer->tick();
mHasJob.notify_all();
if (mAdvanceSimulation) if (mAdvanceSimulation)
mBudget.update(mTimer->delta_s(timeStart, mTimer->tick()), 1, mBudgetCursor); mBudget.update(mTimer->delta_s(timeStart, mTimer->tick()), 1, mBudgetCursor);
} }
@ -687,18 +744,10 @@ namespace MWPhysics
void PhysicsTaskScheduler::worker() void PhysicsTaskScheduler::worker()
{ {
std::size_t lastFrame = 0; mWorkersSync->runWorker([this] {
std::shared_lock lock(mSimulationMutex); std::shared_lock lock(mSimulationMutex);
while (!mQuit)
{
if (lastFrame == mFrameCounter)
{
mHasJob.wait(lock, [&] { return mQuit || lastFrame != mFrameCounter; });
lastFrame = mFrameCounter;
}
doSimulation(); doSimulation();
} });
} }
void PhysicsTaskScheduler::updateActorsPositions() void PhysicsTaskScheduler::updateActorsPositions()
@ -817,10 +866,8 @@ namespace MWPhysics
mLOSCache.end()); mLOSCache.end());
} }
mTimeEnd = mTimer->tick(); mTimeEnd = mTimer->tick();
if (mWorkersSync != nullptr)
std::unique_lock lock(mWorkersDoneMutex); mWorkersSync->workIsDone();
++mWorkersFrameCounter;
mWorkersDone.notify_all();
} }
void PhysicsTaskScheduler::syncWithMainThread() void PhysicsTaskScheduler::syncWithMainThread()
@ -842,10 +889,7 @@ namespace MWPhysics
// https://docs.microsoft.com/en-us/windows/win32/sync/slim-reader-writer--srw--locks // https://docs.microsoft.com/en-us/windows/win32/sync/slim-reader-writer--srw--locks
void PhysicsTaskScheduler::waitForWorkers() void PhysicsTaskScheduler::waitForWorkers()
{ {
if (mNumThreads == 0) if (mWorkersSync != nullptr)
return; mWorkersSync->waitForWorkers();
std::unique_lock lock(mWorkersDoneMutex);
if (mFrameCounter != mWorkersFrameCounter)
mWorkersDone.wait(lock);
} }
} }

View file

@ -71,6 +71,8 @@ namespace MWPhysics
// ~PhysicsTaskScheduler() // ~PhysicsTaskScheduler()
private: private:
class WorkersSync;
void doSimulation(); void doSimulation();
void worker(); void worker();
void updateActorsPositions(); void updateActorsPositions();
@ -85,6 +87,8 @@ namespace MWPhysics
void afterPostSim(); void afterPostSim();
void syncWithMainThread(); void syncWithMainThread();
void waitForWorkers(); void waitForWorkers();
void prepareWork(float& timeAccum, std::vector<Simulation>& simulations, osg::Timer_t frameStart,
unsigned int frameNumber, osg::Stats& stats);
std::unique_ptr<WorldFrameData> mWorldFrameData; std::unique_ptr<WorldFrameData> mWorldFrameData;
std::vector<Simulation>* mSimulations = nullptr; std::vector<Simulation>* mSimulations = nullptr;
@ -107,22 +111,15 @@ namespace MWPhysics
int mNumJobs; int mNumJobs;
int mRemainingSteps; int mRemainingSteps;
int mLOSCacheExpiry; int mLOSCacheExpiry;
std::size_t mFrameCounter;
bool mAdvanceSimulation; bool mAdvanceSimulation;
bool mQuit;
std::atomic<int> mNextJob; std::atomic<int> mNextJob;
std::atomic<int> mNextLOS; std::atomic<int> mNextLOS;
std::vector<std::thread> mThreads; std::vector<std::thread> mThreads;
std::size_t mWorkersFrameCounter = 0;
std::condition_variable mWorkersDone;
std::mutex mWorkersDoneMutex;
mutable std::shared_mutex mSimulationMutex; mutable std::shared_mutex mSimulationMutex;
mutable std::shared_mutex mCollisionWorldMutex; mutable std::shared_mutex mCollisionWorldMutex;
mutable std::shared_mutex mLOSCacheMutex; mutable std::shared_mutex mLOSCacheMutex;
mutable std::mutex mUpdateAabbMutex; mutable std::mutex mUpdateAabbMutex;
std::condition_variable_any mHasJob;
unsigned int mFrameNumber; unsigned int mFrameNumber;
const osg::Timer* mTimer; const osg::Timer* mTimer;
@ -135,6 +132,8 @@ namespace MWPhysics
osg::Timer_t mTimeBegin; osg::Timer_t mTimeBegin;
osg::Timer_t mTimeEnd; osg::Timer_t mTimeEnd;
osg::Timer_t mFrameStart; osg::Timer_t mFrameStart;
std::unique_ptr<WorkersSync> mWorkersSync;
}; };
} }