diff options
author | Mike Buland <eichlan@xagasoft.com> | 2023-07-29 00:19:13 -0700 |
---|---|---|
committer | Mike Buland <eichlan@xagasoft.com> | 2023-07-29 00:19:13 -0700 |
commit | 412173a23c88a49ebaeb982e0c7eeddc5662b8cc (patch) | |
tree | b00519e715b3ffecd342746e433d8e2dc0eb308e | |
parent | 915005e218b5d00939b548de65ce6354f7acb487 (diff) | |
download | libbu++-412173a23c88a49ebaeb982e0c7eeddc5662b8cc.tar.gz libbu++-412173a23c88a49ebaeb982e0c7eeddc5662b8cc.tar.bz2 libbu++-412173a23c88a49ebaeb982e0c7eeddc5662b8cc.tar.xz libbu++-412173a23c88a49ebaeb982e0c7eeddc5662b8cc.zip |
Many bugfixes. It's almost working!
-rw-r--r-- | src/stable/client.h | 1 | ||||
-rw-r--r-- | src/stable/clientbuf.cpp | 19 | ||||
-rw-r--r-- | src/stable/clientbuf.h | 3 | ||||
-rw-r--r-- | src/stable/counterevent.h | 154 | ||||
-rw-r--r-- | src/stable/server.cpp | 64 | ||||
-rw-r--r-- | src/stable/server.h | 12 | ||||
-rw-r--r-- | src/stable/thread.cpp | 5 | ||||
-rw-r--r-- | src/stable/thread.h | 6 | ||||
-rw-r--r-- | src/unstable/protocolwebsocket.cpp | 3 |
9 files changed, 236 insertions, 31 deletions
diff --git a/src/stable/client.h b/src/stable/client.h index 4cd8e3f..abe807e 100644 --- a/src/stable/client.h +++ b/src/stable/client.h | |||
@@ -54,7 +54,6 @@ namespace Bu | |||
54 | Bu::size write( uint64_t nData ); | 54 | Bu::size write( uint64_t nData ); |
55 | Bu::size read( void *pData, Bu::size nBytes ); | 55 | Bu::size read( void *pData, Bu::size nBytes ); |
56 | Bu::size peek( void *pData, int nBytes, int nOffset=0 ); | 56 | Bu::size peek( void *pData, int nBytes, int nOffset=0 ); |
57 | void seek( int nBytes ); | ||
58 | Bu::size getInputSize(); | 57 | Bu::size getInputSize(); |
59 | Bu::size getOutputSize(); | 58 | Bu::size getOutputSize(); |
60 | 59 | ||
diff --git a/src/stable/clientbuf.cpp b/src/stable/clientbuf.cpp index 493e577..4e59120 100644 --- a/src/stable/clientbuf.cpp +++ b/src/stable/clientbuf.cpp | |||
@@ -2,10 +2,12 @@ | |||
2 | 2 | ||
3 | #include "bu/mutexlocker.h" | 3 | #include "bu/mutexlocker.h" |
4 | 4 | ||
5 | #include "bu/sio.h" | ||
6 | |||
5 | Bu::ClientBuf::ClientBuf() : | 7 | Bu::ClientBuf::ClientBuf() : |
6 | accClientRaw( *this ), | 8 | accClientRaw( *this ), |
7 | accServer( *this ), | 9 | accServer( *this ), |
8 | accClientFiltered( &accClient ), | 10 | //accClientFiltered( &accClientRaw ), |
9 | accClient( *this ) | 11 | accClient( *this ) |
10 | { | 12 | { |
11 | } | 13 | } |
@@ -156,7 +158,7 @@ Bu::String Bu::ClientBuf::ClientAccessRaw::getLocation() const | |||
156 | ///////// | 158 | ///////// |
157 | // ClientAccess | 159 | // ClientAccess |
158 | /// | 160 | /// |
159 | 161 | #define accClientFiltered accClientRaw | |
160 | Bu::ClientBuf::ClientAccess::ClientAccess( Bu::ClientBuf &rBuf ) : | 162 | Bu::ClientBuf::ClientAccess::ClientAccess( Bu::ClientBuf &rBuf ) : |
161 | rBuf( rBuf ) | 163 | rBuf( rBuf ) |
162 | { | 164 | { |
@@ -173,16 +175,19 @@ void Bu::ClientBuf::ClientAccess::close() | |||
173 | 175 | ||
174 | Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) | 176 | Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) |
175 | { | 177 | { |
178 | Bu::println("ClientAccess::read( ptr, %1 )").arg( iBytes ); | ||
176 | char *pBuf = (char *)pBufRaw; | 179 | char *pBuf = (char *)pBufRaw; |
177 | Bu::MutexLocker l( mAccess ); | 180 | Bu::MutexLocker l( mAccess ); |
178 | // Read from QueueBuf first | 181 | // Read from QueueBuf first |
179 | Bu::size ps = qbPeek.read( pBuf, iBytes ); | 182 | Bu::size ps = qbPeek.read( pBuf, iBytes ); |
183 | Bu::println("ClientAccess::read: attempted qbPeek, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) ); | ||
180 | iBytes -= ps; | 184 | iBytes -= ps; |
181 | pBuf += ps; | 185 | pBuf += ps; |
182 | // Request space left? Try the client | 186 | // Request space left? Try the client |
183 | if( iBytes > 0 ) | 187 | if( iBytes > 0 ) |
184 | { | 188 | { |
185 | ps += rBuf.accClientFiltered.read( pBuf, iBytes ); | 189 | ps += rBuf.accClientFiltered.read( pBuf, iBytes ); |
190 | Bu::println("ClientAccess::read: attempted completion from socket buffer, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) ); | ||
186 | } | 191 | } |
187 | return ps; | 192 | return ps; |
188 | } | 193 | } |
@@ -190,22 +195,26 @@ Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) | |||
190 | Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes, | 195 | Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes, |
191 | int iOffset ) | 196 | int iOffset ) |
192 | { | 197 | { |
198 | Bu::println("ClientAccess::peek( ptr, %1, %2 )").arg( iBytes ).arg( iOffset ); | ||
193 | Bu::MutexLocker l( mAccess ); | 199 | Bu::MutexLocker l( mAccess ); |
194 | // Do we have enough data in the peek buffer to handle this? | 200 | // Do we have enough data in the peek buffer to handle this? |
195 | if( qbPeek.getSize() < iBytes+iOffset ) | 201 | if( qbPeek.getSize() < iBytes+iOffset ) |
196 | { | 202 | { |
203 | Bu::println("ClientAccess::peek: Insufficient buffered data (%1)").arg( qbPeek.getSize() ); | ||
197 | // Nope, make an attempt to fill it in. | 204 | // Nope, make an attempt to fill it in. |
198 | int nDiff = iBytes-qbPeek.getSize(); | 205 | int nDiff = iBytes-qbPeek.getSize(); |
199 | // We have to make our own buffer, since iBytes+nOffeset could be bigger | 206 | // We have to make our own buffer, since iBytes+nOffeset could be bigger |
200 | // than pData. | 207 | // than pData. |
201 | char *pTmp = new char[nDiff]; | 208 | char *pTmp = new char[nDiff]; |
202 | Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff ); | 209 | Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff ); |
210 | Bu::println("ClientAccess::peek: Tried to fill buffer, read %1 of needed %2\n>%3<").arg( ps ).arg( nDiff ).arg( Bu::String(pTmp, ps) ); | ||
203 | if( ps > 0 ) | 211 | if( ps > 0 ) |
204 | { | 212 | { |
205 | // Add the data read to the peek buffer. | 213 | // Add the data read to the peek buffer. |
206 | qbPeek.write( pTmp, ps ); | 214 | qbPeek.write( pTmp, ps ); |
207 | } | 215 | } |
208 | delete[] pTmp; | 216 | delete[] pTmp; |
217 | Bu::println("ClientAccess::peek: buffer left with %1").arg( qbPeek.getSize() ); | ||
209 | } | 218 | } |
210 | 219 | ||
211 | return qbPeek.peek( pData, iBytes, iOffset ); | 220 | return qbPeek.peek( pData, iBytes, iOffset ); |
@@ -225,13 +234,15 @@ Bu::size Bu::ClientBuf::ClientAccess::tell() | |||
225 | 234 | ||
226 | void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) | 235 | void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) |
227 | { | 236 | { |
237 | Bu::println("ClientAccess::seek( %1 )").arg( offset ); | ||
228 | Bu::MutexLocker l( mAccess ); | 238 | Bu::MutexLocker l( mAccess ); |
229 | // For this type of stream seek is basically a destructive skip. It's like | 239 | // For this type of stream seek is basically a destructive skip. It's like |
230 | // reading the data but with no output buffer. Let's remove data from the | 240 | // reading the data but with no output buffer. Let's remove data from the |
231 | // peek buffer first. | 241 | // peek buffer first. |
232 | if( qbPeek.getSize() > 0 ) | 242 | if( qbPeek.getSize() > 0 ) |
233 | { | 243 | { |
234 | Bu::size amount = Bu::buMax( qbPeek.getSize(), offset ); | 244 | Bu::size amount = Bu::buMin( qbPeek.getSize(), offset ); |
245 | Bu::println("ClientAccess::seek: buffered: %1, amount: %2").arg( qbPeek.getSize() ).arg( amount ); | ||
235 | qbPeek.seek( amount ); | 246 | qbPeek.seek( amount ); |
236 | offset -= amount; | 247 | offset -= amount; |
237 | } | 248 | } |
@@ -239,6 +250,7 @@ void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) | |||
239 | // If there's offset left, then apply it to the underlying stream | 250 | // If there's offset left, then apply it to the underlying stream |
240 | if( offset > 0 ) | 251 | if( offset > 0 ) |
241 | { | 252 | { |
253 | Bu::println("ClientAccess::seek: seeking remaining %1 in socket buffer").arg( offset ); | ||
242 | rBuf.accClientFiltered.seek( offset ); | 254 | rBuf.accClientFiltered.seek( offset ); |
243 | } | 255 | } |
244 | } | 256 | } |
@@ -329,6 +341,7 @@ Bu::String Bu::ClientBuf::ClientAccess::getLocation() const | |||
329 | { | 341 | { |
330 | return "ClientBuf"; | 342 | return "ClientBuf"; |
331 | } | 343 | } |
344 | #undef accClientFiltered | ||
332 | 345 | ||
333 | ///////// | 346 | ///////// |
334 | // ServerAccess | 347 | // ServerAccess |
diff --git a/src/stable/clientbuf.h b/src/stable/clientbuf.h index 7781b6a..50b6617 100644 --- a/src/stable/clientbuf.h +++ b/src/stable/clientbuf.h | |||
@@ -164,12 +164,13 @@ namespace Bu | |||
164 | private: | 164 | private: |
165 | ClientAccessRaw accClientRaw; | 165 | ClientAccessRaw accClientRaw; |
166 | ServerAccess accServer; | 166 | ServerAccess accServer; |
167 | Bu::StreamStack accClientFiltered; | 167 | //Bu::StreamStack accClientFiltered; |
168 | ClientAccess accClient; | 168 | ClientAccess accClient; |
169 | Bu::QueueBuf qbOutput; | 169 | Bu::QueueBuf qbOutput; |
170 | Bu::QueueBuf qbInput; | 170 | Bu::QueueBuf qbInput; |
171 | Bu::Mutex mOutput; | 171 | Bu::Mutex mOutput; |
172 | Bu::Mutex mInput; | 172 | Bu::Mutex mInput; |
173 | |||
173 | friend class Bu::ClientBuf::ClientAccess; | 174 | friend class Bu::ClientBuf::ClientAccess; |
174 | friend class Bu::ClientBuf::ClientAccessRaw; | 175 | friend class Bu::ClientBuf::ClientAccessRaw; |
175 | friend class Bu::ClientBuf::ServerAccess; | 176 | friend class Bu::ClientBuf::ServerAccess; |
diff --git a/src/stable/counterevent.h b/src/stable/counterevent.h new file mode 100644 index 0000000..e7ad7a1 --- /dev/null +++ b/src/stable/counterevent.h | |||
@@ -0,0 +1,154 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2007-2019 Xagasoft, All rights reserved. | ||
3 | * | ||
4 | * This file is part of the libbu++ library and is released under the | ||
5 | * terms of the license contained in the file LICENSE. | ||
6 | */ | ||
7 | |||
8 | #ifndef BU_COUNTER_EVENT_H | ||
9 | #define BU_COUNTER_EVENT_H | ||
10 | |||
11 | #include <pthread.h> | ||
12 | |||
13 | #include "bu/mutex.h" | ||
14 | #include "bu/condition.h" | ||
15 | |||
16 | namespace Bu | ||
17 | { | ||
18 | /** | ||
19 | * Represents a true/false state that controls thread synchronization. This | ||
20 | * is primarilly intended to control the synchronization state of | ||
21 | * multithreaded services. For example, telling all threads when to exit. | ||
22 | * | ||
23 | * An Event is either set or unset. If the Event is unset then it can be | ||
24 | * waited on for something to happen. As soon as the Event is set all | ||
25 | * waiting threads are released and new requests to wait are ignored until | ||
26 | * the Event is cleared again. | ||
27 | * | ||
28 | * Threads can also be woken up without setting the Event, which may be | ||
29 | * handy in certain circumstances. | ||
30 | *@ingroup Threading | ||
31 | */ | ||
32 | class CounterEvent | ||
33 | { | ||
34 | public: | ||
35 | CounterEvent() : | ||
36 | iCount( 0 ) | ||
37 | { | ||
38 | } | ||
39 | |||
40 | ~CounterEvent() | ||
41 | { | ||
42 | } | ||
43 | |||
44 | /** | ||
45 | * Wait indefinitely for the Event to trigger. If the event is already | ||
46 | * set, then return immediately. It's important to note that this may | ||
47 | * return at any time, not only when the Event is set, so examining the | ||
48 | * return value is important. | ||
49 | *@returns the set status of the Event. | ||
50 | */ | ||
51 | int wait() | ||
52 | { | ||
53 | cBlock.lock(); | ||
54 | if( iCount == 0 ) | ||
55 | { | ||
56 | cBlock.unlock(); | ||
57 | return iCount; | ||
58 | } | ||
59 | cBlock.wait(); | ||
60 | int iRet = iCount; | ||
61 | cBlock.unlock(); | ||
62 | return iRet; | ||
63 | } | ||
64 | |||
65 | /** | ||
66 | * Wait for up to nSec seconds and nUSec nanoseconds for the event to | ||
67 | * trigger. If the Event is already set then return immediately. | ||
68 | *@returns the set status of the Event. | ||
69 | */ | ||
70 | int wait( int nSec, int nUSec ) | ||
71 | { | ||
72 | cBlock.lock(); | ||
73 | if( iCount == 0 ) | ||
74 | { | ||
75 | cBlock.unlock(); | ||
76 | return iCount; | ||
77 | } | ||
78 | cBlock.wait( nSec, nUSec ); | ||
79 | bool iRet = iCount; | ||
80 | cBlock.unlock(); | ||
81 | return iRet; | ||
82 | } | ||
83 | |||
84 | /** | ||
85 | * Allow one of the waiting threads to unlock without updating the set | ||
86 | * state of the Event. | ||
87 | */ | ||
88 | void unblockOne() | ||
89 | { | ||
90 | cBlock.lock(); | ||
91 | cBlock.signal(); | ||
92 | cBlock.unlock(); | ||
93 | } | ||
94 | |||
95 | /** | ||
96 | * Allow all waiting threads to unlock and proceed without updating the | ||
97 | * set state of the Event. | ||
98 | */ | ||
99 | void unblockAll() | ||
100 | { | ||
101 | cBlock.lock(); | ||
102 | cBlock.broadcast(); | ||
103 | cBlock.unlock(); | ||
104 | } | ||
105 | |||
106 | /** | ||
107 | * Find out if the Event is in the set state or not. | ||
108 | *@returns True if set, false otherwise. | ||
109 | */ | ||
110 | bool isZero() | ||
111 | { | ||
112 | cBlock.lock(); | ||
113 | bool bRet = (iCount == 0); | ||
114 | cBlock.unlock(); | ||
115 | return bRet; | ||
116 | } | ||
117 | |||
118 | /** | ||
119 | * Sets the Event's state to true and triggers all waiting threads. | ||
120 | */ | ||
121 | void decrement() | ||
122 | { | ||
123 | cBlock.lock(); | ||
124 | iCount--; if( iCount < 0 ) iCount = 0; | ||
125 | if( iCount == 0 ) | ||
126 | cBlock.broadcast(); | ||
127 | cBlock.unlock(); | ||
128 | } | ||
129 | |||
130 | void increment() | ||
131 | { | ||
132 | cBlock.lock(); | ||
133 | iCount++; | ||
134 | cBlock.unlock(); | ||
135 | } | ||
136 | |||
137 | /** | ||
138 | * Sets the Event's state to false. This does NOT trigger any waiting | ||
139 | * threads. | ||
140 | */ | ||
141 | void clear() | ||
142 | { | ||
143 | cBlock.lock(); | ||
144 | iCount = 0; | ||
145 | cBlock.unlock(); | ||
146 | } | ||
147 | |||
148 | private: | ||
149 | Condition cBlock; /**< The condition for blocking dequeues. */ | ||
150 | int iCount; | ||
151 | }; | ||
152 | } | ||
153 | |||
154 | #endif | ||
diff --git a/src/stable/server.cpp b/src/stable/server.cpp index 592230d..3f03a63 100644 --- a/src/stable/server.cpp +++ b/src/stable/server.cpp | |||
@@ -14,6 +14,8 @@ | |||
14 | #include "bu/config.h" | 14 | #include "bu/config.h" |
15 | #include "bu/mutexlocker.h" | 15 | #include "bu/mutexlocker.h" |
16 | 16 | ||
17 | #include "bu/sio.h" | ||
18 | |||
17 | #ifdef PROFILE_BU_SERVER | 19 | #ifdef PROFILE_BU_SERVER |
18 | #define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x ) | 20 | #define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x ) |
19 | #define BU_PROFILE_END( x ) Bu::Profiler::getInstance().endEvent( x ) | 21 | #define BU_PROFILE_END( x ) Bu::Profiler::getInstance().endEvent( x ) |
@@ -39,14 +41,14 @@ Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : | |||
39 | 41 | ||
40 | for( int j = 0; j < iIoWorkers; j++ ) | 42 | for( int j = 0; j < iIoWorkers; j++ ) |
41 | { | 43 | { |
42 | IoWorker *pWorker = new IoWorker( *this, qIoEvent, qClientEvent ); | 44 | IoWorker *pWorker = new IoWorker( *this ); |
43 | lIoWorker.append( pWorker ); | 45 | lIoWorker.append( pWorker ); |
44 | pWorker->start(); | 46 | pWorker->start(); |
45 | } | 47 | } |
46 | 48 | ||
47 | for( int j = 0; j < iClientWorkers; j++ ) | 49 | for( int j = 0; j < iClientWorkers; j++ ) |
48 | { | 50 | { |
49 | ClientWorker *pWorker = new ClientWorker( *this, qClientEvent ); | 51 | ClientWorker *pWorker = new ClientWorker( *this ); |
50 | lClientWorker.append( pWorker ); | 52 | lClientWorker.append( pWorker ); |
51 | pWorker->start(); | 53 | pWorker->start(); |
52 | } | 54 | } |
@@ -90,7 +92,10 @@ void Bu::Server::scan() | |||
90 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 92 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) |
91 | { | 93 | { |
92 | if( (*i)->hasOutput() ) | 94 | if( (*i)->hasOutput() ) |
95 | { | ||
96 | Bu::println("Socket has output..."); | ||
93 | FD_SET( i.getKey(), &fdWrite ); | 97 | FD_SET( i.getKey(), &fdWrite ); |
98 | } | ||
94 | } | 99 | } |
95 | mClients.unlock(); | 100 | mClients.unlock(); |
96 | 101 | ||
@@ -105,6 +110,9 @@ void Bu::Server::scan() | |||
105 | ); | 110 | ); |
106 | } | 111 | } |
107 | 112 | ||
113 | evIoCycle.clear(); | ||
114 | Bu::println("Cycle clear"); | ||
115 | |||
108 | for( int j = 0; j < FD_SETSIZE; j++ ) | 116 | for( int j = 0; j < FD_SETSIZE; j++ ) |
109 | { | 117 | { |
110 | if( FD_ISSET( j, &fdRead ) ) | 118 | if( FD_ISSET( j, &fdRead ) ) |
@@ -116,15 +124,22 @@ void Bu::Server::scan() | |||
116 | } | 124 | } |
117 | else | 125 | else |
118 | { | 126 | { |
127 | evIoCycle.increment(); | ||
128 | Bu::println("Increment (read)"); | ||
119 | qIoEvent.enqueue( new Event( j, Event::Read ) ); | 129 | qIoEvent.enqueue( new Event( j, Event::Read ) ); |
120 | } | 130 | } |
121 | } | 131 | } |
122 | if( FD_ISSET( j, &fdWrite ) ) | 132 | if( FD_ISSET( j, &fdWrite ) ) |
123 | { | 133 | { |
134 | evIoCycle.increment(); | ||
135 | Bu::println("Increment (write)"); | ||
124 | qIoEvent.enqueue( new Event( j, Event::Write ) ); | 136 | qIoEvent.enqueue( new Event( j, Event::Write ) ); |
125 | } | 137 | } |
126 | } | 138 | } |
127 | 139 | ||
140 | Bu::println("Waiting"); | ||
141 | while( evIoCycle.wait() > 0 ) { } | ||
142 | |||
128 | Bu::List<int> lDelete; | 143 | Bu::List<int> lDelete; |
129 | // Now we just try to write all the pending data on all the sockets. | 144 | // Now we just try to write all the pending data on all the sockets. |
130 | // this could be done better eventually, if we care about the socket | 145 | // this could be done better eventually, if we care about the socket |
@@ -155,7 +170,7 @@ void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket ) | |||
155 | BU_PROFILE_START("addClient"); | 170 | BU_PROFILE_START("addClient"); |
156 | int iFdSrv; | 171 | int iFdSrv; |
157 | int iFdCli; | 172 | int iFdCli; |
158 | if( !pSrv->getFd( iFdSrv ) || !pSrv->getFd( iFdCli ) ) | 173 | if( !pSrv->getFd( iFdSrv ) || !pSocket->getFd( iFdCli ) ) |
159 | { | 174 | { |
160 | throw Bu::ExceptionBase("No file descriptor?"); | 175 | throw Bu::ExceptionBase("No file descriptor?"); |
161 | } | 176 | } |
@@ -324,12 +339,8 @@ Bu::Server::Event::Operation Bu::Server::Event::getOperation() const | |||
324 | // IoWorker | 339 | // IoWorker |
325 | //// | 340 | //// |
326 | 341 | ||
327 | Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv, | 342 | Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv ) : |
328 | Bu::Server::EventQueue &qIoEvent, | 343 | rSrv( rSrv ) |
329 | Bu::Server::EventQueue &qClientEvent ) : | ||
330 | rSrv( rSrv ), | ||
331 | qIoEvent( qIoEvent ), | ||
332 | qClientEvent( qClientEvent ) | ||
333 | { | 344 | { |
334 | } | 345 | } |
335 | 346 | ||
@@ -339,17 +350,21 @@ Bu::Server::IoWorker::~IoWorker() | |||
339 | 350 | ||
340 | void Bu::Server::IoWorker::run() | 351 | void Bu::Server::IoWorker::run() |
341 | { | 352 | { |
342 | while( qIoEvent.isRunning() ) | 353 | setName("busrv-ioWorker"); |
354 | while( rSrv.qIoEvent.isRunning() ) | ||
343 | { | 355 | { |
344 | Event *pEv = qIoEvent.dequeue(); | 356 | Event *pEv = rSrv.qIoEvent.dequeue( true ); |
345 | if( pEv == NULL ) | 357 | if( pEv == NULL ) |
358 | { | ||
346 | continue; | 359 | continue; |
360 | } | ||
347 | 361 | ||
348 | Client *pClient; | 362 | Client *pClient; |
349 | Socket *pSocket; | 363 | Socket *pSocket; |
350 | if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) | 364 | if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) |
351 | { | 365 | { |
352 | delete pEv; | 366 | delete pEv; |
367 | rSrv.evIoCycle.decrement(); | ||
353 | continue; | 368 | continue; |
354 | } | 369 | } |
355 | 370 | ||
@@ -368,6 +383,9 @@ void Bu::Server::IoWorker::run() | |||
368 | } | 383 | } |
369 | 384 | ||
370 | delete pEv; | 385 | delete pEv; |
386 | |||
387 | Bu::println("decrement"); | ||
388 | rSrv.evIoCycle.decrement(); | ||
371 | } | 389 | } |
372 | } | 390 | } |
373 | 391 | ||
@@ -377,12 +395,15 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
377 | Bu::size iRead; | 395 | Bu::size iRead; |
378 | Bu::size iTotal=0; | 396 | Bu::size iTotal=0; |
379 | 397 | ||
398 | Bu::println("IoWorker::handleRead: starting"); | ||
399 | |||
380 | BU_PROFILE_START("client.read"); | 400 | BU_PROFILE_START("client.read"); |
381 | for(;;) | 401 | for(;;) |
382 | { | 402 | { |
383 | try | 403 | try |
384 | { | 404 | { |
385 | iRead = pSocket->read( buf, RBS ); | 405 | iRead = pSocket->read( buf, RBS ); |
406 | Bu::println("IoWorker::handleRead: read() -> %1").arg( iRead ); | ||
386 | 407 | ||
387 | if( iRead == 0 ) | 408 | if( iRead == 0 ) |
388 | { | 409 | { |
@@ -398,6 +419,7 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
398 | } | 419 | } |
399 | catch( Bu::ExceptionBase &e ) | 420 | catch( Bu::ExceptionBase &e ) |
400 | { | 421 | { |
422 | Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() ); | ||
401 | close( pSocket ); | 423 | close( pSocket ); |
402 | break; | 424 | break; |
403 | } | 425 | } |
@@ -406,24 +428,27 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
406 | 428 | ||
407 | if( iTotal == 0 ) | 429 | if( iTotal == 0 ) |
408 | { | 430 | { |
431 | Bu::println("IoWorker::handleRead: read nothing, closing"); | ||
409 | close( pSocket ); | 432 | close( pSocket ); |
410 | } | 433 | } |
411 | else | 434 | else |
412 | { | 435 | { |
413 | Bu::Server::fd iFd; | 436 | Bu::Server::fd iFd; |
414 | pSocket->getFd( iFd ); | 437 | pSocket->getFd( iFd ); |
415 | qClientEvent.enqueue( new Event( iFd, Event::Process ) ); | 438 | rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) ); |
416 | } | 439 | } |
417 | } | 440 | } |
418 | 441 | ||
419 | void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) | 442 | void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) |
420 | { | 443 | { |
444 | Bu::println("IoWorker::handleWrite() "); | ||
421 | char buf[RBS]; | 445 | char buf[RBS]; |
422 | if( pClient->hasOutput() > 0 ) | 446 | if( pClient->hasOutput() > 0 ) |
423 | { | 447 | { |
424 | int iAmnt = RBS; | 448 | int iAmnt = RBS; |
425 | iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); | 449 | iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); |
426 | int iReal = pSocket->write( buf, iAmnt ); | 450 | int iReal = pSocket->write( buf, iAmnt ); |
451 | Bu::println("IoWorker::handleWrite: Copy out: iAmnt=%1, iReal=%2\n>%3<").arg( iAmnt ).arg( iReal ).arg( Bu::String( buf, iReal ) ); | ||
427 | pClient->cbBuffer.server().seek( iReal ); | 452 | pClient->cbBuffer.server().seek( iReal ); |
428 | } | 453 | } |
429 | } | 454 | } |
@@ -439,10 +464,8 @@ void Bu::Server::IoWorker::close( Bu::Socket *pSocket ) | |||
439 | // ClientWorker | 464 | // ClientWorker |
440 | //// | 465 | //// |
441 | 466 | ||
442 | Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv, | 467 | Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv ) : |
443 | Bu::Server::EventQueue &qEvent ) : | 468 | rSrv( rSrv ) |
444 | rSrv( rSrv ), | ||
445 | qEvent( qEvent ) | ||
446 | { | 469 | { |
447 | } | 470 | } |
448 | 471 | ||
@@ -452,9 +475,10 @@ Bu::Server::ClientWorker::~ClientWorker() | |||
452 | 475 | ||
453 | void Bu::Server::ClientWorker::run() | 476 | void Bu::Server::ClientWorker::run() |
454 | { | 477 | { |
455 | while( qEvent.isRunning() ) | 478 | setName("busrv-cntWorker"); |
479 | while( rSrv.qClientEvent.isRunning() ) | ||
456 | { | 480 | { |
457 | Event *pEv = qEvent.dequeue(); | 481 | Event *pEv = rSrv.qClientEvent.dequeue( true ); |
458 | if( pEv == NULL ) | 482 | if( pEv == NULL ) |
459 | continue; | 483 | continue; |
460 | 484 | ||
@@ -464,8 +488,10 @@ void Bu::Server::ClientWorker::run() | |||
464 | delete pEv; | 488 | delete pEv; |
465 | continue; | 489 | continue; |
466 | } | 490 | } |
467 | 491 | ||
492 | Bu::println("Processing input..."); | ||
468 | pClient->processInput(); | 493 | pClient->processInput(); |
494 | Bu::println("Processing input complete."); | ||
469 | delete pEv; | 495 | delete pEv; |
470 | } | 496 | } |
471 | } | 497 | } |
diff --git a/src/stable/server.h b/src/stable/server.h index d66d9d5..e2b7d53 100644 --- a/src/stable/server.h +++ b/src/stable/server.h | |||
@@ -22,6 +22,7 @@ | |||
22 | #include "bu/hash.h" | 22 | #include "bu/hash.h" |
23 | #include "bu/synchroqueue.h" | 23 | #include "bu/synchroqueue.h" |
24 | #include "bu/thread.h" | 24 | #include "bu/thread.h" |
25 | #include "bu/counterevent.h" | ||
25 | 26 | ||
26 | #include "bu/config.h" | 27 | #include "bu/config.h" |
27 | 28 | ||
@@ -137,8 +138,7 @@ namespace Bu | |||
137 | class IoWorker : public Bu::Thread | 138 | class IoWorker : public Bu::Thread |
138 | { | 139 | { |
139 | public: | 140 | public: |
140 | IoWorker( Server &rSrv, EventQueue &qIoEvent, | 141 | IoWorker( Server &rSrv ); |
141 | EventQueue &qClientEvent ); | ||
142 | virtual ~IoWorker(); | 142 | virtual ~IoWorker(); |
143 | 143 | ||
144 | protected: | 144 | protected: |
@@ -151,14 +151,13 @@ namespace Bu | |||
151 | 151 | ||
152 | private: | 152 | private: |
153 | Server &rSrv; | 153 | Server &rSrv; |
154 | EventQueue &qIoEvent; | ||
155 | EventQueue &qClientEvent; | ||
156 | }; | 154 | }; |
155 | friend class Bu::Server::IoWorker; | ||
157 | 156 | ||
158 | class ClientWorker : public Bu::Thread | 157 | class ClientWorker : public Bu::Thread |
159 | { | 158 | { |
160 | public: | 159 | public: |
161 | ClientWorker( Server &rSrv, EventQueue &qEvent ); | 160 | ClientWorker( Server &rSrv ); |
162 | virtual ~ClientWorker(); | 161 | virtual ~ClientWorker(); |
163 | 162 | ||
164 | protected: | 163 | protected: |
@@ -166,8 +165,8 @@ namespace Bu | |||
166 | 165 | ||
167 | private: | 166 | private: |
168 | Server &rSrv; | 167 | Server &rSrv; |
169 | EventQueue &qEvent; | ||
170 | }; | 168 | }; |
169 | friend class Bu::Server::ClientWorker; | ||
171 | 170 | ||
172 | int nTimeoutSec; | 171 | int nTimeoutSec; |
173 | int nTimeoutUSec; | 172 | int nTimeoutUSec; |
@@ -189,6 +188,7 @@ namespace Bu | |||
189 | typedef List<ClientWorker *> ClientWorkerList; | 188 | typedef List<ClientWorker *> ClientWorkerList; |
190 | IoWorkerList lIoWorker; | 189 | IoWorkerList lIoWorker; |
191 | ClientWorkerList lClientWorker; | 190 | ClientWorkerList lClientWorker; |
191 | Bu::CounterEvent evIoCycle; | ||
192 | }; | 192 | }; |
193 | } | 193 | } |
194 | 194 | ||
diff --git a/src/stable/thread.cpp b/src/stable/thread.cpp index 5fe034a..99239e9 100644 --- a/src/stable/thread.cpp +++ b/src/stable/thread.cpp | |||
@@ -77,3 +77,8 @@ void Bu::Thread::yield() | |||
77 | sched_yield(); | 77 | sched_yield(); |
78 | } | 78 | } |
79 | 79 | ||
80 | void Bu::Thread::setName( const char *sName ) | ||
81 | { | ||
82 | pthread_setname_np( ptHandle, sName ); | ||
83 | } | ||
84 | |||
diff --git a/src/stable/thread.h b/src/stable/thread.h index 5174d1c..0a64390 100644 --- a/src/stable/thread.h +++ b/src/stable/thread.h | |||
@@ -124,6 +124,12 @@ namespace Bu | |||
124 | */ | 124 | */ |
125 | void yield(); | 125 | void yield(); |
126 | 126 | ||
127 | /** | ||
128 | * Sets the name of the thread. The sName parameter must be a C string | ||
129 | * with a max length of 16 bytes including null terminator. | ||
130 | */ | ||
131 | void setName( const char *sName ); | ||
132 | |||
127 | private: | 133 | private: |
128 | /** | 134 | /** |
129 | * This is the hidden-heart of the thread system. While run is what the | 135 | * This is the hidden-heart of the thread system. While run is what the |
diff --git a/src/unstable/protocolwebsocket.cpp b/src/unstable/protocolwebsocket.cpp index 15b7288..fa2d882 100644 --- a/src/unstable/protocolwebsocket.cpp +++ b/src/unstable/protocolwebsocket.cpp | |||
@@ -21,7 +21,8 @@ | |||
21 | 21 | ||
22 | #include <stdlib.h> | 22 | #include <stdlib.h> |
23 | 23 | ||
24 | #define DEBUG( X ) { } (void)0 | 24 | //#define DEBUG( X ) { } (void)0 |
25 | #define DEBUG( X ) { X; } (void)0 | ||
25 | 26 | ||
26 | Bu::ProtocolWebSocket::ProtocolWebSocket() : | 27 | Bu::ProtocolWebSocket::ProtocolWebSocket() : |
27 | eStatus( stProtoId ) | 28 | eStatus( stProtoId ) |