From a49af97abf091a32f6ec2c3985aa0890ded65d9c Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Mon, 31 Jul 2023 15:17:52 -0700 Subject: Debugging yet, but the new server works. It at least seems to! --- src/stable/client.cpp | 18 +++- src/stable/client.h | 9 +- src/stable/clientbuf.cpp | 10 -- src/stable/server.cpp | 194 +++++++++++++++++++++++++++++-------- src/stable/server.h | 29 ++++-- src/stable/serverinterface.cpp | 29 ++++++ src/stable/serverinterface.h | 40 ++++++++ src/unstable/protocolwebsocket.cpp | 5 +- 8 files changed, 266 insertions(+), 68 deletions(-) create mode 100644 src/stable/serverinterface.cpp create mode 100644 src/stable/serverinterface.h (limited to 'src') diff --git a/src/stable/client.cpp b/src/stable/client.cpp index d2d48d7..159d103 100644 --- a/src/stable/client.cpp +++ b/src/stable/client.cpp @@ -23,16 +23,21 @@ #define BU_PROFILE_END( x ) (void)0 #endif -Bu::Client::Client( class Bu::ClientLinkFactory *pfLink ) : +Bu::Client::Client( int iId, const Bu::ServerInterface &rServer ) : pProto( NULL ), bWantsDisconnect( false ), - pfLink( pfLink ) + iId( iId ), + xServer( rServer ) { } Bu::Client::~Client() { - delete pfLink; +} + +int Bu::Client::getId() const +{ + return iId; } void Bu::Client::processInput() @@ -47,6 +52,11 @@ void Bu::Client::processInput() } } +void Bu::Client::outputReady() +{ + xServer.outputReady( iId ); +} + void Bu::Client::setProtocol( Protocol *pProto ) { Bu::MutexLocker l( mProto ); @@ -161,7 +171,7 @@ void Bu::Client::close() Bu::ClientLink *Bu::Client::getLink() { - return pfLink->createLink( this ); + return NULL; //pfLink->createLink( this ); } void Bu::Client::onMessage( const Bu::String &sMsg ) diff --git a/src/stable/client.h b/src/stable/client.h index abe807e..0ff7914 100644 --- a/src/stable/client.h +++ b/src/stable/client.h @@ -15,6 +15,7 @@ #include "bu/clientbuf.h" #include "bu/mutex.h" #include "bu/readwritemutex.h" +#include "bu/serverinterface.h" #ifndef PROFILE_BU_SERVER // #define PROFILE_BU_SERVER 1 @@ -37,10 +38,13 @@ namespace Bu { friend class Server; public: - Client( Bu::ClientLinkFactory *pfLink ); + Client( int iId, const Bu::ServerInterface &rServer ); virtual ~Client(); + int getId() const; + void processInput(); + void outputReady(); Bu::size write( const Bu::String &sData ); Bu::size write( const void *pData, Bu::size nBytes ); @@ -116,9 +120,10 @@ namespace Bu Bu::Protocol *pProto; Bu::ClientBuf cbBuffer; bool bWantsDisconnect; - class Bu::ClientLinkFactory *pfLink; mutable Bu::Mutex mProto; mutable Bu::Mutex mDisconnect; + int iId; + Bu::ServerInterface xServer; }; } diff --git a/src/stable/clientbuf.cpp b/src/stable/clientbuf.cpp index c6b310f..8c4afc4 100644 --- a/src/stable/clientbuf.cpp +++ b/src/stable/clientbuf.cpp @@ -175,19 +175,16 @@ void Bu::ClientBuf::ClientAccess::close() Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) { - Bu::println("ClientAccess::read( ptr, %1 )").arg( iBytes ); char *pBuf = (char *)pBufRaw; Bu::MutexLocker l( mAccess ); // Read from QueueBuf first Bu::size ps = qbPeek.read( pBuf, iBytes ); - Bu::println("ClientAccess::read: attempted qbPeek, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) ); iBytes -= ps; pBuf += ps; // Request space left? Try the client if( iBytes > 0 ) { ps += rBuf.accClientFiltered.read( pBuf, iBytes ); - Bu::println("ClientAccess::read: attempted completion from socket buffer, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) ); } return ps; } @@ -195,26 +192,22 @@ Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes, int iOffset ) { - Bu::println("ClientAccess::peek( ptr, %1, %2 )").arg( iBytes ).arg( iOffset ); Bu::MutexLocker l( mAccess ); // Do we have enough data in the peek buffer to handle this? if( qbPeek.getSize() < iBytes+iOffset ) { - Bu::println("ClientAccess::peek: Insufficient buffered (have %1b, need %2b)").arg( qbPeek.getSize() ).arg( iBytes+iOffset ); // Nope, make an attempt to fill it in. int nDiff = (iBytes+iOffset)-qbPeek.getSize(); // We have to make our own buffer, since iBytes+nOffeset could be bigger // than pData. char *pTmp = new char[nDiff]; Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff ); - Bu::println("ClientAccess::peek: Tried to fill buffer, read %1 of needed %2\n>%3<").arg( ps ).arg( nDiff ).arg( Bu::String(pTmp, ps) ); if( ps > 0 ) { // Add the data read to the peek buffer. qbPeek.write( pTmp, ps ); } delete[] pTmp; - Bu::println("ClientAccess::peek: buffer left with %1").arg( qbPeek.getSize() ); } return qbPeek.peek( pData, iBytes, iOffset ); @@ -234,7 +227,6 @@ Bu::size Bu::ClientBuf::ClientAccess::tell() void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) { - Bu::println("ClientAccess::seek( %1 )").arg( offset ); Bu::MutexLocker l( mAccess ); // For this type of stream seek is basically a destructive skip. It's like // reading the data but with no output buffer. Let's remove data from the @@ -242,7 +234,6 @@ void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) if( qbPeek.getSize() > 0 ) { Bu::size amount = Bu::buMin( qbPeek.getSize(), offset ); - Bu::println("ClientAccess::seek: buffered: %1, amount: %2").arg( qbPeek.getSize() ).arg( amount ); qbPeek.seek( amount ); offset -= amount; } @@ -250,7 +241,6 @@ void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) // If there's offset left, then apply it to the underlying stream if( offset > 0 ) { - Bu::println("ClientAccess::seek: seeking remaining %1 in socket buffer").arg( offset ); rBuf.accClientFiltered.seek( offset ); } } diff --git a/src/stable/server.cpp b/src/stable/server.cpp index 7c44c11..cc89f64 100644 --- a/src/stable/server.cpp +++ b/src/stable/server.cpp @@ -8,11 +8,14 @@ #include "bu/server.h" #include #include +#include +#include #include "bu/serversocket.h" #include "bu/client.h" #include "bu/socket.h" #include "bu/config.h" #include "bu/mutexlocker.h" +#include "bu/serverinterface.h" #include "bu/sio.h" @@ -25,14 +28,48 @@ #endif #define RBS 1500 +#define MAX_EVENTS 20 + +namespace Bu +{ + class __ServerCore + { + public: + __ServerCore() + { + fdRead = epoll_create( 1 ); + fdWrite = epoll_create( 1 ); + fdEvent = eventfd( 0, 0 ); + + struct epoll_event ev; + ev.events = EPOLLIN; + ev.data.fd = fdEvent; + epoll_ctl( fdRead, EPOLL_CTL_ADD, fdEvent, &ev ); + epoll_ctl( fdWrite, EPOLL_CTL_ADD, fdEvent, &ev ); + } + + ~__ServerCore() + { + close( fdRead ); + close( fdWrite ); + close( fdEvent ); + } + + Server::fd fdRead; + Server::fd fdWrite; + Server::fd fdEvent; + }; +} Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : + pCore( new Bu::__ServerCore() ), nTimeoutSec( 0 ), nTimeoutUSec( 0 ), - bAutoTick( false ) + bAutoTick( false ), + tMonitorWrite( *this ), + bRunning( true ) { BU_PROFILE_START("server"); - FD_ZERO( &fdActive ); if( iIoWorkers < 1 ) iIoWorkers = 1; @@ -52,11 +89,14 @@ Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : lClientWorker.append( pWorker ); pWorker->start(); } + tMonitorWrite.start(); } Bu::Server::~Server() { shutdown(); + tMonitorWrite.join(); + delete pCore; BU_PROFILE_START("server"); } @@ -68,7 +108,11 @@ void Bu::Server::addServerSocket( Bu::ServerSocket *pSocket ) throw Bu::ExceptionBase("Cannot get file descriptor from " "provided ServerSocket."); } - FD_SET( iFd, &fdActive ); + + struct epoll_event ev; + ev.events = EPOLLIN; + ev.data.fd = iFd; + epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFd, &ev ); hServers.insert( iFd, pSocket ); } @@ -81,26 +125,22 @@ void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec ) void Bu::Server::scan() { BU_PROFILE_START("scan"); - struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; - fd_set fdRead = fdActive; - fd_set fdWrite /* = fdActive*/; - fd_set fdException = fdActive; - - FD_ZERO( &fdWrite ); mClients.lock(); for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) { if( (*i)->hasOutput() ) { - Bu::println("Socket has output..."); - FD_SET( i.getKey(), &fdWrite ); + // Queue it for output here? } } mClients.unlock(); - if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, - &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) + struct epoll_event ev[MAX_EVENTS]; + + int iCount = epoll_wait( pCore->fdRead, ev, MAX_EVENTS, nTimeoutSec*1000+nTimeoutUSec/1000 ); + + if( iCount < 0 ) { char buf[1024]; strerror_r( errno, buf, 1024 ); @@ -110,41 +150,29 @@ void Bu::Server::scan() ); } - evIoCycle.clear(); Bu::println("Cycle clear"); - for( int j = 0; j < FD_SETSIZE; j++ ) + for( int j = 0; j < iCount; j++ ) { - if( FD_ISSET( j, &fdRead ) ) + if( hServers.has( ev[j].data.fd ) ) { - if( hServers.has( j ) ) - { - Bu::ServerSocket *pSrv = hServers.get( j ); - addClient( pSrv, pSrv->accept() ); - } - else - { - evIoCycle.increment(); - Bu::println("Increment (read)"); - qIoEvent.enqueue( new Event( j, Event::Read ) ); - } + Bu::ServerSocket *pSrv = hServers.get( ev[j].data.fd ); + addClient( pSrv, pSrv->accept() ); } - if( FD_ISSET( j, &fdWrite ) ) + else { - evIoCycle.increment(); - Bu::println("Increment (write)"); - qIoEvent.enqueue( new Event( j, Event::Write ) ); + qIoEvent.enqueue( new Event( ev[j].data.fd, Event::Read ) ); } } Bu::println("Waiting"); - while( evIoCycle.wait() > 0 ) { } Bu::List lDelete; // Now we just try to write all the pending data on all the sockets. // this could be done better eventually, if we care about the socket // wanting to accept writes (using a select). mClients.lock(); + for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) { if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) @@ -174,10 +202,18 @@ void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket ) { throw Bu::ExceptionBase("No file descriptor?"); } - FD_SET( iFdCli, &fdActive ); + + struct epoll_event ev; + ev.events = EPOLLIN | EPOLLONESHOT; + ev.data.fd = iFdCli; + epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFdCli, &ev ); + + ev.events = 0; + epoll_ctl( pCore->fdWrite, EPOLL_CTL_ADD, iFdCli, &ev ); Client *pClient = new Client( - new SrvClientLinkFactory() + iFdCli, + ServerInterface( *this ) ); { Bu::MutexLocker l( mClients ); @@ -251,6 +287,11 @@ void Bu::Server::tick() void Bu::Server::shutdown() { + { + int64_t iVal = 1; + write( pCore->fdEvent, &iVal, sizeof(int64_t) ); + } + { qIoEvent.stop(); qClientEvent.stop(); @@ -294,6 +335,24 @@ void Bu::Server::shutdown() } hClients.clear(); } + +void Bu::Server::clientReadReady( fd iFd ) +{ + // Re-arm + struct epoll_event ev; + ev.events = EPOLLIN | EPOLLONESHOT; + ev.data.fd = iFd; + epoll_ctl( pCore->fdRead, EPOLL_CTL_MOD, iFd, &ev ); +} + +void Bu::Server::clientWriteReady( fd iFd ) +{ + // Re-arm + struct epoll_event ev; + ev.events = EPOLLOUT | EPOLLONESHOT; + ev.data.fd = iFd; + epoll_ctl( pCore->fdWrite, EPOLL_CTL_MOD, iFd, &ev ); +} void Bu::Server::closeClient( fd iSocket ) { @@ -306,12 +365,59 @@ void Bu::Server::closeClient( fd iSocket ) hClients.erase( iSocket ); pSocket->close(); hSockets.erase( iSocket ); - FD_CLR( iSocket, &fdActive ); + + struct epoll_event ev; + epoll_ctl( pCore->fdRead, EPOLL_CTL_DEL, iSocket, &ev ); + epoll_ctl( pCore->fdWrite, EPOLL_CTL_DEL, iSocket, &ev ); + delete pClient; delete pSocket; BU_PROFILE_END("closeClient"); } +//////// +// WriteMonitor +//// +Bu::Server::WriteMonitor::WriteMonitor( Server &rSrv ) : + rSrv( rSrv ) +{ +} + +Bu::Server::WriteMonitor::~WriteMonitor() +{ +} + +void Bu::Server::WriteMonitor::run() +{ + setName("busrv-writeMon"); + struct epoll_event ev[MAX_EVENTS]; + for(;;) + { + int iCount = epoll_wait( rSrv.pCore->fdWrite, ev, MAX_EVENTS, 5000 ); + if( iCount < 0 ) + { + // Bad error? + return; + } + + for( int j = 0; j < iCount; j++ ) + { + if( ev[j].data.fd == rSrv.pCore->fdEvent ) + { + Bu::println("Bu::Server::WriteMonitor -> got event on fdEvent, exiting..."); + return; + } + else + { + // Queue the write + rSrv.qIoEvent.enqueue( + new Event( ev[j].data.fd, Event::Write ) + ); + } + } + } +} + //////// // Event //// @@ -365,7 +471,6 @@ void Bu::Server::IoWorker::run() if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) { delete pEv; - rSrv.evIoCycle.decrement(); continue; } @@ -384,9 +489,6 @@ void Bu::Server::IoWorker::run() } delete pEv; - - Bu::println("decrement"); - rSrv.evIoCycle.decrement(); } } @@ -404,7 +506,6 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) try { iRead = pSocket->read( buf, RBS ); - Bu::println("IoWorker::handleRead: read() -> %1").arg( iRead ); if( iRead == 0 ) { @@ -421,7 +522,8 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) catch( Bu::ExceptionBase &e ) { Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() ); - close( pSocket ); + pClient->disconnect(); + //close( pSocket ); return; } } @@ -430,13 +532,16 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) if( iTotal == 0 ) { Bu::println("IoWorker::handleRead: read nothing, closing"); - close( pSocket ); + pClient->disconnect(); + //close( pSocket ); } else { Bu::Server::fd iFd; pSocket->getFd( iFd ); rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) ); + + rSrv.clientReadReady( iFd ); } } @@ -449,7 +554,6 @@ void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) int iAmnt = RBS; iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); int iReal = pSocket->write( buf, iAmnt ); - Bu::println("IoWorker::handleWrite: Copy out: iAmnt=%1, iReal=%2\n>%3<").arg( iAmnt ).arg( iReal ).arg( Bu::String( buf, iReal ) ); pClient->cbBuffer.server().seek( iReal ); } } @@ -494,6 +598,10 @@ void Bu::Server::ClientWorker::run() pClient->processInput(); Bu::println("Processing input complete."); Bu::println("*** ClientBuf: input: %1b, output: %2b").arg( pClient->getInputSize() ).arg( pClient->getOutputSize() ); + if( pClient->getOutputSize() > 0 ) + { + rSrv.clientWriteReady( pClient->getId() ); + } delete pEv; } } diff --git a/src/stable/server.h b/src/stable/server.h index e2b7d53..56ac29a 100644 --- a/src/stable/server.h +++ b/src/stable/server.h @@ -10,10 +10,6 @@ #include -#ifndef WIN32 - #include -#endif - #include "bu/string.h" #include "bu/list.h" @@ -62,6 +58,7 @@ namespace Bu */ class Server { + friend class ServerInterface; public: Server( int iIoWorkers=4, int iClientWorkers=8 ); virtual ~Server(); @@ -90,8 +87,12 @@ namespace Bu void shutdown(); - private: + protected: + void clientReadReady( fd iFd ); + void clientWriteReady( fd iFd ); void closeClient( fd iSocket ); + + private: class SrvClientLink : public Bu::ClientLink { public: @@ -113,6 +114,19 @@ namespace Bu virtual Bu::ClientLink *createLink( Bu::Client *pClient ); }; + class WriteMonitor : public Bu::Thread + { + public: + WriteMonitor( Server &rSrv ); + virtual ~WriteMonitor(); + + protected: + virtual void run(); + + private: + Server &rSrv; + }; + class Event { public: @@ -168,9 +182,9 @@ namespace Bu }; friend class Bu::Server::ClientWorker; + class __ServerCore *pCore; int nTimeoutSec; int nTimeoutUSec; - fd_set fdActive; typedef Hash SrvHash; SrvHash hServers; typedef Hash ClientHash; @@ -188,7 +202,8 @@ namespace Bu typedef List ClientWorkerList; IoWorkerList lIoWorker; ClientWorkerList lClientWorker; - Bu::CounterEvent evIoCycle; + WriteMonitor tMonitorWrite; + bool bRunning; }; } diff --git a/src/stable/serverinterface.cpp b/src/stable/serverinterface.cpp new file mode 100644 index 0000000..0e3122e --- /dev/null +++ b/src/stable/serverinterface.cpp @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2007-2019 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#include "bu/serverinterface.h" +#include "bu/server.h" + +Bu::ServerInterface::ServerInterface( Server &rSrv ) : + rSrv( rSrv ) +{ +} + +Bu::ServerInterface::ServerInterface( const ServerInterface &rSrc ) : + rSrv( rSrc.rSrv ) +{ +} + +Bu::ServerInterface::~ServerInterface() +{ +} + +void Bu::ServerInterface::outputReady( int iClientId ) +{ + rSrv.clientWriteReady( iClientId ); +} + diff --git a/src/stable/serverinterface.h b/src/stable/serverinterface.h new file mode 100644 index 0000000..edc8335 --- /dev/null +++ b/src/stable/serverinterface.h @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2007-2019 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#ifndef BU_SERVER_INTERFACE_H +#define BU_SERVER_INTERFACE_H + +#include + +namespace Bu +{ + class Server; + + class ServerInterface + { + friend class Bu::Server; + private: + ServerInterface( Bu::Server &rSrv ); + public: + ServerInterface( const Bu::ServerInterface &rSrc ); + ~ServerInterface(); + + /** + * Clients will call this when they have filled the output buffer and + * are ready for that data to be sent. This only needs to be called + * when async output is generated, that is when output is generated not + * in the processInput method. + */ + void outputReady( int iClientId ); + + private: + Bu::Server &rSrv; + }; +} + +#endif + diff --git a/src/unstable/protocolwebsocket.cpp b/src/unstable/protocolwebsocket.cpp index fa2d882..2bec1ce 100644 --- a/src/unstable/protocolwebsocket.cpp +++ b/src/unstable/protocolwebsocket.cpp @@ -21,8 +21,8 @@ #include -//#define DEBUG( X ) { } (void)0 -#define DEBUG( X ) { X; } (void)0 +#define DEBUG( X ) { } (void)0 +//#define DEBUG( X ) { X; } (void)0 Bu::ProtocolWebSocket::ProtocolWebSocket() : eStatus( stProtoId ) @@ -120,6 +120,7 @@ void Bu::ProtocolWebSocket::writeMessage( const Bu::String &sData, return; pClient->write( cHeader, idx ); pClient->write( sData ); + pClient->outputReady(); } bool Bu::ProtocolWebSocket::stateProtoId() -- cgit v1.2.3