aboutsummaryrefslogtreecommitdiff
path: root/src/stable/server.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/stable/server.cpp')
-rw-r--r--src/stable/server.cpp64
1 files changed, 45 insertions, 19 deletions
diff --git a/src/stable/server.cpp b/src/stable/server.cpp
index 592230d..3f03a63 100644
--- a/src/stable/server.cpp
+++ b/src/stable/server.cpp
@@ -14,6 +14,8 @@
14#include "bu/config.h" 14#include "bu/config.h"
15#include "bu/mutexlocker.h" 15#include "bu/mutexlocker.h"
16 16
17#include "bu/sio.h"
18
17#ifdef PROFILE_BU_SERVER 19#ifdef PROFILE_BU_SERVER
18#define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x ) 20#define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x )
19#define BU_PROFILE_END( x ) Bu::Profiler::getInstance().endEvent( x ) 21#define BU_PROFILE_END( x ) Bu::Profiler::getInstance().endEvent( x )
@@ -39,14 +41,14 @@ Bu::Server::Server( int iIoWorkers, int iClientWorkers ) :
39 41
40 for( int j = 0; j < iIoWorkers; j++ ) 42 for( int j = 0; j < iIoWorkers; j++ )
41 { 43 {
42 IoWorker *pWorker = new IoWorker( *this, qIoEvent, qClientEvent ); 44 IoWorker *pWorker = new IoWorker( *this );
43 lIoWorker.append( pWorker ); 45 lIoWorker.append( pWorker );
44 pWorker->start(); 46 pWorker->start();
45 } 47 }
46 48
47 for( int j = 0; j < iClientWorkers; j++ ) 49 for( int j = 0; j < iClientWorkers; j++ )
48 { 50 {
49 ClientWorker *pWorker = new ClientWorker( *this, qClientEvent ); 51 ClientWorker *pWorker = new ClientWorker( *this );
50 lClientWorker.append( pWorker ); 52 lClientWorker.append( pWorker );
51 pWorker->start(); 53 pWorker->start();
52 } 54 }
@@ -90,7 +92,10 @@ void Bu::Server::scan()
90 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) 92 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
91 { 93 {
92 if( (*i)->hasOutput() ) 94 if( (*i)->hasOutput() )
95 {
96 Bu::println("Socket has output...");
93 FD_SET( i.getKey(), &fdWrite ); 97 FD_SET( i.getKey(), &fdWrite );
98 }
94 } 99 }
95 mClients.unlock(); 100 mClients.unlock();
96 101
@@ -105,6 +110,9 @@ void Bu::Server::scan()
105 ); 110 );
106 } 111 }
107 112
113 evIoCycle.clear();
114 Bu::println("Cycle clear");
115
108 for( int j = 0; j < FD_SETSIZE; j++ ) 116 for( int j = 0; j < FD_SETSIZE; j++ )
109 { 117 {
110 if( FD_ISSET( j, &fdRead ) ) 118 if( FD_ISSET( j, &fdRead ) )
@@ -116,15 +124,22 @@ void Bu::Server::scan()
116 } 124 }
117 else 125 else
118 { 126 {
127 evIoCycle.increment();
128 Bu::println("Increment (read)");
119 qIoEvent.enqueue( new Event( j, Event::Read ) ); 129 qIoEvent.enqueue( new Event( j, Event::Read ) );
120 } 130 }
121 } 131 }
122 if( FD_ISSET( j, &fdWrite ) ) 132 if( FD_ISSET( j, &fdWrite ) )
123 { 133 {
134 evIoCycle.increment();
135 Bu::println("Increment (write)");
124 qIoEvent.enqueue( new Event( j, Event::Write ) ); 136 qIoEvent.enqueue( new Event( j, Event::Write ) );
125 } 137 }
126 } 138 }
127 139
140 Bu::println("Waiting");
141 while( evIoCycle.wait() > 0 ) { }
142
128 Bu::List<int> lDelete; 143 Bu::List<int> lDelete;
129 // Now we just try to write all the pending data on all the sockets. 144 // Now we just try to write all the pending data on all the sockets.
130 // this could be done better eventually, if we care about the socket 145 // this could be done better eventually, if we care about the socket
@@ -155,7 +170,7 @@ void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket )
155 BU_PROFILE_START("addClient"); 170 BU_PROFILE_START("addClient");
156 int iFdSrv; 171 int iFdSrv;
157 int iFdCli; 172 int iFdCli;
158 if( !pSrv->getFd( iFdSrv ) || !pSrv->getFd( iFdCli ) ) 173 if( !pSrv->getFd( iFdSrv ) || !pSocket->getFd( iFdCli ) )
159 { 174 {
160 throw Bu::ExceptionBase("No file descriptor?"); 175 throw Bu::ExceptionBase("No file descriptor?");
161 } 176 }
@@ -324,12 +339,8 @@ Bu::Server::Event::Operation Bu::Server::Event::getOperation() const
324// IoWorker 339// IoWorker
325//// 340////
326 341
327Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv, 342Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv ) :
328 Bu::Server::EventQueue &qIoEvent, 343 rSrv( rSrv )
329 Bu::Server::EventQueue &qClientEvent ) :
330 rSrv( rSrv ),
331 qIoEvent( qIoEvent ),
332 qClientEvent( qClientEvent )
333{ 344{
334} 345}
335 346
@@ -339,17 +350,21 @@ Bu::Server::IoWorker::~IoWorker()
339 350
340void Bu::Server::IoWorker::run() 351void Bu::Server::IoWorker::run()
341{ 352{
342 while( qIoEvent.isRunning() ) 353 setName("busrv-ioWorker");
354 while( rSrv.qIoEvent.isRunning() )
343 { 355 {
344 Event *pEv = qIoEvent.dequeue(); 356 Event *pEv = rSrv.qIoEvent.dequeue( true );
345 if( pEv == NULL ) 357 if( pEv == NULL )
358 {
346 continue; 359 continue;
360 }
347 361
348 Client *pClient; 362 Client *pClient;
349 Socket *pSocket; 363 Socket *pSocket;
350 if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) 364 if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) )
351 { 365 {
352 delete pEv; 366 delete pEv;
367 rSrv.evIoCycle.decrement();
353 continue; 368 continue;
354 } 369 }
355 370
@@ -368,6 +383,9 @@ void Bu::Server::IoWorker::run()
368 } 383 }
369 384
370 delete pEv; 385 delete pEv;
386
387 Bu::println("decrement");
388 rSrv.evIoCycle.decrement();
371 } 389 }
372} 390}
373 391
@@ -377,12 +395,15 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
377 Bu::size iRead; 395 Bu::size iRead;
378 Bu::size iTotal=0; 396 Bu::size iTotal=0;
379 397
398 Bu::println("IoWorker::handleRead: starting");
399
380 BU_PROFILE_START("client.read"); 400 BU_PROFILE_START("client.read");
381 for(;;) 401 for(;;)
382 { 402 {
383 try 403 try
384 { 404 {
385 iRead = pSocket->read( buf, RBS ); 405 iRead = pSocket->read( buf, RBS );
406 Bu::println("IoWorker::handleRead: read() -> %1").arg( iRead );
386 407
387 if( iRead == 0 ) 408 if( iRead == 0 )
388 { 409 {
@@ -398,6 +419,7 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
398 } 419 }
399 catch( Bu::ExceptionBase &e ) 420 catch( Bu::ExceptionBase &e )
400 { 421 {
422 Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() );
401 close( pSocket ); 423 close( pSocket );
402 break; 424 break;
403 } 425 }
@@ -406,24 +428,27 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
406 428
407 if( iTotal == 0 ) 429 if( iTotal == 0 )
408 { 430 {
431 Bu::println("IoWorker::handleRead: read nothing, closing");
409 close( pSocket ); 432 close( pSocket );
410 } 433 }
411 else 434 else
412 { 435 {
413 Bu::Server::fd iFd; 436 Bu::Server::fd iFd;
414 pSocket->getFd( iFd ); 437 pSocket->getFd( iFd );
415 qClientEvent.enqueue( new Event( iFd, Event::Process ) ); 438 rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) );
416 } 439 }
417} 440}
418 441
419void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) 442void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket )
420{ 443{
444 Bu::println("IoWorker::handleWrite() ");
421 char buf[RBS]; 445 char buf[RBS];
422 if( pClient->hasOutput() > 0 ) 446 if( pClient->hasOutput() > 0 )
423 { 447 {
424 int iAmnt = RBS; 448 int iAmnt = RBS;
425 iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); 449 iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt );
426 int iReal = pSocket->write( buf, iAmnt ); 450 int iReal = pSocket->write( buf, iAmnt );
451 Bu::println("IoWorker::handleWrite: Copy out: iAmnt=%1, iReal=%2\n>%3<").arg( iAmnt ).arg( iReal ).arg( Bu::String( buf, iReal ) );
427 pClient->cbBuffer.server().seek( iReal ); 452 pClient->cbBuffer.server().seek( iReal );
428 } 453 }
429} 454}
@@ -439,10 +464,8 @@ void Bu::Server::IoWorker::close( Bu::Socket *pSocket )
439// ClientWorker 464// ClientWorker
440//// 465////
441 466
442Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv, 467Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv ) :
443 Bu::Server::EventQueue &qEvent ) : 468 rSrv( rSrv )
444 rSrv( rSrv ),
445 qEvent( qEvent )
446{ 469{
447} 470}
448 471
@@ -452,9 +475,10 @@ Bu::Server::ClientWorker::~ClientWorker()
452 475
453void Bu::Server::ClientWorker::run() 476void Bu::Server::ClientWorker::run()
454{ 477{
455 while( qEvent.isRunning() ) 478 setName("busrv-cntWorker");
479 while( rSrv.qClientEvent.isRunning() )
456 { 480 {
457 Event *pEv = qEvent.dequeue(); 481 Event *pEv = rSrv.qClientEvent.dequeue( true );
458 if( pEv == NULL ) 482 if( pEv == NULL )
459 continue; 483 continue;
460 484
@@ -464,8 +488,10 @@ void Bu::Server::ClientWorker::run()
464 delete pEv; 488 delete pEv;
465 continue; 489 continue;
466 } 490 }
467 491
492 Bu::println("Processing input...");
468 pClient->processInput(); 493 pClient->processInput();
494 Bu::println("Processing input complete.");
469 delete pEv; 495 delete pEv;
470 } 496 }
471} 497}