/*
 * Copyright (C) 2007-2019 Xagasoft, All rights reserved.
 *
 * This file is part of the libbu++ library and is released under the
 * terms of the license contained in the file LICENSE.
 */

#include "bu/server.h"
// NOTE(review): the names of the four system headers were missing from the
// text this file was restored from; they are reconstructed from the calls
// made below (errno, close/write, epoll_*, eventfd) -- please confirm.
#include <errno.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
// Needed for strerror_r() and strncpy() used in Bu::Server::scan().
#include <string.h>
#include "bu/serversocket.h"
#include "bu/client.h"
#include "bu/socket.h"
#include "bu/config.h"
#include "bu/mutexlocker.h"
#include "bu/sio.h"

#ifdef PROFILE_BU_SERVER
#define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x )
#define BU_PROFILE_END( x ) Bu::Profiler::getInstance().endEvent( x )
#else
#define BU_PROFILE_START( x ) (void)0
#define BU_PROFILE_END( x ) (void)0
#endif

// Size in bytes of the stack buffers used for socket reads and writes.
#define RBS 1500
// Maximum number of epoll events fetched by a single epoll_wait call.
#define MAX_EVENTS 20

namespace
{
    /**
     * Normalises the two flavours of strerror_r.  The XSI version returns an
     * int and fills the supplied buffer; the GNU version returns a pointer to
     * the message, which is not necessarily the supplied buffer.  Overload
     * resolution on the return type picks the matching handler.
     */
    struct StrErrorResult
    {
        static const char *pick( int iRet, const char *sBuf )
        {
            return (iRet == 0) ? sBuf : "Unknown error";
        }

        static const char *pick( const char *sMsg, const char * )
        {
            return sMsg;
        }
    };
}

namespace Bu
{
    /**
     * Holds the OS level handles used by Bu::Server: one epoll instance
     * watched for readable descriptors, one watched for writable descriptors,
     * and an eventfd registered with both that is used to signal shutdown.
     */
    class __ServerCore
    {
    public:
        __ServerCore()
        {
            fdRead = epoll_create( 1 );
            fdWrite = epoll_create( 1 );
            fdEvent = eventfd( 0, 0 );

            // The eventfd is added to both epoll sets so that a single write
            // to it wakes up whoever is waiting on either of them.
            struct epoll_event ev;
            ev.events = EPOLLIN;
            ev.data.fd = fdEvent;
            epoll_ctl( fdRead, EPOLL_CTL_ADD, fdEvent, &ev );
            epoll_ctl( fdWrite, EPOLL_CTL_ADD, fdEvent, &ev );
        }

        ~__ServerCore()
        {
            close( fdRead );
            close( fdWrite );
            close( fdEvent );
        }

        Server::fd fdRead;
        Server::fd fdWrite;
        Server::fd fdEvent;
    };
}

/**
 * Creates the server core and starts the worker threads and the write
 * monitor thread.
 *@param iIoWorkers Number of IoWorker threads; values below 1 are raised
 *  to 1.
 *@param iClientWorkers Number of ClientWorker threads; values below 1 are
 *  raised to 1.
 */
Bu::Server::Server( int iIoWorkers, int iClientWorkers ) :
    pCore( new Bu::__ServerCore() ),
    nTimeoutSec( 0 ),
    nTimeoutUSec( 0 ),
    bAutoTick( false ),
    tMonitorWrite( *this ),
    bRunning( true )
{
    BU_PROFILE_START("server");

    if( iIoWorkers < 1 )
        iIoWorkers = 1;
    if( iClientWorkers < 1 )
        iClientWorkers = 1;

    for( int j = 0; j < iIoWorkers; j++ )
    {
        IoWorker *pWorker = new IoWorker( *this );
        lIoWorker.append( pWorker );
        pWorker->start();
    }

    for( int j = 0; j < iClientWorkers; j++ )
    {
        ClientWorker *pWorker = new ClientWorker( *this );
        lClientWorker.append( pWorker );
        pWorker->start();
    }

    tMonitorWrite.start();
}

