/* * Copyright (C) 2007-2019 Xagasoft, All rights reserved. * * This file is part of the libbu++ library and is released under the * terms of the license contained in the file LICENSE. */ #include "bu/client.h" #include "bu/tcpsocket.h" #include #include #include "bu/protocol.h" #include "bu/clientlink.h" #include "bu/clientlinkfactory.h" #include "bu/mutexlocker.h" /** Read buffer size. */ #define RBS (2000) // 1500 is the nominal MTU for ethernet, it's a good guess Bu::Client::Client( Bu::TcpSocket *pSocket, class Bu::ClientLinkFactory *pfLink ) : pTopStream( pSocket ), pSocket( pSocket ), pProto( NULL ), bWantsDisconnect( false ), pfLink( pfLink ) { Bu::ReadWriteMutex::WriteLocker lGlobal( mGlobal ); lFilts.prepend( pSocket ); } Bu::Client::~Client() { Bu::ReadWriteMutex::WriteLocker lGlobal( mGlobal ); for( FilterList::iterator i = lFilts.begin(); i; i++ ) { delete *i; } pTopStream = pSocket = NULL; delete pfLink; } void Bu::Client::processInput() { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); mRead.lock(); char buf[RBS]; Bu::size nRead, nTotal=0; for(;;) { try { nRead = pTopStream->read( buf, RBS ); if( nRead == 0 ) { break; } else { nTotal += nRead; qbRead.write( buf, nRead ); if( !pTopStream->canRead() ) break; } } catch( Bu::TcpSocketException &e ) { pTopStream->close(); bWantsDisconnect = true; break; } } mRead.unlock(); if( nTotal == 0 ) { pTopStream->close(); bWantsDisconnect = true; } if( pProto && nTotal ) { pProto->onNewData( this ); } } void Bu::Client::processOutput() { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); mWrite.lock(); char buf[RBS]; if( qbWrite.getSize() > 0 ) { int nAmnt = RBS; nAmnt = qbWrite.peek( buf, nAmnt ); int nReal = pTopStream->write( buf, nAmnt ); qbWrite.seek( nReal ); pTopStream->flush(); } mWrite.unlock(); } void Bu::Client::setProtocol( Protocol *pProto ) { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); this->pProto = pProto; this->pProto->onNewConnection( this ); } Bu::Protocol *Bu::Client::getProtocol() { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); return pProto; } void Bu::Client::clearProtocol() { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); pProto = NULL; } /* Bu::String &Bu::Client::getInput() { return sReadBuf; } Bu::String &Bu::Client::getOutput() { return sWriteBuf; } */ bool Bu::Client::isOpen() { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); if( !pTopStream ) return false; return pTopStream->isOpen(); } Bu::size Bu::Client::write( const Bu::String &sData ) { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( sData.getStr(), sData.getSize() ); } Bu::size Bu::Client::write( const void *pData, Bu::size nBytes ) { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( pData, nBytes ); } Bu::size Bu::Client::write( int8_t nData ) { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( int16_t nData ) { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( int32_t nData ) { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( int64_t nData ) { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( uint8_t nData ) { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( uint16_t nData ) { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( uint32_t nData ) { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( uint64_t nData ) { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::read( void *pData, Bu::size nBytes ) { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbRead.read( pData, nBytes ); } Bu::size Bu::Client::peek( void *pData, int nBytes, int nOffset ) { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbRead.peek( pData, nBytes, nOffset ); } Bu::size Bu::Client::getInputSize() { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbRead.getSize(); } Bu::size Bu::Client::getOutputSize() { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.getSize(); } const Bu::TcpSocket *Bu::Client::getSocket() const { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); return pSocket; } void Bu::Client::disconnect() { Bu::ReadWriteMutex::WriteLocker lGlobal( mGlobal ); bWantsDisconnect = true; } bool Bu::Client::wantsDisconnect() { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); return bWantsDisconnect; } void Bu::Client::close() { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); pTopStream->close(); } Bu::ClientLink *Bu::Client::getLink() { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); return pfLink->createLink( this ); } void Bu::Client::onMessage( const Bu::String &sMsg ) { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); if( pProto ) pProto->onMessage( this, sMsg ); } void Bu::Client::tick() { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); if( pProto ) pProto->onTick( this ); } Bu::size Bu::Client::tell() { return 0; } void Bu::Client::seek( Bu::size offset ) { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mRead ); return qbRead.seek( offset ); } void Bu::Client::setPos( Bu::size ) { throw Bu::ExceptionBase(); } void Bu::Client::setPosEnd( Bu::size ) { throw Bu::ExceptionBase(); } bool Bu::Client::isEos() { return true; } void Bu::Client::flush() { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); processOutput(); } bool Bu::Client::canRead() { Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mRead ); return qbRead.getSize() > 0; } bool Bu::Client::canWrite() { return true; } bool Bu::Client::isReadable() { return true; } bool Bu::Client::isWritable() { return true; } bool Bu::Client::isSeekable() { return false; } bool Bu::Client::isBlocking() { return false; } void Bu::Client::setBlocking( bool ) { throw Bu::ExceptionBase(); } void Bu::Client::setSize( Bu::size ) { throw Bu::ExceptionBase(); } Bu::size Bu::Client::getSize() const { return 0; } Bu::size Bu::Client::getBlockSize() const { return pSocket->getBlockSize(); } Bu::String Bu::Client::getLocation() const { return pSocket->getLocation(); }