From caee572ff94822ca2ed354fcb79ca04ed9adf388 Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Mon, 5 Aug 2024 12:44:59 -0700 Subject: Perhaps fixed a Myriad race condition. If so, this will fix the issue where streams randomly truncate when accessed by multiple threads. --- src/stable/myriad.cpp | 67 +++++++++++++++++++++++++++++++-------------- src/stable/myriad.h | 9 +++++- src/stable/myriadstream.cpp | 25 +++++++++-------- 3 files changed, 67 insertions(+), 34 deletions(-) (limited to 'src/stable') diff --git a/src/stable/myriad.cpp b/src/stable/myriad.cpp index 86f651e..c606369 100644 --- a/src/stable/myriad.cpp +++ b/src/stable/myriad.cpp @@ -18,7 +18,8 @@ using Bu::Fmt; #define Myriad_MAGIC_CODE ((unsigned char *)"\x0a\xd3\xfa\x84") -#define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(sStore.getLocation()).arg(x) +// #define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(sStore.getLocation()).arg(x) +#define TRACE( x ) (void)0 namespace Bu { @@ -145,7 +146,7 @@ void Bu::Myriad::initialize() Stream *pFakeHdr = new Stream; pFakeHdr->iId = 0; - pFakeHdr->iSize = iHeaderSize; + pFakeHdr->setSize( iHeaderSize ); for( int j = 0; j < iHeaderBlocks; j++ ) { pFakeHdr->aBlocks.append( j ); @@ -161,11 +162,13 @@ void Bu::Myriad::initialize() pIn->setPos( sStore.tell() ); for( int j = 0; j < iStreams; j++ ) { + int iSizeTmp; aStreams.append( new Stream() ); Stream &s = *aStreams[j]; pIn->read( &s.iId, 4 ); - pIn->read( &s.iSize, 4 ); - int iSBlocks = blkDiv(s.iSize, iBlockSize); + pIn->read( &iSizeTmp, 4 ); + s.setSize( iSizeTmp ); + int iSBlocks = blkDiv(s.getSize(), iBlockSize); // sio << "Myriad: - Stream::iId=" << s.iId // << ", Stream::iSize=" << s.iSize // << ", Stream::aBlocks=" << iSBlocks @@ -282,10 +285,10 @@ void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) this->iBlockSize = iBlockSize; this->iBlocks = iPreAllocate; - pStr->iSize = sStore.tell(); + pStr->setSize( sStore.tell() ); // sio << "Myriad: Actual end of header stream = " << pStr->iSize << sio.nl; - pStr->iSize = iHeaderSize; + pStr->setSize( iHeaderSize ); for( int j = 0; j < iHeaderBlocks; j++ ) { // sio << "Started block " << j << " is header." << sio.nl; @@ -308,6 +311,26 @@ void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) TRACE("mHeader unlocked..."); } +void Bu::Myriad::Stream::setSize( int iNewSize ) +{ + MutexLocker l( mStream ); + iSize = iNewSize; +} + +void Bu::Myriad::Stream::growTo( int iNewSize ) +{ + MutexLocker l( mStream ); + if( iNewSize < iSize ) + return; + iSize = iNewSize; +} + +int Bu::Myriad::Stream::getSize() const +{ + MutexLocker l( mStream ); + return iSize; +} + void Bu::Myriad::updateHeader() { MutexLocker mLock( mHeader ); @@ -354,7 +377,7 @@ void Bu::Myriad::updateHeader() iHeaderSize += 4; iNewBlocks = blkDiv( iHeaderSize, iBlockSize ); } - aStreams[0]->iSize = iHeaderSize; + aStreams[0]->setSize( iHeaderSize ); // sio << "Myriad: updateHeader: iHeaderSize=" << iHeaderSize // << ", iNewBlocks=" << iNewBlocks << ", curBlocks=" // << aStreams[0]->aBlocks.getSize() << sio.nl; @@ -379,9 +402,11 @@ void Bu::Myriad::updateHeader() for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { + int iSizeTmp; sHdr.write( &(*i)->iId, 4 ); - sHdr.write( &(*i)->iSize, 4 ); - int iUsedBlocks = blkDiv( (*i)->iSize, iBlockSize ); + sHdr.write( &iSizeTmp, 4 ); + (*i)->setSize( iSizeTmp ); + int iUsedBlocks = blkDiv( iSizeTmp, iBlockSize ); // for( BlockArray::iterator j = (*i)->aBlocks.begin(); j; j++ ) for( int j = 0; j < iUsedBlocks; j++ ) { @@ -402,7 +427,7 @@ int Bu::Myriad::createStream( int iPreAllocate ) pStr->iId = aStreams.last()->iId+1; //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate=" // << iPreAllocate << sio.nl; - pStr->iSize = 0; + pStr->setSize( 0 ); aStreams.append( pStr ); for( int j = 0; j < iPreAllocate; j++ ) @@ -438,7 +463,7 @@ int Bu::Myriad::createStreamWithId( int iId, int iPreAllocate ) pStr->iId = iId; //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate=" // << iPreAllocate << sio.nl; - pStr->iSize = 0; + pStr->setSize( 0 ); if( aStreams.last()->iId < iId ) { aStreams.append( pStr ); @@ -546,7 +571,7 @@ int Bu::Myriad::getStreamSize( int iId ) TRACE("mHeader locked."); TRACE("mHeader unlocked..."); - return findStream( iId )->iSize; + return findStream( iId )->getSize(); } bool Bu::Myriad::hasStream( int iId ) @@ -608,7 +633,7 @@ Bu::size Bu::Myriad::getTotalUsedBytes() Bu::size iTotalSize = 0; for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { - iTotalSize += (*i)->iSize; + iTotalSize += (*i)->getSize(); } TRACE("mHeader unlocked..."); return iTotalSize; @@ -622,7 +647,7 @@ Bu::size Bu::Myriad::getTotalUnusedBytes() Bu::size iTotalSize = (iBlocks-iUsed)*iBlockSize; for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { - iTotalSize += iBlockSize - ((Bu::size)(*i)->iSize%iBlockSize); + iTotalSize += iBlockSize - ((Bu::size)(*i)->getSize()%iBlockSize); } TRACE("mHeader unlocked..."); return iTotalSize; @@ -636,7 +661,7 @@ Bu::size Bu::Myriad::getTotalUnusedBytes( int iFakeBlockSize ) Bu::size iTotalSize = (iBlocks-iUsed)*iFakeBlockSize; for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { - iTotalSize += iFakeBlockSize - ((*i)->iSize%iFakeBlockSize); + iTotalSize += iFakeBlockSize - ((*i)->getSize()%iFakeBlockSize); } TRACE("mHeader unlocked..."); return iTotalSize; @@ -722,15 +747,15 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) MutexLocker mLock( mHeader ); TRACE("mHeader locked."); - if( pStream->iSize == iSize ) + if( pStream->getSize() == iSize ) { TRACE("mHeader unlocked..."); return; } - else if( pStream->iSize > iSize ) + else if( pStream->getSize() > iSize ) { // Shrink - TRACE(Bu::String("Shrink stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->iSize).arg(iSize).end() ); + TRACE(Bu::String("Shrink stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->getSize()).arg(iSize).end() ); for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; iNewSize-iBlockSize > iSize; iNewSize -= iBlockSize ) { @@ -742,13 +767,13 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) // bsBlockUsed.setBit( pStream->aBlocks.last(), false ); pStream->aBlocks.eraseLast(); } - pStream->iSize = iSize; + pStream->setSize( iSize ); bHeaderChanged = true; } else { // Grow - TRACE(Bu::String("Grow stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->iSize).arg(iSize).end() ); + TRACE(Bu::String("Grow stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->getSize()).arg(iSize).end() ); for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; iNewSize < iSize; iNewSize += iBlockSize ) { @@ -759,7 +784,7 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) // bHeaderChanged = true; iUsed++; } - pStream->iSize = iSize; + pStream->setSize( iSize ); bHeaderChanged = true; } TRACE("mHeader unlocked..."); diff --git a/src/stable/myriad.h b/src/stable/myriad.h index 14467a4..e63c29f 100644 --- a/src/stable/myriad.h +++ b/src/stable/myriad.h @@ -184,9 +184,16 @@ namespace Bu class Stream { public: + void setSize( int iNewSize ); + void growTo( int iNewSize ); + int getSize() const; + int iId; - int iSize; BlockArray aBlocks; + + private: + int iSize; + mutable Bu::Mutex mStream; }; typedef Bu::Array StreamArray; diff --git a/src/stable/myriadstream.cpp b/src/stable/myriadstream.cpp index 50c6924..3c78bb0 100644 --- a/src/stable/myriadstream.cpp +++ b/src/stable/myriadstream.cpp @@ -19,7 +19,8 @@ using Bu::Fmt; #endif #include "bu/sio.h" -#define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(rMyriad.sStore.getLocation()).arg(x) +// #define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(rMyriad.sStore.getLocation()).arg(x) +#define TRACE( x ) (void)0 Bu::MyriadStream::MyriadStream( Bu::Myriad &rMyriad, Bu::Myriad::Stream *pStream ) : @@ -55,8 +56,8 @@ Bu::size Bu::MyriadStream::read( void *pBuf, Bu::size nBytes ) sio << "MyriadStream: read: " << __LINE__ << ": Started, asked to read " << nBytes << "b." << sio.nl; #endif - if( nBytes > (Bu::size)pStream->iSize-iPos ) - nBytes = pStream->iSize-iPos; + if( nBytes > (Bu::size)pStream->getSize()-iPos ) + nBytes = pStream->getSize()-iPos; if( nBytes <= 0 ) return 0; int iLeft = nBytes; @@ -92,7 +93,7 @@ Bu::size Bu::MyriadStream::read( void *pBuf, Bu::size nBytes ) rMyriad.iBlockSize - iPos%rMyriad.iBlockSize, iLeft ), - pStream->iSize-iPos + pStream->getSize()-iPos ); #ifdef MYRIAD_STREAM_DEBUG sio << "MyriadStream: read: " << __LINE__ << ": Copying out bytes: " @@ -166,14 +167,14 @@ Bu::size Bu::MyriadStream::write( const void *pBuf, Bu::size nBytes ) // There are two main writing modes when it comes down to it. // Overwrite mode and append mode. Append is what pretty much always // happens when creating a new stream. - if( iPos < pStream->iSize ) + if( iPos < pStream->getSize() ) { int iAmnt = Bu::buMin( Bu::buMin( rMyriad.iBlockSize - iPos%rMyriad.iBlockSize, iLeft ), - pStream->iSize-iPos + pStream->getSize()-iPos ); #ifdef MYRIAD_STREAM_DEBUG sio << "MyriadStream: write (ovr): " << __LINE__ << ": Copying in bytes: " @@ -208,9 +209,9 @@ Bu::size Bu::MyriadStream::write( const void *pBuf, Bu::size nBytes ) iAmnt ); iPos += iAmnt; - TRACE(Bu::String("Stream=%1 - pStream->iSize(%2) += iAmnt(%3)").arg(pStream->iId).arg( pStream->iSize ).arg(iAmnt).end()); - pStream->iSize += iAmnt; - TRACE(Bu::String("Stream=%1 - pStream->iSize = %2").arg(pStream->iId).arg( pStream->iSize ).end()); + TRACE(Bu::String("Stream=%1 - pStream->iSize(%2) += iAmnt(%3)").arg(pStream->iId).arg( pStream->getSize() ).arg(iAmnt).end()); + pStream->growTo( pStream->getSize()+iAmnt ); + TRACE(Bu::String("Stream=%1 - pStream->iSize = %2").arg(pStream->iId).arg( pStream->getSize() ).end()); rMyriad.headerChanged(); pBuf = &((char *)pBuf)[iAmnt]; iLeft -= iAmnt; @@ -237,12 +238,12 @@ void Bu::MyriadStream::setPos( Bu::size pos ) void Bu::MyriadStream::setPosEnd( Bu::size pos ) { - iPos = pStream->iSize-pos; + iPos = pStream->getSize()-pos; } bool Bu::MyriadStream::isEos() { - return iPos >= pStream->iSize; + return iPos >= pStream->getSize(); } bool Bu::MyriadStream::isOpen() @@ -299,7 +300,7 @@ void Bu::MyriadStream::setSize( Bu::size iSize ) Bu::size Bu::MyriadStream::getSize() const { - return pStream->iSize; + return pStream->getSize(); } Bu::size Bu::MyriadStream::getBlockSize() const -- cgit v1.2.3