/**
 * Shuts the server down, waits for the write monitor thread and releases
 * the core handles.
 */
Bu::Server::~Server()
{
    shutdown();
    tMonitorWrite.join();
    delete pCore;

    // Closes the "server" profiling event opened in the constructor.
    BU_PROFILE_END("server");
}

/**
 * Registers a listening socket with the read epoll set and stores it in
 * hServers.  shutdown() deletes every socket stored there.
 *@param pSocket The server socket to add.
 *@throws Bu::ExceptionBase if the socket cannot provide a file descriptor.
 */
void Bu::Server::addServerSocket( Bu::ServerSocket *pSocket )
{
    fd iFd;
    if( !pSocket->getFd( iFd ) )
    {
        throw Bu::ExceptionBase("Cannot get file descriptor from "
            "provided ServerSocket.");
    }

    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = iFd;
    epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFd, &ev );

    hServers.insert( iFd, pSocket );
}

/**
 * Sets how long scan() waits for events.  The two values are combined into
 * a millisecond timeout for epoll_wait.
 */
void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec )
{
    this->nTimeoutSec = nTimeoutSec;
    this->nTimeoutUSec = nTimeoutUSec;
}

/**
 * Runs one pass of the server loop: waits on the read epoll set, accepts
 * connections on server sockets, queues read events for client sockets,
 * closes clients that asked to disconnect and have no pending output, and
 * finally calls tick() when auto-tick is enabled.
 *@throws Bu::ExceptionBase if epoll_wait fails for a reason other than
 *  being interrupted by a signal.
 */
void Bu::Server::scan()
{
    BU_PROFILE_START("scan");

    {
        Bu::MutexLocker l( mClients );
        for( ClientHash::iterator i = hClients.begin();
             i != hClients.end(); i++ )
        {
            if( (*i)->hasOutput() )
            {
                // Queue it for output here?
            }
        }
    }

    struct epoll_event ev[MAX_EVENTS];
    int iCount = epoll_wait(
        pCore->fdRead, ev, MAX_EVENTS, nTimeoutSec*1000+nTimeoutUSec/1000
        );
    if( iCount < 0 )
    {
        // Save errno right away, later library calls may overwrite it.
        const int iErr = errno;
        if( iErr == EINTR )
        {
            // Interrupted by a signal: not an error, there are simply no
            // events to handle on this pass.
            iCount = 0;
        }
        else
        {
            char buf[1024];
            buf[0] = '\0';
            const char *sMsg = StrErrorResult::pick(
                strerror_r( iErr, buf, sizeof(buf) ), buf
                );
            if( sMsg != buf )
            {
                // GNU strerror_r returned its own string; copy it so that
                // the message is built from buf in either case.
                strncpy( buf, sMsg, sizeof(buf)-1 );
                buf[sizeof(buf)-1] = '\0';
            }
            BU_PROFILE_END("scan");
            throw ExceptionBase(
                Bu::String("Error attempting to scan open connections: %1: %2").arg( iErr ).arg( buf ).end().getStr()
                );
        }
    }

    for( int j = 0; j < iCount; j++ )
    {
        const fd iReady = ev[j].data.fd;
        if( iReady == pCore->fdEvent )
        {
            // The shutdown eventfd is registered on this epoll set too; it
            // is not a client, so there is nothing to read for it.
            continue;
        }

        if( hServers.has( iReady ) )
        {
            Bu::ServerSocket *pSrv = hServers.get( iReady );
            Bu::Socket *pNew = pSrv->accept();
            // NOTE(review): guards against accept() handing back NULL;
            // confirm whether the ServerSocket implementations can do that.
            if( pNew != NULL )
            {
                addClient( pSrv, pNew );
            }
        }
        else
        {
            qIoEvent.enqueue( new Event( iReady, Event::Read ) );
        }
    }

    // Collect the clients that asked to disconnect and have nothing left to
    // send.  They are closed after the lock is released because
    // closeClient() takes mClients itself.
    // NOTE(review): the element type of this list was missing from the text
    // this file was restored from; fd matches closeClient()'s parameter.
    Bu::List<fd> lDelete;
    {
        Bu::MutexLocker l( mClients );
        for( ClientHash::iterator i = hClients.begin();
             i != hClients.end(); i++ )
        {
            if( (*i)->wantsDisconnect() && !(*i)->hasOutput() )
            {
                lDelete.append( i.getKey() );
            }
        }
    }

    for( Bu::List<fd>::iterator i = lDelete.begin(); i != lDelete.end(); i++ )
    {
        closeClient( *i );
    }

    if( bAutoTick )
        tick();

    BU_PROFILE_END("scan");
}

