diff options
author | Mike Buland <eichlan@xagasoft.com> | 2012-04-14 17:38:13 +0000 |
---|---|---|
committer | Mike Buland <eichlan@xagasoft.com> | 2012-04-14 17:38:13 +0000 |
commit | 49d9fa3c2435b8e97ffdc42e40a880a3dad82f8a (patch) | |
tree | 98afeeeccd5105ea812f1b6507cbd6ef1364a91f /src/stable | |
parent | 91f9d6e8b371f339dbcc16541054f9cb371d0ec9 (diff) | |
download | libbu++-49d9fa3c2435b8e97ffdc42e40a880a3dad82f8a.tar.gz libbu++-49d9fa3c2435b8e97ffdc42e40a880a3dad82f8a.tar.bz2 libbu++-49d9fa3c2435b8e97ffdc42e40a880a3dad82f8a.tar.xz libbu++-49d9fa3c2435b8e97ffdc42e40a880a3dad82f8a.zip |
The core of myriad is now thread-safe. It could use a little work on multiple
streams accessing the same blocks at the same time. Right now it's safe, but
a little strange, since both streams work from seperate buffers. Maybe that's
ok, but it still needs some more investigation.
I want to remove the BitString stuff entirely, it turns out it's the slowest
part, which, upon actually looking at the code is completely obvious. This
change shouldn't change the API at all, just make adding blocks to streams much,
much faster.
Diffstat (limited to '')
-rw-r--r-- | src/stable/myriad.cpp | 58 | ||||
-rw-r--r-- | src/stable/myriad.h | 11 |
2 files changed, 68 insertions, 1 deletions
diff --git a/src/stable/myriad.cpp b/src/stable/myriad.cpp index de44930..9030b83 100644 --- a/src/stable/myriad.cpp +++ b/src/stable/myriad.cpp | |||
@@ -8,6 +8,7 @@ | |||
8 | #include "bu/myriad.h" | 8 | #include "bu/myriad.h" |
9 | #include "bu/stream.h" | 9 | #include "bu/stream.h" |
10 | #include "bu/myriadstream.h" | 10 | #include "bu/myriadstream.h" |
11 | #include "bu/mutexlocker.h" | ||
11 | #include <stdio.h> | 12 | #include <stdio.h> |
12 | 13 | ||
13 | #include "bu/sio.h" | 14 | #include "bu/sio.h" |
@@ -50,11 +51,13 @@ Bu::Myriad::Myriad( Bu::Stream &sStore, int iBlockSize, int iPreallocate ) : | |||
50 | 51 | ||
51 | Bu::Myriad::~Myriad() | 52 | Bu::Myriad::~Myriad() |
52 | { | 53 | { |
54 | mActiveBlocks.lock(); | ||
53 | if( !hActiveBlocks.isEmpty() ) | 55 | if( !hActiveBlocks.isEmpty() ) |
54 | { | 56 | { |
55 | sio << "Bu::Myriad::~Myriad(): Error: There are " | 57 | sio << "Bu::Myriad::~Myriad(): Error: There are " |
56 | << hActiveBlocks.getSize() << " unsynced blocks!" << sio.nl; | 58 | << hActiveBlocks.getSize() << " unsynced blocks!" << sio.nl; |
57 | } | 59 | } |
60 | mActiveBlocks.unlock(); | ||
58 | sync(); | 61 | sync(); |
59 | 62 | ||
60 | for( StreamArray::iterator i = aStreams.begin(); i; i++ ) | 63 | for( StreamArray::iterator i = aStreams.begin(); i; i++ ) |
@@ -67,6 +70,7 @@ void Bu::Myriad::sync() | |||
67 | { | 70 | { |
68 | updateHeader(); | 71 | updateHeader(); |
69 | 72 | ||
73 | mActiveBlocks.lock(); | ||
70 | for( BlockHash::iterator i = hActiveBlocks.begin(); i; i++ ) | 74 | for( BlockHash::iterator i = hActiveBlocks.begin(); i; i++ ) |
71 | { | 75 | { |
72 | if( (*i)->bChanged ) | 76 | if( (*i)->bChanged ) |
@@ -74,10 +78,12 @@ void Bu::Myriad::sync() | |||
74 | syncBlock( *i ); | 78 | syncBlock( *i ); |
75 | } | 79 | } |
76 | } | 80 | } |
81 | mActiveBlocks.unlock(); | ||
77 | } | 82 | } |
78 | 83 | ||
79 | void Bu::Myriad::initialize() | 84 | void Bu::Myriad::initialize() |
80 | { | 85 | { |
86 | MutexLocker mLock( mHeader ); | ||
81 | sStore.setPosEnd( 0 ); | 87 | sStore.setPosEnd( 0 ); |
82 | int iSize = sStore.tell(); | 88 | int iSize = sStore.tell(); |
83 | sStore.setPos( 0 ); | 89 | sStore.setPos( 0 ); |
@@ -173,6 +179,8 @@ void Bu::Myriad::initialize() | |||
173 | 179 | ||
174 | void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) | 180 | void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) |
175 | { | 181 | { |
182 | MutexLocker mLock( mHeader ); | ||
183 | |||
176 | for( StreamArray::iterator i = aStreams.begin(); i; i++ ) | 184 | for( StreamArray::iterator i = aStreams.begin(); i; i++ ) |
177 | { | 185 | { |
178 | delete *i; | 186 | delete *i; |
@@ -262,6 +270,8 @@ void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) | |||
262 | 270 | ||
263 | void Bu::Myriad::updateHeader() | 271 | void Bu::Myriad::updateHeader() |
264 | { | 272 | { |
273 | MutexLocker mLock( mHeader ); | ||
274 | |||
265 | if( bHeaderChanged == false ) | 275 | if( bHeaderChanged == false ) |
266 | return; | 276 | return; |
267 | if( !sStore.canWrite() ) | 277 | if( !sStore.canWrite() ) |
@@ -337,6 +347,8 @@ void Bu::Myriad::updateHeader() | |||
337 | 347 | ||
338 | int Bu::Myriad::createStream( int iPreAllocate ) | 348 | int Bu::Myriad::createStream( int iPreAllocate ) |
339 | { | 349 | { |
350 | MutexLocker mLock( mHeader ); | ||
351 | |||
340 | Stream *pStr = new Stream(); | 352 | Stream *pStr = new Stream(); |
341 | pStr->iId = aStreams.last()->iId+1; | 353 | pStr->iId = aStreams.last()->iId+1; |
342 | //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate=" | 354 | //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate=" |
@@ -360,6 +372,8 @@ int Bu::Myriad::createStream( int iPreAllocate ) | |||
360 | 372 | ||
361 | int Bu::Myriad::createStreamWithId( int iId, int iPreAllocate ) | 373 | int Bu::Myriad::createStreamWithId( int iId, int iPreAllocate ) |
362 | { | 374 | { |
375 | MutexLocker mLock( mHeader ); | ||
376 | |||
363 | try | 377 | try |
364 | { | 378 | { |
365 | findStream( iId ); | 379 | findStream( iId ); |
@@ -432,6 +446,8 @@ int Bu::Myriad::findEmptyBlock() | |||
432 | 446 | ||
433 | void Bu::Myriad::deleteStream( int iId ) | 447 | void Bu::Myriad::deleteStream( int iId ) |
434 | { | 448 | { |
449 | MutexLocker mLock( mHeader ); | ||
450 | |||
435 | if( iId < 0 ) | 451 | if( iId < 0 ) |
436 | throw MyriadException( MyriadException::invalidStreamId, | 452 | throw MyriadException( MyriadException::invalidStreamId, |
437 | "Invalid stream id."); | 453 | "Invalid stream id."); |
@@ -458,6 +474,8 @@ void Bu::Myriad::deleteStream( int iId ) | |||
458 | 474 | ||
459 | Bu::Array<int> Bu::Myriad::getStreamIds() | 475 | Bu::Array<int> Bu::Myriad::getStreamIds() |
460 | { | 476 | { |
477 | MutexLocker mLock( mHeader ); | ||
478 | |||
461 | Bu::Array<int> aRet( aStreams.getSize() ); | 479 | Bu::Array<int> aRet( aStreams.getSize() ); |
462 | for( StreamArray::iterator i = aStreams.begin(); i; i++ ) | 480 | for( StreamArray::iterator i = aStreams.begin(); i; i++ ) |
463 | { | 481 | { |
@@ -469,11 +487,15 @@ Bu::Array<int> Bu::Myriad::getStreamIds() | |||
469 | 487 | ||
470 | int Bu::Myriad::getStreamSize( int iId ) | 488 | int Bu::Myriad::getStreamSize( int iId ) |
471 | { | 489 | { |
490 | MutexLocker mLock( mHeader ); | ||
491 | |||
472 | return findStream( iId )->iSize; | 492 | return findStream( iId )->iSize; |
473 | } | 493 | } |
474 | 494 | ||
475 | bool Bu::Myriad::hasStream( int iId ) | 495 | bool Bu::Myriad::hasStream( int iId ) |
476 | { | 496 | { |
497 | MutexLocker mLock( mHeader ); | ||
498 | |||
477 | try | 499 | try |
478 | { | 500 | { |
479 | findStream( iId ); | 501 | findStream( iId ); |
@@ -486,12 +508,16 @@ bool Bu::Myriad::hasStream( int iId ) | |||
486 | 508 | ||
487 | Bu::MyriadStream Bu::Myriad::openStream( int iId ) | 509 | Bu::MyriadStream Bu::Myriad::openStream( int iId ) |
488 | { | 510 | { |
511 | MutexLocker mLock( mHeader ); | ||
512 | |||
489 | //sio << "Myriad: Request to open stream: " << iId << sio.nl; | 513 | //sio << "Myriad: Request to open stream: " << iId << sio.nl; |
490 | return MyriadStream( *this, findStream( iId ) ); | 514 | return MyriadStream( *this, findStream( iId ) ); |
491 | } | 515 | } |
492 | 516 | ||
493 | int Bu::Myriad::getNumStreams() | 517 | int Bu::Myriad::getNumStreams() |
494 | { | 518 | { |
519 | MutexLocker mLock( mHeader ); | ||
520 | |||
495 | return aStreams.getSize(); | 521 | return aStreams.getSize(); |
496 | } | 522 | } |
497 | 523 | ||
@@ -512,6 +538,8 @@ int Bu::Myriad::getNumUsedBlocks() | |||
512 | 538 | ||
513 | int Bu::Myriad::getTotalUsedBytes() | 539 | int Bu::Myriad::getTotalUsedBytes() |
514 | { | 540 | { |
541 | MutexLocker mLock( mHeader ); | ||
542 | |||
515 | int iTotalSize = 0; | 543 | int iTotalSize = 0; |
516 | for( StreamArray::iterator i = aStreams.begin(); i; i++ ) | 544 | for( StreamArray::iterator i = aStreams.begin(); i; i++ ) |
517 | { | 545 | { |
@@ -522,6 +550,8 @@ int Bu::Myriad::getTotalUsedBytes() | |||
522 | 550 | ||
523 | int Bu::Myriad::getTotalUnusedBytes() | 551 | int Bu::Myriad::getTotalUnusedBytes() |
524 | { | 552 | { |
553 | MutexLocker mLock( mHeader ); | ||
554 | |||
525 | int iTotalSize = (iBlocks-iUsed)*iBlockSize; | 555 | int iTotalSize = (iBlocks-iUsed)*iBlockSize; |
526 | for( StreamArray::iterator i = aStreams.begin(); i; i++ ) | 556 | for( StreamArray::iterator i = aStreams.begin(); i; i++ ) |
527 | { | 557 | { |
@@ -532,6 +562,8 @@ int Bu::Myriad::getTotalUnusedBytes() | |||
532 | 562 | ||
533 | int Bu::Myriad::getTotalUnusedBytes( int iFakeBlockSize ) | 563 | int Bu::Myriad::getTotalUnusedBytes( int iFakeBlockSize ) |
534 | { | 564 | { |
565 | MutexLocker mLock( mHeader ); | ||
566 | |||
535 | int iTotalSize = (iBlocks-iUsed)*iFakeBlockSize; | 567 | int iTotalSize = (iBlocks-iUsed)*iFakeBlockSize; |
536 | for( StreamArray::iterator i = aStreams.begin(); i; i++ ) | 568 | for( StreamArray::iterator i = aStreams.begin(); i; i++ ) |
537 | { | 569 | { |
@@ -565,7 +597,9 @@ Bu::Myriad::Block *Bu::Myriad::getBlock( int iBlock ) | |||
565 | pBlock->bChanged = false; | 597 | pBlock->bChanged = false; |
566 | pBlock->iBlockIndex = iBlock; | 598 | pBlock->iBlockIndex = iBlock; |
567 | 599 | ||
600 | mActiveBlocks.lock(); | ||
568 | hActiveBlocks.insert( iBlock, pBlock ); | 601 | hActiveBlocks.insert( iBlock, pBlock ); |
602 | mActiveBlocks.unlock(); | ||
569 | 603 | ||
570 | return pBlock; | 604 | return pBlock; |
571 | } | 605 | } |
@@ -576,7 +610,10 @@ void Bu::Myriad::releaseBlock( Bu::Myriad::Block *pBlock ) | |||
576 | return; | 610 | return; |
577 | // sio << "Myriad: Releasing block " << pBlock->iBlockIndex << sio.nl; | 611 | // sio << "Myriad: Releasing block " << pBlock->iBlockIndex << sio.nl; |
578 | syncBlock( pBlock ); | 612 | syncBlock( pBlock ); |
613 | mActiveBlocks.lock(); | ||
579 | hActiveBlocks.erase( pBlock->iBlockIndex ); | 614 | hActiveBlocks.erase( pBlock->iBlockIndex ); |
615 | mActiveBlocks.unlock(); | ||
616 | |||
580 | delete[] pBlock->pData; | 617 | delete[] pBlock->pData; |
581 | delete pBlock; | 618 | delete pBlock; |
582 | } | 619 | } |
@@ -594,6 +631,8 @@ void Bu::Myriad::syncBlock( Block *pBlock ) | |||
594 | 631 | ||
595 | int Bu::Myriad::streamAddBlock( Stream *pStream ) | 632 | int Bu::Myriad::streamAddBlock( Stream *pStream ) |
596 | { | 633 | { |
634 | MutexLocker mLock( mHeader ); | ||
635 | |||
597 | int iBlock = findEmptyBlock(); | 636 | int iBlock = findEmptyBlock(); |
598 | pStream->aBlocks.append( iBlock ); | 637 | pStream->aBlocks.append( iBlock ); |
599 | bsBlockUsed.setBit( iBlock ); | 638 | bsBlockUsed.setBit( iBlock ); |
@@ -604,6 +643,8 @@ int Bu::Myriad::streamAddBlock( Stream *pStream ) | |||
604 | 643 | ||
605 | void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) | 644 | void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) |
606 | { | 645 | { |
646 | MutexLocker mLock( mHeader ); | ||
647 | |||
607 | if( pStream->iSize == iSize ) | 648 | if( pStream->iSize == iSize ) |
608 | { | 649 | { |
609 | return; | 650 | return; |
@@ -616,6 +657,8 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) | |||
616 | { | 657 | { |
617 | if( bsBlockUsed.getBit( pStream->aBlocks.last() ) ) | 658 | if( bsBlockUsed.getBit( pStream->aBlocks.last() ) ) |
618 | iUsed--; | 659 | iUsed--; |
660 | else | ||
661 | sio << "Unused block used in stream? " << pStream->aBlocks.last() << sio.nl; | ||
619 | bsBlockUsed.setBit( pStream->aBlocks.last(), false ); | 662 | bsBlockUsed.setBit( pStream->aBlocks.last(), false ); |
620 | pStream->aBlocks.eraseLast(); | 663 | pStream->aBlocks.eraseLast(); |
621 | } | 664 | } |
@@ -628,7 +671,12 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) | |||
628 | for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; | 671 | for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; |
629 | iNewSize < iSize; iNewSize += iBlockSize ) | 672 | iNewSize < iSize; iNewSize += iBlockSize ) |
630 | { | 673 | { |
631 | streamAddBlock( pStream ); | 674 | //streamAddBlock( pStream ); |
675 | int iBlock = findEmptyBlock(); | ||
676 | pStream->aBlocks.append( iBlock ); | ||
677 | bsBlockUsed.setBit( iBlock ); | ||
678 | bHeaderChanged = true; | ||
679 | iUsed++; | ||
632 | } | 680 | } |
633 | pStream->iSize = iSize; | 681 | pStream->iSize = iSize; |
634 | bHeaderChanged = true; | 682 | bHeaderChanged = true; |
@@ -642,12 +690,20 @@ void Bu::Myriad::headerChanged() | |||
642 | 690 | ||
643 | bool Bu::Myriad::isMyriad( Bu::Stream &sStore ) | 691 | bool Bu::Myriad::isMyriad( Bu::Stream &sStore ) |
644 | { | 692 | { |
693 | uint8_t uTmp; | ||
694 | |||
695 | return isMyriad( sStore, uTmp ); | ||
696 | } | ||
697 | |||
698 | bool Bu::Myriad::isMyriad( Bu::Stream &sStore, uint8_t &uTmp ) | ||
699 | { | ||
645 | sStore.setPos( 0 ); | 700 | sStore.setPos( 0 ); |
646 | 701 | ||
647 | unsigned char buf[4]; | 702 | unsigned char buf[4]; |
648 | if( sStore.read( buf, 4 ) < 4 ) | 703 | if( sStore.read( buf, 4 ) < 4 ) |
649 | throw MyriadException( MyriadException::emptyStream, | 704 | throw MyriadException( MyriadException::emptyStream, |
650 | "Input stream appears to be empty."); | 705 | "Input stream appears to be empty."); |
706 | sStore.read( &uTmp, 1 ); | ||
651 | sStore.setPos( 0 ); | 707 | sStore.setPos( 0 ); |
652 | if( memcmp( buf, Myriad_MAGIC_CODE, 4 ) ) | 708 | if( memcmp( buf, Myriad_MAGIC_CODE, 4 ) ) |
653 | { | 709 | { |
diff --git a/src/stable/myriad.h b/src/stable/myriad.h index 3382ab5..14e8c73 100644 --- a/src/stable/myriad.h +++ b/src/stable/myriad.h | |||
@@ -13,6 +13,7 @@ | |||
13 | #include "bu/exceptionbase.h" | 13 | #include "bu/exceptionbase.h" |
14 | #include "bu/array.h" | 14 | #include "bu/array.h" |
15 | #include "bu/hash.h" | 15 | #include "bu/hash.h" |
16 | #include "bu/mutex.h" | ||
16 | 17 | ||
17 | namespace Bu | 18 | namespace Bu |
18 | { | 19 | { |
@@ -153,6 +154,13 @@ namespace Bu | |||
153 | * depending on weather or not it's a Myriad stream. This will throw | 154 | * depending on weather or not it's a Myriad stream. This will throw |
154 | * an exception if the stream is empty, or is not random access. | 155 | * an exception if the stream is empty, or is not random access. |
155 | */ | 156 | */ |
157 | static bool isMyriad( Bu::Stream &sStore, uint8_t &uVer ); | ||
158 | |||
159 | /** | ||
160 | * Read the first few bytes from the given stream and return true/false | ||
161 | * depending on weather or not it's a Myriad stream. This will throw | ||
162 | * an exception if the stream is empty, or is not random access. | ||
163 | */ | ||
156 | static bool isMyriad( Bu::Stream &sStore ); | 164 | static bool isMyriad( Bu::Stream &sStore ); |
157 | 165 | ||
158 | const Bu::BitString &getBlocksUsed() const; | 166 | const Bu::BitString &getBlocksUsed() const; |
@@ -216,6 +224,9 @@ namespace Bu | |||
216 | typedef Bu::Hash<int, Block *> BlockHash; | 224 | typedef Bu::Hash<int, Block *> BlockHash; |
217 | BlockHash hActiveBlocks; | 225 | BlockHash hActiveBlocks; |
218 | bool bHeaderChanged; | 226 | bool bHeaderChanged; |
227 | |||
228 | Bu::Mutex mHeader; | ||
229 | Bu::Mutex mActiveBlocks; | ||
219 | }; | 230 | }; |
220 | }; | 231 | }; |
221 | 232 | ||