diff options
| -rw-r--r-- | default.bld | 2 | ||||
| -rw-r--r-- | src/stable/synchroqueue.h | 44 | ||||
| -rw-r--r-- | src/tests/synchroqueue.cpp | 131 |
3 files changed, 153 insertions, 24 deletions
diff --git a/default.bld b/default.bld index c1b93af..d46662c 100644 --- a/default.bld +++ b/default.bld | |||
| @@ -151,7 +151,7 @@ target "viewcsv" | |||
| 151 | LDFLAGS += "-lncurses"; | 151 | LDFLAGS += "-lncurses"; |
| 152 | } | 152 | } |
| 153 | 153 | ||
| 154 | target ["myriad", "myriadfs", "tests/myriad", "tests/myriadfs", "unit/myriad", "tests/bigmyriad"] | 154 | target ["myriad", "myriadfs", "tests/myriad", "tests/myriadfs", "unit/myriad", "tests/bigmyriad", "tests/synchroqueue"] |
| 155 | { | 155 | { |
| 156 | LDFLAGS += "-lpthread"; | 156 | LDFLAGS += "-lpthread"; |
| 157 | } | 157 | } |
diff --git a/src/stable/synchroqueue.h b/src/stable/synchroqueue.h index a0335f6..7ba299f 100644 --- a/src/stable/synchroqueue.h +++ b/src/stable/synchroqueue.h | |||
| @@ -56,6 +56,7 @@ namespace Bu | |||
| 56 | */ | 56 | */ |
| 57 | ~SynchroQueue() | 57 | ~SynchroQueue() |
| 58 | { | 58 | { |
| 59 | cBlock.lock(); | ||
| 59 | Item *pCur = pStart; | 60 | Item *pCur = pStart; |
| 60 | while( pCur ) | 61 | while( pCur ) |
| 61 | { | 62 | { |
| @@ -63,6 +64,7 @@ namespace Bu | |||
| 63 | delete pCur; | 64 | delete pCur; |
| 64 | pCur = pTmp; | 65 | pCur = pTmp; |
| 65 | } | 66 | } |
| 67 | cBlock.unlock(); | ||
| 66 | } | 68 | } |
| 67 | 69 | ||
| 68 | /** | 70 | /** |
| @@ -74,7 +76,7 @@ namespace Bu | |||
| 74 | */ | 76 | */ |
| 75 | void enqueue( T pData ) | 77 | void enqueue( T pData ) |
| 76 | { | 78 | { |
| 77 | mOperate.lock(); | 79 | cBlock.lock(); |
| 78 | 80 | ||
| 79 | if( pStart == NULL ) | 81 | if( pStart == NULL ) |
| 80 | { | 82 | { |
| @@ -93,8 +95,8 @@ namespace Bu | |||
| 93 | } | 95 | } |
| 94 | 96 | ||
| 95 | cBlock.signal(); | 97 | cBlock.signal(); |
| 96 | 98 | ||
| 97 | mOperate.unlock(); | 99 | cBlock.unlock(); |
| 98 | } | 100 | } |
| 99 | 101 | ||
| 100 | /** | 102 | /** |
| @@ -117,17 +119,20 @@ namespace Bu | |||
| 117 | */ | 119 | */ |
| 118 | T dequeue( bool bBlock=false ) | 120 | T dequeue( bool bBlock=false ) |
| 119 | { | 121 | { |
| 120 | mOperate.lock(); | 122 | cBlock.lock(); |
| 121 | if( pStart == NULL ) | 123 | if( pStart == NULL ) |
| 122 | { | 124 | { |
| 123 | mOperate.unlock(); | ||
| 124 | |||
| 125 | if( bBlock ) | 125 | if( bBlock ) |
| 126 | { | 126 | { |
| 127 | cBlock.lock(); | 127 | for(;;) |
| 128 | 128 | { | |
| 129 | while( pStart == NULL ) | 129 | if( pStart != NULL ) |
| 130 | { | ||
| 131 | cBlock.unlock(); | ||
| 132 | break; | ||
| 133 | } | ||
| 130 | cBlock.wait(); | 134 | cBlock.wait(); |
| 135 | } | ||
| 131 | 136 | ||
| 132 | T tmp = dequeue( false ); | 137 | T tmp = dequeue( false ); |
| 133 | 138 | ||
| @@ -146,7 +151,7 @@ namespace Bu | |||
| 146 | delete pDel; | 151 | delete pDel; |
| 147 | nSize--; | 152 | nSize--; |
| 148 | 153 | ||
| 149 | mOperate.unlock(); | 154 | cBlock.unlock(); |
| 150 | return pTmp; | 155 | return pTmp; |
| 151 | } | 156 | } |
| 152 | } | 157 | } |
| @@ -164,13 +169,9 @@ namespace Bu | |||
| 164 | */ | 169 | */ |
| 165 | T dequeue( int nSec, int nUSec ) | 170 | T dequeue( int nSec, int nUSec ) |
| 166 | { | 171 | { |
| 167 | mOperate.lock(); | 172 | cBlock.lock(); |
| 168 | if( pStart == NULL ) | 173 | if( pStart == NULL ) |
| 169 | { | 174 | { |
| 170 | mOperate.unlock(); | ||
| 171 | |||
| 172 | cBlock.lock(); | ||
| 173 | |||
| 174 | cBlock.wait( nSec, nUSec ); | 175 | cBlock.wait( nSec, nUSec ); |
| 175 | 176 | ||
| 176 | if( pStart == NULL ) | 177 | if( pStart == NULL ) |
| @@ -179,13 +180,11 @@ namespace Bu | |||
| 179 | return NULL; | 180 | return NULL; |
| 180 | } | 181 | } |
| 181 | 182 | ||
| 182 | mOperate.lock(); | ||
| 183 | T pTmp = pStart->pData; | 183 | T pTmp = pStart->pData; |
| 184 | Item *pDel = pStart; | 184 | Item *pDel = pStart; |
| 185 | pStart = pStart->pNext; | 185 | pStart = pStart->pNext; |
| 186 | delete pDel; | 186 | delete pDel; |
| 187 | nSize--; | 187 | nSize--; |
| 188 | mOperate.unlock(); | ||
| 189 | 188 | ||
| 190 | cBlock.unlock(); | 189 | cBlock.unlock(); |
| 191 | return pTmp; | 190 | return pTmp; |
| @@ -198,7 +197,7 @@ namespace Bu | |||
| 198 | delete pDel; | 197 | delete pDel; |
| 199 | nSize--; | 198 | nSize--; |
| 200 | 199 | ||
| 201 | mOperate.unlock(); | 200 | cBlock.unlock(); |
| 202 | return pTmp; | 201 | return pTmp; |
| 203 | } | 202 | } |
| 204 | } | 203 | } |
| @@ -211,18 +210,18 @@ namespace Bu | |||
| 211 | */ | 210 | */ |
| 212 | bool isEmpty() | 211 | bool isEmpty() |
| 213 | { | 212 | { |
| 214 | mOperate.lock(); | 213 | cBlock.lock(); |
| 215 | bool bEmpty = (pStart == NULL ); | 214 | bool bEmpty = (pStart == NULL ); |
| 216 | mOperate.unlock(); | 215 | cBlock.unlock(); |
| 217 | 216 | ||
| 218 | return bEmpty; | 217 | return bEmpty; |
| 219 | } | 218 | } |
| 220 | 219 | ||
| 221 | long getSize() | 220 | long getSize() |
| 222 | { | 221 | { |
| 223 | mOperate.lock(); | 222 | cBlock.lock(); |
| 224 | long nRet = nSize; | 223 | long nRet = nSize; |
| 225 | mOperate.unlock(); | 224 | cBlock.unlock(); |
| 226 | 225 | ||
| 227 | return nRet; | 226 | return nRet; |
| 228 | } | 227 | } |
| @@ -232,7 +231,6 @@ namespace Bu | |||
| 232 | Item *pEnd; /**< The end of the queue, the last element to dequeue. */ | 231 | Item *pEnd; /**< The end of the queue, the last element to dequeue. */ |
| 233 | long nSize; /**< The number of items in the queue. */ | 232 | long nSize; /**< The number of items in the queue. */ |
| 234 | 233 | ||
| 235 | Mutex mOperate; /**< The master mutex, used on all operations. */ | ||
| 236 | Condition cBlock; /**< The condition for blocking dequeues. */ | 234 | Condition cBlock; /**< The condition for blocking dequeues. */ |
| 237 | }; | 235 | }; |
| 238 | } | 236 | } |
diff --git a/src/tests/synchroqueue.cpp b/src/tests/synchroqueue.cpp new file mode 100644 index 0000000..980a4a3 --- /dev/null +++ b/src/tests/synchroqueue.cpp | |||
| @@ -0,0 +1,131 @@ | |||
| 1 | #include <bu/thread.h> | ||
| 2 | #include <bu/synchroqueue.h> | ||
| 3 | #include <bu/list.h> | ||
| 4 | |||
| 5 | class Thing | ||
| 6 | { | ||
| 7 | public: | ||
| 8 | Thing( int x ) : | ||
| 9 | x( x ), | ||
| 10 | y( 0 ) | ||
| 11 | { | ||
| 12 | } | ||
| 13 | |||
| 14 | int x; | ||
| 15 | int y; | ||
| 16 | }; | ||
| 17 | |||
| 18 | typedef Bu::SynchroQueue<Thing *> ThingQueue; | ||
| 19 | |||
| 20 | Bu::Mutex mWorkDone; | ||
| 21 | int iWorkDone; | ||
| 22 | Bu::Condition cWorkDone; | ||
| 23 | |||
| 24 | void workDone() | ||
| 25 | { | ||
| 26 | mWorkDone.lock(); | ||
| 27 | iWorkDone--; | ||
| 28 | if( iWorkDone == 0 ) | ||
| 29 | { | ||
| 30 | mWorkDone.unlock(); | ||
| 31 | cWorkDone.lock(); | ||
| 32 | cWorkDone.signal(); | ||
| 33 | cWorkDone.unlock(); | ||
| 34 | return; | ||
| 35 | } | ||
| 36 | mWorkDone.unlock(); | ||
| 37 | } | ||
| 38 | |||
| 39 | class ThingEater : public Bu::Thread | ||
| 40 | { | ||
| 41 | public: | ||
| 42 | ThingEater( ThingQueue &qThing ) : | ||
| 43 | qThing( qThing ) | ||
| 44 | { | ||
| 45 | } | ||
| 46 | |||
| 47 | bool bRunning; | ||
| 48 | |||
| 49 | void setRunning( bool b ) | ||
| 50 | { | ||
| 51 | mRunning.lock(); | ||
| 52 | bRunning = b; | ||
| 53 | mRunning.unlock(); | ||
| 54 | } | ||
| 55 | |||
| 56 | bool isRunning() | ||
| 57 | { | ||
| 58 | mRunning.lock(); | ||
| 59 | bool b = bRunning; | ||
| 60 | mRunning.unlock(); | ||
| 61 | return b; | ||
| 62 | } | ||
| 63 | |||
| 64 | protected: | ||
| 65 | virtual void run() | ||
| 66 | { | ||
| 67 | setRunning( true ); | ||
| 68 | while( isRunning() ) | ||
| 69 | { | ||
| 70 | Thing *pThing = qThing.dequeue( 0, 250000 ); | ||
| 71 | if( pThing == NULL ) | ||
| 72 | continue; | ||
| 73 | |||
| 74 | pThing->y = pThing->x*2; | ||
| 75 | usleep( 10000 ); | ||
| 76 | |||
| 77 | workDone(); | ||
| 78 | } | ||
| 79 | } | ||
| 80 | |||
| 81 | ThingQueue &qThing; | ||
| 82 | Bu::Mutex mRunning; | ||
| 83 | }; | ||
| 84 | |||
| 85 | typedef Bu::List<ThingEater *> ThingEaterList; | ||
| 86 | |||
| 87 | int main() | ||
| 88 | { | ||
| 89 | ThingQueue qThing; | ||
| 90 | ThingEaterList lEater; | ||
| 91 | |||
| 92 | mWorkDone.lock(); | ||
| 93 | iWorkDone = 1000; | ||
| 94 | mWorkDone.unlock(); | ||
| 95 | |||
| 96 | for( int j = 0; j < 5; j++ ) | ||
| 97 | lEater.append( new ThingEater( qThing ) ); | ||
| 98 | |||
| 99 | for( ThingEaterList::iterator i = lEater.begin(); i; i++ ) | ||
| 100 | (*i)->start(); | ||
| 101 | |||
| 102 | for( int j = 0; j < 1000; j++ ) | ||
| 103 | { | ||
| 104 | qThing.enqueue( new Thing( j ) ); | ||
| 105 | } | ||
| 106 | |||
| 107 | mWorkDone.lock(); | ||
| 108 | mWorkDone.unlock(); | ||
| 109 | cWorkDone.lock(); | ||
| 110 | for(;;) | ||
| 111 | { | ||
| 112 | mWorkDone.lock(); | ||
| 113 | if( iWorkDone == 0 ) | ||
| 114 | { | ||
| 115 | mWorkDone.unlock(); | ||
| 116 | break; | ||
| 117 | } | ||
| 118 | mWorkDone.unlock(); | ||
| 119 | cWorkDone.wait(); | ||
| 120 | } | ||
| 121 | cWorkDone.unlock(); | ||
| 122 | |||
| 123 | for( ThingEaterList::iterator i = lEater.begin(); i; i++ ) | ||
| 124 | (*i)->setRunning( false ); | ||
| 125 | |||
| 126 | for( ThingEaterList::iterator i = lEater.begin(); i; i++ ) | ||
| 127 | (*i)->join(); | ||
| 128 | |||
| 129 | return 0; | ||
| 130 | } | ||
| 131 | |||