/**
 * Creates a Client for a freshly accepted socket, stores both in the client
 * tables, registers the descriptor with both epoll sets and notifies
 * onNewConnection.  If the tables still hold an entry for the same
 * descriptor, that stale entry is closed and deleted first.
 *@param pSrv The server socket the connection was accepted on.
 *@param pSocket The accepted connection.
 *@throws Bu::ExceptionBase if either socket cannot provide a descriptor.
 */
void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket )
{
    BU_PROFILE_START("addClient");
    int iFdSrv;
    int iFdCli;
    if( !pSrv->getFd( iFdSrv ) || !pSocket->getFd( iFdCli ) )
    {
        throw Bu::ExceptionBase("No file descriptor?");
    }

    Client *pClient = new Client( iFdCli, ServerInterface( *this ) );
    {
        Bu::MutexLocker l( mClients );

        const bool bStaleClient = hClients.has( iFdCli );
        const bool bStaleSocket = hSockets.has( iFdCli );
        if( bStaleClient || bStaleSocket )
        {
            // Oops, it got closed and we haven't managed to get to it yet.
            // We're not going to be able to write anything, we may as well
            // just clean up.
            //
            // This can happen in a variety of situations, but mostly when
            // we get a disconnect followed immediately by a reconnect.
            struct epoll_event ev;
            epoll_ctl( pCore->fdRead, EPOLL_CTL_DEL, iFdCli, &ev );
            epoll_ctl( pCore->fdWrite, EPOLL_CTL_DEL, iFdCli, &ev );

            // Each table is handled on its own so that an entry present in
            // only one of them is still cleaned up without looking up a key
            // that does not exist in the other.
            Client *pOldClient = NULL;
            Socket *pOldSocket = NULL;
            if( bStaleClient )
            {
                pOldClient = hClients.get( iFdCli );
                hClients.erase( iFdCli );
            }
            if( bStaleSocket )
            {
                pOldSocket = hSockets.get( iFdCli );
                hSockets.erase( iFdCli );
            }

            if( pOldClient != NULL )
            {
                onClosedConnection( pOldClient );
                pOldClient->close();
            }
            if( pOldSocket != NULL )
            {
                // NOTE(review): the stale socket shares its descriptor
                // number with the new connection; confirm that closing it
                // here cannot close the new connection's descriptor.
                pOldSocket->close();
            }
            delete pOldClient;
            delete pOldSocket;
        }

        hClients.insert( iFdCli, pClient );
        hSockets.insert( iFdCli, pSocket );
    }

    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLONESHOT;
    ev.data.fd = iFdCli;
    epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFdCli, &ev );

    // Registered with no events; clientWriteReady() arms it when there is
    // output to send.
    ev.events = 0;
    epoll_ctl( pCore->fdWrite, EPOLL_CTL_ADD, iFdCli, &ev );

    onNewConnection( pSrv, pClient, pSocket );
    BU_PROFILE_END("addClient");
}

/**
 * Looks up a client by id while holding the client mutex.
 *@returns The client, or NULL if the id is not in the client table.
 */
