#include "bu/myriad.h" #include "bu/myriadstream.h" #include "bu/membuf.h" #include "bu/mutexlocker.h" #include "bu/util.h" #include "bu/sio.h" #define Myriad_MAGIC_CODE ((unsigned char *)"\x0a\xd3\xfa\x84") #define MyriadRead( target, size ) if( rBacking.read( target, size ) < size ) \ { \ throw Bu::MyriadException( Bu::MyriadException::invalidFormat, \ "Insufficent data reading myriad data from backing stream."); \ } (void)0 namespace Bu { subExceptionDef( MyriadException ) template t blkDiv( t total, t block ) { return (total/block)+((total%block==0)?(0):(1)); } } Bu::Myriad::Myriad( Bu::Stream &rBacking, int iBlockSize, int iPreallocateBlocks ) : rBacking( rBacking ), iBlockSize( iBlockSize ), iBlockCount( 0 ), bIsNewStream( true ), bStructureChanged( false ), iLastUsedIndex( -1 ) { if( !rBacking.isSeekable() ) { throw Bu::MyriadException( Bu::MyriadException::invalidBackingStream, "Myriad backing stream must be random access (seekable)."); } if( rBacking.getSize() == 0 ) { createMyriad( iBlockSize, iPreallocateBlocks ); } else { loadMyriad(); } } Bu::Myriad::~Myriad() { writeHeader(); } Bu::MyriadStream Bu::Myriad::create( Bu::Myriad::Mode eMode, int32_t iPreallocateBytes ) { Bu::MutexLocker l( mAccess ); Stream *pStream = new Stream( *this, ++iLastUsedIndex, 0 ); int iBlocks = std::max(1, blkDiv( iPreallocateBytes, iBlockSize )); for( int j = 0; j < iBlocks; j++ ) { pStream->aBlocks.append( __allocateBlock() ); } hStream.insert( pStream->iStream, pStream ); bStructureChanged = true; return Bu::MyriadStream( *this, pStream, (Mode)(eMode&ReadWrite) ); } Bu::MyriadStream Bu::Myriad::open( Bu::Myriad::StreamId iStream, Bu::Myriad::Mode eMode ) { Bu::MutexLocker l( mAccess ); if( !hStream.has( iStream ) ) { throw Bu::MyriadException( MyriadException::noSuchStream, "No such stream."); } { Bu::MutexLocker l2( mBacking ); if( (eMode&Write) && rBacking.isWritable() ) { throw Bu::MyriadException( MyriadException::badMode, "Backing stream does not support writing."); } } return Bu::MyriadStream( *this, hStream.get( iStream ), eMode ); } bool Bu::Myriad::erase( Bu::Myriad::StreamId /*iStream*/ ) { return false; } bool Bu::Myriad::setSize( Bu::Myriad::StreamId /*iStream*/, int32_t /*iNewSize*/ ) { return false; } Bu::String Bu::Myriad::getLocation() const { Bu::MutexLocker l( mAccess ); Bu::MutexLocker l2( mBacking ); return Bu::String("myriad(%1,%2):%3") .arg( 1 ).arg( iBlockSize ).arg( rBacking.getLocation() ); } bool Bu::Myriad::loadMyriad() { Bu::println("Load myriad!"); char sMagicCode[4]; rBacking.setPos( 0 ); MyriadRead( sMagicCode, 4 ); if( memcmp( sMagicCode, Myriad_MAGIC_CODE, 4 ) ) { throw Bu::MyriadException( Bu::MyriadException::invalidFormat, "Backing stream does not seem to be a Myriad structure."); } uint8_t uVer; uint8_t uBitsPerInt; MyriadRead( &uVer, 1 ); if( uVer != 1 ) { throw Bu::MyriadException( Bu::MyriadException::invalidFormat, "Only version 1 myriad structures are supported."); } MyriadRead( &uBitsPerInt, 1 ); if( uBitsPerInt != 32 ) { throw Bu::MyriadException( Bu::MyriadException::invalidFormat, "Only 32 bits per int are supported at this time."); } MyriadRead( &iBlockSize, 4 ); int iStreamCount; MyriadRead( &iStreamCount, 4 ); // // Read stream data -- Bootstrap the zero stream // StreamId iStream; MyriadRead( &iStream, 4 ); if( iStream != 0 ) { throw Bu::MyriadException( Bu::MyriadException::invalidFormat, "The first stream defined must be the header/zero stream."); } iLastUsedIndex = iStream; int32_t iHeaderStreamBytes; MyriadRead( &iHeaderStreamBytes, 4 ); Stream *pHeaderStream = new Stream( *this, iStream, iHeaderStreamBytes ); hStream.insert( iStream, pHeaderStream ); int iHeaderStreamBlocks = blkDiv(iHeaderStreamBytes, iBlockSize ); MyriadStream sHeader( *this, pHeaderStream, Read ); // We need to read enough so that we can gurantee that we're within a block // that we have read the index to, plus one index. for( int32_t j = 0; j < iHeaderStreamBlocks; j++ ) { int32_t iBlockIndex; MyriadRead( &iBlockIndex, 4 ); pHeaderStream->aBlocks.append( iBlockIndex ); if( rBacking.tell()+4 <= (j+1)*iBlockSize ) break; } // Bootstrap now using the header stream to read the rest of the data. sHeader.setPos( rBacking.tell() ); while( pHeaderStream->aBlocks.getSize() < iHeaderStreamBlocks ) { int32_t iBlockIndex; sHeader.read( &iBlockIndex, 4 ); pHeaderStream->aBlocks.append( iBlockIndex ); } // Ok, now we can read the rest of the header in. for( int j = 1; j < iStreamCount; j++ ) { int32_t iStreamBytes; sHeader.read( &iStream, 4 ); sHeader.read( &iStreamBytes, 4 ); Stream *pStream = new Stream( *this, iStream, iStreamBytes ); int32_t iBlocks = blkDiv(iStreamBytes, iBlockSize ); for( int k = 0; k < iBlocks; k++ ) { int32_t iBlockIndex; sHeader.read( &iBlockIndex, 4 ); pStream->aBlocks.append( iBlockIndex ); } hStream.insert( iStream, pStream ); if( iLastUsedIndex < iStream ) iLastUsedIndex = iStream; } return true; } void Bu::Myriad::createMyriad( int32_t iBlockSize, int32_t iPreallocateBlocks ) { if( iBlockSize < 8 ) { throw Bu::MyriadException( Bu::MyriadException::invalidParameter, "iBlockSize cannot be below 8"); } if( rBacking.getSize() ) { throw Bu::MyriadException( Bu::MyriadException::invalidFormat, "Backing stream contains data, but not a myriad structure."); } /* struct { char sMagicCode[4]; uint8_t uVer; uint8_t uBitsPerInt; uint32_t uBlockSize; uint32_t uStreamCount; } sHeader; struct { uint32_t uStreamId; uint32_t uStreamSize; } sStreamHeader; Bu::println("sHeader = %1, sStreamHeader = %2").arg( sizeof(sHeader) ).arg( sizeof(sStreamHeader) ); */ // Start with the bytes for the file header and initial stream header int iHeaderStreamBytes = 14 // Base header + 8; // Stream header // Pick the block count that matches our current estimate for the header // plus one block index. int iHeaderStreamBlocks = blkDiv(iHeaderStreamBytes+4, iBlockSize ); Bu::println("Initial estimate: %1 bytes / %2 cur blocks, %3 computed blocks (%4 target bytes).") .arg( iHeaderStreamBytes+(iHeaderStreamBlocks*4) ) .arg( iHeaderStreamBlocks ) .arg( blkDiv((iHeaderStreamBytes+(iHeaderStreamBlocks*4)), iBlockSize) ) .arg( iHeaderStreamBlocks*iBlockSize ); while( iHeaderStreamBytes+(iHeaderStreamBlocks*4) > iHeaderStreamBlocks*iBlockSize ) { iHeaderStreamBlocks = blkDiv((iHeaderStreamBytes+((iHeaderStreamBlocks+1)*4)), iBlockSize); if( iHeaderStreamBlocks > 100 ) break; Bu::println(" Adjustment: %1 bytes / %2 cur blocks, %3 computed blocks (%4 target bytes).") .arg( iHeaderStreamBytes+(iHeaderStreamBlocks*4) ) .arg( iHeaderStreamBlocks ) .arg( blkDiv((iHeaderStreamBytes+(iHeaderStreamBlocks*4)), iBlockSize) ) .arg( iHeaderStreamBlocks*iBlockSize ); } if( iPreallocateBlocks > iHeaderStreamBlocks ) { rBacking.setSize( iBlockSize*iPreallocateBlocks ); } else { rBacking.setSize( iBlockSize*iHeaderStreamBlocks ); } // // Write Myriad header // uint8_t uVer = 1; uint8_t uBpi = 32; int32_t iStreamCount = 1; rBacking.setPos( 0 ); rBacking.write( Myriad_MAGIC_CODE, 4 ); rBacking.write( &uVer, 1 ); rBacking.write( &uBpi, 1 ); rBacking.write( &iBlockSize, 4 ); rBacking.write( &iStreamCount, 4 ); Stream *pHeadStream = new Stream( *this, 0, Bu::Myriad::ReadWrite ); // // Write stream header // uint32_t uStreamId = 0; uint32_t uStreamSize = iHeaderStreamBytes+iHeaderStreamBlocks*4; rBacking.write( &uStreamId, 4 ); rBacking.write( &uStreamSize, 4 ); for( int iBlockIndex = 0; iBlockIndex < iHeaderStreamBlocks; iBlockIndex++ ) { rBacking.write( &iBlockIndex, 4 ); pHeadStream->aBlocks.append( iBlockIndex ); } rBacking.flush(); hStream.insert( pHeadStream->iStream, pHeadStream ); for( int32_t j = iHeaderStreamBlocks; j < iPreallocateBlocks; j++ ) { lFreeBlocks.append( j ); } iLastUsedIndex = 0; } void Bu::Myriad::writeHeader() { if( !rBacking.isWritable() ) return; Bu::println("Writing stream breakdown:"); Bu::MemBuf mbHeader; { Bu::MutexLocker l( mAccess ); int32_t iHdrStreamSize = __calcHeaderSize(); // Maybe just do stream surgery here. { Stream *psHeader = hStream.get( 0 ); Bu::MutexLocker l2( psHeader->mAccess ); int iNewBlocks = Bu::blkDiv( iHdrStreamSize, iBlockSize ); if( iHdrStreamSize < psHeader->iSize ) { while( psHeader->aBlocks.getSize() > iNewBlocks ) { __releaseBlock( psHeader->aBlocks.last(), false ); psHeader->aBlocks.eraseLast(); } } else if( iHdrStreamSize > psHeader->iSize ) { while( psHeader->aBlocks.getSize() < iNewBlocks ) { psHeader->aBlocks.append( __allocateBlock() ); } } psHeader->iSize = iHdrStreamSize; } Bu::println("Computed header size: %1 bytes. Ver=%2, Bpi=%3, BlockSize=%4").arg( iHdrStreamSize ).arg( 1 ).arg( 32 ).arg( iBlockSize ); uint8_t uVer = 1; uint8_t uBpi = 32; int32_t iStreamCount = hStream.getSize(); mbHeader.write( Myriad_MAGIC_CODE, 4 ); mbHeader.write( &uVer, 1 ); mbHeader.write( &uBpi, 1 ); mbHeader.write( &iBlockSize, 4 ); mbHeader.write( &iStreamCount, 4 ); StreamHash::KeyList lStreamId = hStream.getKeys(); lStreamId.sort(); if( lStreamId.first() != 0 ) { throw Bu::MyriadException( Bu::MyriadException::invalidFormat, "There doesn't appear to be a zero (header) stream."); } for( StreamHash::KeyList::iterator i = lStreamId.begin(); i; i++ ) { uint32_t uStreamId = *i; Stream *pStream = hStream.get( uStreamId ); uint32_t uStreamSize = pStream->getSize(); mbHeader.write( &uStreamId, 4 ); mbHeader.write( &uStreamSize, 4 ); Bu::Array aBlocks = pStream->getBlockList(); Bu::println(" Stream %1 is %2 bytes %3 blocks (%4 blocks computed)") .arg( *i ).arg( uStreamSize ) .arg( aBlocks.getSize() ) .arg( Bu::blkDiv( (int)uStreamSize, (int)iBlockSize ) ); for( Bu::Array::iterator i = aBlocks.begin(); i; i++ ) { int32_t iIdx = *i; mbHeader.write( &iIdx, 4 ); } } } Bu::MyriadStream sHeader( *this, hStream.get( 0 ), Bu::Myriad::Write ); sHeader.write( mbHeader.getString() ); } int32_t Bu::Myriad::__calcHeaderSize() { int32_t iHdrSize = 4+1+1+4+4; StreamHash::KeyList lStreamId = hStream.getKeys(); lStreamId.sort(); if( lStreamId.first() != 0 ) { throw Bu::MyriadException( Bu::MyriadException::invalidFormat, "There doesn't appear to be a zero (header) stream."); } for( StreamHash::KeyList::iterator i = lStreamId.begin(); i; i++ ) { iHdrSize += 4+4; int32_t iStreamSize = hStream.get( *i )->getSize(); if( (*i) != 0 ) { iHdrSize += Bu::blkDiv( iStreamSize, iBlockSize )*4; } } int32_t iNewSize = iHdrSize; int32_t iOldSize; do { iOldSize = iNewSize; iNewSize = iHdrSize + Bu::blkDiv(iNewSize, iBlockSize); } while( iOldSize != iNewSize ); return iNewSize; } int32_t Bu::Myriad::allocateBlock() { Bu::MutexLocker l( mAccess ); return __allocateBlock(); } int32_t Bu::Myriad::__allocateBlock() { if( lFreeBlocks.isEmpty() ) { // Increase the size of the backing stream int32_t iIndex = iBlockCount++; rBacking.setSize( iBlockCount*iBlockSize ); return iIndex; } else { // Provide an existing free block. return lFreeBlocks.peekPop(); } } void Bu::Myriad::releaseBlock( int32_t iBlockId, bool bBlank ) { Bu::MutexLocker l( mAccess ); __releaseBlock( iBlockId, bBlank ); } void Bu::Myriad::__releaseBlock( int32_t iBlockId, bool bBlank ) { lFreeBlocks.append( iBlockId ); if( bBlank ) { blankBlock( iBlockId ); } } void Bu::Myriad::blankBlock( int32_t iBlockId ) { Bu::MutexLocker l( mBacking ); rBacking.setPos( iBlockId*iBlockSize ); int32_t iChunk = std::min( iBlockSize, 4096 ); uint8_t *pChunk = new uint8_t[iChunk]; memset( pChunk, 0, iChunk ); int iLeft = iBlockSize; while( iLeft > 0 ) { int32_t iWrite = rBacking.write( pChunk, std::min( iChunk, iLeft ) ); iLeft -= iWrite; } delete[] pChunk; } void Bu::Myriad::openStream( StreamId id ) { Bu::MutexLocker l( mAccess ); hStream.get( id )->open(); } void Bu::Myriad::closeStream( StreamId id ) { Bu::MutexLocker l( mAccess ); hStream.get( id )->close(); } int32_t Bu::Myriad::blockRead( int32_t iBlock, int32_t iStart, void *pTarget, int32_t iSize ) { int32_t iUpperSize = iBlockSize - (iStart%iBlockSize); /* Bu::println("Max read within block: %1 vs %2 (start=%3, blocksize=%4)") .arg( iUpperSize ).arg( iSize ) .arg( iStart ).arg( iBlockSize ); */ int32_t iAmnt = std::min( iSize, iUpperSize ); Bu::MutexLocker l( mBacking ); rBacking.setPos( iBlockSize*iBlock + iStart ); return rBacking.read( pTarget, iAmnt ); } int32_t Bu::Myriad::blockWrite( int32_t iBlock, int32_t iStart, const void *pTarget, int32_t iSize ) { int32_t iUpperSize = iBlockSize - (iStart%iBlockSize); /* Bu::println("Max write within block: %1 vs %2 (start=%3, blocksize=%4)") .arg( iUpperSize ).arg( iSize ) .arg( iStart ).arg( iBlockSize ); */ int32_t iAmnt = std::min( iSize, iUpperSize ); Bu::MutexLocker l( mBacking ); rBacking.setPos( iBlock*iBlockSize + iStart ); return rBacking.write( pTarget, iAmnt ); } ///////// // Bu::Myriad::Stream // Bu::Myriad::Stream::Stream( Bu::Myriad &rParent, Bu::Myriad::StreamId iStream, int32_t iSize ) : rParent( rParent ), iStream( iStream ), iSize( iSize ), iOpenCount( 0 ), bStructureChanged( false ) { } Bu::Myriad::Stream::~Stream() { } int32_t Bu::Myriad::Stream::getSize() const { Bu::MutexLocker l( mAccess ); return iSize; } int32_t Bu::Myriad::Stream::getBlockSize() const { Bu::MutexLocker l( mAccess ); return rParent.iBlockSize; } Bu::Myriad::StreamId Bu::Myriad::Stream::getStreamId() const { return iStream; } int32_t Bu::Myriad::Stream::getOpenCount() const { Bu::MutexLocker l( mAccess ); return iOpenCount; } void Bu::Myriad::Stream::setSize( int32_t iNewSize ) { // Two possible modes, shrink or grow. Bu::MutexLocker l( mAccess ); int iNewBlocks = Bu::blkDiv( iNewSize, rParent.iBlockSize ); if( iNewSize < iSize ) { // Shrink it while( aBlocks.getSize() > iNewBlocks ) { rParent.releaseBlock( aBlocks.last(), false ); aBlocks.eraseLast(); } iSize = iNewSize; } else if( iNewSize > iSize ) { // Grow it while( aBlocks.getSize() < iNewBlocks ) { aBlocks.append( rParent.allocateBlock() ); } iSize = iNewSize; } } int32_t Bu::Myriad::Stream::read( int32_t iStart, void *pTarget, int32_t iSize ) { int32_t iRead = 0; Bu::MutexLocker l( mAccess ); while( iSize > 0 ) { int32_t iBlock = aBlocks[iStart/rParent.iBlockSize]; int32_t iChunkRead = rParent.blockRead( iBlock, iStart%rParent.iBlockSize, pTarget, iSize ); if( iChunkRead == 0 ) break; iRead += iChunkRead; iStart += iChunkRead; reinterpret_cast(pTarget) += iChunkRead; iSize -= iChunkRead; } return iRead; } int32_t Bu::Myriad::Stream::write( int32_t iStart, const void *pTarget, int32_t iSize ) { int32_t iWrite = 0; Bu::MutexLocker l( mAccess ); while( iSize > 0 ) { int32_t iBlockIdx = iStart/rParent.iBlockSize; while( iBlockIdx >= aBlocks.getSize() ) { aBlocks.append( rParent.allocateBlock() ); } int32_t iBlock = aBlocks[iBlockIdx]; int32_t iChunkWrite = rParent.blockWrite( iBlock, iStart%rParent.iBlockSize, pTarget, iSize ); if( iChunkWrite == 0 ) break; iWrite += iChunkWrite; iStart += iChunkWrite; reinterpret_cast(pTarget) += iChunkWrite; iSize -= iChunkWrite; } if( this->iSize < iStart ) this->iSize = iStart; return iWrite; } Bu::String Bu::Myriad::Stream::getLocation() const { Bu::MutexLocker l( mAccess ); return Bu::String("%1:stream %2")\ .arg( rParent.getLocation() ).arg( iStream ); } Bu::Array Bu::Myriad::Stream::getBlockList() const { Bu::MutexLocker l( mAccess ); return aBlocks.clone(); } void Bu::Myriad::Stream::open() { Bu::MutexLocker l( mAccess ); iOpenCount++; } bool Bu::Myriad::Stream::close() { Bu::MutexLocker l( mAccess ); return (bool)(--iOpenCount); }