From 47627be8e85b2169ab3d9f34b8819cacb083b5bf Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Tue, 22 Mar 2011 20:12:50 +0000 Subject: Bu::Conduit now works exactly as it was advertised some time ago, it uses Bu::QueueBuf and creates a really slick blocking inter-thread I/O system. --- default.bld | 2 +- src/conduit.cpp | 226 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/conduit.h | 38 +++++++++ src/queuebuf.cpp | 5 -- src/queuebuf.h | 2 - src/tests/conduit.cpp | 56 +++++++++++++ 6 files changed, 321 insertions(+), 8 deletions(-) create mode 100644 src/tests/conduit.cpp diff --git a/default.bld b/default.bld index c5f4310..aa7f4cb 100644 --- a/default.bld +++ b/default.bld @@ -119,7 +119,7 @@ target ["tests/bzip2", "tests/streamstack"] } target ["tests/itoserver", "tests/socketblock", "tests/itoheap", - "tests/itoqueue1", "tests/itoqueue2"] + "tests/itoqueue1", "tests/itoqueue2", "tests/conduit"] { LDFLAGS += "-lpthread"; } diff --git a/src/conduit.cpp b/src/conduit.cpp index bb99526..cfa93d8 100644 --- a/src/conduit.cpp +++ b/src/conduit.cpp @@ -5,3 +5,229 @@ * terms of the license contained in the file LICENSE. */ +#include "bu/conduit.h" + +Bu::Conduit::Conduit( int iBlockSize ) : + qb( iBlockSize ), + bBlocking( true ), + bOpen( true ) +{ +} + +Bu::Conduit::~Conduit() +{ +} + +void Bu::Conduit::close() +{ + im.lock(); +// qb.close(); + bOpen = false; + + cBlock.signal(); + im.unlock(); +} + +#include +Bu::size Bu::Conduit::read( void *pBuf, Bu::size nBytes ) +{ + if( !isOpen() ) + { + return 0; + } + im.lock(); + if( bBlocking ) + { + im.unlock(); + cBlock.lock(); + for(;;) + { + im.lock(); + if( qb.getSize() == 0 && bOpen == false ) + { + im.unlock(); + cBlock.unlock(); + return 0; + } + else if( qb.getSize() > 0 ) + { + im.unlock(); + break; + } + im.unlock(); + + cBlock.wait(); + } + + im.lock(); + Bu::size iRet = qb.read( pBuf, nBytes ); + im.unlock(); + + cBlock.unlock(); + return iRet; + } + else + { + Bu::size iRet = qb.read( pBuf, nBytes ); + im.unlock(); + + return iRet; + } +} + +Bu::size Bu::Conduit::peek( void *pBuf, Bu::size nBytes ) +{ + im.lock(); + Bu::size iRet = qb.peek( pBuf, nBytes ); + im.unlock(); + + return iRet; +} + +Bu::size Bu::Conduit::peek( void *pBuf, Bu::size nBytes, Bu::size nSkip ) +{ + im.lock(); + Bu::size iRet = qb.peek( pBuf, nBytes, nSkip ); + im.unlock(); + + return iRet; +} + +Bu::size Bu::Conduit::write( const void *pBuf, Bu::size nBytes ) +{ + im.lock(); + if( bOpen == false ) + { + im.unlock(); + return 0; + } + Bu::size sRet = qb.write( pBuf, nBytes ); + cBlock.signal(); + im.unlock(); + + return sRet; +} + +Bu::size Bu::Conduit::tell() +{ + im.lock(); + Bu::size sRet = qb.tell(); + im.unlock(); + return sRet; +} + +void Bu::Conduit::seek( Bu::size offset ) +{ +} + +void Bu::Conduit::setPos( Bu::size pos ) +{ +} + +void Bu::Conduit::setPosEnd( Bu::size pos ) +{ +} + +bool Bu::Conduit::isEos() +{ + im.lock(); + bool bRet = qb.isEos(); + im.unlock(); + return bRet; +} + +bool Bu::Conduit::isOpen() +{ + im.lock(); + bool bRet = bOpen || (qb.getSize() > 0); + im.unlock(); + return bRet; +} + +void Bu::Conduit::flush() +{ +} + +bool Bu::Conduit::canRead() +{ + im.lock(); + bool bRet = qb.canRead(); + im.unlock(); + return bRet; +} + +bool Bu::Conduit::canWrite() +{ + im.lock(); + bool bRet = qb.canWrite(); + im.unlock(); + return bRet; +} + +bool Bu::Conduit::isReadable() +{ + im.lock(); + bool bRet = qb.isReadable(); + im.unlock(); + return bRet; +} + +bool Bu::Conduit::isWritable() +{ + im.lock(); + bool bRet = qb.isWritable(); + im.unlock(); + return bRet; +} + +bool Bu::Conduit::isSeekable() +{ + im.lock(); + bool bRet = qb.isSeekable(); + im.unlock(); + return bRet; +} + +bool Bu::Conduit::isBlocking() +{ + im.lock(); + bool bRet = bBlocking; + im.unlock(); + return bRet; +} + +void Bu::Conduit::setBlocking( bool bBlocking ) +{ + im.lock(); + this->bBlocking = bBlocking; + im.unlock(); +} + +void Bu::Conduit::setSize( Bu::size iSize ) +{ +} + +Bu::size Bu::Conduit::getSize() const +{ + im.lock(); + Bu::size sRet = qb.getSize(); + im.unlock(); + return sRet; +} + +Bu::size Bu::Conduit::getBlockSize() const +{ + im.lock(); + Bu::size sRet = qb.getBlockSize(); + im.unlock(); + return sRet; +} + +Bu::String Bu::Conduit::getLocation() const +{ + im.lock(); + Bu::String sRet = qb.getLocation(); + im.unlock(); + return sRet; +} + diff --git a/src/conduit.h b/src/conduit.h index cc88667..72b8d52 100644 --- a/src/conduit.h +++ b/src/conduit.h @@ -10,6 +10,9 @@ #include "bu/stream.h" #include "bu/string.h" +#include "bu/queuebuf.h" +#include "bu/itomutex.h" +#include "bu/itocondition.h" namespace Bu { @@ -20,6 +23,41 @@ namespace Bu */ class Conduit : public Stream { + public: + Conduit( int iBlockSize=256 ); + virtual ~Conduit(); + + virtual void close(); + virtual Bu::size read( void *pBuf, Bu::size nBytes ); + virtual Bu::size peek( void *pBuf, Bu::size nBytes ); + virtual Bu::size peek( void *pBuf, Bu::size nBytes, Bu::size nSkip ); + virtual Bu::size write( const void *pBuf, Bu::size nBytes ); + virtual Bu::size tell(); + virtual void seek( Bu::size offset ); + virtual void setPos( Bu::size pos ); + virtual void setPosEnd( Bu::size pos ); + virtual bool isEos(); + virtual bool isOpen(); + virtual void flush(); + virtual bool canRead(); + virtual bool canWrite(); + virtual bool isReadable(); + virtual bool isWritable(); + virtual bool isSeekable(); + virtual bool isBlocking(); + virtual void setBlocking( bool bBlocking=true ); + virtual void setSize( Bu::size iSize ); + + virtual size getSize() const; + virtual size getBlockSize() const; + virtual Bu::String getLocation() const; + + private: + QueueBuf qb; + mutable ItoMutex im; + ItoCondition cBlock; + bool bBlocking; + bool bOpen; }; } diff --git a/src/queuebuf.cpp b/src/queuebuf.cpp index 69abf4b..98d8ee0 100644 --- a/src/queuebuf.cpp +++ b/src/queuebuf.cpp @@ -24,11 +24,6 @@ Bu::QueueBuf::~QueueBuf() delete[] *i; } -int Bu::QueueBuf::getSize() -{ - return iTotalSize; -} - void Bu::QueueBuf::close() { for( BlockList::iterator i = lBlocks.begin(); i; i++ ) diff --git a/src/queuebuf.h b/src/queuebuf.h index 84b50e1..929ca35 100644 --- a/src/queuebuf.h +++ b/src/queuebuf.h @@ -24,8 +24,6 @@ namespace Bu QueueBuf( int iBlockSize=256 ); virtual ~QueueBuf(); - int getSize(); - virtual void close(); virtual Bu::size read( void *pBuf, Bu::size nBytes ); virtual Bu::size peek( void *pBuf, Bu::size nBytes ); diff --git a/src/tests/conduit.cpp b/src/tests/conduit.cpp new file mode 100644 index 0000000..d8d9e03 --- /dev/null +++ b/src/tests/conduit.cpp @@ -0,0 +1,56 @@ +#include "bu/conduit.h" +#include "bu/sio.h" +#include "bu/ito.h" + +using namespace Bu; + +class Reader : public Bu::Ito +{ +public: + Reader( Bu::Conduit &rCond ) : + rCond( rCond ) + { + } + + virtual ~Reader() + { + } + +protected: + virtual void run() + { + while( rCond.isOpen() ) + { + char buf[1025]; + + sio << "Reading..." << sio.flush; + Bu::size iRead = rCond.read( buf, 1024 ); + buf[iRead] = '\0'; + sio << "got " << iRead << " >" << buf << "<" << sio.nl; + } + + sio << "Conduit closed, exting thread." << sio.nl; + } + +private: + Bu::Conduit &rCond; +}; + +int main() +{ + Conduit c; + Reader r( c ); + r.start(); + + sleep( 3 ); + c.write("Hi there", 8 ); + sleep( 3 ); + c.write("Goodbye, soon.", 14 ); + sleep( 3 ); + c.write("...NOW!", 9 ); + c.close(); + sleep( 3 ); + + return 0; +} + -- cgit v1.2.3