#include "bu/clientbuf.h"
#include "bu/mutexlocker.h"
#include "bu/sio.h"

// ClientBuf holds two queue buffers (qbInput / qbOutput), each guarded by its
// own mutex (mInput / mOutput), and exposes them through accessor objects:
// accClientRaw and accClient for the client side, accServer for the server
// side.
Bu::ClientBuf::ClientBuf() :
    accClientRaw( *this ),
    accServer( *this ),
    //accClientFiltered( &accClientRaw ),
    accClient( *this )
{
}

Bu::ClientBuf::~ClientBuf()
{
}

// Returns the client-side accessor (the one with peek buffering).
Bu::ClientBuf::ClientAccess &Bu::ClientBuf::client()
{
    return accClient;
}

// Returns the server-side accessor.
Bu::ClientBuf::ServerAccess &Bu::ClientBuf::server()
{
    return accServer;
}

/////////
// ClientAccessRaw
///
// Unbuffered client-side view: reads and read-side queries go to qbInput
// under mInput, writes and write-side queries go to qbOutput under mOutput.

Bu::ClientBuf::ClientAccessRaw::ClientAccessRaw( Bu::ClientBuf &rBuf ) :
    rBuf( rBuf )
{
}

Bu::ClientBuf::ClientAccessRaw::~ClientAccessRaw()
{
}

void Bu::ClientBuf::ClientAccessRaw::close()
{
    // Roughly meaningless
}

// Reads up to iBytes from the input queue into pBuf; returns whatever the
// queue's read() reports.
Bu::size Bu::ClientBuf::ClientAccessRaw::read( void *pBuf, size iBytes )
{
    Bu::MutexLocker lock( rBuf.mInput );
    return rBuf.qbInput.read( pBuf, iBytes );
}

// Appends iBytes from pBuf to the output queue.
Bu::size Bu::ClientBuf::ClientAccessRaw::write( const void *pBuf, size iBytes )
{
    Bu::MutexLocker lock( rBuf.mOutput );
    return rBuf.qbOutput.write( pBuf, iBytes );
}

Bu::size Bu::ClientBuf::ClientAccessRaw::tell()
{
    Bu::MutexLocker lock( rBuf.mInput );
    return rBuf.qbInput.tell();
}

// Forwards the seek to the input queue.
void Bu::ClientBuf::ClientAccessRaw::seek( Bu::size offset )
{
    Bu::MutexLocker lock( rBuf.mInput );
    rBuf.qbInput.seek( offset );
}

// Absolute positioning is not supported; intentionally a no-op.
void Bu::ClientBuf::ClientAccessRaw::setPos( Bu::size )
{
}

// Absolute positioning is not supported; intentionally a no-op.
void Bu::ClientBuf::ClientAccessRaw::setPosEnd( Bu::size )
{
}

bool Bu::ClientBuf::ClientAccessRaw::isEos()
{
    Bu::MutexLocker lock( rBuf.mInput );
    return rBuf.qbInput.isEos();
}

bool Bu::ClientBuf::ClientAccessRaw::isOpen()
{
    Bu::MutexLocker lock( rBuf.mInput );
    return rBuf.qbInput.isOpen();
}

// Flushes the output queue (the side this accessor writes to).
void Bu::ClientBuf::ClientAccessRaw::flush()
{
    Bu::MutexLocker lock( rBuf.mOutput );
    rBuf.qbOutput.flush();
}

bool Bu::ClientBuf::ClientAccessRaw::canRead()
{
    Bu::MutexLocker lock( rBuf.mInput );
    return rBuf.qbInput.canRead();
}

bool Bu::ClientBuf::ClientAccessRaw::canWrite()
{
    Bu::MutexLocker lock( rBuf.mOutput );
    return rBuf.qbOutput.canWrite();
}

bool Bu::ClientBuf::ClientAccessRaw::isReadable()
{
    Bu::MutexLocker lock( rBuf.mInput );
    return rBuf.qbInput.isReadable();
}

bool Bu::ClientBuf::ClientAccessRaw::isWritable()
{
    Bu::MutexLocker lock( rBuf.mOutput );
    return rBuf.qbOutput.isWritable();
}

