From 915005e218b5d00939b548de65ce6354f7acb487 Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Fri, 28 Jul 2023 21:18:56 -0700 Subject: Completely redesigned Server and Client. Like, seriously, they're almost completely different. --- src/stable/client.cpp | 184 +++------------- src/stable/client.h | 38 +--- src/stable/clientbuf.cpp | 465 +++++++++++++++++++++++++++++++++++++++ src/stable/clientbuf.h | 179 +++++++++++++++ src/stable/multiserver.cpp | 27 +-- src/stable/multiserver.h | 8 +- src/stable/server.cpp | 342 ++++++++++++++++++++++++----- src/stable/server.h | 97 +++++++- src/stable/serversocket.cpp | 10 + src/stable/serversocket.h | 44 ++++ src/stable/serversockettcp.cpp | 254 +++++++++++++++++++++ src/stable/serversockettcp.h | 65 ++++++ src/stable/socket.cpp | 10 + src/stable/socket.h | 33 +++ src/stable/sockettcp.cpp | 488 +++++++++++++++++++++++++++++++++++++++++ src/stable/sockettcp.h | 128 +++++++++++ src/stable/synchroqueue.h | 100 ++++++++- src/stable/tcpserversocket.cpp | 257 ---------------------- src/stable/tcpserversocket.h | 65 ------ src/stable/tcpsocket.cpp | 482 ---------------------------------------- src/stable/tcpsocket.h | 126 ----------- 21 files changed, 2204 insertions(+), 1198 deletions(-) create mode 100644 src/stable/clientbuf.cpp create mode 100644 src/stable/clientbuf.h create mode 100644 src/stable/serversocket.cpp create mode 100644 src/stable/serversocket.h create mode 100644 src/stable/serversockettcp.cpp create mode 100644 src/stable/serversockettcp.h create mode 100644 src/stable/socket.cpp create mode 100644 src/stable/socket.h create mode 100644 src/stable/sockettcp.cpp create mode 100644 src/stable/sockettcp.h delete mode 100644 src/stable/tcpserversocket.cpp delete mode 100644 src/stable/tcpserversocket.h delete mode 100644 src/stable/tcpsocket.cpp delete mode 100644 src/stable/tcpsocket.h (limited to 'src/stable') diff --git a/src/stable/client.cpp b/src/stable/client.cpp index 56c5094..d2d48d7 100644 --- a/src/stable/client.cpp +++ b/src/stable/client.cpp @@ -6,7 +6,6 @@ */ #include "bu/client.h" -#include "bu/tcpsocket.h" #include #include #include "bu/protocol.h" @@ -24,72 +23,23 @@ #define BU_PROFILE_END( x ) (void)0 #endif -Bu::Client::Client( Bu::TcpSocket *pSocket, - class Bu::ClientLinkFactory *pfLink ) : - pTopStream( pSocket ), - pSocket( pSocket ), +Bu::Client::Client( class Bu::ClientLinkFactory *pfLink ) : pProto( NULL ), bWantsDisconnect( false ), pfLink( pfLink ) { - Bu::ReadWriteMutex::WriteLocker lGlobal( mGlobal ); - lFilts.prepend( pSocket ); } Bu::Client::~Client() { - Bu::ReadWriteMutex::WriteLocker lGlobal( mGlobal ); - for( FilterList::iterator i = lFilts.begin(); i; i++ ) - { - delete *i; - } - pTopStream = pSocket = NULL; delete pfLink; } void Bu::Client::processInput() { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - mRead.lock(); - char buf[RBS]; - Bu::size nRead, nTotal=0; + Bu::MutexLocker l( mProto ); - BU_PROFILE_START("client.read"); - for(;;) - { - try - { - nRead = pTopStream->read( buf, RBS ); - - if( nRead == 0 ) - { - break; - } - else - { - nTotal += nRead; - qbRead.write( buf, nRead ); - if( !pTopStream->canRead() ) - break; - } - } - catch( Bu::TcpSocketException &e ) - { - pTopStream->close(); - bWantsDisconnect = true; - break; - } - } - BU_PROFILE_END("client.read"); - mRead.unlock(); - - if( nTotal == 0 ) - { - pTopStream->close(); - bWantsDisconnect = true; - } - - if( pProto && nTotal ) + if( pProto && getInputSize() > 0 ) { BU_PROFILE_START("client.process"); pProto->onNewData( this ); @@ -97,197 +47,133 @@ void Bu::Client::processInput() } } -void Bu::Client::processOutput() -{ - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - mWrite.lock(); - char buf[RBS]; - if( qbWrite.getSize() > 0 ) - { - int nAmnt = RBS; - nAmnt = qbWrite.peek( buf, nAmnt ); - int nReal = pTopStream->write( buf, nAmnt ); - qbWrite.seek( nReal ); - pTopStream->flush(); - } - mWrite.unlock(); -} - void Bu::Client::setProtocol( Protocol *pProto ) { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); + Bu::MutexLocker l( mProto ); this->pProto = pProto; this->pProto->onNewConnection( this ); } Bu::Protocol *Bu::Client::getProtocol() { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); + Bu::MutexLocker l( mProto ); return pProto; } void Bu::Client::clearProtocol() { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); + Bu::MutexLocker l( mProto ); pProto = NULL; } -/* -Bu::String &Bu::Client::getInput() -{ - return sReadBuf; -} - -Bu::String &Bu::Client::getOutput() -{ - return sWriteBuf; -} -*/ bool Bu::Client::isOpen() { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - if( !pTopStream ) return false; - return pTopStream->isOpen(); + return true; } Bu::size Bu::Client::write( const Bu::String &sData ) { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mWrite ); - return qbWrite.write( sData.getStr(), sData.getSize() ); + return cbBuffer.client().write( sData.getStr(), sData.getSize() ); } Bu::size Bu::Client::write( const void *pData, Bu::size nBytes ) { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mWrite ); - return qbWrite.write( pData, nBytes ); + return cbBuffer.client().write( pData, nBytes ); } Bu::size Bu::Client::write( int8_t nData ) { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mWrite ); - return qbWrite.write( (const char *)&nData, sizeof(nData) ); + return cbBuffer.client().write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( int16_t nData ) { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mWrite ); - return qbWrite.write( (const char *)&nData, sizeof(nData) ); + return cbBuffer.client().write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( int32_t nData ) { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mWrite ); - return qbWrite.write( (const char *)&nData, sizeof(nData) ); + return cbBuffer.client().write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( int64_t nData ) { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mWrite ); - return qbWrite.write( (const char *)&nData, sizeof(nData) ); + return cbBuffer.client().write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( uint8_t nData ) { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mWrite ); - return qbWrite.write( (const char *)&nData, sizeof(nData) ); + return cbBuffer.client().write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( uint16_t nData ) { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mWrite ); - return qbWrite.write( (const char *)&nData, sizeof(nData) ); + return cbBuffer.client().write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( uint32_t nData ) { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mWrite ); - return qbWrite.write( (const char *)&nData, sizeof(nData) ); + return cbBuffer.client().write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( uint64_t nData ) { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mWrite ); - return qbWrite.write( (const char *)&nData, sizeof(nData) ); + return cbBuffer.client().write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::read( void *pData, Bu::size nBytes ) { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mWrite ); - return qbRead.read( pData, nBytes ); + return cbBuffer.client().read( pData, nBytes ); } + Bu::size Bu::Client::peek( void *pData, int nBytes, int nOffset ) { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mWrite ); - return qbRead.peek( pData, nBytes, nOffset ); + return cbBuffer.client().peek( pData, nBytes, nOffset ); } + Bu::size Bu::Client::getInputSize() { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mWrite ); - return qbRead.getSize(); + return cbBuffer.client().getSize(); } Bu::size Bu::Client::getOutputSize() { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mWrite ); - return qbWrite.getSize(); -} - -const Bu::TcpSocket *Bu::Client::getSocket() const -{ - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - return pSocket; + return cbBuffer.server().getSize(); } void Bu::Client::disconnect() { - Bu::ReadWriteMutex::WriteLocker lGlobal( mGlobal ); + Bu::MutexLocker l( mDisconnect ); bWantsDisconnect = true; } bool Bu::Client::wantsDisconnect() { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); + Bu::MutexLocker l( mDisconnect ); return bWantsDisconnect; } void Bu::Client::close() { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - pTopStream->close(); } Bu::ClientLink *Bu::Client::getLink() { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); return pfLink->createLink( this ); } void Bu::Client::onMessage( const Bu::String &sMsg ) { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); + Bu::MutexLocker l( mProto ); if( pProto ) pProto->onMessage( this, sMsg ); } void Bu::Client::tick() { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); + Bu::MutexLocker l( mProto ); if( pProto ) pProto->onTick( this ); } @@ -299,9 +185,7 @@ Bu::size Bu::Client::tell() void Bu::Client::seek( Bu::size offset ) { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mRead ); - return qbRead.seek( offset ); + return cbBuffer.client().seek( offset ); } void Bu::Client::setPos( Bu::size ) @@ -321,15 +205,11 @@ bool Bu::Client::isEos() void Bu::Client::flush() { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - processOutput(); } bool Bu::Client::canRead() { - Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); - Bu::MutexLocker l( mRead ); - return qbRead.getSize() > 0; + return cbBuffer.client().getSize() > 0; } bool Bu::Client::canWrite() @@ -374,11 +254,13 @@ Bu::size Bu::Client::getSize() const Bu::size Bu::Client::getBlockSize() const { - return pSocket->getBlockSize(); + return 0; + //return pSocket->getBlockSize(); } Bu::String Bu::Client::getLocation() const { - return pSocket->getLocation(); + return "???"; + //return pSocket->getLocation(); } diff --git a/src/stable/client.h b/src/stable/client.h index abde682..4cd8e3f 100644 --- a/src/stable/client.h +++ b/src/stable/client.h @@ -12,7 +12,7 @@ #include "bu/config.h" #include "bu/string.h" -#include "bu/queuebuf.h" +#include "bu/clientbuf.h" #include "bu/mutex.h" #include "bu/readwritemutex.h" @@ -28,7 +28,6 @@ namespace Bu { class Protocol; class Stream; - class TcpSocket; class ClientLinkFactory; /** @@ -36,15 +35,13 @@ namespace Bu */ class Client : public Bu::Stream { + friend class Server; public: - Client( Bu::TcpSocket *pSocket, Bu::ClientLinkFactory *pfLink); + Client( Bu::ClientLinkFactory *pfLink ); virtual ~Client(); void processInput(); - void processOutput(); - //Bu::String &getInput(); - //Bu::String &getOutput(); Bu::size write( const Bu::String &sData ); Bu::size write( const void *pData, Bu::size nBytes ); Bu::size write( int8_t nData ); @@ -57,7 +54,7 @@ namespace Bu Bu::size write( uint64_t nData ); Bu::size read( void *pData, Bu::size nBytes ); Bu::size peek( void *pData, int nBytes, int nOffset=0 ); -// void seek( int nBytes ); + void seek( int nBytes ); Bu::size getInputSize(); Bu::size getOutputSize(); @@ -69,8 +66,6 @@ namespace Bu void close(); void tick(); - const Bu::TcpSocket *getSocket() const; - void disconnect(); bool wantsDisconnect(); @@ -78,31 +73,22 @@ namespace Bu void onMessage( const Bu::String &sMsg ); - bool hasOutput() { return qbWrite.getSize() > 0; } - bool hasInput() { return qbRead.getSize() > 0; } + bool hasOutput() { return cbBuffer.server().getSize() > 0; } + bool hasInput() { return cbBuffer.client().getSize() > 0; } template void pushFilter() { - filter *pFlt = new filter( *pTopStream ); - pTopStream = pFlt; - lFilts.prepend( pFlt ); } template void pushFilter( p1t p1 ) { - filter *pFlt = new filter( *pTopStream, p1 ); - pTopStream = pFlt; - lFilts.prepend( pFlt ); } template void pushFilter( p1t p1, p2t p2 ) { - filter *pFlt = new filter( *pTopStream, p1, p2 ); - pTopStream = pFlt; - lFilts.prepend( pFlt ); } /* @@ -128,18 +114,12 @@ namespace Bu virtual Bu::String getLocation() const; private: - typedef Bu::List FilterList; - FilterList lFilts; - Bu::Stream *pTopStream; - Bu::TcpSocket *pSocket; Bu::Protocol *pProto; - Bu::QueueBuf qbRead; - Bu::QueueBuf qbWrite; + Bu::ClientBuf cbBuffer; bool bWantsDisconnect; class Bu::ClientLinkFactory *pfLink; - Bu::Mutex mRead; - Bu::Mutex mWrite; - mutable Bu::ReadWriteMutex mGlobal; + mutable Bu::Mutex mProto; + mutable Bu::Mutex mDisconnect; }; } diff --git a/src/stable/clientbuf.cpp b/src/stable/clientbuf.cpp new file mode 100644 index 0000000..493e577 --- /dev/null +++ b/src/stable/clientbuf.cpp @@ -0,0 +1,465 @@ +#include "bu/clientbuf.h" + +#include "bu/mutexlocker.h" + +Bu::ClientBuf::ClientBuf() : + accClientRaw( *this ), + accServer( *this ), + accClientFiltered( &accClient ), + accClient( *this ) +{ +} + +Bu::ClientBuf::~ClientBuf() +{ +} + +Bu::ClientBuf::ClientAccess &Bu::ClientBuf::client() +{ + return accClient; +} + +Bu::ClientBuf::ServerAccess &Bu::ClientBuf::server() +{ + return accServer; +} + +///////// +// ClientAccessRaw +/// + +Bu::ClientBuf::ClientAccessRaw::ClientAccessRaw( Bu::ClientBuf &rBuf ) : + rBuf( rBuf ) +{ +} + +Bu::ClientBuf::ClientAccessRaw::~ClientAccessRaw() +{ +} + +void Bu::ClientBuf::ClientAccessRaw::close() +{ + // Roughly meaningless +} + +Bu::size Bu::ClientBuf::ClientAccessRaw::read( void *pBuf, size iBytes ) +{ + Bu::MutexLocker l( rBuf.mInput ); + return rBuf.qbInput.read( pBuf, iBytes ); +} + +Bu::size Bu::ClientBuf::ClientAccessRaw::write( const void *pBuf, size iBytes ) +{ + Bu::MutexLocker l( rBuf.mOutput ); + return rBuf.qbOutput.write( pBuf, iBytes ); +} + +Bu::size Bu::ClientBuf::ClientAccessRaw::tell() +{ + Bu::MutexLocker l( rBuf.mInput ); + return rBuf.qbInput.tell(); +} + +void Bu::ClientBuf::ClientAccessRaw::seek( Bu::size offset ) +{ + Bu::MutexLocker l( rBuf.mInput ); + rBuf.qbInput.seek( offset ); +} + +void Bu::ClientBuf::ClientAccessRaw::setPos( Bu::size ) +{ +} + +void Bu::ClientBuf::ClientAccessRaw::setPosEnd( Bu::size ) +{ +} + +bool Bu::ClientBuf::ClientAccessRaw::isEos() +{ + Bu::MutexLocker l( rBuf.mInput ); + return rBuf.qbInput.isEos(); +} + +bool Bu::ClientBuf::ClientAccessRaw::isOpen() +{ + Bu::MutexLocker l( rBuf.mInput ); + return rBuf.qbInput.isOpen(); +} + +void Bu::ClientBuf::ClientAccessRaw::flush() +{ + Bu::MutexLocker l( rBuf.mOutput ); + return rBuf.qbOutput.flush(); +} + +bool Bu::ClientBuf::ClientAccessRaw::canRead() +{ + Bu::MutexLocker l( rBuf.mInput ); + return rBuf.qbInput.canRead(); +} + +bool Bu::ClientBuf::ClientAccessRaw::canWrite() +{ + Bu::MutexLocker l( rBuf.mOutput ); + return rBuf.qbOutput.canWrite(); +} + +bool Bu::ClientBuf::ClientAccessRaw::isReadable() +{ + Bu::MutexLocker l( rBuf.mInput ); + return rBuf.qbInput.isReadable(); +} + +bool Bu::ClientBuf::ClientAccessRaw::isWritable() +{ + Bu::MutexLocker l( rBuf.mOutput ); + return rBuf.qbOutput.isWritable(); +} + +bool Bu::ClientBuf::ClientAccessRaw::isSeekable() +{ + Bu::MutexLocker l( rBuf.mInput ); + return rBuf.qbInput.isSeekable(); +} + +bool Bu::ClientBuf::ClientAccessRaw::isBlocking() +{ + return false; +} + +void Bu::ClientBuf::ClientAccessRaw::setBlocking( bool ) +{ +} + +void Bu::ClientBuf::ClientAccessRaw::setSize( Bu::size ) +{ + return; +} + +Bu::size Bu::ClientBuf::ClientAccessRaw::getSize() const +{ + Bu::MutexLocker l( rBuf.mInput ); + return rBuf.qbInput.getSize(); +} + +Bu::size Bu::ClientBuf::ClientAccessRaw::getBlockSize() const +{ + Bu::MutexLocker l( rBuf.mInput ); + return rBuf.qbInput.getBlockSize(); +} + +Bu::String Bu::ClientBuf::ClientAccessRaw::getLocation() const +{ + return "ClientBuf"; +} + +///////// +// ClientAccess +/// + +Bu::ClientBuf::ClientAccess::ClientAccess( Bu::ClientBuf &rBuf ) : + rBuf( rBuf ) +{ +} + +Bu::ClientBuf::ClientAccess::~ClientAccess() +{ +} + +void Bu::ClientBuf::ClientAccess::close() +{ + // Roughly meaningless +} + +Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) +{ + char *pBuf = (char *)pBufRaw; + Bu::MutexLocker l( mAccess ); + // Read from QueueBuf first + Bu::size ps = qbPeek.read( pBuf, iBytes ); + iBytes -= ps; + pBuf += ps; + // Request space left? Try the client + if( iBytes > 0 ) + { + ps += rBuf.accClientFiltered.read( pBuf, iBytes ); + } + return ps; +} + +Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes, + int iOffset ) +{ + Bu::MutexLocker l( mAccess ); + // Do we have enough data in the peek buffer to handle this? + if( qbPeek.getSize() < iBytes+iOffset ) + { + // Nope, make an attempt to fill it in. + int nDiff = iBytes-qbPeek.getSize(); + // We have to make our own buffer, since iBytes+nOffeset could be bigger + // than pData. + char *pTmp = new char[nDiff]; + Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff ); + if( ps > 0 ) + { + // Add the data read to the peek buffer. + qbPeek.write( pTmp, ps ); + } + delete[] pTmp; + } + + return qbPeek.peek( pData, iBytes, iOffset ); +} + +Bu::size Bu::ClientBuf::ClientAccess::write( const void *pBuf, size iBytes ) +{ + Bu::MutexLocker l( mAccess ); + return rBuf.accClientFiltered.write( pBuf, iBytes ); +} + +Bu::size Bu::ClientBuf::ClientAccess::tell() +{ + Bu::MutexLocker l( mAccess ); + return rBuf.accClientFiltered.tell() + qbPeek.getSize(); +} + +void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) +{ + Bu::MutexLocker l( mAccess ); + // For this type of stream seek is basically a destructive skip. It's like + // reading the data but with no output buffer. Let's remove data from the + // peek buffer first. + if( qbPeek.getSize() > 0 ) + { + Bu::size amount = Bu::buMax( qbPeek.getSize(), offset ); + qbPeek.seek( amount ); + offset -= amount; + } + + // If there's offset left, then apply it to the underlying stream + if( offset > 0 ) + { + rBuf.accClientFiltered.seek( offset ); + } +} + +void Bu::ClientBuf::ClientAccess::setPos( Bu::size ) +{ +} + +void Bu::ClientBuf::ClientAccess::setPosEnd( Bu::size ) +{ +} + +bool Bu::ClientBuf::ClientAccess::isEos() +{ + Bu::MutexLocker l( mAccess ); + return rBuf.accClientFiltered.isEos(); +} + +bool Bu::ClientBuf::ClientAccess::isOpen() +{ + Bu::MutexLocker l( mAccess ); + return rBuf.accClientFiltered.isOpen(); +} + +void Bu::ClientBuf::ClientAccess::flush() +{ + Bu::MutexLocker l( mAccess ); + return rBuf.accClientFiltered.flush(); +} + +bool Bu::ClientBuf::ClientAccess::canRead() +{ + Bu::MutexLocker l( mAccess ); + return rBuf.accClientFiltered.canRead(); +} + +bool Bu::ClientBuf::ClientAccess::canWrite() +{ + Bu::MutexLocker l( mAccess ); + return rBuf.accClientFiltered.canWrite(); +} + +bool Bu::ClientBuf::ClientAccess::isReadable() +{ + Bu::MutexLocker l( mAccess ); + return rBuf.accClientFiltered.isReadable(); +} + +bool Bu::ClientBuf::ClientAccess::isWritable() +{ + Bu::MutexLocker l( mAccess ); + return rBuf.accClientFiltered.isWritable(); +} + +bool Bu::ClientBuf::ClientAccess::isSeekable() +{ + Bu::MutexLocker l( mAccess ); + return rBuf.accClientFiltered.isSeekable(); +} + +bool Bu::ClientBuf::ClientAccess::isBlocking() +{ + return false; +} + +void Bu::ClientBuf::ClientAccess::setBlocking( bool ) +{ +} + +void Bu::ClientBuf::ClientAccess::setSize( Bu::size ) +{ + return; +} + +Bu::size Bu::ClientBuf::ClientAccess::getSize() const +{ + Bu::MutexLocker l( mAccess ); + return rBuf.accClientFiltered.getSize() + qbPeek.getSize(); +} + +Bu::size Bu::ClientBuf::ClientAccess::getBlockSize() const +{ + Bu::MutexLocker l( mAccess ); + return rBuf.accClientFiltered.getBlockSize(); +} + +Bu::String Bu::ClientBuf::ClientAccess::getLocation() const +{ + return "ClientBuf"; +} + +///////// +// ServerAccess +/// + +Bu::ClientBuf::ServerAccess::ServerAccess( Bu::ClientBuf &rBuf ) : + rBuf( rBuf ) +{ +} + +Bu::ClientBuf::ServerAccess::~ServerAccess() +{ +} + +void Bu::ClientBuf::ServerAccess::close() +{ +} + +Bu::size Bu::ClientBuf::ServerAccess::read( void *pBuf, size iBytes ) +{ + Bu::MutexLocker l( rBuf.mOutput ); + return rBuf.qbOutput.read( pBuf, iBytes ); +} + +Bu::size Bu::ClientBuf::ServerAccess::peek( void *pData, int iBytes, int iOffset ) +{ + Bu::MutexLocker l( rBuf.mOutput ); + return rBuf.qbOutput.peek( pData, iBytes, iOffset ); +} + +Bu::size Bu::ClientBuf::ServerAccess::write( const void *pBuf, size iBytes ) +{ + Bu::MutexLocker l( rBuf.mInput ); + return rBuf.qbInput.write( pBuf, iBytes ); +} + +Bu::size Bu::ClientBuf::ServerAccess::tell() +{ + Bu::MutexLocker l( rBuf.mOutput ); + return rBuf.qbOutput.tell(); +} + +void Bu::ClientBuf::ServerAccess::seek( Bu::size offset ) +{ + Bu::MutexLocker l( rBuf.mOutput ); + rBuf.qbOutput.seek( offset ); +} + +void Bu::ClientBuf::ServerAccess::setPos( Bu::size ) +{ +} + +void Bu::ClientBuf::ServerAccess::setPosEnd( Bu::size ) +{ +} + +bool Bu::ClientBuf::ServerAccess::isEos() +{ + Bu::MutexLocker l( rBuf.mOutput ); + return rBuf.qbOutput.isEos(); +} + +bool Bu::ClientBuf::ServerAccess::isOpen() +{ + Bu::MutexLocker l( rBuf.mOutput ); + return rBuf.qbOutput.isOpen(); +} + +void Bu::ClientBuf::ServerAccess::flush() +{ + Bu::MutexLocker l( rBuf.mInput ); + return rBuf.qbInput.flush(); +} + +bool Bu::ClientBuf::ServerAccess::canRead() +{ + Bu::MutexLocker l( rBuf.mOutput ); + return rBuf.qbOutput.canRead(); +} + +bool Bu::ClientBuf::ServerAccess::canWrite() +{ + Bu::MutexLocker l( rBuf.mInput ); + return rBuf.qbInput.canWrite(); +} + +bool Bu::ClientBuf::ServerAccess::isReadable() +{ + Bu::MutexLocker l( rBuf.mOutput ); + return rBuf.qbOutput.isReadable(); +} + +bool Bu::ClientBuf::ServerAccess::isWritable() +{ + Bu::MutexLocker l( rBuf.mInput ); + return rBuf.qbInput.isWritable(); +} + +bool Bu::ClientBuf::ServerAccess::isSeekable() +{ + Bu::MutexLocker l( rBuf.mOutput ); + return rBuf.qbOutput.isSeekable(); +} + +bool Bu::ClientBuf::ServerAccess::isBlocking() +{ + return false; +} + +void Bu::ClientBuf::ServerAccess::setBlocking( bool ) +{ +} + +void Bu::ClientBuf::ServerAccess::setSize( Bu::size ) +{ + return; +} + +Bu::size Bu::ClientBuf::ServerAccess::getSize() const +{ + Bu::MutexLocker l( rBuf.mOutput ); + return rBuf.qbOutput.getSize(); +} + +Bu::size Bu::ClientBuf::ServerAccess::getBlockSize() const +{ + Bu::MutexLocker l( rBuf.mOutput ); + return rBuf.qbOutput.getBlockSize(); +} + +Bu::String Bu::ClientBuf::ServerAccess::getLocation() const +{ + return "ClientBuf"; +} diff --git a/src/stable/clientbuf.h b/src/stable/clientbuf.h new file mode 100644 index 0000000..7781b6a --- /dev/null +++ b/src/stable/clientbuf.h @@ -0,0 +1,179 @@ +/* + * Copyright (C) 2007-2019 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#ifndef BU_CLIENT_BUF_H +#define BU_CLIENT_BUF_H + +#include "bu/stream.h" +#include "bu/streamstack.h" +#include "bu/queuebuf.h" +#include "bu/mutex.h" + +namespace Bu +{ + /** + * ClientBuf represents two buffered I/O channels, input and output. Each + * is stored in its own Bu::QueueBuf. One side is client oriented and can + * have filters applied ala Bu::StreamStack. The other is server oriented + * and is accesesed raw. Both must be accessed via accessor objects. + * + * Direct access to the buffers isn't allowed, instead you must select a + * perspective. client() provides an accessor where reading consumes data + * coming from the socket and writing provides data to write to the socket. + * server() works in reverse, writing is writing data from the socket that + * we have just read, reading is reading data that should be written to the + * socket. + * + * client() provides a filtered access view of the data, while server() is + * unfiltered. + */ + class ClientBuf + { + public: + class ClientAccess : public Bu::Stream + { + friend class Bu::ClientBuf; + private: + ClientAccess( Bu::ClientBuf &rBuf ); + + public: + virtual ~ClientAccess(); + + virtual void close(); + virtual size read( void *pBuf, size iBytes ); + virtual size peek( void *pData, int iBytes, int iOffset=0 ); + virtual size write( const void *pBuf, size iBytes ); + using Stream::write; + virtual size tell(); + virtual void seek( size offset ); + virtual void setPos( size pos ); + virtual void setPosEnd( size pos ); + virtual bool isEos(); + virtual bool isOpen(); + virtual void flush(); + virtual bool canRead(); + virtual bool canWrite(); + virtual bool isReadable(); + virtual bool isWritable(); + virtual bool isSeekable(); + virtual bool isBlocking(); + virtual void setBlocking( bool bBlocking=true ); + virtual void setSize( size iSize ); + virtual size getSize() const; + virtual size getBlockSize() const; + virtual Bu::String getLocation() const; + + private: + Bu::ClientBuf &rBuf; + Bu::QueueBuf qbPeek; + mutable Bu::Mutex mAccess; + }; + + class ServerAccess : public Bu::Stream + { + friend class Bu::ClientBuf; + private: + ServerAccess( Bu::ClientBuf &rBuf ); + + public: + virtual ~ServerAccess(); + + virtual void close(); + virtual size read( void *pBuf, size iBytes ); + virtual size peek( void *pData, int iBytes, int iOffset=0 ); + virtual size write( const void *pBuf, size iBytes ); + using Stream::write; + virtual size tell(); + virtual void seek( size offset ); + virtual void setPos( size pos ); + virtual void setPosEnd( size pos ); + virtual bool isEos(); + virtual bool isOpen(); + virtual void flush(); + virtual bool canRead(); + virtual bool canWrite(); + virtual bool isReadable(); + virtual bool isWritable(); + virtual bool isSeekable(); + virtual bool isBlocking(); + virtual void setBlocking( bool bBlocking=true ); + virtual void setSize( size iSize ); + virtual size getSize() const; + virtual size getBlockSize() const; + virtual Bu::String getLocation() const; + + private: + Bu::ClientBuf &rBuf; + }; + + private: + class ClientAccessRaw : public Bu::Stream + { + friend class Bu::ClientBuf; + private: + ClientAccessRaw( Bu::ClientBuf &rBuf ); + + public: + virtual ~ClientAccessRaw(); + + virtual void close(); + virtual size read( void *pBuf, size iBytes ); + virtual size write( const void *pBuf, size iBytes ); + using Stream::write; + virtual size tell(); + virtual void seek( size offset ); + virtual void setPos( size pos ); + virtual void setPosEnd( size pos ); + virtual bool isEos(); + virtual bool isOpen(); + virtual void flush(); + virtual bool canRead(); + virtual bool canWrite(); + virtual bool isReadable(); + virtual bool isWritable(); + virtual bool isSeekable(); + virtual bool isBlocking(); + virtual void setBlocking( bool bBlocking=true ); + virtual void setSize( size iSize ); + virtual size getSize() const; + virtual size getBlockSize() const; + virtual Bu::String getLocation() const; + + private: + Bu::ClientBuf &rBuf; + }; + + public: + ClientBuf(); + virtual ~ClientBuf(); + + /** + * Access the client access side. Use directly, do NOT copy or store. + */ + ClientAccess &client(); + + /** + * Access the server access side. Use directly, do NOT copy or store. + */ + ServerAccess &server(); + + private: + ClientAccessRaw accClientRaw; + ServerAccess accServer; + Bu::StreamStack accClientFiltered; + ClientAccess accClient; + Bu::QueueBuf qbOutput; + Bu::QueueBuf qbInput; + Bu::Mutex mOutput; + Bu::Mutex mInput; + friend class Bu::ClientBuf::ClientAccess; + friend class Bu::ClientBuf::ClientAccessRaw; + friend class Bu::ClientBuf::ServerAccess; + }; +} + +#endif diff --git a/src/stable/multiserver.cpp b/src/stable/multiserver.cpp index e0386a7..697725c 100644 --- a/src/stable/multiserver.cpp +++ b/src/stable/multiserver.cpp @@ -8,6 +8,7 @@ #include "bu/multiserver.h" #include "bu/protocol.h" #include "bu/client.h" +#include "bu/serversocket.h" #include "bu/config.h" @@ -21,23 +22,23 @@ Bu::MultiServer::~MultiServer() { } -void Bu::MultiServer::addProtocol( Bu::Protocol *(*proc)(), int iPort, - int nPoolSize ) +void Bu::MultiServer::addProtocol( Bu::Protocol *(*proc)(), + Bu::ServerSocket *pServer ) { - hProtos[iPort] = proc; - addPort( iPort, nPoolSize ); + addServerSocket( pServer ); + int iFd; + pServer->getFd( iFd ); + hProtos.insert( iFd, proc ); } -void Bu::MultiServer::addProtocol( Protocol *(*proc)(), const String &sAddr, - int iPort, int nPoolSize ) +void Bu::MultiServer::onNewConnection( const Bu::ServerSocket *pSrv, + Client *pClient, Bu::Socket * /*pSocket*/ ) { - hProtos[iPort] = proc; - addPort( sAddr, iPort, nPoolSize ); -} - -void Bu::MultiServer::onNewConnection( Bu::Client *pClient, int nPort ) -{ - pClient->setProtocol( hProtos.get( nPort )() ); + int iFd; + if( pSrv->getFd( iFd ) ) + { + pClient->setProtocol( hProtos.get( iFd )() ); + } } void Bu::MultiServer::onClosedConnection( Bu::Client *pClient ) diff --git a/src/stable/multiserver.h b/src/stable/multiserver.h index b12aa90..2490427 100644 --- a/src/stable/multiserver.h +++ b/src/stable/multiserver.h @@ -15,6 +15,7 @@ namespace Bu { class Protocol; class Client; + class ServerSocket; template Protocol *genProtocol() @@ -28,9 +29,7 @@ namespace Bu MultiServer(); virtual ~MultiServer(); - void addProtocol( Protocol *(*proc)(), int iPort, int nPoolSize=40 ); - void addProtocol( Protocol *(*proc)(), const String &sAddr, int iPort, - int nPoolSize=40 ); + void addProtocol( Protocol *(*proc)(), ServerSocket *pServer ); void scan() { @@ -42,7 +41,8 @@ namespace Bu Server::setTimeout( nTimeoutSec, nTimeoutUSec ); } - virtual void onNewConnection( Client *pClient, int nPort ); + virtual void onNewConnection( const Bu::ServerSocket *pSrv, + Client *pClient, Bu::Socket *pSocket ); virtual void onClosedConnection( Client *pClient ); void shutdown(); diff --git a/src/stable/server.cpp b/src/stable/server.cpp index 0552510..592230d 100644 --- a/src/stable/server.cpp +++ b/src/stable/server.cpp @@ -8,10 +8,11 @@ #include "bu/server.h" #include #include -#include "bu/tcpserversocket.h" +#include "bu/serversocket.h" #include "bu/client.h" -#include "bu/tcpsocket.h" +#include "bu/socket.h" #include "bu/config.h" +#include "bu/mutexlocker.h" #ifdef PROFILE_BU_SERVER #define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x ) @@ -21,13 +22,34 @@ #define BU_PROFILE_END( x ) (void)0 #endif -Bu::Server::Server() : +#define RBS 1500 + +Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : nTimeoutSec( 0 ), nTimeoutUSec( 0 ), bAutoTick( false ) { BU_PROFILE_START("server"); FD_ZERO( &fdActive ); + + if( iIoWorkers < 1 ) + iIoWorkers = 1; + if( iClientWorkers < 1 ) + iClientWorkers = 1; + + for( int j = 0; j < iIoWorkers; j++ ) + { + IoWorker *pWorker = new IoWorker( *this, qIoEvent, qClientEvent ); + lIoWorker.append( pWorker ); + pWorker->start(); + } + + for( int j = 0; j < iClientWorkers; j++ ) + { + ClientWorker *pWorker = new ClientWorker( *this, qClientEvent ); + lClientWorker.append( pWorker ); + pWorker->start(); + } } Bu::Server::~Server() @@ -36,20 +58,16 @@ Bu::Server::~Server() BU_PROFILE_START("server"); } -void Bu::Server::addPort( int nPort, int nPoolSize ) +void Bu::Server::addServerSocket( Bu::ServerSocket *pSocket ) { - TcpServerSocket *s = new TcpServerSocket( nPort, nPoolSize ); - socket_t nSocket = s->getSocket(); - FD_SET( nSocket, &fdActive ); - hServers.insert( nSocket, s ); -} - -void Bu::Server::addPort( const String &sAddr, int nPort, int nPoolSize ) -{ - TcpServerSocket *s = new TcpServerSocket( sAddr, nPort, nPoolSize ); - socket_t nSocket = s->getSocket(); - FD_SET( nSocket, &fdActive ); - hServers.insert( nSocket, s ); + fd iFd; + if( !pSocket->getFd( iFd ) ) + { + throw Bu::ExceptionBase("Cannot get file descriptor from " + "provided ServerSocket."); + } + FD_SET( iFd, &fdActive ); + hServers.insert( iFd, pSocket ); } void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec ) @@ -68,11 +86,13 @@ void Bu::Server::scan() fd_set fdException = fdActive; FD_ZERO( &fdWrite ); + mClients.lock(); for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) { if( (*i)->hasOutput() ) FD_SET( i.getKey(), &fdWrite ); } + mClients.unlock(); if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) @@ -91,42 +111,17 @@ void Bu::Server::scan() { if( hServers.has( j ) ) { - TcpServerSocket *pSrv = hServers.get( j ); - addClient( pSrv->accept(), pSrv->getPort() ); + Bu::ServerSocket *pSrv = hServers.get( j ); + addClient( pSrv, pSrv->accept() ); } else { - Client *pClient = hClients.get( j ); - BU_PROFILE_START("processInput"); - pClient->processInput(); - BU_PROFILE_END("processInput"); - if( !pClient->isOpen() ) - { - closeClient( j ); - } + qIoEvent.enqueue( new Event( j, Event::Read ) ); } } if( FD_ISSET( j, &fdWrite ) ) { - try - { - Client *pClient = hClients.get( j ); - try - { - BU_PROFILE_START("processOutput"); - pClient->processOutput(); - BU_PROFILE_END("processOutput"); - } - catch( Bu::TcpSocketException &e ) - { - closeClient( j ); - } - } - catch( Bu::HashException &e ) - { - // Do nothing, I guess, the client is already dead... - // TODO: Someday, we may want to handle this more graceully. - } + qIoEvent.enqueue( new Event( j, Event::Write ) ); } } @@ -134,6 +129,7 @@ void Bu::Server::scan() // Now we just try to write all the pending data on all the sockets. // this could be done better eventually, if we care about the socket // wanting to accept writes (using a select). + mClients.lock(); for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) { if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) @@ -141,6 +137,7 @@ void Bu::Server::scan() lDelete.append( i.getKey() ); } } + mClients.unlock(); for( Bu::List::iterator i = lDelete.begin(); i != lDelete.end(); i++ ) { @@ -153,21 +150,47 @@ void Bu::Server::scan() BU_PROFILE_END("scan"); } -void Bu::Server::addClient( socket_t nSocket, int nPort ) +void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket ) { BU_PROFILE_START("addClient"); - FD_SET( nSocket, &fdActive ); + int iFdSrv; + int iFdCli; + if( !pSrv->getFd( iFdSrv ) || !pSrv->getFd( iFdCli ) ) + { + throw Bu::ExceptionBase("No file descriptor?"); + } + FD_SET( iFdCli, &fdActive ); - Client *c = new Client( - new Bu::TcpSocket( nSocket ), + Client *pClient = new Client( new SrvClientLinkFactory() ); - hClients.insert( nSocket, c ); + { + Bu::MutexLocker l( mClients ); + hClients.insert( iFdCli, pClient ); + hSockets.insert( iFdCli, pSocket ); + } - onNewConnection( c, nPort ); + onNewConnection( pSrv, pClient, pSocket ); BU_PROFILE_END("addClient"); } +Bu::Client *Bu::Server::getClient( fd iId ) +{ + Bu::MutexLocker l( mClients ); + return hClients.get( iId ); +} + +bool Bu::Server::getClientAndSocket( fd iId, Bu::Client *&pClient, + Bu::Socket *&pSocket ) +{ + Bu::MutexLocker l( mClients ); + if( !hClients.has( iId ) || !hSockets.has( iId ) ) + return false; + pClient = hClients.get( iId ); + pSocket = hSockets.get( iId ); + return true; +} + Bu::Server::SrvClientLink::SrvClientLink( Bu::Client *pClient ) : pClient( pClient ) { @@ -203,14 +226,45 @@ void Bu::Server::setAutoTick( bool bEnable ) void Bu::Server::tick() { + mClients.lock(); for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) { (*i)->tick(); } + mClients.unlock(); } void Bu::Server::shutdown() { + { + qIoEvent.stop(); + qClientEvent.stop(); + Bu::Server::Event *pEv; + while( (pEv = qIoEvent.drain()) != NULL ) + { + delete pEv; + } + while( (pEv = qClientEvent.drain()) != NULL ) + { + delete pEv; + } + + Bu::MutexLocker l( mWorkers ); + for( IoWorkerList::iterator i = lIoWorker.begin(); i; i++ ) + { + (*i)->join(); + delete *i; + } + lIoWorker.clear(); + for( ClientWorkerList::iterator i = lClientWorker.begin(); + i; i++ ) + { + (*i)->join(); + delete *i; + } + lClientWorker.clear(); + } + for( SrvHash::iterator i = hServers.begin(); i != hServers.end(); i++ ) { delete *i; @@ -218,23 +272,201 @@ void Bu::Server::shutdown() hServers.clear(); - for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) + ClientHash::KeyList lClients = hClients.getKeys(); + for( ClientHash::KeyList::iterator i = lClients.begin(); i; i++ ) { - closeClient( i.getKey() ); + closeClient( *i ); } - hClients.clear(); } -void Bu::Server::closeClient( socket_t iSocket ) +void Bu::Server::closeClient( fd iSocket ) { BU_PROFILE_START("closeClient"); Bu::Client *pClient = hClients.get( iSocket ); + Bu::Socket *pSocket = hSockets.get( iSocket ); onClosedConnection( pClient ); pClient->close(); hClients.erase( iSocket ); + pSocket->close(); + hSockets.erase( iSocket ); FD_CLR( iSocket, &fdActive ); delete pClient; + delete pSocket; BU_PROFILE_END("closeClient"); } +//////// +// Event +//// + +Bu::Server::Event::Event( fd iId, Operation eOp ) : + iId( iId ), + eOp( eOp ) +{ +} + +Bu::Server::Event::~Event() +{ +} + +Bu::Server::fd Bu::Server::Event::getId() const +{ + return iId; +} + +Bu::Server::Event::Operation Bu::Server::Event::getOperation() const +{ + return eOp; +} + +///////// +// IoWorker +//// + +Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv, + Bu::Server::EventQueue &qIoEvent, + Bu::Server::EventQueue &qClientEvent ) : + rSrv( rSrv ), + qIoEvent( qIoEvent ), + qClientEvent( qClientEvent ) +{ +} + +Bu::Server::IoWorker::~IoWorker() +{ +} + +void Bu::Server::IoWorker::run() +{ + while( qIoEvent.isRunning() ) + { + Event *pEv = qIoEvent.dequeue(); + if( pEv == NULL ) + continue; + + Client *pClient; + Socket *pSocket; + if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) + { + delete pEv; + continue; + } + + switch( pEv->getOperation() ) + { + case Event::Read: + handleRead( pClient, pSocket ); + break; + + case Event::Write: + handleWrite( pClient, pSocket ); + break; + + case Event::Process: + break; + } + + delete pEv; + } +} + +void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) +{ + char buf[RBS]; + Bu::size iRead; + Bu::size iTotal=0; + + BU_PROFILE_START("client.read"); + for(;;) + { + try + { + iRead = pSocket->read( buf, RBS ); + + if( iRead == 0 ) + { + break; + } + else + { + iTotal += iRead; + pClient->cbBuffer.server().write( buf, iRead ); + if( !pSocket->canRead() ) + break; + } + } + catch( Bu::ExceptionBase &e ) + { + close( pSocket ); + break; + } + } + BU_PROFILE_END("client.read"); + + if( iTotal == 0 ) + { + close( pSocket ); + } + else + { + Bu::Server::fd iFd; + pSocket->getFd( iFd ); + qClientEvent.enqueue( new Event( iFd, Event::Process ) ); + } +} + +void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) +{ + char buf[RBS]; + if( pClient->hasOutput() > 0 ) + { + int iAmnt = RBS; + iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); + int iReal = pSocket->write( buf, iAmnt ); + pClient->cbBuffer.server().seek( iReal ); + } +} + +void Bu::Server::IoWorker::close( Bu::Socket *pSocket ) +{ + Bu::Server::fd iFd; + pSocket->getFd( iFd ); + rSrv.closeClient( iFd ); +} + +///////// +// ClientWorker +//// + +Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv, + Bu::Server::EventQueue &qEvent ) : + rSrv( rSrv ), + qEvent( qEvent ) +{ +} + +Bu::Server::ClientWorker::~ClientWorker() +{ +} + +void Bu::Server::ClientWorker::run() +{ + while( qEvent.isRunning() ) + { + Event *pEv = qEvent.dequeue(); + if( pEv == NULL ) + continue; + + Client *pClient = rSrv.getClient( pEv->getId() ); + if( pClient == NULL ) + { + delete pEv; + continue; + } + + pClient->processInput(); + delete pEv; + } +} + diff --git a/src/stable/server.h b/src/stable/server.h index d3f0903..d66d9d5 100644 --- a/src/stable/server.h +++ b/src/stable/server.h @@ -20,6 +20,8 @@ #include "bu/clientlink.h" #include "bu/clientlinkfactory.h" #include "bu/hash.h" +#include "bu/synchroqueue.h" +#include "bu/thread.h" #include "bu/config.h" @@ -33,8 +35,8 @@ namespace Bu { - class TcpServerSocket; - class TcpSocket; + class ServerSocket; + class Socket; class Client; /** @@ -60,33 +62,35 @@ namespace Bu class Server { public: - Server(); + Server( int iIoWorkers=4, int iClientWorkers=8 ); virtual ~Server(); #ifdef WIN32 - typedef unsigned int socket_t; + typedef unsigned int fd; #else - typedef int socket_t; + typedef int fd; #endif - void addPort( int nPort, int nPoolSize=40 ); - void addPort( const String &sAddr, int nPort, int nPoolSize=40 ); + void addServerSocket( Bu::ServerSocket *pSocket ); virtual void scan(); void setTimeout( int nTimeoutSec, int nTimeoutUSec=0 ); - void addClient( socket_t nSocket, int nPort ); + void addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket ); + Bu::Client *getClient( fd iId ); + bool getClientAndSocket( fd iId, Bu::Client *&pClient, + Bu::Socket *&pSocket ); void setAutoTick( bool bEnable=true ); void tick(); - virtual void onNewConnection( Client *pClient, int nPort )=0; + virtual void onNewConnection( const Bu::ServerSocket *pSrv, Client *pClient, Bu::Socket *pSocket )=0; virtual void onClosedConnection( Client *pClient )=0; void shutdown(); private: - void closeClient( socket_t iSocket ); + void closeClient( fd iSocket ); class SrvClientLink : public Bu::ClientLink { public: @@ -108,14 +112,83 @@ namespace Bu virtual Bu::ClientLink *createLink( Bu::Client *pClient ); }; + class Event + { + public: + enum Operation + { + Read, + Write, + Process + }; + Event( fd iId, Operation eOp ); + ~Event(); + + fd getId() const; + Operation getOperation() const; + + private: + fd iId; + Operation eOp; + }; + + typedef Bu::SynchroQueue EventQueue; + + class IoWorker : public Bu::Thread + { + public: + IoWorker( Server &rSrv, EventQueue &qIoEvent, + EventQueue &qClientEvent ); + virtual ~IoWorker(); + + protected: + virtual void run(); + + private: + void handleRead( Client *pClient, Socket *pSocket ); + void handleWrite( Client *pClient, Socket *pSocket ); + void close( Socket *pSocket ); + + private: + Server &rSrv; + EventQueue &qIoEvent; + EventQueue &qClientEvent; + }; + + class ClientWorker : public Bu::Thread + { + public: + ClientWorker( Server &rSrv, EventQueue &qEvent ); + virtual ~ClientWorker(); + + protected: + virtual void run(); + + private: + Server &rSrv; + EventQueue &qEvent; + }; + int nTimeoutSec; int nTimeoutUSec; fd_set fdActive; - typedef Hash SrvHash; + typedef Hash SrvHash; SrvHash hServers; - typedef Hash ClientHash; + typedef Hash ClientHash; + typedef Hash SocketHash; ClientHash hClients; + SocketHash hSockets; bool bAutoTick; + Bu::Mutex mClients; + Bu::Mutex mScan; + Bu::Mutex mWorkers; + + EventQueue qIoEvent; + EventQueue qClientEvent; + typedef List IoWorkerList; + typedef List ClientWorkerList; + IoWorkerList lIoWorker; + ClientWorkerList lClientWorker; }; } diff --git a/src/stable/serversocket.cpp b/src/stable/serversocket.cpp new file mode 100644 index 0000000..f0384b7 --- /dev/null +++ b/src/stable/serversocket.cpp @@ -0,0 +1,10 @@ +#include "bu/serversocket.h" + +Bu::ServerSocket::ServerSocket() +{ +} + +Bu::ServerSocket::~ServerSocket() +{ +} + diff --git a/src/stable/serversocket.h b/src/stable/serversocket.h new file mode 100644 index 0000000..cb2591d --- /dev/null +++ b/src/stable/serversocket.h @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2007-2019 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#ifndef BU_SERVER_SOCKET_H +#define BU_SERVER_SOCKET_H + +#include + +namespace Bu +{ + class Socket; + + /** + * Abstract representation of a server socket of some kind. Maybe socket + * isn't strictly accurate. This could be a tcp/ip socket, a named + * filesystem based socket, etc. + * + *@ingroup Serving + */ + class ServerSocket + { + public: + ServerSocket(); + virtual ~ServerSocket(); + + /** + * Accept a new connection, returning a connected Bu::Socket object. + */ + virtual Bu::Socket *accept( int nTimeoutSec=0, int nTimeoutUSec=0 )=0; + + /** + * Provide the contained file descriptor. Return false if there is no + * internal file descriptor as such. If the return value is true, then + * rFdOut will be set to the file descriptor. + */ + virtual bool getFd( int &rFdOut ) const=0; + }; +} + +#endif diff --git a/src/stable/serversockettcp.cpp b/src/stable/serversockettcp.cpp new file mode 100644 index 0000000..c3750b2 --- /dev/null +++ b/src/stable/serversockettcp.cpp @@ -0,0 +1,254 @@ +/* + * Copyright (C) 2007-2019 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#include "bu/config.h" + +#ifndef WIN32 + #include + #include + #include + #include +#endif + +#include +#include +#include +#include +#include +#include +#include +//#include +#include +#include "bu/serversockettcp.h" +#include "bu/sockettcp.h" + +namespace Bu { subExceptionDef( ServerSocketTcpException ) } + +Bu::ServerSocketTcp::ServerSocketTcp( int iPort, int nPoolSize ) : + iPort( iPort ) +{ +#ifdef WIN32 + Bu::Winsock2::getInstance(); +#endif + + /* Create the socket and set it up to accept connections. */ + struct sockaddr_in name; + + /* Give the socket a name. */ + name.sin_family = AF_INET; + name.sin_port = bu_htons( iPort ); + + // I think this specifies who we will accept connections from, + // a good thing to make configurable later on + name.sin_addr.s_addr = bu_htonl( INADDR_ANY ); + + startServer( name, nPoolSize ); +} + +Bu::ServerSocketTcp::ServerSocketTcp(const String &sAddr,int iPort, int nPoolSize) : + iPort( iPort ) +{ +#ifdef WIN32 + Bu::Winsock2::getInstance(); +#endif + + /* Create the socket and set it up to accept connections. */ + struct sockaddr_in name; + + /* Give the socket a name. */ + name.sin_family = AF_INET; + + name.sin_port = bu_htons( iPort ); + +#ifdef WIN32 + name.sin_addr.s_addr = bu_inet_addr( sAddr.getStr() ); +#else + inet_aton( sAddr.getStr(), &name.sin_addr ); +#endif + + startServer( name, nPoolSize ); +} + +Bu::ServerSocketTcp::ServerSocketTcp( socket_t iSocket, bool bInit, int nPoolSize ) : + iSocket( iSocket ), + iPort( 0 ) +{ +#ifdef WIN32 + Bu::Winsock2::getInstance(); +#endif + + if( bInit ) + { + struct sockaddr name; + socklen_t namelen = sizeof(name); + getpeername( iSocket, &name, &namelen ); + + initServer( *((sockaddr_in *)&name), nPoolSize ); + } + else + { + FD_ZERO( &fdActive ); + FD_SET( iSocket, &fdActive ); + } +} + +Bu::ServerSocketTcp::ServerSocketTcp( const ServerSocketTcp &rSrc ) +{ +#ifdef WIN32 + Bu::Winsock2::getInstance(); +#endif + + iSocket = dup( rSrc.iSocket ); + iPort = rSrc.iPort; + FD_ZERO( &fdActive ); + FD_SET( iSocket, &fdActive ); +} + +Bu::ServerSocketTcp::~ServerSocketTcp() +{ +#ifdef WIN32 + if( iSocket != INVALID_SOCKET ) +#else + if( iSocket > -1 ) +#endif + ::close( iSocket ); +} + +void Bu::ServerSocketTcp::startServer( struct sockaddr_in &name, int nPoolSize ) +{ + /* Create the socket. */ + iSocket = bu_socket( PF_INET, SOCK_STREAM, 0 ); + +#ifdef WIN32 + if( iSocket == INVALID_SOCKET ) +#else + if( iSocket < 0 ) +#endif + { + throw Bu::ServerSocketTcpException("Couldn't create a listen socket."); + } + + int opt = 1; + bu_setsockopt( + iSocket, + SOL_SOCKET, + SO_REUSEADDR, + (char *)&opt, + sizeof( opt ) + ); + + initServer( name, nPoolSize ); +} + +void Bu::ServerSocketTcp::initServer( struct sockaddr_in &name, int nPoolSize ) +{ + if( bu_bind( iSocket, (struct sockaddr *) &name, sizeof(name) ) < 0 ) + { + throw Bu::ServerSocketTcpException("Couldn't bind to the listen socket."); + } + + if( bu_listen( iSocket, nPoolSize ) < 0 ) + { + throw Bu::ServerSocketTcpException( + "Couldn't begin listening to the server socket." + ); + } + + FD_ZERO( &fdActive ); + /* Initialize the set of active sockets. */ + FD_SET( iSocket, &fdActive ); +} + +Bu::Socket *Bu::ServerSocketTcp::accept( int nTimeoutSec, int nTimeoutUSec ) +{ + fd_set fdRead = fdActive; + + struct timeval xT; + + xT.tv_sec = nTimeoutSec; + xT.tv_usec = nTimeoutUSec; + + if( TEMP_FAILURE_RETRY( + bu_select( iSocket+1, &fdRead, NULL, NULL, &xT )) < 0 ) + { + throw Bu::ServerSocketTcpException( + "Error scanning for new connections: %s", strerror( errno ) + ); + } + + if( FD_ISSET( iSocket, &fdRead ) ) + { + struct sockaddr_in clientname; + socklen_t size; + int nClient; + + size = sizeof( clientname ); +#ifdef WIN32 + nClient = bu_accept( iSocket, (struct sockaddr *)&clientname, &size); +#else /* not-WIN32 */ +#ifdef __CYGWIN__ + nClient = ::accept( iSocket, (struct sockaddr *)&clientname, + (int *)&size + ); +#else /* not-cygwin */ +#ifdef __APPLE__ + nClient = ::accept( iSocket, (struct sockaddr *)&clientname, (socklen_t*)&size ); +#else /* linux */ + nClient = ::accept( iSocket, (struct sockaddr *)&clientname, &size ); +#endif /* __APPLE__ */ +#endif /* __CYGWIN__ */ +#endif /* WIN32 */ + if( nClient < 0 ) + { + throw Bu::ServerSocketTcpException( + "Error accepting a new connection: %s", strerror( errno ) + ); + } + +#ifndef WIN32 + char tmpa[20]; + inet_ntop( AF_INET, (void *)&clientname.sin_addr, tmpa, 20 ); + //"New connection from host %s, port %hd.", + // tmpa, ntohs (clientname.sin_port) ); +#endif + + { +#ifndef WIN32 + int flags; + flags = fcntl( nClient, F_GETFL, 0 ); + flags |= O_NONBLOCK; + if( fcntl( nClient, F_SETFL, flags ) < 0) + { + throw Bu::ServerSocketTcpException( + "Error setting option on client socket: %s", + strerror( errno ) + ); + } +#else + //------------------------- + // Set the socket I/O mode: In this case FIONBIO + // enables or disables the blocking mode for the + // socket based on the numerical value of iMode. + // If iMode = 0, blocking is enabled; + // If iMode != 0, non-blocking mode is enabled. + u_long iMode = 1; + bu_ioctlsocket(nClient, FIONBIO, &iMode); +#endif + } + + return new SocketTcp( nClient ); + } + + return NULL; +} + +bool Bu::ServerSocketTcp::getFd( int &rFdOut ) const +{ + rFdOut = iSocket; + return true; +} + diff --git a/src/stable/serversockettcp.h b/src/stable/serversockettcp.h new file mode 100644 index 0000000..8e43c76 --- /dev/null +++ b/src/stable/serversockettcp.h @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2007-2019 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#ifndef BU_SERVER_SOCKET_TCP_H +#define BU_SERVER_SOCKET_TCP_H + +#include +#include "bu/string.h" +#include "bu/exceptionbase.h" +#include "bu/serversocket.h" + +#ifdef WIN32 + #include +#else + #include +#endif + +namespace Bu +{ + subExceptionDecl( ServerSocketTcpException ); + + /** + * A single tcp/ip server socket. When created the server socket will bind + * to the specified interface and port, and immediately begin listening for + * connections. When connections come in they are pooled by the networking + * drivers in the kernel until they are accepted, this means that failure + * to keep space in the connection pool will result in connection refusals. + * + * Although the accept function returns an integral file descriptor, it is + * designed to be used with the Socket class. + * + *@ingroup Serving + */ + class ServerSocketTcp : public ServerSocket + { + public: +#ifdef WIN32 + typedef unsigned int socket_t; +#else + typedef int socket_t; +#endif + ServerSocketTcp( int iPort, int nPoolSize=40 ); + ServerSocketTcp( const String &sAddr, int iPort, int nPoolSize=40 ); + ServerSocketTcp( socket_t nSocket, bool bInit, int nPoolSize=40 ); + ServerSocketTcp( const ServerSocketTcp &rSrc ); + virtual ~ServerSocketTcp(); + + virtual Bu::Socket *accept( int nTimeoutSec=0, int nTimeoutUSec=0 ); + virtual bool getFd( int &rFdOut ) const; + + private: + void startServer( struct sockaddr_in &name, int nPoolSize ); + void initServer( struct sockaddr_in &name, int nPoolSize ); + + fd_set fdActive; + socket_t iSocket; + int iPort; + }; +} + +#endif diff --git a/src/stable/socket.cpp b/src/stable/socket.cpp new file mode 100644 index 0000000..95c8773 --- /dev/null +++ b/src/stable/socket.cpp @@ -0,0 +1,10 @@ +#include "bu/socket.h" + +Bu::Socket::Socket() +{ +} + +Bu::Socket::~Socket() +{ +} + diff --git a/src/stable/socket.h b/src/stable/socket.h new file mode 100644 index 0000000..360b3aa --- /dev/null +++ b/src/stable/socket.h @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2007-2019 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#ifndef BU_SOCKET_H +#define BU_SOCKET_H + +#include + +#include "bu/config.h" +#include "bu/stream.h" + +namespace Bu +{ + /** + * + *@ingroup Serving + *@ingroup Streams + */ + class Socket : public Stream + { + public: + Socket(); + virtual ~Socket(); + + virtual bool getFd( int &rFdOut ) const=0; + }; +} + +#endif diff --git a/src/stable/sockettcp.cpp b/src/stable/sockettcp.cpp new file mode 100644 index 0000000..d61f92f --- /dev/null +++ b/src/stable/sockettcp.cpp @@ -0,0 +1,488 @@ +/* + * Copyright (C) 2007-2019 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "bu/sockettcp.h" + +#include "bu/config.h" + +#ifndef WIN32 + #include + #include + #include + #include +#else + #include +#endif + +#define RBS (1024*2) + +namespace Bu { subExceptionDef( SocketTcpException ) } + +Bu::SocketTcp::SocketTcp( handle nSocketTcp ) : + nSocketTcp( nSocketTcp ), + bActive( true ), + bBlocking( true ) +{ +#ifdef WIN32 + Bu::Winsock2::getInstance(); +#endif + setAddress(); +} + +Bu::SocketTcp::SocketTcp( const Bu::String &sAddr, int nPort, int nTimeout, + bool bBlocking ) : + nSocketTcp( 0 ), + bActive( false ), + bBlocking( true ) +{ +#ifdef WIN32 + Bu::Winsock2::getInstance(); +#endif + + /* Create the socket. */ + nSocketTcp = bu_socket( PF_INET, SOCK_STREAM, 0 ); + +#ifdef WIN32 + if( nSocketTcp == INVALID_SOCKET ) +#else + if( nSocketTcp < 0 ) +#endif + { + throw ExceptionBase("Couldn't create socket.\n"); + } + + setBlocking( false ); + + /* Connect to the server. */ + //printf("Resolving hostname (%s)...\n", sAddr ); + { + struct addrinfo *pAddr = NULL; + struct addrinfo aiHints; + memset( &aiHints, 0, sizeof(addrinfo) ); + aiHints.ai_flags = AI_CANONNAME; + aiHints.ai_family = AF_INET; + aiHints.ai_socktype = SOCK_STREAM; + char ibuf[10]; + sprintf( ibuf, "%d", nPort ); + + int ret; + if( (ret = bu_getaddrinfo( + sAddr.getStr(), ibuf, &aiHints, &pAddr )) != 0 ) + { + close(); + throw Bu::SocketTcpException("Couldn't resolve hostname %s (%s).\n", + sAddr.getStr(), bu_gai_strerror(ret)); + } + + bu_connect( + nSocketTcp, + pAddr->ai_addr, + pAddr->ai_addrlen + ); + + sAddress = pAddr->ai_canonname; + + bu_freeaddrinfo( pAddr ); + } + + bActive = true; + + if( nTimeout > 0 ) + { + fd_set rfds, wfds, efds; + int retval; + + FD_ZERO(&rfds); + FD_SET(nSocketTcp, &rfds); + FD_ZERO(&wfds); + FD_SET(nSocketTcp, &wfds); + FD_ZERO(&efds); + FD_SET(nSocketTcp, &efds); + + struct timeval tv; + tv.tv_sec = nTimeout; + tv.tv_usec = 0; + + retval = bu_select( nSocketTcp+1, &rfds, &wfds, &efds, &tv ); + + if( retval == 0 ) + { + close(); + throw ExceptionBase("Connection timeout.\n"); + } + read( NULL, 0 ); // See if we can get any errors out of the way early. + } + + if( bBlocking ) + setBlocking( bBlocking ); +} + +Bu::SocketTcp::~SocketTcp() +{ + close(); +} + +void Bu::SocketTcp::close() +{ + if( bActive ) + { +#ifndef WIN32 + fsync( nSocketTcp ); +#endif +#ifdef WIN32 + #ifndef SHUT_RDWR + #define SHUT_RDWR (SD_BOTH) + #endif +#endif + bu_shutdown( nSocketTcp, SHUT_RDWR ); + ::close( nSocketTcp ); + } + bActive = false; +} + +Bu::size Bu::SocketTcp::read( void *pBuf, Bu::size nBytes ) +{ + fd_set rfds; + FD_ZERO(&rfds); + FD_SET(nSocketTcp, &rfds); + struct timeval tv = {0, 0}; + if( bu_select( nSocketTcp+1, &rfds, NULL, NULL, &tv ) < 0 ) + { + int iErr = errno; + close(); + throw SocketTcpException( SocketTcpException::cRead, strerror(iErr) ); + } + if( FD_ISSET( nSocketTcp, &rfds ) || bBlocking ) + { + int nRead = TEMP_FAILURE_RETRY( + bu_recv( nSocketTcp, (char *) pBuf, nBytes, 0 ) ); + if( nRead == 0 && nBytes > 0 ) + { + close(); + throw SocketTcpException( SocketTcpException::cClosed, "SocketTcp closed."); + } + if( nRead < 0 ) + { +#ifdef WIN32 + int iWSAError = bu_WSAGetLastError(); + if( iWSAError == WSAEWOULDBLOCK ) + return 0; +#else + if( errno == ENETRESET || errno == ECONNRESET ) + { + close(); + throw SocketTcpException( SocketTcpException::cClosed, + strerror(errno) ); + } + if( errno == EAGAIN ) + return 0; + int iErr = errno; + close(); + throw SocketTcpException( SocketTcpException::cRead, strerror(iErr) ); +#endif + } + return nRead; + } + return 0; +} + +Bu::size Bu::SocketTcp::read( void *pBuf, Bu::size nBytes, + uint32_t nSec, uint32_t nUSec ) +{ + struct timeval tv; + Bu::size nRead = 0; + + fd_set rfds; + FD_ZERO(&rfds); + FD_SET(nSocketTcp, &rfds); + +#ifdef WIN32 + DWORD dwStart = GetTickCount(); + uint64_t uOver = dwStart + ((nUSec / 1000) * (nSec * 1000)); + DWORD dwEnd = uOver>4294967295U?uOver-4294967295U:uOver; +#else + struct timeval nt, ct; + gettimeofday( &nt, NULL ); + nt.tv_sec += nSec; + nt.tv_usec += nUSec; +#endif + + for(;;) + { + tv.tv_sec = nSec; + tv.tv_usec = nUSec; + bu_select( nSocketTcp+1, &rfds, NULL, NULL, &tv ); + nRead += read( ((char *)pBuf)+nRead, nBytes-nRead ); + if( nRead >= nBytes ) + break; +#ifdef WIN32 + DWORD dwNow = GetTickCount(); + if( dwNow > dwEnd ) + break; +#else + gettimeofday( &ct, NULL ); + if( (ct.tv_sec > nt.tv_sec) || + (ct.tv_sec == nt.tv_sec && + ct.tv_usec >= nt.tv_usec) ) + break; +#endif + } + return nRead; +} + +Bu::size Bu::SocketTcp::write( const void *pBuf, Bu::size nBytes ) +{ +//#ifdef WIN32 + int nWrote = TEMP_FAILURE_RETRY( + bu_send( nSocketTcp, (const char *) pBuf, nBytes, 0 ) ); +//#else +// int nWrote = TEMP_FAILURE_RETRY( ::write( nSocketTcp, pBuf, nBytes ) ); +//#endif + if( nWrote < 0 ) + { +#ifdef WIN32 + int iWSAError = bu_WSAGetLastError(); + if( iWSAError == WSAEWOULDBLOCK ) + return 0; +#else + if( errno == EAGAIN ) return 0; +#endif + throw SocketTcpException( SocketTcpException::cWrite, strerror(errno) ); + } + return nWrote; +} + +Bu::size Bu::SocketTcp::write( const void *pBuf, Bu::size nBytes, uint32_t nSec, uint32_t nUSec ) +{ + struct timeval tv; + Bu::size nWrote = 0; + + fd_set wfds; + FD_ZERO(&wfds); + FD_SET(nSocketTcp, &wfds); + +#ifdef WIN32 + DWORD dwStart = GetTickCount(); + uint64_t uOver = dwStart + ((nUSec / 1000) * (nSec * 1000)); + DWORD dwEnd = uOver>4294967295U?uOver-4294967295U:uOver; +#else + struct timeval nt, ct; + gettimeofday( &nt, NULL ); + nt.tv_sec += nSec; + nt.tv_usec += nUSec; +#endif + + for(;;) + { + tv.tv_sec = nSec; + tv.tv_usec = nUSec; + bu_select( nSocketTcp+1, NULL, &wfds, NULL, &tv ); + nWrote += write( ((char *)pBuf)+nWrote, nBytes-nWrote ); + if( nWrote >= nBytes ) + break; +#ifdef WIN32 + DWORD dwNow = GetTickCount(); + if( dwNow > dwEnd ) + break; +#else + gettimeofday( &ct, NULL ); + if( (ct.tv_sec > nt.tv_sec) || + (ct.tv_sec == nt.tv_sec && + ct.tv_usec >= nt.tv_usec) ) + break; +#endif + } + return nWrote; +} + +Bu::size Bu::SocketTcp::tell() +{ + throw UnsupportedException(); +} + +void Bu::SocketTcp::seek( Bu::size ) +{ + throw UnsupportedException(); +} + +void Bu::SocketTcp::setPos( Bu::size ) +{ + throw UnsupportedException(); +} + +void Bu::SocketTcp::setPosEnd( Bu::size ) +{ + throw UnsupportedException(); +} + +bool Bu::SocketTcp::isEos() +{ + return !bActive; +} + +bool Bu::SocketTcp::canRead() +{ + fd_set rfds; + FD_ZERO(&rfds); + FD_SET(nSocketTcp, &rfds); + struct timeval tv = { 0, 0 }; + int retval = bu_select( nSocketTcp+1, &rfds, NULL, NULL, &tv ); + if( retval == -1 ) + throw SocketTcpException( + SocketTcpException::cBadRead, + "Bad Read error" + ); + + if( !FD_ISSET( nSocketTcp, &rfds ) ) + return false; + return true; +} + +bool Bu::SocketTcp::canWrite() +{ + fd_set wfds; + FD_ZERO(&wfds); + FD_SET(nSocketTcp, &wfds); + struct timeval tv = { 0, 0 }; + int retval = bu_select( nSocketTcp+1, NULL, &wfds, NULL, &tv ); + if( retval == -1 ) + throw SocketTcpException( + SocketTcpException::cBadRead, + "Bad Read error" + ); + if( !FD_ISSET( nSocketTcp, &wfds ) ) + return false; + return true; +} + +bool Bu::SocketTcp::isReadable() +{ + return true; +} + +bool Bu::SocketTcp::isWritable() +{ + return true; +} + +bool Bu::SocketTcp::isSeekable() +{ + return false; +} + +bool Bu::SocketTcp::isBlocking() +{ +#ifndef WIN32 + return ((fcntl( nSocketTcp, F_GETFL, 0 ) & O_NONBLOCK) != O_NONBLOCK); +#else + return false; +#endif +} + +void Bu::SocketTcp::setBlocking( bool bBlocking ) +{ + this->bBlocking = bBlocking; +#ifndef WIN32 + if( bBlocking ) + { + fcntl( nSocketTcp, F_SETFL, fcntl( nSocketTcp, F_GETFL, 0 ) & (~O_NONBLOCK) ); + } + else + { + fcntl( nSocketTcp, F_SETFL, fcntl( nSocketTcp, F_GETFL, 0 ) | O_NONBLOCK ); + } +#else + u_long iMode; + if( bBlocking ) + iMode = 0; + else + iMode = 1; + //------------------------- + // Set the socket I/O mode: In this case FIONBIO + // enables or disables the blocking mode for the + // socket based on the numerical value of iMode. + // If iMode = 0, blocking is enabled; + // If iMode != 0, non-blocking mode is enabled. + bu_ioctlsocket(nSocketTcp, FIONBIO, &iMode); +#endif +} + +void Bu::SocketTcp::setSize( Bu::size ) +{ +} + +void Bu::SocketTcp::flush() +{ +} + +bool Bu::SocketTcp::isOpen() +{ + return bActive; +} + +void Bu::SocketTcp::setAddress() +{ + struct sockaddr_in addr; + socklen_t len = sizeof(addr); + addr.sin_family = AF_INET; + bu_getpeername( nSocketTcp, (sockaddr *)(&addr), &len ); + sAddress = bu_inet_ntoa( addr.sin_addr ); +} + +Bu::String Bu::SocketTcp::getAddress() const +{ + return sAddress; +} + +Bu::SocketTcp::operator Bu::SocketTcp::handle() const +{ + return nSocketTcp; +} + +Bu::SocketTcp::handle Bu::SocketTcp::getHandle() const +{ + return nSocketTcp; +} + +Bu::SocketTcp::handle Bu::SocketTcp::takeHandle() +{ + handle nRet = nSocketTcp; + bActive = false; + nSocketTcp = 0; + return nRet; +} + +Bu::size Bu::SocketTcp::getSize() const +{ + throw UnsupportedException(); +} + +Bu::size Bu::SocketTcp::getBlockSize() const +{ + return 1500; //TODO: Fix this, it's stupid. +} + +Bu::String Bu::SocketTcp::getLocation() const +{ + return getAddress(); +} + +bool Bu::SocketTcp::getFd( int &rFdOut ) const +{ + rFdOut = nSocketTcp; + return true; +} + diff --git a/src/stable/sockettcp.h b/src/stable/sockettcp.h new file mode 100644 index 0000000..3fc14ef --- /dev/null +++ b/src/stable/sockettcp.h @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2007-2019 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#ifndef BU_SOCKET_TCP_H +#define BU_SOCKET_TCP_H + +#include + +#include "bu/config.h" +#include "bu/socket.h" +#include "bu/string.h" +#include "bu/exceptionbase.h" + +namespace Bu +{ + subExceptionDeclBegin( SocketTcpException ); + enum { + cRead, + cWrite, + cBadRead, + cClosed, + cTimeout + }; + subExceptionDeclEnd(); + + /** + * Network socket stream class. This class provides a mechanism for + * communicating over a network using TCP/IP. It will provide other low + * level protocol and addressing support later on, but for now it's just + * standard STREAM TCP/IP sockets. + * + * Unlike system sockets, these sockets are opened by default in + * non-blocking mode, you can specify your own timeout for opening a socket, + * and a number of non-fatal error messages have been automatically handled + * and treated as standard no-data-available-yet situations on read. + * + * Please note that there is a condition that will occur eventually (at + * least on *nix systems) that will trigger a SIGPIPE condition. This + * will terminate your program immediately unless handled properly. Most + * people doing any connections with SocketTcp will want to put this in + * their program somewhere before they use it: + *@code + #include + ... + ... + ... + sigset( SIGPIPE, SIG_IGN ); // do this before you use a Bu::SocketTcp + @endcode + * When this is done, Bu::SocketTcp will simply throw a broken pipe + * exception just like every other error condition, allowing your program + * to handle it sanely. + * + *@ingroup Serving + *@ingroup Streams + */ + class SocketTcp : public Socket + { + public: +#ifdef WIN32 + typedef unsigned int handle; +#else + typedef int handle; +#endif + + SocketTcp( handle nSocketTcp ); + SocketTcp( const String &sAddr, int nPort, int nTimeout=30, + bool bBlocking=true ); + virtual ~SocketTcp(); + + virtual void close(); + virtual size read( void *pBuf, size nBytes ); + virtual size read( void *pBuf, size nBytes, + uint32_t nSec, uint32_t nUSec=0 ); + virtual size write( const void *pBuf, size nBytes ); + virtual size write( const void *pBuf, size nBytes, + uint32_t nSec, uint32_t nUSec=0 ); + using Stream::write; + + virtual size tell(); + virtual void seek( size offset ); + virtual void setPos( size pos ); + virtual void setPosEnd( size pos ); + virtual bool isEos(); + virtual bool isOpen(); + + virtual void flush(); + + virtual bool canRead(); + virtual bool canWrite(); + + virtual bool isReadable(); + virtual bool isWritable(); + virtual bool isSeekable(); + + virtual bool isBlocking(); + virtual void setBlocking( bool bBlocking=true ); + + virtual void setSize( size iSize ); + + Bu::String getAddress() const; + operator handle() const; + + handle getHandle() const; + handle takeHandle(); + + virtual size getSize() const; + virtual size getBlockSize() const; + virtual Bu::String getLocation() const; + + virtual bool getFd( int &rFdOut ) const; + + private: + void setAddress(); + + handle nSocketTcp; + + bool bActive; + bool bBlocking; + String sReadBuf; + String sAddress; + }; +} + +#endif diff --git a/src/stable/synchroqueue.h b/src/stable/synchroqueue.h index 1c39c2c..b10ec33 100644 --- a/src/stable/synchroqueue.h +++ b/src/stable/synchroqueue.h @@ -44,7 +44,8 @@ namespace Bu SynchroQueue() : pStart( NULL ), pEnd( NULL ), - nSize( 0 ) + nSize( 0 ), + bRunning( true ) { } @@ -76,6 +77,14 @@ namespace Bu */ void enqueue( T pData ) { + mRunning.lock(); + if( !bRunning ) + { + mRunning.unlock(); + throw Bu::ExceptionBase("SynchoQueue is stopped."); + } + mRunning.unlock(); + cBlock.lock(); if( pStart == NULL ) @@ -119,6 +128,14 @@ namespace Bu */ T dequeue( bool bBlock=false ) { + mRunning.lock(); + if( !bRunning ) + { + mRunning.unlock(); + return T(); + } + mRunning.unlock(); + cBlock.lock(); if( pStart == NULL ) { @@ -129,8 +146,18 @@ namespace Bu if( pStart == NULL ) { cBlock.unlock(); - return NULL; + return T(); } + + mRunning.lock(); + if( !bRunning ) + { + mRunning.unlock(); + cBlock.unlock(); + return T(); + } + mRunning.unlock(); + T pTmp = pStart->pData; Item *pDel = pStart; pStart = pStart->pNext; @@ -143,7 +170,7 @@ namespace Bu } cBlock.unlock(); - return NULL; + return T(); } else { @@ -171,6 +198,14 @@ namespace Bu */ T dequeue( int nSec, int nUSec ) { + mRunning.lock(); + if( !bRunning ) + { + mRunning.unlock(); + return T(); + } + mRunning.unlock(); + cBlock.lock(); if( pStart == NULL ) { @@ -179,8 +214,17 @@ namespace Bu if( pStart == NULL ) { cBlock.unlock(); - return NULL; + return T(); + } + + mRunning.lock(); + if( !bRunning ) + { + mRunning.unlock(); + cBlock.unlock(); + return T(); } + mRunning.unlock(); T pTmp = pStart->pData; Item *pDel = pStart; @@ -203,6 +247,35 @@ namespace Bu return pTmp; } } + + T drain() + { + mRunning.lock(); + if( bRunning ) + { + mRunning.unlock(); + return NULL; + } + mRunning.unlock(); + + cBlock.lock(); + if( pStart == NULL ) + { + cBlock.unlock(); + return T(); + } + else + { + T pTmp = pStart->pData; + Item *pDel = pStart; + pStart = pStart->pNext; + delete pDel; + nSize--; + + cBlock.unlock(); + return pTmp; + } + } /** * Checks to see if the queue has data in it or not. Note that there @@ -235,12 +308,31 @@ namespace Bu cBlock.unlock(); } + void stop() + { + mRunning.lock(); + bRunning = false; + mRunning.unlock(); + unblockAll(); + } + + bool isRunning() const + { + bool bRet; + mRunning.lock(); + bRet = bRunning; + mRunning.unlock(); + return bRet; + } + private: Item *pStart; /**< The start of the queue, the next element to dequeue. */ Item *pEnd; /**< The end of the queue, the last element to dequeue. */ long nSize; /**< The number of items in the queue. */ Condition cBlock; /**< The condition for blocking dequeues. */ + mutable Mutex mRunning; + bool bRunning; }; } diff --git a/src/stable/tcpserversocket.cpp b/src/stable/tcpserversocket.cpp deleted file mode 100644 index b1e3461..0000000 --- a/src/stable/tcpserversocket.cpp +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Copyright (C) 2007-2019 Xagasoft, All rights reserved. - * - * This file is part of the libbu++ library and is released under the - * terms of the license contained in the file LICENSE. - */ - -#include "bu/config.h" - -#ifndef WIN32 - #include - #include - #include - #include -#endif - -#include -#include -#include -#include -#include -#include -#include -//#include -#include -#include "bu/tcpserversocket.h" - -namespace Bu { subExceptionDef( TcpServerSocketException ) } - -Bu::TcpServerSocket::TcpServerSocket( int nPort, int nPoolSize ) : - nPort( nPort ) -{ -#ifdef WIN32 - Bu::Winsock2::getInstance(); -#endif - - /* Create the socket and set it up to accept connections. */ - struct sockaddr_in name; - - /* Give the socket a name. */ - name.sin_family = AF_INET; - name.sin_port = bu_htons( nPort ); - - // I think this specifies who we will accept connections from, - // a good thing to make configurable later on - name.sin_addr.s_addr = bu_htonl( INADDR_ANY ); - - startServer( name, nPoolSize ); -} - -Bu::TcpServerSocket::TcpServerSocket(const String &sAddr,int nPort, int nPoolSize) : - nPort( nPort ) -{ -#ifdef WIN32 - Bu::Winsock2::getInstance(); -#endif - - /* Create the socket and set it up to accept connections. */ - struct sockaddr_in name; - - /* Give the socket a name. */ - name.sin_family = AF_INET; - - name.sin_port = bu_htons( nPort ); - -#ifdef WIN32 - name.sin_addr.s_addr = bu_inet_addr( sAddr.getStr() ); -#else - inet_aton( sAddr.getStr(), &name.sin_addr ); -#endif - - startServer( name, nPoolSize ); -} - -Bu::TcpServerSocket::TcpServerSocket( socket_t nServer, bool bInit, int nPoolSize ) : - nServer( nServer ), - nPort( 0 ) -{ -#ifdef WIN32 - Bu::Winsock2::getInstance(); -#endif - - if( bInit ) - { - struct sockaddr name; - socklen_t namelen = sizeof(name); - getpeername( nServer, &name, &namelen ); - - initServer( *((sockaddr_in *)&name), nPoolSize ); - } - else - { - FD_ZERO( &fdActive ); - FD_SET( nServer, &fdActive ); - } -} - -Bu::TcpServerSocket::TcpServerSocket( const TcpServerSocket &rSrc ) -{ -#ifdef WIN32 - Bu::Winsock2::getInstance(); -#endif - - nServer = dup( rSrc.nServer ); - nPort = rSrc.nPort; - FD_ZERO( &fdActive ); - FD_SET( nServer, &fdActive ); -} - -Bu::TcpServerSocket::~TcpServerSocket() -{ -#ifdef WIN32 - if( nServer != INVALID_SOCKET ) -#else - if( nServer > -1 ) -#endif - ::close( nServer ); -} - -void Bu::TcpServerSocket::startServer( struct sockaddr_in &name, int nPoolSize ) -{ - /* Create the socket. */ - nServer = bu_socket( PF_INET, SOCK_STREAM, 0 ); - -#ifdef WIN32 - if( nServer == INVALID_SOCKET ) -#else - if( nServer < 0 ) -#endif - { - throw Bu::TcpServerSocketException("Couldn't create a listen socket."); - } - - int opt = 1; - bu_setsockopt( - nServer, - SOL_SOCKET, - SO_REUSEADDR, - (char *)&opt, - sizeof( opt ) - ); - - initServer( name, nPoolSize ); -} - -void Bu::TcpServerSocket::initServer( struct sockaddr_in &name, int nPoolSize ) -{ - if( bu_bind( nServer, (struct sockaddr *) &name, sizeof(name) ) < 0 ) - { - throw Bu::TcpServerSocketException("Couldn't bind to the listen socket."); - } - - if( bu_listen( nServer, nPoolSize ) < 0 ) - { - throw Bu::TcpServerSocketException( - "Couldn't begin listening to the server socket." - ); - } - - FD_ZERO( &fdActive ); - /* Initialize the set of active sockets. */ - FD_SET( nServer, &fdActive ); -} - -int Bu::TcpServerSocket::getSocket() -{ - return nServer; -} - -int Bu::TcpServerSocket::accept( int nTimeoutSec, int nTimeoutUSec ) -{ - fd_set fdRead = fdActive; - - struct timeval xT; - - xT.tv_sec = nTimeoutSec; - xT.tv_usec = nTimeoutUSec; - - if( TEMP_FAILURE_RETRY( - bu_select( nServer+1, &fdRead, NULL, NULL, &xT )) < 0 ) - { - throw Bu::TcpServerSocketException( - "Error scanning for new connections: %s", strerror( errno ) - ); - } - - if( FD_ISSET( nServer, &fdRead ) ) - { - struct sockaddr_in clientname; - socklen_t size; - int nClient; - - size = sizeof( clientname ); -#ifdef WIN32 - nClient = bu_accept( nServer, (struct sockaddr *)&clientname, &size); -#else /* not-WIN32 */ -#ifdef __CYGWIN__ - nClient = ::accept( nServer, (struct sockaddr *)&clientname, - (int *)&size - ); -#else /* not-cygwin */ -#ifdef __APPLE__ - nClient = ::accept( nServer, (struct sockaddr *)&clientname, (socklen_t*)&size ); -#else /* linux */ - nClient = ::accept( nServer, (struct sockaddr *)&clientname, &size ); -#endif /* __APPLE__ */ -#endif /* __CYGWIN__ */ -#endif /* WIN32 */ - if( nClient < 0 ) - { - throw Bu::TcpServerSocketException( - "Error accepting a new connection: %s", strerror( errno ) - ); - } - -#ifndef WIN32 - char tmpa[20]; - inet_ntop( AF_INET, (void *)&clientname.sin_addr, tmpa, 20 ); - //"New connection from host %s, port %hd.", - // tmpa, ntohs (clientname.sin_port) ); -#endif - - { -#ifndef WIN32 - int flags; - flags = fcntl( nClient, F_GETFL, 0 ); - flags |= O_NONBLOCK; - if( fcntl( nClient, F_SETFL, flags ) < 0) - { - throw Bu::TcpServerSocketException( - "Error setting option on client socket: %s", - strerror( errno ) - ); - } -#else - //------------------------- - // Set the socket I/O mode: In this case FIONBIO - // enables or disables the blocking mode for the - // socket based on the numerical value of iMode. - // If iMode = 0, blocking is enabled; - // If iMode != 0, non-blocking mode is enabled. - u_long iMode = 1; - bu_ioctlsocket(nClient, FIONBIO, &iMode); -#endif - } - - return nClient; - } - - return -1; -} - -int Bu::TcpServerSocket::getPort() -{ - return nPort; -} - diff --git a/src/stable/tcpserversocket.h b/src/stable/tcpserversocket.h deleted file mode 100644 index d15d7bd..0000000 --- a/src/stable/tcpserversocket.h +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (C) 2007-2019 Xagasoft, All rights reserved. - * - * This file is part of the libbu++ library and is released under the - * terms of the license contained in the file LICENSE. - */ - -#ifndef BU_TCP_SERVER_SOCKET_H -#define BU_TCP_SERVER_SOCKET_H - -#include -#include "bu/string.h" -#include "bu/exceptionbase.h" - -#ifdef WIN32 - #include -#else - #include -#endif - -namespace Bu -{ - subExceptionDecl( TcpServerSocketException ); - - /** - * A single tcp/ip server socket. When created the server socket will bind - * to the specified interface and port, and immediately begin listening for - * connections. When connections come in they are pooled by the networking - * drivers in the kernel until they are accepted, this means that failure - * to keep space in the connection pool will result in connection refusals. - * - * Although the accept function returns an integral file descriptor, it is - * designed to be used with the Socket class. - * - *@ingroup Serving - */ - class TcpServerSocket - { - public: -#ifdef WIN32 - typedef unsigned int socket_t; -#else - typedef int socket_t; -#endif - TcpServerSocket( int nPort, int nPoolSize=40 ); - TcpServerSocket( const String &sAddr, int nPort, int nPoolSize=40 ); - TcpServerSocket( socket_t nSocket, bool bInit, int nPoolSize=40 ); - TcpServerSocket( const TcpServerSocket &rSrc ); - virtual ~TcpServerSocket(); - - int accept( int nTimeoutSec=0, int nTimeoutUSec=0 ); - int getSocket(); - int getPort(); - - private: - void startServer( struct sockaddr_in &name, int nPoolSize ); - void initServer( struct sockaddr_in &name, int nPoolSize ); - - fd_set fdActive; - socket_t nServer; - int nPort; - }; -} - -#endif diff --git a/src/stable/tcpsocket.cpp b/src/stable/tcpsocket.cpp deleted file mode 100644 index d036063..0000000 --- a/src/stable/tcpsocket.cpp +++ /dev/null @@ -1,482 +0,0 @@ -/* - * Copyright (C) 2007-2019 Xagasoft, All rights reserved. - * - * This file is part of the libbu++ library and is released under the - * terms of the license contained in the file LICENSE. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "bu/tcpsocket.h" - -#include "bu/config.h" - -#ifndef WIN32 - #include - #include - #include - #include -#else - #include -#endif - -#define RBS (1024*2) - -namespace Bu { subExceptionDef( TcpSocketException ) } - -Bu::TcpSocket::TcpSocket( handle nTcpSocket ) : - nTcpSocket( nTcpSocket ), - bActive( true ), - bBlocking( true ) -{ -#ifdef WIN32 - Bu::Winsock2::getInstance(); -#endif - setAddress(); -} - -Bu::TcpSocket::TcpSocket( const Bu::String &sAddr, int nPort, int nTimeout, - bool bBlocking ) : - nTcpSocket( 0 ), - bActive( false ), - bBlocking( true ) -{ -#ifdef WIN32 - Bu::Winsock2::getInstance(); -#endif - - /* Create the socket. */ - nTcpSocket = bu_socket( PF_INET, SOCK_STREAM, 0 ); - -#ifdef WIN32 - if( nTcpSocket == INVALID_SOCKET ) -#else - if( nTcpSocket < 0 ) -#endif - { - throw ExceptionBase("Couldn't create socket.\n"); - } - - setBlocking( false ); - - /* Connect to the server. */ - //printf("Resolving hostname (%s)...\n", sAddr ); - { - struct addrinfo *pAddr = NULL; - struct addrinfo aiHints; - memset( &aiHints, 0, sizeof(addrinfo) ); - aiHints.ai_flags = AI_CANONNAME; - aiHints.ai_family = AF_INET; - aiHints.ai_socktype = SOCK_STREAM; - char ibuf[10]; - sprintf( ibuf, "%d", nPort ); - - int ret; - if( (ret = bu_getaddrinfo( - sAddr.getStr(), ibuf, &aiHints, &pAddr )) != 0 ) - { - close(); - throw Bu::TcpSocketException("Couldn't resolve hostname %s (%s).\n", - sAddr.getStr(), bu_gai_strerror(ret)); - } - - bu_connect( - nTcpSocket, - pAddr->ai_addr, - pAddr->ai_addrlen - ); - - sAddress = pAddr->ai_canonname; - - bu_freeaddrinfo( pAddr ); - } - - bActive = true; - - if( nTimeout > 0 ) - { - fd_set rfds, wfds, efds; - int retval; - - FD_ZERO(&rfds); - FD_SET(nTcpSocket, &rfds); - FD_ZERO(&wfds); - FD_SET(nTcpSocket, &wfds); - FD_ZERO(&efds); - FD_SET(nTcpSocket, &efds); - - struct timeval tv; - tv.tv_sec = nTimeout; - tv.tv_usec = 0; - - retval = bu_select( nTcpSocket+1, &rfds, &wfds, &efds, &tv ); - - if( retval == 0 ) - { - close(); - throw ExceptionBase("Connection timeout.\n"); - } - read( NULL, 0 ); // See if we can get any errors out of the way early. - } - - if( bBlocking ) - setBlocking( bBlocking ); -} - -Bu::TcpSocket::~TcpSocket() -{ - close(); -} - -void Bu::TcpSocket::close() -{ - if( bActive ) - { -#ifndef WIN32 - fsync( nTcpSocket ); -#endif -#ifdef WIN32 - #ifndef SHUT_RDWR - #define SHUT_RDWR (SD_BOTH) - #endif -#endif - bu_shutdown( nTcpSocket, SHUT_RDWR ); - ::close( nTcpSocket ); - } - bActive = false; -} - -Bu::size Bu::TcpSocket::read( void *pBuf, Bu::size nBytes ) -{ - fd_set rfds; - FD_ZERO(&rfds); - FD_SET(nTcpSocket, &rfds); - struct timeval tv = {0, 0}; - if( bu_select( nTcpSocket+1, &rfds, NULL, NULL, &tv ) < 0 ) - { - int iErr = errno; - close(); - throw TcpSocketException( TcpSocketException::cRead, strerror(iErr) ); - } - if( FD_ISSET( nTcpSocket, &rfds ) || bBlocking ) - { - int nRead = TEMP_FAILURE_RETRY( - bu_recv( nTcpSocket, (char *) pBuf, nBytes, 0 ) ); - if( nRead == 0 && nBytes > 0 ) - { - close(); - throw TcpSocketException( TcpSocketException::cClosed, "TcpSocket closed."); - } - if( nRead < 0 ) - { -#ifdef WIN32 - int iWSAError = bu_WSAGetLastError(); - if( iWSAError == WSAEWOULDBLOCK ) - return 0; -#else - if( errno == ENETRESET || errno == ECONNRESET ) - { - close(); - throw TcpSocketException( TcpSocketException::cClosed, - strerror(errno) ); - } - if( errno == EAGAIN ) - return 0; - int iErr = errno; - close(); - throw TcpSocketException( TcpSocketException::cRead, strerror(iErr) ); -#endif - } - return nRead; - } - return 0; -} - -Bu::size Bu::TcpSocket::read( void *pBuf, Bu::size nBytes, - uint32_t nSec, uint32_t nUSec ) -{ - struct timeval tv; - Bu::size nRead = 0; - - fd_set rfds; - FD_ZERO(&rfds); - FD_SET(nTcpSocket, &rfds); - -#ifdef WIN32 - DWORD dwStart = GetTickCount(); - uint64_t uOver = dwStart + ((nUSec / 1000) * (nSec * 1000)); - DWORD dwEnd = uOver>4294967295U?uOver-4294967295U:uOver; -#else - struct timeval nt, ct; - gettimeofday( &nt, NULL ); - nt.tv_sec += nSec; - nt.tv_usec += nUSec; -#endif - - for(;;) - { - tv.tv_sec = nSec; - tv.tv_usec = nUSec; - bu_select( nTcpSocket+1, &rfds, NULL, NULL, &tv ); - nRead += read( ((char *)pBuf)+nRead, nBytes-nRead ); - if( nRead >= nBytes ) - break; -#ifdef WIN32 - DWORD dwNow = GetTickCount(); - if( dwNow > dwEnd ) - break; -#else - gettimeofday( &ct, NULL ); - if( (ct.tv_sec > nt.tv_sec) || - (ct.tv_sec == nt.tv_sec && - ct.tv_usec >= nt.tv_usec) ) - break; -#endif - } - return nRead; -} - -Bu::size Bu::TcpSocket::write( const void *pBuf, Bu::size nBytes ) -{ -//#ifdef WIN32 - int nWrote = TEMP_FAILURE_RETRY( - bu_send( nTcpSocket, (const char *) pBuf, nBytes, 0 ) ); -//#else -// int nWrote = TEMP_FAILURE_RETRY( ::write( nTcpSocket, pBuf, nBytes ) ); -//#endif - if( nWrote < 0 ) - { -#ifdef WIN32 - int iWSAError = bu_WSAGetLastError(); - if( iWSAError == WSAEWOULDBLOCK ) - return 0; -#else - if( errno == EAGAIN ) return 0; -#endif - throw TcpSocketException( TcpSocketException::cWrite, strerror(errno) ); - } - return nWrote; -} - -Bu::size Bu::TcpSocket::write( const void *pBuf, Bu::size nBytes, uint32_t nSec, uint32_t nUSec ) -{ - struct timeval tv; - Bu::size nWrote = 0; - - fd_set wfds; - FD_ZERO(&wfds); - FD_SET(nTcpSocket, &wfds); - -#ifdef WIN32 - DWORD dwStart = GetTickCount(); - uint64_t uOver = dwStart + ((nUSec / 1000) * (nSec * 1000)); - DWORD dwEnd = uOver>4294967295U?uOver-4294967295U:uOver; -#else - struct timeval nt, ct; - gettimeofday( &nt, NULL ); - nt.tv_sec += nSec; - nt.tv_usec += nUSec; -#endif - - for(;;) - { - tv.tv_sec = nSec; - tv.tv_usec = nUSec; - bu_select( nTcpSocket+1, NULL, &wfds, NULL, &tv ); - nWrote += write( ((char *)pBuf)+nWrote, nBytes-nWrote ); - if( nWrote >= nBytes ) - break; -#ifdef WIN32 - DWORD dwNow = GetTickCount(); - if( dwNow > dwEnd ) - break; -#else - gettimeofday( &ct, NULL ); - if( (ct.tv_sec > nt.tv_sec) || - (ct.tv_sec == nt.tv_sec && - ct.tv_usec >= nt.tv_usec) ) - break; -#endif - } - return nWrote; -} - -Bu::size Bu::TcpSocket::tell() -{ - throw UnsupportedException(); -} - -void Bu::TcpSocket::seek( Bu::size ) -{ - throw UnsupportedException(); -} - -void Bu::TcpSocket::setPos( Bu::size ) -{ - throw UnsupportedException(); -} - -void Bu::TcpSocket::setPosEnd( Bu::size ) -{ - throw UnsupportedException(); -} - -bool Bu::TcpSocket::isEos() -{ - return !bActive; -} - -bool Bu::TcpSocket::canRead() -{ - fd_set rfds; - FD_ZERO(&rfds); - FD_SET(nTcpSocket, &rfds); - struct timeval tv = { 0, 0 }; - int retval = bu_select( nTcpSocket+1, &rfds, NULL, NULL, &tv ); - if( retval == -1 ) - throw TcpSocketException( - TcpSocketException::cBadRead, - "Bad Read error" - ); - - if( !FD_ISSET( nTcpSocket, &rfds ) ) - return false; - return true; -} - -bool Bu::TcpSocket::canWrite() -{ - fd_set wfds; - FD_ZERO(&wfds); - FD_SET(nTcpSocket, &wfds); - struct timeval tv = { 0, 0 }; - int retval = bu_select( nTcpSocket+1, NULL, &wfds, NULL, &tv ); - if( retval == -1 ) - throw TcpSocketException( - TcpSocketException::cBadRead, - "Bad Read error" - ); - if( !FD_ISSET( nTcpSocket, &wfds ) ) - return false; - return true; -} - -bool Bu::TcpSocket::isReadable() -{ - return true; -} - -bool Bu::TcpSocket::isWritable() -{ - return true; -} - -bool Bu::TcpSocket::isSeekable() -{ - return false; -} - -bool Bu::TcpSocket::isBlocking() -{ -#ifndef WIN32 - return ((fcntl( nTcpSocket, F_GETFL, 0 ) & O_NONBLOCK) != O_NONBLOCK); -#else - return false; -#endif -} - -void Bu::TcpSocket::setBlocking( bool bBlocking ) -{ - this->bBlocking = bBlocking; -#ifndef WIN32 - if( bBlocking ) - { - fcntl( nTcpSocket, F_SETFL, fcntl( nTcpSocket, F_GETFL, 0 ) & (~O_NONBLOCK) ); - } - else - { - fcntl( nTcpSocket, F_SETFL, fcntl( nTcpSocket, F_GETFL, 0 ) | O_NONBLOCK ); - } -#else - u_long iMode; - if( bBlocking ) - iMode = 0; - else - iMode = 1; - //------------------------- - // Set the socket I/O mode: In this case FIONBIO - // enables or disables the blocking mode for the - // socket based on the numerical value of iMode. - // If iMode = 0, blocking is enabled; - // If iMode != 0, non-blocking mode is enabled. - bu_ioctlsocket(nTcpSocket, FIONBIO, &iMode); -#endif -} - -void Bu::TcpSocket::setSize( Bu::size ) -{ -} - -void Bu::TcpSocket::flush() -{ -} - -bool Bu::TcpSocket::isOpen() -{ - return bActive; -} - -void Bu::TcpSocket::setAddress() -{ - struct sockaddr_in addr; - socklen_t len = sizeof(addr); - addr.sin_family = AF_INET; - bu_getpeername( nTcpSocket, (sockaddr *)(&addr), &len ); - sAddress = bu_inet_ntoa( addr.sin_addr ); -} - -Bu::String Bu::TcpSocket::getAddress() const -{ - return sAddress; -} - -Bu::TcpSocket::operator Bu::TcpSocket::handle() const -{ - return nTcpSocket; -} - -Bu::TcpSocket::handle Bu::TcpSocket::getHandle() const -{ - return nTcpSocket; -} - -Bu::TcpSocket::handle Bu::TcpSocket::takeHandle() -{ - handle nRet = nTcpSocket; - bActive = false; - nTcpSocket = 0; - return nRet; -} - -Bu::size Bu::TcpSocket::getSize() const -{ - throw UnsupportedException(); -} - -Bu::size Bu::TcpSocket::getBlockSize() const -{ - return 1500; //TODO: Fix this, it's stupid. -} - -Bu::String Bu::TcpSocket::getLocation() const -{ - return getAddress(); -} - diff --git a/src/stable/tcpsocket.h b/src/stable/tcpsocket.h deleted file mode 100644 index 69cc4fd..0000000 --- a/src/stable/tcpsocket.h +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (C) 2007-2019 Xagasoft, All rights reserved. - * - * This file is part of the libbu++ library and is released under the - * terms of the license contained in the file LICENSE. - */ - -#ifndef BU_TCP_SOCKET_H -#define BU_TCP_SOCKET_H - -#include - -#include "bu/config.h" -#include "bu/stream.h" -#include "bu/string.h" -#include "bu/exceptionbase.h" - -namespace Bu -{ - subExceptionDeclBegin( TcpSocketException ); - enum { - cRead, - cWrite, - cBadRead, - cClosed, - cTimeout - }; - subExceptionDeclEnd(); - - /** - * Network socket stream class. This class provides a mechanism for - * communicating over a network using TCP/IP. It will provide other low - * level protocol and addressing support later on, but for now it's just - * standard STREAM TCP/IP sockets. - * - * Unlike system sockets, these sockets are opened by default in - * non-blocking mode, you can specify your own timeout for opening a socket, - * and a number of non-fatal error messages have been automatically handled - * and treated as standard no-data-available-yet situations on read. - * - * Please note that there is a condition that will occur eventually (at - * least on *nix systems) that will trigger a SIGPIPE condition. This - * will terminate your program immediately unless handled properly. Most - * people doing any connections with TcpSocket will want to put this in - * their program somewhere before they use it: - *@code - #include - ... - ... - ... - sigset( SIGPIPE, SIG_IGN ); // do this before you use a Bu::TcpSocket - @endcode - * When this is done, Bu::TcpSocket will simply throw a broken pipe - * exception just like every other error condition, allowing your program - * to handle it sanely. - * - *@ingroup Serving - *@ingroup Streams - */ - class TcpSocket : public Stream - { - public: -#ifdef WIN32 - typedef unsigned int handle; -#else - typedef int handle; -#endif - - TcpSocket( handle nTcpSocket ); - TcpSocket( const String &sAddr, int nPort, int nTimeout=30, - bool bBlocking=true ); - virtual ~TcpSocket(); - - virtual void close(); - virtual size read( void *pBuf, size nBytes ); - virtual size read( void *pBuf, size nBytes, - uint32_t nSec, uint32_t nUSec=0 ); - virtual size write( const void *pBuf, size nBytes ); - virtual size write( const void *pBuf, size nBytes, - uint32_t nSec, uint32_t nUSec=0 ); - using Stream::write; - - virtual size tell(); - virtual void seek( size offset ); - virtual void setPos( size pos ); - virtual void setPosEnd( size pos ); - virtual bool isEos(); - virtual bool isOpen(); - - virtual void flush(); - - virtual bool canRead(); - virtual bool canWrite(); - - virtual bool isReadable(); - virtual bool isWritable(); - virtual bool isSeekable(); - - virtual bool isBlocking(); - virtual void setBlocking( bool bBlocking=true ); - - virtual void setSize( size iSize ); - - Bu::String getAddress() const; - operator handle() const; - - handle getHandle() const; - handle takeHandle(); - - virtual size getSize() const; - virtual size getBlockSize() const; - virtual Bu::String getLocation() const; - - private: - void setAddress(); - - handle nTcpSocket; - - bool bActive; - bool bBlocking; - String sReadBuf; - String sAddress; - }; -} - -#endif -- cgit v1.2.3