aboutsummaryrefslogtreecommitdiff
path: root/src/stable
diff options
context:
space:
mode:
authorMike Buland <mike@xagasoft.com>2024-08-05 12:44:59 -0700
committerMike Buland <mike@xagasoft.com>2024-08-05 12:44:59 -0700
commitcaee572ff94822ca2ed354fcb79ca04ed9adf388 (patch)
tree4d5253c310e99a09852863024250a22d810ead7b /src/stable
parent5f183d4a0e7caa529db09f32848314360cecfd91 (diff)
downloadlibbu++-caee572ff94822ca2ed354fcb79ca04ed9adf388.tar.gz
libbu++-caee572ff94822ca2ed354fcb79ca04ed9adf388.tar.bz2
libbu++-caee572ff94822ca2ed354fcb79ca04ed9adf388.tar.xz
libbu++-caee572ff94822ca2ed354fcb79ca04ed9adf388.zip
Perhaps fixed a Myriad race condition.0.0.1
If so, this will fix the issue where streams randomly truncate when accessed by multiple threads.
Diffstat (limited to '')
-rw-r--r--src/stable/myriad.cpp67
-rw-r--r--src/stable/myriad.h9
-rw-r--r--src/stable/myriadstream.cpp25
3 files changed, 67 insertions, 34 deletions
diff --git a/src/stable/myriad.cpp b/src/stable/myriad.cpp
index 86f651e..c606369 100644
--- a/src/stable/myriad.cpp
+++ b/src/stable/myriad.cpp
@@ -18,7 +18,8 @@ using Bu::Fmt;
18 18
19#define Myriad_MAGIC_CODE ((unsigned char *)"\x0a\xd3\xfa\x84") 19#define Myriad_MAGIC_CODE ((unsigned char *)"\x0a\xd3\xfa\x84")
20 20
21#define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(sStore.getLocation()).arg(x) 21// #define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(sStore.getLocation()).arg(x)
22#define TRACE( x ) (void)0
22 23
23namespace Bu 24namespace Bu
24{ 25{
@@ -145,7 +146,7 @@ void Bu::Myriad::initialize()
145 146
146 Stream *pFakeHdr = new Stream; 147 Stream *pFakeHdr = new Stream;
147 pFakeHdr->iId = 0; 148 pFakeHdr->iId = 0;
148 pFakeHdr->iSize = iHeaderSize; 149 pFakeHdr->setSize( iHeaderSize );
149 for( int j = 0; j < iHeaderBlocks; j++ ) 150 for( int j = 0; j < iHeaderBlocks; j++ )
150 { 151 {
151 pFakeHdr->aBlocks.append( j ); 152 pFakeHdr->aBlocks.append( j );
@@ -161,11 +162,13 @@ void Bu::Myriad::initialize()
161 pIn->setPos( sStore.tell() ); 162 pIn->setPos( sStore.tell() );
162 for( int j = 0; j < iStreams; j++ ) 163 for( int j = 0; j < iStreams; j++ )
163 { 164 {
165 int iSizeTmp;
164 aStreams.append( new Stream() ); 166 aStreams.append( new Stream() );
165 Stream &s = *aStreams[j]; 167 Stream &s = *aStreams[j];
166 pIn->read( &s.iId, 4 ); 168 pIn->read( &s.iId, 4 );
167 pIn->read( &s.iSize, 4 ); 169 pIn->read( &iSizeTmp, 4 );
168 int iSBlocks = blkDiv(s.iSize, iBlockSize); 170 s.setSize( iSizeTmp );
171 int iSBlocks = blkDiv(s.getSize(), iBlockSize);
169 // sio << "Myriad: - Stream::iId=" << s.iId 172 // sio << "Myriad: - Stream::iId=" << s.iId
170 // << ", Stream::iSize=" << s.iSize 173 // << ", Stream::iSize=" << s.iSize
171 // << ", Stream::aBlocks=" << iSBlocks 174 // << ", Stream::aBlocks=" << iSBlocks
@@ -282,10 +285,10 @@ void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate )
282 this->iBlockSize = iBlockSize; 285 this->iBlockSize = iBlockSize;
283 this->iBlocks = iPreAllocate; 286 this->iBlocks = iPreAllocate;
284 287
285 pStr->iSize = sStore.tell(); 288 pStr->setSize( sStore.tell() );
286// sio << "Myriad: Actual end of header stream = " << pStr->iSize << sio.nl; 289// sio << "Myriad: Actual end of header stream = " << pStr->iSize << sio.nl;
287 290
288 pStr->iSize = iHeaderSize; 291 pStr->setSize( iHeaderSize );
289 for( int j = 0; j < iHeaderBlocks; j++ ) 292 for( int j = 0; j < iHeaderBlocks; j++ )
290 { 293 {
291// sio << "Started block " << j << " is header." << sio.nl; 294// sio << "Started block " << j << " is header." << sio.nl;
@@ -308,6 +311,26 @@ void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate )
308 TRACE("mHeader unlocked..."); 311 TRACE("mHeader unlocked...");
309} 312}
310 313
314void Bu::Myriad::Stream::setSize( int iNewSize )
315{
316 MutexLocker l( mStream );
317 iSize = iNewSize;
318}
319
320void Bu::Myriad::Stream::growTo( int iNewSize )
321{
322 MutexLocker l( mStream );
323 if( iNewSize < iSize )
324 return;
325 iSize = iNewSize;
326}
327
328int Bu::Myriad::Stream::getSize() const
329{
330 MutexLocker l( mStream );
331 return iSize;
332}
333
311void Bu::Myriad::updateHeader() 334void Bu::Myriad::updateHeader()
312{ 335{
313 MutexLocker mLock( mHeader ); 336 MutexLocker mLock( mHeader );
@@ -354,7 +377,7 @@ void Bu::Myriad::updateHeader()
354 iHeaderSize += 4; 377 iHeaderSize += 4;
355 iNewBlocks = blkDiv( iHeaderSize, iBlockSize ); 378 iNewBlocks = blkDiv( iHeaderSize, iBlockSize );
356 } 379 }
357 aStreams[0]->iSize = iHeaderSize; 380 aStreams[0]->setSize( iHeaderSize );
358// sio << "Myriad: updateHeader: iHeaderSize=" << iHeaderSize 381// sio << "Myriad: updateHeader: iHeaderSize=" << iHeaderSize
359// << ", iNewBlocks=" << iNewBlocks << ", curBlocks=" 382// << ", iNewBlocks=" << iNewBlocks << ", curBlocks="
360// << aStreams[0]->aBlocks.getSize() << sio.nl; 383// << aStreams[0]->aBlocks.getSize() << sio.nl;
@@ -379,9 +402,11 @@ void Bu::Myriad::updateHeader()
379 402
380 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 403 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
381 { 404 {
405 int iSizeTmp;
382 sHdr.write( &(*i)->iId, 4 ); 406 sHdr.write( &(*i)->iId, 4 );
383 sHdr.write( &(*i)->iSize, 4 ); 407 sHdr.write( &iSizeTmp, 4 );
384 int iUsedBlocks = blkDiv( (*i)->iSize, iBlockSize ); 408 (*i)->setSize( iSizeTmp );
409 int iUsedBlocks = blkDiv( iSizeTmp, iBlockSize );
385// for( BlockArray::iterator j = (*i)->aBlocks.begin(); j; j++ ) 410// for( BlockArray::iterator j = (*i)->aBlocks.begin(); j; j++ )
386 for( int j = 0; j < iUsedBlocks; j++ ) 411 for( int j = 0; j < iUsedBlocks; j++ )
387 { 412 {
@@ -402,7 +427,7 @@ int Bu::Myriad::createStream( int iPreAllocate )
402 pStr->iId = aStreams.last()->iId+1; 427 pStr->iId = aStreams.last()->iId+1;
403 //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate=" 428 //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate="
404 // << iPreAllocate << sio.nl; 429 // << iPreAllocate << sio.nl;
405 pStr->iSize = 0; 430 pStr->setSize( 0 );
406 aStreams.append( pStr ); 431 aStreams.append( pStr );
407 432
408 for( int j = 0; j < iPreAllocate; j++ ) 433 for( int j = 0; j < iPreAllocate; j++ )
@@ -438,7 +463,7 @@ int Bu::Myriad::createStreamWithId( int iId, int iPreAllocate )
438 pStr->iId = iId; 463 pStr->iId = iId;
439 //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate=" 464 //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate="
440 // << iPreAllocate << sio.nl; 465 // << iPreAllocate << sio.nl;
441 pStr->iSize = 0; 466 pStr->setSize( 0 );
442 if( aStreams.last()->iId < iId ) 467 if( aStreams.last()->iId < iId )
443 { 468 {
444 aStreams.append( pStr ); 469 aStreams.append( pStr );
@@ -546,7 +571,7 @@ int Bu::Myriad::getStreamSize( int iId )
546 TRACE("mHeader locked."); 571 TRACE("mHeader locked.");
547 572
548 TRACE("mHeader unlocked..."); 573 TRACE("mHeader unlocked...");
549 return findStream( iId )->iSize; 574 return findStream( iId )->getSize();
550} 575}
551 576
552bool Bu::Myriad::hasStream( int iId ) 577bool Bu::Myriad::hasStream( int iId )
@@ -608,7 +633,7 @@ Bu::size Bu::Myriad::getTotalUsedBytes()
608 Bu::size iTotalSize = 0; 633 Bu::size iTotalSize = 0;
609 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 634 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
610 { 635 {
611 iTotalSize += (*i)->iSize; 636 iTotalSize += (*i)->getSize();
612 } 637 }
613 TRACE("mHeader unlocked..."); 638 TRACE("mHeader unlocked...");
614 return iTotalSize; 639 return iTotalSize;
@@ -622,7 +647,7 @@ Bu::size Bu::Myriad::getTotalUnusedBytes()
622 Bu::size iTotalSize = (iBlocks-iUsed)*iBlockSize; 647 Bu::size iTotalSize = (iBlocks-iUsed)*iBlockSize;
623 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 648 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
624 { 649 {
625 iTotalSize += iBlockSize - ((Bu::size)(*i)->iSize%iBlockSize); 650 iTotalSize += iBlockSize - ((Bu::size)(*i)->getSize()%iBlockSize);
626 } 651 }
627 TRACE("mHeader unlocked..."); 652 TRACE("mHeader unlocked...");
628 return iTotalSize; 653 return iTotalSize;
@@ -636,7 +661,7 @@ Bu::size Bu::Myriad::getTotalUnusedBytes( int iFakeBlockSize )
636 Bu::size iTotalSize = (iBlocks-iUsed)*iFakeBlockSize; 661 Bu::size iTotalSize = (iBlocks-iUsed)*iFakeBlockSize;
637 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 662 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
638 { 663 {
639 iTotalSize += iFakeBlockSize - ((*i)->iSize%iFakeBlockSize); 664 iTotalSize += iFakeBlockSize - ((*i)->getSize()%iFakeBlockSize);
640 } 665 }
641 TRACE("mHeader unlocked..."); 666 TRACE("mHeader unlocked...");
642 return iTotalSize; 667 return iTotalSize;
@@ -722,15 +747,15 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize )
722 MutexLocker mLock( mHeader ); 747 MutexLocker mLock( mHeader );
723 TRACE("mHeader locked."); 748 TRACE("mHeader locked.");
724 749
725 if( pStream->iSize == iSize ) 750 if( pStream->getSize() == iSize )
726 { 751 {
727 TRACE("mHeader unlocked..."); 752 TRACE("mHeader unlocked...");
728 return; 753 return;
729 } 754 }
730 else if( pStream->iSize > iSize ) 755 else if( pStream->getSize() > iSize )
731 { 756 {
732 // Shrink 757 // Shrink
733 TRACE(Bu::String("Shrink stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->iSize).arg(iSize).end() ); 758 TRACE(Bu::String("Shrink stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->getSize()).arg(iSize).end() );
734 for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; 759 for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize;
735 iNewSize-iBlockSize > iSize; iNewSize -= iBlockSize ) 760 iNewSize-iBlockSize > iSize; iNewSize -= iBlockSize )
736 { 761 {
@@ -742,13 +767,13 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize )
742// bsBlockUsed.setBit( pStream->aBlocks.last(), false ); 767// bsBlockUsed.setBit( pStream->aBlocks.last(), false );
743 pStream->aBlocks.eraseLast(); 768 pStream->aBlocks.eraseLast();
744 } 769 }
745 pStream->iSize = iSize; 770 pStream->setSize( iSize );
746 bHeaderChanged = true; 771 bHeaderChanged = true;
747 } 772 }
748 else 773 else
749 { 774 {
750 // Grow 775 // Grow
751 TRACE(Bu::String("Grow stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->iSize).arg(iSize).end() ); 776 TRACE(Bu::String("Grow stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->getSize()).arg(iSize).end() );
752 for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; 777 for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize;
753 iNewSize < iSize; iNewSize += iBlockSize ) 778 iNewSize < iSize; iNewSize += iBlockSize )
754 { 779 {
@@ -759,7 +784,7 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize )
759// bHeaderChanged = true; 784// bHeaderChanged = true;
760 iUsed++; 785 iUsed++;
761 } 786 }
762 pStream->iSize = iSize; 787 pStream->setSize( iSize );
763 bHeaderChanged = true; 788 bHeaderChanged = true;
764 } 789 }
765 TRACE("mHeader unlocked..."); 790 TRACE("mHeader unlocked...");
diff --git a/src/stable/myriad.h b/src/stable/myriad.h
index 14467a4..e63c29f 100644
--- a/src/stable/myriad.h
+++ b/src/stable/myriad.h
@@ -184,9 +184,16 @@ namespace Bu
184 class Stream 184 class Stream
185 { 185 {
186 public: 186 public:
187 void setSize( int iNewSize );
188 void growTo( int iNewSize );
189 int getSize() const;
190
187 int iId; 191 int iId;
188 int iSize;
189 BlockArray aBlocks; 192 BlockArray aBlocks;
193
194 private:
195 int iSize;
196 mutable Bu::Mutex mStream;
190 }; 197 };
191 typedef Bu::Array<Stream *> StreamArray; 198 typedef Bu::Array<Stream *> StreamArray;
192 199
diff --git a/src/stable/myriadstream.cpp b/src/stable/myriadstream.cpp
index 50c6924..3c78bb0 100644
--- a/src/stable/myriadstream.cpp
+++ b/src/stable/myriadstream.cpp
@@ -19,7 +19,8 @@ using Bu::Fmt;
19#endif 19#endif
20#include "bu/sio.h" 20#include "bu/sio.h"
21 21
22#define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(rMyriad.sStore.getLocation()).arg(x) 22// #define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(rMyriad.sStore.getLocation()).arg(x)
23#define TRACE( x ) (void)0
23 24
24Bu::MyriadStream::MyriadStream( Bu::Myriad &rMyriad, 25Bu::MyriadStream::MyriadStream( Bu::Myriad &rMyriad,
25 Bu::Myriad::Stream *pStream ) : 26 Bu::Myriad::Stream *pStream ) :
@@ -55,8 +56,8 @@ Bu::size Bu::MyriadStream::read( void *pBuf, Bu::size nBytes )
55 sio << "MyriadStream: read: " << __LINE__ << ": Started, asked to read " << nBytes << "b." 56 sio << "MyriadStream: read: " << __LINE__ << ": Started, asked to read " << nBytes << "b."
56 << sio.nl; 57 << sio.nl;
57#endif 58#endif
58 if( nBytes > (Bu::size)pStream->iSize-iPos ) 59 if( nBytes > (Bu::size)pStream->getSize()-iPos )
59 nBytes = pStream->iSize-iPos; 60 nBytes = pStream->getSize()-iPos;
60 if( nBytes <= 0 ) 61 if( nBytes <= 0 )
61 return 0; 62 return 0;
62 int iLeft = nBytes; 63 int iLeft = nBytes;
@@ -92,7 +93,7 @@ Bu::size Bu::MyriadStream::read( void *pBuf, Bu::size nBytes )
92 rMyriad.iBlockSize - iPos%rMyriad.iBlockSize, 93 rMyriad.iBlockSize - iPos%rMyriad.iBlockSize,
93 iLeft 94 iLeft
94 ), 95 ),
95 pStream->iSize-iPos 96 pStream->getSize()-iPos
96 ); 97 );
97#ifdef MYRIAD_STREAM_DEBUG 98#ifdef MYRIAD_STREAM_DEBUG
98 sio << "MyriadStream: read: " << __LINE__ << ": Copying out bytes: " 99 sio << "MyriadStream: read: " << __LINE__ << ": Copying out bytes: "
@@ -166,14 +167,14 @@ Bu::size Bu::MyriadStream::write( const void *pBuf, Bu::size nBytes )
166 // There are two main writing modes when it comes down to it. 167 // There are two main writing modes when it comes down to it.
167 // Overwrite mode and append mode. Append is what pretty much always 168 // Overwrite mode and append mode. Append is what pretty much always
168 // happens when creating a new stream. 169 // happens when creating a new stream.
169 if( iPos < pStream->iSize ) 170 if( iPos < pStream->getSize() )
170 { 171 {
171 int iAmnt = Bu::buMin( 172 int iAmnt = Bu::buMin(
172 Bu::buMin( 173 Bu::buMin(
173 rMyriad.iBlockSize - iPos%rMyriad.iBlockSize, 174 rMyriad.iBlockSize - iPos%rMyriad.iBlockSize,
174 iLeft 175 iLeft
175 ), 176 ),
176 pStream->iSize-iPos 177 pStream->getSize()-iPos
177 ); 178 );
178#ifdef MYRIAD_STREAM_DEBUG 179#ifdef MYRIAD_STREAM_DEBUG
179 sio << "MyriadStream: write (ovr): " << __LINE__ << ": Copying in bytes: " 180 sio << "MyriadStream: write (ovr): " << __LINE__ << ": Copying in bytes: "
@@ -208,9 +209,9 @@ Bu::size Bu::MyriadStream::write( const void *pBuf, Bu::size nBytes )
208 iAmnt 209 iAmnt
209 ); 210 );
210 iPos += iAmnt; 211 iPos += iAmnt;
211 TRACE(Bu::String("Stream=%1 - pStream->iSize(%2) += iAmnt(%3)").arg(pStream->iId).arg( pStream->iSize ).arg(iAmnt).end()); 212 TRACE(Bu::String("Stream=%1 - pStream->iSize(%2) += iAmnt(%3)").arg(pStream->iId).arg( pStream->getSize() ).arg(iAmnt).end());
212 pStream->iSize += iAmnt; 213 pStream->growTo( pStream->getSize()+iAmnt );
213 TRACE(Bu::String("Stream=%1 - pStream->iSize = %2").arg(pStream->iId).arg( pStream->iSize ).end()); 214 TRACE(Bu::String("Stream=%1 - pStream->iSize = %2").arg(pStream->iId).arg( pStream->getSize() ).end());
214 rMyriad.headerChanged(); 215 rMyriad.headerChanged();
215 pBuf = &((char *)pBuf)[iAmnt]; 216 pBuf = &((char *)pBuf)[iAmnt];
216 iLeft -= iAmnt; 217 iLeft -= iAmnt;
@@ -237,12 +238,12 @@ void Bu::MyriadStream::setPos( Bu::size pos )
237 238
238void Bu::MyriadStream::setPosEnd( Bu::size pos ) 239void Bu::MyriadStream::setPosEnd( Bu::size pos )
239{ 240{
240 iPos = pStream->iSize-pos; 241 iPos = pStream->getSize()-pos;
241} 242}
242 243
243bool Bu::MyriadStream::isEos() 244bool Bu::MyriadStream::isEos()
244{ 245{
245 return iPos >= pStream->iSize; 246 return iPos >= pStream->getSize();
246} 247}
247 248
248bool Bu::MyriadStream::isOpen() 249bool Bu::MyriadStream::isOpen()
@@ -299,7 +300,7 @@ void Bu::MyriadStream::setSize( Bu::size iSize )
299 300
300Bu::size Bu::MyriadStream::getSize() const 301Bu::size Bu::MyriadStream::getSize() const
301{ 302{
302 return pStream->iSize; 303 return pStream->getSize();
303} 304}
304 305
305Bu::size Bu::MyriadStream::getBlockSize() const 306Bu::size Bu::MyriadStream::getBlockSize() const