bool Bu::ClientBuf::ClientAccessRaw::isSeekable()
{
    Bu::MutexLocker lock( rBuf.mInput );
    return rBuf.qbInput.isSeekable();
}

// This accessor always reports itself as non-blocking.
bool Bu::ClientBuf::ClientAccessRaw::isBlocking()
{
    return false;
}

// Blocking mode cannot be changed; intentionally a no-op.
void Bu::ClientBuf::ClientAccessRaw::setBlocking( bool )
{
}

// Resizing is not supported; intentionally a no-op.
void Bu::ClientBuf::ClientAccessRaw::setSize( Bu::size )
{
}

// Size of the input queue as reported by qbInput.
Bu::size Bu::ClientBuf::ClientAccessRaw::getSize() const
{
    Bu::MutexLocker lock( rBuf.mInput );
    return rBuf.qbInput.getSize();
}

Bu::size Bu::ClientBuf::ClientAccessRaw::getBlockSize() const
{
    Bu::MutexLocker lock( rBuf.mInput );
    return rBuf.qbInput.getBlockSize();
}

Bu::String Bu::ClientBuf::ClientAccessRaw::getLocation() const
{
    return "ClientBuf";
}

/////////
// ClientAccess
///
// Client-side view that layers a private peek buffer (qbPeek, guarded by
// mAccess) on top of the raw accessor.  Until a filtered accessor exists,
// accClientFiltered is simply an alias for accClientRaw.
#define accClientFiltered accClientRaw

Bu::ClientBuf::ClientAccess::ClientAccess( Bu::ClientBuf &rBuf ) :
    rBuf( rBuf )
{
}

Bu::ClientBuf::ClientAccess::~ClientAccess()
{
}

void Bu::ClientBuf::ClientAccess::close()
{
    // Roughly meaningless
}

// Reads up to iBytes into pBufRaw.  Data previously pulled into the peek
// buffer is consumed first; any remaining request is passed on to the
// underlying client accessor.  Returns the total number of bytes delivered.
Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes )
{
    Bu::println("ClientAccess::read( ptr, %1 )").arg( iBytes );
    char *const pStart = static_cast<char *>( pBufRaw );
    char *pBuf = pStart;
    Bu::MutexLocker lock( mAccess );
    // Read from QueueBuf first
    Bu::size ps = qbPeek.read( pBuf, iBytes );
    Bu::println("ClientAccess::read: attempted qbPeek, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) );
    iBytes -= ps;
    pBuf += ps;
    // Request space left? Try the client
    if( iBytes > 0 )
    {
        ps += rBuf.accClientFiltered.read( pBuf, iBytes );
        // ps is now the running total, so the data to show starts at the
        // beginning of the caller's buffer, not at the advanced cursor
        // (using pBuf here would read past the bytes we just wrote).
        Bu::println("ClientAccess::read: attempted completion from socket buffer, got %1\n>%2<").arg( ps ).arg( Bu::String(pStart, ps) );
    }
    return ps;
}

// Copies up to iBytes, starting iOffset bytes into the pending data, into
// pData without consuming it.  If the peek buffer does not yet hold
// iBytes+iOffset bytes, one attempt is made to top it up from the underlying
// client accessor.  Returns whatever qbPeek.peek() reports.
Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes,
        int iOffset )
{
    Bu::println("ClientAccess::peek( ptr, %1, %2 )").arg( iBytes ).arg( iOffset );
    Bu::MutexLocker lock( mAccess );
    // Widen before adding so the sum cannot overflow int.
    const Bu::size iNeeded = static_cast<Bu::size>( iBytes ) + iOffset;
    // Do we have enough data in the peek buffer to handle this?
    if( qbPeek.getSize() < iNeeded )
    {
        Bu::println("ClientAccess::peek: Insufficient buffered data (%1)").arg( qbPeek.getSize() );
        // Nope, make an attempt to fill it in.  The shortfall must account
        // for the offset as well; the guard above makes it strictly positive.
        const Bu::size nDiff = iNeeded - qbPeek.getSize();
        // We have to make our own buffer, since iBytes+iOffset could be bigger
        // than pData.
        char *pTmp = new char[nDiff];
        Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff );
        Bu::println("ClientAccess::peek: Tried to fill buffer, read %1 of needed %2\n>%3<").arg( ps ).arg( nDiff ).arg( Bu::String(pTmp, ps) );
        if( ps > 0 )
        {
            // Add the data read to the peek buffer.
            qbPeek.write( pTmp, ps );
        }
        delete[] pTmp;
        Bu::println("ClientAccess::peek: buffer left with %1").arg( qbPeek.getSize() );
    }

    return qbPeek.peek( pData, iBytes, iOffset );
}

