diff options
Diffstat (limited to 'src/stable/server.cpp')
-rw-r--r-- | src/stable/server.cpp | 342 |
1 files changed, 287 insertions, 55 deletions
diff --git a/src/stable/server.cpp b/src/stable/server.cpp index 0552510..592230d 100644 --- a/src/stable/server.cpp +++ b/src/stable/server.cpp | |||
@@ -8,10 +8,11 @@ | |||
8 | #include "bu/server.h" | 8 | #include "bu/server.h" |
9 | #include <errno.h> | 9 | #include <errno.h> |
10 | #include <unistd.h> | 10 | #include <unistd.h> |
11 | #include "bu/tcpserversocket.h" | 11 | #include "bu/serversocket.h" |
12 | #include "bu/client.h" | 12 | #include "bu/client.h" |
13 | #include "bu/tcpsocket.h" | 13 | #include "bu/socket.h" |
14 | #include "bu/config.h" | 14 | #include "bu/config.h" |
15 | #include "bu/mutexlocker.h" | ||
15 | 16 | ||
16 | #ifdef PROFILE_BU_SERVER | 17 | #ifdef PROFILE_BU_SERVER |
17 | #define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x ) | 18 | #define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x ) |
@@ -21,13 +22,34 @@ | |||
21 | #define BU_PROFILE_END( x ) (void)0 | 22 | #define BU_PROFILE_END( x ) (void)0 |
22 | #endif | 23 | #endif |
23 | 24 | ||
24 | Bu::Server::Server() : | 25 | #define RBS 1500 |
26 | |||
27 | Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : | ||
25 | nTimeoutSec( 0 ), | 28 | nTimeoutSec( 0 ), |
26 | nTimeoutUSec( 0 ), | 29 | nTimeoutUSec( 0 ), |
27 | bAutoTick( false ) | 30 | bAutoTick( false ) |
28 | { | 31 | { |
29 | BU_PROFILE_START("server"); | 32 | BU_PROFILE_START("server"); |
30 | FD_ZERO( &fdActive ); | 33 | FD_ZERO( &fdActive ); |
34 | |||
35 | if( iIoWorkers < 1 ) | ||
36 | iIoWorkers = 1; | ||
37 | if( iClientWorkers < 1 ) | ||
38 | iClientWorkers = 1; | ||
39 | |||
40 | for( int j = 0; j < iIoWorkers; j++ ) | ||
41 | { | ||
42 | IoWorker *pWorker = new IoWorker( *this, qIoEvent, qClientEvent ); | ||
43 | lIoWorker.append( pWorker ); | ||
44 | pWorker->start(); | ||
45 | } | ||
46 | |||
47 | for( int j = 0; j < iClientWorkers; j++ ) | ||
48 | { | ||
49 | ClientWorker *pWorker = new ClientWorker( *this, qClientEvent ); | ||
50 | lClientWorker.append( pWorker ); | ||
51 | pWorker->start(); | ||
52 | } | ||
31 | } | 53 | } |
32 | 54 | ||
33 | Bu::Server::~Server() | 55 | Bu::Server::~Server() |
@@ -36,20 +58,16 @@ Bu::Server::~Server() | |||
36 | BU_PROFILE_START("server"); | 58 | BU_PROFILE_START("server"); |
37 | } | 59 | } |
38 | 60 | ||
39 | void Bu::Server::addPort( int nPort, int nPoolSize ) | 61 | void Bu::Server::addServerSocket( Bu::ServerSocket *pSocket ) |
40 | { | 62 | { |
41 | TcpServerSocket *s = new TcpServerSocket( nPort, nPoolSize ); | 63 | fd iFd; |
42 | socket_t nSocket = s->getSocket(); | 64 | if( !pSocket->getFd( iFd ) ) |
43 | FD_SET( nSocket, &fdActive ); | 65 | { |
44 | hServers.insert( nSocket, s ); | 66 | throw Bu::ExceptionBase("Cannot get file descriptor from " |
45 | } | 67 | "provided ServerSocket."); |
46 | 68 | } | |
47 | void Bu::Server::addPort( const String &sAddr, int nPort, int nPoolSize ) | 69 | FD_SET( iFd, &fdActive ); |
48 | { | 70 | hServers.insert( iFd, pSocket ); |
49 | TcpServerSocket *s = new TcpServerSocket( sAddr, nPort, nPoolSize ); | ||
50 | socket_t nSocket = s->getSocket(); | ||
51 | FD_SET( nSocket, &fdActive ); | ||
52 | hServers.insert( nSocket, s ); | ||
53 | } | 71 | } |
54 | 72 | ||
55 | void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec ) | 73 | void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec ) |
@@ -68,11 +86,13 @@ void Bu::Server::scan() | |||
68 | fd_set fdException = fdActive; | 86 | fd_set fdException = fdActive; |
69 | 87 | ||
70 | FD_ZERO( &fdWrite ); | 88 | FD_ZERO( &fdWrite ); |
89 | mClients.lock(); | ||
71 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 90 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) |
72 | { | 91 | { |
73 | if( (*i)->hasOutput() ) | 92 | if( (*i)->hasOutput() ) |
74 | FD_SET( i.getKey(), &fdWrite ); | 93 | FD_SET( i.getKey(), &fdWrite ); |
75 | } | 94 | } |
95 | mClients.unlock(); | ||
76 | 96 | ||
77 | if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, | 97 | if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, |
78 | &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) | 98 | &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) |
@@ -91,42 +111,17 @@ void Bu::Server::scan() | |||
91 | { | 111 | { |
92 | if( hServers.has( j ) ) | 112 | if( hServers.has( j ) ) |
93 | { | 113 | { |
94 | TcpServerSocket *pSrv = hServers.get( j ); | 114 | Bu::ServerSocket *pSrv = hServers.get( j ); |
95 | addClient( pSrv->accept(), pSrv->getPort() ); | 115 | addClient( pSrv, pSrv->accept() ); |
96 | } | 116 | } |
97 | else | 117 | else |
98 | { | 118 | { |
99 | Client *pClient = hClients.get( j ); | 119 | qIoEvent.enqueue( new Event( j, Event::Read ) ); |
100 | BU_PROFILE_START("processInput"); | ||
101 | pClient->processInput(); | ||
102 | BU_PROFILE_END("processInput"); | ||
103 | if( !pClient->isOpen() ) | ||
104 | { | ||
105 | closeClient( j ); | ||
106 | } | ||
107 | } | 120 | } |
108 | } | 121 | } |
109 | if( FD_ISSET( j, &fdWrite ) ) | 122 | if( FD_ISSET( j, &fdWrite ) ) |
110 | { | 123 | { |
111 | try | 124 | qIoEvent.enqueue( new Event( j, Event::Write ) ); |
112 | { | ||
113 | Client *pClient = hClients.get( j ); | ||
114 | try | ||
115 | { | ||
116 | BU_PROFILE_START("processOutput"); | ||
117 | pClient->processOutput(); | ||
118 | BU_PROFILE_END("processOutput"); | ||
119 | } | ||
120 | catch( Bu::TcpSocketException &e ) | ||
121 | { | ||
122 | closeClient( j ); | ||
123 | } | ||
124 | } | ||
125 | catch( Bu::HashException &e ) | ||
126 | { | ||
127 | // Do nothing, I guess, the client is already dead... | ||
128 | // TODO: Someday, we may want to handle this more graceully. | ||
129 | } | ||
130 | } | 125 | } |
131 | } | 126 | } |
132 | 127 | ||
@@ -134,6 +129,7 @@ void Bu::Server::scan() | |||
134 | // Now we just try to write all the pending data on all the sockets. | 129 | // Now we just try to write all the pending data on all the sockets. |
135 | // this could be done better eventually, if we care about the socket | 130 | // this could be done better eventually, if we care about the socket |
136 | // wanting to accept writes (using a select). | 131 | // wanting to accept writes (using a select). |
132 | mClients.lock(); | ||
137 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 133 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) |
138 | { | 134 | { |
139 | if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) | 135 | if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) |
@@ -141,6 +137,7 @@ void Bu::Server::scan() | |||
141 | lDelete.append( i.getKey() ); | 137 | lDelete.append( i.getKey() ); |
142 | } | 138 | } |
143 | } | 139 | } |
140 | mClients.unlock(); | ||
144 | 141 | ||
145 | for( Bu::List<int>::iterator i = lDelete.begin(); i != lDelete.end(); i++ ) | 142 | for( Bu::List<int>::iterator i = lDelete.begin(); i != lDelete.end(); i++ ) |
146 | { | 143 | { |
@@ -153,21 +150,47 @@ void Bu::Server::scan() | |||
153 | BU_PROFILE_END("scan"); | 150 | BU_PROFILE_END("scan"); |
154 | } | 151 | } |
155 | 152 | ||
156 | void Bu::Server::addClient( socket_t nSocket, int nPort ) | 153 | void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket ) |
157 | { | 154 | { |
158 | BU_PROFILE_START("addClient"); | 155 | BU_PROFILE_START("addClient"); |
159 | FD_SET( nSocket, &fdActive ); | 156 | int iFdSrv; |
157 | int iFdCli; | ||
158 | if( !pSrv->getFd( iFdSrv ) || !pSrv->getFd( iFdCli ) ) | ||
159 | { | ||
160 | throw Bu::ExceptionBase("No file descriptor?"); | ||
161 | } | ||
162 | FD_SET( iFdCli, &fdActive ); | ||
160 | 163 | ||
161 | Client *c = new Client( | 164 | Client *pClient = new Client( |
162 | new Bu::TcpSocket( nSocket ), | ||
163 | new SrvClientLinkFactory() | 165 | new SrvClientLinkFactory() |
164 | ); | 166 | ); |
165 | hClients.insert( nSocket, c ); | 167 | { |
168 | Bu::MutexLocker l( mClients ); | ||
169 | hClients.insert( iFdCli, pClient ); | ||
170 | hSockets.insert( iFdCli, pSocket ); | ||
171 | } | ||
166 | 172 | ||
167 | onNewConnection( c, nPort ); | 173 | onNewConnection( pSrv, pClient, pSocket ); |
168 | BU_PROFILE_END("addClient"); | 174 | BU_PROFILE_END("addClient"); |
169 | } | 175 | } |
170 | 176 | ||
177 | Bu::Client *Bu::Server::getClient( fd iId ) | ||
178 | { | ||
179 | Bu::MutexLocker l( mClients ); | ||
180 | return hClients.get( iId ); | ||
181 | } | ||
182 | |||
183 | bool Bu::Server::getClientAndSocket( fd iId, Bu::Client *&pClient, | ||
184 | Bu::Socket *&pSocket ) | ||
185 | { | ||
186 | Bu::MutexLocker l( mClients ); | ||
187 | if( !hClients.has( iId ) || !hSockets.has( iId ) ) | ||
188 | return false; | ||
189 | pClient = hClients.get( iId ); | ||
190 | pSocket = hSockets.get( iId ); | ||
191 | return true; | ||
192 | } | ||
193 | |||
171 | Bu::Server::SrvClientLink::SrvClientLink( Bu::Client *pClient ) : | 194 | Bu::Server::SrvClientLink::SrvClientLink( Bu::Client *pClient ) : |
172 | pClient( pClient ) | 195 | pClient( pClient ) |
173 | { | 196 | { |
@@ -203,14 +226,45 @@ void Bu::Server::setAutoTick( bool bEnable ) | |||
203 | 226 | ||
204 | void Bu::Server::tick() | 227 | void Bu::Server::tick() |
205 | { | 228 | { |
229 | mClients.lock(); | ||
206 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 230 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) |
207 | { | 231 | { |
208 | (*i)->tick(); | 232 | (*i)->tick(); |
209 | } | 233 | } |
234 | mClients.unlock(); | ||
210 | } | 235 | } |
211 | 236 | ||
212 | void Bu::Server::shutdown() | 237 | void Bu::Server::shutdown() |
213 | { | 238 | { |
239 | { | ||
240 | qIoEvent.stop(); | ||
241 | qClientEvent.stop(); | ||
242 | Bu::Server::Event *pEv; | ||
243 | while( (pEv = qIoEvent.drain()) != NULL ) | ||
244 | { | ||
245 | delete pEv; | ||
246 | } | ||
247 | while( (pEv = qClientEvent.drain()) != NULL ) | ||
248 | { | ||
249 | delete pEv; | ||
250 | } | ||
251 | |||
252 | Bu::MutexLocker l( mWorkers ); | ||
253 | for( IoWorkerList::iterator i = lIoWorker.begin(); i; i++ ) | ||
254 | { | ||
255 | (*i)->join(); | ||
256 | delete *i; | ||
257 | } | ||
258 | lIoWorker.clear(); | ||
259 | for( ClientWorkerList::iterator i = lClientWorker.begin(); | ||
260 | i; i++ ) | ||
261 | { | ||
262 | (*i)->join(); | ||
263 | delete *i; | ||
264 | } | ||
265 | lClientWorker.clear(); | ||
266 | } | ||
267 | |||
214 | for( SrvHash::iterator i = hServers.begin(); i != hServers.end(); i++ ) | 268 | for( SrvHash::iterator i = hServers.begin(); i != hServers.end(); i++ ) |
215 | { | 269 | { |
216 | delete *i; | 270 | delete *i; |
@@ -218,23 +272,201 @@ void Bu::Server::shutdown() | |||
218 | 272 | ||
219 | hServers.clear(); | 273 | hServers.clear(); |
220 | 274 | ||
221 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 275 | ClientHash::KeyList lClients = hClients.getKeys(); |
276 | for( ClientHash::KeyList::iterator i = lClients.begin(); i; i++ ) | ||
222 | { | 277 | { |
223 | closeClient( i.getKey() ); | 278 | closeClient( *i ); |
224 | } | 279 | } |
225 | |||
226 | hClients.clear(); | 280 | hClients.clear(); |
227 | } | 281 | } |
228 | 282 | ||
229 | void Bu::Server::closeClient( socket_t iSocket ) | 283 | void Bu::Server::closeClient( fd iSocket ) |
230 | { | 284 | { |
231 | BU_PROFILE_START("closeClient"); | 285 | BU_PROFILE_START("closeClient"); |
232 | Bu::Client *pClient = hClients.get( iSocket ); | 286 | Bu::Client *pClient = hClients.get( iSocket ); |
287 | Bu::Socket *pSocket = hSockets.get( iSocket ); | ||
233 | onClosedConnection( pClient ); | 288 | onClosedConnection( pClient ); |
234 | pClient->close(); | 289 | pClient->close(); |
235 | hClients.erase( iSocket ); | 290 | hClients.erase( iSocket ); |
291 | pSocket->close(); | ||
292 | hSockets.erase( iSocket ); | ||
236 | FD_CLR( iSocket, &fdActive ); | 293 | FD_CLR( iSocket, &fdActive ); |
237 | delete pClient; | 294 | delete pClient; |
295 | delete pSocket; | ||
238 | BU_PROFILE_END("closeClient"); | 296 | BU_PROFILE_END("closeClient"); |
239 | } | 297 | } |
240 | 298 | ||
299 | //////// | ||
300 | // Event | ||
301 | //// | ||
302 | |||
303 | Bu::Server::Event::Event( fd iId, Operation eOp ) : | ||
304 | iId( iId ), | ||
305 | eOp( eOp ) | ||
306 | { | ||
307 | } | ||
308 | |||
309 | Bu::Server::Event::~Event() | ||
310 | { | ||
311 | } | ||
312 | |||
313 | Bu::Server::fd Bu::Server::Event::getId() const | ||
314 | { | ||
315 | return iId; | ||
316 | } | ||
317 | |||
318 | Bu::Server::Event::Operation Bu::Server::Event::getOperation() const | ||
319 | { | ||
320 | return eOp; | ||
321 | } | ||
322 | |||
323 | ///////// | ||
324 | // IoWorker | ||
325 | //// | ||
326 | |||
327 | Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv, | ||
328 | Bu::Server::EventQueue &qIoEvent, | ||
329 | Bu::Server::EventQueue &qClientEvent ) : | ||
330 | rSrv( rSrv ), | ||
331 | qIoEvent( qIoEvent ), | ||
332 | qClientEvent( qClientEvent ) | ||
333 | { | ||
334 | } | ||
335 | |||
336 | Bu::Server::IoWorker::~IoWorker() | ||
337 | { | ||
338 | } | ||
339 | |||
340 | void Bu::Server::IoWorker::run() | ||
341 | { | ||
342 | while( qIoEvent.isRunning() ) | ||
343 | { | ||
344 | Event *pEv = qIoEvent.dequeue(); | ||
345 | if( pEv == NULL ) | ||
346 | continue; | ||
347 | |||
348 | Client *pClient; | ||
349 | Socket *pSocket; | ||
350 | if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) | ||
351 | { | ||
352 | delete pEv; | ||
353 | continue; | ||
354 | } | ||
355 | |||
356 | switch( pEv->getOperation() ) | ||
357 | { | ||
358 | case Event::Read: | ||
359 | handleRead( pClient, pSocket ); | ||
360 | break; | ||
361 | |||
362 | case Event::Write: | ||
363 | handleWrite( pClient, pSocket ); | ||
364 | break; | ||
365 | |||
366 | case Event::Process: | ||
367 | break; | ||
368 | } | ||
369 | |||
370 | delete pEv; | ||
371 | } | ||
372 | } | ||
373 | |||
374 | void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | ||
375 | { | ||
376 | char buf[RBS]; | ||
377 | Bu::size iRead; | ||
378 | Bu::size iTotal=0; | ||
379 | |||
380 | BU_PROFILE_START("client.read"); | ||
381 | for(;;) | ||
382 | { | ||
383 | try | ||
384 | { | ||
385 | iRead = pSocket->read( buf, RBS ); | ||
386 | |||
387 | if( iRead == 0 ) | ||
388 | { | ||
389 | break; | ||
390 | } | ||
391 | else | ||
392 | { | ||
393 | iTotal += iRead; | ||
394 | pClient->cbBuffer.server().write( buf, iRead ); | ||
395 | if( !pSocket->canRead() ) | ||
396 | break; | ||
397 | } | ||
398 | } | ||
399 | catch( Bu::ExceptionBase &e ) | ||
400 | { | ||
401 | close( pSocket ); | ||
402 | break; | ||
403 | } | ||
404 | } | ||
405 | BU_PROFILE_END("client.read"); | ||
406 | |||
407 | if( iTotal == 0 ) | ||
408 | { | ||
409 | close( pSocket ); | ||
410 | } | ||
411 | else | ||
412 | { | ||
413 | Bu::Server::fd iFd; | ||
414 | pSocket->getFd( iFd ); | ||
415 | qClientEvent.enqueue( new Event( iFd, Event::Process ) ); | ||
416 | } | ||
417 | } | ||
418 | |||
419 | void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) | ||
420 | { | ||
421 | char buf[RBS]; | ||
422 | if( pClient->hasOutput() > 0 ) | ||
423 | { | ||
424 | int iAmnt = RBS; | ||
425 | iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); | ||
426 | int iReal = pSocket->write( buf, iAmnt ); | ||
427 | pClient->cbBuffer.server().seek( iReal ); | ||
428 | } | ||
429 | } | ||
430 | |||
431 | void Bu::Server::IoWorker::close( Bu::Socket *pSocket ) | ||
432 | { | ||
433 | Bu::Server::fd iFd; | ||
434 | pSocket->getFd( iFd ); | ||
435 | rSrv.closeClient( iFd ); | ||
436 | } | ||
437 | |||
438 | ///////// | ||
439 | // ClientWorker | ||
440 | //// | ||
441 | |||
442 | Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv, | ||
443 | Bu::Server::EventQueue &qEvent ) : | ||
444 | rSrv( rSrv ), | ||
445 | qEvent( qEvent ) | ||
446 | { | ||
447 | } | ||
448 | |||
449 | Bu::Server::ClientWorker::~ClientWorker() | ||
450 | { | ||
451 | } | ||
452 | |||
453 | void Bu::Server::ClientWorker::run() | ||
454 | { | ||
455 | while( qEvent.isRunning() ) | ||
456 | { | ||
457 | Event *pEv = qEvent.dequeue(); | ||
458 | if( pEv == NULL ) | ||
459 | continue; | ||
460 | |||
461 | Client *pClient = rSrv.getClient( pEv->getId() ); | ||
462 | if( pClient == NULL ) | ||
463 | { | ||
464 | delete pEv; | ||
465 | continue; | ||
466 | } | ||
467 | |||
468 | pClient->processInput(); | ||
469 | delete pEv; | ||
470 | } | ||
471 | } | ||
472 | |||