From 412173a23c88a49ebaeb982e0c7eeddc5662b8cc Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Sat, 29 Jul 2023 00:19:13 -0700 Subject: Many bugfixes. It's almost working! --- src/stable/server.cpp | 64 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 19 deletions(-) (limited to 'src/stable/server.cpp') diff --git a/src/stable/server.cpp b/src/stable/server.cpp index 592230d..3f03a63 100644 --- a/src/stable/server.cpp +++ b/src/stable/server.cpp @@ -14,6 +14,8 @@ #include "bu/config.h" #include "bu/mutexlocker.h" +#include "bu/sio.h" + #ifdef PROFILE_BU_SERVER #define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x ) #define BU_PROFILE_END( x ) Bu::Profiler::getInstance().endEvent( x ) @@ -39,14 +41,14 @@ Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : for( int j = 0; j < iIoWorkers; j++ ) { - IoWorker *pWorker = new IoWorker( *this, qIoEvent, qClientEvent ); + IoWorker *pWorker = new IoWorker( *this ); lIoWorker.append( pWorker ); pWorker->start(); } for( int j = 0; j < iClientWorkers; j++ ) { - ClientWorker *pWorker = new ClientWorker( *this, qClientEvent ); + ClientWorker *pWorker = new ClientWorker( *this ); lClientWorker.append( pWorker ); pWorker->start(); } @@ -90,7 +92,10 @@ void Bu::Server::scan() for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) { if( (*i)->hasOutput() ) + { + Bu::println("Socket has output..."); FD_SET( i.getKey(), &fdWrite ); + } } mClients.unlock(); @@ -105,6 +110,9 @@ void Bu::Server::scan() ); } + evIoCycle.clear(); + Bu::println("Cycle clear"); + for( int j = 0; j < FD_SETSIZE; j++ ) { if( FD_ISSET( j, &fdRead ) ) @@ -116,15 +124,22 @@ void Bu::Server::scan() } else { + evIoCycle.increment(); + Bu::println("Increment (read)"); qIoEvent.enqueue( new Event( j, Event::Read ) ); } } if( FD_ISSET( j, &fdWrite ) ) { + evIoCycle.increment(); + Bu::println("Increment (write)"); qIoEvent.enqueue( new Event( j, Event::Write ) ); } } + Bu::println("Waiting"); + while( evIoCycle.wait() > 0 ) { } + Bu::List lDelete; // Now we just try to write all the pending data on all the sockets. // this could be done better eventually, if we care about the socket @@ -155,7 +170,7 @@ void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket ) BU_PROFILE_START("addClient"); int iFdSrv; int iFdCli; - if( !pSrv->getFd( iFdSrv ) || !pSrv->getFd( iFdCli ) ) + if( !pSrv->getFd( iFdSrv ) || !pSocket->getFd( iFdCli ) ) { throw Bu::ExceptionBase("No file descriptor?"); } @@ -324,12 +339,8 @@ Bu::Server::Event::Operation Bu::Server::Event::getOperation() const // IoWorker //// -Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv, - Bu::Server::EventQueue &qIoEvent, - Bu::Server::EventQueue &qClientEvent ) : - rSrv( rSrv ), - qIoEvent( qIoEvent ), - qClientEvent( qClientEvent ) +Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv ) : + rSrv( rSrv ) { } @@ -339,17 +350,21 @@ Bu::Server::IoWorker::~IoWorker() void Bu::Server::IoWorker::run() { - while( qIoEvent.isRunning() ) + setName("busrv-ioWorker"); + while( rSrv.qIoEvent.isRunning() ) { - Event *pEv = qIoEvent.dequeue(); + Event *pEv = rSrv.qIoEvent.dequeue( true ); if( pEv == NULL ) + { continue; + } Client *pClient; Socket *pSocket; if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) { delete pEv; + rSrv.evIoCycle.decrement(); continue; } @@ -368,6 +383,9 @@ void Bu::Server::IoWorker::run() } delete pEv; + + Bu::println("decrement"); + rSrv.evIoCycle.decrement(); } } @@ -377,12 +395,15 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) Bu::size iRead; Bu::size iTotal=0; + Bu::println("IoWorker::handleRead: starting"); + BU_PROFILE_START("client.read"); for(;;) { try { iRead = pSocket->read( buf, RBS ); + Bu::println("IoWorker::handleRead: read() -> %1").arg( iRead ); if( iRead == 0 ) { @@ -398,6 +419,7 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) } catch( Bu::ExceptionBase &e ) { + Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() ); close( pSocket ); break; } @@ -406,24 +428,27 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) if( iTotal == 0 ) { + Bu::println("IoWorker::handleRead: read nothing, closing"); close( pSocket ); } else { Bu::Server::fd iFd; pSocket->getFd( iFd ); - qClientEvent.enqueue( new Event( iFd, Event::Process ) ); + rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) ); } } void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) { + Bu::println("IoWorker::handleWrite() "); char buf[RBS]; if( pClient->hasOutput() > 0 ) { int iAmnt = RBS; iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); int iReal = pSocket->write( buf, iAmnt ); + Bu::println("IoWorker::handleWrite: Copy out: iAmnt=%1, iReal=%2\n>%3<").arg( iAmnt ).arg( iReal ).arg( Bu::String( buf, iReal ) ); pClient->cbBuffer.server().seek( iReal ); } } @@ -439,10 +464,8 @@ void Bu::Server::IoWorker::close( Bu::Socket *pSocket ) // ClientWorker //// -Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv, - Bu::Server::EventQueue &qEvent ) : - rSrv( rSrv ), - qEvent( qEvent ) +Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv ) : + rSrv( rSrv ) { } @@ -452,9 +475,10 @@ Bu::Server::ClientWorker::~ClientWorker() void Bu::Server::ClientWorker::run() { - while( qEvent.isRunning() ) + setName("busrv-cntWorker"); + while( rSrv.qClientEvent.isRunning() ) { - Event *pEv = qEvent.dequeue(); + Event *pEv = rSrv.qClientEvent.dequeue( true ); if( pEv == NULL ) continue; @@ -464,8 +488,10 @@ void Bu::Server::ClientWorker::run() delete pEv; continue; } - + + Bu::println("Processing input..."); pClient->processInput(); + Bu::println("Processing input complete."); delete pEv; } } -- cgit v1.2.3