// Writes bypass the peek buffer and go straight to the underlying accessor.
Bu::size Bu::ClientBuf::ClientAccess::write( const void *pBuf, size iBytes )
{
    Bu::MutexLocker lock( mAccess );
    return rBuf.accClientFiltered.write( pBuf, iBytes );
}

// Underlying position plus the number of bytes currently held for peeking.
Bu::size Bu::ClientBuf::ClientAccess::tell()
{
    Bu::MutexLocker lock( mAccess );
    return rBuf.accClientFiltered.tell() + qbPeek.getSize();
}

// Skips offset bytes: first out of the peek buffer, then out of the
// underlying accessor for whatever remains.
void Bu::ClientBuf::ClientAccess::seek( Bu::size offset )
{
    Bu::println("ClientAccess::seek( %1 )").arg( offset );
    Bu::MutexLocker lock( mAccess );
    // For this type of stream seek is basically a destructive skip.  It's like
    // reading the data but with no output buffer.  Let's remove data from the
    // peek buffer first.
    if( qbPeek.getSize() > 0 )
    {
        Bu::size amount = Bu::buMin( qbPeek.getSize(), offset );
        Bu::println("ClientAccess::seek: buffered: %1, amount: %2").arg( qbPeek.getSize() ).arg( amount );
        qbPeek.seek( amount );
        offset -= amount;
    }

    // If there's offset left, then apply it to the underlying stream
    if( offset > 0 )
    {
        Bu::println("ClientAccess::seek: seeking remaining %1 in socket buffer").arg( offset );
        rBuf.accClientFiltered.seek( offset );
    }
}

// Absolute positioning is not supported; intentionally a no-op.
void Bu::ClientBuf::ClientAccess::setPos( Bu::size )
{
}

// Absolute positioning is not supported; intentionally a no-op.
void Bu::ClientBuf::ClientAccess::setPosEnd( Bu::size )
{
}

// NOTE(review): only the underlying accessor is consulted; bytes still held
// in qbPeek are not considered here -- confirm that is intended.
bool Bu::ClientBuf::ClientAccess::isEos()
{
    Bu::MutexLocker lock( mAccess );
    return rBuf.accClientFiltered.isEos();
}

bool Bu::ClientBuf::ClientAccess::isOpen()
{
    Bu::MutexLocker lock( mAccess );
    return rBuf.accClientFiltered.isOpen();
}

void Bu::ClientBuf::ClientAccess::flush()
{
    Bu::MutexLocker lock( mAccess );
    rBuf.accClientFiltered.flush();
}

// NOTE(review): only the underlying accessor is consulted; bytes still held
// in qbPeek are not considered here -- confirm that is intended.
bool Bu::ClientBuf::ClientAccess::canRead()
{
    Bu::MutexLocker lock( mAccess );
    return rBuf.accClientFiltered.canRead();
}

bool Bu::ClientBuf::ClientAccess::canWrite()
{
    Bu::MutexLocker lock( mAccess );
    return rBuf.accClientFiltered.canWrite();
}

bool Bu::ClientBuf::ClientAccess::isReadable()
{
    Bu::MutexLocker lock( mAccess );
    return rBuf.accClientFiltered.isReadable();
}

bool Bu::ClientBuf::ClientAccess::isWritable()
{
    Bu::MutexLocker lock( mAccess );
    return rBuf.accClientFiltered.isWritable();
}

bool Bu::ClientBuf::ClientAccess::isSeekable()
{
    Bu::MutexLocker lock( mAccess );
    return rBuf.accClientFiltered.isSeekable();
}

// This accessor always reports itself as non-blocking.
bool Bu::ClientBuf::ClientAccess::isBlocking()
{
    return false;
}

// Blocking mode cannot be changed; intentionally a no-op.
void Bu::ClientBuf::ClientAccess::setBlocking( bool )
{
}