Bu::Client *Bu::Server::getClient( fd iId )
{
    Bu::MutexLocker l( mClients );
    if( !hClients.has( iId ) )
        return NULL;
    return hClients.get( iId );
}

/**
 * Looks up both the client and its socket by id while holding the client
 * mutex.
 *@param iId The id to look up.
 *@param pClient Receives the client on success.
 *@param pSocket Receives the socket on success.
 *@returns true if both were found; false otherwise, in which case the
 *  output parameters are left untouched.
 */
bool Bu::Server::getClientAndSocket( fd iId, Bu::Client *&pClient,
        Bu::Socket *&pSocket )
{
    Bu::MutexLocker l( mClients );
    if( !hClients.has( iId ) || !hSockets.has( iId ) )
        return false;
    pClient = hClients.get( iId );
    pSocket = hSockets.get( iId );
    return true;
}

Bu::Server::SrvClientLink::SrvClientLink( Bu::Client *pClient ) :
    pClient( pClient )
{
}

Bu::Server::SrvClientLink::~SrvClientLink()
{
}

/**
 * Forwards the message to the linked client's onMessage handler.
 */
void Bu::Server::SrvClientLink::sendMessage( const Bu::String &sMsg )
{
    pClient->onMessage( sMsg );
}

Bu::Server::SrvClientLinkFactory::SrvClientLinkFactory()
{
}

Bu::Server::SrvClientLinkFactory::~SrvClientLinkFactory()
{
}

/**
 * Creates a new SrvClientLink for the given client.  The link is allocated
 * with new and returned to the caller.
 */
Bu::ClientLink *Bu::Server::SrvClientLinkFactory::createLink(
        Bu::Client *pClient )
{
    return new SrvClientLink( pClient );
}

/**
 * Enables or disables the call to tick() at the end of every scan().
 */
void Bu::Server::setAutoTick( bool bEnable )
{
    bAutoTick = bEnable;
}

/**
 * Calls tick() on every connected client while holding the client mutex.
 */
void Bu::Server::tick()
{
    // Scoped lock so the mutex is released even if a client's tick throws.
    Bu::MutexLocker l( mClients );
    for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
    {
        (*i)->tick();
    }
}

/**
 * Stops the server: signals the shutdown eventfd, stops and drains both
 * event queues, joins and deletes every worker, deletes the server sockets
 * and closes every remaining client.
 */
void Bu::Server::shutdown()
{
    {
        // An eventfd counter is an unsigned 64 bit value.  The write is
        // best effort: nothing useful can be done here if it fails.
        const uint64_t iVal = 1;
        const ssize_t iWritten = write( pCore->fdEvent, &iVal, sizeof(iVal) );
        (void)iWritten;
    }

    {
        qIoEvent.stop();
        qClientEvent.stop();

        // Discard whatever is still queued; nobody will process it now.
        Bu::Server::Event *pEv;
        while( (pEv = qIoEvent.drain()) != NULL )
        {
            delete pEv;
        }
        while( (pEv = qClientEvent.drain()) != NULL )
        {
            delete pEv;
        }

        Bu::MutexLocker l( mWorkers );
        for( IoWorkerList::iterator i = lIoWorker.begin(); i; i++ )
        {
            (*i)->join();
            delete *i;
        }
        lIoWorker.clear();

        for( ClientWorkerList::iterator i = lClientWorker.begin(); i; i++ )
        {
            (*i)->join();
            delete *i;
        }
        lClientWorker.clear();
    }

    for( SrvHash::iterator i = hServers.begin(); i != hServers.end(); i++ )
    {
        delete *i;
    }
    hServers.clear();

    // Work from a copy of the keys, closeClient() modifies hClients.
    ClientHash::KeyList lClients = hClients.getKeys();
    for( ClientHash::KeyList::iterator i = lClients.begin(); i; i++ )
    {
        closeClient( *i );
    }
    Bu::println("Cleaning up clients.  Erased %1, remaining: %2.").arg( lClients.getSize() ).arg( hClients.getSize() );
    hClients.clear();
}

