From 49d9fa3c2435b8e97ffdc42e40a880a3dad82f8a Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Sat, 14 Apr 2012 17:38:13 +0000 Subject: The core of myriad is now thread-safe. It could use a little work on multiple streams accessing the same blocks at the same time. Right now it's safe, but a little strange, since both streams work from seperate buffers. Maybe that's ok, but it still needs some more investigation. I want to remove the BitString stuff entirely, it turns out it's the slowest part, which, upon actually looking at the code is completely obvious. This change shouldn't change the API at all, just make adding blocks to streams much, much faster. --- src/stable/myriad.cpp | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++- src/stable/myriad.h | 11 ++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) (limited to 'src/stable') diff --git a/src/stable/myriad.cpp b/src/stable/myriad.cpp index de44930..9030b83 100644 --- a/src/stable/myriad.cpp +++ b/src/stable/myriad.cpp @@ -8,6 +8,7 @@ #include "bu/myriad.h" #include "bu/stream.h" #include "bu/myriadstream.h" +#include "bu/mutexlocker.h" #include #include "bu/sio.h" @@ -50,11 +51,13 @@ Bu::Myriad::Myriad( Bu::Stream &sStore, int iBlockSize, int iPreallocate ) : Bu::Myriad::~Myriad() { + mActiveBlocks.lock(); if( !hActiveBlocks.isEmpty() ) { sio << "Bu::Myriad::~Myriad(): Error: There are " << hActiveBlocks.getSize() << " unsynced blocks!" << sio.nl; } + mActiveBlocks.unlock(); sync(); for( StreamArray::iterator i = aStreams.begin(); i; i++ ) @@ -67,6 +70,7 @@ void Bu::Myriad::sync() { updateHeader(); + mActiveBlocks.lock(); for( BlockHash::iterator i = hActiveBlocks.begin(); i; i++ ) { if( (*i)->bChanged ) @@ -74,10 +78,12 @@ void Bu::Myriad::sync() syncBlock( *i ); } } + mActiveBlocks.unlock(); } void Bu::Myriad::initialize() { + MutexLocker mLock( mHeader ); sStore.setPosEnd( 0 ); int iSize = sStore.tell(); sStore.setPos( 0 ); @@ -173,6 +179,8 @@ void Bu::Myriad::initialize() void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) { + MutexLocker mLock( mHeader ); + for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { delete *i; @@ -262,6 +270,8 @@ void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) void Bu::Myriad::updateHeader() { + MutexLocker mLock( mHeader ); + if( bHeaderChanged == false ) return; if( !sStore.canWrite() ) @@ -337,6 +347,8 @@ void Bu::Myriad::updateHeader() int Bu::Myriad::createStream( int iPreAllocate ) { + MutexLocker mLock( mHeader ); + Stream *pStr = new Stream(); pStr->iId = aStreams.last()->iId+1; //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate=" @@ -360,6 +372,8 @@ int Bu::Myriad::createStream( int iPreAllocate ) int Bu::Myriad::createStreamWithId( int iId, int iPreAllocate ) { + MutexLocker mLock( mHeader ); + try { findStream( iId ); @@ -432,6 +446,8 @@ int Bu::Myriad::findEmptyBlock() void Bu::Myriad::deleteStream( int iId ) { + MutexLocker mLock( mHeader ); + if( iId < 0 ) throw MyriadException( MyriadException::invalidStreamId, "Invalid stream id."); @@ -458,6 +474,8 @@ void Bu::Myriad::deleteStream( int iId ) Bu::Array Bu::Myriad::getStreamIds() { + MutexLocker mLock( mHeader ); + Bu::Array aRet( aStreams.getSize() ); for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { @@ -469,11 +487,15 @@ Bu::Array Bu::Myriad::getStreamIds() int Bu::Myriad::getStreamSize( int iId ) { + MutexLocker mLock( mHeader ); + return findStream( iId )->iSize; } bool Bu::Myriad::hasStream( int iId ) { + MutexLocker mLock( mHeader ); + try { findStream( iId ); @@ -486,12 +508,16 @@ bool Bu::Myriad::hasStream( int iId ) Bu::MyriadStream Bu::Myriad::openStream( int iId ) { + MutexLocker mLock( mHeader ); + //sio << "Myriad: Request to open stream: " << iId << sio.nl; return MyriadStream( *this, findStream( iId ) ); } int Bu::Myriad::getNumStreams() { + MutexLocker mLock( mHeader ); + return aStreams.getSize(); } @@ -512,6 +538,8 @@ int Bu::Myriad::getNumUsedBlocks() int Bu::Myriad::getTotalUsedBytes() { + MutexLocker mLock( mHeader ); + int iTotalSize = 0; for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { @@ -522,6 +550,8 @@ int Bu::Myriad::getTotalUsedBytes() int Bu::Myriad::getTotalUnusedBytes() { + MutexLocker mLock( mHeader ); + int iTotalSize = (iBlocks-iUsed)*iBlockSize; for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { @@ -532,6 +562,8 @@ int Bu::Myriad::getTotalUnusedBytes() int Bu::Myriad::getTotalUnusedBytes( int iFakeBlockSize ) { + MutexLocker mLock( mHeader ); + int iTotalSize = (iBlocks-iUsed)*iFakeBlockSize; for( StreamArray::iterator i = aStreams.begin(); i; i++ ) { @@ -565,7 +597,9 @@ Bu::Myriad::Block *Bu::Myriad::getBlock( int iBlock ) pBlock->bChanged = false; pBlock->iBlockIndex = iBlock; + mActiveBlocks.lock(); hActiveBlocks.insert( iBlock, pBlock ); + mActiveBlocks.unlock(); return pBlock; } @@ -576,7 +610,10 @@ void Bu::Myriad::releaseBlock( Bu::Myriad::Block *pBlock ) return; // sio << "Myriad: Releasing block " << pBlock->iBlockIndex << sio.nl; syncBlock( pBlock ); + mActiveBlocks.lock(); hActiveBlocks.erase( pBlock->iBlockIndex ); + mActiveBlocks.unlock(); + delete[] pBlock->pData; delete pBlock; } @@ -594,6 +631,8 @@ void Bu::Myriad::syncBlock( Block *pBlock ) int Bu::Myriad::streamAddBlock( Stream *pStream ) { + MutexLocker mLock( mHeader ); + int iBlock = findEmptyBlock(); pStream->aBlocks.append( iBlock ); bsBlockUsed.setBit( iBlock ); @@ -604,6 +643,8 @@ int Bu::Myriad::streamAddBlock( Stream *pStream ) void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) { + MutexLocker mLock( mHeader ); + if( pStream->iSize == iSize ) { return; @@ -616,6 +657,8 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) { if( bsBlockUsed.getBit( pStream->aBlocks.last() ) ) iUsed--; + else + sio << "Unused block used in stream? " << pStream->aBlocks.last() << sio.nl; bsBlockUsed.setBit( pStream->aBlocks.last(), false ); pStream->aBlocks.eraseLast(); } @@ -628,7 +671,12 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; iNewSize < iSize; iNewSize += iBlockSize ) { - streamAddBlock( pStream ); + //streamAddBlock( pStream ); + int iBlock = findEmptyBlock(); + pStream->aBlocks.append( iBlock ); + bsBlockUsed.setBit( iBlock ); + bHeaderChanged = true; + iUsed++; } pStream->iSize = iSize; bHeaderChanged = true; @@ -641,6 +689,13 @@ void Bu::Myriad::headerChanged() } bool Bu::Myriad::isMyriad( Bu::Stream &sStore ) +{ + uint8_t uTmp; + + return isMyriad( sStore, uTmp ); +} + +bool Bu::Myriad::isMyriad( Bu::Stream &sStore, uint8_t &uTmp ) { sStore.setPos( 0 ); @@ -648,6 +703,7 @@ bool Bu::Myriad::isMyriad( Bu::Stream &sStore ) if( sStore.read( buf, 4 ) < 4 ) throw MyriadException( MyriadException::emptyStream, "Input stream appears to be empty."); + sStore.read( &uTmp, 1 ); sStore.setPos( 0 ); if( memcmp( buf, Myriad_MAGIC_CODE, 4 ) ) { diff --git a/src/stable/myriad.h b/src/stable/myriad.h index 3382ab5..14e8c73 100644 --- a/src/stable/myriad.h +++ b/src/stable/myriad.h @@ -13,6 +13,7 @@ #include "bu/exceptionbase.h" #include "bu/array.h" #include "bu/hash.h" +#include "bu/mutex.h" namespace Bu { @@ -148,6 +149,13 @@ namespace Bu */ void sync(); + /** + * Read the first few bytes from the given stream and return true/false + * depending on weather or not it's a Myriad stream. This will throw + * an exception if the stream is empty, or is not random access. + */ + static bool isMyriad( Bu::Stream &sStore, uint8_t &uVer ); + /** * Read the first few bytes from the given stream and return true/false * depending on weather or not it's a Myriad stream. This will throw @@ -216,6 +224,9 @@ namespace Bu typedef Bu::Hash BlockHash; BlockHash hActiveBlocks; bool bHeaderChanged; + + Bu::Mutex mHeader; + Bu::Mutex mActiveBlocks; }; }; -- cgit v1.2.3