aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Buland <eichlan@xagasoft.com>2012-04-14 17:38:13 +0000
committerMike Buland <eichlan@xagasoft.com>2012-04-14 17:38:13 +0000
commit49d9fa3c2435b8e97ffdc42e40a880a3dad82f8a (patch)
tree98afeeeccd5105ea812f1b6507cbd6ef1364a91f
parent91f9d6e8b371f339dbcc16541054f9cb371d0ec9 (diff)
downloadlibbu++-49d9fa3c2435b8e97ffdc42e40a880a3dad82f8a.tar.gz
libbu++-49d9fa3c2435b8e97ffdc42e40a880a3dad82f8a.tar.bz2
libbu++-49d9fa3c2435b8e97ffdc42e40a880a3dad82f8a.tar.xz
libbu++-49d9fa3c2435b8e97ffdc42e40a880a3dad82f8a.zip
The core of myriad is now thread-safe. It could use a little work on multiple
streams accessing the same blocks at the same time. Right now it's safe, but a little strange, since both streams work from seperate buffers. Maybe that's ok, but it still needs some more investigation. I want to remove the BitString stuff entirely, it turns out it's the slowest part, which, upon actually looking at the code is completely obvious. This change shouldn't change the API at all, just make adding blocks to streams much, much faster.
Diffstat (limited to '')
-rw-r--r--default.bld7
-rw-r--r--src/extra/myriadfs.cpp1
-rw-r--r--src/stable/myriad.cpp58
-rw-r--r--src/stable/myriad.h11
4 files changed, 76 insertions, 1 deletions
diff --git a/default.bld b/default.bld
index c6721b2..c7a597e 100644
--- a/default.bld
+++ b/default.bld
@@ -14,6 +14,8 @@
14include "gensigs.bld"; 14include "gensigs.bld";
15 15
16CXXFLAGS += "-ggdb -W -Wall -I."; 16CXXFLAGS += "-ggdb -W -Wall -I.";
17// CXXFLAGS += "-pg";
18// LDFLAGS += "-pg";
17 19
18action "default" 20action "default"
19{ 21{
@@ -149,6 +151,11 @@ target "viewcsv"
149 LDFLAGS += "-lncurses"; 151 LDFLAGS += "-lncurses";
150} 152}
151 153
154target ["myriad", "myriadfs", "tests/myriad", "tests/myriadfs", "unit/myriad"]
155{
156 LDFLAGS += "-lpthread";
157}
158
152target files("src/extra/*.cpp").replace("src/extra/","").replace(".cpp","") 159target files("src/extra/*.cpp").replace("src/extra/","").replace(".cpp","")
153{ 160{
154 input "src/extra/${OUTPUT}.cpp"; 161 input "src/extra/${OUTPUT}.cpp";
diff --git a/src/extra/myriadfs.cpp b/src/extra/myriadfs.cpp
index 3e4599b..8e48ec0 100644
--- a/src/extra/myriadfs.cpp
+++ b/src/extra/myriadfs.cpp
@@ -5,6 +5,7 @@
5 * terms of the license contained in the file LICENSE. 5 * terms of the license contained in the file LICENSE.
6 */ 6 */
7 7
8#define _FILE_OFFSET_BITS 64
8#define FUSE_USE_VERSION 26 9#define FUSE_USE_VERSION 26
9 10
10#include <fuse.h> 11#include <fuse.h>
diff --git a/src/stable/myriad.cpp b/src/stable/myriad.cpp
index de44930..9030b83 100644
--- a/src/stable/myriad.cpp
+++ b/src/stable/myriad.cpp
@@ -8,6 +8,7 @@
8#include "bu/myriad.h" 8#include "bu/myriad.h"
9#include "bu/stream.h" 9#include "bu/stream.h"
10#include "bu/myriadstream.h" 10#include "bu/myriadstream.h"
11#include "bu/mutexlocker.h"
11#include <stdio.h> 12#include <stdio.h>
12 13
13#include "bu/sio.h" 14#include "bu/sio.h"
@@ -50,11 +51,13 @@ Bu::Myriad::Myriad( Bu::Stream &sStore, int iBlockSize, int iPreallocate ) :
50 51
51Bu::Myriad::~Myriad() 52Bu::Myriad::~Myriad()
52{ 53{
54 mActiveBlocks.lock();
53 if( !hActiveBlocks.isEmpty() ) 55 if( !hActiveBlocks.isEmpty() )
54 { 56 {
55 sio << "Bu::Myriad::~Myriad(): Error: There are " 57 sio << "Bu::Myriad::~Myriad(): Error: There are "
56 << hActiveBlocks.getSize() << " unsynced blocks!" << sio.nl; 58 << hActiveBlocks.getSize() << " unsynced blocks!" << sio.nl;
57 } 59 }
60 mActiveBlocks.unlock();
58 sync(); 61 sync();
59 62
60 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 63 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
@@ -67,6 +70,7 @@ void Bu::Myriad::sync()
67{ 70{
68 updateHeader(); 71 updateHeader();
69 72
73 mActiveBlocks.lock();
70 for( BlockHash::iterator i = hActiveBlocks.begin(); i; i++ ) 74 for( BlockHash::iterator i = hActiveBlocks.begin(); i; i++ )
71 { 75 {
72 if( (*i)->bChanged ) 76 if( (*i)->bChanged )
@@ -74,10 +78,12 @@ void Bu::Myriad::sync()
74 syncBlock( *i ); 78 syncBlock( *i );
75 } 79 }
76 } 80 }
81 mActiveBlocks.unlock();
77} 82}
78 83
79void Bu::Myriad::initialize() 84void Bu::Myriad::initialize()
80{ 85{
86 MutexLocker mLock( mHeader );
81 sStore.setPosEnd( 0 ); 87 sStore.setPosEnd( 0 );
82 int iSize = sStore.tell(); 88 int iSize = sStore.tell();
83 sStore.setPos( 0 ); 89 sStore.setPos( 0 );
@@ -173,6 +179,8 @@ void Bu::Myriad::initialize()
173 179
174void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) 180void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate )
175{ 181{
182 MutexLocker mLock( mHeader );
183
176 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 184 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
177 { 185 {
178 delete *i; 186 delete *i;
@@ -262,6 +270,8 @@ void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate )
262 270
263void Bu::Myriad::updateHeader() 271void Bu::Myriad::updateHeader()
264{ 272{
273 MutexLocker mLock( mHeader );
274
265 if( bHeaderChanged == false ) 275 if( bHeaderChanged == false )
266 return; 276 return;
267 if( !sStore.canWrite() ) 277 if( !sStore.canWrite() )
@@ -337,6 +347,8 @@ void Bu::Myriad::updateHeader()
337 347
338int Bu::Myriad::createStream( int iPreAllocate ) 348int Bu::Myriad::createStream( int iPreAllocate )
339{ 349{
350 MutexLocker mLock( mHeader );
351
340 Stream *pStr = new Stream(); 352 Stream *pStr = new Stream();
341 pStr->iId = aStreams.last()->iId+1; 353 pStr->iId = aStreams.last()->iId+1;
342 //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate=" 354 //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate="
@@ -360,6 +372,8 @@ int Bu::Myriad::createStream( int iPreAllocate )
360 372
361int Bu::Myriad::createStreamWithId( int iId, int iPreAllocate ) 373int Bu::Myriad::createStreamWithId( int iId, int iPreAllocate )
362{ 374{
375 MutexLocker mLock( mHeader );
376
363 try 377 try
364 { 378 {
365 findStream( iId ); 379 findStream( iId );
@@ -432,6 +446,8 @@ int Bu::Myriad::findEmptyBlock()
432 446
433void Bu::Myriad::deleteStream( int iId ) 447void Bu::Myriad::deleteStream( int iId )
434{ 448{
449 MutexLocker mLock( mHeader );
450
435 if( iId < 0 ) 451 if( iId < 0 )
436 throw MyriadException( MyriadException::invalidStreamId, 452 throw MyriadException( MyriadException::invalidStreamId,
437 "Invalid stream id."); 453 "Invalid stream id.");
@@ -458,6 +474,8 @@ void Bu::Myriad::deleteStream( int iId )
458 474
459Bu::Array<int> Bu::Myriad::getStreamIds() 475Bu::Array<int> Bu::Myriad::getStreamIds()
460{ 476{
477 MutexLocker mLock( mHeader );
478
461 Bu::Array<int> aRet( aStreams.getSize() ); 479 Bu::Array<int> aRet( aStreams.getSize() );
462 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 480 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
463 { 481 {
@@ -469,11 +487,15 @@ Bu::Array<int> Bu::Myriad::getStreamIds()
469 487
470int Bu::Myriad::getStreamSize( int iId ) 488int Bu::Myriad::getStreamSize( int iId )
471{ 489{
490 MutexLocker mLock( mHeader );
491
472 return findStream( iId )->iSize; 492 return findStream( iId )->iSize;
473} 493}
474 494
475bool Bu::Myriad::hasStream( int iId ) 495bool Bu::Myriad::hasStream( int iId )
476{ 496{
497 MutexLocker mLock( mHeader );
498
477 try 499 try
478 { 500 {
479 findStream( iId ); 501 findStream( iId );
@@ -486,12 +508,16 @@ bool Bu::Myriad::hasStream( int iId )
486 508
487Bu::MyriadStream Bu::Myriad::openStream( int iId ) 509Bu::MyriadStream Bu::Myriad::openStream( int iId )
488{ 510{
511 MutexLocker mLock( mHeader );
512
489 //sio << "Myriad: Request to open stream: " << iId << sio.nl; 513 //sio << "Myriad: Request to open stream: " << iId << sio.nl;
490 return MyriadStream( *this, findStream( iId ) ); 514 return MyriadStream( *this, findStream( iId ) );
491} 515}
492 516
493int Bu::Myriad::getNumStreams() 517int Bu::Myriad::getNumStreams()
494{ 518{
519 MutexLocker mLock( mHeader );
520
495 return aStreams.getSize(); 521 return aStreams.getSize();
496} 522}
497 523
@@ -512,6 +538,8 @@ int Bu::Myriad::getNumUsedBlocks()
512 538
513int Bu::Myriad::getTotalUsedBytes() 539int Bu::Myriad::getTotalUsedBytes()
514{ 540{
541 MutexLocker mLock( mHeader );
542
515 int iTotalSize = 0; 543 int iTotalSize = 0;
516 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 544 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
517 { 545 {
@@ -522,6 +550,8 @@ int Bu::Myriad::getTotalUsedBytes()
522 550
523int Bu::Myriad::getTotalUnusedBytes() 551int Bu::Myriad::getTotalUnusedBytes()
524{ 552{
553 MutexLocker mLock( mHeader );
554
525 int iTotalSize = (iBlocks-iUsed)*iBlockSize; 555 int iTotalSize = (iBlocks-iUsed)*iBlockSize;
526 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 556 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
527 { 557 {
@@ -532,6 +562,8 @@ int Bu::Myriad::getTotalUnusedBytes()
532 562
533int Bu::Myriad::getTotalUnusedBytes( int iFakeBlockSize ) 563int Bu::Myriad::getTotalUnusedBytes( int iFakeBlockSize )
534{ 564{
565 MutexLocker mLock( mHeader );
566
535 int iTotalSize = (iBlocks-iUsed)*iFakeBlockSize; 567 int iTotalSize = (iBlocks-iUsed)*iFakeBlockSize;
536 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 568 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
537 { 569 {
@@ -565,7 +597,9 @@ Bu::Myriad::Block *Bu::Myriad::getBlock( int iBlock )
565 pBlock->bChanged = false; 597 pBlock->bChanged = false;
566 pBlock->iBlockIndex = iBlock; 598 pBlock->iBlockIndex = iBlock;
567 599
600 mActiveBlocks.lock();
568 hActiveBlocks.insert( iBlock, pBlock ); 601 hActiveBlocks.insert( iBlock, pBlock );
602 mActiveBlocks.unlock();
569 603
570 return pBlock; 604 return pBlock;
571} 605}
@@ -576,7 +610,10 @@ void Bu::Myriad::releaseBlock( Bu::Myriad::Block *pBlock )
576 return; 610 return;
577// sio << "Myriad: Releasing block " << pBlock->iBlockIndex << sio.nl; 611// sio << "Myriad: Releasing block " << pBlock->iBlockIndex << sio.nl;
578 syncBlock( pBlock ); 612 syncBlock( pBlock );
613 mActiveBlocks.lock();
579 hActiveBlocks.erase( pBlock->iBlockIndex ); 614 hActiveBlocks.erase( pBlock->iBlockIndex );
615 mActiveBlocks.unlock();
616
580 delete[] pBlock->pData; 617 delete[] pBlock->pData;
581 delete pBlock; 618 delete pBlock;
582} 619}
@@ -594,6 +631,8 @@ void Bu::Myriad::syncBlock( Block *pBlock )
594 631
595int Bu::Myriad::streamAddBlock( Stream *pStream ) 632int Bu::Myriad::streamAddBlock( Stream *pStream )
596{ 633{
634 MutexLocker mLock( mHeader );
635
597 int iBlock = findEmptyBlock(); 636 int iBlock = findEmptyBlock();
598 pStream->aBlocks.append( iBlock ); 637 pStream->aBlocks.append( iBlock );
599 bsBlockUsed.setBit( iBlock ); 638 bsBlockUsed.setBit( iBlock );
@@ -604,6 +643,8 @@ int Bu::Myriad::streamAddBlock( Stream *pStream )
604 643
605void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) 644void Bu::Myriad::setStreamSize( Stream *pStream, long iSize )
606{ 645{
646 MutexLocker mLock( mHeader );
647
607 if( pStream->iSize == iSize ) 648 if( pStream->iSize == iSize )
608 { 649 {
609 return; 650 return;
@@ -616,6 +657,8 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize )
616 { 657 {
617 if( bsBlockUsed.getBit( pStream->aBlocks.last() ) ) 658 if( bsBlockUsed.getBit( pStream->aBlocks.last() ) )
618 iUsed--; 659 iUsed--;
660 else
661 sio << "Unused block used in stream? " << pStream->aBlocks.last() << sio.nl;
619 bsBlockUsed.setBit( pStream->aBlocks.last(), false ); 662 bsBlockUsed.setBit( pStream->aBlocks.last(), false );
620 pStream->aBlocks.eraseLast(); 663 pStream->aBlocks.eraseLast();
621 } 664 }
@@ -628,7 +671,12 @@ void Bu::Myriad::setStreamSize( Stream *pStream, long iSize )
628 for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; 671 for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize;
629 iNewSize < iSize; iNewSize += iBlockSize ) 672 iNewSize < iSize; iNewSize += iBlockSize )
630 { 673 {
631 streamAddBlock( pStream ); 674 //streamAddBlock( pStream );
675 int iBlock = findEmptyBlock();
676 pStream->aBlocks.append( iBlock );
677 bsBlockUsed.setBit( iBlock );
678 bHeaderChanged = true;
679 iUsed++;
632 } 680 }
633 pStream->iSize = iSize; 681 pStream->iSize = iSize;
634 bHeaderChanged = true; 682 bHeaderChanged = true;
@@ -642,12 +690,20 @@ void Bu::Myriad::headerChanged()
642 690
643bool Bu::Myriad::isMyriad( Bu::Stream &sStore ) 691bool Bu::Myriad::isMyriad( Bu::Stream &sStore )
644{ 692{
693 uint8_t uTmp;
694
695 return isMyriad( sStore, uTmp );
696}
697
698bool Bu::Myriad::isMyriad( Bu::Stream &sStore, uint8_t &uTmp )
699{
645 sStore.setPos( 0 ); 700 sStore.setPos( 0 );
646 701
647 unsigned char buf[4]; 702 unsigned char buf[4];
648 if( sStore.read( buf, 4 ) < 4 ) 703 if( sStore.read( buf, 4 ) < 4 )
649 throw MyriadException( MyriadException::emptyStream, 704 throw MyriadException( MyriadException::emptyStream,
650 "Input stream appears to be empty."); 705 "Input stream appears to be empty.");
706 sStore.read( &uTmp, 1 );
651 sStore.setPos( 0 ); 707 sStore.setPos( 0 );
652 if( memcmp( buf, Myriad_MAGIC_CODE, 4 ) ) 708 if( memcmp( buf, Myriad_MAGIC_CODE, 4 ) )
653 { 709 {
diff --git a/src/stable/myriad.h b/src/stable/myriad.h
index 3382ab5..14e8c73 100644
--- a/src/stable/myriad.h
+++ b/src/stable/myriad.h
@@ -13,6 +13,7 @@
13#include "bu/exceptionbase.h" 13#include "bu/exceptionbase.h"
14#include "bu/array.h" 14#include "bu/array.h"
15#include "bu/hash.h" 15#include "bu/hash.h"
16#include "bu/mutex.h"
16 17
17namespace Bu 18namespace Bu
18{ 19{
@@ -153,6 +154,13 @@ namespace Bu
153 * depending on weather or not it's a Myriad stream. This will throw 154 * depending on weather or not it's a Myriad stream. This will throw
154 * an exception if the stream is empty, or is not random access. 155 * an exception if the stream is empty, or is not random access.
155 */ 156 */
157 static bool isMyriad( Bu::Stream &sStore, uint8_t &uVer );
158
159 /**
160 * Read the first few bytes from the given stream and return true/false
161 * depending on weather or not it's a Myriad stream. This will throw
162 * an exception if the stream is empty, or is not random access.
163 */
156 static bool isMyriad( Bu::Stream &sStore ); 164 static bool isMyriad( Bu::Stream &sStore );
157 165
158 const Bu::BitString &getBlocksUsed() const; 166 const Bu::BitString &getBlocksUsed() const;
@@ -216,6 +224,9 @@ namespace Bu
216 typedef Bu::Hash<int, Block *> BlockHash; 224 typedef Bu::Hash<int, Block *> BlockHash;
217 BlockHash hActiveBlocks; 225 BlockHash hActiveBlocks;
218 bool bHeaderChanged; 226 bool bHeaderChanged;
227
228 Bu::Mutex mHeader;
229 Bu::Mutex mActiveBlocks;
219 }; 230 };
220}; 231};
221 232