From da1e0ef0772b078bd295301bd675afdee00d40e9 Mon Sep 17 00:00:00 2001 From: Mike Buland Date: Sun, 23 Oct 2011 07:43:50 +0000 Subject: Switched ito* to synchro*, except the server, I'm thinking of takeing the core in a different direction anyway. Added the Deflate class, it uses zlib, and can do raw (headerless) deflate streams, zlib format, or gzip format. It's easy to use and quite versitile. --- default.bld | 5 + src/bzip2.cpp | 2 +- src/conduit.h | 8 +- src/deflate.cpp | 242 +++++++++++++++++++++++++++++++++++++++++++++++++ src/deflate.h | 63 +++++++++++++ src/itoatom.h | 64 ------------- src/itocounter.cpp | 8 -- src/itocounter.h | 49 ---------- src/itoheap.cpp | 9 -- src/itoheap.h | 154 ------------------------------- src/itoqueue.h | 240 ------------------------------------------------ src/itoserver.h | 6 +- src/synchroatom.h | 63 +++++++++++++ src/synchrocounter.cpp | 8 ++ src/synchrocounter.h | 49 ++++++++++ src/synchroheap.cpp | 9 ++ src/synchroheap.h | 151 ++++++++++++++++++++++++++++++ src/synchroqueue.h | 240 ++++++++++++++++++++++++++++++++++++++++++++++++ src/tests/deflate.cpp | 53 +++++++++++ 19 files changed, 891 insertions(+), 532 deletions(-) create mode 100644 src/deflate.cpp create mode 100644 src/deflate.h delete mode 100644 src/itoatom.h delete mode 100644 src/itocounter.cpp delete mode 100644 src/itocounter.h delete mode 100644 src/itoheap.cpp delete mode 100644 src/itoheap.h delete mode 100644 src/itoqueue.h create mode 100644 src/synchroatom.h create mode 100644 src/synchrocounter.cpp create mode 100644 src/synchrocounter.h create mode 100644 src/synchroheap.cpp create mode 100644 src/synchroheap.h create mode 100644 src/synchroqueue.h create mode 100644 src/tests/deflate.cpp diff --git a/default.bld b/default.bld index 8f98db0..6666fe2 100644 --- a/default.bld +++ b/default.bld @@ -167,6 +167,11 @@ target ["tests/bzip2", "tests/streamstack"] LDFLAGS += "-lbz2"; } +target ["tests/deflate"] +{ + LDFLAGS += "-lz"; +} + target ["tests/itoserver", "tests/socketblock", "tests/itoheap", "tests/itoqueue1", "tests/itoqueue2", "tests/conduit"] { diff --git a/src/bzip2.cpp b/src/bzip2.cpp index 5c35a26..0ff5444 100644 --- a/src/bzip2.cpp +++ b/src/bzip2.cpp @@ -33,7 +33,7 @@ void Bu::BZip2::start() bzState.bzfree = NULL; bzState.opaque = NULL; - nBufSize = 50000; + nBufSize = 64*1024; pBuf = new char[nBufSize]; } diff --git a/src/conduit.h b/src/conduit.h index 72b8d52..9babaaf 100644 --- a/src/conduit.h +++ b/src/conduit.h @@ -11,8 +11,8 @@ #include "bu/stream.h" #include "bu/string.h" #include "bu/queuebuf.h" -#include "bu/itomutex.h" -#include "bu/itocondition.h" +#include "bu/mutex.h" +#include "bu/condition.h" namespace Bu { @@ -54,8 +54,8 @@ namespace Bu private: QueueBuf qb; - mutable ItoMutex im; - ItoCondition cBlock; + mutable Mutex im; + Condition cBlock; bool bBlocking; bool bOpen; }; diff --git a/src/deflate.cpp b/src/deflate.cpp new file mode 100644 index 0000000..aec2a18 --- /dev/null +++ b/src/deflate.cpp @@ -0,0 +1,242 @@ +/* + * Copyright (C) 2007-2011 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#include "bu/deflate.h" +#include "bu/trace.h" + +using namespace Bu; + +Bu::Deflate::Deflate( Bu::Stream &rNext, int nCompression, Format eFmt ) : + Bu::Filter( rNext ), + nCompression( nCompression ), + sTotalOut( 0 ), + eFmt( eFmt ), + bEos( false ) +{ + TRACE( nCompression ); + start(); +} + +Bu::Deflate::~Deflate() +{ + TRACE(); + stop(); +} + +void Bu::Deflate::start() +{ + TRACE(); + zState.zalloc = NULL; + zState.zfree = NULL; + zState.opaque = NULL; + zState.state = NULL; + + nBufSize = 64*1024; + pBuf = new char[nBufSize]; +} + +Bu::size Bu::Deflate::stop() +{ + TRACE(); + if( zState.state ) + { + if( bReading ) + { + inflateEnd( &zState ); + delete[] pBuf; + pBuf = NULL; + return 0; + } + else + { + for(;;) + { + zState.next_in = NULL; + zState.avail_in = 0; + zState.avail_out = nBufSize; + zState.next_out = (Bytef *)pBuf; + int res = deflate( &zState, Z_FINISH ); + if( zState.avail_out < nBufSize ) + { + sTotalOut += rNext.write( pBuf, nBufSize-zState.avail_out ); + } + if( res == Z_STREAM_END ) + break; + } + deflateEnd( &zState ); + delete[] pBuf; + pBuf = NULL; + return sTotalOut; + } + } + return 0; +} + +void Bu::Deflate::zError( int code ) +{ + TRACE( code ); + switch( code ) + { + case Z_OK: + case Z_STREAM_END: + case Z_NEED_DICT: + return; + + case Z_ERRNO: + throw ExceptionBase("Deflate: Errno - %s", zState.msg ); + + case Z_STREAM_ERROR: + throw ExceptionBase("Deflate: Stream Error - %s", zState.msg ); + + case Z_DATA_ERROR: + throw ExceptionBase("Deflate: Data Error - %s", zState.msg ); + + case Z_MEM_ERROR: + throw ExceptionBase("Deflate: Mem Error - %s", zState.msg ); + + case Z_BUF_ERROR: + throw ExceptionBase("Deflate: Buf Error - %s", zState.msg ); + + case Z_VERSION_ERROR: + throw ExceptionBase("Deflate: Version Error - %s", zState.msg ); + + default: + throw ExceptionBase("Deflate: Unknown error encountered - %s.", zState.msg ); + + } +} + +Bu::size Bu::Deflate::read( void *pData, Bu::size nBytes ) +{ + TRACE( pData, nBytes ); + if( !zState.state ) + { + bReading = true; + if( eFmt&AutoDetect ) + inflateInit2( &zState, 32+15 ); // Auto-detect, large window + else if( eFmt == Raw ) + inflateInit2( &zState, -15 ); // Raw + else if( eFmt == Zlib ) + inflateInit2( &zState, 15 ); // Zlib + else if( eFmt == Gzip ) + inflateInit2( &zState, 16+15 ); // GZip + else + throw Bu::ExceptionBase("Format mode for deflate read."); + zState.next_in = (Bytef *)pBuf; + zState.avail_in = 0; + } + if( bReading == false ) + throw ExceptionBase("This deflate filter is in writing mode, you can't read."); + + int nRead = 0; + int nReadTotal = zState.total_out; + zState.next_out = (Bytef *)pData; + zState.avail_out = nBytes; + for(;;) + { + int ret = inflate( &zState, Z_NO_FLUSH ); + printf("inflate returned %d; avail in=%d, out=%d\n", ret, + zState.avail_in, zState.avail_out ); + + nReadTotal += nRead-zState.avail_out; + + if( ret == Z_STREAM_END ) + { + bEos = true; + if( zState.avail_in > 0 ) + { + if( rNext.isSeekable() ) + { + rNext.seek( -zState.avail_in ); + } + } + return nBytes-zState.avail_out; + } + if( ret != Z_BUF_ERROR ) + zError( ret ); + + if( zState.avail_out ) + { + if( zState.avail_in == 0 ) + { + nRead = rNext.read( pBuf, nBufSize ); + if( nRead == 0 && rNext.isEos() ) + { + throw Bu::ExceptionBase("Premature end of underlying " + "stream found reading deflate stream."); + } + zState.next_in = (Bytef *)pBuf; + zState.avail_in = nRead; + } + } + else + { + return nBytes-zState.avail_out; + } + } + return 0; +} + +Bu::size Bu::Deflate::write( const void *pData, Bu::size nBytes ) +{ + TRACE( pData, nBytes ); + if( !zState.state ) + { + bReading = false; + int iFmt = eFmt&Gzip; + if( iFmt == Raw ) + deflateInit2( &zState, nCompression, Z_DEFLATED, -15, 9, + Z_DEFAULT_STRATEGY ); + else if( iFmt == Zlib ) + deflateInit2( &zState, nCompression, Z_DEFLATED, 15, 9, + Z_DEFAULT_STRATEGY ); + else if( iFmt == Gzip ) + deflateInit2( &zState, nCompression, Z_DEFLATED, 16+15, 9, + Z_DEFAULT_STRATEGY ); + else + throw Bu::ExceptionBase("Invalid format for deflate."); + } + if( bReading == true ) + throw ExceptionBase("This deflate filter is in reading mode, you can't write."); + + zState.next_in = (Bytef *)pData; + zState.avail_in = nBytes; + for(;;) + { + zState.avail_out = nBufSize; + zState.next_out = (Bytef *)pBuf; + + zError( deflate( &zState, Z_NO_FLUSH ) ); + + if( zState.avail_out < nBufSize ) + { + sTotalOut += rNext.write( pBuf, nBufSize-zState.avail_out ); + } + if( zState.avail_in == 0 ) + break; + } + + return nBytes; +} + +bool Bu::Deflate::isOpen() +{ + TRACE(); + return (zState.state != NULL); +} + +bool Bu::Deflate::isEos() +{ + TRACE(); + return bEos; +} + +Bu::size Bu::Deflate::getCompressedSize() +{ + return sTotalOut; +} + diff --git a/src/deflate.h b/src/deflate.h new file mode 100644 index 0000000..cab9b51 --- /dev/null +++ b/src/deflate.h @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2007-2011 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#ifndef BU_DEFLATE_H +#define BU_DEFLATE_H + +#include +#include + +#include "bu/filter.h" + +namespace Bu +{ + /** + * + *@ingroup Streams + */ + class Deflate : public Bu::Filter + { + public: + enum Format + { + Raw = 0x01, + Zlib = 0x02, + Gzip = 0x03, + AutoDetect = 0x04, + + AutoRaw = 0x04|0x01, + AutoZlib = 0x04|0x02, + AutoGzip = 0x04|0x03 + }; + + Deflate( Bu::Stream &rNext, int nCompression=9, Format eFmt=AutoRaw ); + virtual ~Deflate(); + + virtual void start(); + virtual Bu::size stop(); + virtual Bu::size read( void *pBuf, Bu::size nBytes ); + virtual Bu::size write( const void *pBuf, Bu::size nBytes ); + + virtual bool isOpen(); + virtual bool isEos(); + + Bu::size getCompressedSize(); + + private: + void zError( int code ); + z_stream zState; + bool bReading; + int nCompression; + char *pBuf; + uint32_t nBufSize; + Bu::size sTotalOut; + Format eFmt; + bool bEos; + }; +} + +#endif diff --git a/src/itoatom.h b/src/itoatom.h deleted file mode 100644 index 3659f4e..0000000 --- a/src/itoatom.h +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (C) 2007-2011 Xagasoft, All rights reserved. - * - * This file is part of the libbu++ library and is released under the - * terms of the license contained in the file LICENSE. - */ - -#ifndef BU_ITO_ATOM_H -#define BU_ITO_ATOM_H - -#include - -#include "itomutex.h" -#include "itocondition.h" - -namespace Bu -{ - /** - * A thread-safe wrapper class. - *@ingroup Threading - */ - template - class ItoAtom - { - public: - /** - * Construct an empty queue. - */ - ItoAtom() - { - } - - ItoAtom( const T &src ) : - data( src ) - { - } - - ~ItoAtom() - { - } - - T get() - { - mOperate.lock(); - T ret = data; - mOperate.unlock(); - return ret; - } - - void set( const T &val ) - { - mOperate.lock(); - data = val; - mOperate.unlock(); - } - - private: - T data; - - ItoMutex mOperate; /**< The master mutex, used on all operations. */ - }; -}; - -#endif diff --git a/src/itocounter.cpp b/src/itocounter.cpp deleted file mode 100644 index 0c6e06c..0000000 --- a/src/itocounter.cpp +++ /dev/null @@ -1,8 +0,0 @@ -/* - * Copyright (C) 2007-2011 Xagasoft, All rights reserved. - * - * This file is part of the libbu++ library and is released under the - * terms of the license contained in the file LICENSE. - */ - -#include "bu/itocounter.h" diff --git a/src/itocounter.h b/src/itocounter.h deleted file mode 100644 index 10126a5..0000000 --- a/src/itocounter.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (C) 2007-2011 Xagasoft, All rights reserved. - * - * This file is part of the libbu++ library and is released under the - * terms of the license contained in the file LICENSE. - */ - -#ifndef BU_ITO_COUNTER_H -#define BU_ITO_COUNTER_H - -#include "mutex.h" - -namespace Bu -{ - /** - * A simple thread-safe counter class. This is handy for assigning unique - * IDs to objects that are being created in different threads. - *@ingroup Threading Containers - */ - template - class ItoCounter - { - public: - ItoCounter() : - tCounter( 0 ) - { - } - - virtual ~ItoCounter() - { - } - - T next() - { - mOperate.lock(); - T tRet = tCounter; - tCounter++; - mOperate.unlock(); - - return tRet; - } - - private: - T tCounter; /**< The counter itself. */ - Mutex mOperate; /**< The master mutex, used on all operations. */ - }; -} - -#endif diff --git a/src/itoheap.cpp b/src/itoheap.cpp deleted file mode 100644 index 21ccef8..0000000 --- a/src/itoheap.cpp +++ /dev/null @@ -1,9 +0,0 @@ -/* - * Copyright (C) 2007-2011 Xagasoft, All rights reserved. - * - * This file is part of the libbu++ library and is released under the - * terms of the license contained in the file LICENSE. - */ - -#include "bu/itoheap.h" - diff --git a/src/itoheap.h b/src/itoheap.h deleted file mode 100644 index a5aad05..0000000 --- a/src/itoheap.h +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright (C) 2007-2011 Xagasoft, All rights reserved. - * - * This file is part of the libbu++ library and is released under the - * terms of the license contained in the file LICENSE. - */ - -#ifndef BU_ITO_HEAP_H -#define BU_ITO_HEAP_H - -#include "bu/heap.h" -#include "bu/itomutex.h" -#include "bu/itocondition.h" - -namespace Bu -{ - class ItoMutex; - class ItoCondition; - - template, - typename itemalloc=std::allocator > - class ItoHeap - { - public: - ItoHeap() - { - } - - virtual ~ItoHeap() - { - } - - void enqueue( item i ) - { - imData.lock(); - hData.enqueue( i ); - icBlock.signal(); - imData.unlock(); - } - - item dequeue( bool bBlock=false ) - { - imData.lock(); - if( hData.isEmpty() ) - { - imData.unlock(); - - if( bBlock ) - { - icBlock.lock(); - - while( hData.isEmpty() ) - icBlock.wait(); - - imData.lock(); - try - { - item iRet = hData.dequeue(); - imData.unlock(); - icBlock.unlock(); - return iRet; - } - catch(...) - { - imData.unlock(); - icBlock.unlock(); - throw; - } - } - throw HeapException("Heap empty."); - } - else - { - try - { - item iRet = hData.dequeue(); - imData.unlock(); - return iRet; - } - catch(...) - { - imData.unlock(); - throw; - } - } - } - - item dequeue( int iSec, int iUSec ) - { - imData.lock(); - if( hData.isEmpty() ) - { - imData.unlock(); - - icBlock.lock(); - - icBlock.wait( iSec, iUSec ); - - imData.lock(); - try - { - item iRet = hData.dequeue(); - imData.unlock(); - icBlock.unlock(); - return iRet; - } - catch(...) - { - imData.unlock(); - icBlock.unlock(); - throw; - } - } - else - { - try - { - item iRet = hData.dequeue(); - imData.unlock(); - return iRet; - } - catch(...) - { - imData.unlock(); - throw; - } - } - } - - bool isEmpty() - { - imData.lock(); - bool bRet = hData.isEmpty(); - imData.unlock(); - return bRet; - } - - int getSize() - { - imData.lock(); - int iRet = hData.getSize(); - imData.unlock(); - return iRet; - } - - private: - Heap< item, cmpfunc, itemalloc > hData; - ItoMutex imData; - ItoCondition icBlock; - }; -}; - -#endif - diff --git a/src/itoqueue.h b/src/itoqueue.h deleted file mode 100644 index 039e09c..0000000 --- a/src/itoqueue.h +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Copyright (C) 2007-2011 Xagasoft, All rights reserved. - * - * This file is part of the libbu++ library and is released under the - * terms of the license contained in the file LICENSE. - */ - -#ifndef BU_ITO_QUEUE_H -#define BU_ITO_QUEUE_H - -#include - -#include "mutex.h" -#include "condition.h" - -namespace Bu -{ - /** - * A thread-safe queue class. This class is a very simple queue with some - * cool extra functionality for use with the Ito system. The main extra - * that it provides is the option to either dequeue without blocking, with - * infinite blocking, or with timed blocking, which will return a value if - * something is enqueued within the specified time limit, or NULL if the - * time limit is exceded. - *@ingroup Threading Containers - */ - template - class ItoQueue - { - private: - /** - * Helper struct. Keeps track of linked-list items for the queue data. - */ - typedef struct Item - { - T pData; - Item *pNext; - } Item; - - public: - /** - * Construct an empty queue. - */ - ItoQueue() : - pStart( NULL ), - pEnd( NULL ), - nSize( 0 ) - { - } - - /** - * Destroy the queue. This function will simply free all contained - * structures. If you stored pointers in the queue, this will lose the - * pointers without cleaning up the memory they pointed to. Make sure - * you're queue is empty before allowing it to be destroyed! - */ - ~ItoQueue() - { - Item *pCur = pStart; - while( pCur ) - { - Item *pTmp = pCur->pNext; - delete pCur; - pCur = pTmp; - } - } - - /** - * Enqueue a pieces of data. The new data will go at the end of the - * queue, and unless another piece of data is enqueued, will be the - * last piece of data to be dequeued. - *@param pData The data to enqueue. If this is not a primitive data - * type it's probably best to use a pointer type. - */ - void enqueue( T pData ) - { - mOperate.lock(); - - if( pStart == NULL ) - { - pStart = pEnd = new Item; - pStart->pData = pData; - pStart->pNext = NULL; - nSize++; - } - else - { - pEnd->pNext = new Item; - pEnd = pEnd->pNext; - pEnd->pData = pData; - pEnd->pNext = NULL; - nSize++; - } - - cBlock.signal(); - - mOperate.unlock(); - } - - /** - * Dequeue the first item from the queue. This function can operate in - * two different modes, blocking and non-blocking. In non-blocking - * mode it will return immediately weather there was data in the queue - * or not. If there was data it will remove it from the queue and - * return it to the caller. - * - * In blocking mode it will block forever wating for data to be - * enqueued. When data finally is enqueued this function will return - * immediately with the new data. The only way this function should - * ever return a null in blocking mode is if the calling thread was - * cancelled. It's probably a good idea to check for NULL return - * values even if you use blocking, just to be on the safe side. - *@param bBlock Set to true to enable blocking, leave as false to work - * in non-blocking mode. - *@returns The next piece of data in the queue, or NULL if no data was - * in the queue. - */ - T dequeue( bool bBlock=false ) - { - mOperate.lock(); - if( pStart == NULL ) - { - mOperate.unlock(); - - if( bBlock ) - { - cBlock.lock(); - - while( pStart == NULL ) - cBlock.wait(); - - T tmp = dequeue( false ); - - cBlock.unlock(); - return tmp; - - } - - return NULL; - } - else - { - T pTmp = pStart->pData; - Item *pDel = pStart; - pStart = pStart->pNext; - delete pDel; - nSize--; - - mOperate.unlock(); - return pTmp; - } - } - - /** - * Operates just like the other dequeue function in blocking mode with - * one twist. This function will block for at most nSec seconds and - * nUSec micro-seconds. If the timer is up and no data is available, - * this will just return NULL. If data is enqueued before the timeout - * expires, it will dequeue and exit immediately. - *@param nSec The number of seconds to wait, max. - *@param nUSec The number of micro-seconds to wait, max. - *@returns The next piece of data in the queue, or NULL if the timeout - * was exceeded. - */ - T dequeue( int nSec, int nUSec ) - { - mOperate.lock(); - if( pStart == NULL ) - { - mOperate.unlock(); - - cBlock.lock(); - - cBlock.wait( nSec, nUSec ); - - if( pStart == NULL ) - { - cBlock.unlock(); - return NULL; - } - - mOperate.lock(); - T pTmp = pStart->pData; - Item *pDel = pStart; - pStart = pStart->pNext; - delete pDel; - nSize--; - mOperate.unlock(); - - cBlock.unlock(); - return pTmp; - } - else - { - T pTmp = pStart->pData; - Item *pDel = pStart; - pStart = pStart->pNext; - delete pDel; - nSize--; - - mOperate.unlock(); - return pTmp; - } - } - - /** - * Checks to see if the queue has data in it or not. Note that there - * is no function to determine the length of the queue. This data - * isn't kept track of. If you really need to know, fix this. - *@returns True if the queue is empty, false if it has data in it. - */ - bool isEmpty() - { - mOperate.lock(); - bool bEmpty = (pStart == NULL ); - mOperate.unlock(); - - return bEmpty; - } - - long getSize() - { - mOperate.lock(); - long nRet = nSize; - mOperate.unlock(); - - return nRet; - } - - private: - Item *pStart; /**< The start of the queue, the next element to dequeue. */ - Item *pEnd; /**< The end of the queue, the last element to dequeue. */ - long nSize; /**< The number of items in the queue. */ - - Mutex mOperate; /**< The master mutex, used on all operations. */ - Condition cBlock; /**< The condition for blocking dequeues. */ - }; -} - -#endif diff --git a/src/itoserver.h b/src/itoserver.h index 75b3349..b1f5479 100644 --- a/src/itoserver.h +++ b/src/itoserver.h @@ -18,7 +18,7 @@ #include "bu/list.h" #include "bu/thread.h" #include "bu/mutex.h" -#include "bu/itoqueue.h" +#include "bu/synchroqueue.h" #include "bu/set.h" #include "bu/clientlink.h" @@ -82,7 +82,7 @@ namespace Bu int nTimeoutSec, int nTimeoutUSec ); virtual ~ItoClient(); - typedef ItoQueue StringQueue; + typedef SynchroQueue StringQueue; StringQueue qMsg; protected: @@ -129,7 +129,7 @@ namespace Bu typedef Hash ServerHash; ServerHash hServers; typedef Hash ClientHash; - typedef ItoQueue ClientQueue; + typedef SynchroQueue ClientQueue; ClientHash hClients; ClientQueue qClientCleanup; Mutex imClients; diff --git a/src/synchroatom.h b/src/synchroatom.h new file mode 100644 index 0000000..fb02054 --- /dev/null +++ b/src/synchroatom.h @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2007-2011 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#ifndef BU_SYNCHRO_ATOM_H +#define BU_SYNCHRO_ATOM_H + +#include + +#include "bu/mutex.h" + +namespace Bu +{ + /** + * A thread-safe wrapper class. + *@ingroup Threading + */ + template + class SynchroAtom + { + public: + /** + * Construct an empty queue. + */ + SynchroAtom() + { + } + + SynchroAtom( const T &src ) : + data( src ) + { + } + + ~SynchroAtom() + { + } + + T get() + { + mOperate.lock(); + T ret = data; + mOperate.unlock(); + return ret; + } + + void set( const T &val ) + { + mOperate.lock(); + data = val; + mOperate.unlock(); + } + + private: + T data; + + Mutex mOperate; /**< The master mutex, used on all operations. */ + }; +}; + +#endif diff --git a/src/synchrocounter.cpp b/src/synchrocounter.cpp new file mode 100644 index 0000000..48bbe21 --- /dev/null +++ b/src/synchrocounter.cpp @@ -0,0 +1,8 @@ +/* + * Copyright (C) 2007-2011 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#include "bu/synchrocounter.h" diff --git a/src/synchrocounter.h b/src/synchrocounter.h new file mode 100644 index 0000000..d201bee --- /dev/null +++ b/src/synchrocounter.h @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2007-2011 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#ifndef BU_SYNCHRO_COUNTER_H +#define BU_SYNCHRO_COUNTER_H + +#include "bu/mutex.h" + +namespace Bu +{ + /** + * A simple thread-safe counter class. This is handy for assigning unique + * IDs to objects that are being created in different threads. + *@ingroup Threading Containers + */ + template + class SynchroCounter + { + public: + SynchroCounter() : + tCounter( 0 ) + { + } + + virtual ~SynchroCounter() + { + } + + T next() + { + mOperate.lock(); + T tRet = tCounter; + tCounter++; + mOperate.unlock(); + + return tRet; + } + + private: + T tCounter; /**< The counter itself. */ + Mutex mOperate; /**< The master mutex, used on all operations. */ + }; +} + +#endif diff --git a/src/synchroheap.cpp b/src/synchroheap.cpp new file mode 100644 index 0000000..5dcce33 --- /dev/null +++ b/src/synchroheap.cpp @@ -0,0 +1,9 @@ +/* + * Copyright (C) 2007-2011 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#include "bu/synchroheap.h" + diff --git a/src/synchroheap.h b/src/synchroheap.h new file mode 100644 index 0000000..4dd898d --- /dev/null +++ b/src/synchroheap.h @@ -0,0 +1,151 @@ +/* + * Copyright (C) 2007-2011 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#ifndef BU_SYNCHRO_HEAP_H +#define BU_SYNCHRO_HEAP_H + +#include "bu/heap.h" +#include "bu/mutex.h" +#include "bu/condition.h" + +namespace Bu +{ + template, + typename itemalloc=std::allocator > + class SynchroHeap + { + public: + SynchroHeap() + { + } + + virtual ~SynchroHeap() + { + } + + void enqueue( item i ) + { + imData.lock(); + hData.enqueue( i ); + icBlock.signal(); + imData.unlock(); + } + + item dequeue( bool bBlock=false ) + { + imData.lock(); + if( hData.isEmpty() ) + { + imData.unlock(); + + if( bBlock ) + { + icBlock.lock(); + + while( hData.isEmpty() ) + icBlock.wait(); + + imData.lock(); + try + { + item iRet = hData.dequeue(); + imData.unlock(); + icBlock.unlock(); + return iRet; + } + catch(...) + { + imData.unlock(); + icBlock.unlock(); + throw; + } + } + throw HeapException("Heap empty."); + } + else + { + try + { + item iRet = hData.dequeue(); + imData.unlock(); + return iRet; + } + catch(...) + { + imData.unlock(); + throw; + } + } + } + + item dequeue( int iSec, int iUSec ) + { + imData.lock(); + if( hData.isEmpty() ) + { + imData.unlock(); + + icBlock.lock(); + + icBlock.wait( iSec, iUSec ); + + imData.lock(); + try + { + item iRet = hData.dequeue(); + imData.unlock(); + icBlock.unlock(); + return iRet; + } + catch(...) + { + imData.unlock(); + icBlock.unlock(); + throw; + } + } + else + { + try + { + item iRet = hData.dequeue(); + imData.unlock(); + return iRet; + } + catch(...) + { + imData.unlock(); + throw; + } + } + } + + bool isEmpty() + { + imData.lock(); + bool bRet = hData.isEmpty(); + imData.unlock(); + return bRet; + } + + int getSize() + { + imData.lock(); + int iRet = hData.getSize(); + imData.unlock(); + return iRet; + } + + private: + Heap< item, cmpfunc, itemalloc > hData; + Mutex imData; + Condition icBlock; + }; +}; + +#endif + diff --git a/src/synchroqueue.h b/src/synchroqueue.h new file mode 100644 index 0000000..79d5e49 --- /dev/null +++ b/src/synchroqueue.h @@ -0,0 +1,240 @@ +/* + * Copyright (C) 2007-2011 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#ifndef BU_SYNCHRO_QUEUE_H +#define BU_SYNCHRO_QUEUE_H + +#include + +#include "bu/mutex.h" +#include "bu/condition.h" + +namespace Bu +{ + /** + * A thread-safe queue class. This class is a very simple queue with some + * cool extra functionality for use with the Synchro system. The main extra + * that it provides is the option to either dequeue without blocking, with + * infinite blocking, or with timed blocking, which will return a value if + * something is enqueued within the specified time limit, or NULL if the + * time limit is exceded. + *@ingroup Threading Containers + */ + template + class SynchroQueue + { + private: + /** + * Helper struct. Keeps track of linked-list items for the queue data. + */ + typedef struct Item + { + T pData; + Item *pNext; + } Item; + + public: + /** + * Construct an empty queue. + */ + SynchroQueue() : + pStart( NULL ), + pEnd( NULL ), + nSize( 0 ) + { + } + + /** + * Destroy the queue. This function will simply free all contained + * structures. If you stored pointers in the queue, this will lose the + * pointers without cleaning up the memory they pointed to. Make sure + * you're queue is empty before allowing it to be destroyed! + */ + ~SynchroQueue() + { + Item *pCur = pStart; + while( pCur ) + { + Item *pTmp = pCur->pNext; + delete pCur; + pCur = pTmp; + } + } + + /** + * Enqueue a pieces of data. The new data will go at the end of the + * queue, and unless another piece of data is enqueued, will be the + * last piece of data to be dequeued. + *@param pData The data to enqueue. If this is not a primitive data + * type it's probably best to use a pointer type. + */ + void enqueue( T pData ) + { + mOperate.lock(); + + if( pStart == NULL ) + { + pStart = pEnd = new Item; + pStart->pData = pData; + pStart->pNext = NULL; + nSize++; + } + else + { + pEnd->pNext = new Item; + pEnd = pEnd->pNext; + pEnd->pData = pData; + pEnd->pNext = NULL; + nSize++; + } + + cBlock.signal(); + + mOperate.unlock(); + } + + /** + * Dequeue the first item from the queue. This function can operate in + * two different modes, blocking and non-blocking. In non-blocking + * mode it will return immediately weather there was data in the queue + * or not. If there was data it will remove it from the queue and + * return it to the caller. + * + * In blocking mode it will block forever wating for data to be + * enqueued. When data finally is enqueued this function will return + * immediately with the new data. The only way this function should + * ever return a null in blocking mode is if the calling thread was + * cancelled. It's probably a good idea to check for NULL return + * values even if you use blocking, just to be on the safe side. + *@param bBlock Set to true to enable blocking, leave as false to work + * in non-blocking mode. + *@returns The next piece of data in the queue, or NULL if no data was + * in the queue. + */ + T dequeue( bool bBlock=false ) + { + mOperate.lock(); + if( pStart == NULL ) + { + mOperate.unlock(); + + if( bBlock ) + { + cBlock.lock(); + + while( pStart == NULL ) + cBlock.wait(); + + T tmp = dequeue( false ); + + cBlock.unlock(); + return tmp; + + } + + return NULL; + } + else + { + T pTmp = pStart->pData; + Item *pDel = pStart; + pStart = pStart->pNext; + delete pDel; + nSize--; + + mOperate.unlock(); + return pTmp; + } + } + + /** + * Operates just like the other dequeue function in blocking mode with + * one twist. This function will block for at most nSec seconds and + * nUSec micro-seconds. If the timer is up and no data is available, + * this will just return NULL. If data is enqueued before the timeout + * expires, it will dequeue and exit immediately. + *@param nSec The number of seconds to wait, max. + *@param nUSec The number of micro-seconds to wait, max. + *@returns The next piece of data in the queue, or NULL if the timeout + * was exceeded. + */ + T dequeue( int nSec, int nUSec ) + { + mOperate.lock(); + if( pStart == NULL ) + { + mOperate.unlock(); + + cBlock.lock(); + + cBlock.wait( nSec, nUSec ); + + if( pStart == NULL ) + { + cBlock.unlock(); + return NULL; + } + + mOperate.lock(); + T pTmp = pStart->pData; + Item *pDel = pStart; + pStart = pStart->pNext; + delete pDel; + nSize--; + mOperate.unlock(); + + cBlock.unlock(); + return pTmp; + } + else + { + T pTmp = pStart->pData; + Item *pDel = pStart; + pStart = pStart->pNext; + delete pDel; + nSize--; + + mOperate.unlock(); + return pTmp; + } + } + + /** + * Checks to see if the queue has data in it or not. Note that there + * is no function to determine the length of the queue. This data + * isn't kept track of. If you really need to know, fix this. + *@returns True if the queue is empty, false if it has data in it. + */ + bool isEmpty() + { + mOperate.lock(); + bool bEmpty = (pStart == NULL ); + mOperate.unlock(); + + return bEmpty; + } + + long getSize() + { + mOperate.lock(); + long nRet = nSize; + mOperate.unlock(); + + return nRet; + } + + private: + Item *pStart; /**< The start of the queue, the next element to dequeue. */ + Item *pEnd; /**< The end of the queue, the last element to dequeue. */ + long nSize; /**< The number of items in the queue. */ + + Mutex mOperate; /**< The master mutex, used on all operations. */ + Condition cBlock; /**< The condition for blocking dequeues. */ + }; +} + +#endif diff --git a/src/tests/deflate.cpp b/src/tests/deflate.cpp new file mode 100644 index 0000000..9796408 --- /dev/null +++ b/src/tests/deflate.cpp @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2007-2011 Xagasoft, All rights reserved. + * + * This file is part of the libbu++ library and is released under the + * terms of the license contained in the file LICENSE. + */ + +#include "bu/deflate.h" +#include "bu/file.h" + +int main( int argc, char *argv[] ) +{ + if( argc < 3 ) + { + printf("usage: %s \n", argv[0] ); + return -1; + } + + char buf[1024]; + size_t nRead; + + /* + Bu::File fin( argv[1], Bu::File::Read ); + fin.seek( 4 ); + Bu::Deflate def( fin ); + + Bu::File f( argv[2], Bu::File::WriteNew ); + + for(;;) + { + nRead = def.read( buf, 1024 ); + if( nRead > 0 ) + f.write( buf, nRead ); + if( def.isEos() ) + break; + } + */ + + Bu::File fin( argv[1], Bu::File::Read ); + + Bu::File f( argv[2], Bu::File::WriteNew ); + Bu::Deflate def( f, 9, Bu::Deflate::Gzip ); + + for(;;) + { + nRead = fin.read( buf, 1024 ); + if( nRead > 0 ) + def.write( buf, nRead ); + if( fin.isEos() ) + break; + } +} + -- cgit v1.2.3