summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--default.bld2
-rw-r--r--src/stable/synchroqueue.h44
-rw-r--r--src/tests/synchroqueue.cpp131
3 files changed, 153 insertions, 24 deletions
diff --git a/default.bld b/default.bld
index c1b93af..d46662c 100644
--- a/default.bld
+++ b/default.bld
@@ -151,7 +151,7 @@ target "viewcsv"
151 LDFLAGS += "-lncurses"; 151 LDFLAGS += "-lncurses";
152} 152}
153 153
154target ["myriad", "myriadfs", "tests/myriad", "tests/myriadfs", "unit/myriad", "tests/bigmyriad"] 154target ["myriad", "myriadfs", "tests/myriad", "tests/myriadfs", "unit/myriad", "tests/bigmyriad", "tests/synchroqueue"]
155{ 155{
156 LDFLAGS += "-lpthread"; 156 LDFLAGS += "-lpthread";
157} 157}
diff --git a/src/stable/synchroqueue.h b/src/stable/synchroqueue.h
index a0335f6..7ba299f 100644
--- a/src/stable/synchroqueue.h
+++ b/src/stable/synchroqueue.h
@@ -56,6 +56,7 @@ namespace Bu
56 */ 56 */
57 ~SynchroQueue() 57 ~SynchroQueue()
58 { 58 {
59 cBlock.lock();
59 Item *pCur = pStart; 60 Item *pCur = pStart;
60 while( pCur ) 61 while( pCur )
61 { 62 {
@@ -63,6 +64,7 @@ namespace Bu
63 delete pCur; 64 delete pCur;
64 pCur = pTmp; 65 pCur = pTmp;
65 } 66 }
67 cBlock.unlock();
66 } 68 }
67 69
68 /** 70 /**
@@ -74,7 +76,7 @@ namespace Bu
74 */ 76 */
75 void enqueue( T pData ) 77 void enqueue( T pData )
76 { 78 {
77 mOperate.lock(); 79 cBlock.lock();
78 80
79 if( pStart == NULL ) 81 if( pStart == NULL )
80 { 82 {
@@ -93,8 +95,8 @@ namespace Bu
93 } 95 }
94 96
95 cBlock.signal(); 97 cBlock.signal();
96 98
97 mOperate.unlock(); 99 cBlock.unlock();
98 } 100 }
99 101
100 /** 102 /**
@@ -117,17 +119,20 @@ namespace Bu
117 */ 119 */
118 T dequeue( bool bBlock=false ) 120 T dequeue( bool bBlock=false )
119 { 121 {
120 mOperate.lock(); 122 cBlock.lock();
121 if( pStart == NULL ) 123 if( pStart == NULL )
122 { 124 {
123 mOperate.unlock();
124
125 if( bBlock ) 125 if( bBlock )
126 { 126 {
127 cBlock.lock(); 127 for(;;)
128 128 {
129 while( pStart == NULL ) 129 if( pStart != NULL )
130 {
131 cBlock.unlock();
132 break;
133 }
130 cBlock.wait(); 134 cBlock.wait();
135 }
131 136
132 T tmp = dequeue( false ); 137 T tmp = dequeue( false );
133 138
@@ -146,7 +151,7 @@ namespace Bu
146 delete pDel; 151 delete pDel;
147 nSize--; 152 nSize--;
148 153
149 mOperate.unlock(); 154 cBlock.unlock();
150 return pTmp; 155 return pTmp;
151 } 156 }
152 } 157 }
@@ -164,13 +169,9 @@ namespace Bu
164 */ 169 */
165 T dequeue( int nSec, int nUSec ) 170 T dequeue( int nSec, int nUSec )
166 { 171 {
167 mOperate.lock(); 172 cBlock.lock();
168 if( pStart == NULL ) 173 if( pStart == NULL )
169 { 174 {
170 mOperate.unlock();
171
172 cBlock.lock();
173
174 cBlock.wait( nSec, nUSec ); 175 cBlock.wait( nSec, nUSec );
175 176
176 if( pStart == NULL ) 177 if( pStart == NULL )
@@ -179,13 +180,11 @@ namespace Bu
179 return NULL; 180 return NULL;
180 } 181 }
181 182
182 mOperate.lock();
183 T pTmp = pStart->pData; 183 T pTmp = pStart->pData;
184 Item *pDel = pStart; 184 Item *pDel = pStart;
185 pStart = pStart->pNext; 185 pStart = pStart->pNext;
186 delete pDel; 186 delete pDel;
187 nSize--; 187 nSize--;
188 mOperate.unlock();
189 188
190 cBlock.unlock(); 189 cBlock.unlock();
191 return pTmp; 190 return pTmp;
@@ -198,7 +197,7 @@ namespace Bu
198 delete pDel; 197 delete pDel;
199 nSize--; 198 nSize--;
200 199
201 mOperate.unlock(); 200 cBlock.unlock();
202 return pTmp; 201 return pTmp;
203 } 202 }
204 } 203 }
@@ -211,18 +210,18 @@ namespace Bu
211 */ 210 */
212 bool isEmpty() 211 bool isEmpty()
213 { 212 {
214 mOperate.lock(); 213 cBlock.lock();
215 bool bEmpty = (pStart == NULL ); 214 bool bEmpty = (pStart == NULL );
216 mOperate.unlock(); 215 cBlock.unlock();
217 216
218 return bEmpty; 217 return bEmpty;
219 } 218 }
220 219
221 long getSize() 220 long getSize()
222 { 221 {
223 mOperate.lock(); 222 cBlock.lock();
224 long nRet = nSize; 223 long nRet = nSize;
225 mOperate.unlock(); 224 cBlock.unlock();
226 225
227 return nRet; 226 return nRet;
228 } 227 }
@@ -232,7 +231,6 @@ namespace Bu
232 Item *pEnd; /**< The end of the queue, the last element to dequeue. */ 231 Item *pEnd; /**< The end of the queue, the last element to dequeue. */
233 long nSize; /**< The number of items in the queue. */ 232 long nSize; /**< The number of items in the queue. */
234 233
235 Mutex mOperate; /**< The master mutex, used on all operations. */
236 Condition cBlock; /**< The condition for blocking dequeues. */ 234 Condition cBlock; /**< The condition for blocking dequeues. */
237 }; 235 };
238} 236}
diff --git a/src/tests/synchroqueue.cpp b/src/tests/synchroqueue.cpp
new file mode 100644
index 0000000..980a4a3
--- /dev/null
+++ b/src/tests/synchroqueue.cpp
@@ -0,0 +1,131 @@
1#include <bu/thread.h>
2#include <bu/synchroqueue.h>
3#include <bu/list.h>
4
5class Thing
6{
7public:
8 Thing( int x ) :
9 x( x ),
10 y( 0 )
11 {
12 }
13
14 int x;
15 int y;
16};
17
18typedef Bu::SynchroQueue<Thing *> ThingQueue;
19
20Bu::Mutex mWorkDone;
21int iWorkDone;
22Bu::Condition cWorkDone;
23
24void workDone()
25{
26 mWorkDone.lock();
27 iWorkDone--;
28 if( iWorkDone == 0 )
29 {
30 mWorkDone.unlock();
31 cWorkDone.lock();
32 cWorkDone.signal();
33 cWorkDone.unlock();
34 return;
35 }
36 mWorkDone.unlock();
37}
38
39class ThingEater : public Bu::Thread
40{
41public:
42 ThingEater( ThingQueue &qThing ) :
43 qThing( qThing )
44 {
45 }
46
47 bool bRunning;
48
49 void setRunning( bool b )
50 {
51 mRunning.lock();
52 bRunning = b;
53 mRunning.unlock();
54 }
55
56 bool isRunning()
57 {
58 mRunning.lock();
59 bool b = bRunning;
60 mRunning.unlock();
61 return b;
62 }
63
64protected:
65 virtual void run()
66 {
67 setRunning( true );
68 while( isRunning() )
69 {
70 Thing *pThing = qThing.dequeue( 0, 250000 );
71 if( pThing == NULL )
72 continue;
73
74 pThing->y = pThing->x*2;
75 usleep( 10000 );
76
77 workDone();
78 }
79 }
80
81 ThingQueue &qThing;
82 Bu::Mutex mRunning;
83};
84
85typedef Bu::List<ThingEater *> ThingEaterList;
86
87int main()
88{
89 ThingQueue qThing;
90 ThingEaterList lEater;
91
92 mWorkDone.lock();
93 iWorkDone = 1000;
94 mWorkDone.unlock();
95
96 for( int j = 0; j < 5; j++ )
97 lEater.append( new ThingEater( qThing ) );
98
99 for( ThingEaterList::iterator i = lEater.begin(); i; i++ )
100 (*i)->start();
101
102 for( int j = 0; j < 1000; j++ )
103 {
104 qThing.enqueue( new Thing( j ) );
105 }
106
107 mWorkDone.lock();
108 mWorkDone.unlock();
109 cWorkDone.lock();
110 for(;;)
111 {
112 mWorkDone.lock();
113 if( iWorkDone == 0 )
114 {
115 mWorkDone.unlock();
116 break;
117 }
118 mWorkDone.unlock();
119 cWorkDone.wait();
120 }
121 cWorkDone.unlock();
122
123 for( ThingEaterList::iterator i = lEater.begin(); i; i++ )
124 (*i)->setRunning( false );
125
126 for( ThingEaterList::iterator i = lEater.begin(); i; i++ )
127 (*i)->join();
128
129 return 0;
130}
131