From 915005e218b5d00939b548de65ce6354f7acb487 Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Fri, 28 Jul 2023 21:18:56 -0700 Subject: Completely redesigned Server and Client. Like, seriously, they're almost completely different. --- src/stable/synchroqueue.h | 100 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 96 insertions(+), 4 deletions(-) (limited to 'src/stable/synchroqueue.h') diff --git a/src/stable/synchroqueue.h b/src/stable/synchroqueue.h index 1c39c2c..b10ec33 100644 --- a/src/stable/synchroqueue.h +++ b/src/stable/synchroqueue.h @@ -44,7 +44,8 @@ namespace Bu SynchroQueue() : pStart( NULL ), pEnd( NULL ), - nSize( 0 ) + nSize( 0 ), + bRunning( true ) { } @@ -76,6 +77,14 @@ namespace Bu */ void enqueue( T pData ) { + mRunning.lock(); + if( !bRunning ) + { + mRunning.unlock(); + throw Bu::ExceptionBase("SynchoQueue is stopped."); + } + mRunning.unlock(); + cBlock.lock(); if( pStart == NULL ) @@ -119,6 +128,14 @@ namespace Bu */ T dequeue( bool bBlock=false ) { + mRunning.lock(); + if( !bRunning ) + { + mRunning.unlock(); + return T(); + } + mRunning.unlock(); + cBlock.lock(); if( pStart == NULL ) { @@ -129,8 +146,18 @@ namespace Bu if( pStart == NULL ) { cBlock.unlock(); - return NULL; + return T(); } + + mRunning.lock(); + if( !bRunning ) + { + mRunning.unlock(); + cBlock.unlock(); + return T(); + } + mRunning.unlock(); + T pTmp = pStart->pData; Item *pDel = pStart; pStart = pStart->pNext; @@ -143,7 +170,7 @@ namespace Bu } cBlock.unlock(); - return NULL; + return T(); } else { @@ -171,6 +198,14 @@ namespace Bu */ T dequeue( int nSec, int nUSec ) { + mRunning.lock(); + if( !bRunning ) + { + mRunning.unlock(); + return T(); + } + mRunning.unlock(); + cBlock.lock(); if( pStart == NULL ) { @@ -179,8 +214,17 @@ namespace Bu if( pStart == NULL ) { cBlock.unlock(); - return NULL; + return T(); + } + + mRunning.lock(); + if( !bRunning ) + { + mRunning.unlock(); + cBlock.unlock(); + return T(); } + mRunning.unlock(); T pTmp = pStart->pData; Item *pDel = pStart; @@ -203,6 +247,35 @@ namespace Bu return pTmp; } } + + T drain() + { + mRunning.lock(); + if( bRunning ) + { + mRunning.unlock(); + return NULL; + } + mRunning.unlock(); + + cBlock.lock(); + if( pStart == NULL ) + { + cBlock.unlock(); + return T(); + } + else + { + T pTmp = pStart->pData; + Item *pDel = pStart; + pStart = pStart->pNext; + delete pDel; + nSize--; + + cBlock.unlock(); + return pTmp; + } + } /** * Checks to see if the queue has data in it or not. Note that there @@ -235,12 +308,31 @@ namespace Bu cBlock.unlock(); } + void stop() + { + mRunning.lock(); + bRunning = false; + mRunning.unlock(); + unblockAll(); + } + + bool isRunning() const + { + bool bRet; + mRunning.lock(); + bRet = bRunning; + mRunning.unlock(); + return bRet; + } + private: Item *pStart; /**< The start of the queue, the next element to dequeue. */ Item *pEnd; /**< The end of the queue, the last element to dequeue. */ long nSize; /**< The number of items in the queue. */ Condition cBlock; /**< The condition for blocking dequeues. */ + mutable Mutex mRunning; + bool bRunning; }; } -- cgit v1.2.3