From caee572ff94822ca2ed354fcb79ca04ed9adf388 Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Mon, 5 Aug 2024 12:44:59 -0700 Subject: Perhaps fixed a Myriad race condition. If so, this will fix the issue where streams randomly truncate when accessed by multiple threads. --- src/stable/myriad.cpp | 67 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 21 deletions(-) (limited to 'src/stable/myriad.cpp') diff --git a/src/stable/myriad.cpp b/src/stable/myriad.cpp index 86f651e..c606369 100644 --- a/src/stable/myriad.cpp +++ b/src/stable/myriad.cpp @@ -18,7 +18,8 @@ using Bu::Fmt; #define Myriad_MAGIC_CODE ((unsigned char *)"\x0a\xd3\xfa\x84") -#define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(sStore.getLocation()).arg(x) +// #define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(sStore.getLocation()).arg(x) +#define TRACE( x ) (void)0 namespace Bu { @@ -145,7 +146,7 @@ void Bu::Myriad::initialize() Stream *pFakeHdr = new Stream; pFakeHdr->iId = 0; - pFakeHdr->iSize = iHeaderSize; + pFakeHdr->setSize( iHeaderSize ); for( int j = 0; j < iHeaderBlocks; j++ ) { pFakeHdr->aBlocks.append( j ); @@ -161,11 +162,13 @@ void Bu::Myriad::initialize() pIn->setPos( sStore.tell() ); for( int j = 0; j < iStreams; j++ ) { + int iSizeTmp; aStreams.append( new Stream() ); Stream &s = *aStreams[j]; pIn->read( &s.iId, 4 ); - pIn->read( &s.iSize, 4 ); - int iSBlocks = blkDiv(s.iSize, iBlockSize); + pIn->read( &iSizeTmp, 4 ); + s.setSize( iSizeTmp ); + int iSBlocks = blkDiv(s.getSize(), iBlockSize); // sio << "Myriad: - Stream::iId=" << s.iId // << ", Stream::iSize=" << s.iSize // << ", Stream::aBlocks=" << iSBlocks @@ -282,10 +285,10 @@ void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) this->iBlockSize = iBlockSize; this->iBlocks = iPreAllocate; - pStr->iSize = sStore.tell(); + pStr->setSize( sStore.tell() ); // sio << "Myriad: Actual end of header stream = " << pStr->iSize << sio.nl; - pStr->iSize = iHeaderSize; + pStr->setSize( iHeaderSize ); for( int j = 0; j < iHeaderBlocks; j++ ) { // sio << "Started block " << j << " is header." << sio.nl; @@ -308,6 +311,26 @@ void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) TRACE("mHeader unlocked..."); } +void Bu::Myriad::Stream::setSize( int iNewSize ) +{ + MutexLocker l( mStream ); + iSize = iNewSize; +} + +void Bu::Myriad::Stream::growTo( int iNewSize ) +{ + MutexLocker l( mStream ); + if( iNewSize < iSize ) + return; + iSize = iNewSize; +} + +int Bu::Myriad::Stream::getSize() const +{ + MutexLocker l( mStream ); + return iSize; +} + void Bu::Myriad::updateHeader() { MutexLocker mLock( mHeader ); @@ -354,7 +377,7 @@ void Bu::Myriad::updateHeader() iHeaderSize += 4; iNewBlocks = blkDiv( iHeaderSize, iBlockSize ); } - aStreams[0]->iSize = iHeaderSize; + aStreams[0]->setSize( iHeaderSize ); // sio << "Myriad: updateHeader: iHeaderSize=" << iHeaderSize // << ", iNewBlocks=" << iNewBlocks << ", curBlocks=" // << aStreams[0]->aBlocks.getSize() << sio.nl; @@ -379,9 +402,11 @@ void Bu::Myriad::updateHeader() for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { + int iSizeTmp; sHdr.write( &(*i)->iId, 4 ); - sHdr.write( &(*i)->iSize, 4 ); - int iUsedBlocks = blkDiv( (*i)->iSize, iBlockSize ); + sHdr.write( &iSizeTmp, 4 ); + (*i)->setSize( iSizeTmp ); + int iUsedBlocks = blkDiv( iSizeTmp, iBlockSize ); // for( BlockArray::iterator j = (*i)->aBlocks.begin(); j; j++ ) for( int j = 0; j < iUsedBlocks; j++ ) { @@ -402,7 +427,7 @@ int Bu::Myriad::createStream( int iPreAllocate ) pStr->iId = aStreams.last()->iId+1; //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate=" // << iPreAllocate << sio.nl; - pStr->iSize = 0; + pStr->setSize( 0 ); aStreams.append( pStr ); for( int j = 0; j < iPreAllocate; j++ ) @@ -438,7 +463,7 @@ int Bu::Myriad::createStreamWithId( int iId, int iPreAllocate ) pStr->iId = iId; //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate=" // << iPreAllocate << sio.nl; - pStr->iSize = 0; + pStr->setSize( 0 ); if( aStreams.last()->iId < iId ) { aStreams.append( pStr ); @@ -546,7 +571,7 @@ int Bu::Myriad::getStreamSize( int iId ) TRACE("mHeader locked."); TRACE("mHeader unlocked..."); - return findStream( iId )->iSize; + return findStream( iId )->getSize(); } bool Bu::Myriad::hasStream( int iId ) @@ -608,7 +633,7 @@ Bu::size Bu::Myriad::getTotalUsedBytes() Bu::size iTotalSize = 0; for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { - iTotalSize += (*i)->iSize; + iTotalSize += (*i)->getSize(); } TRACE("mHeader unlocked..."); return iTotalSize; @@ -622,7 +647,7 @@ Bu::size Bu::Myriad::getTotalUnusedBytes() Bu::size iTotalSize = (iBlocks-iUsed)*iBlockSize; for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { - iTotalSize += iBlockSize - ((Bu::size)(*i)->iSize%iBlockSize); + iTotalSize += iBlockSize - ((Bu::size)(*i)->getSize()%iBlockSize); } TRACE("mHeader unlocked..."); return iTotalSize; @@ -636,7 +661,7 @@ Bu::size Bu::Myriad::getTotalUnusedBytes( int iFakeBlockSize ) Bu::size iTotalSize = (iBlocks-iUsed)*iFakeBlockSize; for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { - iTotalSize += iFakeBlockSize - ((*i)->iSize%iFakeBlockSize); + iTotalSize += iFakeBlockSize - ((*i)->getSize()%iFakeBlockSize); } TRACE("mHeader unlocked..."); return iTotalSize; @@ -722,15 +747,15 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) MutexLocker mLock( mHeader ); TRACE("mHeader locked."); - if( pStream->iSize == iSize ) + if( pStream->getSize() == iSize ) { TRACE("mHeader unlocked..."); return; } - else if( pStream->iSize > iSize ) + else if( pStream->getSize() > iSize ) { // Shrink - TRACE(Bu::String("Shrink stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->iSize).arg(iSize).end() ); + TRACE(Bu::String("Shrink stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->getSize()).arg(iSize).end() ); for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; iNewSize-iBlockSize > iSize; iNewSize -= iBlockSize ) { @@ -742,13 +767,13 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) // bsBlockUsed.setBit( pStream->aBlocks.last(), false ); pStream->aBlocks.eraseLast(); } - pStream->iSize = iSize; + pStream->setSize( iSize ); bHeaderChanged = true; } else { // Grow - TRACE(Bu::String("Grow stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->iSize).arg(iSize).end() ); + TRACE(Bu::String("Grow stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->getSize()).arg(iSize).end() ); for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; iNewSize < iSize; iNewSize += iBlockSize ) { @@ -759,7 +784,7 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) // bHeaderChanged = true; iUsed++; } - pStream->iSize = iSize; + pStream->setSize( iSize ); bHeaderChanged = true; } TRACE("mHeader unlocked..."); -- cgit v1.2.3