/* * Copyright (C) 2007-2011 Xagasoft, All rights reserved. * * This file is part of the libbu++ library and is released under the * terms of the license contained in the file LICENSE. */ #include "bu/itoserver.h" #include <errno.h> #include "bu/tcpserversocket.h" #include "bu/client.h" #include "bu/tcpsocket.h" #include "bu/config.h" Bu::ItoServer::ItoServer() : nTimeoutSec( 1 ), nTimeoutUSec( 0 ) { FD_ZERO( &fdActive ); } Bu::ItoServer::~ItoServer() { while( !qClientCleanup.isEmpty() ) { ItoClient *pCli = qClientCleanup.dequeue(); pCli->join(); delete pCli; } // TODO: Make sure here that each client has shutdown it's socket, and // maybe even written any extra data, we could put a timelimit on this... // anyway, it's not as clean as it could be right now. for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) { ItoClient *pCli = (*i); pCli->join(); delete pCli; } } void Bu::ItoServer::addPort( int nPort, int nPoolSize ) { TcpServerSocket *s = new TcpServerSocket( nPort, nPoolSize ); int nSocket = s->getSocket(); FD_SET( nSocket, &fdActive ); hServers.insert( nSocket, s ); } void Bu::ItoServer::addPort( const String &sAddr, int nPort, int nPoolSize ) { TcpServerSocket *s = new TcpServerSocket( sAddr, nPort, nPoolSize ); int nSocket = s->getSocket(); FD_SET( nSocket, &fdActive ); hServers.insert( nSocket, s ); } void Bu::ItoServer::setTimeout( int nTimeoutSec, int nTimeoutUSec ) { this->nTimeoutSec = nTimeoutSec; this->nTimeoutUSec = nTimeoutUSec; } void Bu::ItoServer::addClient( int nSocket, int nPort ) { ItoClient *pC = new ItoClient( *this, nSocket, nPort, nTimeoutSec, nTimeoutUSec ); imClients.lock(); hClients.insert( nSocket, pC ); imClients.unlock(); pC->start(); } void Bu::ItoServer::run() { for(;;) { struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; fd_set fdRead = fdActive; fd_set fdWrite = fdActive; fd_set fdException = fdActive; if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, NULL, &fdException, &xTimeout ) ) < 0 ) { throw ExceptionBase("Error attempting to scan open connections."); } for( ServerHash::iterator i = hServers.begin(); i != hServers.end(); i++ ) { if( FD_ISSET( i.getKey(), &fdRead ) ) { TcpServerSocket *pSrv = i.getValue(); addClient( pSrv->accept(), pSrv->getPort() ); } } while( !qClientCleanup.isEmpty() ) { ItoClient *pCli = qClientCleanup.dequeue(); pCli->join(); delete pCli; } } } void Bu::ItoServer::clientCleanup( int iSocket ) { imClients.lock(); ItoClient *pCli = hClients.get( iSocket ); imClients.unlock(); qClientCleanup.enqueue( pCli ); } Bu::ItoServer::ItoClient::ItoClient( ItoServer &rSrv, int iSocket, int iPort, int nTimeoutSec, int nTimeoutUSec ) : rSrv( rSrv ), iSocket( iSocket ), iPort( iPort ), nTimeoutSec( nTimeoutSec ), nTimeoutUSec( nTimeoutUSec ) { FD_ZERO( &fdActive ); FD_SET( iSocket, &fdActive ); pClient = new Client( new Bu::TcpSocket( iSocket ), new SrvClientLinkFactory( rSrv ) ); } Bu::ItoServer::ItoClient::~ItoClient() { } void Bu::ItoServer::ItoClient::run() { imProto.lock(); rSrv.onNewConnection( pClient, iPort ); pClient->processOutput(); imProto.unlock(); for(;;) { struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; fd_set fdRead = fdActive; fd_set fdWrite; fd_set fdException = fdActive; FD_ZERO( &fdWrite ); if( pClient->hasOutput() ) FD_SET( iSocket, &fdWrite ); if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) { throw ExceptionBase("Error attempting to scan open connections."); } while( !qMsg.isEmpty() ) { imProto.lock(); Bu::String *pMsg = qMsg.dequeue(); pClient->onMessage( *pMsg ); delete pMsg; pClient->processOutput(); imProto.unlock(); } if( FD_ISSET( iSocket, &fdRead ) ) { imProto.lock(); pClient->processInput(); imProto.unlock(); if( !pClient->isOpen() ) { imProto.lock(); rSrv.onClosedConnection( pClient ); imProto.unlock(); rSrv.clientCleanup( iSocket ); return; } } if( FD_ISSET( iSocket, &fdWrite ) ) { imProto.lock(); pClient->processOutput(); imProto.unlock(); } } } Bu::ItoServer::SrvClientLink::SrvClientLink( ItoClient *pClient ) : pClient( pClient ) { } Bu::ItoServer::SrvClientLink::~SrvClientLink() { } void Bu::ItoServer::SrvClientLink::sendMessage( const Bu::String &sMsg ) { if( !pClient->imProto.trylock() ) { pClient->pClient->onMessage( sMsg ); pClient->pClient->processOutput(); pClient->imProto.unlock(); } else { Bu::String *pMsg = new Bu::String( sMsg ); pClient->qMsg.enqueue( pMsg ); } } Bu::ItoServer::SrvClientLinkFactory::SrvClientLinkFactory( Bu::ItoServer &rSrv ) : rSrv( rSrv ) { } Bu::ItoServer::SrvClientLinkFactory::~SrvClientLinkFactory() { } Bu::ClientLink *Bu::ItoServer::SrvClientLinkFactory::createLink( Bu::Client *pClient ) { rSrv.imClients.lock(); ItoClient *pCli = rSrv.hClients.get( *pClient->getSocket() ); rSrv.imClients.unlock(); return new SrvClientLink( pCli ); }