diff options
| author | Mike Buland <eichlan@xagasoft.com> | 2011-03-22 20:12:50 +0000 |
|---|---|---|
| committer | Mike Buland <eichlan@xagasoft.com> | 2011-03-22 20:12:50 +0000 |
| commit | 47627be8e85b2169ab3d9f34b8819cacb083b5bf (patch) | |
| tree | d1a321b51c602f09039472918bb27618749ac461 /src | |
| parent | 88004d87d513dcba767b1dae1e5199a89b22ce36 (diff) | |
| download | libbu++-47627be8e85b2169ab3d9f34b8819cacb083b5bf.tar.gz libbu++-47627be8e85b2169ab3d9f34b8819cacb083b5bf.tar.bz2 libbu++-47627be8e85b2169ab3d9f34b8819cacb083b5bf.tar.xz libbu++-47627be8e85b2169ab3d9f34b8819cacb083b5bf.zip | |
Bu::Conduit now works exactly as it was advertised some time ago, it uses
Bu::QueueBuf and creates a really slick blocking inter-thread I/O system.
Diffstat (limited to 'src')
| -rw-r--r-- | src/conduit.cpp | 226 | ||||
| -rw-r--r-- | src/conduit.h | 38 | ||||
| -rw-r--r-- | src/queuebuf.cpp | 5 | ||||
| -rw-r--r-- | src/queuebuf.h | 2 | ||||
| -rw-r--r-- | src/tests/conduit.cpp | 56 |
5 files changed, 320 insertions, 7 deletions
diff --git a/src/conduit.cpp b/src/conduit.cpp index bb99526..cfa93d8 100644 --- a/src/conduit.cpp +++ b/src/conduit.cpp | |||
| @@ -5,3 +5,229 @@ | |||
| 5 | * terms of the license contained in the file LICENSE. | 5 | * terms of the license contained in the file LICENSE. |
| 6 | */ | 6 | */ |
| 7 | 7 | ||
| 8 | #include "bu/conduit.h" | ||
| 9 | |||
| 10 | Bu::Conduit::Conduit( int iBlockSize ) : | ||
| 11 | qb( iBlockSize ), | ||
| 12 | bBlocking( true ), | ||
| 13 | bOpen( true ) | ||
| 14 | { | ||
| 15 | } | ||
| 16 | |||
| 17 | Bu::Conduit::~Conduit() | ||
| 18 | { | ||
| 19 | } | ||
| 20 | |||
| 21 | void Bu::Conduit::close() | ||
| 22 | { | ||
| 23 | im.lock(); | ||
| 24 | // qb.close(); | ||
| 25 | bOpen = false; | ||
| 26 | |||
| 27 | cBlock.signal(); | ||
| 28 | im.unlock(); | ||
| 29 | } | ||
| 30 | |||
| 31 | #include <stdio.h> | ||
| 32 | Bu::size Bu::Conduit::read( void *pBuf, Bu::size nBytes ) | ||
| 33 | { | ||
| 34 | if( !isOpen() ) | ||
| 35 | { | ||
| 36 | return 0; | ||
| 37 | } | ||
| 38 | im.lock(); | ||
| 39 | if( bBlocking ) | ||
| 40 | { | ||
| 41 | im.unlock(); | ||
| 42 | cBlock.lock(); | ||
| 43 | for(;;) | ||
| 44 | { | ||
| 45 | im.lock(); | ||
| 46 | if( qb.getSize() == 0 && bOpen == false ) | ||
| 47 | { | ||
| 48 | im.unlock(); | ||
| 49 | cBlock.unlock(); | ||
| 50 | return 0; | ||
| 51 | } | ||
| 52 | else if( qb.getSize() > 0 ) | ||
| 53 | { | ||
| 54 | im.unlock(); | ||
| 55 | break; | ||
| 56 | } | ||
| 57 | im.unlock(); | ||
| 58 | |||
| 59 | cBlock.wait(); | ||
| 60 | } | ||
| 61 | |||
| 62 | im.lock(); | ||
| 63 | Bu::size iRet = qb.read( pBuf, nBytes ); | ||
| 64 | im.unlock(); | ||
| 65 | |||
| 66 | cBlock.unlock(); | ||
| 67 | return iRet; | ||
| 68 | } | ||
| 69 | else | ||
| 70 | { | ||
| 71 | Bu::size iRet = qb.read( pBuf, nBytes ); | ||
| 72 | im.unlock(); | ||
| 73 | |||
| 74 | return iRet; | ||
| 75 | } | ||
| 76 | } | ||
| 77 | |||
| 78 | Bu::size Bu::Conduit::peek( void *pBuf, Bu::size nBytes ) | ||
| 79 | { | ||
| 80 | im.lock(); | ||
| 81 | Bu::size iRet = qb.peek( pBuf, nBytes ); | ||
| 82 | im.unlock(); | ||
| 83 | |||
| 84 | return iRet; | ||
| 85 | } | ||
| 86 | |||
| 87 | Bu::size Bu::Conduit::peek( void *pBuf, Bu::size nBytes, Bu::size nSkip ) | ||
| 88 | { | ||
| 89 | im.lock(); | ||
| 90 | Bu::size iRet = qb.peek( pBuf, nBytes, nSkip ); | ||
| 91 | im.unlock(); | ||
| 92 | |||
| 93 | return iRet; | ||
| 94 | } | ||
| 95 | |||
| 96 | Bu::size Bu::Conduit::write( const void *pBuf, Bu::size nBytes ) | ||
| 97 | { | ||
| 98 | im.lock(); | ||
| 99 | if( bOpen == false ) | ||
| 100 | { | ||
| 101 | im.unlock(); | ||
| 102 | return 0; | ||
| 103 | } | ||
| 104 | Bu::size sRet = qb.write( pBuf, nBytes ); | ||
| 105 | cBlock.signal(); | ||
| 106 | im.unlock(); | ||
| 107 | |||
| 108 | return sRet; | ||
| 109 | } | ||
| 110 | |||
| 111 | Bu::size Bu::Conduit::tell() | ||
| 112 | { | ||
| 113 | im.lock(); | ||
| 114 | Bu::size sRet = qb.tell(); | ||
| 115 | im.unlock(); | ||
| 116 | return sRet; | ||
| 117 | } | ||
| 118 | |||
| 119 | void Bu::Conduit::seek( Bu::size offset ) | ||
| 120 | { | ||
| 121 | } | ||
| 122 | |||
| 123 | void Bu::Conduit::setPos( Bu::size pos ) | ||
| 124 | { | ||
| 125 | } | ||
| 126 | |||
| 127 | void Bu::Conduit::setPosEnd( Bu::size pos ) | ||
| 128 | { | ||
| 129 | } | ||
| 130 | |||
| 131 | bool Bu::Conduit::isEos() | ||
| 132 | { | ||
| 133 | im.lock(); | ||
| 134 | bool bRet = qb.isEos(); | ||
| 135 | im.unlock(); | ||
| 136 | return bRet; | ||
| 137 | } | ||
| 138 | |||
| 139 | bool Bu::Conduit::isOpen() | ||
| 140 | { | ||
| 141 | im.lock(); | ||
| 142 | bool bRet = bOpen || (qb.getSize() > 0); | ||
| 143 | im.unlock(); | ||
| 144 | return bRet; | ||
| 145 | } | ||
| 146 | |||
| 147 | void Bu::Conduit::flush() | ||
| 148 | { | ||
| 149 | } | ||
| 150 | |||
| 151 | bool Bu::Conduit::canRead() | ||
| 152 | { | ||
| 153 | im.lock(); | ||
| 154 | bool bRet = qb.canRead(); | ||
| 155 | im.unlock(); | ||
| 156 | return bRet; | ||
| 157 | } | ||
| 158 | |||
| 159 | bool Bu::Conduit::canWrite() | ||
| 160 | { | ||
| 161 | im.lock(); | ||
| 162 | bool bRet = qb.canWrite(); | ||
| 163 | im.unlock(); | ||
| 164 | return bRet; | ||
| 165 | } | ||
| 166 | |||
| 167 | bool Bu::Conduit::isReadable() | ||
| 168 | { | ||
| 169 | im.lock(); | ||
| 170 | bool bRet = qb.isReadable(); | ||
| 171 | im.unlock(); | ||
| 172 | return bRet; | ||
| 173 | } | ||
| 174 | |||
| 175 | bool Bu::Conduit::isWritable() | ||
| 176 | { | ||
| 177 | im.lock(); | ||
| 178 | bool bRet = qb.isWritable(); | ||
| 179 | im.unlock(); | ||
| 180 | return bRet; | ||
| 181 | } | ||
| 182 | |||
| 183 | bool Bu::Conduit::isSeekable() | ||
| 184 | { | ||
| 185 | im.lock(); | ||
| 186 | bool bRet = qb.isSeekable(); | ||
| 187 | im.unlock(); | ||
| 188 | return bRet; | ||
| 189 | } | ||
| 190 | |||
| 191 | bool Bu::Conduit::isBlocking() | ||
| 192 | { | ||
| 193 | im.lock(); | ||
| 194 | bool bRet = bBlocking; | ||
| 195 | im.unlock(); | ||
| 196 | return bRet; | ||
| 197 | } | ||
| 198 | |||
| 199 | void Bu::Conduit::setBlocking( bool bBlocking ) | ||
| 200 | { | ||
| 201 | im.lock(); | ||
| 202 | this->bBlocking = bBlocking; | ||
| 203 | im.unlock(); | ||
| 204 | } | ||
| 205 | |||
| 206 | void Bu::Conduit::setSize( Bu::size iSize ) | ||
| 207 | { | ||
| 208 | } | ||
| 209 | |||
| 210 | Bu::size Bu::Conduit::getSize() const | ||
| 211 | { | ||
| 212 | im.lock(); | ||
| 213 | Bu::size sRet = qb.getSize(); | ||
| 214 | im.unlock(); | ||
| 215 | return sRet; | ||
| 216 | } | ||
| 217 | |||
| 218 | Bu::size Bu::Conduit::getBlockSize() const | ||
| 219 | { | ||
| 220 | im.lock(); | ||
| 221 | Bu::size sRet = qb.getBlockSize(); | ||
| 222 | im.unlock(); | ||
| 223 | return sRet; | ||
| 224 | } | ||
| 225 | |||
| 226 | Bu::String Bu::Conduit::getLocation() const | ||
| 227 | { | ||
| 228 | im.lock(); | ||
| 229 | Bu::String sRet = qb.getLocation(); | ||
| 230 | im.unlock(); | ||
| 231 | return sRet; | ||
| 232 | } | ||
| 233 | |||
diff --git a/src/conduit.h b/src/conduit.h index cc88667..72b8d52 100644 --- a/src/conduit.h +++ b/src/conduit.h | |||
| @@ -10,6 +10,9 @@ | |||
| 10 | 10 | ||
| 11 | #include "bu/stream.h" | 11 | #include "bu/stream.h" |
| 12 | #include "bu/string.h" | 12 | #include "bu/string.h" |
| 13 | #include "bu/queuebuf.h" | ||
| 14 | #include "bu/itomutex.h" | ||
| 15 | #include "bu/itocondition.h" | ||
| 13 | 16 | ||
| 14 | namespace Bu | 17 | namespace Bu |
| 15 | { | 18 | { |
| @@ -20,6 +23,41 @@ namespace Bu | |||
| 20 | */ | 23 | */ |
| 21 | class Conduit : public Stream | 24 | class Conduit : public Stream |
| 22 | { | 25 | { |
| 26 | public: | ||
| 27 | Conduit( int iBlockSize=256 ); | ||
| 28 | virtual ~Conduit(); | ||
| 29 | |||
| 30 | virtual void close(); | ||
| 31 | virtual Bu::size read( void *pBuf, Bu::size nBytes ); | ||
| 32 | virtual Bu::size peek( void *pBuf, Bu::size nBytes ); | ||
| 33 | virtual Bu::size peek( void *pBuf, Bu::size nBytes, Bu::size nSkip ); | ||
| 34 | virtual Bu::size write( const void *pBuf, Bu::size nBytes ); | ||
| 35 | virtual Bu::size tell(); | ||
| 36 | virtual void seek( Bu::size offset ); | ||
| 37 | virtual void setPos( Bu::size pos ); | ||
| 38 | virtual void setPosEnd( Bu::size pos ); | ||
| 39 | virtual bool isEos(); | ||
| 40 | virtual bool isOpen(); | ||
| 41 | virtual void flush(); | ||
| 42 | virtual bool canRead(); | ||
| 43 | virtual bool canWrite(); | ||
| 44 | virtual bool isReadable(); | ||
| 45 | virtual bool isWritable(); | ||
| 46 | virtual bool isSeekable(); | ||
| 47 | virtual bool isBlocking(); | ||
| 48 | virtual void setBlocking( bool bBlocking=true ); | ||
| 49 | virtual void setSize( Bu::size iSize ); | ||
| 50 | |||
| 51 | virtual size getSize() const; | ||
| 52 | virtual size getBlockSize() const; | ||
| 53 | virtual Bu::String getLocation() const; | ||
| 54 | |||
| 55 | private: | ||
| 56 | QueueBuf qb; | ||
| 57 | mutable ItoMutex im; | ||
| 58 | ItoCondition cBlock; | ||
| 59 | bool bBlocking; | ||
| 60 | bool bOpen; | ||
| 23 | }; | 61 | }; |
| 24 | } | 62 | } |
| 25 | 63 | ||
diff --git a/src/queuebuf.cpp b/src/queuebuf.cpp index 69abf4b..98d8ee0 100644 --- a/src/queuebuf.cpp +++ b/src/queuebuf.cpp | |||
| @@ -24,11 +24,6 @@ Bu::QueueBuf::~QueueBuf() | |||
| 24 | delete[] *i; | 24 | delete[] *i; |
| 25 | } | 25 | } |
| 26 | 26 | ||
| 27 | int Bu::QueueBuf::getSize() | ||
| 28 | { | ||
| 29 | return iTotalSize; | ||
| 30 | } | ||
| 31 | |||
| 32 | void Bu::QueueBuf::close() | 27 | void Bu::QueueBuf::close() |
| 33 | { | 28 | { |
| 34 | for( BlockList::iterator i = lBlocks.begin(); i; i++ ) | 29 | for( BlockList::iterator i = lBlocks.begin(); i; i++ ) |
diff --git a/src/queuebuf.h b/src/queuebuf.h index 84b50e1..929ca35 100644 --- a/src/queuebuf.h +++ b/src/queuebuf.h | |||
| @@ -24,8 +24,6 @@ namespace Bu | |||
| 24 | QueueBuf( int iBlockSize=256 ); | 24 | QueueBuf( int iBlockSize=256 ); |
| 25 | virtual ~QueueBuf(); | 25 | virtual ~QueueBuf(); |
| 26 | 26 | ||
| 27 | int getSize(); | ||
| 28 | |||
| 29 | virtual void close(); | 27 | virtual void close(); |
| 30 | virtual Bu::size read( void *pBuf, Bu::size nBytes ); | 28 | virtual Bu::size read( void *pBuf, Bu::size nBytes ); |
| 31 | virtual Bu::size peek( void *pBuf, Bu::size nBytes ); | 29 | virtual Bu::size peek( void *pBuf, Bu::size nBytes ); |
diff --git a/src/tests/conduit.cpp b/src/tests/conduit.cpp new file mode 100644 index 0000000..d8d9e03 --- /dev/null +++ b/src/tests/conduit.cpp | |||
| @@ -0,0 +1,56 @@ | |||
| 1 | #include "bu/conduit.h" | ||
| 2 | #include "bu/sio.h" | ||
| 3 | #include "bu/ito.h" | ||
| 4 | |||
| 5 | using namespace Bu; | ||
| 6 | |||
| 7 | class Reader : public Bu::Ito | ||
| 8 | { | ||
| 9 | public: | ||
| 10 | Reader( Bu::Conduit &rCond ) : | ||
| 11 | rCond( rCond ) | ||
| 12 | { | ||
| 13 | } | ||
| 14 | |||
| 15 | virtual ~Reader() | ||
| 16 | { | ||
| 17 | } | ||
| 18 | |||
| 19 | protected: | ||
| 20 | virtual void run() | ||
| 21 | { | ||
| 22 | while( rCond.isOpen() ) | ||
| 23 | { | ||
| 24 | char buf[1025]; | ||
| 25 | |||
| 26 | sio << "Reading..." << sio.flush; | ||
| 27 | Bu::size iRead = rCond.read( buf, 1024 ); | ||
| 28 | buf[iRead] = '\0'; | ||
| 29 | sio << "got " << iRead << " >" << buf << "<" << sio.nl; | ||
| 30 | } | ||
| 31 | |||
| 32 | sio << "Conduit closed, exting thread." << sio.nl; | ||
| 33 | } | ||
| 34 | |||
| 35 | private: | ||
| 36 | Bu::Conduit &rCond; | ||
| 37 | }; | ||
| 38 | |||
| 39 | int main() | ||
| 40 | { | ||
| 41 | Conduit c; | ||
| 42 | Reader r( c ); | ||
| 43 | r.start(); | ||
| 44 | |||
| 45 | sleep( 3 ); | ||
| 46 | c.write("Hi there", 8 ); | ||
| 47 | sleep( 3 ); | ||
| 48 | c.write("Goodbye, soon.", 14 ); | ||
| 49 | sleep( 3 ); | ||
| 50 | c.write("...NOW!", 9 ); | ||
| 51 | c.close(); | ||
| 52 | sleep( 3 ); | ||
| 53 | |||
| 54 | return 0; | ||
| 55 | } | ||
| 56 | |||
