From 9ab87f39d7fc37e52d742d38b191eed9128d4b5b Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Thu, 7 Feb 2008 19:01:12 +0000 Subject: Wowee, I think all this new stuff works, Conduit I don't need now, so it's not done yet. The Client class now supports a function called getLink() which returns a ClientLink object. This object may then be passed off to any other class and called to send messages to that client object. It is threadsafe if ItoServer is being used, and not for Server. Sending a message via a ClientLink calls the onMessage function on the assosiated protocol. Note that sending messages from within protocol event handlers or functions they call, while safe, may be slow and it's reccomended that you avoid this. --- src/client.cpp | 19 ++++++++++++-- src/client.h | 8 +++++- src/clientlink.cpp | 17 ++++++++++++ src/clientlink.h | 25 ++++++++++++++++++ src/clientlinkfactory.cpp | 17 ++++++++++++ src/clientlinkfactory.h | 26 ++++++++++++++++++ src/conduit.cpp | 7 +++++ src/conduit.h | 26 ++++++++++++++++++ src/itoserver.cpp | 67 ++++++++++++++++++++++++++++++++++++++++++++++- src/itoserver.h | 34 ++++++++++++++++++++++++ src/protocol.cpp | 4 +++ src/protocol.h | 3 +++ src/server.cpp | 31 +++++++++++++++++++++- src/server.h | 24 +++++++++++++++++ src/socket.cpp | 4 +++ src/socket.h | 1 + 16 files changed, 308 insertions(+), 5 deletions(-) create mode 100644 src/clientlink.cpp create mode 100644 src/clientlink.h create mode 100644 src/clientlinkfactory.cpp create mode 100644 src/clientlinkfactory.h create mode 100644 src/conduit.cpp create mode 100644 src/conduit.h diff --git a/src/client.cpp b/src/client.cpp index 1ef9151..d1eb29c 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -11,15 +11,18 @@ #include #include "bu/exceptions.h" #include "bu/protocol.h" +#include "bu/clientlink.h" +#include "bu/clientlinkfactory.h" /** Read buffer size. */ #define RBS (1024*2) -Bu::Client::Client( Bu::Socket *pSocket ) : +Bu::Client::Client( Bu::Socket *pSocket, class Bu::ClientLinkFactory *pfLink ) : pSocket( pSocket ), pProto( NULL ), nRBOffset( 0 ), - bWantsDisconnect( false ) + bWantsDisconnect( false ), + pfLink( pfLink ) { } @@ -233,3 +236,15 @@ void Bu::Client::close() { pSocket->close(); } + +Bu::ClientLink *Bu::Client::getLink() +{ + return pfLink->createLink( this ); +} + +void Bu::Client::onMessage( const Bu::FString &sMsg ) +{ + if( pProto ) + pProto->onMessage( this, sMsg ); +} + diff --git a/src/client.h b/src/client.h index 3764375..20eb8b8 100644 --- a/src/client.h +++ b/src/client.h @@ -16,6 +16,7 @@ namespace Bu { class Protocol; class Socket; + class ClientLinkFactory; /** *@author Mike Buland @@ -24,7 +25,7 @@ namespace Bu class Client { public: - Client( Bu::Socket *pSocket ); + Client( Bu::Socket *pSocket, Bu::ClientLinkFactory *pfLink ); virtual ~Client(); void processInput(); @@ -59,6 +60,10 @@ namespace Bu void disconnect(); bool wantsDisconnect(); + class ClientLink *getLink(); + + void onMessage( const Bu::FString &sMsg ); + private: Bu::Socket *pSocket; Bu::Protocol *pProto; @@ -66,6 +71,7 @@ namespace Bu int nRBOffset; Bu::FString sWriteBuf; bool bWantsDisconnect; + class Bu::ClientLinkFactory *pfLink; }; } diff --git a/src/clientlink.cpp b/src/clientlink.cpp new file mode 100644 index 0000000..7061a71 --- /dev/null +++ b/src/clientlink.cpp @@ -0,0 +1,17 @@ +/* + * Copyright (C) 2007 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#include "bu/clientlink.h" + +Bu::ClientLink::ClientLink() +{ +} + +Bu::ClientLink::~ClientLink() +{ +} + diff --git a/src/clientlink.h b/src/clientlink.h new file mode 100644 index 0000000..e76665b --- /dev/null +++ b/src/clientlink.h @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2007 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#ifndef BU_CLIENT_LINK_H +#define BU_CLIENT_LINK_H + +#include "bu/fstring.h" + +namespace Bu +{ + class ClientLink + { + public: + ClientLink(); + virtual ~ClientLink(); + + virtual void sendMsg( const Bu::FString &sMsg )=0; + }; +}; + +#endif diff --git a/src/clientlinkfactory.cpp b/src/clientlinkfactory.cpp new file mode 100644 index 0000000..523d8ec --- /dev/null +++ b/src/clientlinkfactory.cpp @@ -0,0 +1,17 @@ +/* + * Copyright (C) 2007 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#include "clientlinkfactory.h" + +Bu::ClientLinkFactory::ClientLinkFactory() +{ +} + +Bu::ClientLinkFactory::~ClientLinkFactory() +{ +} + diff --git a/src/clientlinkfactory.h b/src/clientlinkfactory.h new file mode 100644 index 0000000..4d3ed7b --- /dev/null +++ b/src/clientlinkfactory.h @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2007 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#ifndef BU_CLIENT_LINK_FACTORY_H +#define BU_CLIENT_LINK_FACTORY_H + +namespace Bu +{ + class Client; + class ClientLink; + + class ClientLinkFactory + { + public: + ClientLinkFactory(); + virtual ~ClientLinkFactory(); + + virtual Bu::ClientLink *createLink( Bu::Client *pClient )=0; + }; +}; + +#endif diff --git a/src/conduit.cpp b/src/conduit.cpp new file mode 100644 index 0000000..413bf7c --- /dev/null +++ b/src/conduit.cpp @@ -0,0 +1,7 @@ +/* + * Copyright (C) 2007 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + diff --git a/src/conduit.h b/src/conduit.h new file mode 100644 index 0000000..4598bd1 --- /dev/null +++ b/src/conduit.h @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2007 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#ifndef BU_CONDUIT_H +#define BU_CONDUIT_H + +#include "bu/stream.h" +#include "bu/fstring.h" + +namespace Bu +{ + /** + * Simple inter-thread communication stream. This acts like a pair of + * pipes for stream communication between any two things, but without the + * use of pipes, making this a bad choice for IPC. + */ + class Conduit : public Stream + { + }; +} + +#endif diff --git a/src/itoserver.cpp b/src/itoserver.cpp index 7426e8a..db5ae04 100644 --- a/src/itoserver.cpp +++ b/src/itoserver.cpp @@ -127,7 +127,8 @@ Bu::ItoServer::ItoClient::ItoClient( ItoServer &rSrv, int iSocket, int iPort, FD_SET( iSocket, &fdActive ); pClient = new Client( - new Bu::Socket( iSocket ) + new Bu::Socket( iSocket ), + new SrvClientLinkFactory( rSrv ) ); } @@ -137,7 +138,10 @@ Bu::ItoServer::ItoClient::~ItoClient() void *Bu::ItoServer::ItoClient::run() { + imProto.lock(); rSrv.onNewConnection( pClient, iPort ); + pClient->processOutput(); + imProto.unlock(); for(;;) { @@ -151,14 +155,29 @@ void *Bu::ItoServer::ItoClient::run() throw ExceptionBase("Error attempting to scan open connections."); } + while( !qMsg.isEmpty() ) + { + imProto.lock(); + Bu::FString *pMsg = qMsg.dequeue(); + pClient->onMessage( *pMsg ); + delete pMsg; + pClient->processOutput(); + imProto.unlock(); + } + if( FD_ISSET( iSocket, &fdRead ) ) { + imProto.lock(); pClient->processInput(); + imProto.unlock(); if( !pClient->isOpen() ) { + imProto.lock(); rSrv.onClosedConnection( pClient ); + imProto.unlock(); rSrv.clientCleanup( iSocket ); + return NULL; } } @@ -166,9 +185,55 @@ void *Bu::ItoServer::ItoClient::run() // Now we just try to write all the pending data on the socket. // this could be done better eventually, if we care about the socket // wanting to accept writes (using a select). + imProto.lock(); pClient->processOutput(); + imProto.unlock(); } return NULL; } +Bu::ItoServer::SrvClientLink::SrvClientLink( ItoClient *pClient ) : + pClient( pClient ) +{ +} + +Bu::ItoServer::SrvClientLink::~SrvClientLink() +{ +} + +void Bu::ItoServer::SrvClientLink::sendMsg( const Bu::FString &sMsg ) +{ + if( !pClient->imProto.trylock() ) + { + pClient->pClient->onMessage( sMsg ); + pClient->pClient->processOutput(); + pClient->imProto.unlock(); + } + else + { + Bu::FString *pMsg = new Bu::FString( sMsg ); + pClient->qMsg.enqueue( pMsg ); + } +} + +Bu::ItoServer::SrvClientLinkFactory::SrvClientLinkFactory( + Bu::ItoServer &rSrv ) : + rSrv( rSrv ) +{ +} + +Bu::ItoServer::SrvClientLinkFactory::~SrvClientLinkFactory() +{ +} + +Bu::ClientLink *Bu::ItoServer::SrvClientLinkFactory::createLink( + Bu::Client *pClient ) +{ + rSrv.imClients.lock(); + ItoClient *pCli = rSrv.hClients.get( *pClient->getSocket() ); + rSrv.imClients.unlock(); + + return new SrvClientLink( pCli ); +} + diff --git a/src/itoserver.h b/src/itoserver.h index f11960c..09ed1cd 100644 --- a/src/itoserver.h +++ b/src/itoserver.h @@ -17,6 +17,9 @@ #include "bu/itoqueue.h" #include "bu/set.h" +#include "bu/clientlink.h" +#include "bu/clientlinkfactory.h" + namespace Bu { class ServerSocket; @@ -47,6 +50,7 @@ namespace Bu class ItoServer : public Ito { friend class ItoClient; + friend class SrvClientLinkFactory; public: ItoServer(); virtual ~ItoServer(); @@ -65,8 +69,10 @@ namespace Bu virtual void *run(); private: + class SrvClientLink; class ItoClient : public Ito { + friend class Bu::ItoServer::SrvClientLink; public: ItoClient( ItoServer &rSrv, int nSocket, int nPort, int nTimeoutSec, int nTimeoutUSec ); @@ -74,6 +80,9 @@ namespace Bu virtual void *run(); + typedef ItoQueue StringQueue; + StringQueue qMsg; + private: ItoServer &rSrv; Client *pClient; @@ -82,6 +91,31 @@ namespace Bu int iPort; int nTimeoutSec; int nTimeoutUSec; + ItoMutex imProto; + }; + + class SrvClientLink : public Bu::ClientLink + { + public: + SrvClientLink( ItoClient *pClient ); + virtual ~SrvClientLink(); + + virtual void sendMsg( const Bu::FString &sMsg ); + + private: + ItoClient *pClient; + }; + + class SrvClientLinkFactory : public Bu::ClientLinkFactory + { + public: + SrvClientLinkFactory( ItoServer &rSrv ); + virtual ~SrvClientLinkFactory(); + + virtual Bu::ClientLink *createLink( Bu::Client *pClient ); + + private: + ItoServer &rSrv; }; int nTimeoutSec; diff --git a/src/protocol.cpp b/src/protocol.cpp index 7a59586..e197b7f 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -17,3 +17,7 @@ Bu::Protocol::~Protocol() { } +void Bu::Protocol::onMessage( Bu::Client *pClient, const Bu::FString &sMsg ) +{ +} + diff --git a/src/protocol.h b/src/protocol.h index c557512..61fff93 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -10,6 +10,8 @@ #include +#include "bu/fstring.h" + namespace Bu { class Client; @@ -26,6 +28,7 @@ namespace Bu virtual void onNewConnection( Bu::Client *pClient )=0; virtual void onNewData( Bu::Client *pClient )=0; + virtual void onMessage( Bu::Client *pClient, const Bu::FString &sMsg ); private: diff --git a/src/server.cpp b/src/server.cpp index 861e2e3..cca486a 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -109,10 +109,39 @@ void Bu::Server::addClient( int nSocket, int nPort ) FD_SET( nSocket, &fdActive ); Client *c = new Client( - new Bu::Socket( nSocket ) + new Bu::Socket( nSocket ), + new SrvClientLinkFactory() ); hClients.insert( nSocket, c ); onNewConnection( c, nPort ); } +Bu::Server::SrvClientLink::SrvClientLink( Bu::Client *pClient ) : + pClient( pClient ) +{ +} + +Bu::Server::SrvClientLink::~SrvClientLink() +{ +} + +void Bu::Server::SrvClientLink::sendMsg( const Bu::FString &sMsg ) +{ + pClient->onMessage( sMsg ); +} + +Bu::Server::SrvClientLinkFactory::SrvClientLinkFactory() +{ +} + +Bu::Server::SrvClientLinkFactory::~SrvClientLinkFactory() +{ +} + +Bu::ClientLink *Bu::Server::SrvClientLinkFactory::createLink( + Bu::Client *pClient ) +{ + return new SrvClientLink( pClient ); +} + diff --git a/src/server.h b/src/server.h index 02f3546..1ba5fd7 100644 --- a/src/server.h +++ b/src/server.h @@ -13,6 +13,9 @@ #include "bu/fstring.h" #include "bu/list.h" +#include "bu/clientlink.h" +#include "bu/clientlinkfactory.h" + namespace Bu { class ServerSocket; @@ -58,6 +61,27 @@ namespace Bu virtual void onClosedConnection( Client *pClient )=0; private: + class SrvClientLink : public Bu::ClientLink + { + public: + SrvClientLink( Bu::Client *pClient ); + virtual ~SrvClientLink(); + + virtual void sendMsg( const Bu::FString &sMsg ); + + private: + Bu::Client *pClient; + }; + + class SrvClientLinkFactory : public Bu::ClientLinkFactory + { + public: + SrvClientLinkFactory(); + virtual ~SrvClientLinkFactory(); + + virtual Bu::ClientLink *createLink( Bu::Client *pClient ); + }; + int nTimeoutSec; int nTimeoutUSec; fd_set fdActive; diff --git a/src/socket.cpp b/src/socket.cpp index 7b55c4b..94639b1 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -364,3 +364,7 @@ Bu::FString Bu::Socket::getAddress() const return sAddress; } +Bu::Socket::operator int() const +{ + return nSocket; +} diff --git a/src/socket.h b/src/socket.h index eee0be3..7acf055 100644 --- a/src/socket.h +++ b/src/socket.h @@ -57,6 +57,7 @@ namespace Bu virtual void setBlocking( bool bBlocking=true ); Bu::FString getAddress() const; + operator int() const; private: void setAddress(); -- cgit v1.2.3