Add command output string and client/server response.

pull/7/head
athile 15 years ago
parent d44f322b8a
commit 7cc27d9b66

@ -99,6 +99,7 @@ set(INPUT_HEADER
source_group(components\\engine\\input FILES ${INPUT} ${INPUT_HEADER}) source_group(components\\engine\\input FILES ${INPUT} ${INPUT_HEADER})
set(COMMANDSERVER set(COMMANDSERVER
components/commandserver/command.hpp
components/commandserver/server.hpp components/commandserver/server.hpp
components/commandserver/server.cpp) components/commandserver/server.cpp)
source_group(components\\commandserver FILES ${COMMANDSERVER}) source_group(components\\commandserver FILES ${COMMANDSERVER})

@ -9,6 +9,12 @@ using boost::asio::ip::tcp;
class Client class Client
{ {
protected: protected:
struct Header
{
char magic[4];
boost::uint32_t dataLength;
};
boost::asio::io_service mIOService; boost::asio::io_service mIOService;
tcp::socket* mpSocket; tcp::socket* mpSocket;
@ -39,11 +45,6 @@ public:
bool send (const char* msg) bool send (const char* msg)
{ {
struct Header
{
char magic[4];
boost::uint32_t dataLength;
};
const size_t slen = strlen(msg); const size_t slen = strlen(msg);
const size_t plen = sizeof(Header) + slen + 1; const size_t plen = sizeof(Header) + slen + 1;
@ -61,6 +62,33 @@ public:
return !ec; return !ec;
} }
bool receive (std::string& reply)
{
Header header;
boost::system::error_code error;
mpSocket->read_some(boost::asio::buffer(&header, sizeof(Header)), error);
if (error != boost::asio::error::eof)
{
if (strncmp(header.magic, "OMW0", 4) == 0)
{
std::vector<char> msg;
msg.resize(header.dataLength);
boost::system::error_code error;
mpSocket->read_some(boost::asio::buffer(&msg[0], header.dataLength), error);
if (!error)
{
reply = &msg[0];
return true;
}
}
else
throw std::exception("Unexpected header!");
}
return false;
}
}; };
@ -79,14 +107,26 @@ int main(int argc, char* argv[])
bool bDone = false; bool bDone = false;
do do
{ {
std::cout << "> "; std::cout << "Client> ";
char buffer[1024]; std::string buffer;
gets(buffer); std::getline(std::cin, buffer);
if (std::string(buffer) != "quit") if (buffer == "quit")
bDone = !client.send(buffer); bDone = true;
else
{
if (client.send(buffer.c_str()))
{
std::string reply;
if (client.receive(reply))
std::cout << "Server: " << reply << std::endl;
else
bDone = true;
}
else else
bDone = true; bDone = true;
}
} while (!bDone); } while (!bDone);
client.disconnect(); client.disconnect();

@ -32,7 +32,8 @@ OMW::Engine::Engine()
: mEnableSky (false) : mEnableSky (false)
, mpSkyManager (NULL) , mpSkyManager (NULL)
{ {
mspCommandServer.reset(new OMW::CommandServer::Server(&mCommands, kCommandServerPort)); mspCommandServer.reset(
new OMW::CommandServer::Server(&mCommandQueue, kCommandServerPort));
} }
// Load all BSA files in data directory. // Load all BSA files in data directory.
@ -100,11 +101,16 @@ void OMW::Engine::enableSky (bool bEnable)
void OMW::Engine::processCommands() void OMW::Engine::processCommands()
{ {
std::string msg; Command cmd;
while (mCommands.pop_front(msg)) while (mCommandQueue.try_pop_front(cmd))
{ {
///\todo Add actual processing of the received command strings ///\todo Add actual processing of the received command strings
std::cout << "Command: '" << msg << "'" << std::endl; std::cout << "Command: '" << cmd.mCommand << "'" << std::endl;
///\todo Replace with real output. For now, echo back the string in uppercase
std::string reply(cmd.mCommand);
std::transform(reply.begin(), reply.end(), reply.begin(), toupper);
cmd.mReplyFunction(reply);
} }
} }

@ -8,6 +8,7 @@
#include "apps/openmw/mwrender/mwscene.hpp" #include "apps/openmw/mwrender/mwscene.hpp"
#include "components/misc/tsdeque.hpp" #include "components/misc/tsdeque.hpp"
#include "components/commandserver/server.hpp" #include "components/commandserver/server.hpp"
#include "components/commandserver/command.hpp"
namespace MWRender namespace MWRender
@ -31,7 +32,7 @@ namespace OMW
bool mEnableSky; bool mEnableSky;
MWRender::SkyManager* mpSkyManager; MWRender::SkyManager* mpSkyManager;
TsDeque<std::string> mCommands; TsDeque<OMW::Command> mCommandQueue;
std::auto_ptr<OMW::CommandServer::Server> mspCommandServer; std::auto_ptr<OMW::CommandServer::Server> mspCommandServer;
// not implemented // not implemented

@ -0,0 +1,19 @@
#ifndef COMMANDSERVER_COMMAND_HPP
#define COMMANDSERVER_COMMAND_HPP
namespace OMW
{
///
/// A Command is currently defined as a string input that, when processed,
/// will generate a string output. The string output is passed to the
/// mReplyFunction as soon as the command has been processed.
///
class Command
{
public:
std::string mCommand;
boost::function1<void, std::string> mReplyFunction;
};
}
#endif COMMANDSERVER_COMMAND_HPP

@ -9,6 +9,12 @@ using boost::asio::ip::tcp;
// //
namespace OMW { namespace CommandServer { namespace Detail { namespace OMW { namespace CommandServer { namespace Detail {
struct Header
{
char magic[4];
size_t dataLength;
} header;
/// ///
/// Tracks an active connection to the CommandServer /// Tracks an active connection to the CommandServer
/// ///
@ -19,7 +25,9 @@ namespace OMW { namespace CommandServer { namespace Detail {
void start(); void start();
void stop(); void stop();
tcp::socket& socket(); tcp::socket& socket();
void reply (std::string s);
protected: protected:
void handle (); void handle ();
@ -54,17 +62,28 @@ namespace OMW { namespace CommandServer { namespace Detail {
return mSocket; return mSocket;
} }
void Connection::reply (std::string reply)
{
const size_t plen = sizeof(Header) + reply.length() + 1;
std::vector<char> packet(plen);
Header* pHeader = reinterpret_cast<Header*>(&packet[0]);
strncpy(pHeader->magic, "OMW0", 4);
pHeader->dataLength = reply.length() + 1; // Include the null terminator
strncpy(&packet[8], reply.c_str(), pHeader->dataLength);
boost::system::error_code ec;
boost::asio::write(mSocket, boost::asio::buffer(packet),
boost::asio::transfer_all(), ec);
if (ec)
std::cout << "Error: " << ec.message() << std::endl;
}
void Connection::handle () void Connection::handle ()
{ {
bool bDone = false; bool bDone = false;
while (!bDone) while (!bDone)
{ {
struct Header
{
char magic[4];
size_t dataLength;
} header;
// Read the header // Read the header
boost::system::error_code error; boost::system::error_code error;
mSocket.read_some(boost::asio::buffer(&header, sizeof(Header)), error); mSocket.read_some(boost::asio::buffer(&header, sizeof(Header)), error);
@ -79,7 +98,7 @@ namespace OMW { namespace CommandServer { namespace Detail {
boost::system::error_code error; boost::system::error_code error;
mSocket.read_some(boost::asio::buffer(&msg[0], header.dataLength), error); mSocket.read_some(boost::asio::buffer(&msg[0], header.dataLength), error);
if (!error) if (!error)
mpServer->postMessage( &msg[0] ); mpServer->postCommand(this, &msg[0]);
else else
bDone = true; bDone = true;
} }
@ -98,9 +117,9 @@ namespace OMW { namespace CommandServer {
using namespace Detail; using namespace Detail;
Server::Server (Deque* pDeque, const int port) Server::Server (Deque* pCommandQueue, const int port)
: mAcceptor (mIOService, tcp::endpoint(tcp::v4(), port)) : mAcceptor (mIOService, tcp::endpoint(tcp::v4(), port))
, mpCommands (pDeque) , mpCommandQueue (pCommandQueue)
, mbStopping (false) , mbStopping (false)
{ {
} }
@ -149,9 +168,12 @@ namespace OMW { namespace CommandServer {
delete ptr; delete ptr;
} }
void Server::postMessage (const char* s) void Server::postCommand (Connection* pConnection, const char* s)
{ {
mpCommands->push_back(s); Command cmd;
cmd.mCommand = s;
cmd.mReplyFunction = std::bind1st(std::mem_fun(&Connection::reply), pConnection);
mpCommandQueue->push_back(cmd);
} }
void Server::threadMain() void Server::threadMain()

@ -9,6 +9,7 @@
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include "components/misc/tsdeque.hpp" #include "components/misc/tsdeque.hpp"
#include "components/commandserver/command.hpp"
namespace OMW { namespace CommandServer namespace OMW { namespace CommandServer
{ {
@ -27,9 +28,9 @@ namespace OMW { namespace CommandServer
class Server class Server
{ {
public: public:
typedef TsDeque<std::string> Deque; typedef TsDeque<Command> Deque;
Server (Deque* pDeque, const int port); Server (Deque* pCommandQueue, const int port);
void start(); void start();
void stop(); void stop();
@ -39,7 +40,7 @@ namespace OMW { namespace CommandServer
typedef std::set<Detail::Connection*> ConnectionSet; typedef std::set<Detail::Connection*> ConnectionSet;
void removeConnection (Detail::Connection* ptr); void removeConnection (Detail::Connection* ptr);
void postMessage (const char* s); void postCommand (Detail::Connection*, const char* s);
void threadMain(); void threadMain();
@ -53,8 +54,8 @@ namespace OMW { namespace CommandServer
ConnectionSet mConnections; ConnectionSet mConnections;
mutable boost::mutex mConnectionsMutex; mutable boost::mutex mConnectionsMutex;
// Pointer to output queue in which to put received strings // Pointer to command queue
Deque* mpCommands; Deque* mpCommandQueue;
}; };
}} }}

@ -3,32 +3,54 @@
#include <boost/thread.hpp> #include <boost/thread.hpp>
template <typename T> //
// Adapted from http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
//
template<typename Data>
class TsDeque class TsDeque
{ {
private:
std::deque<Data> the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
public: public:
void push_back (const T& t) void push_back(Data const& data)
{ {
boost::mutex::scoped_lock lock(mMutex); boost::mutex::scoped_lock lock(the_mutex);
mDeque.push_back(t); the_queue.push_back(data);
lock.unlock();
the_condition_variable.notify_one();
} }
bool pop_front (T& t) bool empty() const
{ {
boost::mutex::scoped_lock lock(mMutex); boost::mutex::scoped_lock lock(the_mutex);
if (!mDeque.empty()) return the_queue.empty();
}
bool try_pop_front(Data& popped_value)
{ {
t = mDeque.front(); boost::mutex::scoped_lock lock(the_mutex);
mDeque.pop_front(); if(the_queue.empty())
return false;
popped_value=the_queue.front();
the_queue.pop_front();
return true; return true;
} }
else
return false; void wait_and_pop_front(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
while(the_queue.empty())
{
the_condition_variable.wait(lock);
} }
protected: popped_value=the_queue.front();
std::deque<T> mDeque; the_queue.pop_front();
mutable boost::mutex mMutex; }
}; };
#endif // TSDEQUE_H #endif // TSDEQUE_H

Loading…
Cancel
Save