From d1f985c8191326292dd2ae5a85a63993ad68ddd7 Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Mon, 16 Sep 2024 10:27:52 -0700 Subject: It could use more testing, but...it works. --- src/stable/myriad.cpp | 222 +++++++++++++++++++++++++++++++++++++++++----- src/stable/myriad.h | 13 ++- src/stable/myriadstream.h | 2 + src/tests/bigmyriad.cpp | 14 +-- 4 files changed, 218 insertions(+), 33 deletions(-) diff --git a/src/stable/myriad.cpp b/src/stable/myriad.cpp index f3ff09a..44a990a 100644 --- a/src/stable/myriad.cpp +++ b/src/stable/myriad.cpp @@ -1,6 +1,7 @@ #include "bu/myriad.h" #include "bu/myriadstream.h" +#include "bu/membuf.h" #include "bu/mutexlocker.h" #include "bu/util.h" @@ -27,27 +28,45 @@ Bu::Myriad::Myriad( Bu::Stream &rBacking, int iBlockSize, rBacking( rBacking ), iBlockSize( iBlockSize ), iBlockCount( 0 ), - bIsNewStream( true ) + bIsNewStream( true ), + bStructureChanged( false ), + iLastUsedIndex( -1 ) { if( !rBacking.isSeekable() ) { throw Bu::MyriadException( Bu::MyriadException::invalidBackingStream, "Myriad backing stream must be random access (seekable)."); } - if( !loadMyriad() ) + if( rBacking.getSize() == 0 ) { createMyriad( iBlockSize, iPreallocateBlocks ); } + else + { + loadMyriad(); + } } Bu::Myriad::~Myriad() { + writeHeader(); } -Bu::MyriadStream Bu::Myriad::create( Bu::Myriad::Mode /*eMode*/, - int32_t /*iPreallocateBytes*/ ) +Bu::MyriadStream Bu::Myriad::create( Bu::Myriad::Mode eMode, + int32_t iPreallocateBytes ) { - return Bu::MyriadStream( *this, NULL, (Mode)0 ); + Bu::MutexLocker l( mAccess ); + + Stream *pStream = new Stream( *this, ++iLastUsedIndex, 0 ); + int iBlocks = std::max(1, blkDiv( iPreallocateBytes, iBlockSize )); + for( int j = 0; j < iBlocks; j++ ) + { + pStream->aBlocks.append( __allocateBlock() ); + } + hStream.insert( pStream->iStream, pStream ); + bStructureChanged = true; + + return Bu::MyriadStream( *this, pStream, (Mode)(eMode&ReadWrite) ); } Bu::MyriadStream Bu::Myriad::open( Bu::Myriad::StreamId iStream, @@ -110,7 +129,7 @@ bool Bu::Myriad::loadMyriad() } MyriadRead( &uBitsPerInt, 1 ); if( uBitsPerInt != 32 ) - { +{ throw Bu::MyriadException( Bu::MyriadException::invalidFormat, "Only 32 bits per int are supported at this time."); } @@ -128,28 +147,53 @@ bool Bu::Myriad::loadMyriad() throw Bu::MyriadException( Bu::MyriadException::invalidFormat, "The first stream defined must be the header/zero stream."); } + iLastUsedIndex = iStream; int32_t iHeaderStreamBytes; MyriadRead( &iHeaderStreamBytes, 4 ); Stream *pHeaderStream = new Stream( *this, iStream, iHeaderStreamBytes ); - int iHeaderStreamBlocks = blkDiv(iHeaderStreamBytes+4, iBlockSize ); - - while( iHeaderStreamBytes+(iHeaderStreamBlocks*4) - > iHeaderStreamBlocks*iBlockSize ) - { - iHeaderStreamBlocks = blkDiv( - (iHeaderStreamBytes+((iHeaderStreamBlocks+1)*4)), iBlockSize - ); - } + hStream.insert( iStream, pHeaderStream ); + int iHeaderStreamBlocks = blkDiv(iHeaderStreamBytes, iBlockSize ); + MyriadStream sHeader( *this, pHeaderStream, Read ); + // We need to read enough so that we can gurantee that we're within a block + // that we have read the index to, plus one index. for( int32_t j = 0; j < iHeaderStreamBlocks; j++ ) { int32_t iBlockIndex; MyriadRead( &iBlockIndex, 4 ); pHeaderStream->aBlocks.append( iBlockIndex ); + if( rBacking.tell()+4 <= (j+1)*iBlockSize ) + break; } // Bootstrap now using the header stream to read the rest of the data. + sHeader.setPos( rBacking.tell() ); + while( pHeaderStream->aBlocks.getSize() < iHeaderStreamBlocks ) + { + int32_t iBlockIndex; + sHeader.read( &iBlockIndex, 4 ); + pHeaderStream->aBlocks.append( iBlockIndex ); + } + + // Ok, now we can read the rest of the header in. + for( int j = 1; j < iStreamCount; j++ ) + { + int32_t iStreamBytes; + sHeader.read( &iStream, 4 ); + sHeader.read( &iStreamBytes, 4 ); + Stream *pStream = new Stream( *this, iStream, iStreamBytes ); + int32_t iBlocks = blkDiv(iStreamBytes, iBlockSize ); + for( int k = 0; k < iBlocks; k++ ) + { + int32_t iBlockIndex; + sHeader.read( &iBlockIndex, 4 ); + pStream->aBlocks.append( iBlockIndex ); + } + hStream.insert( iStream, pStream ); + if( iLastUsedIndex < iStream ) + iLastUsedIndex = iStream; + } return true; } @@ -165,7 +209,8 @@ void Bu::Myriad::createMyriad( int32_t iBlockSize, int32_t iPreallocateBlocks ) { throw Bu::MyriadException( Bu::MyriadException::invalidFormat, "Backing stream contains data, but not a myriad structure."); -} + } + /* struct { char sMagicCode[4]; @@ -253,11 +298,128 @@ void Bu::Myriad::createMyriad( int32_t iBlockSize, int32_t iPreallocateBlocks ) { lFreeBlocks.append( j ); } + iLastUsedIndex = 0; +} + +void Bu::Myriad::writeHeader() +{ + if( !rBacking.isWritable() ) + return; + Bu::println("Writing stream breakdown:"); + Bu::MemBuf mbHeader; + { + Bu::MutexLocker l( mAccess ); + + int32_t iHdrStreamSize = __calcHeaderSize(); + // Maybe just do stream surgery here. + { + Stream *psHeader = hStream.get( 0 ); + Bu::MutexLocker l2( psHeader->mAccess ); + int iNewBlocks = Bu::blkDiv( iHdrStreamSize, iBlockSize ); + if( iHdrStreamSize < psHeader->iSize ) + { + while( psHeader->aBlocks.getSize() > iNewBlocks ) + { + __releaseBlock( psHeader->aBlocks.last(), false ); + psHeader->aBlocks.eraseLast(); + } + } + else if( iHdrStreamSize > psHeader->iSize ) + { + while( psHeader->aBlocks.getSize() < iNewBlocks ) + { + psHeader->aBlocks.append( __allocateBlock() ); + } + } + psHeader->iSize = iHdrStreamSize; + } + + Bu::println("Computed header size: %1 bytes. Ver=%2, Bpi=%3, BlockSize=%4").arg( iHdrStreamSize ).arg( 1 ).arg( 32 ).arg( iBlockSize ); + + uint8_t uVer = 1; + uint8_t uBpi = 32; + int32_t iStreamCount = hStream.getSize(); + + mbHeader.write( Myriad_MAGIC_CODE, 4 ); + mbHeader.write( &uVer, 1 ); + mbHeader.write( &uBpi, 1 ); + mbHeader.write( &iBlockSize, 4 ); + mbHeader.write( &iStreamCount, 4 ); + StreamHash::KeyList lStreamId = hStream.getKeys(); + lStreamId.sort(); + if( lStreamId.first() != 0 ) + { + throw Bu::MyriadException( Bu::MyriadException::invalidFormat, + "There doesn't appear to be a zero (header) stream."); + } + for( StreamHash::KeyList::iterator i = lStreamId.begin(); i; i++ ) + { + uint32_t uStreamId = *i; + Stream *pStream = hStream.get( uStreamId ); + uint32_t uStreamSize = pStream->getSize(); + mbHeader.write( &uStreamId, 4 ); + mbHeader.write( &uStreamSize, 4 ); + Bu::Array aBlocks = pStream->getBlockList(); + + Bu::println(" Stream %1 is %2 bytes %3 blocks (%4 blocks computed)") + .arg( *i ).arg( uStreamSize ) + .arg( aBlocks.getSize() ) + .arg( Bu::blkDiv( (int)uStreamSize, (int)iBlockSize ) ); + + for( Bu::Array::iterator i = aBlocks.begin(); i; i++ ) + { + int32_t iIdx = *i; + mbHeader.write( &iIdx, 4 ); + } + } + + + } + + Bu::MyriadStream sHeader( *this, hStream.get( 0 ), Bu::Myriad::Write ); + sHeader.write( mbHeader.getString() ); +} + +int32_t Bu::Myriad::__calcHeaderSize() +{ + int32_t iHdrSize = 4+1+1+4+4; + + StreamHash::KeyList lStreamId = hStream.getKeys(); + lStreamId.sort(); + if( lStreamId.first() != 0 ) + { + throw Bu::MyriadException( Bu::MyriadException::invalidFormat, + "There doesn't appear to be a zero (header) stream."); + } + for( StreamHash::KeyList::iterator i = lStreamId.begin(); i; i++ ) + { + iHdrSize += 4+4; + int32_t iStreamSize = hStream.get( *i )->getSize(); + if( (*i) != 0 ) + { + iHdrSize += Bu::blkDiv( iStreamSize, iBlockSize )*4; + } + } + + int32_t iNewSize = iHdrSize; + int32_t iOldSize; + + do { + iOldSize = iNewSize; + iNewSize = iHdrSize + Bu::blkDiv(iNewSize, iBlockSize); + } while( iOldSize != iNewSize ); + + return iNewSize; } int32_t Bu::Myriad::allocateBlock() { Bu::MutexLocker l( mAccess ); + return __allocateBlock(); +} + +int32_t Bu::Myriad::__allocateBlock() +{ if( lFreeBlocks.isEmpty() ) { // Increase the size of the backing stream @@ -275,6 +437,11 @@ int32_t Bu::Myriad::allocateBlock() void Bu::Myriad::releaseBlock( int32_t iBlockId, bool bBlank ) { Bu::MutexLocker l( mAccess ); + __releaseBlock( iBlockId, bBlank ); +} + +void Bu::Myriad::__releaseBlock( int32_t iBlockId, bool bBlank ) +{ lFreeBlocks.append( iBlockId ); if( bBlank ) { @@ -314,10 +481,10 @@ int32_t Bu::Myriad::blockRead( int32_t iBlock, int32_t iStart, void *pTarget, int32_t iSize ) { int32_t iUpperSize = iBlockSize - (iStart%iBlockSize); - Bu::println("Max read within block: %1 vs %2 (start=%3, blocksize=%4)") +/* Bu::println("Max read within block: %1 vs %2 (start=%3, blocksize=%4)") .arg( iUpperSize ).arg( iSize ) .arg( iStart ).arg( iBlockSize ); - +*/ int32_t iAmnt = std::min( iSize, iUpperSize ); Bu::MutexLocker l( mBacking ); rBacking.setPos( iBlockSize*iBlock + iStart ); @@ -329,10 +496,10 @@ int32_t Bu::Myriad::blockWrite( int32_t iBlock, int32_t iStart, const void *pTarget, int32_t iSize ) { int32_t iUpperSize = iBlockSize - (iStart%iBlockSize); - Bu::println("Max write within block: %1 vs %2 (start=%3, blocksize=%4)") +/* Bu::println("Max write within block: %1 vs %2 (start=%3, blocksize=%4)") .arg( iUpperSize ).arg( iSize ) .arg( iStart ).arg( iBlockSize ); - +*/ int32_t iAmnt = std::min( iSize, iUpperSize ); Bu::MutexLocker l( mBacking ); rBacking.setPos( iBlock*iBlockSize + iStart ); @@ -385,10 +552,10 @@ void Bu::Myriad::Stream::setSize( int32_t iNewSize ) { // Two possible modes, shrink or grow. Bu::MutexLocker l( mAccess ); + int iNewBlocks = Bu::blkDiv( iNewSize, rParent.iBlockSize ); if( iNewSize < iSize ) { // Shrink it - int iNewBlocks = Bu::blkDiv( iNewSize, rParent.iBlockSize ); while( aBlocks.getSize() > iNewBlocks ) { rParent.releaseBlock( aBlocks.last(), false ); @@ -399,7 +566,6 @@ void Bu::Myriad::Stream::setSize( int32_t iNewSize ) else if( iNewSize > iSize ) { // Grow it - int iNewBlocks = Bu::blkDiv( iNewSize, rParent.iBlockSize ); while( aBlocks.getSize() < iNewBlocks ) { aBlocks.append( rParent.allocateBlock() ); @@ -438,6 +604,10 @@ int32_t Bu::Myriad::Stream::write( int32_t iStart, const void *pTarget, while( iSize > 0 ) { int32_t iBlockIdx = iStart/rParent.iBlockSize; + while( iBlockIdx >= aBlocks.getSize() ) + { + aBlocks.append( rParent.allocateBlock() ); + } int32_t iBlock = aBlocks[iBlockIdx]; int32_t iChunkWrite = rParent.blockWrite( iBlock, iStart%rParent.iBlockSize, pTarget, iSize @@ -449,6 +619,8 @@ int32_t Bu::Myriad::Stream::write( int32_t iStart, const void *pTarget, reinterpret_cast(pTarget) += iChunkWrite; iSize -= iChunkWrite; } + if( this->iSize < iStart ) + this->iSize = iStart; return iWrite; } @@ -460,6 +632,12 @@ Bu::String Bu::Myriad::Stream::getLocation() const .arg( rParent.getLocation() ).arg( iStream ); } +Bu::Array Bu::Myriad::Stream::getBlockList() const +{ + Bu::MutexLocker l( mAccess ); + return aBlocks.clone(); +} + void Bu::Myriad::Stream::open() { Bu::MutexLocker l( mAccess ); diff --git a/src/stable/myriad.h b/src/stable/myriad.h index 6d99ee4..7cf6041 100644 --- a/src/stable/myriad.h +++ b/src/stable/myriad.h @@ -40,8 +40,8 @@ namespace Bu Write = 0x02, ///< Open file for writing Create = 0x04, ///< Create file if it doesn't exist Truncate = 0x08, ///< Truncate file if it does exist - Append = 0x10, ///< Always append on every write - NonBlock = 0x20, ///< Open file in non-blocking mode + Append = 0x10, ///< Start writing at end of file + //NonBlock = 0x20, ///< Open file in non-blocking mode Exclusive = 0x44, ///< Create file, if it exists then fail // Helpful mixes @@ -80,8 +80,12 @@ namespace Bu private: bool loadMyriad(); void createMyriad( int32_t iBlockSize, int32_t iPreallocateBlocks ); + void writeHeader(); + int32_t __calcHeaderSize(); int32_t allocateBlock(); + int32_t __allocateBlock(); void releaseBlock( int32_t iBlockId, bool bBlank=true ); + void __releaseBlock( int32_t iBlockId, bool bBlank=true ); void blankBlock( int32_t iBlockId ); void openStream( StreamId id ); @@ -124,6 +128,7 @@ namespace Bu int32_t read( int32_t iStart, void *pTarget, int32_t iSize ); int32_t write( int32_t iStart, const void *pTarget, int32_t iSize ); Bu::String getLocation() const; + Bu::Array getBlockList() const; /** * Doesn't actually open, just increments the open counter. @@ -150,9 +155,7 @@ namespace Bu }; private: - typedef Bu::Hash StreamHash; - typedef Bu::List IndexList; mutable Bu::Mutex mAccess; mutable Bu::Mutex mBacking; @@ -160,8 +163,10 @@ namespace Bu int32_t iBlockSize; int32_t iBlockCount; bool bIsNewStream; + bool bStructureChanged; StreamHash hStream; IndexList lFreeBlocks; + StreamId iLastUsedIndex; }; }; diff --git a/src/stable/myriadstream.h b/src/stable/myriadstream.h index c01cf34..b86dbd7 100644 --- a/src/stable/myriadstream.h +++ b/src/stable/myriadstream.h @@ -13,12 +13,14 @@ namespace Bu private: MyriadStream( Bu::Myriad &rMyriad, Bu::Myriad::Stream *pStream, Bu::Myriad::Mode eMode ); + public: virtual ~MyriadStream(); public: virtual void close(); virtual size read( void *pBuf, size iBytes ); virtual size write( const void *pBuf, size iBytes ); + using Stream::write; virtual size tell(); virtual void seek( size offset ); virtual void setPos( size pos ); diff --git a/src/tests/bigmyriad.cpp b/src/tests/bigmyriad.cpp index 21c5ff2..9d24741 100644 --- a/src/tests/bigmyriad.cpp +++ b/src/tests/bigmyriad.cpp @@ -5,17 +5,17 @@ int main() { Bu::File f("big.myr", Bu::File::Read|Bu::File::Write|Bu::File::Create ); - Bu::Myriad m( f, 8, 12 ); + Bu::Myriad m( f, 256, 12 ); char *buf = new char[1024*1024*10]; - memset( buf, 0, 1024*1024*10 ); - for( int j = 0; j < 250; j++ ) + for( int j = 0; j < 25; j++ ) { -// m.openStream( m.createStream() ).write( buf, 1024*1024*10 ); -// m.sync(); -// printf("\r%03d%%", (j+1)*100/250 ); -// fflush( stdout ); + memset( buf, j, 1024*1024*10 ); + m.create( Bu::Myriad::Write ).write( buf, 1024*1024*10 ); +// m.sync(); + printf("\r%03d%%", (j+1)*100/25 ); + fflush( stdout ); } printf("\n\n"); -- cgit v1.2.3