/**
 * Re-arms the one-shot read notification for a client descriptor.
 */
void Bu::Server::clientReadReady( fd iFd )
{
    // Re-arm
    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLONESHOT;
    ev.data.fd = iFd;
    epoll_ctl( pCore->fdRead, EPOLL_CTL_MOD, iFd, &ev );
}

/**
 * Arms the one-shot write notification for a client descriptor.
 */
void Bu::Server::clientWriteReady( fd iFd )
{
    // Re-arm
    struct epoll_event ev;
    ev.events = EPOLLOUT | EPOLLONESHOT;
    ev.data.fd = iFd;
    epoll_ctl( pCore->fdWrite, EPOLL_CTL_MOD, iFd, &ev );
}

/**
 * Removes a client from both epoll sets and both client tables, notifies
 * onClosedConnection, closes the client and its socket and deletes both.
 * Holds the client mutex for the whole operation.
 *@param iSocket The id of the client to close.
 */
void Bu::Server::closeClient( fd iSocket )
{
    Bu::MutexLocker l( mClients );
    BU_PROFILE_START("closeClient");

    struct epoll_event ev;
    epoll_ctl( pCore->fdRead, EPOLL_CTL_DEL, iSocket, &ev );
    epoll_ctl( pCore->fdWrite, EPOLL_CTL_DEL, iSocket, &ev );

    Bu::Client *pClient = hClients.get( iSocket );
    Bu::Socket *pSocket = hSockets.get( iSocket );
    onClosedConnection( pClient );
    pClient->close();
    hClients.erase( iSocket );
    pSocket->close();
    hSockets.erase( iSocket );
    delete pClient;
    delete pSocket;
    BU_PROFILE_END("closeClient");
}

////////
// WriteMonitor
////
Bu::Server::WriteMonitor::WriteMonitor( Server &rSrv ) :
    rSrv( rSrv )
{
}

Bu::Server::WriteMonitor::~WriteMonitor()
{
}

/**
 * Thread body: waits on the write epoll set and queues a write event for
 * every descriptor reported.  Returns when the shutdown eventfd is reported
 * or when epoll_wait fails for a reason other than a signal interruption.
 */
void Bu::Server::WriteMonitor::run()
{
    setName("busrv-writeMon");
    struct epoll_event ev[MAX_EVENTS];
    for(;;)
    {
        int iCount = epoll_wait( rSrv.pCore->fdWrite, ev, MAX_EVENTS, 5000 );
        if( iCount < 0 )
        {
            if( errno == EINTR )
            {
                // A signal interrupted the wait; that must not end the
                // monitor, so just wait again.
                continue;
            }
            // Bad error?
            return;
        }

        for( int j = 0; j < iCount; j++ )
        {
            if( ev[j].data.fd == rSrv.pCore->fdEvent )
            {
                // got event on fdEvent, exiting...
                return;
            }
            else
            {
                // Queue the write
                rSrv.qIoEvent.enqueue(
                    new Event( ev[j].data.fd, Event::Write )
                    );
            }
        }
    }
}

////////
// Event
////
Bu::Server::Event::Event( fd iId, Operation eOp ) :
    iId( iId ),
    eOp( eOp )
{
}

Bu::Server::Event::~Event()
{
}

Bu::Server::fd Bu::Server::Event::getId() const
{
    return iId;
}

Bu::Server::Event::Operation Bu::Server::Event::getOperation() const
{
    return eOp;
}

/////////
// IoWorker
////
Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv ) :
    rSrv( rSrv )
{
}

Bu::Server::IoWorker::~IoWorker()
{
}

/**
 * Thread body: takes events from the I/O queue for as long as the queue is
 * running and dispatches them to handleRead or handleWrite.  Events whose
 * client or socket can no longer be found are dropped.
 */
