Use OpenThreads instead of boost thread

This should allow OpenMW to work better with git versions of openscenegraph. OSG dev version 3.5.5 added the setting of thread affinity for the main thread. The problem is that in the boost/standard threading libraries, the affinity of a thread is inherited by any further threads launched from that thread, leading to these threads always running on the same core as the main thread unless you tell them not to.

With OpenThreads, the default affinity of a thread is none, no matter what parent thread it was launched from.

So, when using custom threading with OSG 3.6+, we have these options:
1. explicitely tell OSG to *not* set the thread affinity
or 2. explicitely set the thread affinity of additional threads created (possible with boost, but not possible with std::thread)
or 3. use OpenThreads
or 4. accept the suboptimal performance of non-OSG threads (in OpenMW's case the sound streaming & video threads) running on the same core as the main thread

This patch opts for 3.)

Reference: http://forum.openscenegraph.org/viewtopic.php?t=16158
pull/1099/head
scrawl 8 years ago
parent 105fcc5e20
commit 530fb61ad0

@ -3,12 +3,16 @@
#include <iostream> #include <iostream>
#include <vector> #include <vector>
#include <memory> #include <memory>
#include <cstring>
#include <stdint.h> #include <stdint.h>
#include <components/vfs/manager.hpp> #include <components/vfs/manager.hpp>
#include <boost/thread.hpp> #include <OpenThreads/Thread>
#include <OpenThreads/Condition>
#include <OpenThreads/Mutex>
#include <OpenThreads/ScopedLock>
#include "openal_output.hpp" #include "openal_output.hpp"
#include "sound_decoder.hpp" #include "sound_decoder.hpp"
@ -243,31 +247,31 @@ const ALfloat OpenAL_SoundStream::sBufferLength = 0.125f;
// //
// A background streaming thread (keeps active streams processed) // A background streaming thread (keeps active streams processed)
// //
struct OpenAL_Output::StreamThread { struct OpenAL_Output::StreamThread : public OpenThreads::Thread {
typedef std::vector<OpenAL_SoundStream*> StreamVec; typedef std::vector<OpenAL_SoundStream*> StreamVec;
StreamVec mStreams; StreamVec mStreams;
volatile bool mQuitNow; volatile bool mQuitNow;
boost::mutex mMutex; OpenThreads::Mutex mMutex;
boost::condition_variable mCondVar; OpenThreads::Condition mCondVar;
boost::thread mThread;
StreamThread() StreamThread()
: mQuitNow(false), mThread(boost::ref(*this)) : mQuitNow(false)
{ {
start();
} }
~StreamThread() ~StreamThread()
{ {
mQuitNow = true; mQuitNow = true;
mMutex.lock(); mMutex.unlock(); mMutex.lock(); mMutex.unlock();
mCondVar.notify_all(); mCondVar.broadcast();
mThread.join(); join();
} }
// boost::thread entry point // thread entry point
void operator()() virtual void run()
{ {
boost::unique_lock<boost::mutex> lock(mMutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
while(!mQuitNow) while(!mQuitNow)
{ {
StreamVec::iterator iter = mStreams.begin(); StreamVec::iterator iter = mStreams.begin();
@ -279,31 +283,30 @@ struct OpenAL_Output::StreamThread {
++iter; ++iter;
} }
mCondVar.timed_wait(lock, boost::posix_time::milliseconds(50)); mCondVar.wait(&mMutex, 50);
} }
} }
void add(OpenAL_SoundStream *stream) void add(OpenAL_SoundStream *stream)
{ {
boost::unique_lock<boost::mutex> lock(mMutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
if(std::find(mStreams.begin(), mStreams.end(), stream) == mStreams.end()) if(std::find(mStreams.begin(), mStreams.end(), stream) == mStreams.end())
{ {
mStreams.push_back(stream); mStreams.push_back(stream);
lock.unlock(); mCondVar.broadcast();
mCondVar.notify_all();
} }
} }
void remove(OpenAL_SoundStream *stream) void remove(OpenAL_SoundStream *stream)
{ {
boost::lock_guard<boost::mutex> lock(mMutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
StreamVec::iterator iter = std::find(mStreams.begin(), mStreams.end(), stream); StreamVec::iterator iter = std::find(mStreams.begin(), mStreams.end(), stream);
if(iter != mStreams.end()) mStreams.erase(iter); if(iter != mStreams.end()) mStreams.erase(iter);
} }
void removeAll() void removeAll()
{ {
boost::lock_guard<boost::mutex> lock(mMutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
mStreams.clear(); mStreams.clear();
} }
@ -468,7 +471,7 @@ ALint OpenAL_SoundStream::refillQueue()
if(got < data.size()) if(got < data.size())
{ {
mIsFinished = true; mIsFinished = true;
memset(&data[got], mSilence, data.size()-got); std::memset(&data[got], mSilence, data.size()-got);
} }
if(got > 0) if(got > 0)
{ {
@ -1023,7 +1026,7 @@ double OpenAL_Output::getStreamOffset(MWBase::SoundStreamPtr sound)
{ {
if(!sound->mHandle) return 0.0; if(!sound->mHandle) return 0.0;
OpenAL_SoundStream *stream = reinterpret_cast<OpenAL_SoundStream*>(sound->mHandle); OpenAL_SoundStream *stream = reinterpret_cast<OpenAL_SoundStream*>(sound->mHandle);
boost::lock_guard<boost::mutex> lock(mStreamThread->mMutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mStreamThread->mMutex);
return stream->getStreamOffset(); return stream->getStreamOffset();
} }
@ -1031,7 +1034,7 @@ float OpenAL_Output::getStreamLoudness(MWBase::SoundStreamPtr sound)
{ {
if(!sound->mHandle) return 0.0; if(!sound->mHandle) return 0.0;
OpenAL_SoundStream *stream = reinterpret_cast<OpenAL_SoundStream*>(sound->mHandle); OpenAL_SoundStream *stream = reinterpret_cast<OpenAL_SoundStream*>(sound->mHandle);
boost::lock_guard<boost::mutex> lock(mStreamThread->mMutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mStreamThread->mMutex);
return stream->getCurrentLoudness(); return stream->getCurrentLoudness();
} }
@ -1039,7 +1042,7 @@ bool OpenAL_Output::isStreamPlaying(MWBase::SoundStreamPtr sound)
{ {
if(!sound->mHandle) return false; if(!sound->mHandle) return false;
OpenAL_SoundStream *stream = reinterpret_cast<OpenAL_SoundStream*>(sound->mHandle); OpenAL_SoundStream *stream = reinterpret_cast<OpenAL_SoundStream*>(sound->mHandle);
boost::lock_guard<boost::mutex> lock(mStreamThread->mMutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mStreamThread->mMutex);
return stream->isPlaying(); return stream->isPlaying();
} }

@ -1,5 +1,6 @@
#include "audiodecoder.hpp" #include "audiodecoder.hpp"
#include <stdexcept>
extern "C" extern "C"
{ {

@ -1,6 +1,7 @@
#include "videostate.hpp" #include "videostate.hpp"
#include <iostream> #include <iostream>
#include <stdexcept>
#include <osg/Texture2D> #include <osg/Texture2D>
@ -102,14 +103,14 @@ void PacketQueue::put(AVPacket *pkt)
this->last_pkt = pkt1; this->last_pkt = pkt1;
this->nb_packets++; this->nb_packets++;
this->size += pkt1->pkt.size; this->size += pkt1->pkt.size;
this->cond.notify_one(); this->cond.signal();
this->mutex.unlock(); this->mutex.unlock();
} }
int PacketQueue::get(AVPacket *pkt, VideoState *is) int PacketQueue::get(AVPacket *pkt, VideoState *is)
{ {
boost::unique_lock<boost::mutex> lock(this->mutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(this->mutex);
while(!is->mQuit) while(!is->mQuit)
{ {
AVPacketList *pkt1 = this->first_pkt; AVPacketList *pkt1 = this->first_pkt;
@ -129,7 +130,7 @@ int PacketQueue::get(AVPacket *pkt, VideoState *is)
if(this->flushing) if(this->flushing)
break; break;
this->cond.wait(lock); this->cond.wait(&this->mutex);
} }
return -1; return -1;
@ -138,7 +139,7 @@ int PacketQueue::get(AVPacket *pkt, VideoState *is)
void PacketQueue::flush() void PacketQueue::flush()
{ {
this->flushing = true; this->flushing = true;
this->cond.notify_one(); this->cond.signal();
} }
void PacketQueue::clear() void PacketQueue::clear()
@ -233,7 +234,7 @@ void VideoState::video_display(VideoPicture *vp)
void VideoState::video_refresh() void VideoState::video_refresh()
{ {
boost::mutex::scoped_lock lock(this->pictq_mutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(this->pictq_mutex);
if(this->pictq_size == 0) if(this->pictq_size == 0)
return; return;
@ -245,7 +246,7 @@ void VideoState::video_refresh()
this->pictq_rindex = (pictq_rindex+1) % VIDEO_PICTURE_ARRAY_SIZE; this->pictq_rindex = (pictq_rindex+1) % VIDEO_PICTURE_ARRAY_SIZE;
this->frame_last_pts = vp->pts; this->frame_last_pts = vp->pts;
this->pictq_size--; this->pictq_size--;
this->pictq_cond.notify_one(); this->pictq_cond.signal();
} }
else else
{ {
@ -275,7 +276,7 @@ void VideoState::video_refresh()
// update queue for next picture // update queue for next picture
this->pictq_size--; this->pictq_size--;
this->pictq_rindex = (this->pictq_rindex+1) % VIDEO_PICTURE_ARRAY_SIZE; this->pictq_rindex = (this->pictq_rindex+1) % VIDEO_PICTURE_ARRAY_SIZE;
this->pictq_cond.notify_one(); this->pictq_cond.signal();
} }
} }
@ -286,9 +287,9 @@ int VideoState::queue_picture(AVFrame *pFrame, double pts)
/* wait until we have a new pic */ /* wait until we have a new pic */
{ {
boost::unique_lock<boost::mutex> lock(this->pictq_mutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(this->pictq_mutex);
while(this->pictq_size >= VIDEO_PICTURE_QUEUE_SIZE && !this->mQuit) while(this->pictq_size >= VIDEO_PICTURE_QUEUE_SIZE && !this->mQuit)
this->pictq_cond.timed_wait(lock, boost::posix_time::milliseconds(1)); this->pictq_cond.wait(&this->pictq_mutex, 1);
} }
if(this->mQuit) if(this->mQuit)
return -1; return -1;
@ -371,9 +372,18 @@ static void our_free_buffer(void *opaque, uint8_t *data)
av_free(data); av_free(data);
} }
class VideoThread : public OpenThreads::Thread
{
public:
VideoThread(VideoState* self)
: mVideoState(self)
{
start();
}
void VideoState::video_thread_loop(VideoState *self) virtual void run()
{ {
VideoState* self = mVideoState;
AVPacket pkt1, *packet = &pkt1; AVPacket pkt1, *packet = &pkt1;
int frameFinished; int frameFinished;
AVFrame *pFrame; AVFrame *pFrame;
@ -430,8 +440,23 @@ void VideoState::video_thread_loop(VideoState *self)
av_free(self->rgbaFrame); av_free(self->rgbaFrame);
} }
void VideoState::decode_thread_loop(VideoState *self) private:
VideoState* mVideoState;
};
class ParseThread : public OpenThreads::Thread
{
public:
ParseThread(VideoState* self)
: mVideoState(self)
{
start();
}
virtual void run()
{ {
VideoState* self = mVideoState;
AVFormatContext *pFormatCtx = self->format_ctx; AVFormatContext *pFormatCtx = self->format_ctx;
AVPacket pkt1, *packet = &pkt1; AVPacket pkt1, *packet = &pkt1;
@ -505,7 +530,7 @@ void VideoState::decode_thread_loop(VideoState *self)
if((self->audio_st && self->audioq.size > MAX_AUDIOQ_SIZE) || if((self->audio_st && self->audioq.size > MAX_AUDIOQ_SIZE) ||
(self->video_st && self->videoq.size > MAX_VIDEOQ_SIZE)) (self->video_st && self->videoq.size > MAX_VIDEOQ_SIZE))
{ {
boost::this_thread::sleep(boost::posix_time::milliseconds(10)); OpenThreads::Thread::microSleep(10 * 1000);
continue; continue;
} }
@ -534,6 +559,10 @@ void VideoState::decode_thread_loop(VideoState *self)
self->mQuit = true; self->mQuit = true;
} }
private:
VideoState* mVideoState;
};
bool VideoState::update() bool VideoState::update()
{ {
@ -587,7 +616,7 @@ int VideoState::stream_open(int stream_index, AVFormatContext *pFormatCtx)
this->video_st = pFormatCtx->streams + stream_index; this->video_st = pFormatCtx->streams + stream_index;
codecCtx->get_buffer2 = our_get_buffer; codecCtx->get_buffer2 = our_get_buffer;
this->video_thread = boost::thread(video_thread_loop, this); this->video_thread.reset(new VideoThread(this));
break; break;
default: default:
@ -669,7 +698,7 @@ void VideoState::init(boost::shared_ptr<std::istream> inputstream, const std::st
} }
this->parse_thread = boost::thread(decode_thread_loop, this); this->parse_thread.reset(new ParseThread(this));
} }
void VideoState::deinit() void VideoState::deinit()
@ -681,10 +710,16 @@ void VideoState::deinit()
mAudioDecoder.reset(); mAudioDecoder.reset();
if (this->parse_thread.joinable()) if (this->parse_thread.get())
this->parse_thread.join(); {
if (this->video_thread.joinable()) this->parse_thread->join();
this->video_thread.join(); this->parse_thread.reset();
}
if (this->video_thread.get())
{
this->video_thread->join();
this->video_thread.reset();
}
if(this->audio_st) if(this->audio_st)
avcodec_close((*this->audio_st)->codec); avcodec_close((*this->audio_st)->codec);
@ -779,7 +814,7 @@ ExternalClock::ExternalClock()
void ExternalClock::setPaused(bool paused) void ExternalClock::setPaused(bool paused)
{ {
boost::mutex::scoped_lock lock(mMutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
if (mPaused == paused) if (mPaused == paused)
return; return;
if (paused) if (paused)
@ -793,7 +828,7 @@ void ExternalClock::setPaused(bool paused)
uint64_t ExternalClock::get() uint64_t ExternalClock::get()
{ {
boost::mutex::scoped_lock lock(mMutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
if (mPaused) if (mPaused)
return mPausedAt; return mPausedAt;
else else
@ -802,7 +837,7 @@ uint64_t ExternalClock::get()
void ExternalClock::set(uint64_t time) void ExternalClock::set(uint64_t time)
{ {
boost::mutex::scoped_lock lock(mMutex); OpenThreads::ScopedLock<OpenThreads::Mutex> lock(mMutex);
mTimeBase = av_gettime() - time; mTimeBase = av_gettime() - time;
mPausedAt = time; mPausedAt = time;
} }

@ -2,8 +2,14 @@
#define VIDEOPLAYER_VIDEOSTATE_H #define VIDEOPLAYER_VIDEOSTATE_H
#include <stdint.h> #include <stdint.h>
#include <vector>
#include <memory>
#include <boost/thread.hpp> #include <boost/shared_ptr.hpp>
#include <OpenThreads/Thread>
#include <OpenThreads/Mutex>
#include <OpenThreads/Condition>
#include <osg/ref_ptr> #include <osg/ref_ptr>
namespace osg namespace osg
@ -34,6 +40,8 @@ struct VideoState;
class MovieAudioFactory; class MovieAudioFactory;
class MovieAudioDecoder; class MovieAudioDecoder;
class VideoThread;
class ParseThread;
struct ExternalClock struct ExternalClock
{ {
@ -43,7 +51,7 @@ struct ExternalClock
uint64_t mPausedAt; uint64_t mPausedAt;
bool mPaused; bool mPaused;
boost::mutex mMutex; OpenThreads::Mutex mMutex;
void setPaused(bool paused); void setPaused(bool paused);
uint64_t get(); uint64_t get();
@ -62,8 +70,8 @@ struct PacketQueue {
int nb_packets; int nb_packets;
int size; int size;
boost::mutex mutex; OpenThreads::Mutex mutex;
boost::condition_variable cond; OpenThreads::Condition cond;
void put(AVPacket *pkt); void put(AVPacket *pkt);
int get(AVPacket *pkt, VideoState *is); int get(AVPacket *pkt, VideoState *is);
@ -141,11 +149,11 @@ struct VideoState {
VideoPicture pictq[VIDEO_PICTURE_ARRAY_SIZE]; VideoPicture pictq[VIDEO_PICTURE_ARRAY_SIZE];
AVFrame* rgbaFrame; // used as buffer for the frame converted from its native format to RGBA AVFrame* rgbaFrame; // used as buffer for the frame converted from its native format to RGBA
int pictq_size, pictq_rindex, pictq_windex; int pictq_size, pictq_rindex, pictq_windex;
boost::mutex pictq_mutex; OpenThreads::Mutex pictq_mutex;
boost::condition_variable pictq_cond; OpenThreads::Condition pictq_cond;
boost::thread parse_thread; std::auto_ptr<ParseThread> parse_thread;
boost::thread video_thread; std::auto_ptr<VideoThread> video_thread;
volatile bool mSeekRequested; volatile bool mSeekRequested;
uint64_t mSeekPos; uint64_t mSeekPos;

Loading…
Cancel
Save