From c82dc43edc9fd913e8ddb20bebe778781ec0d6f7 Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Tue, 11 Sep 2007 04:05:26 +0000 Subject: Everything seems to work with the new Bu::ItoServer class, it operates very, very similarly to the Bu::Server class, except that every incoming connection gets it's own thread. This functionality may have to be tuned later, to allow for maintaining a pool of connections as an option, but this is fine for now. --- build.conf | 2 +- src/itoserver.cpp | 178 ++++++++++++++++++++++++++++++++++++++++++++++++ src/itoserver.h | 89 ++++++++++++++++++++++++ src/tests/itoserver.cpp | 69 +++++++++++++++++++ src/trace.h | 10 +++ 5 files changed, 347 insertions(+), 1 deletion(-) create mode 100644 src/itoserver.cpp create mode 100644 src/itoserver.h create mode 100644 src/tests/itoserver.cpp create mode 100644 src/trace.h diff --git a/build.conf b/build.conf index 36f1f8b..2f21860 100644 --- a/build.conf +++ b/build.conf @@ -40,7 +40,7 @@ filesIn("src/tests") filter regexp("^src/tests/(.*)\\.cpp$", "tests/{re:1}"): set "LDFLAGS" += "-L. -lbu++", input "src/{target}.cpp" -["tests/itoqueue1", "tests/itoqueue2", "tests/socketblock"]: +["tests/itoqueue1", "tests/itoqueue2", "tests/socketblock", "tests/itoserver"]: set "LDFLAGS" += "-lpthread" filesIn("src/unit") filter regexp("^src/unit/(.*)\\.cpp$", "unit/{re:1}"): diff --git a/src/itoserver.cpp b/src/itoserver.cpp new file mode 100644 index 0000000..0337057 --- /dev/null +++ b/src/itoserver.cpp @@ -0,0 +1,178 @@ +#include "bu/itoserver.h" +#include +#include "bu/serversocket.h" +#include "bu/client.h" +#include "bu/socket.h" +#include "osx_compatibility.h" + +Bu::ItoServer::ItoServer() : + nTimeoutSec( 1 ), + nTimeoutUSec( 0 ) +{ + FD_ZERO( &fdActive ); +} + +Bu::ItoServer::~ItoServer() +{ +} + +void Bu::ItoServer::addPort( int nPort, int nPoolSize ) +{ + ServerSocket *s = new ServerSocket( nPort, nPoolSize ); + int nSocket = s->getSocket(); + FD_SET( nSocket, &fdActive ); + hServers.insert( nSocket, s ); +} + +void Bu::ItoServer::addPort( const FString &sAddr, int nPort, int nPoolSize ) +{ + ServerSocket *s = new ServerSocket( sAddr, nPort, nPoolSize ); + int nSocket = s->getSocket(); + FD_SET( nSocket, &fdActive ); + hServers.insert( nSocket, s ); +} + +void Bu::ItoServer::setTimeout( int nTimeoutSec, int nTimeoutUSec ) +{ + this->nTimeoutSec = nTimeoutSec; + this->nTimeoutUSec = nTimeoutUSec; +} +/* +void Bu::ItoServer::scan() +{ + struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; + + fd_set fdRead = fdActive; + fd_set fdWrite = fdActive; + fd_set fdException = fdActive; + + if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, NULL, &fdException, &xTimeout ) ) < 0 ) + { + throw ExceptionBase("Error attempting to scan open connections."); + } + + for( int j = 0; j < FD_SETSIZE; j++ ) + { + if( FD_ISSET( j, &fdRead ) ) + { + if( hServers.has( j ) ) + { + ServerSocket *pSrv = hServers.get( j ); + addClient( pSrv->accept(), pSrv->getPort() ); + } + else + { + Client *pClient = hClients.get( j ); + pClient->processInput(); + if( !pClient->isOpen() ) + { + onClosedConnection( pClient ); + hClients.erase( j ); + FD_CLR( j, &fdActive ); + } + } + } + } + + // Now we just try to write all the pending data on all the sockets. + // this could be done better eventually, if we care about the socket + // wanting to accept writes (using a select). + for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) + { + (*i)->processOutput(); + } +} +*/ +void Bu::ItoServer::addClient( int nSocket, int nPort ) +{ + ItoClient *pC = new ItoClient( *this, nSocket, nPort, nTimeoutSec, + nTimeoutUSec ); + pC->start(); + +} + +void *Bu::ItoServer::run() +{ + for(;;) + { + struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; + + fd_set fdRead = fdActive; + fd_set fdWrite = fdActive; + fd_set fdException = fdActive; + + if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, NULL, &fdException, &xTimeout ) ) < 0 ) + { + throw ExceptionBase("Error attempting to scan open connections."); + } + + for( ServerHash::iterator i = hServers.begin(); i != hServers.end(); i++ ) + { + if( FD_ISSET( i.getKey(), &fdRead ) ) + { + ServerSocket *pSrv = i.getValue(); + addClient( pSrv->accept(), pSrv->getPort() ); + } + } + } + + return NULL; +} + +Bu::ItoServer::ItoClient::ItoClient( ItoServer &rSrv, int iSocket, int iPort, + int nTimeoutSec, int nTimeoutUSec ) : + rSrv( rSrv ), + iSocket( iSocket ), + iPort( iPort ), + nTimeoutSec( nTimeoutSec ), + nTimeoutUSec( nTimeoutUSec ) +{ + FD_ZERO( &fdActive ); + FD_SET( iSocket, &fdActive ); + + pClient = new Client( + new Bu::Socket( iSocket ) + ); + +} + +Bu::ItoServer::ItoClient::~ItoClient() +{ +} + +void *Bu::ItoServer::ItoClient::run() +{ + rSrv.onNewConnection( pClient, iPort ); + + for(;;) + { + struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; + + fd_set fdRead = fdActive; + fd_set fdException = fdActive; + + if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, NULL, &fdException, &xTimeout ) ) < 0 ) + { + throw ExceptionBase("Error attempting to scan open connections."); + } + + if( FD_ISSET( iSocket, &fdRead ) ) + { + pClient->processInput(); + if( !pClient->isOpen() ) + { + rSrv.onClosedConnection( pClient ); + + return NULL; + } + } + + // Now we just try to write all the pending data on the socket. + // this could be done better eventually, if we care about the socket + // wanting to accept writes (using a select). + pClient->processOutput(); + } + + return NULL; +} + diff --git a/src/itoserver.h b/src/itoserver.h new file mode 100644 index 0000000..19c34ca --- /dev/null +++ b/src/itoserver.h @@ -0,0 +1,89 @@ +#ifndef BU_ITO_SERVER_H +#define BU_ITO_SERVER_H + +#include + +#include "bu/fstring.h" +#include "bu/list.h" +#include "bu/ito.h" +#include "bu/itomutex.h" +#include "bu/set.h" + +namespace Bu +{ + class ServerSocket; + class Socket; + class Client; + + /** + * Core of a network server. This class is distinct from a ServerSocket in + * that a ServerSocket is one listening socket, nothing more. Socket will + * manage a pool of both ServerSockets and connected Sockets along with + * their protocols and buffers. + * + * To start serving on a new port, use the addPort functions. Each call to + * addPort creates a new ServerSocket, starts it listening, and adds it to + * the server pool. + * + * All of the real work is done by scan, which will wait for up + * to the timeout set by setTimeout before returning if there is no data + * pending. scan should probably be called in some sort of tight + * loop, possibly in it's own thread, or in the main control loop. + * + * In order to use a Server you must subclass it and implement the pure + * virtual functions. These allow you to receive notification of events + * happening within the server itself, and actually makes it useful. + */ + class ItoServer : public Ito + { + public: + ItoServer(); + virtual ~ItoServer(); + + void addPort( int nPort, int nPoolSize=40 ); + void addPort( const FString &sAddr, int nPort, int nPoolSize=40 ); + + //void scan(); + void setTimeout( int nTimeoutSec, int nTimeoutUSec=0 ); + + void addClient( int nSocket, int nPort ); + + virtual void onNewConnection( Client *pClient, int nPort )=0; + virtual void onClosedConnection( Client *pClient )=0; + + virtual void *run(); + + private: + class ItoClient : public Ito + { + public: + ItoClient( ItoServer &rSrv, int nSocket, int nPort, + int nTimeoutSec, int nTimeoutUSec ); + virtual ~ItoClient(); + + virtual void *run(); + + private: + ItoServer &rSrv; + Client *pClient; + fd_set fdActive; + int iSocket; + int iPort; + int nTimeoutSec; + int nTimeoutUSec; + }; + + int nTimeoutSec; + int nTimeoutUSec; + fd_set fdActive; + typedef Hash ServerHash; + ServerHash hServers; + //typedef Bu::Set ClientSet; + //ClientSet sClients; + //typedef Hash ClientHash; + //ClientHash hClients; + ItoMutex im; + }; +} + +#endif diff --git a/src/tests/itoserver.cpp b/src/tests/itoserver.cpp new file mode 100644 index 0000000..4f5c644 --- /dev/null +++ b/src/tests/itoserver.cpp @@ -0,0 +1,69 @@ +#include "bu/itoserver.h" +#include "bu/protocol.h" +#include "bu/client.h" + +#define BU_TRACE +#include "bu/trace.h" + +class ProtocolEcho : public Bu::Protocol +{ +public: + ProtocolEcho() + { + TRACE(); + } + virtual ~ProtocolEcho() + { + TRACE(); + } + + virtual void onNewConnection( Bu::Client *pClient ) + { + TRACE(); + // Huh... + } + + virtual void onNewData( Bu::Client *pClient ) + { + TRACE(); + pClient->write( pClient->getInput().getStr(), pClient->getInputSize() ); + pClient->seek( pClient->getInputSize() ); + } +}; + +class TestServer : public Bu::ItoServer +{ +public: + TestServer() + { + TRACE(); + } + virtual ~TestServer() + { + TRACE(); + } + + virtual void onNewConnection( Bu::Client *pClient, int iPort ) + { + TRACE(); + pClient->setProtocol( new ProtocolEcho() ); + } + + virtual void onClosedConnection( Bu::Client *pClient ) + { + TRACE(); + delete pClient->getProtocol(); + } +}; + +int main() +{ + TRACE(); + + TestServer ts; + + ts.addPort( 5555 ); + ts.start(); + ts.join(); +} + diff --git a/src/trace.h b/src/trace.h new file mode 100644 index 0000000..d7a8485 --- /dev/null +++ b/src/trace.h @@ -0,0 +1,10 @@ +#ifndef BU_TRACE_H +#define BU_TRACE_H + +#ifdef BU_TRACE +# define TRACE() printf("trace: %s\n", __PRETTY_FUNCTION__ ) +#else +# define TRACE() {} +#endif + +#endif -- cgit v1.2.3