void Bu::Server::IoWorker::run()
{
    setName("busrv-ioWorker");
    while( rSrv.qIoEvent.isRunning() )
    {
        Event *pEv = rSrv.qIoEvent.dequeue( true );
        if( pEv == NULL )
        {
            continue;
        }

        Client *pClient;
        Socket *pSocket;
        if( rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) )
        {
            switch( pEv->getOperation() )
            {
                case Event::Read:
                    handleRead( pClient, pSocket );
                    break;

                case Event::Write:
                    handleWrite( pClient, pSocket );
                    break;

                case Event::Process:
                    break;
            }
        }

        delete pEv;
    }
}

/**
 * Reads everything currently available on the socket into the client's
 * buffer.  If anything was read, a Process event is queued for the client
 * workers and the read notification is re-armed; if nothing was read, or
 * the socket read throws a Bu::ExceptionBase, the client is asked to
 * disconnect.
 */
void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
{
    char buf[RBS];
    Bu::size iRead;
    Bu::size iTotal=0;

    BU_PROFILE_START("client.read");
    try
    {
        for(;;)
        {
            try
            {
                iRead = pSocket->read( buf, RBS );

                if( iRead == 0 )
                {
                    break;
                }

                iTotal += iRead;
                pClient->cbBuffer.server().write( buf, iRead );
                if( !pSocket->canRead() )
                    break;
            }
            catch( const Bu::ExceptionBase & )
            {
                pClient->disconnect();
                //close( pSocket );
                // Close the profiling event on this exit path as well.
                BU_PROFILE_END("client.read");
                return;
            }
        }
    }
    catch( const std::exception & )
    {
        // Probably the socket is dead.  We should maybe disconnect, but we'll
        // also notice soon enough anyway?
    }
    BU_PROFILE_END("client.read");

    if( iTotal == 0 )
    {
        pClient->disconnect();
        //close( pSocket );
    }
    else
    {
        Bu::Server::fd iFd;
        pSocket->getFd( iFd );
        rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) );
        rSrv.clientReadReady( iFd );
    }
}

/**
 * Writes the client's pending output to the socket in chunks of at most RBS
 * bytes.  When the socket accepts less than was offered, or an exception is
 * thrown while output is still pending, the write notification is armed
 * again so the rest is retried later.
 */
void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket )
{
    char buf[RBS];
    try
    {
        while( pClient->hasOutput() )
        {
            // Peek first and only consume what the socket really accepted.
            int iAmnt = RBS;
            iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt );
            int iReal = pSocket->write( buf, iAmnt );
            pClient->cbBuffer.server().seek( iReal );
            if( iReal < iAmnt )
            {
                // We wrote less than expected, the kernel buffer must be full,
                // we should queue ourselves again.
                rSrv.clientWriteReady( pClient->getId() );
                break;
            }
        }
    }
    catch( const std::exception & )
    {
        // Error working with socket, it's probably closed.
        if( pClient->hasOutput() )
            rSrv.clientWriteReady( pClient->getId() );
    }
}

/////////
// ClientWorker
////
Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv ) :
    rSrv( rSrv )
{
}

Bu::Server::ClientWorker::~ClientWorker()
{
}

/**
 * Thread body: takes events from the client queue for as long as the queue
 * is running, lets the matching client process its input and arms the write
 * notification if that produced output.  Events for clients that no longer
 * exist are dropped, and exceptions raised while processing are swallowed.
 */
void Bu::Server::ClientWorker::run()
{
    setName("busrv-cntWorker");
    while( rSrv.qClientEvent.isRunning() )
    {
        Event *pEv = rSrv.qClientEvent.dequeue( true );
        if( pEv == NULL )
            continue;

        try
        {
            Client *pClient = rSrv.getClient( pEv->getId() );
            if( pClient != NULL )
            {
                pClient->processInput();
                if( pClient->getOutputSize() > 0 )
                {
                    rSrv.clientWriteReady( pClient->getId() );
                }
            }
        }
        catch( const std::exception & )
        {
            // Probably we're fine, the client just closed between queuing and
            // working.
        }
        catch(...)
        {
        }

        delete pEv;
    }
}