From fc2943ed980306244749d8d13796eaff690917b6 Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Mon, 12 Apr 2010 17:50:37 +0000 Subject: Wow Myriad!! Myriad seems to work. I have to run it through a few more paces, and there are some known corner cases that I may just disallow, such as too-small block sizes. Beyond a little more testing, it's ready for production. I may switch some of my cache tests to using it now. --- src/myriad.cpp | 121 ++++++++++++++++++++++++-------- src/myriadstream.cpp | 191 +++++++++++++++++++++++++++++++++------------------ src/server.cpp | 12 +++- src/tools/myriad.cpp | 49 +++++++++++-- 4 files changed, 269 insertions(+), 104 deletions(-) diff --git a/src/myriad.cpp b/src/myriad.cpp index a1a5c38..58270c5 100644 --- a/src/myriad.cpp +++ b/src/myriad.cpp @@ -78,10 +78,6 @@ void Bu::Myriad::initialize() sio << "Myriad: iSize=" << iSize << ", iBlockSize=" << iBlockSize << ", iBlocks=" << iBlocks << ", iStreams=" << iStreams << sio.nl; - // Don't do this, just read the damn header. - sio << "Myriad: Don't do this, just read the damn header (line 82)" - << sio.nl; - int iHeaderSize = 14 + 8 + 4; int iHeaderBlocks = 0; //blkDiv( iHeaderSize+4, iBlockSize ); @@ -93,30 +89,38 @@ void Bu::Myriad::initialize() sio << "Myriad: iHeaderSize=" << iHeaderSize << ", iHeaderBlocks=" << iHeaderBlocks << sio.nl; + + Stream *pFakeHdr = new Stream; + pFakeHdr->iId = 0; + pFakeHdr->iSize = iHeaderSize; + for( int j = 0; j < iHeaderBlocks; j++ ) + { + pFakeHdr->aBlocks.append( j ); + } bsBlockUsed.setSize( iBlocks, true ); bool bCanSkip = false; // Can skip around, post initial header stream i/o + MyriadStream *pIn = new MyriadStream( *this, pFakeHdr ); + pIn->setPos( sStore.tell() ); for( int j = 0; j < iStreams; j++ ) { - int iHdrBlock = 0; - int iCurBlock = 0; aStreams.append( new Stream() ); Stream &s = *aStreams[j]; - sStore.read( &s.iId, 4 ); - sStore.read( &s.iSize, 4 ); + pIn->read( &s.iId, 4 ); + pIn->read( &s.iSize, 4 ); int iSBlocks = blkDiv(s.iSize, iBlockSize); sio << "Myriad: - Stream::iId=" << s.iId << ", Stream::iSize=" << s.iSize << ", Stream::aBlocks=" << iSBlocks - << ", sStore.tell()=" << sStore.tell() << sio.nl; + << ", pIn->tell()=" << pIn->tell() << sio.nl; for( int k = 0; k < iSBlocks; k++ ) { int iBId; - sStore.read( &iBId, 4 ); + pIn->read( &iBId, 4 ); sio << "Myriad: - iBId=" << iBId << ", iStartPos=" << iBId*iBlockSize - << ", sStore.tell()=" << sStore.tell() << sio.nl; + << ", pIn->tell()=" << pIn->tell() << sio.nl; s.aBlocks.append( iBId ); bsBlockUsed.setBit( iBId ); if( (j == 0 && k == iHeaderBlocks-1) ) @@ -124,25 +128,18 @@ void Bu::Myriad::initialize() sio << "Myriad: - End of prepartition, unlocking skipping." << sio.nl; bCanSkip = true; - iCurBlock = blkDiv( (int)sStore.tell(), iBlockSize ); - } - if( bCanSkip && sStore.tell() >= iCurBlock*iBlockSize+iBlockSize ) - { - iHdrBlock++; - iCurBlock = aStreams[0]->aBlocks[iHdrBlock]; - sio << "Myriad: Ran out of data in block, finding next header " - "block: " << iHdrBlock << " = " << iCurBlock << " (" - << iCurBlock*iBlockSize << "b)" << sio.nl; - sStore.setPos( iCurBlock*iBlockSize ); + MyriadStream *pTmp = new MyriadStream( *this, aStreams[0] ); + sio << "Myriad - Position = " << pIn->tell() << sio.nl; + pTmp->setPos( pIn->tell() ); + delete pIn; + delete pFakeHdr; + pIn = pTmp; } } } + delete pIn; sio << "Myriad: Blocks used: " << bsBlockUsed.toString() << sio.nl; - - //printf("%d blocks, %db each, %db block offset\n", - // iBlocks, iBlockSize, iBlockStart ); - } void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) @@ -211,6 +208,14 @@ void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) pStr->iSize = sStore.tell(); sio << "Myriad: Actual end of header stream = " << pStr->iSize << sio.nl; + pStr->iSize = iHeaderSize; + for( int j = 0; j < iHeaderBlocks; j++ ) + { + pStr->aBlocks.append( j ); + } + + aStreams.append( pStr ); + //hStreams.insert( 0, BlockArray( 0 ) ); } @@ -219,9 +224,64 @@ void Bu::Myriad::updateHeader() if( !sStore.canWrite() ) return; - + char cBuf; + int iBuf; - // TODO: Use the stream class to access this really smoothly, I hope :) + // Compute the new size of the header. + int iHeaderSize = 14 + 8*aStreams.getSize(); + sio << "Myriad: updateHeader: aStreams.getSize() = " << aStreams.getSize() + << sio.nl; + for( StreamArray::iterator i = aStreams.begin(); i; i++ ) + { + iHeaderSize += 4*(*i)->aBlocks.getSize(); + sio << "Myriad: updateHeader: (*i)->aBlocks.getSize() = " + << (*i)->aBlocks.getSize() << sio.nl; + } + int iNewBlocks = blkDiv( iHeaderSize, iBlockSize ); + while( iNewBlocks > aStreams[0]->aBlocks.getSize() ) + { + int iBlock = findEmptyBlock(); + sio << "Myriad: updateHeader: Appending block " << iBlock + << " to header." << sio.nl; + aStreams[0]->aBlocks.append( iBlock ); + bsBlockUsed.setBit( iBlock ); + iHeaderSize += 4; + iNewBlocks = blkDiv( iHeaderSize, iBlockSize ); + } + aStreams[0]->iSize = iHeaderSize; + sio << "Myriad: updateHeader: iHeaderSize=" << iHeaderSize + << ", iNewBlocks=" << iNewBlocks << ", curBlocks=" + << aStreams[0]->aBlocks.getSize() << sio.nl; + + MyriadStream sHdr( *this, aStreams[0] ); + sHdr.write( Myriad_MAGIC_CODE, 4 ); + + // Version (1) + cBuf = 1; + sHdr.write( &cBuf, 1 ); + + // Bits per int + cBuf = 32; + sHdr.write( &cBuf, 1 ); + + // The size of each block + sHdr.write( &iBlockSize, 4 ); + + iBuf = aStreams.getSize(); + // The number of streams + sHdr.write( &iBuf, 4 ); + + for( StreamArray::iterator i = aStreams.begin(); i; i++ ) + { + sHdr.write( &(*i)->iId, 4 ); + sHdr.write( &(*i)->iSize, 4 ); + int iUsedBlocks = blkDiv( (*i)->iSize, iBlockSize ); +// for( BlockArray::iterator j = (*i)->aBlocks.begin(); j; j++ ) + for( int j = 0; j < iUsedBlocks; j++ ) + { + sHdr.write( &(*i)->aBlocks[j], 4 ); + } + } } int Bu::Myriad::createStream( int iPreAllocate ) @@ -231,6 +291,7 @@ int Bu::Myriad::createStream( int iPreAllocate ) sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate=" << iPreAllocate << sio.nl; pStr->iSize = 0; + aStreams.append( pStr ); for( int j = 0; j < iPreAllocate; j++ ) { @@ -240,7 +301,7 @@ int Bu::Myriad::createStream( int iPreAllocate ) bsBlockUsed.setBit( iFreeBlock ); } - return 0; + return pStr->iId; } int Bu::Myriad::findEmptyBlock() @@ -260,7 +321,7 @@ int Bu::Myriad::findEmptyBlock() sStore.write( pBlock, iBlockSize ); delete pBlock; - return iBlockSize++; + return iBlocks++; } void Bu::Myriad::deleteStream( int /*iID*/ ) @@ -314,6 +375,8 @@ Bu::Myriad::Block *Bu::Myriad::getBlock( int iBlock ) void Bu::Myriad::releaseBlock( Bu::Myriad::Block *pBlock ) { + if( pBlock == NULL ) + return; sio << "Myriad: Releasing block " << pBlock->iBlockIndex << sio.nl; if( pBlock->bChanged ) { diff --git a/src/myriadstream.cpp b/src/myriadstream.cpp index 0e6fc89..d714c23 100644 --- a/src/myriadstream.cpp +++ b/src/myriadstream.cpp @@ -13,6 +13,8 @@ using Bu::Fmt; #include +// #define MYRIAD_STREAM_DEBUG 1 + Bu::MyriadStream::MyriadStream( Bu::Myriad &rMyriad, Bu::Myriad::Stream *pStream ) : rMyriad( rMyriad ), @@ -20,8 +22,10 @@ Bu::MyriadStream::MyriadStream( Bu::Myriad &rMyriad, pCurBlock( NULL ), iPos( 0 ) { +#ifdef MYRIAD_STREAM_DEBUG sio << "MyriadStream: Created, iId=" << pStream->iId << ", iSize=" << pStream->iSize << sio.nl; +#endif //pCurBlock = rMyriad.newBlock(); //rMyriad.getBlock( uStream, pCurBlock ); //uSize = pCurBlock->uBytesUsed; @@ -41,19 +45,25 @@ void Bu::MyriadStream::close() size_t Bu::MyriadStream::read( void *pBuf, size_t nBytes ) { - sio << "MyriadStream: Read: Started, asked to read " << nBytes << "b." +#ifdef MYRIAD_STREAM_DEBUG + sio << "MyriadStream: read: Started, asked to read " << nBytes << "b." << sio.nl; - if( nBytes <= 0 ) - return 0; +#endif if( nBytes > pStream->iSize-iPos ) nBytes = pStream->iSize-iPos; + if( nBytes <= 0 ) + return 0; int iLeft = nBytes; - sio << "MyriadStream: Read: Started, going to read " << nBytes << "b." +#ifdef MYRIAD_STREAM_DEBUG + sio << "MyriadStream: read: Started, going to read " << nBytes << "b." << sio.nl; +#endif if( pCurBlock == NULL ) { - sio << "MyriadStream: Read: No block loaded, loading initial block." +#ifdef MYRIAD_STREAM_DEBUG + sio << "MyriadStream: read: No block loaded, loading initial block." << sio.nl; +#endif pCurBlock = rMyriad.getBlock( pStream->aBlocks[iPos/rMyriad.iBlockSize] ); @@ -63,25 +73,30 @@ size_t Bu::MyriadStream::read( void *pBuf, size_t nBytes ) int iCurBlock = pStream->aBlocks[iPos/rMyriad.iBlockSize]; if( pCurBlock->iBlockIndex != iCurBlock ) { - sio << "MyriadStream: Read: Loading new block " << iCurBlock << "." +#ifdef MYRIAD_STREAM_DEBUG + sio << "MyriadStream: read: Loading new block " << iCurBlock << "." << sio.nl; +#endif rMyriad.releaseBlock( pCurBlock ); pCurBlock = rMyriad.getBlock( iCurBlock ); } int iAmnt = Bu::min( - rMyriad.iBlockSize - iPos%rMyriad.iBlockSize, - iLeft + Bu::min( + rMyriad.iBlockSize - iPos%rMyriad.iBlockSize, + iLeft + ), + pStream->iSize-iPos ); -// if( iLeft > iAmnt ) -// iAmnt = iLeft; - sio << "MyriadStream: Read: Copying out bytes: " - << (iPos%rMyriad.iBlockSize) << " - " +#ifdef MYRIAD_STREAM_DEBUG + sio << "MyriadStream: read: Copying out bytes: " + << iPos << "(" << (iPos%rMyriad.iBlockSize) << ")+" << iAmnt << ", " << iLeft << "b left." << sio.nl; +#endif memcpy( pBuf, - pCurBlock->pData+iPos%rMyriad.iBlockSize, + pCurBlock->pData+(iPos%rMyriad.iBlockSize), iAmnt ); iPos += iAmnt; @@ -93,65 +108,109 @@ size_t Bu::MyriadStream::read( void *pBuf, size_t nBytes ) size_t Bu::MyriadStream::write( const void *pBuf, size_t nBytes ) { - if( nBytes == 0 ) + if( nBytes <= 0 ) return 0; -/* if( pCurBlock->uBytesUsed >= uBlockSize ) - { - // We're at the end of our current block, allocate another before we do - // anything. - uCurBlock = rMyriad.getNextBlock( uCurBlock, pCurBlock ); - } */ + +#ifdef MYRIAD_STREAM_DEBUG + sio << "MyriadStream: write: Started, asked to write " << nBytes << "b." + << sio.nl; +#endif + if( nBytes <= 0 ) + return 0; + int iLeft = nBytes; /* - if( (iPos%uBlockSize)+nBytes < uBlockSize ) + if( pCurBlock == NULL ) { - //printf("wa: %u:%u:%u:%u -> ", iPos, iPos%uBlockSize, uSize, pCurBlock->uBytesUsed ); - memcpy( pCurBlock->pData+(iPos%uBlockSize), pBuf, nBytes ); - //printf("write buffill: %ub, %u-%u/%u -> %d-%d/%d (a:%u:%u)\n", - // nBytes, 0, nBytes-1, nBytes, - // iPos, iPos+nBytes-1, uSize, uCurBlock, - // pCurBlock->uBytesUsed ); - if( (iPos%uBlockSize)+nBytes > pCurBlock->uBytesUsed ) - pCurBlock->uBytesUsed = (iPos%uBlockSize)+nBytes; - rMyriad.setBlock( uCurBlock, pCurBlock ); - iPos += nBytes; - if( iPos > uSize ) - uSize = iPos; - //printf("block %u = %ub (%ub total) %d:%u\n", - // uCurBlock, pCurBlock->uBytesUsed, uSize, nBytes, iPos ); - return nBytes; - } - else +#ifdef MYRIAD_STREAM_DEBUG + sio << "MyriadStream: write: No block loaded, loading initial block." + << sio.nl; +#endif + pCurBlock = rMyriad.getBlock( + pStream->aBlocks[iPos/rMyriad.iBlockSize] + ); + }*/ + + while( iLeft > 0 ) { - size_t nTotal = 0; - for(;;) + int iCurBlock; + if( iPos/rMyriad.iBlockSize < pStream->aBlocks.getSize() ) { - uint32_t uNow = uBlockSize-(iPos%uBlockSize); - //printf("uNow: %u (%u-(%u%%%u)) %d req\n", uNow, uBlockSize, iPos, uBlockSize, nBytes ); - if( nBytes < uNow ) - uNow = nBytes; - memcpy( pCurBlock->pData+(iPos%uBlockSize), - &((char *)pBuf)[nTotal], uNow ); - //printf("write buffill: %ub, %u-%u/%u -> %d-%d/%d (b:%u:%u)\n", - // uNow, nTotal, nTotal+uNow-1, nBytes, - // iPos, iPos+uNow-1, uSize, uCurBlock, pCurBlock->uBytesUsed ); - if( (iPos%uBlockSize)+uNow > pCurBlock->uBytesUsed ) - pCurBlock->uBytesUsed = (iPos%uBlockSize)+uNow; - rMyriad.setBlock( uCurBlock, pCurBlock ); - iPos += uNow; - if( iPos > uSize ) - uSize = iPos; - nTotal += uNow; - nBytes -= uNow; - //printf("wb: block %u = %ub (%ub total)\n", - // uCurBlock, pCurBlock->uBytesUsed, uSize ); - //if( pCurBlock->uBytesUsed == uBlockSize ) - if( iPos%uBlockSize == 0 ) - uCurBlock = rMyriad.getNextBlock( uCurBlock, pCurBlock ); - if( nBytes == 0 ) - return nTotal; + iCurBlock = pStream->aBlocks[iPos/rMyriad.iBlockSize]; } - }*/ - return 0; + else + { + iCurBlock = rMyriad.findEmptyBlock(); + pStream->aBlocks.append( iCurBlock ); + rMyriad.bsBlockUsed.setBit( iCurBlock ); +#ifdef MYRIAD_STREAM_DEBUG + sio << "MyriadStream: write: New block allocated and appended: " + << iCurBlock << "." << sio.nl; + +#endif + } + if( !pCurBlock || pCurBlock->iBlockIndex != iCurBlock ) + { +#ifdef MYRIAD_STREAM_DEBUG + sio << "MyriadStream: write: Loading new block " << iCurBlock << "." + << sio.nl; +#endif + rMyriad.releaseBlock( pCurBlock ); + pCurBlock = rMyriad.getBlock( iCurBlock ); + } + pCurBlock->bChanged = true; + + // There are two main writing modes when it comes down to it. + // Overwrite mode and append mode. Append is what pretty much always + // happens when creating a new stream. + if( iPos < pStream->iSize ) + { + int iAmnt = Bu::min( + Bu::min( + rMyriad.iBlockSize - iPos%rMyriad.iBlockSize, + iLeft + ), + pStream->iSize-iPos + ); +#ifdef MYRIAD_STREAM_DEBUG + sio << "MyriadStream: write (ovr): Copying in bytes: " + << (iPos%rMyriad.iBlockSize) << "+" + << iAmnt + << ", " << iLeft << "b left." << sio.nl; +#endif + memcpy( + pCurBlock->pData+(iPos%rMyriad.iBlockSize), + pBuf, + iAmnt + ); + iPos += iAmnt; + pBuf = &((char *)pBuf)[iAmnt]; + iLeft -= iAmnt; + } + else + { + int iAmnt = Bu::min( + rMyriad.iBlockSize - iPos%rMyriad.iBlockSize, + iLeft + ); +#ifdef MYRIAD_STREAM_DEBUG + sio << "MyriadStream: write (app): Copying in bytes: " + << (iPos%rMyriad.iBlockSize) << "+" + << iAmnt + << ", " << iLeft << "b left." << sio.nl; +#endif + memcpy( + pCurBlock->pData+(iPos%rMyriad.iBlockSize), + pBuf, + iAmnt + ); + iPos += iAmnt; + pStream->iSize += iAmnt; + pBuf = &((char *)pBuf)[iAmnt]; + iLeft -= iAmnt; + } + } + + return nBytes; } long Bu::MyriadStream::tell() diff --git a/src/server.cpp b/src/server.cpp index 80ed509..36d9d6c 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -93,8 +93,16 @@ void Bu::Server::scan() } if( FD_ISSET( j, &fdWrite ) ) { - Client *pClient = hClients.get( j ); - pClient->processOutput(); + try + { + Client *pClient = hClients.get( j ); + pClient->processOutput(); + } + catch( Bu::HashException &e ) + { + // Do nothing, I guess, the client is already dead... + // TODO: Someday, we may want to handle this more graceully. + } } } diff --git a/src/tools/myriad.cpp b/src/tools/myriad.cpp index 95b4503..605aac9 100644 --- a/src/tools/myriad.cpp +++ b/src/tools/myriad.cpp @@ -20,7 +20,8 @@ enum Mode modeCreate, modeInfo, modeStreamNew, - modeStreamRead, + modeStreamDump, + modeStreamPut, modeNone }; @@ -41,8 +42,10 @@ public: "Display some info about a Myriad file." ); addOption( eMode, 'n', "new", "Create a new sub-stream in a Myriad file."); - addOption( eMode, 'r', "read", + addOption( eMode, 'd', "dump", "Read a stream from a Myriad file."); + addOption( eMode, "put", + "Put a file into a Myriad stream."); addHelpOption(); addHelpBanner("\nGeneral options:"); @@ -56,7 +59,8 @@ public: setOverride( "create", "create" ); setOverride( "info", "info" ); setOverride( "new", "new" ); - setOverride( "read", "read" ); + setOverride( "dump", "dump" ); + setOverride( "put", "put" ); parse( argc, argv ); } @@ -78,8 +82,10 @@ Bu::Formatter &operator>>( Bu::Formatter &f, Mode &m ) m = modeInfo; else if( sTok == "new" ) m = modeStreamNew; - else if( sTok == "read" ) - m = modeStreamRead; + else if( sTok == "dump" ) + m = modeStreamDump; + else if( sTok == "put" ) + m = modeStreamPut; else m = modeNone; return f; @@ -99,7 +105,7 @@ int main( int argc, char *argv[] ) } else { - File fOut( opts.sFile, File::WriteNew ); + File fOut( opts.sFile, File::WriteNew|File::Read ); Myriad m( fOut ); m.initialize( opts.iBlockSize, opts.iPreallocate ); } @@ -134,7 +140,7 @@ int main( int argc, char *argv[] ) } break; - case modeStreamRead: + case modeStreamDump: if( !opts.sFile.isSet() ) { sio << "Please specify a file manipulate." << sio.nl; @@ -173,6 +179,35 @@ int main( int argc, char *argv[] ) } sio << sio.nl; } + sio << "Position: " << s.tell() << ", isEos()=" << s.isEos() + << sio.nl; + } + break; + + case modeStreamPut: + if( !opts.sFile.isSet() ) + { + sio << "Please specify a file manipulate." << sio.nl; + return 0; + } + else if( !opts.sSrc.isSet() ) + { + sio << "Please specify a source file to read." << sio.nl; + } + else + { + File fOut( opts.sFile, File::Write|File::Read ); + Myriad m( fOut ); + m.initialize(); + MyriadStream sOut = m.openStream( + m.createStream( opts.iPreallocate ) + ); + File fIn( opts.sSrc, File::Read ); + char buf[1024]; + while( !fIn.isEos() ) + { + sOut.write( buf, fIn.read( buf, 1024 ) ); + } } break; -- cgit v1.2.3