aboutsummaryrefslogtreecommitdiff
path: root/src/stable/server.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/stable/server.cpp')
-rw-r--r--src/stable/server.cpp342
1 files changed, 287 insertions, 55 deletions
diff --git a/src/stable/server.cpp b/src/stable/server.cpp
index 0552510..592230d 100644
--- a/src/stable/server.cpp
+++ b/src/stable/server.cpp
@@ -8,10 +8,11 @@
8#include "bu/server.h" 8#include "bu/server.h"
9#include <errno.h> 9#include <errno.h>
10#include <unistd.h> 10#include <unistd.h>
11#include "bu/tcpserversocket.h" 11#include "bu/serversocket.h"
12#include "bu/client.h" 12#include "bu/client.h"
13#include "bu/tcpsocket.h" 13#include "bu/socket.h"
14#include "bu/config.h" 14#include "bu/config.h"
15#include "bu/mutexlocker.h"
15 16
16#ifdef PROFILE_BU_SERVER 17#ifdef PROFILE_BU_SERVER
17#define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x ) 18#define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x )
@@ -21,13 +22,34 @@
21#define BU_PROFILE_END( x ) (void)0 22#define BU_PROFILE_END( x ) (void)0
22#endif 23#endif
23 24
24Bu::Server::Server() : 25#define RBS 1500
26
27Bu::Server::Server( int iIoWorkers, int iClientWorkers ) :
25 nTimeoutSec( 0 ), 28 nTimeoutSec( 0 ),
26 nTimeoutUSec( 0 ), 29 nTimeoutUSec( 0 ),
27 bAutoTick( false ) 30 bAutoTick( false )
28{ 31{
29 BU_PROFILE_START("server"); 32 BU_PROFILE_START("server");
30 FD_ZERO( &fdActive ); 33 FD_ZERO( &fdActive );
34
35 if( iIoWorkers < 1 )
36 iIoWorkers = 1;
37 if( iClientWorkers < 1 )
38 iClientWorkers = 1;
39
40 for( int j = 0; j < iIoWorkers; j++ )
41 {
42 IoWorker *pWorker = new IoWorker( *this, qIoEvent, qClientEvent );
43 lIoWorker.append( pWorker );
44 pWorker->start();
45 }
46
47 for( int j = 0; j < iClientWorkers; j++ )
48 {
49 ClientWorker *pWorker = new ClientWorker( *this, qClientEvent );
50 lClientWorker.append( pWorker );
51 pWorker->start();
52 }
31} 53}
32 54
33Bu::Server::~Server() 55Bu::Server::~Server()
@@ -36,20 +58,16 @@ Bu::Server::~Server()
36 BU_PROFILE_START("server"); 58 BU_PROFILE_START("server");
37} 59}
38 60
39void Bu::Server::addPort( int nPort, int nPoolSize ) 61void Bu::Server::addServerSocket( Bu::ServerSocket *pSocket )
40{ 62{
41 TcpServerSocket *s = new TcpServerSocket( nPort, nPoolSize ); 63 fd iFd;
42 socket_t nSocket = s->getSocket(); 64 if( !pSocket->getFd( iFd ) )
43 FD_SET( nSocket, &fdActive ); 65 {
44 hServers.insert( nSocket, s ); 66 throw Bu::ExceptionBase("Cannot get file descriptor from "
45} 67 "provided ServerSocket.");
46 68 }
47void Bu::Server::addPort( const String &sAddr, int nPort, int nPoolSize ) 69 FD_SET( iFd, &fdActive );
48{ 70 hServers.insert( iFd, pSocket );
49 TcpServerSocket *s = new TcpServerSocket( sAddr, nPort, nPoolSize );
50 socket_t nSocket = s->getSocket();
51 FD_SET( nSocket, &fdActive );
52 hServers.insert( nSocket, s );
53} 71}
54 72
55void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec ) 73void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec )
@@ -68,11 +86,13 @@ void Bu::Server::scan()
68 fd_set fdException = fdActive; 86 fd_set fdException = fdActive;
69 87
70 FD_ZERO( &fdWrite ); 88 FD_ZERO( &fdWrite );
89 mClients.lock();
71 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) 90 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
72 { 91 {
73 if( (*i)->hasOutput() ) 92 if( (*i)->hasOutput() )
74 FD_SET( i.getKey(), &fdWrite ); 93 FD_SET( i.getKey(), &fdWrite );
75 } 94 }
95 mClients.unlock();
76 96
77 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, 97 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE,
78 &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) 98 &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 )
@@ -91,42 +111,17 @@ void Bu::Server::scan()
91 { 111 {
92 if( hServers.has( j ) ) 112 if( hServers.has( j ) )
93 { 113 {
94 TcpServerSocket *pSrv = hServers.get( j ); 114 Bu::ServerSocket *pSrv = hServers.get( j );
95 addClient( pSrv->accept(), pSrv->getPort() ); 115 addClient( pSrv, pSrv->accept() );
96 } 116 }
97 else 117 else
98 { 118 {
99 Client *pClient = hClients.get( j ); 119 qIoEvent.enqueue( new Event( j, Event::Read ) );
100 BU_PROFILE_START("processInput");
101 pClient->processInput();
102 BU_PROFILE_END("processInput");
103 if( !pClient->isOpen() )
104 {
105 closeClient( j );
106 }
107 } 120 }
108 } 121 }
109 if( FD_ISSET( j, &fdWrite ) ) 122 if( FD_ISSET( j, &fdWrite ) )
110 { 123 {
111 try 124 qIoEvent.enqueue( new Event( j, Event::Write ) );
112 {
113 Client *pClient = hClients.get( j );
114 try
115 {
116 BU_PROFILE_START("processOutput");
117 pClient->processOutput();
118 BU_PROFILE_END("processOutput");
119 }
120 catch( Bu::TcpSocketException &e )
121 {
122 closeClient( j );
123 }
124 }
125 catch( Bu::HashException &e )
126 {
127 // Do nothing, I guess, the client is already dead...
128 // TODO: Someday, we may want to handle this more graceully.
129 }
130 } 125 }
131 } 126 }
132 127
@@ -134,6 +129,7 @@ void Bu::Server::scan()
134 // Now we just try to write all the pending data on all the sockets. 129 // Now we just try to write all the pending data on all the sockets.
135 // this could be done better eventually, if we care about the socket 130 // this could be done better eventually, if we care about the socket
136 // wanting to accept writes (using a select). 131 // wanting to accept writes (using a select).
132 mClients.lock();
137 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) 133 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
138 { 134 {
139 if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) 135 if( (*i)->wantsDisconnect() && !(*i)->hasOutput() )
@@ -141,6 +137,7 @@ void Bu::Server::scan()
141 lDelete.append( i.getKey() ); 137 lDelete.append( i.getKey() );
142 } 138 }
143 } 139 }
140 mClients.unlock();
144 141
145 for( Bu::List<int>::iterator i = lDelete.begin(); i != lDelete.end(); i++ ) 142 for( Bu::List<int>::iterator i = lDelete.begin(); i != lDelete.end(); i++ )
146 { 143 {
@@ -153,21 +150,47 @@ void Bu::Server::scan()
153 BU_PROFILE_END("scan"); 150 BU_PROFILE_END("scan");
154} 151}
155 152
156void Bu::Server::addClient( socket_t nSocket, int nPort ) 153void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket )
157{ 154{
158 BU_PROFILE_START("addClient"); 155 BU_PROFILE_START("addClient");
159 FD_SET( nSocket, &fdActive ); 156 int iFdSrv;
157 int iFdCli;
158 if( !pSrv->getFd( iFdSrv ) || !pSrv->getFd( iFdCli ) )
159 {
160 throw Bu::ExceptionBase("No file descriptor?");
161 }
162 FD_SET( iFdCli, &fdActive );
160 163
161 Client *c = new Client( 164 Client *pClient = new Client(
162 new Bu::TcpSocket( nSocket ),
163 new SrvClientLinkFactory() 165 new SrvClientLinkFactory()
164 ); 166 );
165 hClients.insert( nSocket, c ); 167 {
168 Bu::MutexLocker l( mClients );
169 hClients.insert( iFdCli, pClient );
170 hSockets.insert( iFdCli, pSocket );
171 }
166 172
167 onNewConnection( c, nPort ); 173 onNewConnection( pSrv, pClient, pSocket );
168 BU_PROFILE_END("addClient"); 174 BU_PROFILE_END("addClient");
169} 175}
170 176
177Bu::Client *Bu::Server::getClient( fd iId )
178{
179 Bu::MutexLocker l( mClients );
180 return hClients.get( iId );
181}
182
183bool Bu::Server::getClientAndSocket( fd iId, Bu::Client *&pClient,
184 Bu::Socket *&pSocket )
185{
186 Bu::MutexLocker l( mClients );
187 if( !hClients.has( iId ) || !hSockets.has( iId ) )
188 return false;
189 pClient = hClients.get( iId );
190 pSocket = hSockets.get( iId );
191 return true;
192}
193
171Bu::Server::SrvClientLink::SrvClientLink( Bu::Client *pClient ) : 194Bu::Server::SrvClientLink::SrvClientLink( Bu::Client *pClient ) :
172 pClient( pClient ) 195 pClient( pClient )
173{ 196{
@@ -203,14 +226,45 @@ void Bu::Server::setAutoTick( bool bEnable )
203 226
204void Bu::Server::tick() 227void Bu::Server::tick()
205{ 228{
229 mClients.lock();
206 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) 230 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
207 { 231 {
208 (*i)->tick(); 232 (*i)->tick();
209 } 233 }
234 mClients.unlock();
210} 235}
211 236
212void Bu::Server::shutdown() 237void Bu::Server::shutdown()
213{ 238{
239 {
240 qIoEvent.stop();
241 qClientEvent.stop();
242 Bu::Server::Event *pEv;
243 while( (pEv = qIoEvent.drain()) != NULL )
244 {
245 delete pEv;
246 }
247 while( (pEv = qClientEvent.drain()) != NULL )
248 {
249 delete pEv;
250 }
251
252 Bu::MutexLocker l( mWorkers );
253 for( IoWorkerList::iterator i = lIoWorker.begin(); i; i++ )
254 {
255 (*i)->join();
256 delete *i;
257 }
258 lIoWorker.clear();
259 for( ClientWorkerList::iterator i = lClientWorker.begin();
260 i; i++ )
261 {
262 (*i)->join();
263 delete *i;
264 }
265 lClientWorker.clear();
266 }
267
214 for( SrvHash::iterator i = hServers.begin(); i != hServers.end(); i++ ) 268 for( SrvHash::iterator i = hServers.begin(); i != hServers.end(); i++ )
215 { 269 {
216 delete *i; 270 delete *i;
@@ -218,23 +272,201 @@ void Bu::Server::shutdown()
218 272
219 hServers.clear(); 273 hServers.clear();
220 274
221 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) 275 ClientHash::KeyList lClients = hClients.getKeys();
276 for( ClientHash::KeyList::iterator i = lClients.begin(); i; i++ )
222 { 277 {
223 closeClient( i.getKey() ); 278 closeClient( *i );
224 } 279 }
225
226 hClients.clear(); 280 hClients.clear();
227} 281}
228 282
229void Bu::Server::closeClient( socket_t iSocket ) 283void Bu::Server::closeClient( fd iSocket )
230{ 284{
231 BU_PROFILE_START("closeClient"); 285 BU_PROFILE_START("closeClient");
232 Bu::Client *pClient = hClients.get( iSocket ); 286 Bu::Client *pClient = hClients.get( iSocket );
287 Bu::Socket *pSocket = hSockets.get( iSocket );
233 onClosedConnection( pClient ); 288 onClosedConnection( pClient );
234 pClient->close(); 289 pClient->close();
235 hClients.erase( iSocket ); 290 hClients.erase( iSocket );
291 pSocket->close();
292 hSockets.erase( iSocket );
236 FD_CLR( iSocket, &fdActive ); 293 FD_CLR( iSocket, &fdActive );
237 delete pClient; 294 delete pClient;
295 delete pSocket;
238 BU_PROFILE_END("closeClient"); 296 BU_PROFILE_END("closeClient");
239} 297}
240 298
299////////
300// Event
301////
302
303Bu::Server::Event::Event( fd iId, Operation eOp ) :
304 iId( iId ),
305 eOp( eOp )
306{
307}
308
309Bu::Server::Event::~Event()
310{
311}
312
313Bu::Server::fd Bu::Server::Event::getId() const
314{
315 return iId;
316}
317
318Bu::Server::Event::Operation Bu::Server::Event::getOperation() const
319{
320 return eOp;
321}
322
323/////////
324// IoWorker
325////
326
327Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv,
328 Bu::Server::EventQueue &qIoEvent,
329 Bu::Server::EventQueue &qClientEvent ) :
330 rSrv( rSrv ),
331 qIoEvent( qIoEvent ),
332 qClientEvent( qClientEvent )
333{
334}
335
336Bu::Server::IoWorker::~IoWorker()
337{
338}
339
340void Bu::Server::IoWorker::run()
341{
342 while( qIoEvent.isRunning() )
343 {
344 Event *pEv = qIoEvent.dequeue();
345 if( pEv == NULL )
346 continue;
347
348 Client *pClient;
349 Socket *pSocket;
350 if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) )
351 {
352 delete pEv;
353 continue;
354 }
355
356 switch( pEv->getOperation() )
357 {
358 case Event::Read:
359 handleRead( pClient, pSocket );
360 break;
361
362 case Event::Write:
363 handleWrite( pClient, pSocket );
364 break;
365
366 case Event::Process:
367 break;
368 }
369
370 delete pEv;
371 }
372}
373
374void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
375{
376 char buf[RBS];
377 Bu::size iRead;
378 Bu::size iTotal=0;
379
380 BU_PROFILE_START("client.read");
381 for(;;)
382 {
383 try
384 {
385 iRead = pSocket->read( buf, RBS );
386
387 if( iRead == 0 )
388 {
389 break;
390 }
391 else
392 {
393 iTotal += iRead;
394 pClient->cbBuffer.server().write( buf, iRead );
395 if( !pSocket->canRead() )
396 break;
397 }
398 }
399 catch( Bu::ExceptionBase &e )
400 {
401 close( pSocket );
402 break;
403 }
404 }
405 BU_PROFILE_END("client.read");
406
407 if( iTotal == 0 )
408 {
409 close( pSocket );
410 }
411 else
412 {
413 Bu::Server::fd iFd;
414 pSocket->getFd( iFd );
415 qClientEvent.enqueue( new Event( iFd, Event::Process ) );
416 }
417}
418
419void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket )
420{
421 char buf[RBS];
422 if( pClient->hasOutput() > 0 )
423 {
424 int iAmnt = RBS;
425 iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt );
426 int iReal = pSocket->write( buf, iAmnt );
427 pClient->cbBuffer.server().seek( iReal );
428 }
429}
430
431void Bu::Server::IoWorker::close( Bu::Socket *pSocket )
432{
433 Bu::Server::fd iFd;
434 pSocket->getFd( iFd );
435 rSrv.closeClient( iFd );
436}
437
438/////////
439// ClientWorker
440////
441
442Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv,
443 Bu::Server::EventQueue &qEvent ) :
444 rSrv( rSrv ),
445 qEvent( qEvent )
446{
447}
448
449Bu::Server::ClientWorker::~ClientWorker()
450{
451}
452
453void Bu::Server::ClientWorker::run()
454{
455 while( qEvent.isRunning() )
456 {
457 Event *pEv = qEvent.dequeue();
458 if( pEv == NULL )
459 continue;
460
461 Client *pClient = rSrv.getClient( pEv->getId() );
462 if( pClient == NULL )
463 {
464 delete pEv;
465 continue;
466 }
467
468 pClient->processInput();
469 delete pEv;
470 }
471}
472