#include "gats/gatsstream.h"
#include "gats/object.h"

// NOTE(review): the header names of the next two include lines were lost when
// this file was flattened.  <bu/sio.h> (debug output, disabled) and
// <bu/nullstream.h> (Bu::NullStream is used in writeObject) are inferred from
// the code below -- confirm against the original file.
// #include <bu/sio.h>
#include <bu/nullstream.h>
// using namespace Bu;

namespace
{
	// Size of a packet header: one version byte followed by a four byte
	// big-endian packet size.
	const int iHeaderSize = 5;

	// Largest number of bytes pulled from the underlying stream per read.
	const int32_t iChunkSize = 1500;
}

/**
 * Create a GatsStream that reads from and writes to rStream.  Only a
 * reference is stored, nothing is read or written here.
 */
Gats::GatsStream::GatsStream( Bu::Stream &rStream ) :
	rStream( rStream )
{
}

Gats::GatsStream::~GatsStream()
{
}

/**
 * Try to read one complete object packet.
 *
 * A packet is one version byte, a four byte big-endian size, and then the
 * encoded object; the size counts the header bytes as well (see
 * writeObject).  Null bytes in front of a header are skipped.
 *
 * Incoming data is accumulated in qbRead.  When the stream does not deliver
 * enough data for a full header or a full packet, whatever was read stays in
 * qbRead and NULL is returned; presumably the caller is expected to call
 * again once more data is available.
 *
 *@returns The result of Gats::Object::read on the packet body, or NULL if a
 * complete packet is not available yet.
 */
Gats::Object *Gats::GatsStream::readObject()
{
	char buf[iChunkSize];

	// Gather a full header.  skipReadNulls() reports false whenever it had to
	// drop padding (or found nothing at all), in which case the buffer may
	// have fallen below one header again and has to be topped up.
	do
	{
		if( qbRead.getSize() < iHeaderSize )
		{
			int iRead = rStream.read( buf, iHeaderSize-qbRead.getSize() );
			qbRead.write( buf, iRead );
			if( qbRead.getSize() < iHeaderSize )
				return NULL;
		}
	} while( !skipReadNulls() );

	// The version byte is fetched but not checked at the moment.
	uint8_t uVer;
	qbRead.peek( &uVer, 1 );

	// The packet size follows the version byte (the third peek argument is
	// presumably the offset to start at) and is stored big-endian.
	// NOTE(review): a size smaller than the header is not rejected here.
	int32_t iSize;
	qbRead.peek( &iSize, 4, 1 );
	iSize = be32toh( iSize );

	// Pull in the rest of the packet, at most one chunk at a time.
	while( qbRead.getSize() < iSize )
	{
		int32_t iWant = iSize - qbRead.getSize();
		if( iWant > iChunkSize )
			iWant = iChunkSize;

		int32_t iGot = rStream.read( buf, iWant );
		qbRead.write( buf, iGot );

		// A short read means the stream has nothing more for us right now;
		// keep what we have buffered and give up for the moment.
		if( iGot < iWant )
			return NULL;
	}

	// The loop above only ends once the whole packet is buffered, so no
	// further size check is needed.  Step over the header and decode.
	qbRead.seek( iHeaderSize );
	return Gats::Object::read( qbRead );
}

/**
 * Write pObject to the stream as one packet: a version byte (1), the total
 * packet size as a four byte big-endian integer, then the encoded object.
 *
 *@param pObject The object to write.  It is dereferenced without a check,
 * so it must not be NULL.
 */
void Gats::GatsStream::writeObject( Gats::Object *pObject )
{
	// Encode into a NullStream first; its position afterwards is used as the
	// encoded size of the object.
	Bu::NullStream ns;
	pObject->write( ns );

	uint8_t uVer = 1;
	int32_t iSize = htobe32( ns.tell()+iHeaderSize );

	rStream.write( &uVer, 1 );
	rStream.write( &iSize, 4 );
	pObject->write( rStream );
}

/**
 * Drop null bytes from the front of qbRead.
 *
 *@returns true only if the first byte in qbRead was already non-null, i.e.
 * nothing had to be dropped.  false if qbRead was empty, ran empty while
 * dropping nulls, or if at least one null byte was dropped; readObject uses
 * that to re-check that a full header is still buffered.
 */
bool Gats::GatsStream::skipReadNulls()
{
	bool bSkipped = false;
	char cNext;

	while( qbRead.peek( &cNext, 1 ) != 0 )
	{
		if( cNext != 0 )
			return !bSkipped;

		// Padding byte, not a header yet: discard it and look again.
		qbRead.seek( 1 );
		bSkipped = true;
	}

	// Nothing (left) in the buffer to look at.
	return false;
}