diff options
| author | Mike Buland <eichlan@xagasoft.com> | 2023-07-29 00:19:13 -0700 | 
|---|---|---|
| committer | Mike Buland <eichlan@xagasoft.com> | 2023-07-29 00:19:13 -0700 | 
| commit | 412173a23c88a49ebaeb982e0c7eeddc5662b8cc (patch) | |
| tree | b00519e715b3ffecd342746e433d8e2dc0eb308e /src/stable | |
| parent | 915005e218b5d00939b548de65ce6354f7acb487 (diff) | |
| download | libbu++-412173a23c88a49ebaeb982e0c7eeddc5662b8cc.tar.gz libbu++-412173a23c88a49ebaeb982e0c7eeddc5662b8cc.tar.bz2 libbu++-412173a23c88a49ebaeb982e0c7eeddc5662b8cc.tar.xz libbu++-412173a23c88a49ebaeb982e0c7eeddc5662b8cc.zip | |
Many bugfixes. It's almost working!
Diffstat (limited to 'src/stable')
| -rw-r--r-- | src/stable/client.h | 1 | ||||
| -rw-r--r-- | src/stable/clientbuf.cpp | 19 | ||||
| -rw-r--r-- | src/stable/clientbuf.h | 3 | ||||
| -rw-r--r-- | src/stable/counterevent.h | 154 | ||||
| -rw-r--r-- | src/stable/server.cpp | 64 | ||||
| -rw-r--r-- | src/stable/server.h | 12 | ||||
| -rw-r--r-- | src/stable/thread.cpp | 5 | ||||
| -rw-r--r-- | src/stable/thread.h | 6 | 
8 files changed, 234 insertions, 30 deletions
| diff --git a/src/stable/client.h b/src/stable/client.h index 4cd8e3f..abe807e 100644 --- a/src/stable/client.h +++ b/src/stable/client.h | |||
| @@ -54,7 +54,6 @@ namespace Bu | |||
| 54 | Bu::size write( uint64_t nData ); | 54 | Bu::size write( uint64_t nData ); | 
| 55 | Bu::size read( void *pData, Bu::size nBytes ); | 55 | Bu::size read( void *pData, Bu::size nBytes ); | 
| 56 | Bu::size peek( void *pData, int nBytes, int nOffset=0 ); | 56 | Bu::size peek( void *pData, int nBytes, int nOffset=0 ); | 
| 57 | void seek( int nBytes ); | ||
| 58 | Bu::size getInputSize(); | 57 | Bu::size getInputSize(); | 
| 59 | Bu::size getOutputSize(); | 58 | Bu::size getOutputSize(); | 
| 60 | 59 | ||
| diff --git a/src/stable/clientbuf.cpp b/src/stable/clientbuf.cpp index 493e577..4e59120 100644 --- a/src/stable/clientbuf.cpp +++ b/src/stable/clientbuf.cpp | |||
| @@ -2,10 +2,12 @@ | |||
| 2 | 2 | ||
| 3 | #include "bu/mutexlocker.h" | 3 | #include "bu/mutexlocker.h" | 
| 4 | 4 | ||
| 5 | #include "bu/sio.h" | ||
| 6 | |||
| 5 | Bu::ClientBuf::ClientBuf() : | 7 | Bu::ClientBuf::ClientBuf() : | 
| 6 | accClientRaw( *this ), | 8 | accClientRaw( *this ), | 
| 7 | accServer( *this ), | 9 | accServer( *this ), | 
| 8 | accClientFiltered( &accClient ), | 10 | //accClientFiltered( &accClientRaw ), | 
| 9 | accClient( *this ) | 11 | accClient( *this ) | 
| 10 | { | 12 | { | 
| 11 | } | 13 | } | 
| @@ -156,7 +158,7 @@ Bu::String Bu::ClientBuf::ClientAccessRaw::getLocation() const | |||
| 156 | ///////// | 158 | ///////// | 
| 157 | // ClientAccess | 159 | // ClientAccess | 
| 158 | /// | 160 | /// | 
| 159 | 161 | #define accClientFiltered accClientRaw | |
| 160 | Bu::ClientBuf::ClientAccess::ClientAccess( Bu::ClientBuf &rBuf ) : | 162 | Bu::ClientBuf::ClientAccess::ClientAccess( Bu::ClientBuf &rBuf ) : | 
| 161 | rBuf( rBuf ) | 163 | rBuf( rBuf ) | 
| 162 | { | 164 | { | 
| @@ -173,16 +175,19 @@ void Bu::ClientBuf::ClientAccess::close() | |||
| 173 | 175 | ||
| 174 | Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) | 176 | Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) | 
| 175 | { | 177 | { | 
| 178 | Bu::println("ClientAccess::read( ptr, %1 )").arg( iBytes ); | ||
| 176 | char *pBuf = (char *)pBufRaw; | 179 | char *pBuf = (char *)pBufRaw; | 
| 177 | Bu::MutexLocker l( mAccess ); | 180 | Bu::MutexLocker l( mAccess ); | 
| 178 | // Read from QueueBuf first | 181 | // Read from QueueBuf first | 
| 179 | Bu::size ps = qbPeek.read( pBuf, iBytes ); | 182 | Bu::size ps = qbPeek.read( pBuf, iBytes ); | 
| 183 | Bu::println("ClientAccess::read: attempted qbPeek, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) ); | ||
| 180 | iBytes -= ps; | 184 | iBytes -= ps; | 
| 181 | pBuf += ps; | 185 | pBuf += ps; | 
| 182 | // Request space left? Try the client | 186 | // Request space left? Try the client | 
| 183 | if( iBytes > 0 ) | 187 | if( iBytes > 0 ) | 
| 184 | { | 188 | { | 
| 185 | ps += rBuf.accClientFiltered.read( pBuf, iBytes ); | 189 | ps += rBuf.accClientFiltered.read( pBuf, iBytes ); | 
| 190 | Bu::println("ClientAccess::read: attempted completion from socket buffer, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) ); | ||
| 186 | } | 191 | } | 
| 187 | return ps; | 192 | return ps; | 
| 188 | } | 193 | } | 
| @@ -190,22 +195,26 @@ Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) | |||
| 190 | Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes, | 195 | Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes, | 
| 191 | int iOffset ) | 196 | int iOffset ) | 
| 192 | { | 197 | { | 
| 198 | Bu::println("ClientAccess::peek( ptr, %1, %2 )").arg( iBytes ).arg( iOffset ); | ||
| 193 | Bu::MutexLocker l( mAccess ); | 199 | Bu::MutexLocker l( mAccess ); | 
| 194 | // Do we have enough data in the peek buffer to handle this? | 200 | // Do we have enough data in the peek buffer to handle this? | 
| 195 | if( qbPeek.getSize() < iBytes+iOffset ) | 201 | if( qbPeek.getSize() < iBytes+iOffset ) | 
| 196 | { | 202 | { | 
| 203 | Bu::println("ClientAccess::peek: Insufficient buffered data (%1)").arg( qbPeek.getSize() ); | ||
| 197 | // Nope, make an attempt to fill it in. | 204 | // Nope, make an attempt to fill it in. | 
| 198 | int nDiff = iBytes-qbPeek.getSize(); | 205 | int nDiff = iBytes-qbPeek.getSize(); | 
| 199 | // We have to make our own buffer, since iBytes+nOffeset could be bigger | 206 | // We have to make our own buffer, since iBytes+nOffeset could be bigger | 
| 200 | // than pData. | 207 | // than pData. | 
| 201 | char *pTmp = new char[nDiff]; | 208 | char *pTmp = new char[nDiff]; | 
| 202 | Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff ); | 209 | Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff ); | 
| 210 | Bu::println("ClientAccess::peek: Tried to fill buffer, read %1 of needed %2\n>%3<").arg( ps ).arg( nDiff ).arg( Bu::String(pTmp, ps) ); | ||
| 203 | if( ps > 0 ) | 211 | if( ps > 0 ) | 
| 204 | { | 212 | { | 
| 205 | // Add the data read to the peek buffer. | 213 | // Add the data read to the peek buffer. | 
| 206 | qbPeek.write( pTmp, ps ); | 214 | qbPeek.write( pTmp, ps ); | 
| 207 | } | 215 | } | 
| 208 | delete[] pTmp; | 216 | delete[] pTmp; | 
| 217 | Bu::println("ClientAccess::peek: buffer left with %1").arg( qbPeek.getSize() ); | ||
| 209 | } | 218 | } | 
| 210 | 219 | ||
| 211 | return qbPeek.peek( pData, iBytes, iOffset ); | 220 | return qbPeek.peek( pData, iBytes, iOffset ); | 
| @@ -225,13 +234,15 @@ Bu::size Bu::ClientBuf::ClientAccess::tell() | |||
| 225 | 234 | ||
| 226 | void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) | 235 | void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) | 
| 227 | { | 236 | { | 
| 237 | Bu::println("ClientAccess::seek( %1 )").arg( offset ); | ||
| 228 | Bu::MutexLocker l( mAccess ); | 238 | Bu::MutexLocker l( mAccess ); | 
| 229 | // For this type of stream seek is basically a destructive skip. It's like | 239 | // For this type of stream seek is basically a destructive skip. It's like | 
| 230 | // reading the data but with no output buffer. Let's remove data from the | 240 | // reading the data but with no output buffer. Let's remove data from the | 
| 231 | // peek buffer first. | 241 | // peek buffer first. | 
| 232 | if( qbPeek.getSize() > 0 ) | 242 | if( qbPeek.getSize() > 0 ) | 
| 233 | { | 243 | { | 
| 234 | Bu::size amount = Bu::buMax( qbPeek.getSize(), offset ); | 244 | Bu::size amount = Bu::buMin( qbPeek.getSize(), offset ); | 
| 245 | Bu::println("ClientAccess::seek: buffered: %1, amount: %2").arg( qbPeek.getSize() ).arg( amount ); | ||
| 235 | qbPeek.seek( amount ); | 246 | qbPeek.seek( amount ); | 
| 236 | offset -= amount; | 247 | offset -= amount; | 
| 237 | } | 248 | } | 
| @@ -239,6 +250,7 @@ void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) | |||
| 239 | // If there's offset left, then apply it to the underlying stream | 250 | // If there's offset left, then apply it to the underlying stream | 
| 240 | if( offset > 0 ) | 251 | if( offset > 0 ) | 
| 241 | { | 252 | { | 
| 253 | Bu::println("ClientAccess::seek: seeking remaining %1 in socket buffer").arg( offset ); | ||
| 242 | rBuf.accClientFiltered.seek( offset ); | 254 | rBuf.accClientFiltered.seek( offset ); | 
| 243 | } | 255 | } | 
| 244 | } | 256 | } | 
| @@ -329,6 +341,7 @@ Bu::String Bu::ClientBuf::ClientAccess::getLocation() const | |||
| 329 | { | 341 | { | 
| 330 | return "ClientBuf"; | 342 | return "ClientBuf"; | 
| 331 | } | 343 | } | 
| 344 | #undef accClientFiltered | ||
| 332 | 345 | ||
| 333 | ///////// | 346 | ///////// | 
| 334 | // ServerAccess | 347 | // ServerAccess | 
| diff --git a/src/stable/clientbuf.h b/src/stable/clientbuf.h index 7781b6a..50b6617 100644 --- a/src/stable/clientbuf.h +++ b/src/stable/clientbuf.h | |||
| @@ -164,12 +164,13 @@ namespace Bu | |||
| 164 | private: | 164 | private: | 
| 165 | ClientAccessRaw accClientRaw; | 165 | ClientAccessRaw accClientRaw; | 
| 166 | ServerAccess accServer; | 166 | ServerAccess accServer; | 
| 167 | Bu::StreamStack accClientFiltered; | 167 | //Bu::StreamStack accClientFiltered; | 
| 168 | ClientAccess accClient; | 168 | ClientAccess accClient; | 
| 169 | Bu::QueueBuf qbOutput; | 169 | Bu::QueueBuf qbOutput; | 
| 170 | Bu::QueueBuf qbInput; | 170 | Bu::QueueBuf qbInput; | 
| 171 | Bu::Mutex mOutput; | 171 | Bu::Mutex mOutput; | 
| 172 | Bu::Mutex mInput; | 172 | Bu::Mutex mInput; | 
| 173 | |||
| 173 | friend class Bu::ClientBuf::ClientAccess; | 174 | friend class Bu::ClientBuf::ClientAccess; | 
| 174 | friend class Bu::ClientBuf::ClientAccessRaw; | 175 | friend class Bu::ClientBuf::ClientAccessRaw; | 
| 175 | friend class Bu::ClientBuf::ServerAccess; | 176 | friend class Bu::ClientBuf::ServerAccess; | 
| diff --git a/src/stable/counterevent.h b/src/stable/counterevent.h new file mode 100644 index 0000000..e7ad7a1 --- /dev/null +++ b/src/stable/counterevent.h | |||
| @@ -0,0 +1,154 @@ | |||
| 1 | /* | ||
| 2 | * Copyright (C) 2007-2019 Xagasoft, All rights reserved. | ||
| 3 | * | ||
| 4 | * This file is part of the libbu++ library and is released under the | ||
| 5 | * terms of the license contained in the file LICENSE. | ||
| 6 | */ | ||
| 7 | |||
| 8 | #ifndef BU_COUNTER_EVENT_H | ||
| 9 | #define BU_COUNTER_EVENT_H | ||
| 10 | |||
| 11 | #include <pthread.h> | ||
| 12 | |||
| 13 | #include "bu/mutex.h" | ||
| 14 | #include "bu/condition.h" | ||
| 15 | |||
| 16 | namespace Bu | ||
| 17 | { | ||
| 18 | /** | ||
| 19 | * Represents a true/false state that controls thread synchronization. This | ||
| 20 | * is primarilly intended to control the synchronization state of | ||
| 21 | * multithreaded services. For example, telling all threads when to exit. | ||
| 22 | * | ||
| 23 | * An Event is either set or unset. If the Event is unset then it can be | ||
| 24 | * waited on for something to happen. As soon as the Event is set all | ||
| 25 | * waiting threads are released and new requests to wait are ignored until | ||
| 26 | * the Event is cleared again. | ||
| 27 | * | ||
| 28 | * Threads can also be woken up without setting the Event, which may be | ||
| 29 | * handy in certain circumstances. | ||
| 30 | *@ingroup Threading | ||
| 31 | */ | ||
| 32 | class CounterEvent | ||
| 33 | { | ||
| 34 | public: | ||
| 35 | CounterEvent() : | ||
| 36 | iCount( 0 ) | ||
| 37 | { | ||
| 38 | } | ||
| 39 | |||
| 40 | ~CounterEvent() | ||
| 41 | { | ||
| 42 | } | ||
| 43 | |||
| 44 | /** | ||
| 45 | * Wait indefinitely for the Event to trigger. If the event is already | ||
| 46 | * set, then return immediately. It's important to note that this may | ||
| 47 | * return at any time, not only when the Event is set, so examining the | ||
| 48 | * return value is important. | ||
| 49 | *@returns the set status of the Event. | ||
| 50 | */ | ||
| 51 | int wait() | ||
| 52 | { | ||
| 53 | cBlock.lock(); | ||
| 54 | if( iCount == 0 ) | ||
| 55 | { | ||
| 56 | cBlock.unlock(); | ||
| 57 | return iCount; | ||
| 58 | } | ||
| 59 | cBlock.wait(); | ||
| 60 | int iRet = iCount; | ||
| 61 | cBlock.unlock(); | ||
| 62 | return iRet; | ||
| 63 | } | ||
| 64 | |||
| 65 | /** | ||
| 66 | * Wait for up to nSec seconds and nUSec nanoseconds for the event to | ||
| 67 | * trigger. If the Event is already set then return immediately. | ||
| 68 | *@returns the set status of the Event. | ||
| 69 | */ | ||
| 70 | int wait( int nSec, int nUSec ) | ||
| 71 | { | ||
| 72 | cBlock.lock(); | ||
| 73 | if( iCount == 0 ) | ||
| 74 | { | ||
| 75 | cBlock.unlock(); | ||
| 76 | return iCount; | ||
| 77 | } | ||
| 78 | cBlock.wait( nSec, nUSec ); | ||
| 79 | bool iRet = iCount; | ||
| 80 | cBlock.unlock(); | ||
| 81 | return iRet; | ||
| 82 | } | ||
| 83 | |||
| 84 | /** | ||
| 85 | * Allow one of the waiting threads to unlock without updating the set | ||
| 86 | * state of the Event. | ||
| 87 | */ | ||
| 88 | void unblockOne() | ||
| 89 | { | ||
| 90 | cBlock.lock(); | ||
| 91 | cBlock.signal(); | ||
| 92 | cBlock.unlock(); | ||
| 93 | } | ||
| 94 | |||
| 95 | /** | ||
| 96 | * Allow all waiting threads to unlock and proceed without updating the | ||
| 97 | * set state of the Event. | ||
| 98 | */ | ||
| 99 | void unblockAll() | ||
| 100 | { | ||
| 101 | cBlock.lock(); | ||
| 102 | cBlock.broadcast(); | ||
| 103 | cBlock.unlock(); | ||
| 104 | } | ||
| 105 | |||
| 106 | /** | ||
| 107 | * Find out if the Event is in the set state or not. | ||
| 108 | *@returns True if set, false otherwise. | ||
| 109 | */ | ||
| 110 | bool isZero() | ||
| 111 | { | ||
| 112 | cBlock.lock(); | ||
| 113 | bool bRet = (iCount == 0); | ||
| 114 | cBlock.unlock(); | ||
| 115 | return bRet; | ||
| 116 | } | ||
| 117 | |||
| 118 | /** | ||
| 119 | * Sets the Event's state to true and triggers all waiting threads. | ||
| 120 | */ | ||
| 121 | void decrement() | ||
| 122 | { | ||
| 123 | cBlock.lock(); | ||
| 124 | iCount--; if( iCount < 0 ) iCount = 0; | ||
| 125 | if( iCount == 0 ) | ||
| 126 | cBlock.broadcast(); | ||
| 127 | cBlock.unlock(); | ||
| 128 | } | ||
| 129 | |||
| 130 | void increment() | ||
| 131 | { | ||
| 132 | cBlock.lock(); | ||
| 133 | iCount++; | ||
| 134 | cBlock.unlock(); | ||
| 135 | } | ||
| 136 | |||
| 137 | /** | ||
| 138 | * Sets the Event's state to false. This does NOT trigger any waiting | ||
| 139 | * threads. | ||
| 140 | */ | ||
| 141 | void clear() | ||
| 142 | { | ||
| 143 | cBlock.lock(); | ||
| 144 | iCount = 0; | ||
| 145 | cBlock.unlock(); | ||
| 146 | } | ||
| 147 | |||
| 148 | private: | ||
| 149 | Condition cBlock; /**< The condition for blocking dequeues. */ | ||
| 150 | int iCount; | ||
| 151 | }; | ||
| 152 | } | ||
| 153 | |||
| 154 | #endif | ||
| diff --git a/src/stable/server.cpp b/src/stable/server.cpp index 592230d..3f03a63 100644 --- a/src/stable/server.cpp +++ b/src/stable/server.cpp | |||
| @@ -14,6 +14,8 @@ | |||
| 14 | #include "bu/config.h" | 14 | #include "bu/config.h" | 
| 15 | #include "bu/mutexlocker.h" | 15 | #include "bu/mutexlocker.h" | 
| 16 | 16 | ||
| 17 | #include "bu/sio.h" | ||
| 18 | |||
| 17 | #ifdef PROFILE_BU_SERVER | 19 | #ifdef PROFILE_BU_SERVER | 
| 18 | #define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x ) | 20 | #define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x ) | 
| 19 | #define BU_PROFILE_END( x ) Bu::Profiler::getInstance().endEvent( x ) | 21 | #define BU_PROFILE_END( x ) Bu::Profiler::getInstance().endEvent( x ) | 
| @@ -39,14 +41,14 @@ Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : | |||
| 39 | 41 | ||
| 40 | for( int j = 0; j < iIoWorkers; j++ ) | 42 | for( int j = 0; j < iIoWorkers; j++ ) | 
| 41 | { | 43 | { | 
| 42 | IoWorker *pWorker = new IoWorker( *this, qIoEvent, qClientEvent ); | 44 | IoWorker *pWorker = new IoWorker( *this ); | 
| 43 | lIoWorker.append( pWorker ); | 45 | lIoWorker.append( pWorker ); | 
| 44 | pWorker->start(); | 46 | pWorker->start(); | 
| 45 | } | 47 | } | 
| 46 | 48 | ||
| 47 | for( int j = 0; j < iClientWorkers; j++ ) | 49 | for( int j = 0; j < iClientWorkers; j++ ) | 
| 48 | { | 50 | { | 
| 49 | ClientWorker *pWorker = new ClientWorker( *this, qClientEvent ); | 51 | ClientWorker *pWorker = new ClientWorker( *this ); | 
| 50 | lClientWorker.append( pWorker ); | 52 | lClientWorker.append( pWorker ); | 
| 51 | pWorker->start(); | 53 | pWorker->start(); | 
| 52 | } | 54 | } | 
| @@ -90,7 +92,10 @@ void Bu::Server::scan() | |||
| 90 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 92 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 
| 91 | { | 93 | { | 
| 92 | if( (*i)->hasOutput() ) | 94 | if( (*i)->hasOutput() ) | 
| 95 | { | ||
| 96 | Bu::println("Socket has output..."); | ||
| 93 | FD_SET( i.getKey(), &fdWrite ); | 97 | FD_SET( i.getKey(), &fdWrite ); | 
| 98 | } | ||
| 94 | } | 99 | } | 
| 95 | mClients.unlock(); | 100 | mClients.unlock(); | 
| 96 | 101 | ||
| @@ -105,6 +110,9 @@ void Bu::Server::scan() | |||
| 105 | ); | 110 | ); | 
| 106 | } | 111 | } | 
| 107 | 112 | ||
| 113 | evIoCycle.clear(); | ||
| 114 | Bu::println("Cycle clear"); | ||
| 115 | |||
| 108 | for( int j = 0; j < FD_SETSIZE; j++ ) | 116 | for( int j = 0; j < FD_SETSIZE; j++ ) | 
| 109 | { | 117 | { | 
| 110 | if( FD_ISSET( j, &fdRead ) ) | 118 | if( FD_ISSET( j, &fdRead ) ) | 
| @@ -116,15 +124,22 @@ void Bu::Server::scan() | |||
| 116 | } | 124 | } | 
| 117 | else | 125 | else | 
| 118 | { | 126 | { | 
| 127 | evIoCycle.increment(); | ||
| 128 | Bu::println("Increment (read)"); | ||
| 119 | qIoEvent.enqueue( new Event( j, Event::Read ) ); | 129 | qIoEvent.enqueue( new Event( j, Event::Read ) ); | 
| 120 | } | 130 | } | 
| 121 | } | 131 | } | 
| 122 | if( FD_ISSET( j, &fdWrite ) ) | 132 | if( FD_ISSET( j, &fdWrite ) ) | 
| 123 | { | 133 | { | 
| 134 | evIoCycle.increment(); | ||
| 135 | Bu::println("Increment (write)"); | ||
| 124 | qIoEvent.enqueue( new Event( j, Event::Write ) ); | 136 | qIoEvent.enqueue( new Event( j, Event::Write ) ); | 
| 125 | } | 137 | } | 
| 126 | } | 138 | } | 
| 127 | 139 | ||
| 140 | Bu::println("Waiting"); | ||
| 141 | while( evIoCycle.wait() > 0 ) { } | ||
| 142 | |||
| 128 | Bu::List<int> lDelete; | 143 | Bu::List<int> lDelete; | 
| 129 | // Now we just try to write all the pending data on all the sockets. | 144 | // Now we just try to write all the pending data on all the sockets. | 
| 130 | // this could be done better eventually, if we care about the socket | 145 | // this could be done better eventually, if we care about the socket | 
| @@ -155,7 +170,7 @@ void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket ) | |||
| 155 | BU_PROFILE_START("addClient"); | 170 | BU_PROFILE_START("addClient"); | 
| 156 | int iFdSrv; | 171 | int iFdSrv; | 
| 157 | int iFdCli; | 172 | int iFdCli; | 
| 158 | if( !pSrv->getFd( iFdSrv ) || !pSrv->getFd( iFdCli ) ) | 173 | if( !pSrv->getFd( iFdSrv ) || !pSocket->getFd( iFdCli ) ) | 
| 159 | { | 174 | { | 
| 160 | throw Bu::ExceptionBase("No file descriptor?"); | 175 | throw Bu::ExceptionBase("No file descriptor?"); | 
| 161 | } | 176 | } | 
| @@ -324,12 +339,8 @@ Bu::Server::Event::Operation Bu::Server::Event::getOperation() const | |||
| 324 | // IoWorker | 339 | // IoWorker | 
| 325 | //// | 340 | //// | 
| 326 | 341 | ||
| 327 | Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv, | 342 | Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv ) : | 
| 328 | Bu::Server::EventQueue &qIoEvent, | 343 | rSrv( rSrv ) | 
| 329 | Bu::Server::EventQueue &qClientEvent ) : | ||
| 330 | rSrv( rSrv ), | ||
| 331 | qIoEvent( qIoEvent ), | ||
| 332 | qClientEvent( qClientEvent ) | ||
| 333 | { | 344 | { | 
| 334 | } | 345 | } | 
| 335 | 346 | ||
| @@ -339,17 +350,21 @@ Bu::Server::IoWorker::~IoWorker() | |||
| 339 | 350 | ||
| 340 | void Bu::Server::IoWorker::run() | 351 | void Bu::Server::IoWorker::run() | 
| 341 | { | 352 | { | 
| 342 | while( qIoEvent.isRunning() ) | 353 | setName("busrv-ioWorker"); | 
| 354 | while( rSrv.qIoEvent.isRunning() ) | ||
| 343 | { | 355 | { | 
| 344 | Event *pEv = qIoEvent.dequeue(); | 356 | Event *pEv = rSrv.qIoEvent.dequeue( true ); | 
| 345 | if( pEv == NULL ) | 357 | if( pEv == NULL ) | 
| 358 | { | ||
| 346 | continue; | 359 | continue; | 
| 360 | } | ||
| 347 | 361 | ||
| 348 | Client *pClient; | 362 | Client *pClient; | 
| 349 | Socket *pSocket; | 363 | Socket *pSocket; | 
| 350 | if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) | 364 | if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) | 
| 351 | { | 365 | { | 
| 352 | delete pEv; | 366 | delete pEv; | 
| 367 | rSrv.evIoCycle.decrement(); | ||
| 353 | continue; | 368 | continue; | 
| 354 | } | 369 | } | 
| 355 | 370 | ||
| @@ -368,6 +383,9 @@ void Bu::Server::IoWorker::run() | |||
| 368 | } | 383 | } | 
| 369 | 384 | ||
| 370 | delete pEv; | 385 | delete pEv; | 
| 386 | |||
| 387 | Bu::println("decrement"); | ||
| 388 | rSrv.evIoCycle.decrement(); | ||
| 371 | } | 389 | } | 
| 372 | } | 390 | } | 
| 373 | 391 | ||
| @@ -377,12 +395,15 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
| 377 | Bu::size iRead; | 395 | Bu::size iRead; | 
| 378 | Bu::size iTotal=0; | 396 | Bu::size iTotal=0; | 
| 379 | 397 | ||
| 398 | Bu::println("IoWorker::handleRead: starting"); | ||
| 399 | |||
| 380 | BU_PROFILE_START("client.read"); | 400 | BU_PROFILE_START("client.read"); | 
| 381 | for(;;) | 401 | for(;;) | 
| 382 | { | 402 | { | 
| 383 | try | 403 | try | 
| 384 | { | 404 | { | 
| 385 | iRead = pSocket->read( buf, RBS ); | 405 | iRead = pSocket->read( buf, RBS ); | 
| 406 | Bu::println("IoWorker::handleRead: read() -> %1").arg( iRead ); | ||
| 386 | 407 | ||
| 387 | if( iRead == 0 ) | 408 | if( iRead == 0 ) | 
| 388 | { | 409 | { | 
| @@ -398,6 +419,7 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
| 398 | } | 419 | } | 
| 399 | catch( Bu::ExceptionBase &e ) | 420 | catch( Bu::ExceptionBase &e ) | 
| 400 | { | 421 | { | 
| 422 | Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() ); | ||
| 401 | close( pSocket ); | 423 | close( pSocket ); | 
| 402 | break; | 424 | break; | 
| 403 | } | 425 | } | 
| @@ -406,24 +428,27 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
| 406 | 428 | ||
| 407 | if( iTotal == 0 ) | 429 | if( iTotal == 0 ) | 
| 408 | { | 430 | { | 
| 431 | Bu::println("IoWorker::handleRead: read nothing, closing"); | ||
| 409 | close( pSocket ); | 432 | close( pSocket ); | 
| 410 | } | 433 | } | 
| 411 | else | 434 | else | 
| 412 | { | 435 | { | 
| 413 | Bu::Server::fd iFd; | 436 | Bu::Server::fd iFd; | 
| 414 | pSocket->getFd( iFd ); | 437 | pSocket->getFd( iFd ); | 
| 415 | qClientEvent.enqueue( new Event( iFd, Event::Process ) ); | 438 | rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) ); | 
| 416 | } | 439 | } | 
| 417 | } | 440 | } | 
| 418 | 441 | ||
| 419 | void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) | 442 | void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) | 
| 420 | { | 443 | { | 
| 444 | Bu::println("IoWorker::handleWrite() "); | ||
| 421 | char buf[RBS]; | 445 | char buf[RBS]; | 
| 422 | if( pClient->hasOutput() > 0 ) | 446 | if( pClient->hasOutput() > 0 ) | 
| 423 | { | 447 | { | 
| 424 | int iAmnt = RBS; | 448 | int iAmnt = RBS; | 
| 425 | iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); | 449 | iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); | 
| 426 | int iReal = pSocket->write( buf, iAmnt ); | 450 | int iReal = pSocket->write( buf, iAmnt ); | 
| 451 | Bu::println("IoWorker::handleWrite: Copy out: iAmnt=%1, iReal=%2\n>%3<").arg( iAmnt ).arg( iReal ).arg( Bu::String( buf, iReal ) ); | ||
| 427 | pClient->cbBuffer.server().seek( iReal ); | 452 | pClient->cbBuffer.server().seek( iReal ); | 
| 428 | } | 453 | } | 
| 429 | } | 454 | } | 
| @@ -439,10 +464,8 @@ void Bu::Server::IoWorker::close( Bu::Socket *pSocket ) | |||
| 439 | // ClientWorker | 464 | // ClientWorker | 
| 440 | //// | 465 | //// | 
| 441 | 466 | ||
| 442 | Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv, | 467 | Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv ) : | 
| 443 | Bu::Server::EventQueue &qEvent ) : | 468 | rSrv( rSrv ) | 
| 444 | rSrv( rSrv ), | ||
| 445 | qEvent( qEvent ) | ||
| 446 | { | 469 | { | 
| 447 | } | 470 | } | 
| 448 | 471 | ||
| @@ -452,9 +475,10 @@ Bu::Server::ClientWorker::~ClientWorker() | |||
| 452 | 475 | ||
| 453 | void Bu::Server::ClientWorker::run() | 476 | void Bu::Server::ClientWorker::run() | 
| 454 | { | 477 | { | 
| 455 | while( qEvent.isRunning() ) | 478 | setName("busrv-cntWorker"); | 
| 479 | while( rSrv.qClientEvent.isRunning() ) | ||
| 456 | { | 480 | { | 
| 457 | Event *pEv = qEvent.dequeue(); | 481 | Event *pEv = rSrv.qClientEvent.dequeue( true ); | 
| 458 | if( pEv == NULL ) | 482 | if( pEv == NULL ) | 
| 459 | continue; | 483 | continue; | 
| 460 | 484 | ||
| @@ -464,8 +488,10 @@ void Bu::Server::ClientWorker::run() | |||
| 464 | delete pEv; | 488 | delete pEv; | 
| 465 | continue; | 489 | continue; | 
| 466 | } | 490 | } | 
| 467 | 491 | ||
| 492 | Bu::println("Processing input..."); | ||
| 468 | pClient->processInput(); | 493 | pClient->processInput(); | 
| 494 | Bu::println("Processing input complete."); | ||
| 469 | delete pEv; | 495 | delete pEv; | 
| 470 | } | 496 | } | 
| 471 | } | 497 | } | 
| diff --git a/src/stable/server.h b/src/stable/server.h index d66d9d5..e2b7d53 100644 --- a/src/stable/server.h +++ b/src/stable/server.h | |||
| @@ -22,6 +22,7 @@ | |||
| 22 | #include "bu/hash.h" | 22 | #include "bu/hash.h" | 
| 23 | #include "bu/synchroqueue.h" | 23 | #include "bu/synchroqueue.h" | 
| 24 | #include "bu/thread.h" | 24 | #include "bu/thread.h" | 
| 25 | #include "bu/counterevent.h" | ||
| 25 | 26 | ||
| 26 | #include "bu/config.h" | 27 | #include "bu/config.h" | 
| 27 | 28 | ||
| @@ -137,8 +138,7 @@ namespace Bu | |||
| 137 | class IoWorker : public Bu::Thread | 138 | class IoWorker : public Bu::Thread | 
| 138 | { | 139 | { | 
| 139 | public: | 140 | public: | 
| 140 | IoWorker( Server &rSrv, EventQueue &qIoEvent, | 141 | IoWorker( Server &rSrv ); | 
| 141 | EventQueue &qClientEvent ); | ||
| 142 | virtual ~IoWorker(); | 142 | virtual ~IoWorker(); | 
| 143 | 143 | ||
| 144 | protected: | 144 | protected: | 
| @@ -151,14 +151,13 @@ namespace Bu | |||
| 151 | 151 | ||
| 152 | private: | 152 | private: | 
| 153 | Server &rSrv; | 153 | Server &rSrv; | 
| 154 | EventQueue &qIoEvent; | ||
| 155 | EventQueue &qClientEvent; | ||
| 156 | }; | 154 | }; | 
| 155 | friend class Bu::Server::IoWorker; | ||
| 157 | 156 | ||
| 158 | class ClientWorker : public Bu::Thread | 157 | class ClientWorker : public Bu::Thread | 
| 159 | { | 158 | { | 
| 160 | public: | 159 | public: | 
| 161 | ClientWorker( Server &rSrv, EventQueue &qEvent ); | 160 | ClientWorker( Server &rSrv ); | 
| 162 | virtual ~ClientWorker(); | 161 | virtual ~ClientWorker(); | 
| 163 | 162 | ||
| 164 | protected: | 163 | protected: | 
| @@ -166,8 +165,8 @@ namespace Bu | |||
| 166 | 165 | ||
| 167 | private: | 166 | private: | 
| 168 | Server &rSrv; | 167 | Server &rSrv; | 
| 169 | EventQueue &qEvent; | ||
| 170 | }; | 168 | }; | 
| 169 | friend class Bu::Server::ClientWorker; | ||
| 171 | 170 | ||
| 172 | int nTimeoutSec; | 171 | int nTimeoutSec; | 
| 173 | int nTimeoutUSec; | 172 | int nTimeoutUSec; | 
| @@ -189,6 +188,7 @@ namespace Bu | |||
| 189 | typedef List<ClientWorker *> ClientWorkerList; | 188 | typedef List<ClientWorker *> ClientWorkerList; | 
| 190 | IoWorkerList lIoWorker; | 189 | IoWorkerList lIoWorker; | 
| 191 | ClientWorkerList lClientWorker; | 190 | ClientWorkerList lClientWorker; | 
| 191 | Bu::CounterEvent evIoCycle; | ||
| 192 | }; | 192 | }; | 
| 193 | } | 193 | } | 
| 194 | 194 | ||
| diff --git a/src/stable/thread.cpp b/src/stable/thread.cpp index 5fe034a..99239e9 100644 --- a/src/stable/thread.cpp +++ b/src/stable/thread.cpp | |||
| @@ -77,3 +77,8 @@ void Bu::Thread::yield() | |||
| 77 | sched_yield(); | 77 | sched_yield(); | 
| 78 | } | 78 | } | 
| 79 | 79 | ||
| 80 | void Bu::Thread::setName( const char *sName ) | ||
| 81 | { | ||
| 82 | pthread_setname_np( ptHandle, sName ); | ||
| 83 | } | ||
| 84 | |||
| diff --git a/src/stable/thread.h b/src/stable/thread.h index 5174d1c..0a64390 100644 --- a/src/stable/thread.h +++ b/src/stable/thread.h | |||
| @@ -124,6 +124,12 @@ namespace Bu | |||
| 124 | */ | 124 | */ | 
| 125 | void yield(); | 125 | void yield(); | 
| 126 | 126 | ||
| 127 | /** | ||
| 128 | * Sets the name of the thread. The sName parameter must be a C string | ||
| 129 | * with a max length of 16 bytes including null terminator. | ||
| 130 | */ | ||
| 131 | void setName( const char *sName ); | ||
| 132 | |||
| 127 | private: | 133 | private: | 
| 128 | /** | 134 | /** | 
| 129 | * This is the hidden-heart of the thread system. While run is what the | 135 | * This is the hidden-heart of the thread system. While run is what the | 
