From f1e3f25d9b7a12cdedb99e4cb0bfa66157a1a972 Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Tue, 27 Aug 2024 13:37:36 -0700 Subject: Making progress. --- src/stable/myriad.cpp | 930 +++++++++++--------------------------------- src/stable/myriad.h | 287 +++++--------- src/stable/myriadstream.cpp | 257 ++---------- src/stable/myriadstream.h | 49 +-- 4 files changed, 372 insertions(+), 1151 deletions(-) (limited to 'src/stable') diff --git a/src/stable/myriad.cpp b/src/stable/myriad.cpp index c606369..5278ac5 100644 --- a/src/stable/myriad.cpp +++ b/src/stable/myriad.cpp @@ -1,25 +1,18 @@ -/* - * Copyright (C) 2007-2023 Xagasoft, All rights reserved. - * - * This file is part of the libbu++ library and is released under the - * terms of the license contained in the file LICENSE. - */ - -#include "bu/config.h" #include "bu/myriad.h" -#include "bu/stream.h" #include "bu/myriadstream.h" + #include "bu/mutexlocker.h" -#include +#include "bu/util.h" #include "bu/sio.h" -using Bu::sio; -using Bu::Fmt; #define Myriad_MAGIC_CODE ((unsigned char *)"\x0a\xd3\xfa\x84") -// #define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(sStore.getLocation()).arg(x) -#define TRACE( x ) (void)0 +#define MyriadRead( target, size ) if( rBacking.read( target, size ) < size ) \ +{ \ + throw Bu::MyriadException( Bu::MyriadException::invalidFormat, \ + "Insufficent data reading myriad data from backing stream."); \ +} (void)0 namespace Bu { @@ -29,803 +22,324 @@ namespace Bu } } -Bu::Myriad::Myriad( Bu::Stream &sStore, int iBlockSize, int iPreallocate ) : - sStore( sStore ), +Bu::Myriad::Myriad( Bu::Stream &rBacking, int iBlockSize, + int iPreallocateBlocks ) : + rBacking( rBacking ), iBlockSize( iBlockSize ), - iBlocks( 0 ), - iUsed( 0 ), - bHeaderChanged( false ) + iBlockCount( 0 ), + bIsNewStream( true ) { - try + if( !rBacking.isSeekable() ) { - initialize(); + throw Bu::MyriadException( Bu::MyriadException::invalidBackingStream, + "Myriad backing stream must be random access (seekable)."); } - catch( Bu::MyriadException &e ) + if( !loadMyriad() ) { - if( e.getErrorCode() == MyriadException::emptyStream ) - { - initialize( iBlockSize, iPreallocate ); - } - else - { - throw; - } + createMyriad( iBlockSize, iPreallocateBlocks ); } } Bu::Myriad::~Myriad() { - mActiveBlocks.lock(); - TRACE("mActiveBlocks locked."); - if( !hActiveBlocks.isEmpty() ) - { - sio << "Bu::Myriad::~Myriad(): Error: There are " - << hActiveBlocks.getSize() << " unsynced blocks!" << sio.nl; - } - TRACE("mActiveBlocks unlocking..."); - mActiveBlocks.unlock(); - sync(); - - for( StreamArray::iterator i = aStreams.begin(); i; i++ ) - { - delete *i; - } } -void Bu::Myriad::sync() +Bu::MyriadStream Bu::Myriad::create( Bu::Myriad::Mode /*eMode*/, + int32_t /*iPreallocateBytes*/ ) { - updateHeader(); - - mActiveBlocks.lock(); - TRACE("mActiveBlocks locked."); - for( BlockHash::iterator i = hActiveBlocks.begin(); i; i++ ) - { - if( (*i)->bChanged ) - { - syncBlock( *i ); - } - } - TRACE("mActiveBlocks unlocked..."); - mActiveBlocks.unlock(); + return Bu::MyriadStream( *this, NULL, (Mode)0 ); } -void Bu::Myriad::initialize() +Bu::MyriadStream Bu::Myriad::open( Bu::Myriad::StreamId iStream, + Bu::Myriad::Mode eMode ) { - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); - lFreeBlocks.clear(); - sStore.setPosEnd( 0 ); - Bu::size iSize = sStore.tell(); - sStore.setPos( 0 ); - - unsigned char buf[4]; - if( sStore.read( buf, 4 ) < 4 ) - { - TRACE("mHeader unlocked..."); - throw MyriadException( MyriadException::emptyStream, - "Input stream appears to be empty."); - } - if( memcmp( buf, Myriad_MAGIC_CODE, 4 ) ) - { - TRACE("mHeader unlocked..."); - throw MyriadException( MyriadException::invalidFormat, - "Stream does not appear to be a valid Myriad format."); - } - sStore.read( buf, 2 ); - if( buf[0] != 1 ) - { - TRACE("mHeader unlocked..."); - throw MyriadException( MyriadException::badVersion, - "We can only handle version 1 for now."); - } - if( buf[1] != 32 ) - { - TRACE("mHeader unlocked..."); - throw MyriadException( MyriadException::invalidWordSize, - "We can only handle 32-bit words at the moment."); - } - sStore.read( &iBlockSize, 4 ); - int iStreams; - sStore.read( &iStreams, 4 ); - - iBlocks = iSize/iBlockSize; - //sio << "Myriad: iSize=" << iSize << ", iBlockSize=" << iBlockSize - // << ", iBlocks=" << iBlocks << ", iStreams=" << iStreams << sio.nl; - - int iHeaderSize = 14 + 8 + 4; - int iHeaderBlocks = 0; //blkDiv( iHeaderSize+4, iBlockSize ); - - while( iHeaderSize > iHeaderBlocks*iBlockSize ) - { - iHeaderBlocks = blkDiv( iHeaderSize+4, iBlockSize ); - iHeaderSize = 14 + 8 + 4*iHeaderBlocks; - } - - //sio << "Myriad: iHeaderSize=" << iHeaderSize - // << ", iHeaderBlocks=" << iHeaderBlocks << sio.nl; - - Stream *pFakeHdr = new Stream; - pFakeHdr->iId = 0; - pFakeHdr->setSize( iHeaderSize ); - for( int j = 0; j < iHeaderBlocks; j++ ) - { - pFakeHdr->aBlocks.append( j ); - } - -// sio << "Blocks: " << iBlocks << " (size = " << iSize << "/" << iBlockSize -// << ")" << sio.nl; - Bu::BitString bsBlockUsed( iBlocks, false ); - bsBlockUsed.clear(); - -// bool bCanSkip = false; // Can skip around, post initial header stream i/o - MyriadStream *pIn = new MyriadStream( *this, pFakeHdr ); - pIn->setPos( sStore.tell() ); - for( int j = 0; j < iStreams; j++ ) + Bu::MutexLocker l( mAccess ); + if( !hStream.has( iStream ) ) { - int iSizeTmp; - aStreams.append( new Stream() ); - Stream &s = *aStreams[j]; - pIn->read( &s.iId, 4 ); - pIn->read( &iSizeTmp, 4 ); - s.setSize( iSizeTmp ); - int iSBlocks = blkDiv(s.getSize(), iBlockSize); - // sio << "Myriad: - Stream::iId=" << s.iId - // << ", Stream::iSize=" << s.iSize - // << ", Stream::aBlocks=" << iSBlocks - // << ", pIn->tell()=" << pIn->tell() << sio.nl; - for( int k = 0; k < iSBlocks; k++ ) - { - int iBId; - pIn->read( &iBId, 4 ); - // sio << "Myriad: - iBId=" << iBId - // << ", iStartPos=" << iBId*iBlockSize - // << ", pIn->tell()=" << pIn->tell() << sio.nl; - s.aBlocks.append( iBId ); - bsBlockUsed.setBit( iBId ); - iUsed++; - if( (j == 0 && k == iHeaderBlocks-1) ) - { - // sio << "Myriad: - End of prepartition, unlocking skipping." - // << sio.nl; -// bCanSkip = true; - MyriadStream *pTmp = new MyriadStream( *this, aStreams[0] ); - // sio << "Myriad - Position = " << pIn->tell() << sio.nl; - pTmp->setPos( pIn->tell() ); - delete pIn; - delete pFakeHdr; - pIn = pTmp; - } - } + throw Bu::MyriadException( MyriadException::noSuchStream, + "No such stream."); } - delete pIn; - - for( int j = 0; j < iBlocks; j++ ) { - if( bsBlockUsed.getBit( j ) == false ) + Bu::MutexLocker l2( mBacking ); + if( (eMode&Write) && rBacking.isWritable() ) { -// sio << "Preinitialized block " << j << " is free." << sio.nl; - lFreeBlocks.append( j ); + throw Bu::MyriadException( MyriadException::badMode, + "Backing stream does not support writing."); } } -// sio << "Myriad: Blocks used: " << bsBlockUsed.toString() << sio.nl; - TRACE("mHeader unlocked..."); + return Bu::MyriadStream( *this, hStream.get( iStream ), eMode ); } -void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) +bool Bu::Myriad::erase( Bu::Myriad::StreamId /*iStream*/ ) { - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); - lFreeBlocks.clear(); - - for( StreamArray::iterator i = aStreams.begin(); i; i++ ) - { - delete *i; - } - aStreams.clear(); - iUsed = 0; - - int iHeaderSize = 14 + 8 + 4; - int iHeaderBlocks = 0; //blkDiv( iHeaderSize+4, iBlockSize ); - char cBuf = 1; - int iBuf = 0; - - Stream *pStr = new Stream; - pStr->iId = 0; - - while( iHeaderSize > iHeaderBlocks*iBlockSize ) - { - iHeaderBlocks = blkDiv( iHeaderSize+4, iBlockSize ); - iHeaderSize = 14 + 8 + 4*iHeaderBlocks; - } - - iPreAllocate += iHeaderBlocks; - - //sio << "Myriad: iHeaderSize=" << iHeaderSize << ", iBlockSize=" - // << iBlockSize << ", iHeaderBlocks=" << iHeaderBlocks << sio.nl; - -// bsBlockUsed.setSize( iPreAllocate, true ); - iUsed++; - - char *pBlock = new char[iBlockSize]; - memset( pBlock, 0, iBlockSize ); - for( int j = 0; j < iPreAllocate; j++ ) - { - sStore.write( pBlock, iBlockSize ); - } - delete[] (char *)pBlock; - - sStore.setPos( 0 ); - - // Magic number - sStore.write( Myriad_MAGIC_CODE, 4 ); - - // Version (0) - sStore.write( &cBuf, 1 ); - - // Bits per int - cBuf = 32; - sStore.write( &cBuf, 1 ); - - // The size of each block - sStore.write( &iBlockSize, 4 ); - - iBuf = 1; - // The number of streams - sStore.write( &iBuf, 4 ); - - // Stream header - iBuf = 0; - sStore.write( &iBuf, 4 ); - sStore.write( &iHeaderSize, 4 ); - for( iBuf = 0; iBuf < iHeaderBlocks; iBuf++ ) - { - sStore.write( &iBuf, 4 ); - } - - this->iBlockSize = iBlockSize; - this->iBlocks = iPreAllocate; - - pStr->setSize( sStore.tell() ); -// sio << "Myriad: Actual end of header stream = " << pStr->iSize << sio.nl; - - pStr->setSize( iHeaderSize ); - for( int j = 0; j < iHeaderBlocks; j++ ) - { -// sio << "Started block " << j << " is header." << sio.nl; - pStr->aBlocks.append( j ); -// bsBlockUsed.setBit( j ); - iUsed++; - } - for( int j = iHeaderBlocks; j < this->iBlocks; j++ ) - { -// sio << "Started block " << j << " is free." << sio.nl; - lFreeBlocks.append( j ); - } - - aStreams.append( pStr ); - - //sio << bsBlockUsed.toString() << " - " << pStr->aBlocks << sio.nl; - - bHeaderChanged = true; - //hStreams.insert( 0, BlockArray( 0 ) ); - TRACE("mHeader unlocked..."); -} - -void Bu::Myriad::Stream::setSize( int iNewSize ) -{ - MutexLocker l( mStream ); - iSize = iNewSize; -} - -void Bu::Myriad::Stream::growTo( int iNewSize ) -{ - MutexLocker l( mStream ); - if( iNewSize < iSize ) - return; - iSize = iNewSize; + return false; } -int Bu::Myriad::Stream::getSize() const +bool Bu::Myriad::setSize( Bu::Myriad::StreamId /*iStream*/, + int32_t /*iNewSize*/ ) { - MutexLocker l( mStream ); - return iSize; + return false; } -void Bu::Myriad::updateHeader() +bool Bu::Myriad::loadMyriad() { - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); - - if( bHeaderChanged == false ) + Bu::println("Load myriad!"); + char sMagicCode[4]; + rBacking.setPos( 0 ); + MyriadRead( sMagicCode, 4 ); + if( memcmp( sMagicCode, Myriad_MAGIC_CODE, 4 ) ) { - TRACE("mHeader unlocked..."); - return; + throw Bu::MyriadException( Bu::MyriadException::invalidFormat, + "Backing stream does not seem to be a Myriad structure."); } - if( !sStore.canWrite() ) + uint8_t uVer; + uint8_t uBitsPerInt; + MyriadRead( &uVer, 1 ); + if( uVer != 1 ) { - TRACE("mHeader unlocked..."); - return; + throw Bu::MyriadException( Bu::MyriadException::invalidFormat, + "Only version 1 myriad structures are supported."); } - - char cBuf; - int iBuf; - - //for( StreamArray::iterator i = aStreams.begin(); i; i++ ) - //{ - // sio << "Myriad: Stream " << Fmt(4) << (*i)->iId << ": " << (*i)->aBlocks << sio.nl; - //} - - // Compute the new size of the header. - int iHeaderSize = 14 + 8*aStreams.getSize(); -// sio << "Myriad: updateHeader: aStreams.getSize() = " << aStreams.getSize() -// << sio.nl; - for( StreamArray::iterator i = aStreams.begin(); i; i++ ) + MyriadRead( &uBitsPerInt, 1 ); + if( uBitsPerInt != 32 ) { - iHeaderSize += 4*(*i)->aBlocks.getSize(); -// sio << "Myriad: updateHeader: (*i)->aBlocks.getSize() = " -// << (*i)->aBlocks.getSize() << sio.nl; + throw Bu::MyriadException( Bu::MyriadException::invalidFormat, + "Only 32 bits per int are supported at this time."); } - int iNewBlocks = blkDiv( iHeaderSize, iBlockSize ); - while( iNewBlocks > aStreams[0]->aBlocks.getSize() ) + MyriadRead( &iBlockSize, 4 ); + int iStreamCount; + MyriadRead( &iStreamCount, 4 ); + + // + // Read stream data -- Bootstrap the zero stream + // + StreamId iStream; + MyriadRead( &iStream, 4 ); + if( iStream != 0 ) { - int iBlock = findEmptyBlock(); -// sio << "Myriad: updateHeader: Appending block " << iBlock -// << " to header." << sio.nl; - aStreams[0]->aBlocks.append( iBlock ); -// bsBlockUsed.setBit( iBlock ); - iUsed++; - iHeaderSize += 4; - iNewBlocks = blkDiv( iHeaderSize, iBlockSize ); + throw Bu::MyriadException( Bu::MyriadException::invalidFormat, + "The first stream defined must be the header/zero stream."); } - aStreams[0]->setSize( iHeaderSize ); -// sio << "Myriad: updateHeader: iHeaderSize=" << iHeaderSize -// << ", iNewBlocks=" << iNewBlocks << ", curBlocks=" -// << aStreams[0]->aBlocks.getSize() << sio.nl; + int32_t iHeaderStreamBytes; + MyriadRead( &iHeaderStreamBytes, 4 ); - MyriadStream sHdr( *this, aStreams[0] ); - sHdr.write( Myriad_MAGIC_CODE, 4 ); + Stream *pHeaderStream = new Stream( *this, iStream, iHeaderStreamBytes ); + int iHeaderStreamBlocks = blkDiv(iHeaderStreamBytes+4, iBlockSize ); - // Version (1) - cBuf = 1; - sHdr.write( &cBuf, 1 ); - - // Bits per int - cBuf = 32; - sHdr.write( &cBuf, 1 ); - - // The size of each block - sHdr.write( &iBlockSize, 4 ); - - iBuf = aStreams.getSize(); - // The number of streams - sHdr.write( &iBuf, 4 ); - - for( StreamArray::iterator i = aStreams.begin(); i; i++ ) + while( iHeaderStreamBytes+(iHeaderStreamBlocks*4) + > iHeaderStreamBlocks*iBlockSize ) { - int iSizeTmp; - sHdr.write( &(*i)->iId, 4 ); - sHdr.write( &iSizeTmp, 4 ); - (*i)->setSize( iSizeTmp ); - int iUsedBlocks = blkDiv( iSizeTmp, iBlockSize ); -// for( BlockArray::iterator j = (*i)->aBlocks.begin(); j; j++ ) - for( int j = 0; j < iUsedBlocks; j++ ) - { - sHdr.write( &(*i)->aBlocks[j], 4 ); - } + iHeaderStreamBlocks = blkDiv( + (iHeaderStreamBytes+((iHeaderStreamBlocks+1)*4)), iBlockSize + ); } - - bHeaderChanged = false; - TRACE("mHeader unlocked..."); -} - -int Bu::Myriad::createStream( int iPreAllocate ) -{ - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); - - Stream *pStr = new Stream(); - pStr->iId = aStreams.last()->iId+1; - //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate=" - // << iPreAllocate << sio.nl; - pStr->setSize( 0 ); - aStreams.append( pStr ); - for( int j = 0; j < iPreAllocate; j++ ) + for( int32_t j = 0; j < iHeaderStreamBlocks; j++ ) { - int iFreeBlock = findEmptyBlock(); -// sio << "Myriad: Adding block " << iFreeBlock << sio.nl; - pStr->aBlocks.append( iFreeBlock ); -// bsBlockUsed.setBit( iFreeBlock ); - iUsed++; + int32_t iBlockIndex; + MyriadRead( &iBlockIndex, 4 ); + pHeaderStream->aBlocks.append( iBlockIndex ); } - bHeaderChanged = true; + // Bootstrap now using the header stream to read the rest of the data. - TRACE("mHeader unlocked..."); - return pStr->iId; + return true; } -int Bu::Myriad::createStreamWithId( int iId, int iPreAllocate ) +void Bu::Myriad::createMyriad( int32_t iBlockSize, int32_t iPreallocateBlocks ) { - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); - - try + if( iBlockSize < 8 ) { - findStream( iId ); - TRACE("mHeader unlocked..."); - throw MyriadException( MyriadException::streamExists, - "There is already a stream with the given id."); + throw Bu::MyriadException( Bu::MyriadException::invalidParameter, + "iBlockSize cannot be below 8"); } - catch( MyriadException &e ) + if( rBacking.getSize() ) { - Stream *pStr = new Stream(); - pStr->iId = iId; - //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate=" - // << iPreAllocate << sio.nl; - pStr->setSize( 0 ); - if( aStreams.last()->iId < iId ) - { - aStreams.append( pStr ); - } - else - { - for( StreamArray::iterator i = aStreams.begin(); i; i++ ) - { - if( (*i)->iId > iId ) - { - aStreams.insert( i, pStr ); - break; - } - } - } - - for( int j = 0; j < iPreAllocate; j++ ) - { - int iFreeBlock = findEmptyBlock(); - // sio << "Myriad: Adding block " << iFreeBlock << sio.nl; - pStr->aBlocks.append( iFreeBlock ); -// bsBlockUsed.setBit( iFreeBlock ); - iUsed++; - } - - bHeaderChanged = true; - - TRACE("mHeader unlocked..."); - return pStr->iId; - } - TRACE("mHeader unlocked..."); + throw Bu::MyriadException( Bu::MyriadException::invalidFormat, + "Backing stream contains data, but not a myriad structure."); } - -int Bu::Myriad::findEmptyBlock() -{ - bHeaderChanged = true; - - if( lFreeBlocks.isEmpty() ) - { - sStore.setSize( (iBlocks+1)*(Bu::size)iBlockSize ); - return iBlocks++; +/* + struct { + char sMagicCode[4]; + uint8_t uVer; + uint8_t uBitsPerInt; + uint32_t uBlockSize; + uint32_t uStreamCount; + } sHeader; + + struct { + uint32_t uStreamId; + uint32_t uStreamSize; + } sStreamHeader; + + Bu::println("sHeader = %1, sStreamHeader = %2").arg( sizeof(sHeader) ).arg( sizeof(sStreamHeader) ); +*/ + + // Start with the bytes for the file header and initial stream header + int iHeaderStreamBytes + = 14 // Base header + + 8; // Stream header + + // Pick the block count that matches our current estimate for the header + // plus one block index. + int iHeaderStreamBlocks = blkDiv(iHeaderStreamBytes+4, iBlockSize ); + + Bu::println("Initial estimate: %1 bytes / %2 cur blocks, %3 computed blocks (%4 target bytes).") + .arg( iHeaderStreamBytes+(iHeaderStreamBlocks*4) ) + .arg( iHeaderStreamBlocks ) + .arg( blkDiv((iHeaderStreamBytes+(iHeaderStreamBlocks*4)), iBlockSize) ) + .arg( iHeaderStreamBlocks*iBlockSize ); + while( iHeaderStreamBytes+(iHeaderStreamBlocks*4) + > iHeaderStreamBlocks*iBlockSize ) + { + iHeaderStreamBlocks = blkDiv((iHeaderStreamBytes+((iHeaderStreamBlocks+1)*4)), iBlockSize); + if( iHeaderStreamBlocks > 100 ) + break; + Bu::println(" Adjustment: %1 bytes / %2 cur blocks, %3 computed blocks (%4 target bytes).") + .arg( iHeaderStreamBytes+(iHeaderStreamBlocks*4) ) + .arg( iHeaderStreamBlocks ) + .arg( blkDiv((iHeaderStreamBytes+(iHeaderStreamBlocks*4)), iBlockSize) ) + .arg( iHeaderStreamBlocks*iBlockSize ); + } + + if( iPreallocateBlocks > iHeaderStreamBlocks ) + { + rBacking.setSize( iBlockSize*iPreallocateBlocks ); } else { - return lFreeBlocks.dequeue(); + rBacking.setSize( iBlockSize*iHeaderStreamBlocks ); } -} -void Bu::Myriad::deleteStream( int iId ) -{ - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); + // + // Write Myriad header + // + uint8_t uVer = 1; + uint8_t uBpi = 32; + int32_t iStreamCount = 1; + rBacking.setPos( 0 ); + rBacking.write( Myriad_MAGIC_CODE, 4 ); + rBacking.write( &uVer, 1 ); + rBacking.write( &uBpi, 1 ); + rBacking.write( &iBlockSize, 4 ); + rBacking.write( &iStreamCount, 4 ); - if( iId < 0 ) + Stream *pHeadStream = new Stream( *this, 0, Bu::Myriad::ReadWrite ); + // + // Write stream header + // + uint32_t uStreamId = 0; + uint32_t uStreamSize = iHeaderStreamBytes+iHeaderStreamBlocks*4; + rBacking.write( &uStreamId, 4 ); + rBacking.write( &uStreamSize, 4 ); + for( int iBlockIndex = 0; iBlockIndex < iHeaderStreamBlocks; iBlockIndex++ ) { - TRACE("mHeader unlocked..."); - throw MyriadException( MyriadException::invalidStreamId, - "Invalid stream id."); + rBacking.write( &iBlockIndex, 4 ); + pHeadStream->aBlocks.append( iBlockIndex ); } - if( iId == 0 ) - { - TRACE("mHeader unlocked..."); - throw MyriadException( MyriadException::protectedStream, - "You cannot delete stream zero, it is protected."); - } - for( StreamArray::iterator i = aStreams.begin(); i; i++ ) - { - if( (*i)->iId == iId ) - { - Stream *pStream = *i; - for( BlockArray::iterator j = pStream->aBlocks.begin(); j; j++ ) - { - lFreeBlocks.append( *j ); -// bsBlockUsed.setBit( *j, false ); - iUsed--; - } - aStreams.erase( i ); - bHeaderChanged = true; - delete pStream; - TRACE("mHeader unlocked..."); - return; - } - } - TRACE("mHeader unlocked..."); -} + rBacking.flush(); -Bu::Array Bu::Myriad::getStreamIds() -{ - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); + hStream.insert( pHeadStream->iStream, pHeadStream ); - Bu::Array aRet( aStreams.getSize() ); - for( StreamArray::iterator i = aStreams.begin(); i; i++ ) + for( int32_t j = iHeaderStreamBlocks; j < iPreallocateBlocks; j++ ) { - aRet.append( (*i)->iId ); + lFreeBlocks.append( j ); } - - TRACE("mHeader unlocked..."); - return aRet; -} - -int Bu::Myriad::getStreamSize( int iId ) -{ - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); - - TRACE("mHeader unlocked..."); - return findStream( iId )->getSize(); } -bool Bu::Myriad::hasStream( int iId ) +int32_t Bu::Myriad::allocateBlock() { - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); - - try - { - findStream( iId ); - TRACE("mHeader unlocked..."); - return true; - }catch(...) + Bu::MutexLocker l( mAccess ); + if( lFreeBlocks.isEmpty() ) { - TRACE("mHeader unlocked..."); - return false; + // Increase the size of the backing stream + int32_t iIndex = iBlockCount++; + rBacking.setSize( iBlockCount*iBlockSize ); + return iIndex; } -} - -Bu::MyriadStream Bu::Myriad::openStream( int iId ) -{ - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); - - TRACE("mHeader unlocked..."); - //sio << "Myriad: Request to open stream: " << iId << sio.nl; - return MyriadStream( *this, findStream( iId ) ); -} - -int Bu::Myriad::getNumStreams() -{ - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); - - TRACE("mHeader unlocked..."); - return aStreams.getSize(); -} - -int Bu::Myriad::getBlockSize() -{ - return iBlockSize; -} - -int Bu::Myriad::getNumBlocks() -{ - return iBlocks; -} - -int Bu::Myriad::getNumUsedBlocks() -{ - return iUsed; -} - -Bu::size Bu::Myriad::getTotalUsedBytes() -{ - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); - - Bu::size iTotalSize = 0; - for( StreamArray::iterator i = aStreams.begin(); i; i++ ) + else { - iTotalSize += (*i)->getSize(); + // Provide an existing free block. + return lFreeBlocks.peekPop(); } - TRACE("mHeader unlocked..."); - return iTotalSize; } -Bu::size Bu::Myriad::getTotalUnusedBytes() +void Bu::Myriad::openStream( StreamId id ) { - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); - - Bu::size iTotalSize = (iBlocks-iUsed)*iBlockSize; - for( StreamArray::iterator i = aStreams.begin(); i; i++ ) - { - iTotalSize += iBlockSize - ((Bu::size)(*i)->getSize()%iBlockSize); - } - TRACE("mHeader unlocked..."); - return iTotalSize; + Bu::MutexLocker l( mAccess ); + hStream.get( id )->open(); } -Bu::size Bu::Myriad::getTotalUnusedBytes( int iFakeBlockSize ) +void Bu::Myriad::closeStream( StreamId id ) { - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); - - Bu::size iTotalSize = (iBlocks-iUsed)*iFakeBlockSize; - for( StreamArray::iterator i = aStreams.begin(); i; i++ ) - { - iTotalSize += iFakeBlockSize - ((*i)->getSize()%iFakeBlockSize); - } - TRACE("mHeader unlocked..."); - return iTotalSize; + Bu::MutexLocker l( mAccess ); + hStream.get( id )->close(); } -Bu::Myriad::Stream *Bu::Myriad::findStream( int iId ) +int32_t Bu::Myriad::blockRead( int32_t iStart, void *pTarget, int32_t iSize ) { - for( StreamArray::iterator i = aStreams.begin(); i; i++ ) - { - if( (*i)->iId == iId ) - return *i; - } + int32_t iUpperSize = iBlockSize - (iStart%iBlockSize); + Bu::println("Max size within block: %1 vs %2 (start=%3, blocksize=%4)") + .arg( iUpperSize ).arg( iSize ) + .arg( iStart ).arg( iBlockSize ); - throw MyriadException( MyriadException::noSuchStream, - "The requested stream doesn't exist and cannot be opened." ); + int32_t iAmnt = std::min( iSize, iUpperSize ); + Bu::MutexLocker l( mBacking ); + rBacking.setPos( iStart ); - return NULL; + return rBacking.read( pTarget, iAmnt ); } -Bu::Myriad::Block *Bu::Myriad::getBlock( int iBlock ) -{ -// sio << "Myriad: Reading block " << iBlock << ", bytes " -// << iBlockSize*iBlock << "-" << iBlockSize*(iBlock+1) << sio.nl; - Block *pBlock = new Block; - pBlock->pData = new char[iBlockSize]; - sStore.setPos( iBlockSize * (Bu::size)iBlock ); - sStore.read( pBlock->pData, iBlockSize ); - pBlock->bChanged = false; - pBlock->iBlockIndex = iBlock; - - mActiveBlocks.lock(); - TRACE("mHeader locked."); - hActiveBlocks.insert( iBlock, pBlock ); - TRACE("mHeader unlocked..."); - mActiveBlocks.unlock(); - - return pBlock; -} +///////// +// Bu::Myriad::Stream +// -void Bu::Myriad::releaseBlock( Bu::Myriad::Block *pBlock ) +Bu::Myriad::Stream::Stream( Bu::Myriad &rParent, Bu::Myriad::StreamId iStream, + int32_t iSize ) : + rParent( rParent ), + iStream( iStream ), + iSize( iSize ), + iOpenCount( 0 ), + bStructureChanged( false ) { - if( pBlock == NULL ) - return; -// sio << "Myriad: Releasing block " << pBlock->iBlockIndex << sio.nl; - syncBlock( pBlock ); - mActiveBlocks.lock(); - TRACE("mHeader locked."); - hActiveBlocks.erase( pBlock->iBlockIndex ); - TRACE("mHeader unlocked..."); - mActiveBlocks.unlock(); - - delete[] pBlock->pData; - delete pBlock; } -void Bu::Myriad::syncBlock( Block *pBlock ) +Bu::Myriad::Stream::~Stream() { - if( pBlock->bChanged ) - { -// sio << "Myriad: - Block changed, writing back to stream." << sio.nl; - sStore.setPos( iBlockSize * (Bu::size)pBlock->iBlockIndex ); - sStore.write( pBlock->pData, iBlockSize ); - pBlock->bChanged = false; - } -} - -int Bu::Myriad::streamAddBlock( Stream *pStream ) -{ - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); - - int iBlock = findEmptyBlock(); - pStream->aBlocks.append( iBlock ); -// bsBlockUsed.setBit( iBlock ); -// bHeaderChanged = true; - iUsed++; - TRACE("mHeader unlocked..."); - return iBlock; } -void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) +int32_t Bu::Myriad::Stream::read( int32_t iStart, void *pTarget, + int32_t iSize ) { - MutexLocker mLock( mHeader ); - TRACE("mHeader locked."); - - if( pStream->getSize() == iSize ) - { - TRACE("mHeader unlocked..."); - return; - } - else if( pStream->getSize() > iSize ) + int32_t iPos = iStart; + int32_t iRead = 0; + Bu::MutexLocker l( mAccess ); + while( iStart > 0 ) { - // Shrink - TRACE(Bu::String("Shrink stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->getSize()).arg(iSize).end() ); - for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; - iNewSize-iBlockSize > iSize; iNewSize -= iBlockSize ) - { -// if( bsBlockUsed.getBit( pStream->aBlocks.last() ) ) - iUsed--; -// else -// sio << "Unused block used in stream? " << pStream->aBlocks.last() << sio.nl; - lFreeBlocks.enqueue( pStream->aBlocks.last() ); -// bsBlockUsed.setBit( pStream->aBlocks.last(), false ); - pStream->aBlocks.eraseLast(); - } - pStream->setSize( iSize ); - bHeaderChanged = true; + int32_t iBlock = aBlocks[iStart/rParent.iBlockSize]; + int32_t iOffset = iPos % rParent.iBlockSize; + int32_t iChunkRead = rParent.blockRead( + iBlock*rParent.iBlockSize+iOffset, pTarget, iSize + ); + if( iChunkRead == 0 ) + break; + iRead += iChunkRead; + reinterpret_cast(pTarget) += iChunkRead; + iSize -= iChunkRead; } - else - { - // Grow - TRACE(Bu::String("Grow stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->getSize()).arg(iSize).end() ); - for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; - iNewSize < iSize; iNewSize += iBlockSize ) - { - //streamAddBlock( pStream ); - int iBlock = findEmptyBlock(); - pStream->aBlocks.append( iBlock ); -// bsBlockUsed.setBit( iBlock ); -// bHeaderChanged = true; - iUsed++; - } - pStream->setSize( iSize ); - bHeaderChanged = true; - } - TRACE("mHeader unlocked..."); -} -void Bu::Myriad::headerChanged() -{ - bHeaderChanged = true; + return iRead; } -bool Bu::Myriad::isMyriad( Bu::Stream &sStore ) +void Bu::Myriad::Stream::open() { - uint8_t uTmp; - - return isMyriad( sStore, uTmp ); -} - -bool Bu::Myriad::isMyriad( Bu::Stream &sStore, uint8_t &uTmp ) -{ - sStore.setPos( 0 ); - - unsigned char buf[4]; - if( sStore.read( buf, 4 ) < 4 ) - throw MyriadException( MyriadException::emptyStream, - "Input stream appears to be empty."); - sStore.read( &uTmp, 1 ); - sStore.setPos( 0 ); - if( memcmp( buf, Myriad_MAGIC_CODE, 4 ) ) - { - return false; - } - return true; + Bu::MutexLocker l( mAccess ); + iOpenCount++; } -const Bu::BitString Bu::Myriad::getBlocksUsed() const +bool Bu::Myriad::Stream::close() { - Bu::BitString bs( iBlocks, false ); - for( int j = 0; j < iBlocks; j++ ) - bs.setBit( j ); - for( IndexList::const_iterator i = lFreeBlocks.begin(); i; i++ ) - bs.setBit( *i, false ); - return bs; + Bu::MutexLocker l( mAccess ); + return (bool)(--iOpenCount); } diff --git a/src/stable/myriad.h b/src/stable/myriad.h index e63c29f..07b4a1d 100644 --- a/src/stable/myriad.h +++ b/src/stable/myriad.h @@ -1,24 +1,14 @@ -/* - * Copyright (C) 2007-2023 Xagasoft, All rights reserved. - * - * This file is part of the libbu++ library and is released under the - * terms of the license contained in the file LICENSE. - */ - #ifndef BU_MYRIAD_H #define BU_MYRIAD_H -#include -#include "bu/bitstring.h" +#include "bu/stream.h" #include "bu/exceptionbase.h" +#include "bu/mutex.h" #include "bu/array.h" #include "bu/hash.h" -#include "bu/mutex.h" -#include "bu/extratypes.h" namespace Bu { - class Stream; class MyriadStream; subExceptionDeclBegin( MyriadException ) @@ -31,212 +21,127 @@ namespace Bu noSuchStream, streamExists, invalidStreamId, - protectedStream + protectedStream, + invalidParameter, + invalidBackingStream, + badMode, }; subExceptionDeclEnd(); - /** - * Myriad block-allocated stream multiplexing system. This is a system for - * creating streams that contain other streams in a flexible and lightweight - * manner. Basically, you can create a file (or any other stream) that can - * store any number of flexible, growing streams. The streams within the - * Myriad stream are automatically numbered, not named. This works more - * or less like a filesystem, but without the extra layer for managing - * file and directory links. This would actually be very easy to add - * on top of Myriad, but is not required. - * - * Header format is as follows: - * - * MMMMvBssssSSSS* - * M = Magic number (0AD3FA84) - * v = version number - * B = Bits per int - * s = Blocksize in bytes - * S = Number of Streams - * - * The * represents the Stream headers, one per stream, as follows: - * IIIIssss$ - * I = Id number of the stream - * s = size of stream in bytes - * - * The $ represents the Block headers, one per used block, as follows: - * IIII - * I = Index of the block - * - * The stream/block data is interleaved in the header, so all blocks stored - * with one stream are together. The block headers are in order, and the - * data in them is required to be "solid" you cannot fill partial blocks - * mid-way through a stream. - * - * The initial block starts with the nids header, and is both the zero block - * and the zero stream. For now, the minimum block size is the size needed - * to store the base header, the zero stream header, and the first two - * blocks of the zero stream, so 30 bytes. Since it's reccomended to use - * a size that will fit evenly into filesystem blocks, then a size of 32 is - * probably the smallest reccomended size because all powers of two equal - * to or greater than 32 are evenly divisible by 32. - * - * I have had a thought that if the block size were smaller than 42 bytes - * the header would consume the first N blocks where N * block size is - * enough space to house the initial header, the first stream header, and - * the first N block headers. This, of course, causes you to hit an - * infinite header if the block size is small enough. - */ class Myriad { - friend class MyriadStream; public: - /** - * Create a Myriad object that uses the given stream to store data. - * This stream must be random access. The block size and preallocate - * values passed in are values that will be used if the given stream - * is empty. In that case the stream will be "formatted" for myriad - * with the specified block size. If there is already a viable Myriad - * format present in the stream, then the blocksize and preallocate - * values will be ignored and the values from the stream will be used - * instead. If the stream doesn't appear to be Myriad formatted an - * exception will be thrown. - */ - Myriad( Bu::Stream &sStore, int iBlockSize=512, int iPreallocate=8 ); - virtual ~Myriad(); - - /** - * Destroy whatever data may be in the base stream and create a new - * Myriad system there with the given blocksize. Use this with care, - * it will destroy anything that was already in the stream, and - * generally, should not ever have to be used. - */ - void initialize( int iBlockSize, int iPreAllocate=1 ); - - /** - * Create a new stream within the Myriad system. The ID of the new - * stream is returned. - */ - int createStream( int iPreAllocate=1 ); - - /** - * Create a new stream within the Myriad system with a given id. The - * id that you provide will be the new id of the stream unless it's - * already used, in which case an error is thrown. This is primarilly - * useful when copying an old Myriad file into a new one. - */ - int createStreamWithId( int iId, int iPreAllocate=1 ); - - /** - * Delete a stream that's already within the Myriad. - */ - void deleteStream( int iId ); + typedef int32_t StreamId; + enum Mode { + None = 0x00, + + // Flags + Read = 0x01, ///< Open file for reading + Write = 0x02, ///< Open file for writing + Create = 0x04, ///< Create file if it doesn't exist + Truncate = 0x08, ///< Truncate file if it does exist + Append = 0x10, ///< Always append on every write + NonBlock = 0x20, ///< Open file in non-blocking mode + Exclusive = 0x44, ///< Create file, if it exists then fail + + // Helpful mixes + ReadWrite = 0x03, ///< Open for reading and writing + WriteNew = 0x0E ///< Create a file (or truncate) for writing. + /// Same as Write|Create|Truncate + }; + public: /** - * Return a new Stream object assosiated with the given stream ID. + * Open existing Myriad stream, or initialize a new one if it doesn't + * exist. + * + * Myriad format V0 + * 0 - 3: Myriad_MAGIC_CODE (0ad3fa84) + * 4 - 4: Version Id (1) + * 5 - 5: Bits per integer (32) + * 6 - 9: Block size in bytes. + * 10 - 13: Number of streams. + * 14 - ...: Stream Data + * + * Stream Data: + * 0 - 3: Stream Id + * 4 - 7: Size of stream in bytes + * 8 - ...: List of blocks in stream (4 bytes per block */ - MyriadStream openStream( int iId ); - - Bu::Array getStreamIds(); - int getStreamSize( int iId ); - bool hasStream( int iId ); + Myriad( Bu::Stream &rBacking, int32_t iBlockSize=-1, int32_t iPreallocateBlocks=-1 ); + virtual ~Myriad(); - int getNumStreams(); - int getBlockSize(); - int getNumBlocks(); - int getNumUsedBlocks(); - Bu::size getTotalUsedBytes(); - Bu::size getTotalUnusedBytes(); - Bu::size getTotalUnusedBytes( int iFakeBlockSize ); + MyriadStream create( Mode eMode, int32_t iPreallocateBytes=-1 ); + MyriadStream open( StreamId iStream, Mode eMode ); + bool erase( StreamId iStream ); + bool setSize( StreamId iStream, int32_t iNewSize ); - /** - * Syncronize the header data, etc. with the storage stream. It's not - * a bad idea to call this periodically. - */ - void sync(); + private: + bool loadMyriad(); + void createMyriad( int32_t iBlockSize, int32_t iPreallocateBlocks ); + int32_t allocateBlock(); + void openStream( StreamId id ); + void closeStream( StreamId id ); /** - * Read the first few bytes from the given stream and return true/false - * depending on weather or not it's a Myriad stream. This will throw - * an exception if the stream is empty, or is not random access. - */ - static bool isMyriad( Bu::Stream &sStore, uint8_t &uVer ); - - /** - * Read the first few bytes from the given stream and return true/false - * depending on weather or not it's a Myriad stream. This will throw - * an exception if the stream is empty, or is not random access. + * Block restricted read, it will not read past the end of the block + * that iStart places it in. */ - static bool isMyriad( Bu::Stream &sStore ); + int32_t blockRead( int32_t iStart, void *pTarget, int32_t iSize ); - const Bu::BitString getBlocksUsed() const; - - private: + public: /** - * Initialize this object based on the data already in the assosiated - * stream. This will be called automatically for you if you forget, - * but if you want to pre-initialize for some reason, just call this - * once before you actually start doing anything with your Myriad. + * Bridge/communication/tracking class for individual Myriad streams. + * Not for general use, this is used by Myriad and MyriadStream to + * control access. */ - void initialize(); - - enum - { - blockUnused = 0xFFFFFFFFUL - }; - - typedef Bu::Array BlockArray; class Stream { - public: - void setSize( int iNewSize ); - void growTo( int iNewSize ); - int getSize() const; - - int iId; - BlockArray aBlocks; - + friend Bu::Myriad; private: - int iSize; - mutable Bu::Mutex mStream; - }; - typedef Bu::Array StreamArray; + Stream( Myriad &rParent, StreamId iStream, int32_t iSize ); + virtual ~Stream(); - class Block - { public: - char *pData; - bool bChanged; - int iBlockIndex; - }; - - void updateHeader(); - int findEmptyBlock(); - - /** - *@todo Change this to use a binary search, it's nicer. - */ - Stream *findStream( int iId ); + int32_t read( int32_t iStart, void *pTarget, int32_t iSize ); + + /** + * Doesn't actually open, just increments the open counter. + * If the open counter is non-zero then at least one stream has + * a lock on this stream. + */ + void open(); + + /** + * Doesn't actually close, just decrements the open counter. + *@returns true if there are still handles open, false if no + * streams have a lock. + */ + bool close(); - Block *getBlock( int iBlock ); - void releaseBlock( Block *pBlock ); - void syncBlock( Block *pBlock ); + private: + mutable Bu::Mutex mAccess; + Myriad &rParent; + StreamId iStream; + int32_t iSize; + Bu::Array aBlocks; + int32_t iOpenCount; + bool bStructureChanged; + }; - int streamAddBlock( Stream *pStream ); - void setStreamSize( Stream *pStream, long iSize ); + private: - void headerChanged(); + typedef Bu::Hash StreamHash; - private: - Bu::Stream &sStore; - int iBlockSize; - int iBlocks; - int iUsed; - typedef Bu::List IndexList; + typedef Bu::List IndexList; + Bu::Mutex mAccess; + mutable Bu::Mutex mBacking; + Bu::Stream &rBacking; + int32_t iBlockSize; + int32_t iBlockCount; + bool bIsNewStream; + StreamHash hStream; IndexList lFreeBlocks; -// Bu::BitString bsBlockUsed; - StreamArray aStreams; - typedef Bu::Hash BlockHash; - BlockHash hActiveBlocks; - bool bHeaderChanged; - - Bu::Mutex mHeader; - Bu::Mutex mActiveBlocks; }; }; diff --git a/src/stable/myriadstream.cpp b/src/stable/myriadstream.cpp index 3c78bb0..cbbd4fe 100644 --- a/src/stable/myriadstream.cpp +++ b/src/stable/myriadstream.cpp @@ -1,254 +1,79 @@ -/* - * Copyright (C) 2007-2023 Xagasoft, All rights reserved. - * - * This file is part of the libbu++ library and is released under the - * terms of the license contained in the file LICENSE. - */ - #include "bu/myriadstream.h" -#include - -// #define MYRIAD_STREAM_DEBUG 1 - -#ifdef MYRIAD_STREAM_DEBUG -#include "bu/sio.h" - -using Bu::sio; -using Bu::Fmt; -#endif -#include "bu/sio.h" - -// #define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(rMyriad.sStore.getLocation()).arg(x) -#define TRACE( x ) (void)0 +#include "bu/mutexlocker.h" Bu::MyriadStream::MyriadStream( Bu::Myriad &rMyriad, - Bu::Myriad::Stream *pStream ) : + Bu::Myriad::Stream *pStream, Bu::Myriad::Mode eMode ) : rMyriad( rMyriad ), pStream( pStream ), - pCurBlock( NULL ), - iPos( 0 ) + eMode( eMode ) { -#ifdef MYRIAD_STREAM_DEBUG - sio << "MyriadStream: " << __LINE__ << ": Created, iId=" << pStream->iId << ", iSize=" - << pStream->iSize << sio.nl; -#endif - //pCurBlock = rMyriad.newBlock(); - //rMyriad.getBlock( uStream, pCurBlock ); - //uSize = pCurBlock->uBytesUsed; + if( (eMode&Bu::Myriad::ReadWrite) == 0 ) + { + throw Bu::MyriadException( Bu::MyriadException::invalidParameter, + "MyriadStream must be opened Read or Write or both."); + } + Bu::MutexLocker l( mAccess ); + pStream->open(); } Bu::MyriadStream::~MyriadStream() { - if( pCurBlock ) - rMyriad.releaseBlock( pCurBlock ); - //rMyriad.updateStreamSize( uStream, uSize ); - //rMyriad.deleteBlock( pCurBlock ); + close(); } void Bu::MyriadStream::close() { -} - -Bu::size Bu::MyriadStream::read( void *pBuf, Bu::size nBytes ) -{ -#ifdef MYRIAD_STREAM_DEBUG - sio << "MyriadStream: read: " << __LINE__ << ": Started, asked to read " << nBytes << "b." - << sio.nl; -#endif - if( nBytes > (Bu::size)pStream->getSize()-iPos ) - nBytes = pStream->getSize()-iPos; - if( nBytes <= 0 ) - return 0; - int iLeft = nBytes; -#ifdef MYRIAD_STREAM_DEBUG - sio << "MyriadStream: read: " << __LINE__ << ": Started, going to read " << nBytes << "b." - << sio.nl; -#endif - if( pCurBlock == NULL ) + Bu::MutexLocker l( mAccess ); + if( eMode ) { -#ifdef MYRIAD_STREAM_DEBUG - sio << "MyriadStream: read: " << __LINE__ << ": No block loaded, loading initial block." - << sio.nl; -#endif - pCurBlock = rMyriad.getBlock( - pStream->aBlocks[iPos/rMyriad.iBlockSize] - ); + pStream->close(); + eMode = Bu::Myriad::None; } - while( iLeft > 0 ) - { - int iCurBlock = pStream->aBlocks[iPos/rMyriad.iBlockSize]; - if( pCurBlock->iBlockIndex != iCurBlock ) - { -#ifdef MYRIAD_STREAM_DEBUG - sio << "MyriadStream: read: " << __LINE__ << ": Loading new block " << iCurBlock << "." - << sio.nl; -#endif - rMyriad.releaseBlock( pCurBlock ); - pCurBlock = rMyriad.getBlock( iCurBlock ); - } +} - int iAmnt = Bu::buMin( - Bu::buMin( - rMyriad.iBlockSize - iPos%rMyriad.iBlockSize, - iLeft - ), - pStream->getSize()-iPos - ); -#ifdef MYRIAD_STREAM_DEBUG - sio << "MyriadStream: read: " << __LINE__ << ": Copying out bytes: " - << iPos << "(" << (iPos%rMyriad.iBlockSize) << ")+" - << iAmnt - << ", " << iLeft << "b left." << sio.nl; -#endif - memcpy( - pBuf, - pCurBlock->pData+(iPos%rMyriad.iBlockSize), - iAmnt - ); - iPos += iAmnt; - pBuf = &((char *)pBuf)[iAmnt]; - iLeft -= iAmnt; - } - return nBytes; +Bu::size Bu::MyriadStream::read( void *pBuf, size iBytes ) +{ } -Bu::size Bu::MyriadStream::write( const void *pBuf, Bu::size nBytes ) +Bu::String Bu::MyriadStream::readLine() { - if( nBytes <= 0 ) - return 0; +} -#ifdef MYRIAD_STREAM_DEBUG - sio << "MyriadStream: write: " << __LINE__ << ": Started, asked to write " << nBytes << "b." - << sio.nl; -#endif - if( nBytes <= 0 ) - return 0; - int iLeft = nBytes; - /* - if( pCurBlock == NULL ) - { -#ifdef MYRIAD_STREAM_DEBUG - sio << "MyriadStream: write: No block loaded, loading initial block." - << sio.nl; -#endif - pCurBlock = rMyriad.getBlock( - pStream->aBlocks[iPos/rMyriad.iBlockSize] - ); - }*/ - - while( iLeft > 0 ) - { - int iCurBlock; - if( iPos/rMyriad.iBlockSize < pStream->aBlocks.getSize() ) - { - iCurBlock = pStream->aBlocks[iPos/rMyriad.iBlockSize]; - } - else - { - iCurBlock = rMyriad.streamAddBlock( pStream ); -#ifdef MYRIAD_STREAM_DEBUG - sio << "MyriadStream: write: " << __LINE__ << ": New block allocated and appended: " - << iCurBlock << "." << sio.nl; +Bu::String Bu::MyriadStream::readAll() +{ +} -#endif - } - if( !pCurBlock || pCurBlock->iBlockIndex != iCurBlock ) - { -#ifdef MYRIAD_STREAM_DEBUG - sio << "MyriadStream: write: " << __LINE__ << ": Loading new block " << iCurBlock << "." - << sio.nl; -#endif - rMyriad.releaseBlock( pCurBlock ); - pCurBlock = rMyriad.getBlock( iCurBlock ); - } - pCurBlock->bChanged = true; - - // There are two main writing modes when it comes down to it. - // Overwrite mode and append mode. Append is what pretty much always - // happens when creating a new stream. - if( iPos < pStream->getSize() ) - { - int iAmnt = Bu::buMin( - Bu::buMin( - rMyriad.iBlockSize - iPos%rMyriad.iBlockSize, - iLeft - ), - pStream->getSize()-iPos - ); -#ifdef MYRIAD_STREAM_DEBUG - sio << "MyriadStream: write (ovr): " << __LINE__ << ": Copying in bytes: " - << (iPos%rMyriad.iBlockSize) << "+" - << iAmnt - << ", " << iLeft << "b left." << sio.nl; -#endif - memcpy( - pCurBlock->pData+(iPos%rMyriad.iBlockSize), - pBuf, - iAmnt - ); - iPos += iAmnt; - pBuf = &((char *)pBuf)[iAmnt]; - iLeft -= iAmnt; - } - else - { - int iAmnt = Bu::buMin( - rMyriad.iBlockSize - iPos%rMyriad.iBlockSize, - iLeft - ); -#ifdef MYRIAD_STREAM_DEBUG - sio << "MyriadStream: write (app): " << __LINE__ << ": Copying in bytes: " - << (iPos%rMyriad.iBlockSize) << "+" - << iAmnt - << ", " << iLeft << "b left." << sio.nl; -#endif - memcpy( - pCurBlock->pData+(iPos%rMyriad.iBlockSize), - pBuf, - iAmnt - ); - iPos += iAmnt; - TRACE(Bu::String("Stream=%1 - pStream->iSize(%2) += iAmnt(%3)").arg(pStream->iId).arg( pStream->getSize() ).arg(iAmnt).end()); - pStream->growTo( pStream->getSize()+iAmnt ); - TRACE(Bu::String("Stream=%1 - pStream->iSize = %2").arg(pStream->iId).arg( pStream->getSize() ).end()); - rMyriad.headerChanged(); - pBuf = &((char *)pBuf)[iAmnt]; - iLeft -= iAmnt; - } - } +Bu::size Bu::MyriadStream::write( const void *pBuf, size iBytes ) +{ +} - return nBytes; +Bu::size Bu::MyriadStream::write( const Bu::String &sBuf ) +{ } Bu::size Bu::MyriadStream::tell() { - return iPos; } -void Bu::MyriadStream::seek( Bu::size offset ) +void Bu::MyriadStream::seek( size offset ) { - iPos += offset; } -void Bu::MyriadStream::setPos( Bu::size pos ) +void Bu::MyriadStream::setPos( size pos ) { - iPos = pos; } -void Bu::MyriadStream::setPosEnd( Bu::size pos ) +void Bu::MyriadStream::setPosEnd( size pos ) { - iPos = pStream->getSize()-pos; } bool Bu::MyriadStream::isEos() { - return iPos >= pStream->getSize(); } bool Bu::MyriadStream::isOpen() { - return true; } void Bu::MyriadStream::flush() @@ -257,59 +82,45 @@ void Bu::MyriadStream::flush() bool Bu::MyriadStream::canRead() { - return true; } bool Bu::MyriadStream::canWrite() { - return true; } bool Bu::MyriadStream::isReadable() { - return true; } bool Bu::MyriadStream::isWritable() { - return true; } bool Bu::MyriadStream::isSeekable() { - return true; } bool Bu::MyriadStream::isBlocking() { - return true; } -void Bu::MyriadStream::setBlocking( bool /*bBlocking*/ ) +void Bu::MyriadStream::setBlocking( bool bBlocking ) { } -void Bu::MyriadStream::setSize( Bu::size iSize ) +void Bu::MyriadStream::setSize( size iSize ) { - if( iSize < 0 ) - iSize = 0; - rMyriad.setStreamSize( pStream, iSize ); - if( iPos > iSize ) - iPos = iSize; } Bu::size Bu::MyriadStream::getSize() const { - return pStream->getSize(); } Bu::size Bu::MyriadStream::getBlockSize() const { - return rMyriad.getBlockSize(); } -Bu::String Bu::MyriadStream::getLocation() const +Bu::String getLocation() const { - return Bu::String("%1").arg( pStream->iId ); } diff --git a/src/stable/myriadstream.h b/src/stable/myriadstream.h index a94a9a2..87192a9 100644 --- a/src/stable/myriadstream.h +++ b/src/stable/myriadstream.h @@ -1,38 +1,31 @@ -/* - * Copyright (C) 2007-2023 Xagasoft, All rights reserved. - * - * This file is part of the libbu++ library and is released under the - * terms of the license contained in the file LICENSE. - */ - #ifndef BU_MYRIAD_STREAM_H #define BU_MYRIAD_STREAM_H #include "bu/stream.h" +#include "bu/myriadstream.h" #include "bu/myriad.h" namespace Bu { class MyriadStream : public Bu::Stream { - friend class Myriad; + friend class Myriad; private: - /** - * These can only be created by the Myriad class. - */ - MyriadStream( Myriad &rMyriad, Myriad::Stream *pStream ); - - public: + MyriadStream( Bu::Myriad &rMyriad, Bu::Myriad::Stream *pStream, + Bu::Myriad::Mode eMode ); virtual ~MyriadStream(); - + + public: virtual void close(); - virtual Bu::size read( void *pBuf, Bu::size nBytes ); - virtual Bu::size write( const void *pBuf, Bu::size nBytes ); - using Stream::write; - virtual Bu::size tell(); - virtual void seek( Bu::size offset ); - virtual void setPos( Bu::size pos ); - virtual void setPosEnd( Bu::size pos ); + virtual size read( void *pBuf, size iBytes ); + virtual Bu::String readLine(); + virtual Bu::String readAll(); + virtual size write( const void *pBuf, size iBytes ); + virtual size write( const Bu::String &sBuf ); + virtual size tell(); + virtual void seek( size offset ); + virtual void setPos( size pos ); + virtual void setPosEnd( size pos ); virtual bool isEos(); virtual bool isOpen(); virtual void flush(); @@ -43,18 +36,16 @@ namespace Bu virtual bool isSeekable(); virtual bool isBlocking(); virtual void setBlocking( bool bBlocking=true ); - virtual void setSize( Bu::size iSize ); - + virtual void setSize( size iSize ); virtual size getSize() const; virtual size getBlockSize() const; virtual Bu::String getLocation() const; private: - Myriad &rMyriad; - Myriad::Stream *pStream; - Myriad::Block *pCurBlock; - int iBlockSize; - int iPos; + mutable Bu::Mutex mAccess; + Bu::Myriad &rMyriad; + Bu::Myriad::Stream *pStream; + Bu::Myriad::Mode eMode; }; }; -- cgit v1.2.3