From 364ded646ad4de2065cdf0a2f4282c7eaf7148e6 Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Wed, 18 Jul 2012 06:45:17 +0000 Subject: Bu::SynchroQueue had a bug, it seems to be all fixed now. --- default.bld | 2 +- src/stable/synchroqueue.h | 44 ++++++++------- src/tests/synchroqueue.cpp | 131 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 153 insertions(+), 24 deletions(-) create mode 100644 src/tests/synchroqueue.cpp diff --git a/default.bld b/default.bld index c1b93af..d46662c 100644 --- a/default.bld +++ b/default.bld @@ -151,7 +151,7 @@ target "viewcsv" LDFLAGS += "-lncurses"; } -target ["myriad", "myriadfs", "tests/myriad", "tests/myriadfs", "unit/myriad", "tests/bigmyriad"] +target ["myriad", "myriadfs", "tests/myriad", "tests/myriadfs", "unit/myriad", "tests/bigmyriad", "tests/synchroqueue"] { LDFLAGS += "-lpthread"; } diff --git a/src/stable/synchroqueue.h b/src/stable/synchroqueue.h index a0335f6..7ba299f 100644 --- a/src/stable/synchroqueue.h +++ b/src/stable/synchroqueue.h @@ -56,6 +56,7 @@ namespace Bu */ ~SynchroQueue() { + cBlock.lock(); Item *pCur = pStart; while( pCur ) { @@ -63,6 +64,7 @@ namespace Bu delete pCur; pCur = pTmp; } + cBlock.unlock(); } /** @@ -74,7 +76,7 @@ namespace Bu */ void enqueue( T pData ) { - mOperate.lock(); + cBlock.lock(); if( pStart == NULL ) { @@ -93,8 +95,8 @@ namespace Bu } cBlock.signal(); - - mOperate.unlock(); + + cBlock.unlock(); } /** @@ -117,17 +119,20 @@ namespace Bu */ T dequeue( bool bBlock=false ) { - mOperate.lock(); + cBlock.lock(); if( pStart == NULL ) { - mOperate.unlock(); - if( bBlock ) { - cBlock.lock(); - - while( pStart == NULL ) + for(;;) + { + if( pStart != NULL ) + { + cBlock.unlock(); + break; + } cBlock.wait(); + } T tmp = dequeue( false ); @@ -146,7 +151,7 @@ namespace Bu delete pDel; nSize--; - mOperate.unlock(); + cBlock.unlock(); return pTmp; } } @@ -164,13 +169,9 @@ namespace Bu */ T dequeue( int nSec, int nUSec ) { - mOperate.lock(); + cBlock.lock(); if( pStart == NULL ) { - mOperate.unlock(); - - cBlock.lock(); - cBlock.wait( nSec, nUSec ); if( pStart == NULL ) @@ -179,13 +180,11 @@ namespace Bu return NULL; } - mOperate.lock(); T pTmp = pStart->pData; Item *pDel = pStart; pStart = pStart->pNext; delete pDel; nSize--; - mOperate.unlock(); cBlock.unlock(); return pTmp; @@ -198,7 +197,7 @@ namespace Bu delete pDel; nSize--; - mOperate.unlock(); + cBlock.unlock(); return pTmp; } } @@ -211,18 +210,18 @@ namespace Bu */ bool isEmpty() { - mOperate.lock(); + cBlock.lock(); bool bEmpty = (pStart == NULL ); - mOperate.unlock(); + cBlock.unlock(); return bEmpty; } long getSize() { - mOperate.lock(); + cBlock.lock(); long nRet = nSize; - mOperate.unlock(); + cBlock.unlock(); return nRet; } @@ -232,7 +231,6 @@ namespace Bu Item *pEnd; /**< The end of the queue, the last element to dequeue. */ long nSize; /**< The number of items in the queue. */ - Mutex mOperate; /**< The master mutex, used on all operations. */ Condition cBlock; /**< The condition for blocking dequeues. */ }; } diff --git a/src/tests/synchroqueue.cpp b/src/tests/synchroqueue.cpp new file mode 100644 index 0000000..980a4a3 --- /dev/null +++ b/src/tests/synchroqueue.cpp @@ -0,0 +1,131 @@ +#include +#include +#include + +class Thing +{ +public: + Thing( int x ) : + x( x ), + y( 0 ) + { + } + + int x; + int y; +}; + +typedef Bu::SynchroQueue ThingQueue; + +Bu::Mutex mWorkDone; +int iWorkDone; +Bu::Condition cWorkDone; + +void workDone() +{ + mWorkDone.lock(); + iWorkDone--; + if( iWorkDone == 0 ) + { + mWorkDone.unlock(); + cWorkDone.lock(); + cWorkDone.signal(); + cWorkDone.unlock(); + return; + } + mWorkDone.unlock(); +} + +class ThingEater : public Bu::Thread +{ +public: + ThingEater( ThingQueue &qThing ) : + qThing( qThing ) + { + } + + bool bRunning; + + void setRunning( bool b ) + { + mRunning.lock(); + bRunning = b; + mRunning.unlock(); + } + + bool isRunning() + { + mRunning.lock(); + bool b = bRunning; + mRunning.unlock(); + return b; + } + +protected: + virtual void run() + { + setRunning( true ); + while( isRunning() ) + { + Thing *pThing = qThing.dequeue( 0, 250000 ); + if( pThing == NULL ) + continue; + + pThing->y = pThing->x*2; + usleep( 10000 ); + + workDone(); + } + } + + ThingQueue &qThing; + Bu::Mutex mRunning; +}; + +typedef Bu::List ThingEaterList; + +int main() +{ + ThingQueue qThing; + ThingEaterList lEater; + + mWorkDone.lock(); + iWorkDone = 1000; + mWorkDone.unlock(); + + for( int j = 0; j < 5; j++ ) + lEater.append( new ThingEater( qThing ) ); + + for( ThingEaterList::iterator i = lEater.begin(); i; i++ ) + (*i)->start(); + + for( int j = 0; j < 1000; j++ ) + { + qThing.enqueue( new Thing( j ) ); + } + + mWorkDone.lock(); + mWorkDone.unlock(); + cWorkDone.lock(); + for(;;) + { + mWorkDone.lock(); + if( iWorkDone == 0 ) + { + mWorkDone.unlock(); + break; + } + mWorkDone.unlock(); + cWorkDone.wait(); + } + cWorkDone.unlock(); + + for( ThingEaterList::iterator i = lEater.begin(); i; i++ ) + (*i)->setRunning( false ); + + for( ThingEaterList::iterator i = lEater.begin(); i; i++ ) + (*i)->join(); + + return 0; +} + -- cgit v1.2.3