From 915005e218b5d00939b548de65ce6354f7acb487 Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Fri, 28 Jul 2023 21:18:56 -0700 Subject: Completely redesigned Server and Client. Like, seriously, they're almost completely different. --- src/stable/server.cpp | 342 ++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 287 insertions(+), 55 deletions(-) (limited to 'src/stable/server.cpp') diff --git a/src/stable/server.cpp b/src/stable/server.cpp index 0552510..592230d 100644 --- a/src/stable/server.cpp +++ b/src/stable/server.cpp @@ -8,10 +8,11 @@ #include "bu/server.h" #include #include -#include "bu/tcpserversocket.h" +#include "bu/serversocket.h" #include "bu/client.h" -#include "bu/tcpsocket.h" +#include "bu/socket.h" #include "bu/config.h" +#include "bu/mutexlocker.h" #ifdef PROFILE_BU_SERVER #define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x ) @@ -21,13 +22,34 @@ #define BU_PROFILE_END( x ) (void)0 #endif -Bu::Server::Server() : +#define RBS 1500 + +Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : nTimeoutSec( 0 ), nTimeoutUSec( 0 ), bAutoTick( false ) { BU_PROFILE_START("server"); FD_ZERO( &fdActive ); + + if( iIoWorkers < 1 ) + iIoWorkers = 1; + if( iClientWorkers < 1 ) + iClientWorkers = 1; + + for( int j = 0; j < iIoWorkers; j++ ) + { + IoWorker *pWorker = new IoWorker( *this, qIoEvent, qClientEvent ); + lIoWorker.append( pWorker ); + pWorker->start(); + } + + for( int j = 0; j < iClientWorkers; j++ ) + { + ClientWorker *pWorker = new ClientWorker( *this, qClientEvent ); + lClientWorker.append( pWorker ); + pWorker->start(); + } } Bu::Server::~Server() @@ -36,20 +58,16 @@ Bu::Server::~Server() BU_PROFILE_START("server"); } -void Bu::Server::addPort( int nPort, int nPoolSize ) +void Bu::Server::addServerSocket( Bu::ServerSocket *pSocket ) { - TcpServerSocket *s = new TcpServerSocket( nPort, nPoolSize ); - socket_t nSocket = s->getSocket(); - FD_SET( nSocket, &fdActive ); - hServers.insert( nSocket, s ); -} - -void Bu::Server::addPort( const String &sAddr, int nPort, int nPoolSize ) -{ - TcpServerSocket *s = new TcpServerSocket( sAddr, nPort, nPoolSize ); - socket_t nSocket = s->getSocket(); - FD_SET( nSocket, &fdActive ); - hServers.insert( nSocket, s ); + fd iFd; + if( !pSocket->getFd( iFd ) ) + { + throw Bu::ExceptionBase("Cannot get file descriptor from " + "provided ServerSocket."); + } + FD_SET( iFd, &fdActive ); + hServers.insert( iFd, pSocket ); } void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec ) @@ -68,11 +86,13 @@ void Bu::Server::scan() fd_set fdException = fdActive; FD_ZERO( &fdWrite ); + mClients.lock(); for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) { if( (*i)->hasOutput() ) FD_SET( i.getKey(), &fdWrite ); } + mClients.unlock(); if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) @@ -91,42 +111,17 @@ void Bu::Server::scan() { if( hServers.has( j ) ) { - TcpServerSocket *pSrv = hServers.get( j ); - addClient( pSrv->accept(), pSrv->getPort() ); + Bu::ServerSocket *pSrv = hServers.get( j ); + addClient( pSrv, pSrv->accept() ); } else { - Client *pClient = hClients.get( j ); - BU_PROFILE_START("processInput"); - pClient->processInput(); - BU_PROFILE_END("processInput"); - if( !pClient->isOpen() ) - { - closeClient( j ); - } + qIoEvent.enqueue( new Event( j, Event::Read ) ); } } if( FD_ISSET( j, &fdWrite ) ) { - try - { - Client *pClient = hClients.get( j ); - try - { - BU_PROFILE_START("processOutput"); - pClient->processOutput(); - BU_PROFILE_END("processOutput"); - } - catch( Bu::TcpSocketException &e ) - { - closeClient( j ); - } - } - catch( Bu::HashException &e ) - { - // Do nothing, I guess, the client is already dead... - // TODO: Someday, we may want to handle this more graceully. - } + qIoEvent.enqueue( new Event( j, Event::Write ) ); } } @@ -134,6 +129,7 @@ void Bu::Server::scan() // Now we just try to write all the pending data on all the sockets. // this could be done better eventually, if we care about the socket // wanting to accept writes (using a select). + mClients.lock(); for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) { if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) @@ -141,6 +137,7 @@ void Bu::Server::scan() lDelete.append( i.getKey() ); } } + mClients.unlock(); for( Bu::List::iterator i = lDelete.begin(); i != lDelete.end(); i++ ) { @@ -153,21 +150,47 @@ void Bu::Server::scan() BU_PROFILE_END("scan"); } -void Bu::Server::addClient( socket_t nSocket, int nPort ) +void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket ) { BU_PROFILE_START("addClient"); - FD_SET( nSocket, &fdActive ); + int iFdSrv; + int iFdCli; + if( !pSrv->getFd( iFdSrv ) || !pSrv->getFd( iFdCli ) ) + { + throw Bu::ExceptionBase("No file descriptor?"); + } + FD_SET( iFdCli, &fdActive ); - Client *c = new Client( - new Bu::TcpSocket( nSocket ), + Client *pClient = new Client( new SrvClientLinkFactory() ); - hClients.insert( nSocket, c ); + { + Bu::MutexLocker l( mClients ); + hClients.insert( iFdCli, pClient ); + hSockets.insert( iFdCli, pSocket ); + } - onNewConnection( c, nPort ); + onNewConnection( pSrv, pClient, pSocket ); BU_PROFILE_END("addClient"); } +Bu::Client *Bu::Server::getClient( fd iId ) +{ + Bu::MutexLocker l( mClients ); + return hClients.get( iId ); +} + +bool Bu::Server::getClientAndSocket( fd iId, Bu::Client *&pClient, + Bu::Socket *&pSocket ) +{ + Bu::MutexLocker l( mClients ); + if( !hClients.has( iId ) || !hSockets.has( iId ) ) + return false; + pClient = hClients.get( iId ); + pSocket = hSockets.get( iId ); + return true; +} + Bu::Server::SrvClientLink::SrvClientLink( Bu::Client *pClient ) : pClient( pClient ) { @@ -203,14 +226,45 @@ void Bu::Server::setAutoTick( bool bEnable ) void Bu::Server::tick() { + mClients.lock(); for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) { (*i)->tick(); } + mClients.unlock(); } void Bu::Server::shutdown() { + { + qIoEvent.stop(); + qClientEvent.stop(); + Bu::Server::Event *pEv; + while( (pEv = qIoEvent.drain()) != NULL ) + { + delete pEv; + } + while( (pEv = qClientEvent.drain()) != NULL ) + { + delete pEv; + } + + Bu::MutexLocker l( mWorkers ); + for( IoWorkerList::iterator i = lIoWorker.begin(); i; i++ ) + { + (*i)->join(); + delete *i; + } + lIoWorker.clear(); + for( ClientWorkerList::iterator i = lClientWorker.begin(); + i; i++ ) + { + (*i)->join(); + delete *i; + } + lClientWorker.clear(); + } + for( SrvHash::iterator i = hServers.begin(); i != hServers.end(); i++ ) { delete *i; @@ -218,23 +272,201 @@ void Bu::Server::shutdown() hServers.clear(); - for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) + ClientHash::KeyList lClients = hClients.getKeys(); + for( ClientHash::KeyList::iterator i = lClients.begin(); i; i++ ) { - closeClient( i.getKey() ); + closeClient( *i ); } - hClients.clear(); } -void Bu::Server::closeClient( socket_t iSocket ) +void Bu::Server::closeClient( fd iSocket ) { BU_PROFILE_START("closeClient"); Bu::Client *pClient = hClients.get( iSocket ); + Bu::Socket *pSocket = hSockets.get( iSocket ); onClosedConnection( pClient ); pClient->close(); hClients.erase( iSocket ); + pSocket->close(); + hSockets.erase( iSocket ); FD_CLR( iSocket, &fdActive ); delete pClient; + delete pSocket; BU_PROFILE_END("closeClient"); } +//////// +// Event +//// + +Bu::Server::Event::Event( fd iId, Operation eOp ) : + iId( iId ), + eOp( eOp ) +{ +} + +Bu::Server::Event::~Event() +{ +} + +Bu::Server::fd Bu::Server::Event::getId() const +{ + return iId; +} + +Bu::Server::Event::Operation Bu::Server::Event::getOperation() const +{ + return eOp; +} + +///////// +// IoWorker +//// + +Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv, + Bu::Server::EventQueue &qIoEvent, + Bu::Server::EventQueue &qClientEvent ) : + rSrv( rSrv ), + qIoEvent( qIoEvent ), + qClientEvent( qClientEvent ) +{ +} + +Bu::Server::IoWorker::~IoWorker() +{ +} + +void Bu::Server::IoWorker::run() +{ + while( qIoEvent.isRunning() ) + { + Event *pEv = qIoEvent.dequeue(); + if( pEv == NULL ) + continue; + + Client *pClient; + Socket *pSocket; + if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) + { + delete pEv; + continue; + } + + switch( pEv->getOperation() ) + { + case Event::Read: + handleRead( pClient, pSocket ); + break; + + case Event::Write: + handleWrite( pClient, pSocket ); + break; + + case Event::Process: + break; + } + + delete pEv; + } +} + +void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) +{ + char buf[RBS]; + Bu::size iRead; + Bu::size iTotal=0; + + BU_PROFILE_START("client.read"); + for(;;) + { + try + { + iRead = pSocket->read( buf, RBS ); + + if( iRead == 0 ) + { + break; + } + else + { + iTotal += iRead; + pClient->cbBuffer.server().write( buf, iRead ); + if( !pSocket->canRead() ) + break; + } + } + catch( Bu::ExceptionBase &e ) + { + close( pSocket ); + break; + } + } + BU_PROFILE_END("client.read"); + + if( iTotal == 0 ) + { + close( pSocket ); + } + else + { + Bu::Server::fd iFd; + pSocket->getFd( iFd ); + qClientEvent.enqueue( new Event( iFd, Event::Process ) ); + } +} + +void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) +{ + char buf[RBS]; + if( pClient->hasOutput() > 0 ) + { + int iAmnt = RBS; + iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); + int iReal = pSocket->write( buf, iAmnt ); + pClient->cbBuffer.server().seek( iReal ); + } +} + +void Bu::Server::IoWorker::close( Bu::Socket *pSocket ) +{ + Bu::Server::fd iFd; + pSocket->getFd( iFd ); + rSrv.closeClient( iFd ); +} + +///////// +// ClientWorker +//// + +Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv, + Bu::Server::EventQueue &qEvent ) : + rSrv( rSrv ), + qEvent( qEvent ) +{ +} + +Bu::Server::ClientWorker::~ClientWorker() +{ +} + +void Bu::Server::ClientWorker::run() +{ + while( qEvent.isRunning() ) + { + Event *pEv = qEvent.dequeue(); + if( pEv == NULL ) + continue; + + Client *pClient = rSrv.getClient( pEv->getId() ); + if( pClient == NULL ) + { + delete pEv; + continue; + } + + pClient->processInput(); + delete pEv; + } +} + -- cgit v1.2.3