From e343acc2548fba7670977029da8373a0e58fa25a Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Mon, 5 Aug 2024 10:40:14 -0700 Subject: Added loads and loads of debugging output to myriad. There's an issue somewhere sometimes and streams are being truncated. My guess is a multi-threaded issue. --- src/stable/myriad.cpp | 71 +++++++++++++++++++++++++++++++++++++++++++++ src/stable/myriadstream.cpp | 5 ++++ 2 files changed, 76 insertions(+) (limited to 'src/stable') diff --git a/src/stable/myriad.cpp b/src/stable/myriad.cpp index 4d7595a..86f651e 100644 --- a/src/stable/myriad.cpp +++ b/src/stable/myriad.cpp @@ -18,6 +18,8 @@ using Bu::Fmt; #define Myriad_MAGIC_CODE ((unsigned char *)"\x0a\xd3\xfa\x84") +#define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(sStore.getLocation()).arg(x) + namespace Bu { subExceptionDef( MyriadException ) @@ -53,11 +55,13 @@ Bu::Myriad::Myriad( Bu::Stream &sStore, int iBlockSize, int iPreallocate ) : Bu::Myriad::~Myriad() { mActiveBlocks.lock(); + TRACE("mActiveBlocks locked."); if( !hActiveBlocks.isEmpty() ) { sio << "Bu::Myriad::~Myriad(): Error: There are " << hActiveBlocks.getSize() << " unsynced blocks!" << sio.nl; } + TRACE("mActiveBlocks unlocking..."); mActiveBlocks.unlock(); sync(); @@ -72,6 +76,7 @@ void Bu::Myriad::sync() updateHeader(); mActiveBlocks.lock(); + TRACE("mActiveBlocks locked."); for( BlockHash::iterator i = hActiveBlocks.begin(); i; i++ ) { if( (*i)->bChanged ) @@ -79,12 +84,14 @@ void Bu::Myriad::sync() syncBlock( *i ); } } + TRACE("mActiveBlocks unlocked..."); mActiveBlocks.unlock(); } void Bu::Myriad::initialize() { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); lFreeBlocks.clear(); sStore.setPosEnd( 0 ); Bu::size iSize = sStore.tell(); @@ -92,20 +99,30 @@ void Bu::Myriad::initialize() unsigned char buf[4]; if( sStore.read( buf, 4 ) < 4 ) + { + TRACE("mHeader unlocked..."); throw MyriadException( MyriadException::emptyStream, "Input stream appears to be empty."); + } if( memcmp( buf, Myriad_MAGIC_CODE, 4 ) ) { + TRACE("mHeader unlocked..."); throw MyriadException( MyriadException::invalidFormat, "Stream does not appear to be a valid Myriad format."); } sStore.read( buf, 2 ); if( buf[0] != 1 ) + { + TRACE("mHeader unlocked..."); throw MyriadException( MyriadException::badVersion, "We can only handle version 1 for now."); + } if( buf[1] != 32 ) + { + TRACE("mHeader unlocked..."); throw MyriadException( MyriadException::invalidWordSize, "We can only handle 32-bit words at the moment."); + } sStore.read( &iBlockSize, 4 ); int iStreams; sStore.read( &iStreams, 4 ); @@ -188,11 +205,13 @@ void Bu::Myriad::initialize() } } // sio << "Myriad: Blocks used: " << bsBlockUsed.toString() << sio.nl; + TRACE("mHeader unlocked..."); } void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); lFreeBlocks.clear(); for( StreamArray::iterator i = aStreams.begin(); i; i++ ) @@ -286,16 +305,24 @@ void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) bHeaderChanged = true; //hStreams.insert( 0, BlockArray( 0 ) ); + TRACE("mHeader unlocked..."); } void Bu::Myriad::updateHeader() { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); if( bHeaderChanged == false ) + { + TRACE("mHeader unlocked..."); return; + } if( !sStore.canWrite() ) + { + TRACE("mHeader unlocked..."); return; + } char cBuf; int iBuf; @@ -363,11 +390,13 @@ void Bu::Myriad::updateHeader() } bHeaderChanged = false; + TRACE("mHeader unlocked..."); } int Bu::Myriad::createStream( int iPreAllocate ) { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); Stream *pStr = new Stream(); pStr->iId = aStreams.last()->iId+1; @@ -387,16 +416,19 @@ int Bu::Myriad::createStream( int iPreAllocate ) bHeaderChanged = true; + TRACE("mHeader unlocked..."); return pStr->iId; } int Bu::Myriad::createStreamWithId( int iId, int iPreAllocate ) { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); try { findStream( iId ); + TRACE("mHeader unlocked..."); throw MyriadException( MyriadException::streamExists, "There is already a stream with the given id."); } @@ -434,8 +466,10 @@ int Bu::Myriad::createStreamWithId( int iId, int iPreAllocate ) bHeaderChanged = true; + TRACE("mHeader unlocked..."); return pStr->iId; } + TRACE("mHeader unlocked..."); } int Bu::Myriad::findEmptyBlock() @@ -456,13 +490,20 @@ int Bu::Myriad::findEmptyBlock() void Bu::Myriad::deleteStream( int iId ) { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); if( iId < 0 ) + { + TRACE("mHeader unlocked..."); throw MyriadException( MyriadException::invalidStreamId, "Invalid stream id."); + } if( iId == 0 ) + { + TRACE("mHeader unlocked..."); throw MyriadException( MyriadException::protectedStream, "You cannot delete stream zero, it is protected."); + } for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { if( (*i)->iId == iId ) @@ -477,14 +518,17 @@ void Bu::Myriad::deleteStream( int iId ) aStreams.erase( i ); bHeaderChanged = true; delete pStream; + TRACE("mHeader unlocked..."); return; } } + TRACE("mHeader unlocked..."); } Bu::Array Bu::Myriad::getStreamIds() { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); Bu::Array aRet( aStreams.getSize() ); for( StreamArray::iterator i = aStreams.begin(); i; i++ ) @@ -492,26 +536,32 @@ Bu::Array Bu::Myriad::getStreamIds() aRet.append( (*i)->iId ); } + TRACE("mHeader unlocked..."); return aRet; } int Bu::Myriad::getStreamSize( int iId ) { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); + TRACE("mHeader unlocked..."); return findStream( iId )->iSize; } bool Bu::Myriad::hasStream( int iId ) { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); try { findStream( iId ); + TRACE("mHeader unlocked..."); return true; }catch(...) { + TRACE("mHeader unlocked..."); return false; } } @@ -519,7 +569,9 @@ bool Bu::Myriad::hasStream( int iId ) Bu::MyriadStream Bu::Myriad::openStream( int iId ) { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); + TRACE("mHeader unlocked..."); //sio << "Myriad: Request to open stream: " << iId << sio.nl; return MyriadStream( *this, findStream( iId ) ); } @@ -527,7 +579,9 @@ Bu::MyriadStream Bu::Myriad::openStream( int iId ) int Bu::Myriad::getNumStreams() { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); + TRACE("mHeader unlocked..."); return aStreams.getSize(); } @@ -549,36 +603,42 @@ int Bu::Myriad::getNumUsedBlocks() Bu::size Bu::Myriad::getTotalUsedBytes() { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); Bu::size iTotalSize = 0; for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { iTotalSize += (*i)->iSize; } + TRACE("mHeader unlocked..."); return iTotalSize; } Bu::size Bu::Myriad::getTotalUnusedBytes() { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); Bu::size iTotalSize = (iBlocks-iUsed)*iBlockSize; for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { iTotalSize += iBlockSize - ((Bu::size)(*i)->iSize%iBlockSize); } + TRACE("mHeader unlocked..."); return iTotalSize; } Bu::size Bu::Myriad::getTotalUnusedBytes( int iFakeBlockSize ) { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); Bu::size iTotalSize = (iBlocks-iUsed)*iFakeBlockSize; for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { iTotalSize += iFakeBlockSize - ((*i)->iSize%iFakeBlockSize); } + TRACE("mHeader unlocked..."); return iTotalSize; } @@ -608,7 +668,9 @@ Bu::Myriad::Block *Bu::Myriad::getBlock( int iBlock ) pBlock->iBlockIndex = iBlock; mActiveBlocks.lock(); + TRACE("mHeader locked."); hActiveBlocks.insert( iBlock, pBlock ); + TRACE("mHeader unlocked..."); mActiveBlocks.unlock(); return pBlock; @@ -621,7 +683,9 @@ void Bu::Myriad::releaseBlock( Bu::Myriad::Block *pBlock ) // sio << "Myriad: Releasing block " << pBlock->iBlockIndex << sio.nl; syncBlock( pBlock ); mActiveBlocks.lock(); + TRACE("mHeader locked."); hActiveBlocks.erase( pBlock->iBlockIndex ); + TRACE("mHeader unlocked..."); mActiveBlocks.unlock(); delete[] pBlock->pData; @@ -642,26 +706,31 @@ void Bu::Myriad::syncBlock( Block *pBlock ) int Bu::Myriad::streamAddBlock( Stream *pStream ) { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); int iBlock = findEmptyBlock(); pStream->aBlocks.append( iBlock ); // bsBlockUsed.setBit( iBlock ); // bHeaderChanged = true; iUsed++; + TRACE("mHeader unlocked..."); return iBlock; } void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) { MutexLocker mLock( mHeader ); + TRACE("mHeader locked."); if( pStream->iSize == iSize ) { + TRACE("mHeader unlocked..."); return; } else if( pStream->iSize > iSize ) { // Shrink + TRACE(Bu::String("Shrink stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->iSize).arg(iSize).end() ); for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; iNewSize-iBlockSize > iSize; iNewSize -= iBlockSize ) { @@ -679,6 +748,7 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) else { // Grow + TRACE(Bu::String("Grow stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->iSize).arg(iSize).end() ); for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; iNewSize < iSize; iNewSize += iBlockSize ) { @@ -692,6 +762,7 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) pStream->iSize = iSize; bHeaderChanged = true; } + TRACE("mHeader unlocked..."); } void Bu::Myriad::headerChanged() diff --git a/src/stable/myriadstream.cpp b/src/stable/myriadstream.cpp index b2ed75e..50c6924 100644 --- a/src/stable/myriadstream.cpp +++ b/src/stable/myriadstream.cpp @@ -17,6 +17,9 @@ using Bu::sio; using Bu::Fmt; #endif +#include "bu/sio.h" + +#define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(rMyriad.sStore.getLocation()).arg(x) Bu::MyriadStream::MyriadStream( Bu::Myriad &rMyriad, Bu::Myriad::Stream *pStream ) : @@ -205,7 +208,9 @@ Bu::size Bu::MyriadStream::write( const void *pBuf, Bu::size nBytes ) iAmnt ); iPos += iAmnt; + TRACE(Bu::String("Stream=%1 - pStream->iSize(%2) += iAmnt(%3)").arg(pStream->iId).arg( pStream->iSize ).arg(iAmnt).end()); pStream->iSize += iAmnt; + TRACE(Bu::String("Stream=%1 - pStream->iSize = %2").arg(pStream->iId).arg( pStream->iSize ).end()); rMyriad.headerChanged(); pBuf = &((char *)pBuf)[iAmnt]; iLeft -= iAmnt; -- cgit v1.2.3