aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Buland <eichlan@xagasoft.com>2011-03-22 20:12:50 +0000
committerMike Buland <eichlan@xagasoft.com>2011-03-22 20:12:50 +0000
commit47627be8e85b2169ab3d9f34b8819cacb083b5bf (patch)
treed1a321b51c602f09039472918bb27618749ac461
parent88004d87d513dcba767b1dae1e5199a89b22ce36 (diff)
downloadlibbu++-47627be8e85b2169ab3d9f34b8819cacb083b5bf.tar.gz
libbu++-47627be8e85b2169ab3d9f34b8819cacb083b5bf.tar.bz2
libbu++-47627be8e85b2169ab3d9f34b8819cacb083b5bf.tar.xz
libbu++-47627be8e85b2169ab3d9f34b8819cacb083b5bf.zip
Bu::Conduit now works exactly as it was advertised some time ago, it uses
Bu::QueueBuf and creates a really slick blocking inter-thread I/O system.
-rw-r--r--default.bld2
-rw-r--r--src/conduit.cpp226
-rw-r--r--src/conduit.h38
-rw-r--r--src/queuebuf.cpp5
-rw-r--r--src/queuebuf.h2
-rw-r--r--src/tests/conduit.cpp56
6 files changed, 321 insertions, 8 deletions
diff --git a/default.bld b/default.bld
index c5f4310..aa7f4cb 100644
--- a/default.bld
+++ b/default.bld
@@ -119,7 +119,7 @@ target ["tests/bzip2", "tests/streamstack"]
119} 119}
120 120
121target ["tests/itoserver", "tests/socketblock", "tests/itoheap", 121target ["tests/itoserver", "tests/socketblock", "tests/itoheap",
122 "tests/itoqueue1", "tests/itoqueue2"] 122 "tests/itoqueue1", "tests/itoqueue2", "tests/conduit"]
123{ 123{
124 LDFLAGS += "-lpthread"; 124 LDFLAGS += "-lpthread";
125} 125}
diff --git a/src/conduit.cpp b/src/conduit.cpp
index bb99526..cfa93d8 100644
--- a/src/conduit.cpp
+++ b/src/conduit.cpp
@@ -5,3 +5,229 @@
5 * terms of the license contained in the file LICENSE. 5 * terms of the license contained in the file LICENSE.
6 */ 6 */
7 7
8#include "bu/conduit.h"
9
10Bu::Conduit::Conduit( int iBlockSize ) :
11 qb( iBlockSize ),
12 bBlocking( true ),
13 bOpen( true )
14{
15}
16
17Bu::Conduit::~Conduit()
18{
19}
20
21void Bu::Conduit::close()
22{
23 im.lock();
24// qb.close();
25 bOpen = false;
26
27 cBlock.signal();
28 im.unlock();
29}
30
31#include <stdio.h>
32Bu::size Bu::Conduit::read( void *pBuf, Bu::size nBytes )
33{
34 if( !isOpen() )
35 {
36 return 0;
37 }
38 im.lock();
39 if( bBlocking )
40 {
41 im.unlock();
42 cBlock.lock();
43 for(;;)
44 {
45 im.lock();
46 if( qb.getSize() == 0 && bOpen == false )
47 {
48 im.unlock();
49 cBlock.unlock();
50 return 0;
51 }
52 else if( qb.getSize() > 0 )
53 {
54 im.unlock();
55 break;
56 }
57 im.unlock();
58
59 cBlock.wait();
60 }
61
62 im.lock();
63 Bu::size iRet = qb.read( pBuf, nBytes );
64 im.unlock();
65
66 cBlock.unlock();
67 return iRet;
68 }
69 else
70 {
71 Bu::size iRet = qb.read( pBuf, nBytes );
72 im.unlock();
73
74 return iRet;
75 }
76}
77
78Bu::size Bu::Conduit::peek( void *pBuf, Bu::size nBytes )
79{
80 im.lock();
81 Bu::size iRet = qb.peek( pBuf, nBytes );
82 im.unlock();
83
84 return iRet;
85}
86
87Bu::size Bu::Conduit::peek( void *pBuf, Bu::size nBytes, Bu::size nSkip )
88{
89 im.lock();
90 Bu::size iRet = qb.peek( pBuf, nBytes, nSkip );
91 im.unlock();
92
93 return iRet;
94}
95
96Bu::size Bu::Conduit::write( const void *pBuf, Bu::size nBytes )
97{
98 im.lock();
99 if( bOpen == false )
100 {
101 im.unlock();
102 return 0;
103 }
104 Bu::size sRet = qb.write( pBuf, nBytes );
105 cBlock.signal();
106 im.unlock();
107
108 return sRet;
109}
110
111Bu::size Bu::Conduit::tell()
112{
113 im.lock();
114 Bu::size sRet = qb.tell();
115 im.unlock();
116 return sRet;
117}
118
119void Bu::Conduit::seek( Bu::size offset )
120{
121}
122
123void Bu::Conduit::setPos( Bu::size pos )
124{
125}
126
127void Bu::Conduit::setPosEnd( Bu::size pos )
128{
129}
130
131bool Bu::Conduit::isEos()
132{
133 im.lock();
134 bool bRet = qb.isEos();
135 im.unlock();
136 return bRet;
137}
138
139bool Bu::Conduit::isOpen()
140{
141 im.lock();
142 bool bRet = bOpen || (qb.getSize() > 0);
143 im.unlock();
144 return bRet;
145}
146
147void Bu::Conduit::flush()
148{
149}
150
151bool Bu::Conduit::canRead()
152{
153 im.lock();
154 bool bRet = qb.canRead();
155 im.unlock();
156 return bRet;
157}
158
159bool Bu::Conduit::canWrite()
160{
161 im.lock();
162 bool bRet = qb.canWrite();
163 im.unlock();
164 return bRet;
165}
166
167bool Bu::Conduit::isReadable()
168{
169 im.lock();
170 bool bRet = qb.isReadable();
171 im.unlock();
172 return bRet;
173}
174
175bool Bu::Conduit::isWritable()
176{
177 im.lock();
178 bool bRet = qb.isWritable();
179 im.unlock();
180 return bRet;
181}
182
183bool Bu::Conduit::isSeekable()
184{
185 im.lock();
186 bool bRet = qb.isSeekable();
187 im.unlock();
188 return bRet;
189}
190
191bool Bu::Conduit::isBlocking()
192{
193 im.lock();
194 bool bRet = bBlocking;
195 im.unlock();
196 return bRet;
197}
198
199void Bu::Conduit::setBlocking( bool bBlocking )
200{
201 im.lock();
202 this->bBlocking = bBlocking;
203 im.unlock();
204}
205
206void Bu::Conduit::setSize( Bu::size iSize )
207{
208}
209
210Bu::size Bu::Conduit::getSize() const
211{
212 im.lock();
213 Bu::size sRet = qb.getSize();
214 im.unlock();
215 return sRet;
216}
217
218Bu::size Bu::Conduit::getBlockSize() const
219{
220 im.lock();
221 Bu::size sRet = qb.getBlockSize();
222 im.unlock();
223 return sRet;
224}
225
226Bu::String Bu::Conduit::getLocation() const
227{
228 im.lock();
229 Bu::String sRet = qb.getLocation();
230 im.unlock();
231 return sRet;
232}
233
diff --git a/src/conduit.h b/src/conduit.h
index cc88667..72b8d52 100644
--- a/src/conduit.h
+++ b/src/conduit.h
@@ -10,6 +10,9 @@
10 10
11#include "bu/stream.h" 11#include "bu/stream.h"
12#include "bu/string.h" 12#include "bu/string.h"
13#include "bu/queuebuf.h"
14#include "bu/itomutex.h"
15#include "bu/itocondition.h"
13 16
14namespace Bu 17namespace Bu
15{ 18{
@@ -20,6 +23,41 @@ namespace Bu
20 */ 23 */
21 class Conduit : public Stream 24 class Conduit : public Stream
22 { 25 {
26 public:
27 Conduit( int iBlockSize=256 );
28 virtual ~Conduit();
29
30 virtual void close();
31 virtual Bu::size read( void *pBuf, Bu::size nBytes );
32 virtual Bu::size peek( void *pBuf, Bu::size nBytes );
33 virtual Bu::size peek( void *pBuf, Bu::size nBytes, Bu::size nSkip );
34 virtual Bu::size write( const void *pBuf, Bu::size nBytes );
35 virtual Bu::size tell();
36 virtual void seek( Bu::size offset );
37 virtual void setPos( Bu::size pos );
38 virtual void setPosEnd( Bu::size pos );
39 virtual bool isEos();
40 virtual bool isOpen();
41 virtual void flush();
42 virtual bool canRead();
43 virtual bool canWrite();
44 virtual bool isReadable();
45 virtual bool isWritable();
46 virtual bool isSeekable();
47 virtual bool isBlocking();
48 virtual void setBlocking( bool bBlocking=true );
49 virtual void setSize( Bu::size iSize );
50
51 virtual size getSize() const;
52 virtual size getBlockSize() const;
53 virtual Bu::String getLocation() const;
54
55 private:
56 QueueBuf qb;
57 mutable ItoMutex im;
58 ItoCondition cBlock;
59 bool bBlocking;
60 bool bOpen;
23 }; 61 };
24} 62}
25 63
diff --git a/src/queuebuf.cpp b/src/queuebuf.cpp
index 69abf4b..98d8ee0 100644
--- a/src/queuebuf.cpp
+++ b/src/queuebuf.cpp
@@ -24,11 +24,6 @@ Bu::QueueBuf::~QueueBuf()
24 delete[] *i; 24 delete[] *i;
25} 25}
26 26
27int Bu::QueueBuf::getSize()
28{
29 return iTotalSize;
30}
31
32void Bu::QueueBuf::close() 27void Bu::QueueBuf::close()
33{ 28{
34 for( BlockList::iterator i = lBlocks.begin(); i; i++ ) 29 for( BlockList::iterator i = lBlocks.begin(); i; i++ )
diff --git a/src/queuebuf.h b/src/queuebuf.h
index 84b50e1..929ca35 100644
--- a/src/queuebuf.h
+++ b/src/queuebuf.h
@@ -24,8 +24,6 @@ namespace Bu
24 QueueBuf( int iBlockSize=256 ); 24 QueueBuf( int iBlockSize=256 );
25 virtual ~QueueBuf(); 25 virtual ~QueueBuf();
26 26
27 int getSize();
28
29 virtual void close(); 27 virtual void close();
30 virtual Bu::size read( void *pBuf, Bu::size nBytes ); 28 virtual Bu::size read( void *pBuf, Bu::size nBytes );
31 virtual Bu::size peek( void *pBuf, Bu::size nBytes ); 29 virtual Bu::size peek( void *pBuf, Bu::size nBytes );
diff --git a/src/tests/conduit.cpp b/src/tests/conduit.cpp
new file mode 100644
index 0000000..d8d9e03
--- /dev/null
+++ b/src/tests/conduit.cpp
@@ -0,0 +1,56 @@
1#include "bu/conduit.h"
2#include "bu/sio.h"
3#include "bu/ito.h"
4
5using namespace Bu;
6
7class Reader : public Bu::Ito
8{
9public:
10 Reader( Bu::Conduit &rCond ) :
11 rCond( rCond )
12 {
13 }
14
15 virtual ~Reader()
16 {
17 }
18
19protected:
20 virtual void run()
21 {
22 while( rCond.isOpen() )
23 {
24 char buf[1025];
25
26 sio << "Reading..." << sio.flush;
27 Bu::size iRead = rCond.read( buf, 1024 );
28 buf[iRead] = '\0';
29 sio << "got " << iRead << " >" << buf << "<" << sio.nl;
30 }
31
32 sio << "Conduit closed, exting thread." << sio.nl;
33 }
34
35private:
36 Bu::Conduit &rCond;
37};
38
39int main()
40{
41 Conduit c;
42 Reader r( c );
43 r.start();
44
45 sleep( 3 );
46 c.write("Hi there", 8 );
47 sleep( 3 );
48 c.write("Goodbye, soon.", 14 );
49 sleep( 3 );
50 c.write("...NOW!", 9 );
51 c.close();
52 sleep( 3 );
53
54 return 0;
55}
56