// Resizing is not supported; intentionally a no-op.
void Bu::ClientBuf::ClientAccess::setSize( Bu::size )
{
}

// Size reported by the underlying accessor plus the bytes held for peeking.
Bu::size Bu::ClientBuf::ClientAccess::getSize() const
{
    Bu::MutexLocker lock( mAccess );
    return rBuf.accClientFiltered.getSize() + qbPeek.getSize();
}

Bu::size Bu::ClientBuf::ClientAccess::getBlockSize() const
{
    Bu::MutexLocker lock( mAccess );
    return rBuf.accClientFiltered.getBlockSize();
}
Bu::String Bu::ClientBuf::ClientAccess::getLocation() const { return "ClientBuf"; } #undef accClientFiltered ///////// // ServerAccess /// Bu::ClientBuf::ServerAccess::ServerAccess( Bu::ClientBuf &rBuf ) : rBuf( rBuf ) { } Bu::ClientBuf::ServerAccess::~ServerAccess() { } void Bu::ClientBuf::ServerAccess::close() { } Bu::size Bu::ClientBuf::ServerAccess::read( void *pBuf, size iBytes ) { Bu::MutexLocker l( rBuf.mOutput ); return rBuf.qbOutput.read( pBuf, iBytes ); } Bu::size Bu::ClientBuf::ServerAccess::peek( void *pData, int iBytes, int iOffset ) { Bu::MutexLocker l( rBuf.mOutput ); return rBuf.qbOutput.peek( pData, iBytes, iOffset ); } Bu::size Bu::ClientBuf::ServerAccess::write( const void *pBuf, size iBytes ) { Bu::MutexLocker l( rBuf.mInput ); return rBuf.qbInput.write( pBuf, iBytes ); } Bu::size Bu::ClientBuf::ServerAccess::tell() { Bu::MutexLocker l( rBuf.mOutput ); return rBuf.qbOutput.tell(); } void Bu::ClientBuf::ServerAccess::seek( Bu::size offset ) { Bu::MutexLocker l( rBuf.mOutput ); rBuf.qbOutput.seek( offset ); } void Bu::ClientBuf::ServerAccess::setPos( Bu::size ) { } void Bu::ClientBuf::ServerAccess::setPosEnd( Bu::size ) { } bool Bu::ClientBuf::ServerAccess::isEos() { Bu::MutexLocker l( rBuf.mOutput ); return rBuf.qbOutput.isEos(); } bool Bu::ClientBuf::ServerAccess::isOpen() { Bu::MutexLocker l( rBuf.mOutput ); return rBuf.qbOutput.isOpen(); } void Bu::ClientBuf::ServerAccess::flush() { Bu::MutexLocker l( rBuf.mInput ); return rBuf.qbInput.flush(); } bool Bu::ClientBuf::ServerAccess::canRead() { Bu::MutexLocker l( rBuf.mOutput ); return rBuf.qbOutput.canRead(); } bool Bu::ClientBuf::ServerAccess::canWrite() { Bu::MutexLocker l( rBuf.mInput ); return rBuf.qbInput.canWrite(); } bool Bu::ClientBuf::ServerAccess::isReadable() { Bu::MutexLocker l( rBuf.mOutput ); return rBuf.qbOutput.isReadable(); } bool Bu::ClientBuf::ServerAccess::isWritable() { Bu::MutexLocker l( rBuf.mInput ); return rBuf.qbInput.isWritable(); } bool 
Bu::ClientBuf::ServerAccess::isSeekable() { Bu::MutexLocker l( rBuf.mOutput ); return rBuf.qbOutput.isSeekable(); } bool Bu::ClientBuf::ServerAccess::isBlocking() { return false; } void Bu::ClientBuf::ServerAccess::setBlocking( bool ) { } void Bu::ClientBuf::ServerAccess::setSize( Bu::size ) { return; } Bu::size Bu::ClientBuf::ServerAccess::getSize() const { Bu::MutexLocker l( rBuf.mOutput ); return rBuf.qbOutput.getSize(); } Bu::size Bu::ClientBuf::ServerAccess::getBlockSize() const { Bu::MutexLocker l( rBuf.mOutput ); return rBuf.qbOutput.getBlockSize(); } Bu::String Bu::ClientBuf::ServerAccess::getLocation() const { return "ClientBuf"; }