From 0886ad4f53deb8e148f87f77b9e7ff690c02b069 Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Wed, 28 Aug 2024 11:45:51 -0700 Subject: Most of the new myriad api is in. Still to go: bootstrapping reading the initial header, saving the header, growing streams as we write? --- src/stable/myriad.cpp | 143 +++++++++++++++++++++++++++++++++++++++++--- src/stable/myriad.h | 24 +++++++- src/stable/myriadstream.cpp | 72 +++++++++++++++++----- src/stable/myriadstream.h | 4 +- 4 files changed, 216 insertions(+), 27 deletions(-) diff --git a/src/stable/myriad.cpp b/src/stable/myriad.cpp index 5278ac5..f3ff09a 100644 --- a/src/stable/myriad.cpp +++ b/src/stable/myriad.cpp @@ -81,6 +81,14 @@ bool Bu::Myriad::setSize( Bu::Myriad::StreamId /*iStream*/, return false; } +Bu::String Bu::Myriad::getLocation() const +{ + Bu::MutexLocker l( mAccess ); + Bu::MutexLocker l2( mBacking ); + return Bu::String("myriad(%1,%2):%3") + .arg( 1 ).arg( iBlockSize ).arg( rBacking.getLocation() ); +} + bool Bu::Myriad::loadMyriad() { Bu::println("Load myriad!"); @@ -264,6 +272,32 @@ int32_t Bu::Myriad::allocateBlock() } } +void Bu::Myriad::releaseBlock( int32_t iBlockId, bool bBlank ) +{ + Bu::MutexLocker l( mAccess ); + lFreeBlocks.append( iBlockId ); + if( bBlank ) + { + blankBlock( iBlockId ); + } +} + +void Bu::Myriad::blankBlock( int32_t iBlockId ) +{ + Bu::MutexLocker l( mBacking ); + rBacking.setPos( iBlockId*iBlockSize ); + int32_t iChunk = std::min( iBlockSize, 4096 ); + uint8_t *pChunk = new uint8_t[iChunk]; + memset( pChunk, 0, iChunk ); + int iLeft = iBlockSize; + while( iLeft > 0 ) + { + int32_t iWrite = rBacking.write( pChunk, std::min( iChunk, iLeft ) ); + iLeft -= iWrite; + } + delete[] pChunk; +} + void Bu::Myriad::openStream( StreamId id ) { Bu::MutexLocker l( mAccess ); @@ -276,20 +310,36 @@ void Bu::Myriad::closeStream( StreamId id ) hStream.get( id )->close(); } -int32_t Bu::Myriad::blockRead( int32_t iStart, void *pTarget, int32_t iSize ) +int32_t Bu::Myriad::blockRead( int32_t iBlock, int32_t iStart, + void *pTarget, int32_t iSize ) { int32_t iUpperSize = iBlockSize - (iStart%iBlockSize); - Bu::println("Max size within block: %1 vs %2 (start=%3, blocksize=%4)") + Bu::println("Max read within block: %1 vs %2 (start=%3, blocksize=%4)") .arg( iUpperSize ).arg( iSize ) .arg( iStart ).arg( iBlockSize ); int32_t iAmnt = std::min( iSize, iUpperSize ); Bu::MutexLocker l( mBacking ); - rBacking.setPos( iStart ); + rBacking.setPos( iBlockSize*iBlock + iStart ); return rBacking.read( pTarget, iAmnt ); } +int32_t Bu::Myriad::blockWrite( int32_t iBlock, int32_t iStart, + const void *pTarget, int32_t iSize ) +{ + int32_t iUpperSize = iBlockSize - (iStart%iBlockSize); + Bu::println("Max write within block: %1 vs %2 (start=%3, blocksize=%4)") + .arg( iUpperSize ).arg( iSize ) + .arg( iStart ).arg( iBlockSize ); + + int32_t iAmnt = std::min( iSize, iUpperSize ); + Bu::MutexLocker l( mBacking ); + rBacking.setPos( iBlock*iBlockSize + iStart ); + + return rBacking.write( pTarget, iAmnt ); +} + ///////// // Bu::Myriad::Stream // @@ -308,22 +358,71 @@ Bu::Myriad::Stream::~Stream() { } +int32_t Bu::Myriad::Stream::getSize() const +{ + Bu::MutexLocker l( mAccess ); + return iSize; +} + +int32_t Bu::Myriad::Stream::getBlockSize() const +{ + Bu::MutexLocker l( mAccess ); + return rParent.iBlockSize; +} + +Bu::Myriad::StreamId Bu::Myriad::Stream::getStreamId() const +{ + return iStream; +} + +int32_t Bu::Myriad::Stream::getOpenCount() const +{ + Bu::MutexLocker l( mAccess ); + return iOpenCount; +} + +void Bu::Myriad::Stream::setSize( int32_t iNewSize ) +{ + // Two possible modes, shrink or grow. + Bu::MutexLocker l( mAccess ); + if( iNewSize < iSize ) + { + // Shrink it + int iNewBlocks = Bu::blkDiv( iNewSize, rParent.iBlockSize ); + while( aBlocks.getSize() > iNewBlocks ) + { + rParent.releaseBlock( aBlocks.last(), false ); + aBlocks.eraseLast(); + } + iSize = iNewSize; + } + else if( iNewSize > iSize ) + { + // Grow it + int iNewBlocks = Bu::blkDiv( iNewSize, rParent.iBlockSize ); + while( aBlocks.getSize() < iNewBlocks ) + { + aBlocks.append( rParent.allocateBlock() ); + } + iSize = iNewSize; + } +} + int32_t Bu::Myriad::Stream::read( int32_t iStart, void *pTarget, int32_t iSize ) { - int32_t iPos = iStart; int32_t iRead = 0; Bu::MutexLocker l( mAccess ); - while( iStart > 0 ) + while( iSize > 0 ) { int32_t iBlock = aBlocks[iStart/rParent.iBlockSize]; - int32_t iOffset = iPos % rParent.iBlockSize; int32_t iChunkRead = rParent.blockRead( - iBlock*rParent.iBlockSize+iOffset, pTarget, iSize + iBlock, iStart%rParent.iBlockSize, pTarget, iSize ); if( iChunkRead == 0 ) break; iRead += iChunkRead; + iStart += iChunkRead; reinterpret_cast(pTarget) += iChunkRead; iSize -= iChunkRead; } @@ -331,6 +430,36 @@ int32_t Bu::Myriad::Stream::read( int32_t iStart, void *pTarget, return iRead; } +int32_t Bu::Myriad::Stream::write( int32_t iStart, const void *pTarget, + int32_t iSize ) +{ + int32_t iWrite = 0; + Bu::MutexLocker l( mAccess ); + while( iSize > 0 ) + { + int32_t iBlockIdx = iStart/rParent.iBlockSize; + int32_t iBlock = aBlocks[iBlockIdx]; + int32_t iChunkWrite = rParent.blockWrite( + iBlock, iStart%rParent.iBlockSize, pTarget, iSize + ); + if( iChunkWrite == 0 ) + break; + iWrite += iChunkWrite; + iStart += iChunkWrite; + reinterpret_cast(pTarget) += iChunkWrite; + iSize -= iChunkWrite; + } + + return iWrite; +} + +Bu::String Bu::Myriad::Stream::getLocation() const +{ + Bu::MutexLocker l( mAccess ); + return Bu::String("%1:stream %2")\ + .arg( rParent.getLocation() ).arg( iStream ); +} + void Bu::Myriad::Stream::open() { Bu::MutexLocker l( mAccess ); diff --git a/src/stable/myriad.h b/src/stable/myriad.h index 07b4a1d..6d99ee4 100644 --- a/src/stable/myriad.h +++ b/src/stable/myriad.h @@ -75,11 +75,14 @@ namespace Bu MyriadStream open( StreamId iStream, Mode eMode ); bool erase( StreamId iStream ); bool setSize( StreamId iStream, int32_t iNewSize ); + Bu::String getLocation() const; private: bool loadMyriad(); void createMyriad( int32_t iBlockSize, int32_t iPreallocateBlocks ); int32_t allocateBlock(); + void releaseBlock( int32_t iBlockId, bool bBlank=true ); + void blankBlock( int32_t iBlockId ); void openStream( StreamId id ); void closeStream( StreamId id ); @@ -87,7 +90,16 @@ namespace Bu * Block restricted read, it will not read past the end of the block * that iStart places it in. */ - int32_t blockRead( int32_t iStart, void *pTarget, int32_t iSize ); + int32_t blockRead( int32_t iBlock, int32_t iStart, + void *pTarget, int32_t iSize ); + + /** + * Block restricted write, it will not write past the end of the block + * that iStart places it in. If this returns a non-zero number it's an + * indication that you need to allocate a new block. + */ + int32_t blockWrite( int32_t iBlock, int32_t iStart, + const void *pTarget, int32_t iSize ); public: /** @@ -103,7 +115,15 @@ namespace Bu virtual ~Stream(); public: + int32_t getSize() const; + int32_t getBlockSize() const; + StreamId getStreamId() const; + int32_t getOpenCount() const; + + void setSize( int32_t iNewSize ); int32_t read( int32_t iStart, void *pTarget, int32_t iSize ); + int32_t write( int32_t iStart, const void *pTarget, int32_t iSize ); + Bu::String getLocation() const; /** * Doesn't actually open, just increments the open counter. @@ -134,7 +154,7 @@ namespace Bu typedef Bu::Hash StreamHash; typedef Bu::List IndexList; - Bu::Mutex mAccess; + mutable Bu::Mutex mAccess; mutable Bu::Mutex mBacking; Bu::Stream &rBacking; int32_t iBlockSize; diff --git a/src/stable/myriadstream.cpp b/src/stable/myriadstream.cpp index cbbd4fe..9ea2e17 100644 --- a/src/stable/myriadstream.cpp +++ b/src/stable/myriadstream.cpp @@ -6,7 +6,8 @@ Bu::MyriadStream::MyriadStream( Bu::Myriad &rMyriad, Bu::Myriad::Stream *pStream, Bu::Myriad::Mode eMode ) : rMyriad( rMyriad ), pStream( pStream ), - eMode( eMode ) + eMode( eMode ), + iPos( 0 ) { if( (eMode&Bu::Myriad::ReadWrite) == 0 ) { @@ -15,6 +16,20 @@ Bu::MyriadStream::MyriadStream( Bu::Myriad &rMyriad, } Bu::MutexLocker l( mAccess ); pStream->open(); + + if( (eMode&Bu::Myriad::Write) != 0 ) + { + // Writing mode, what other options do we deal with? + if( (eMode&Bu::Myriad::Truncate) != 0 ) + { + // Truncate, set size to zero before starting. + pStream->setSize( 0 ); + } + else if( (eMode&Bu::Myriad::Append) != 0 ) + { + iPos = pStream->getSize(); + } + } } Bu::MyriadStream::~MyriadStream() @@ -34,93 +49,120 @@ void Bu::MyriadStream::close() Bu::size Bu::MyriadStream::read( void *pBuf, size iBytes ) { -} - -Bu::String Bu::MyriadStream::readLine() -{ -} - -Bu::String Bu::MyriadStream::readAll() -{ + Bu::MutexLocker l( mAccess ); + int32_t iRead = pStream->read( iPos, pBuf, iBytes ); + iPos += iRead; + return iRead; } Bu::size Bu::MyriadStream::write( const void *pBuf, size iBytes ) { -} - -Bu::size Bu::MyriadStream::write( const Bu::String &sBuf ) -{ + Bu::MutexLocker l( mAccess ); + int32_t iWrite = pStream->write( iPos, pBuf, iBytes ); + iPos += iWrite; + return iWrite; } Bu::size Bu::MyriadStream::tell() { + Bu::MutexLocker l( mAccess ); + return iPos; } void Bu::MyriadStream::seek( size offset ) { + Bu::MutexLocker l( mAccess ); + iPos += offset; } void Bu::MyriadStream::setPos( size pos ) { + Bu::MutexLocker l( mAccess ); + iPos = pos; } void Bu::MyriadStream::setPosEnd( size pos ) { + Bu::MutexLocker l( mAccess ); + iPos = pStream->getSize()-pos; } bool Bu::MyriadStream::isEos() { + Bu::MutexLocker l( mAccess ); + return iPos == pStream->getSize(); } bool Bu::MyriadStream::isOpen() { + return true; } void Bu::MyriadStream::flush() { + // Does this make sense? } bool Bu::MyriadStream::canRead() { + return true; } bool Bu::MyriadStream::canWrite() { + return true; } bool Bu::MyriadStream::isReadable() { + Bu::MutexLocker l( mAccess ); + return (eMode&Bu::Myriad::Read) != 0; } bool Bu::MyriadStream::isWritable() { + Bu::MutexLocker l( mAccess ); + return (eMode&Bu::Myriad::Write) != 0; } bool Bu::MyriadStream::isSeekable() { + return true; } bool Bu::MyriadStream::isBlocking() { + return true; } -void Bu::MyriadStream::setBlocking( bool bBlocking ) +void Bu::MyriadStream::setBlocking( bool /*bBlocking*/ ) { + // Dunno what this would even mean here. } void Bu::MyriadStream::setSize( size iSize ) { + Bu::MutexLocker l( mAccess ); + pStream->setSize( iSize ); + if( iPos > iSize ) + iPos = iSize; } Bu::size Bu::MyriadStream::getSize() const { + Bu::MutexLocker l( mAccess ); + return pStream->getSize(); } Bu::size Bu::MyriadStream::getBlockSize() const { + Bu::MutexLocker l( mAccess ); + return pStream->getBlockSize(); } -Bu::String getLocation() const +Bu::String Bu::MyriadStream::getLocation() const { + Bu::MutexLocker l( mAccess ); + return pStream->getLocation(); } diff --git a/src/stable/myriadstream.h b/src/stable/myriadstream.h index 87192a9..c01cf34 100644 --- a/src/stable/myriadstream.h +++ b/src/stable/myriadstream.h @@ -18,10 +18,7 @@ namespace Bu public: virtual void close(); virtual size read( void *pBuf, size iBytes ); - virtual Bu::String readLine(); - virtual Bu::String readAll(); virtual size write( const void *pBuf, size iBytes ); - virtual size write( const Bu::String &sBuf ); virtual size tell(); virtual void seek( size offset ); virtual void setPos( size pos ); @@ -46,6 +43,7 @@ namespace Bu Bu::Myriad &rMyriad; Bu::Myriad::Stream *pStream; Bu::Myriad::Mode eMode; + int32_t iPos; }; }; -- cgit v1.2.3