From 412173a23c88a49ebaeb982e0c7eeddc5662b8cc Mon Sep 17 00:00:00 2001
From: Mike Buland
Date: Sat, 29 Jul 2023 00:19:13 -0700
Subject: Many bugfixes. It's almost working!

---
 src/stable/client.h       |   1 -
 src/stable/clientbuf.cpp  |  19 +++++-
 src/stable/clientbuf.h    |   3 +-
 src/stable/counterevent.h | 154 ++++++++++++++++++++++++++++++++++++++++++++++
 src/stable/server.cpp     |  64 +++++++++++++------
 src/stable/server.h       |  12 ++--
 src/stable/thread.cpp     |   5 ++
 src/stable/thread.h       |   6 ++
 8 files changed, 234 insertions(+), 30 deletions(-)
 create mode 100644 src/stable/counterevent.h

diff --git a/src/stable/client.h b/src/stable/client.h
index 4cd8e3f..abe807e 100644
--- a/src/stable/client.h
+++ b/src/stable/client.h
@@ -54,7 +54,6 @@ namespace Bu
         Bu::size write( uint64_t nData );
         Bu::size read( void *pData, Bu::size nBytes );
         Bu::size peek( void *pData, int nBytes, int nOffset=0 );
-        void seek( int nBytes );
         Bu::size getInputSize();
         Bu::size getOutputSize();
 
diff --git a/src/stable/clientbuf.cpp b/src/stable/clientbuf.cpp
index 493e577..4e59120 100644
--- a/src/stable/clientbuf.cpp
+++ b/src/stable/clientbuf.cpp
@@ -2,10 +2,12 @@
 
 #include "bu/mutexlocker.h"
 
+#include "bu/sio.h"
+
 Bu::ClientBuf::ClientBuf() :
     accClientRaw( *this ),
     accServer( *this ),
-    accClientFiltered( &accClient ),
+    //accClientFiltered( &accClientRaw ),
     accClient( *this )
 {
 }
@@ -156,7 +158,7 @@ Bu::String Bu::ClientBuf::ClientAccessRaw::getLocation() const
 /////////
 // ClientAccess
 ///
-
+#define accClientFiltered accClientRaw
 Bu::ClientBuf::ClientAccess::ClientAccess( Bu::ClientBuf &rBuf ) :
     rBuf( rBuf )
 {
 }
@@ -173,16 +175,19 @@ void Bu::ClientBuf::ClientAccess::close()
 
 Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes )
 {
+    Bu::println("ClientAccess::read( ptr, %1 )").arg( iBytes );
     char *pBuf = (char *)pBufRaw;
     Bu::MutexLocker l( mAccess );
     // Read from QueueBuf first
     Bu::size ps = qbPeek.read( pBuf, iBytes );
+    Bu::println("ClientAccess::read: attempted qbPeek, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) );
     iBytes -= ps;
     pBuf += ps;
     // Request space left? Try the client
     if( iBytes > 0 )
     {
         ps += rBuf.accClientFiltered.read( pBuf, iBytes );
+        Bu::println("ClientAccess::read: attempted completion from socket buffer, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) );
     }
     return ps;
 }
@@ -190,22 +195,26 @@ Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes )
 
 Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes, int iOffset )
 {
+    Bu::println("ClientAccess::peek( ptr, %1, %2 )").arg( iBytes ).arg( iOffset );
     Bu::MutexLocker l( mAccess );
     // Do we have enough data in the peek buffer to handle this?
     if( qbPeek.getSize() < iBytes+iOffset )
     {
+        Bu::println("ClientAccess::peek: Insufficient buffered data (%1)").arg( qbPeek.getSize() );
         // Nope, make an attempt to fill it in.
         int nDiff = iBytes-qbPeek.getSize();
         // We have to make our own buffer, since iBytes+nOffset could be bigger
         // than pData.
         char *pTmp = new char[nDiff];
         Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff );
+        Bu::println("ClientAccess::peek: Tried to fill buffer, read %1 of needed %2\n>%3<").arg( ps ).arg( nDiff ).arg( Bu::String(pTmp, ps) );
         if( ps > 0 )
         {
             // Add the data read to the peek buffer.
             qbPeek.write( pTmp, ps );
         }
         delete[] pTmp;
+        Bu::println("ClientAccess::peek: buffer left with %1").arg( qbPeek.getSize() );
     }
 
     return qbPeek.peek( pData, iBytes, iOffset );
@@ -225,13 +234,15 @@ Bu::size Bu::ClientBuf::ClientAccess::tell()
 
 void Bu::ClientBuf::ClientAccess::seek( Bu::size offset )
 {
+    Bu::println("ClientAccess::seek( %1 )").arg( offset );
     Bu::MutexLocker l( mAccess );
     // For this type of stream seek is basically a destructive skip. It's like
     // reading the data but with no output buffer. Let's remove data from the
     // peek buffer first.
     if( qbPeek.getSize() > 0 )
     {
-        Bu::size amount = Bu::buMax( qbPeek.getSize(), offset );
+        Bu::size amount = Bu::buMin( qbPeek.getSize(), offset );
+        Bu::println("ClientAccess::seek: buffered: %1, amount: %2").arg( qbPeek.getSize() ).arg( amount );
         qbPeek.seek( amount );
         offset -= amount;
     }
@@ -239,6 +250,7 @@ void Bu::ClientBuf::ClientAccess::seek( Bu::size offset )
     // If there's offset left, then apply it to the underlying stream
     if( offset > 0 )
    {
+        Bu::println("ClientAccess::seek: seeking remaining %1 in socket buffer").arg( offset );
         rBuf.accClientFiltered.seek( offset );
     }
 }
@@ -329,6 +341,7 @@ Bu::String Bu::ClientBuf::ClientAccess::getLocation() const
 {
     return "ClientBuf";
 }
+#undef accClientFiltered
 
 /////////
 // ServerAccess
diff --git a/src/stable/clientbuf.h b/src/stable/clientbuf.h
index 7781b6a..50b6617 100644
--- a/src/stable/clientbuf.h
+++ b/src/stable/clientbuf.h
@@ -164,12 +164,13 @@ namespace Bu
     private:
         ClientAccessRaw accClientRaw;
         ServerAccess accServer;
-        Bu::StreamStack accClientFiltered;
+        //Bu::StreamStack accClientFiltered;
         ClientAccess accClient;
         Bu::QueueBuf qbOutput;
         Bu::QueueBuf qbInput;
         Bu::Mutex mOutput;
         Bu::Mutex mInput;
 
+        friend class Bu::ClientBuf::ClientAccess;
         friend class Bu::ClientBuf::ClientAccessRaw;
         friend class Bu::ClientBuf::ServerAccess;
diff --git a/src/stable/counterevent.h b/src/stable/counterevent.h
new file mode 100644
index 0000000..e7ad7a1
--- /dev/null
+++ b/src/stable/counterevent.h
@@ -0,0 +1,154 @@
+/*
+ * Copyright (C) 2007-2019 Xagasoft, All rights reserved.
+ *
+ * This file is part of the libbu++ library and is released under the
+ * terms of the license contained in the file LICENSE.
+ */
+
+#ifndef BU_COUNTER_EVENT_H
+#define BU_COUNTER_EVENT_H
+
+#include
+
+#include "bu/mutex.h"
+#include "bu/condition.h"
+
+namespace Bu
+{
+    /**
+     * Represents a counted state that controls thread synchronization. This
+     * is primarily intended to coordinate batches of work handed to
+     * multithreaded services. For example, waiting for all workers to finish.
+     *
+     * A CounterEvent tracks an integer count. While the count is non-zero it
+     * can be waited on; as soon as the count returns to zero all waiting
+     * threads are released, and new requests to wait return immediately until
+     * the count is incremented again.
+     *
+     * Threads can also be woken up without changing the count, which may be
+     * handy in certain circumstances.
+     *@ingroup Threading
+     */
+    class CounterEvent
+    {
+    public:
+        CounterEvent() :
+            iCount( 0 )
+        {
+        }
+
+        ~CounterEvent()
+        {
+        }
+
+        /**
+         * Wait indefinitely for the counter to reach zero. If the count is
+         * already zero then return immediately. It's important to note that
+         * this may return at any time, not only when the count is zero, so
+         * examining the return value is important.
+         *@returns the current value of the counter.
+         */
+        int wait()
+        {
+            cBlock.lock();
+            if( iCount == 0 )
+            {
+                cBlock.unlock();
+                return iCount;
+            }
+            cBlock.wait();
+            int iRet = iCount;
+            cBlock.unlock();
+            return iRet;
+        }
+
+        /**
+         * Wait for up to nSec seconds and nUSec nanoseconds for the counter
+         * to reach zero. If the count is already zero then return immediately.
+         *@returns the current value of the counter.
+         */
+        int wait( int nSec, int nUSec )
+        {
+            cBlock.lock();
+            if( iCount == 0 )
+            {
+                cBlock.unlock();
+                return iCount;
+            }
+            cBlock.wait( nSec, nUSec );
+            int iRet = iCount;
+            cBlock.unlock();
+            return iRet;
+        }
+
+        /**
+         * Allow one of the waiting threads to unlock without changing the
+         * count.
+         */
+        void unblockOne()
+        {
+            cBlock.lock();
+            cBlock.signal();
+            cBlock.unlock();
+        }
+
+        /**
+         * Allow all waiting threads to unlock and proceed without changing
+         * the count.
+         */
+        void unblockAll()
+        {
+            cBlock.lock();
+            cBlock.broadcast();
+            cBlock.unlock();
+        }
+
+        /**
+         * Find out whether the counter is currently zero.
+         *@returns True if the count is zero, false otherwise.
+         */
+        bool isZero()
+        {
+            cBlock.lock();
+            bool bRet = (iCount == 0);
+            cBlock.unlock();
+            return bRet;
+        }
+
+        /**
+         * Decrements the counter, releasing all waiting threads when it
+         * reaches zero.
+         */
+        void decrement()
+        {
+            cBlock.lock();
+            iCount--; if( iCount < 0 ) iCount = 0;
+            if( iCount == 0 )
+                cBlock.broadcast();
+            cBlock.unlock();
+        }
+
+        void increment()
+        {
+            cBlock.lock();
+            iCount++;
+            cBlock.unlock();
+        }
+
+        /**
+         * Resets the counter to zero. This does NOT trigger any waiting
+         * threads.
+         */
+        void clear()
+        {
+            cBlock.lock();
+            iCount = 0;
+            cBlock.unlock();
+        }
+
+    private:
+        Condition cBlock; /**< The condition the waiting threads block on. */
+        int iCount;
+    };
+}
+
+#endif
diff --git a/src/stable/server.cpp b/src/stable/server.cpp
index 592230d..3f03a63 100644
--- a/src/stable/server.cpp
+++ b/src/stable/server.cpp
@@ -14,6 +14,8 @@
 #include "bu/config.h"
 #include "bu/mutexlocker.h"
 
+#include "bu/sio.h"
+
 #ifdef PROFILE_BU_SERVER
 #define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x )
 #define BU_PROFILE_END( x ) Bu::Profiler::getInstance().endEvent( x )
@@ -39,14 +41,14 @@ Bu::Server::Server( int iIoWorkers, int iClientWorkers ) :
 
     for( int j = 0; j < iIoWorkers; j++ )
     {
-        IoWorker *pWorker = new IoWorker( *this, qIoEvent, qClientEvent );
+        IoWorker *pWorker = new IoWorker( *this );
         lIoWorker.append( pWorker );
         pWorker->start();
     }
 
     for( int j = 0; j < iClientWorkers; j++ )
     {
-        ClientWorker *pWorker = new ClientWorker( *this, qClientEvent );
+        ClientWorker *pWorker = new ClientWorker( *this );
         lClientWorker.append( pWorker );
         pWorker->start();
     }
@@ -90,7 +92,10 @@ void Bu::Server::scan()
     for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
     {
         if( (*i)->hasOutput() )
+        {
+            Bu::println("Socket has output...");
             FD_SET( i.getKey(), &fdWrite );
+        }
     }
 
     mClients.unlock();
@@ -105,6 +110,9 @@ void Bu::Server::scan()
             );
     }
 
+    evIoCycle.clear();
+    Bu::println("Cycle clear");
+
     for( int j = 0; j < FD_SETSIZE; j++ )
     {
         if( FD_ISSET( j, &fdRead ) )
@@ -116,15 +124,22 @@ void Bu::Server::scan()
             }
             else
             {
+                evIoCycle.increment();
+                Bu::println("Increment (read)");
                 qIoEvent.enqueue( new Event( j, Event::Read ) );
             }
         }
         if( FD_ISSET( j, &fdWrite ) )
         {
+            evIoCycle.increment();
+            Bu::println("Increment (write)");
             qIoEvent.enqueue( new Event( j, Event::Write ) );
         }
     }
 
+    Bu::println("Waiting");
+    while( evIoCycle.wait() > 0 ) { }
+
     Bu::List lDelete;
     // Now we just try to write all the pending data on all the sockets.
     // this could be done better eventually, if we care about the socket
@@ -155,7 +170,7 @@ void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket )
     BU_PROFILE_START("addClient");
     int iFdSrv;
     int iFdCli;
-    if( !pSrv->getFd( iFdSrv ) || !pSrv->getFd( iFdCli ) )
+    if( !pSrv->getFd( iFdSrv ) || !pSocket->getFd( iFdCli ) )
     {
         throw Bu::ExceptionBase("No file descriptor?");
     }
@@ -324,12 +339,8 @@ Bu::Server::Event::Operation Bu::Server::Event::getOperation() const
 
 // IoWorker
 ////
-Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv,
-        Bu::Server::EventQueue &qIoEvent,
-        Bu::Server::EventQueue &qClientEvent ) :
-    rSrv( rSrv ),
-    qIoEvent( qIoEvent ),
-    qClientEvent( qClientEvent )
+Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv ) :
+    rSrv( rSrv )
 {
 }
@@ -339,17 +350,21 @@ Bu::Server::IoWorker::~IoWorker()
 
 void Bu::Server::IoWorker::run()
 {
-    while( qIoEvent.isRunning() )
+    setName("busrv-ioWorker");
+    while( rSrv.qIoEvent.isRunning() )
     {
-        Event *pEv = qIoEvent.dequeue();
+        Event *pEv = rSrv.qIoEvent.dequeue( true );
         if( pEv == NULL )
+        {
             continue;
+        }
 
         Client *pClient;
         Socket *pSocket;
         if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) )
         {
             delete pEv;
+            rSrv.evIoCycle.decrement();
             continue;
         }
@@ -368,6 +383,9 @@ void Bu::Server::IoWorker::run()
         }
 
         delete pEv;
+
+        Bu::println("decrement");
+        rSrv.evIoCycle.decrement();
     }
 }
@@ -377,12 +395,15 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
     Bu::size iRead;
     Bu::size iTotal=0;
 
+    Bu::println("IoWorker::handleRead: starting");
+
     BU_PROFILE_START("client.read");
     for(;;)
     {
         try
         {
             iRead = pSocket->read( buf, RBS );
+            Bu::println("IoWorker::handleRead: read() -> %1").arg( iRead );
 
             if( iRead == 0 )
             {
@@ -398,6 +419,7 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
         }
         catch( Bu::ExceptionBase &e )
         {
+            Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() );
             close( pSocket );
             break;
         }
@@ -406,24 +428,27 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
 
     if( iTotal == 0 )
     {
+        Bu::println("IoWorker::handleRead: read nothing, closing");
         close( pSocket );
     }
     else
     {
         Bu::Server::fd iFd;
         pSocket->getFd( iFd );
-        qClientEvent.enqueue( new Event( iFd, Event::Process ) );
+        rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) );
     }
 }
 
 void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket )
 {
+    Bu::println("IoWorker::handleWrite() ");
     char buf[RBS];
     if( pClient->hasOutput() > 0 )
     {
         int iAmnt = RBS;
         iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt );
         int iReal = pSocket->write( buf, iAmnt );
+        Bu::println("IoWorker::handleWrite: Copy out: iAmnt=%1, iReal=%2\n>%3<").arg( iAmnt ).arg( iReal ).arg( Bu::String( buf, iReal ) );
         pClient->cbBuffer.server().seek( iReal );
     }
 }
@@ -439,10 +464,8 @@ void Bu::Server::IoWorker::close( Bu::Socket *pSocket )
 
 // ClientWorker
 ////
-Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv,
-        Bu::Server::EventQueue &qEvent ) :
-    rSrv( rSrv ),
-    qEvent( qEvent )
+Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv ) :
+    rSrv( rSrv )
 {
 }
@@ -452,9 +475,10 @@ Bu::Server::ClientWorker::~ClientWorker()
 
 void Bu::Server::ClientWorker::run()
 {
-    while( qEvent.isRunning() )
+    setName("busrv-cntWorker");
+    while( rSrv.qClientEvent.isRunning() )
     {
-        Event *pEv = qEvent.dequeue();
+        Event *pEv = rSrv.qClientEvent.dequeue( true );
         if( pEv == NULL )
             continue;
@@ -464,8 +488,10 @@ void Bu::Server::ClientWorker::run()
             delete pEv;
             continue;
         }
-
+
+        Bu::println("Processing input...");
         pClient->processInput();
+        Bu::println("Processing input complete.");
         delete pEv;
     }
 }
diff --git a/src/stable/server.h b/src/stable/server.h
index d66d9d5..e2b7d53 100644
--- a/src/stable/server.h
+++ b/src/stable/server.h
@@ -22,6 +22,7 @@
 #include "bu/hash.h"
 #include "bu/synchroqueue.h"
 #include "bu/thread.h"
+#include "bu/counterevent.h"
 
 #include "bu/config.h"
 
@@ -137,8 +138,7 @@ namespace Bu
         class IoWorker : public Bu::Thread
         {
         public:
-            IoWorker( Server &rSrv, EventQueue &qIoEvent,
-                    EventQueue &qClientEvent );
+            IoWorker( Server &rSrv );
             virtual ~IoWorker();
 
         protected:
@@ -151,14 +151,13 @@ namespace Bu
 
         private:
             Server &rSrv;
-            EventQueue &qIoEvent;
-            EventQueue &qClientEvent;
         };
+        friend class Bu::Server::IoWorker;
 
         class ClientWorker : public Bu::Thread
         {
         public:
-            ClientWorker( Server &rSrv, EventQueue &qEvent );
+            ClientWorker( Server &rSrv );
             virtual ~ClientWorker();
 
         protected:
@@ -166,8 +165,8 @@ namespace Bu
 
         private:
             Server &rSrv;
-            EventQueue &qEvent;
         };
+        friend class Bu::Server::ClientWorker;
 
         int nTimeoutSec;
         int nTimeoutUSec;
@@ -189,6 +188,7 @@ namespace Bu
         typedef List ClientWorkerList;
         IoWorkerList lIoWorker;
         ClientWorkerList lClientWorker;
+        Bu::CounterEvent evIoCycle;
     };
 }
 
diff --git a/src/stable/thread.cpp b/src/stable/thread.cpp
index 5fe034a..99239e9 100644
--- a/src/stable/thread.cpp
+++ b/src/stable/thread.cpp
@@ -77,3 +77,8 @@ void Bu::Thread::yield()
     sched_yield();
 }
 
+void Bu::Thread::setName( const char *sName )
+{
+    pthread_setname_np( ptHandle, sName );
+}
+
diff --git a/src/stable/thread.h b/src/stable/thread.h
index 5174d1c..0a64390 100644
--- a/src/stable/thread.h
+++ b/src/stable/thread.h
@@ -124,6 +124,12 @@ namespace Bu
          */
         void yield();
 
+        /**
+         * Sets the name of the thread. The sName parameter must be a C string
+         * with a max length of 16 bytes including null terminator.
+         */
+        void setName( const char *sName );
+
     private:
         /**
          * This is the hidden-heart of the thread system. While run is what the
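
For context on how the new CounterEvent is meant to be used: Server::scan() now clears the counter at the top of each select() cycle, increments it once for every I/O event it queues, and then blocks in wait() until the IoWorker threads have decremented it back to zero. Below is a minimal standalone sketch of that same pattern. It relies only on the CounterEvent API added in this commit; the std::thread workers, the fixed event count, and the file name are illustrative stand-ins, not libbu++ code or part of this change.

// counterevent_demo.cpp -- illustrative sketch only, not part of this commit.
// Assumes the libbu++ headers and library are available to build against.
#include "bu/counterevent.h"

#include <thread>
#include <vector>
#include <cstdio>

int main()
{
    Bu::CounterEvent evCycle;
    evCycle.clear();        // reset the counter at the top of a cycle, as scan() does

    const int iEvents = 8;  // stand-in for the fds that select() reported ready

    // The dispatching side increments once per event *before* it starts
    // waiting, so the count cannot drain to zero prematurely.
    for( int j = 0; j < iEvents; j++ )
        evCycle.increment();

    std::vector<std::thread> lWorkers;
    for( int j = 0; j < iEvents; j++ )
    {
        lWorkers.emplace_back( [&evCycle, j]() {
            // Stand-in for IoWorker::run() handling one queued event.
            std::printf("worker handled event %d\n", j);
            // Each finished event decrements the counter; the decrement that
            // reaches zero broadcasts and releases the waiting thread.
            evCycle.decrement();
        } );
    }

    // Same loop Server::scan() now uses: block until the whole cycle drains.
    while( evCycle.wait() > 0 )
    {
    }

    for( std::thread &t : lWorkers )
        t.join();

    std::printf("cycle complete\n");
    return 0;
}

The ordering is the important detail mirrored from scan(): every increment() happens before the wait() loop begins, so the counter cannot hit zero while events are still being queued, and the wait() loop only falls through once every worker has called decrement().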