From 7cc27d9b663adad06b6ac3a16a1fd2037aefd780 Mon Sep 17 00:00:00 2001 From: athile Date: Thu, 1 Jul 2010 15:50:24 -0700 Subject: [PATCH] Add command output string and client/server response. --- CMakeLists.txt | 1 + apps/clientconsole/client.cpp | 62 ++++++++++++++++++----- apps/openmw/engine.cpp | 14 ++++-- apps/openmw/engine.hpp | 3 +- components/commandserver/command.hpp | 19 +++++++ components/commandserver/server.cpp | 48 +++++++++++++----- components/commandserver/server.hpp | 11 +++-- components/misc/tsdeque.hpp | 74 ++++++++++++++++++---------- 8 files changed, 172 insertions(+), 60 deletions(-) create mode 100755 components/commandserver/command.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index fa3ad115df..ae17b7930c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -99,6 +99,7 @@ set(INPUT_HEADER source_group(components\\engine\\input FILES ${INPUT} ${INPUT_HEADER}) set(COMMANDSERVER + components/commandserver/command.hpp components/commandserver/server.hpp components/commandserver/server.cpp) source_group(components\\commandserver FILES ${COMMANDSERVER}) diff --git a/apps/clientconsole/client.cpp b/apps/clientconsole/client.cpp index dac95ce993..119f8c5547 100755 --- a/apps/clientconsole/client.cpp +++ b/apps/clientconsole/client.cpp @@ -9,6 +9,12 @@ using boost::asio::ip::tcp; class Client { protected: + struct Header + { + char magic[4]; + boost::uint32_t dataLength; + }; + boost::asio::io_service mIOService; tcp::socket* mpSocket; @@ -39,11 +45,6 @@ public: bool send (const char* msg) { - struct Header - { - char magic[4]; - boost::uint32_t dataLength; - }; const size_t slen = strlen(msg); const size_t plen = sizeof(Header) + slen + 1; @@ -61,6 +62,33 @@ public: return !ec; } + + bool receive (std::string& reply) + { + Header header; + boost::system::error_code error; + mpSocket->read_some(boost::asio::buffer(&header, sizeof(Header)), error); + + if (error != boost::asio::error::eof) + { + if (strncmp(header.magic, "OMW0", 4) == 0) + { + std::vector msg; + msg.resize(header.dataLength); + + boost::system::error_code error; + mpSocket->read_some(boost::asio::buffer(&msg[0], header.dataLength), error); + if (!error) + { + reply = &msg[0]; + return true; + } + } + else + throw std::exception("Unexpected header!"); + } + return false; + } }; @@ -79,14 +107,26 @@ int main(int argc, char* argv[]) bool bDone = false; do { - std::cout << "> "; - char buffer[1024]; - gets(buffer); + std::cout << "Client> "; + std::string buffer; + std::getline(std::cin, buffer); - if (std::string(buffer) != "quit") - bDone = !client.send(buffer); - else + if (buffer == "quit") bDone = true; + else + { + if (client.send(buffer.c_str())) + { + std::string reply; + if (client.receive(reply)) + std::cout << "Server: " << reply << std::endl; + else + bDone = true; + } + else + bDone = true; + } + } while (!bDone); client.disconnect(); diff --git a/apps/openmw/engine.cpp b/apps/openmw/engine.cpp index 601df6354c..12cb67cca4 100644 --- a/apps/openmw/engine.cpp +++ b/apps/openmw/engine.cpp @@ -32,7 +32,8 @@ OMW::Engine::Engine() : mEnableSky (false) , mpSkyManager (NULL) { - mspCommandServer.reset(new OMW::CommandServer::Server(&mCommands, kCommandServerPort)); + mspCommandServer.reset( + new OMW::CommandServer::Server(&mCommandQueue, kCommandServerPort)); } // Load all BSA files in data directory. @@ -100,11 +101,16 @@ void OMW::Engine::enableSky (bool bEnable) void OMW::Engine::processCommands() { - std::string msg; - while (mCommands.pop_front(msg)) + Command cmd; + while (mCommandQueue.try_pop_front(cmd)) { ///\todo Add actual processing of the received command strings - std::cout << "Command: '" << msg << "'" << std::endl; + std::cout << "Command: '" << cmd.mCommand << "'" << std::endl; + + ///\todo Replace with real output. For now, echo back the string in uppercase + std::string reply(cmd.mCommand); + std::transform(reply.begin(), reply.end(), reply.begin(), toupper); + cmd.mReplyFunction(reply); } } diff --git a/apps/openmw/engine.hpp b/apps/openmw/engine.hpp index 7874070192..6ec9cadf5c 100644 --- a/apps/openmw/engine.hpp +++ b/apps/openmw/engine.hpp @@ -8,6 +8,7 @@ #include "apps/openmw/mwrender/mwscene.hpp" #include "components/misc/tsdeque.hpp" #include "components/commandserver/server.hpp" +#include "components/commandserver/command.hpp" namespace MWRender @@ -31,7 +32,7 @@ namespace OMW bool mEnableSky; MWRender::SkyManager* mpSkyManager; - TsDeque mCommands; + TsDeque mCommandQueue; std::auto_ptr mspCommandServer; // not implemented diff --git a/components/commandserver/command.hpp b/components/commandserver/command.hpp new file mode 100755 index 0000000000..b221d512bf --- /dev/null +++ b/components/commandserver/command.hpp @@ -0,0 +1,19 @@ +#ifndef COMMANDSERVER_COMMAND_HPP +#define COMMANDSERVER_COMMAND_HPP + +namespace OMW +{ + /// + /// A Command is currently defined as a string input that, when processed, + /// will generate a string output. The string output is passed to the + /// mReplyFunction as soon as the command has been processed. + /// + class Command + { + public: + std::string mCommand; + boost::function1 mReplyFunction; + }; +} + +#endif COMMANDSERVER_COMMAND_HPP diff --git a/components/commandserver/server.cpp b/components/commandserver/server.cpp index 94ce03f97a..7b9198e0b1 100755 --- a/components/commandserver/server.cpp +++ b/components/commandserver/server.cpp @@ -9,6 +9,12 @@ using boost::asio::ip::tcp; // namespace OMW { namespace CommandServer { namespace Detail { + struct Header + { + char magic[4]; + size_t dataLength; + } header; + /// /// Tracks an active connection to the CommandServer /// @@ -19,7 +25,9 @@ namespace OMW { namespace CommandServer { namespace Detail { void start(); void stop(); + tcp::socket& socket(); + void reply (std::string s); protected: void handle (); @@ -53,18 +61,29 @@ namespace OMW { namespace CommandServer { namespace Detail { { return mSocket; } + + void Connection::reply (std::string reply) + { + const size_t plen = sizeof(Header) + reply.length() + 1; + + std::vector packet(plen); + Header* pHeader = reinterpret_cast(&packet[0]); + strncpy(pHeader->magic, "OMW0", 4); + pHeader->dataLength = reply.length() + 1; // Include the null terminator + strncpy(&packet[8], reply.c_str(), pHeader->dataLength); + + boost::system::error_code ec; + boost::asio::write(mSocket, boost::asio::buffer(packet), + boost::asio::transfer_all(), ec); + if (ec) + std::cout << "Error: " << ec.message() << std::endl; + } void Connection::handle () { bool bDone = false; while (!bDone) { - struct Header - { - char magic[4]; - size_t dataLength; - } header; - // Read the header boost::system::error_code error; mSocket.read_some(boost::asio::buffer(&header, sizeof(Header)), error); @@ -79,7 +98,7 @@ namespace OMW { namespace CommandServer { namespace Detail { boost::system::error_code error; mSocket.read_some(boost::asio::buffer(&msg[0], header.dataLength), error); if (!error) - mpServer->postMessage( &msg[0] ); + mpServer->postCommand(this, &msg[0]); else bDone = true; } @@ -98,10 +117,10 @@ namespace OMW { namespace CommandServer { using namespace Detail; - Server::Server (Deque* pDeque, const int port) - : mAcceptor (mIOService, tcp::endpoint(tcp::v4(), port)) - , mpCommands (pDeque) - , mbStopping (false) + Server::Server (Deque* pCommandQueue, const int port) + : mAcceptor (mIOService, tcp::endpoint(tcp::v4(), port)) + , mpCommandQueue (pCommandQueue) + , mbStopping (false) { } @@ -149,9 +168,12 @@ namespace OMW { namespace CommandServer { delete ptr; } - void Server::postMessage (const char* s) + void Server::postCommand (Connection* pConnection, const char* s) { - mpCommands->push_back(s); + Command cmd; + cmd.mCommand = s; + cmd.mReplyFunction = std::bind1st(std::mem_fun(&Connection::reply), pConnection); + mpCommandQueue->push_back(cmd); } void Server::threadMain() diff --git a/components/commandserver/server.hpp b/components/commandserver/server.hpp index 8a2a74d1d2..b53e0f23a9 100755 --- a/components/commandserver/server.hpp +++ b/components/commandserver/server.hpp @@ -9,6 +9,7 @@ #include #include "components/misc/tsdeque.hpp" +#include "components/commandserver/command.hpp" namespace OMW { namespace CommandServer { @@ -27,9 +28,9 @@ namespace OMW { namespace CommandServer class Server { public: - typedef TsDeque Deque; + typedef TsDeque Deque; - Server (Deque* pDeque, const int port); + Server (Deque* pCommandQueue, const int port); void start(); void stop(); @@ -39,7 +40,7 @@ namespace OMW { namespace CommandServer typedef std::set ConnectionSet; void removeConnection (Detail::Connection* ptr); - void postMessage (const char* s); + void postCommand (Detail::Connection*, const char* s); void threadMain(); @@ -53,8 +54,8 @@ namespace OMW { namespace CommandServer ConnectionSet mConnections; mutable boost::mutex mConnectionsMutex; - // Pointer to output queue in which to put received strings - Deque* mpCommands; + // Pointer to command queue + Deque* mpCommandQueue; }; }} diff --git a/components/misc/tsdeque.hpp b/components/misc/tsdeque.hpp index db14428bcb..d63a132ef3 100755 --- a/components/misc/tsdeque.hpp +++ b/components/misc/tsdeque.hpp @@ -3,32 +3,54 @@ #include -template -class TsDeque -{ -public: - void push_back (const T& t) - { - boost::mutex::scoped_lock lock(mMutex); - mDeque.push_back(t); - } - - bool pop_front (T& t) - { - boost::mutex::scoped_lock lock(mMutex); - if (!mDeque.empty()) - { - t = mDeque.front(); - mDeque.pop_front(); - return true; - } - else - return false; - } - -protected: - std::deque mDeque; - mutable boost::mutex mMutex; +// +// Adapted from http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html +// +template +class TsDeque +{ +private: + std::deque the_queue; + mutable boost::mutex the_mutex; + boost::condition_variable the_condition_variable; + +public: + void push_back(Data const& data) + { + boost::mutex::scoped_lock lock(the_mutex); + the_queue.push_back(data); + lock.unlock(); + the_condition_variable.notify_one(); + } + + bool empty() const + { + boost::mutex::scoped_lock lock(the_mutex); + return the_queue.empty(); + } + + bool try_pop_front(Data& popped_value) + { + boost::mutex::scoped_lock lock(the_mutex); + if(the_queue.empty()) + return false; + + popped_value=the_queue.front(); + the_queue.pop_front(); + return true; + } + + void wait_and_pop_front(Data& popped_value) + { + boost::mutex::scoped_lock lock(the_mutex); + while(the_queue.empty()) + { + the_condition_variable.wait(lock); + } + + popped_value=the_queue.front(); + the_queue.pop_front(); + } }; #endif // TSDEQUE_H