aboutsummaryrefslogtreecommitdiff
path: root/src/stable/client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/stable/client.cpp')
-rw-r--r--src/stable/client.cpp184
1 files changed, 33 insertions, 151 deletions
diff --git a/src/stable/client.cpp b/src/stable/client.cpp
index 56c5094..d2d48d7 100644
--- a/src/stable/client.cpp
+++ b/src/stable/client.cpp
@@ -6,7 +6,6 @@
6 */ 6 */
7 7
8#include "bu/client.h" 8#include "bu/client.h"
9#include "bu/tcpsocket.h"
10#include <stdlib.h> 9#include <stdlib.h>
11#include <errno.h> 10#include <errno.h>
12#include "bu/protocol.h" 11#include "bu/protocol.h"
@@ -24,72 +23,23 @@
24#define BU_PROFILE_END( x ) (void)0 23#define BU_PROFILE_END( x ) (void)0
25#endif 24#endif
26 25
27Bu::Client::Client( Bu::TcpSocket *pSocket, 26Bu::Client::Client( class Bu::ClientLinkFactory *pfLink ) :
28 class Bu::ClientLinkFactory *pfLink ) :
29 pTopStream( pSocket ),
30 pSocket( pSocket ),
31 pProto( NULL ), 27 pProto( NULL ),
32 bWantsDisconnect( false ), 28 bWantsDisconnect( false ),
33 pfLink( pfLink ) 29 pfLink( pfLink )
34{ 30{
35 Bu::ReadWriteMutex::WriteLocker lGlobal( mGlobal );
36 lFilts.prepend( pSocket );
37} 31}
38 32
39Bu::Client::~Client() 33Bu::Client::~Client()
40{ 34{
41 Bu::ReadWriteMutex::WriteLocker lGlobal( mGlobal );
42 for( FilterList::iterator i = lFilts.begin(); i; i++ )
43 {
44 delete *i;
45 }
46 pTopStream = pSocket = NULL;
47 delete pfLink; 35 delete pfLink;
48} 36}
49 37
50void Bu::Client::processInput() 38void Bu::Client::processInput()
51{ 39{
52 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 40 Bu::MutexLocker l( mProto );
53 mRead.lock();
54 char buf[RBS];
55 Bu::size nRead, nTotal=0;
56 41
57 BU_PROFILE_START("client.read"); 42 if( pProto && getInputSize() > 0 )
58 for(;;)
59 {
60 try
61 {
62 nRead = pTopStream->read( buf, RBS );
63
64 if( nRead == 0 )
65 {
66 break;
67 }
68 else
69 {
70 nTotal += nRead;
71 qbRead.write( buf, nRead );
72 if( !pTopStream->canRead() )
73 break;
74 }
75 }
76 catch( Bu::TcpSocketException &e )
77 {
78 pTopStream->close();
79 bWantsDisconnect = true;
80 break;
81 }
82 }
83 BU_PROFILE_END("client.read");
84 mRead.unlock();
85
86 if( nTotal == 0 )
87 {
88 pTopStream->close();
89 bWantsDisconnect = true;
90 }
91
92 if( pProto && nTotal )
93 { 43 {
94 BU_PROFILE_START("client.process"); 44 BU_PROFILE_START("client.process");
95 pProto->onNewData( this ); 45 pProto->onNewData( this );
@@ -97,197 +47,133 @@ void Bu::Client::processInput()
97 } 47 }
98} 48}
99 49
100void Bu::Client::processOutput()
101{
102 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal );
103 mWrite.lock();
104 char buf[RBS];
105 if( qbWrite.getSize() > 0 )
106 {
107 int nAmnt = RBS;
108 nAmnt = qbWrite.peek( buf, nAmnt );
109 int nReal = pTopStream->write( buf, nAmnt );
110 qbWrite.seek( nReal );
111 pTopStream->flush();
112 }
113 mWrite.unlock();
114}
115
116void Bu::Client::setProtocol( Protocol *pProto ) 50void Bu::Client::setProtocol( Protocol *pProto )
117{ 51{
118 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 52 Bu::MutexLocker l( mProto );
119 this->pProto = pProto; 53 this->pProto = pProto;
120 this->pProto->onNewConnection( this ); 54 this->pProto->onNewConnection( this );
121} 55}
122 56
123Bu::Protocol *Bu::Client::getProtocol() 57Bu::Protocol *Bu::Client::getProtocol()
124{ 58{
125 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 59 Bu::MutexLocker l( mProto );
126 return pProto; 60 return pProto;
127} 61}
128 62
129void Bu::Client::clearProtocol() 63void Bu::Client::clearProtocol()
130{ 64{
131 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 65 Bu::MutexLocker l( mProto );
132 pProto = NULL; 66 pProto = NULL;
133} 67}
134/*
135Bu::String &Bu::Client::getInput()
136{
137 return sReadBuf;
138}
139
140Bu::String &Bu::Client::getOutput()
141{
142 return sWriteBuf;
143}
144*/
145 68
146bool Bu::Client::isOpen() 69bool Bu::Client::isOpen()
147{ 70{
148 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 71 return true;
149 if( !pTopStream ) return false;
150 return pTopStream->isOpen();
151} 72}
152 73
153Bu::size Bu::Client::write( const Bu::String &sData ) 74Bu::size Bu::Client::write( const Bu::String &sData )
154{ 75{
155 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 76 return cbBuffer.client().write( sData.getStr(), sData.getSize() );
156 Bu::MutexLocker l( mWrite );
157 return qbWrite.write( sData.getStr(), sData.getSize() );
158} 77}
159 78
160Bu::size Bu::Client::write( const void *pData, Bu::size nBytes ) 79Bu::size Bu::Client::write( const void *pData, Bu::size nBytes )
161{ 80{
162 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 81 return cbBuffer.client().write( pData, nBytes );
163 Bu::MutexLocker l( mWrite );
164 return qbWrite.write( pData, nBytes );
165} 82}
166 83
167Bu::size Bu::Client::write( int8_t nData ) 84Bu::size Bu::Client::write( int8_t nData )
168{ 85{
169 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 86 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
170 Bu::MutexLocker l( mWrite );
171 return qbWrite.write( (const char *)&nData, sizeof(nData) );
172} 87}
173 88
174Bu::size Bu::Client::write( int16_t nData ) 89Bu::size Bu::Client::write( int16_t nData )
175{ 90{
176 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 91 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
177 Bu::MutexLocker l( mWrite );
178 return qbWrite.write( (const char *)&nData, sizeof(nData) );
179} 92}
180 93
181Bu::size Bu::Client::write( int32_t nData ) 94Bu::size Bu::Client::write( int32_t nData )
182{ 95{
183 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 96 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
184 Bu::MutexLocker l( mWrite );
185 return qbWrite.write( (const char *)&nData, sizeof(nData) );
186} 97}
187 98
188Bu::size Bu::Client::write( int64_t nData ) 99Bu::size Bu::Client::write( int64_t nData )
189{ 100{
190 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 101 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
191 Bu::MutexLocker l( mWrite );
192 return qbWrite.write( (const char *)&nData, sizeof(nData) );
193} 102}
194 103
195Bu::size Bu::Client::write( uint8_t nData ) 104Bu::size Bu::Client::write( uint8_t nData )
196{ 105{
197 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 106 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
198 Bu::MutexLocker l( mWrite );
199 return qbWrite.write( (const char *)&nData, sizeof(nData) );
200} 107}
201 108
202Bu::size Bu::Client::write( uint16_t nData ) 109Bu::size Bu::Client::write( uint16_t nData )
203{ 110{
204 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 111 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
205 Bu::MutexLocker l( mWrite );
206 return qbWrite.write( (const char *)&nData, sizeof(nData) );
207} 112}
208 113
209Bu::size Bu::Client::write( uint32_t nData ) 114Bu::size Bu::Client::write( uint32_t nData )
210{ 115{
211 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 116 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
212 Bu::MutexLocker l( mWrite );
213 return qbWrite.write( (const char *)&nData, sizeof(nData) );
214} 117}
215 118
216Bu::size Bu::Client::write( uint64_t nData ) 119Bu::size Bu::Client::write( uint64_t nData )
217{ 120{
218 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 121 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
219 Bu::MutexLocker l( mWrite );
220 return qbWrite.write( (const char *)&nData, sizeof(nData) );
221} 122}
222 123
223Bu::size Bu::Client::read( void *pData, Bu::size nBytes ) 124Bu::size Bu::Client::read( void *pData, Bu::size nBytes )
224{ 125{
225 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 126 return cbBuffer.client().read( pData, nBytes );
226 Bu::MutexLocker l( mWrite );
227 return qbRead.read( pData, nBytes );
228} 127}
229 128
129
230Bu::size Bu::Client::peek( void *pData, int nBytes, int nOffset ) 130Bu::size Bu::Client::peek( void *pData, int nBytes, int nOffset )
231{ 131{
232 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 132 return cbBuffer.client().peek( pData, nBytes, nOffset );
233 Bu::MutexLocker l( mWrite );
234 return qbRead.peek( pData, nBytes, nOffset );
235} 133}
236 134
135
237Bu::size Bu::Client::getInputSize() 136Bu::size Bu::Client::getInputSize()
238{ 137{
239 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 138 return cbBuffer.client().getSize();
240 Bu::MutexLocker l( mWrite );
241 return qbRead.getSize();
242} 139}
243 140
244Bu::size Bu::Client::getOutputSize() 141Bu::size Bu::Client::getOutputSize()
245{ 142{
246 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 143 return cbBuffer.server().getSize();
247 Bu::MutexLocker l( mWrite );
248 return qbWrite.getSize();
249}
250
251const Bu::TcpSocket *Bu::Client::getSocket() const
252{
253 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal );
254 return pSocket;
255} 144}
256 145
257void Bu::Client::disconnect() 146void Bu::Client::disconnect()
258{ 147{
259 Bu::ReadWriteMutex::WriteLocker lGlobal( mGlobal ); 148 Bu::MutexLocker l( mDisconnect );
260 bWantsDisconnect = true; 149 bWantsDisconnect = true;
261} 150}
262 151
263bool Bu::Client::wantsDisconnect() 152bool Bu::Client::wantsDisconnect()
264{ 153{
265 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 154 Bu::MutexLocker l( mDisconnect );
266 return bWantsDisconnect; 155 return bWantsDisconnect;
267} 156}
268 157
269void Bu::Client::close() 158void Bu::Client::close()
270{ 159{
271 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal );
272 pTopStream->close();
273} 160}
274 161
275Bu::ClientLink *Bu::Client::getLink() 162Bu::ClientLink *Bu::Client::getLink()
276{ 163{
277 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal );
278 return pfLink->createLink( this ); 164 return pfLink->createLink( this );
279} 165}
280 166
281void Bu::Client::onMessage( const Bu::String &sMsg ) 167void Bu::Client::onMessage( const Bu::String &sMsg )
282{ 168{
283 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 169 Bu::MutexLocker l( mProto );
284 if( pProto ) 170 if( pProto )
285 pProto->onMessage( this, sMsg ); 171 pProto->onMessage( this, sMsg );
286} 172}
287 173
288void Bu::Client::tick() 174void Bu::Client::tick()
289{ 175{
290 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 176 Bu::MutexLocker l( mProto );
291 if( pProto ) 177 if( pProto )
292 pProto->onTick( this ); 178 pProto->onTick( this );
293} 179}
@@ -299,9 +185,7 @@ Bu::size Bu::Client::tell()
299 185
300void Bu::Client::seek( Bu::size offset ) 186void Bu::Client::seek( Bu::size offset )
301{ 187{
302 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 188 return cbBuffer.client().seek( offset );
303 Bu::MutexLocker l( mRead );
304 return qbRead.seek( offset );
305} 189}
306 190
307void Bu::Client::setPos( Bu::size ) 191void Bu::Client::setPos( Bu::size )
@@ -321,15 +205,11 @@ bool Bu::Client::isEos()
321 205
322void Bu::Client::flush() 206void Bu::Client::flush()
323{ 207{
324 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal );
325 processOutput();
326} 208}
327 209
328bool Bu::Client::canRead() 210bool Bu::Client::canRead()
329{ 211{
330 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 212 return cbBuffer.client().getSize() > 0;
331 Bu::MutexLocker l( mRead );
332 return qbRead.getSize() > 0;
333} 213}
334 214
335bool Bu::Client::canWrite() 215bool Bu::Client::canWrite()
@@ -374,11 +254,13 @@ Bu::size Bu::Client::getSize() const
374 254
375Bu::size Bu::Client::getBlockSize() const 255Bu::size Bu::Client::getBlockSize() const
376{ 256{
377 return pSocket->getBlockSize(); 257 return 0;
258 //return pSocket->getBlockSize();
378} 259}
379 260
380Bu::String Bu::Client::getLocation() const 261Bu::String Bu::Client::getLocation() const
381{ 262{
382 return pSocket->getLocation(); 263 return "???";
264 //return pSocket->getLocation();
383} 265}
384 266