From 1cb0fc6ab3f05e37f8c4c0bf5549b320c8b89078 Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Mon, 29 Jan 2018 00:47:50 -0800 Subject: Changes related to debugging an issue. It may not have had as much to do with low-level protocol details, and a lot of this can be reverted, but I can't revert it right now. I'll look it over later in the week. --- src/stable/client.cpp | 32 ++++++++++++++++++++++++++++++++ src/stable/client.h | 2 ++ src/stable/multiserver.cpp | 6 +++++- src/stable/string.cpp | 10 +++++----- src/unstable/protocolwebsocket.cpp | 23 ++++++++++++++++++++--- src/unstable/protocolwebsocket.h | 3 +++ src/unstable/readwritemutex.cpp | 4 ++-- 7 files changed, 69 insertions(+), 11 deletions(-) diff --git a/src/stable/client.cpp b/src/stable/client.cpp index ca86f37..43ae83b 100644 --- a/src/stable/client.cpp +++ b/src/stable/client.cpp @@ -25,11 +25,13 @@ Bu::Client::Client( Bu::TcpSocket *pSocket, bWantsDisconnect( false ), pfLink( pfLink ) { + Bu::ReadWriteMutex::WriteLocker lGlobal( mGlobal ); lFilts.prepend( pSocket ); } Bu::Client::~Client() { + Bu::ReadWriteMutex::WriteLocker lGlobal( mGlobal ); for( FilterList::iterator i = lFilts.begin(); i; i++ ) { delete *i; @@ -40,6 +42,7 @@ Bu::Client::~Client() void Bu::Client::processInput() { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); mRead.lock(); char buf[RBS]; Bu::size nRead, nTotal=0; @@ -85,6 +88,7 @@ void Bu::Client::processInput() void Bu::Client::processOutput() { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); mWrite.lock(); char buf[RBS]; if( qbWrite.getSize() > 0 ) @@ -100,17 +104,20 @@ void Bu::Client::processOutput() void Bu::Client::setProtocol( Protocol *pProto ) { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); this->pProto = pProto; this->pProto->onNewConnection( this ); } Bu::Protocol *Bu::Client::getProtocol() { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); return pProto; } void Bu::Client::clearProtocol() { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); pProto = NULL; } /* @@ -127,127 +134,149 @@ Bu::String &Bu::Client::getOutput() bool Bu::Client::isOpen() { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); if( !pTopStream ) return false; return pTopStream->isOpen(); } Bu::size Bu::Client::write( const Bu::String &sData ) { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( sData.getStr(), sData.getSize() ); } Bu::size Bu::Client::write( const void *pData, Bu::size nBytes ) { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( pData, nBytes ); } Bu::size Bu::Client::write( int8_t nData ) { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( int16_t nData ) { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( int32_t nData ) { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( int64_t nData ) { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( uint8_t nData ) { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( uint16_t nData ) { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( uint32_t nData ) { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::write( uint64_t nData ) { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.write( (const char *)&nData, sizeof(nData) ); } Bu::size Bu::Client::read( void *pData, Bu::size nBytes ) { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbRead.read( pData, nBytes ); } Bu::size Bu::Client::peek( void *pData, int nBytes, int nOffset ) { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbRead.peek( pData, nBytes, nOffset ); } Bu::size Bu::Client::getInputSize() { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbRead.getSize(); } Bu::size Bu::Client::getOutputSize() { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mWrite ); return qbWrite.getSize(); } const Bu::TcpSocket *Bu::Client::getSocket() const { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); return pSocket; } void Bu::Client::disconnect() { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); bWantsDisconnect = true; } bool Bu::Client::wantsDisconnect() { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); return bWantsDisconnect; } void Bu::Client::close() { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); pTopStream->close(); } Bu::ClientLink *Bu::Client::getLink() { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); return pfLink->createLink( this ); } void Bu::Client::onMessage( const Bu::String &sMsg ) { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); if( pProto ) pProto->onMessage( this, sMsg ); } void Bu::Client::tick() { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); if( pProto ) pProto->onTick( this ); } @@ -259,6 +288,7 @@ Bu::size Bu::Client::tell() void Bu::Client::seek( Bu::size offset ) { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mRead ); return qbRead.seek( offset ); } @@ -280,11 +310,13 @@ bool Bu::Client::isEos() void Bu::Client::flush() { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); processOutput(); } bool Bu::Client::canRead() { + Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); Bu::MutexLocker l( mRead ); return qbRead.getSize() > 0; } diff --git a/src/stable/client.h b/src/stable/client.h index ac882eb..6492a6e 100644 --- a/src/stable/client.h +++ b/src/stable/client.h @@ -14,6 +14,7 @@ #include "bu/string.h" #include "bu/queuebuf.h" #include "bu/mutex.h" +#include "bu/readwritemutex.h" namespace Bu { @@ -130,6 +131,7 @@ namespace Bu class Bu::ClientLinkFactory *pfLink; Bu::Mutex mRead; Bu::Mutex mWrite; + mutable Bu::ReadWriteMutex mGlobal; }; } diff --git a/src/stable/multiserver.cpp b/src/stable/multiserver.cpp index 6fd7ff3..c9e86cf 100644 --- a/src/stable/multiserver.cpp +++ b/src/stable/multiserver.cpp @@ -11,6 +11,8 @@ #include "bu/config.h" +#include "bu/sio.h" + Bu::MultiServer::MultiServer() { } @@ -40,7 +42,9 @@ void Bu::MultiServer::onNewConnection( Bu::Client *pClient, int nPort ) void Bu::MultiServer::onClosedConnection( Bu::Client *pClient ) { - delete pClient->getProtocol(); + Bu::Protocol *pProto = pClient->getProtocol(); + pClient->clearProtocol(); + delete pProto; } void Bu::MultiServer::shutdown() diff --git a/src/stable/string.cpp b/src/stable/string.cpp index 1579826..ce679fe 100644 --- a/src/stable/string.cpp +++ b/src/stable/string.cpp @@ -201,7 +201,7 @@ void Bu::String::append( const char *pData, long nStart, long nLen ) _hardCopy(); - if( core->pLast && core->pLast->nLength < nMinSize ) + if( core->pLast && core->pLast->nLength+1 < nMinSize ) { int nAmnt = nMinSize - core->pLast->nLength; if( nAmnt > nLen ) @@ -228,9 +228,9 @@ void Bu::String::append( const char *pData, long nStart, long nLen ) void Bu::String::append( const char &cData ) { - if( core->pLast && core->pLast->nLength < nMinSize ) + _hardCopy(); + if( core->pLast && core->pLast->nLength+1 < nMinSize ) { - _hardCopy(); core->pLast->pData[core->pLast->nLength] = cData; ++core->pLast->nLength; ++core->nLength; // pLast->pData[pLast->nLength] = (char)0; @@ -615,9 +615,9 @@ Bu::String &Bu::String::operator+=( const Bu::String::const_iterator &i ) Bu::String &Bu::String::operator+=( const char cData ) { - if( core->pLast && core->pLast->nLength < nMinSize ) - { _hardCopy(); + if( core->pLast && core->pLast->nLength+1 < nMinSize ) + { core->pLast->pData[core->pLast->nLength] = cData; ++core->pLast->nLength; ++core->nLength; // pLast->pData[pLast->nLength] = (char)0; diff --git a/src/unstable/protocolwebsocket.cpp b/src/unstable/protocolwebsocket.cpp index 9200904..576249c 100644 --- a/src/unstable/protocolwebsocket.cpp +++ b/src/unstable/protocolwebsocket.cpp @@ -9,6 +9,7 @@ #include "bu/protocolwebsocket.h" #include "bu/sio.h" +#include "bu/fmt.h" #include "bu/client.h" #include "bu/membuf.h" @@ -16,6 +17,8 @@ #include "bu/sha1.h" #include "bu/json.h" +#include "bu/mutexlocker.h" + #include Bu::ProtocolWebSocket::ProtocolWebSocket() : @@ -25,11 +28,17 @@ Bu::ProtocolWebSocket::ProtocolWebSocket() : Bu::ProtocolWebSocket::~ProtocolWebSocket() { + mClient.lock(); + printf("ProtocolWebSocket::~ProtocolWebSocket(): Clearing pClient (%p)\n", (void *)this->pClient ); + this->pClient = NULL; + mClient.unlock(); } void Bu::ProtocolWebSocket::onNewConnection( Bu::Client *pClient ) { + mClient.lock(); this->pClient = pClient; + mClient.unlock(); } void Bu::ProtocolWebSocket::onNewData( Bu::Client * /*pClient*/ ) @@ -95,6 +104,9 @@ void Bu::ProtocolWebSocket::writeMessage( const Bu::String &sData, } Bu::println(""); */ + Bu::MutexLocker l( mClient ); + if( pClient == NULL ) + return; pClient->write( cHeader, idx ); pClient->write( sData ); } @@ -108,18 +120,21 @@ bool Bu::ProtocolWebSocket::stateProtoId() Bu::StringList lChunks = sLine.split(' '); if( lChunks.getSize() != 3 ) { + Bu::MutexLocker l( mClient ); pClient->disconnect(); return false; } Bu::StringList::iterator i = lChunks.begin(); if( *i != "GET" ) { + Bu::MutexLocker l( mClient ); pClient->disconnect(); return false; } sPath = *(++i); if( *(++i) != "HTTP/1.1" ) { + Bu::MutexLocker l( mClient ); pClient->disconnect(); return false; } @@ -147,6 +162,7 @@ bool Bu::ProtocolWebSocket::stateHandshake() int iPos = sLine.findIdx(':'); if( iPos < 0 ) { + Bu::MutexLocker l( mClient ); pClient->disconnect(); return false; } @@ -183,6 +199,7 @@ bool Bu::ProtocolWebSocket::readHttpHdrLine( Bu::String &sLine ) bool Bu::ProtocolWebSocket::processHeaders() { + Bu::MutexLocker l( mClient ); if( !headerMatch("Connection", "Upgrade") || !headerMatch("Upgrade", "websocket") || !headerMatch("Sec-WebSocket-Version", "13") ) @@ -209,7 +226,7 @@ bool Bu::ProtocolWebSocket::processHeaders() sum.writeResult( bOut ); bOut.stop(); - Bu::println("accept: %1").arg( mbOut.getString() ); +// Bu::println("accept: %1").arg( mbOut.getString() ); pClient->write("HTTP/1.1 101 Switching Protocols\r\n" "Upgrade: websocket\r\n" @@ -290,8 +307,8 @@ bool Bu::ProtocolWebSocket::parseMessage() } } - Bu::println(""); - Bu::println("Data: >>%1<<").arg( sData ); +// Bu::println(""); +// Bu::println("Data: >>%1<<").arg( sData ); onNewMessage( sData, eOp ); diff --git a/src/unstable/protocolwebsocket.h b/src/unstable/protocolwebsocket.h index cf00d34..4a3d0a1 100644 --- a/src/unstable/protocolwebsocket.h +++ b/src/unstable/protocolwebsocket.h @@ -11,6 +11,8 @@ #include "bu/protocol.h" #include "bu/hash.h" +#include "bu/mutex.h" + namespace Bu { class ProtocolWebSocket : public Bu::Protocol @@ -58,6 +60,7 @@ namespace Bu Status eStatus; Bu::String sPath; Bu::Hash hHeader; + Bu::Mutex mClient; }; } diff --git a/src/unstable/readwritemutex.cpp b/src/unstable/readwritemutex.cpp index 651a343..9719bfb 100644 --- a/src/unstable/readwritemutex.cpp +++ b/src/unstable/readwritemutex.cpp @@ -51,11 +51,11 @@ void Bu::ReadWriteMutex::unlockRead() // // The bWantWrite could be a counter like the read lock counter, however -// once a write lock occurs and bWantWrite is set at least one wite +// once a write lock occurs and bWantWrite is set at least one write // will definately occur. In practice most writes all happen one after // the other anyway and this way reads get a chance to mingle in. // -// Really, just getting all currint reads to stop so a write can happen +// Really, just getting all current reads to stop so a write can happen // I think is sufficient right now. // void Bu::ReadWriteMutex::lockWrite() -- cgit v1.2.3