aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/stable/client.h1
-rw-r--r--src/stable/clientbuf.cpp19
-rw-r--r--src/stable/clientbuf.h3
-rw-r--r--src/stable/counterevent.h154
-rw-r--r--src/stable/server.cpp64
-rw-r--r--src/stable/server.h12
-rw-r--r--src/stable/thread.cpp5
-rw-r--r--src/stable/thread.h6
-rw-r--r--src/unstable/protocolwebsocket.cpp3
9 files changed, 236 insertions, 31 deletions
diff --git a/src/stable/client.h b/src/stable/client.h
index 4cd8e3f..abe807e 100644
--- a/src/stable/client.h
+++ b/src/stable/client.h
@@ -54,7 +54,6 @@ namespace Bu
54 Bu::size write( uint64_t nData ); 54 Bu::size write( uint64_t nData );
55 Bu::size read( void *pData, Bu::size nBytes ); 55 Bu::size read( void *pData, Bu::size nBytes );
56 Bu::size peek( void *pData, int nBytes, int nOffset=0 ); 56 Bu::size peek( void *pData, int nBytes, int nOffset=0 );
57 void seek( int nBytes );
58 Bu::size getInputSize(); 57 Bu::size getInputSize();
59 Bu::size getOutputSize(); 58 Bu::size getOutputSize();
60 59
diff --git a/src/stable/clientbuf.cpp b/src/stable/clientbuf.cpp
index 493e577..4e59120 100644
--- a/src/stable/clientbuf.cpp
+++ b/src/stable/clientbuf.cpp
@@ -2,10 +2,12 @@
2 2
3#include "bu/mutexlocker.h" 3#include "bu/mutexlocker.h"
4 4
5#include "bu/sio.h"
6
5Bu::ClientBuf::ClientBuf() : 7Bu::ClientBuf::ClientBuf() :
6 accClientRaw( *this ), 8 accClientRaw( *this ),
7 accServer( *this ), 9 accServer( *this ),
8 accClientFiltered( &accClient ), 10 //accClientFiltered( &accClientRaw ),
9 accClient( *this ) 11 accClient( *this )
10{ 12{
11} 13}
@@ -156,7 +158,7 @@ Bu::String Bu::ClientBuf::ClientAccessRaw::getLocation() const
156///////// 158/////////
157// ClientAccess 159// ClientAccess
158/// 160///
159 161#define accClientFiltered accClientRaw
160Bu::ClientBuf::ClientAccess::ClientAccess( Bu::ClientBuf &rBuf ) : 162Bu::ClientBuf::ClientAccess::ClientAccess( Bu::ClientBuf &rBuf ) :
161 rBuf( rBuf ) 163 rBuf( rBuf )
162{ 164{
@@ -173,16 +175,19 @@ void Bu::ClientBuf::ClientAccess::close()
173 175
174Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) 176Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes )
175{ 177{
178 Bu::println("ClientAccess::read( ptr, %1 )").arg( iBytes );
176 char *pBuf = (char *)pBufRaw; 179 char *pBuf = (char *)pBufRaw;
177 Bu::MutexLocker l( mAccess ); 180 Bu::MutexLocker l( mAccess );
178 // Read from QueueBuf first 181 // Read from QueueBuf first
179 Bu::size ps = qbPeek.read( pBuf, iBytes ); 182 Bu::size ps = qbPeek.read( pBuf, iBytes );
183 Bu::println("ClientAccess::read: attempted qbPeek, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) );
180 iBytes -= ps; 184 iBytes -= ps;
181 pBuf += ps; 185 pBuf += ps;
182 // Request space left? Try the client 186 // Request space left? Try the client
183 if( iBytes > 0 ) 187 if( iBytes > 0 )
184 { 188 {
185 ps += rBuf.accClientFiltered.read( pBuf, iBytes ); 189 ps += rBuf.accClientFiltered.read( pBuf, iBytes );
190 Bu::println("ClientAccess::read: attempted completion from socket buffer, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) );
186 } 191 }
187 return ps; 192 return ps;
188} 193}
@@ -190,22 +195,26 @@ Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes )
190Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes, 195Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes,
191 int iOffset ) 196 int iOffset )
192{ 197{
198 Bu::println("ClientAccess::peek( ptr, %1, %2 )").arg( iBytes ).arg( iOffset );
193 Bu::MutexLocker l( mAccess ); 199 Bu::MutexLocker l( mAccess );
194 // Do we have enough data in the peek buffer to handle this? 200 // Do we have enough data in the peek buffer to handle this?
195 if( qbPeek.getSize() < iBytes+iOffset ) 201 if( qbPeek.getSize() < iBytes+iOffset )
196 { 202 {
203 Bu::println("ClientAccess::peek: Insufficient buffered data (%1)").arg( qbPeek.getSize() );
197 // Nope, make an attempt to fill it in. 204 // Nope, make an attempt to fill it in.
198 int nDiff = iBytes-qbPeek.getSize(); 205 int nDiff = iBytes-qbPeek.getSize();
199 // We have to make our own buffer, since iBytes+nOffeset could be bigger 206 // We have to make our own buffer, since iBytes+nOffeset could be bigger
200 // than pData. 207 // than pData.
201 char *pTmp = new char[nDiff]; 208 char *pTmp = new char[nDiff];
202 Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff ); 209 Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff );
210 Bu::println("ClientAccess::peek: Tried to fill buffer, read %1 of needed %2\n>%3<").arg( ps ).arg( nDiff ).arg( Bu::String(pTmp, ps) );
203 if( ps > 0 ) 211 if( ps > 0 )
204 { 212 {
205 // Add the data read to the peek buffer. 213 // Add the data read to the peek buffer.
206 qbPeek.write( pTmp, ps ); 214 qbPeek.write( pTmp, ps );
207 } 215 }
208 delete[] pTmp; 216 delete[] pTmp;
217 Bu::println("ClientAccess::peek: buffer left with %1").arg( qbPeek.getSize() );
209 } 218 }
210 219
211 return qbPeek.peek( pData, iBytes, iOffset ); 220 return qbPeek.peek( pData, iBytes, iOffset );
@@ -225,13 +234,15 @@ Bu::size Bu::ClientBuf::ClientAccess::tell()
225 234
226void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) 235void Bu::ClientBuf::ClientAccess::seek( Bu::size offset )
227{ 236{
237 Bu::println("ClientAccess::seek( %1 )").arg( offset );
228 Bu::MutexLocker l( mAccess ); 238 Bu::MutexLocker l( mAccess );
229 // For this type of stream seek is basically a destructive skip. It's like 239 // For this type of stream seek is basically a destructive skip. It's like
230 // reading the data but with no output buffer. Let's remove data from the 240 // reading the data but with no output buffer. Let's remove data from the
231 // peek buffer first. 241 // peek buffer first.
232 if( qbPeek.getSize() > 0 ) 242 if( qbPeek.getSize() > 0 )
233 { 243 {
234 Bu::size amount = Bu::buMax( qbPeek.getSize(), offset ); 244 Bu::size amount = Bu::buMin( qbPeek.getSize(), offset );
245 Bu::println("ClientAccess::seek: buffered: %1, amount: %2").arg( qbPeek.getSize() ).arg( amount );
235 qbPeek.seek( amount ); 246 qbPeek.seek( amount );
236 offset -= amount; 247 offset -= amount;
237 } 248 }
@@ -239,6 +250,7 @@ void Bu::ClientBuf::ClientAccess::seek( Bu::size offset )
239 // If there's offset left, then apply it to the underlying stream 250 // If there's offset left, then apply it to the underlying stream
240 if( offset > 0 ) 251 if( offset > 0 )
241 { 252 {
253 Bu::println("ClientAccess::seek: seeking remaining %1 in socket buffer").arg( offset );
242 rBuf.accClientFiltered.seek( offset ); 254 rBuf.accClientFiltered.seek( offset );
243 } 255 }
244} 256}
@@ -329,6 +341,7 @@ Bu::String Bu::ClientBuf::ClientAccess::getLocation() const
329{ 341{
330 return "ClientBuf"; 342 return "ClientBuf";
331} 343}
344#undef accClientFiltered
332 345
333///////// 346/////////
334// ServerAccess 347// ServerAccess
diff --git a/src/stable/clientbuf.h b/src/stable/clientbuf.h
index 7781b6a..50b6617 100644
--- a/src/stable/clientbuf.h
+++ b/src/stable/clientbuf.h
@@ -164,12 +164,13 @@ namespace Bu
164 private: 164 private:
165 ClientAccessRaw accClientRaw; 165 ClientAccessRaw accClientRaw;
166 ServerAccess accServer; 166 ServerAccess accServer;
167 Bu::StreamStack accClientFiltered; 167 //Bu::StreamStack accClientFiltered;
168 ClientAccess accClient; 168 ClientAccess accClient;
169 Bu::QueueBuf qbOutput; 169 Bu::QueueBuf qbOutput;
170 Bu::QueueBuf qbInput; 170 Bu::QueueBuf qbInput;
171 Bu::Mutex mOutput; 171 Bu::Mutex mOutput;
172 Bu::Mutex mInput; 172 Bu::Mutex mInput;
173
173 friend class Bu::ClientBuf::ClientAccess; 174 friend class Bu::ClientBuf::ClientAccess;
174 friend class Bu::ClientBuf::ClientAccessRaw; 175 friend class Bu::ClientBuf::ClientAccessRaw;
175 friend class Bu::ClientBuf::ServerAccess; 176 friend class Bu::ClientBuf::ServerAccess;
diff --git a/src/stable/counterevent.h b/src/stable/counterevent.h
new file mode 100644
index 0000000..e7ad7a1
--- /dev/null
+++ b/src/stable/counterevent.h
@@ -0,0 +1,154 @@
1/*
2 * Copyright (C) 2007-2019 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#ifndef BU_COUNTER_EVENT_H
9#define BU_COUNTER_EVENT_H
10
11#include <pthread.h>
12
13#include "bu/mutex.h"
14#include "bu/condition.h"
15
16namespace Bu
17{
18 /**
19 * Represents a true/false state that controls thread synchronization. This
20 * is primarilly intended to control the synchronization state of
21 * multithreaded services. For example, telling all threads when to exit.
22 *
23 * An Event is either set or unset. If the Event is unset then it can be
24 * waited on for something to happen. As soon as the Event is set all
25 * waiting threads are released and new requests to wait are ignored until
26 * the Event is cleared again.
27 *
28 * Threads can also be woken up without setting the Event, which may be
29 * handy in certain circumstances.
30 *@ingroup Threading
31 */
32 class CounterEvent
33 {
34 public:
35 CounterEvent() :
36 iCount( 0 )
37 {
38 }
39
40 ~CounterEvent()
41 {
42 }
43
44 /**
45 * Wait indefinitely for the Event to trigger. If the event is already
46 * set, then return immediately. It's important to note that this may
47 * return at any time, not only when the Event is set, so examining the
48 * return value is important.
49 *@returns the set status of the Event.
50 */
51 int wait()
52 {
53 cBlock.lock();
54 if( iCount == 0 )
55 {
56 cBlock.unlock();
57 return iCount;
58 }
59 cBlock.wait();
60 int iRet = iCount;
61 cBlock.unlock();
62 return iRet;
63 }
64
65 /**
66 * Wait for up to nSec seconds and nUSec nanoseconds for the event to
67 * trigger. If the Event is already set then return immediately.
68 *@returns the set status of the Event.
69 */
70 int wait( int nSec, int nUSec )
71 {
72 cBlock.lock();
73 if( iCount == 0 )
74 {
75 cBlock.unlock();
76 return iCount;
77 }
78 cBlock.wait( nSec, nUSec );
79 bool iRet = iCount;
80 cBlock.unlock();
81 return iRet;
82 }
83
84 /**
85 * Allow one of the waiting threads to unlock without updating the set
86 * state of the Event.
87 */
88 void unblockOne()
89 {
90 cBlock.lock();
91 cBlock.signal();
92 cBlock.unlock();
93 }
94
95 /**
96 * Allow all waiting threads to unlock and proceed without updating the
97 * set state of the Event.
98 */
99 void unblockAll()
100 {
101 cBlock.lock();
102 cBlock.broadcast();
103 cBlock.unlock();
104 }
105
106 /**
107 * Find out if the Event is in the set state or not.
108 *@returns True if set, false otherwise.
109 */
110 bool isZero()
111 {
112 cBlock.lock();
113 bool bRet = (iCount == 0);
114 cBlock.unlock();
115 return bRet;
116 }
117
118 /**
119 * Sets the Event's state to true and triggers all waiting threads.
120 */
121 void decrement()
122 {
123 cBlock.lock();
124 iCount--; if( iCount < 0 ) iCount = 0;
125 if( iCount == 0 )
126 cBlock.broadcast();
127 cBlock.unlock();
128 }
129
130 void increment()
131 {
132 cBlock.lock();
133 iCount++;
134 cBlock.unlock();
135 }
136
137 /**
138 * Sets the Event's state to false. This does NOT trigger any waiting
139 * threads.
140 */
141 void clear()
142 {
143 cBlock.lock();
144 iCount = 0;
145 cBlock.unlock();
146 }
147
148 private:
149 Condition cBlock; /**< The condition for blocking dequeues. */
150 int iCount;
151 };
152}
153
154#endif
diff --git a/src/stable/server.cpp b/src/stable/server.cpp
index 592230d..3f03a63 100644
--- a/src/stable/server.cpp
+++ b/src/stable/server.cpp
@@ -14,6 +14,8 @@
14#include "bu/config.h" 14#include "bu/config.h"
15#include "bu/mutexlocker.h" 15#include "bu/mutexlocker.h"
16 16
17#include "bu/sio.h"
18
17#ifdef PROFILE_BU_SERVER 19#ifdef PROFILE_BU_SERVER
18#define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x ) 20#define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x )
19#define BU_PROFILE_END( x ) Bu::Profiler::getInstance().endEvent( x ) 21#define BU_PROFILE_END( x ) Bu::Profiler::getInstance().endEvent( x )
@@ -39,14 +41,14 @@ Bu::Server::Server( int iIoWorkers, int iClientWorkers ) :
39 41
40 for( int j = 0; j < iIoWorkers; j++ ) 42 for( int j = 0; j < iIoWorkers; j++ )
41 { 43 {
42 IoWorker *pWorker = new IoWorker( *this, qIoEvent, qClientEvent ); 44 IoWorker *pWorker = new IoWorker( *this );
43 lIoWorker.append( pWorker ); 45 lIoWorker.append( pWorker );
44 pWorker->start(); 46 pWorker->start();
45 } 47 }
46 48
47 for( int j = 0; j < iClientWorkers; j++ ) 49 for( int j = 0; j < iClientWorkers; j++ )
48 { 50 {
49 ClientWorker *pWorker = new ClientWorker( *this, qClientEvent ); 51 ClientWorker *pWorker = new ClientWorker( *this );
50 lClientWorker.append( pWorker ); 52 lClientWorker.append( pWorker );
51 pWorker->start(); 53 pWorker->start();
52 } 54 }
@@ -90,7 +92,10 @@ void Bu::Server::scan()
90 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) 92 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
91 { 93 {
92 if( (*i)->hasOutput() ) 94 if( (*i)->hasOutput() )
95 {
96 Bu::println("Socket has output...");
93 FD_SET( i.getKey(), &fdWrite ); 97 FD_SET( i.getKey(), &fdWrite );
98 }
94 } 99 }
95 mClients.unlock(); 100 mClients.unlock();
96 101
@@ -105,6 +110,9 @@ void Bu::Server::scan()
105 ); 110 );
106 } 111 }
107 112
113 evIoCycle.clear();
114 Bu::println("Cycle clear");
115
108 for( int j = 0; j < FD_SETSIZE; j++ ) 116 for( int j = 0; j < FD_SETSIZE; j++ )
109 { 117 {
110 if( FD_ISSET( j, &fdRead ) ) 118 if( FD_ISSET( j, &fdRead ) )
@@ -116,15 +124,22 @@ void Bu::Server::scan()
116 } 124 }
117 else 125 else
118 { 126 {
127 evIoCycle.increment();
128 Bu::println("Increment (read)");
119 qIoEvent.enqueue( new Event( j, Event::Read ) ); 129 qIoEvent.enqueue( new Event( j, Event::Read ) );
120 } 130 }
121 } 131 }
122 if( FD_ISSET( j, &fdWrite ) ) 132 if( FD_ISSET( j, &fdWrite ) )
123 { 133 {
134 evIoCycle.increment();
135 Bu::println("Increment (write)");
124 qIoEvent.enqueue( new Event( j, Event::Write ) ); 136 qIoEvent.enqueue( new Event( j, Event::Write ) );
125 } 137 }
126 } 138 }
127 139
140 Bu::println("Waiting");
141 while( evIoCycle.wait() > 0 ) { }
142
128 Bu::List<int> lDelete; 143 Bu::List<int> lDelete;
129 // Now we just try to write all the pending data on all the sockets. 144 // Now we just try to write all the pending data on all the sockets.
130 // this could be done better eventually, if we care about the socket 145 // this could be done better eventually, if we care about the socket
@@ -155,7 +170,7 @@ void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket )
155 BU_PROFILE_START("addClient"); 170 BU_PROFILE_START("addClient");
156 int iFdSrv; 171 int iFdSrv;
157 int iFdCli; 172 int iFdCli;
158 if( !pSrv->getFd( iFdSrv ) || !pSrv->getFd( iFdCli ) ) 173 if( !pSrv->getFd( iFdSrv ) || !pSocket->getFd( iFdCli ) )
159 { 174 {
160 throw Bu::ExceptionBase("No file descriptor?"); 175 throw Bu::ExceptionBase("No file descriptor?");
161 } 176 }
@@ -324,12 +339,8 @@ Bu::Server::Event::Operation Bu::Server::Event::getOperation() const
324// IoWorker 339// IoWorker
325//// 340////
326 341
327Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv, 342Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv ) :
328 Bu::Server::EventQueue &qIoEvent, 343 rSrv( rSrv )
329 Bu::Server::EventQueue &qClientEvent ) :
330 rSrv( rSrv ),
331 qIoEvent( qIoEvent ),
332 qClientEvent( qClientEvent )
333{ 344{
334} 345}
335 346
@@ -339,17 +350,21 @@ Bu::Server::IoWorker::~IoWorker()
339 350
340void Bu::Server::IoWorker::run() 351void Bu::Server::IoWorker::run()
341{ 352{
342 while( qIoEvent.isRunning() ) 353 setName("busrv-ioWorker");
354 while( rSrv.qIoEvent.isRunning() )
343 { 355 {
344 Event *pEv = qIoEvent.dequeue(); 356 Event *pEv = rSrv.qIoEvent.dequeue( true );
345 if( pEv == NULL ) 357 if( pEv == NULL )
358 {
346 continue; 359 continue;
360 }
347 361
348 Client *pClient; 362 Client *pClient;
349 Socket *pSocket; 363 Socket *pSocket;
350 if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) 364 if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) )
351 { 365 {
352 delete pEv; 366 delete pEv;
367 rSrv.evIoCycle.decrement();
353 continue; 368 continue;
354 } 369 }
355 370
@@ -368,6 +383,9 @@ void Bu::Server::IoWorker::run()
368 } 383 }
369 384
370 delete pEv; 385 delete pEv;
386
387 Bu::println("decrement");
388 rSrv.evIoCycle.decrement();
371 } 389 }
372} 390}
373 391
@@ -377,12 +395,15 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
377 Bu::size iRead; 395 Bu::size iRead;
378 Bu::size iTotal=0; 396 Bu::size iTotal=0;
379 397
398 Bu::println("IoWorker::handleRead: starting");
399
380 BU_PROFILE_START("client.read"); 400 BU_PROFILE_START("client.read");
381 for(;;) 401 for(;;)
382 { 402 {
383 try 403 try
384 { 404 {
385 iRead = pSocket->read( buf, RBS ); 405 iRead = pSocket->read( buf, RBS );
406 Bu::println("IoWorker::handleRead: read() -> %1").arg( iRead );
386 407
387 if( iRead == 0 ) 408 if( iRead == 0 )
388 { 409 {
@@ -398,6 +419,7 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
398 } 419 }
399 catch( Bu::ExceptionBase &e ) 420 catch( Bu::ExceptionBase &e )
400 { 421 {
422 Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() );
401 close( pSocket ); 423 close( pSocket );
402 break; 424 break;
403 } 425 }
@@ -406,24 +428,27 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
406 428
407 if( iTotal == 0 ) 429 if( iTotal == 0 )
408 { 430 {
431 Bu::println("IoWorker::handleRead: read nothing, closing");
409 close( pSocket ); 432 close( pSocket );
410 } 433 }
411 else 434 else
412 { 435 {
413 Bu::Server::fd iFd; 436 Bu::Server::fd iFd;
414 pSocket->getFd( iFd ); 437 pSocket->getFd( iFd );
415 qClientEvent.enqueue( new Event( iFd, Event::Process ) ); 438 rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) );
416 } 439 }
417} 440}
418 441
419void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) 442void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket )
420{ 443{
444 Bu::println("IoWorker::handleWrite() ");
421 char buf[RBS]; 445 char buf[RBS];
422 if( pClient->hasOutput() > 0 ) 446 if( pClient->hasOutput() > 0 )
423 { 447 {
424 int iAmnt = RBS; 448 int iAmnt = RBS;
425 iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); 449 iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt );
426 int iReal = pSocket->write( buf, iAmnt ); 450 int iReal = pSocket->write( buf, iAmnt );
451 Bu::println("IoWorker::handleWrite: Copy out: iAmnt=%1, iReal=%2\n>%3<").arg( iAmnt ).arg( iReal ).arg( Bu::String( buf, iReal ) );
427 pClient->cbBuffer.server().seek( iReal ); 452 pClient->cbBuffer.server().seek( iReal );
428 } 453 }
429} 454}
@@ -439,10 +464,8 @@ void Bu::Server::IoWorker::close( Bu::Socket *pSocket )
439// ClientWorker 464// ClientWorker
440//// 465////
441 466
442Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv, 467Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv ) :
443 Bu::Server::EventQueue &qEvent ) : 468 rSrv( rSrv )
444 rSrv( rSrv ),
445 qEvent( qEvent )
446{ 469{
447} 470}
448 471
@@ -452,9 +475,10 @@ Bu::Server::ClientWorker::~ClientWorker()
452 475
453void Bu::Server::ClientWorker::run() 476void Bu::Server::ClientWorker::run()
454{ 477{
455 while( qEvent.isRunning() ) 478 setName("busrv-cntWorker");
479 while( rSrv.qClientEvent.isRunning() )
456 { 480 {
457 Event *pEv = qEvent.dequeue(); 481 Event *pEv = rSrv.qClientEvent.dequeue( true );
458 if( pEv == NULL ) 482 if( pEv == NULL )
459 continue; 483 continue;
460 484
@@ -464,8 +488,10 @@ void Bu::Server::ClientWorker::run()
464 delete pEv; 488 delete pEv;
465 continue; 489 continue;
466 } 490 }
467 491
492 Bu::println("Processing input...");
468 pClient->processInput(); 493 pClient->processInput();
494 Bu::println("Processing input complete.");
469 delete pEv; 495 delete pEv;
470 } 496 }
471} 497}
diff --git a/src/stable/server.h b/src/stable/server.h
index d66d9d5..e2b7d53 100644
--- a/src/stable/server.h
+++ b/src/stable/server.h
@@ -22,6 +22,7 @@
22#include "bu/hash.h" 22#include "bu/hash.h"
23#include "bu/synchroqueue.h" 23#include "bu/synchroqueue.h"
24#include "bu/thread.h" 24#include "bu/thread.h"
25#include "bu/counterevent.h"
25 26
26#include "bu/config.h" 27#include "bu/config.h"
27 28
@@ -137,8 +138,7 @@ namespace Bu
137 class IoWorker : public Bu::Thread 138 class IoWorker : public Bu::Thread
138 { 139 {
139 public: 140 public:
140 IoWorker( Server &rSrv, EventQueue &qIoEvent, 141 IoWorker( Server &rSrv );
141 EventQueue &qClientEvent );
142 virtual ~IoWorker(); 142 virtual ~IoWorker();
143 143
144 protected: 144 protected:
@@ -151,14 +151,13 @@ namespace Bu
151 151
152 private: 152 private:
153 Server &rSrv; 153 Server &rSrv;
154 EventQueue &qIoEvent;
155 EventQueue &qClientEvent;
156 }; 154 };
155 friend class Bu::Server::IoWorker;
157 156
158 class ClientWorker : public Bu::Thread 157 class ClientWorker : public Bu::Thread
159 { 158 {
160 public: 159 public:
161 ClientWorker( Server &rSrv, EventQueue &qEvent ); 160 ClientWorker( Server &rSrv );
162 virtual ~ClientWorker(); 161 virtual ~ClientWorker();
163 162
164 protected: 163 protected:
@@ -166,8 +165,8 @@ namespace Bu
166 165
167 private: 166 private:
168 Server &rSrv; 167 Server &rSrv;
169 EventQueue &qEvent;
170 }; 168 };
169 friend class Bu::Server::ClientWorker;
171 170
172 int nTimeoutSec; 171 int nTimeoutSec;
173 int nTimeoutUSec; 172 int nTimeoutUSec;
@@ -189,6 +188,7 @@ namespace Bu
189 typedef List<ClientWorker *> ClientWorkerList; 188 typedef List<ClientWorker *> ClientWorkerList;
190 IoWorkerList lIoWorker; 189 IoWorkerList lIoWorker;
191 ClientWorkerList lClientWorker; 190 ClientWorkerList lClientWorker;
191 Bu::CounterEvent evIoCycle;
192 }; 192 };
193} 193}
194 194
diff --git a/src/stable/thread.cpp b/src/stable/thread.cpp
index 5fe034a..99239e9 100644
--- a/src/stable/thread.cpp
+++ b/src/stable/thread.cpp
@@ -77,3 +77,8 @@ void Bu::Thread::yield()
77 sched_yield(); 77 sched_yield();
78} 78}
79 79
80void Bu::Thread::setName( const char *sName )
81{
82 pthread_setname_np( ptHandle, sName );
83}
84
diff --git a/src/stable/thread.h b/src/stable/thread.h
index 5174d1c..0a64390 100644
--- a/src/stable/thread.h
+++ b/src/stable/thread.h
@@ -124,6 +124,12 @@ namespace Bu
124 */ 124 */
125 void yield(); 125 void yield();
126 126
127 /**
128 * Sets the name of the thread. The sName parameter must be a C string
129 * with a max length of 16 bytes including null terminator.
130 */
131 void setName( const char *sName );
132
127 private: 133 private:
128 /** 134 /**
129 * This is the hidden-heart of the thread system. While run is what the 135 * This is the hidden-heart of the thread system. While run is what the
diff --git a/src/unstable/protocolwebsocket.cpp b/src/unstable/protocolwebsocket.cpp
index 15b7288..fa2d882 100644
--- a/src/unstable/protocolwebsocket.cpp
+++ b/src/unstable/protocolwebsocket.cpp
@@ -21,7 +21,8 @@
21 21
22#include <stdlib.h> 22#include <stdlib.h>
23 23
24#define DEBUG( X ) { } (void)0 24//#define DEBUG( X ) { } (void)0
25#define DEBUG( X ) { X; } (void)0
25 26
26Bu::ProtocolWebSocket::ProtocolWebSocket() : 27Bu::ProtocolWebSocket::ProtocolWebSocket() :
27 eStatus( stProtoId ) 28 eStatus( stProtoId )