aboutsummaryrefslogtreecommitdiff
path: root/src/stable/myriad.cpp
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/stable/myriad.cpp1226
1 files changed, 646 insertions, 580 deletions
diff --git a/src/stable/myriad.cpp b/src/stable/myriad.cpp
index 86f651e..53250c2 100644
--- a/src/stable/myriad.cpp
+++ b/src/stable/myriad.cpp
@@ -1,24 +1,25 @@
1/*
2 * Copyright (C) 2007-2023 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#include "bu/config.h"
9#include "bu/myriad.h" 1#include "bu/myriad.h"
10#include "bu/stream.h"
11#include "bu/myriadstream.h" 2#include "bu/myriadstream.h"
3
4#include "bu/membuf.h"
12#include "bu/mutexlocker.h" 5#include "bu/mutexlocker.h"
13#include <stdio.h> 6#include "bu/util.h"
14 7
15#include "bu/sio.h" 8#include "bu/sio.h"
16using Bu::sio;
17using Bu::Fmt;
18 9
19#define Myriad_MAGIC_CODE ((unsigned char *)"\x0a\xd3\xfa\x84") 10#define Myriad_MAGIC_CODE ((unsigned char *)"\x0a\xd3\xfa\x84")
20 11
21#define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(sStore.getLocation()).arg(x) 12#define MyriadRead( target, size ) if( rBacking.read( target, size ) < size ) \
13{ \
14 throw Bu::MyriadException( Bu::MyriadException::invalidFormat, \
15 "Insufficient data reading myriad data from backing stream."); \
16} (void)0
17
18#define ReqRead( stream, target, size ) if( stream.read( target, size ) < size ) \
19{ \
20 throw Bu::MyriadException( Bu::MyriadException::invalidFormat, \
21 "Insufficient data reading from myriad stream."); \
22} (void)0
22 23
23namespace Bu 24namespace Bu
24{ 25{
@@ -28,779 +29,844 @@ namespace Bu
28 } 29 }
29} 30}
30 31
31Bu::Myriad::Myriad( Bu::Stream &sStore, int iBlockSize, int iPreallocate ) : 32Bu::Myriad::Myriad( Bu::Stream &rBacking, int iBlockSize,
32 sStore( sStore ), 33 int iPreallocateBlocks ) :
34 rBacking( rBacking ),
33 iBlockSize( iBlockSize ), 35 iBlockSize( iBlockSize ),
34 iBlocks( 0 ), 36 iBlockCount( 0 ),
35 iUsed( 0 ), 37 bIsNewStream( true ),
36 bHeaderChanged( false ) 38 bStructureChanged( false ),
39 iLastUsedIndex( -1 )
37{ 40{
38 try 41 if( !rBacking.isSeekable() )
39 { 42 {
40 initialize(); 43 throw Bu::MyriadException( Bu::MyriadException::invalidBackingStream,
44 "Myriad backing stream must be random access (seekable).");
41 } 45 }
42 catch( Bu::MyriadException &e ) 46 if( rBacking.getSize() == 0 )
43 { 47 {
44 if( e.getErrorCode() == MyriadException::emptyStream ) 48 createMyriad( iBlockSize, iPreallocateBlocks );
45 { 49 }
46 initialize( iBlockSize, iPreallocate ); 50 else
47 } 51 {
48 else 52 loadMyriad();
49 {
50 throw;
51 }
52 } 53 }
53} 54}
54 55
55Bu::Myriad::~Myriad() 56Bu::Myriad::~Myriad()
56{ 57{
57 mActiveBlocks.lock(); 58 writeHeader();
58 TRACE("mActiveBlocks locked."); 59}
59 if( !hActiveBlocks.isEmpty() ) 60
60 { 61Bu::MyriadStream Bu::Myriad::create( Bu::Myriad::Mode eMode,
61 sio << "Bu::Myriad::~Myriad(): Error: There are " 62 int32_t iPreallocateBytes )
62 << hActiveBlocks.getSize() << " unsynced blocks!" << sio.nl; 63{
63 } 64 Bu::MutexLocker l( mAccess );
64 TRACE("mActiveBlocks unlocking...");
65 mActiveBlocks.unlock();
66 sync();
67 65
68 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 66 Stream *pStream = new Stream( *this, ++iLastUsedIndex, 0 );
67 int iBlocks = std::max(1, blkDiv( iPreallocateBytes, iBlockSize ));
68 for( int j = 0; j < iBlocks; j++ )
69 { 69 {
70 delete *i; 70 pStream->aBlocks.append( __allocateBlock() );
71 } 71 }
72 mhStream.lock();
73 hStream.insert( pStream->iStream, pStream );
74 mhStream.unlock();
75 bStructureChanged = true;
76
77 return Bu::MyriadStream( *this, pStream, eMode&ReadWrite );
72} 78}
73 79
74void Bu::Myriad::sync() 80Bu::MyriadStream Bu::Myriad::open( Bu::Myriad::StreamId iStream,
81 Bu::Myriad::Mode eMode )
75{ 82{
76 updateHeader(); 83 Stream *pStream = NULL;
77 84 Bu::MutexLocker l( mhStream );
78 mActiveBlocks.lock(); 85 if( (eMode&Create) )
79 TRACE("mActiveBlocks locked.");
80 for( BlockHash::iterator i = hActiveBlocks.begin(); i; i++ )
81 { 86 {
82 if( (*i)->bChanged ) 87 if( hStream.has( iStream ) )
88 {
89 if( (eMode&Exclusive) )
90 {
91 throw Bu::MyriadException( MyriadException::noSuchStream,
92 "Stream exists.");
93 }
94 }
95 else
83 { 96 {
84 syncBlock( *i ); 97 Bu::MutexLocker l( mAccess );
98 if( iStream >= iLastUsedIndex )
99 {
100 iLastUsedIndex = iStream;
101 }
102 pStream = new Stream( *this, iStream, 0 );
103 pStream->aBlocks.append( __allocateBlock() );
104 hStream.insert( pStream->iStream, pStream );
105 bStructureChanged = true;
85 } 106 }
86 } 107 }
87 TRACE("mActiveBlocks unlocked..."); 108 if( !hStream.has( iStream ) )
88 mActiveBlocks.unlock();
89}
90
91void Bu::Myriad::initialize()
92{
93 MutexLocker mLock( mHeader );
94 TRACE("mHeader locked.");
95 lFreeBlocks.clear();
96 sStore.setPosEnd( 0 );
97 Bu::size iSize = sStore.tell();
98 sStore.setPos( 0 );
99
100 unsigned char buf[4];
101 if( sStore.read( buf, 4 ) < 4 )
102 { 109 {
103 TRACE("mHeader unlocked..."); 110 throw Bu::MyriadException( MyriadException::noSuchStream,
104 throw MyriadException( MyriadException::emptyStream, 111 "No such stream.");
105 "Input stream appears to be empty.");
106 } 112 }
107 if( memcmp( buf, Myriad_MAGIC_CODE, 4 ) )
108 { 113 {
109 TRACE("mHeader unlocked..."); 114 Bu::MutexLocker l2( mBacking );
110 throw MyriadException( MyriadException::invalidFormat, 115 if( (eMode&Write) && !rBacking.isWritable() )
111 "Stream does not appear to be a valid Myriad format."); 116 {
117 throw Bu::MyriadException( MyriadException::badMode,
118 "Backing stream does not support writing.");
119 }
112 } 120 }
113 sStore.read( buf, 2 ); 121 if( pStream == NULL )
114 if( buf[0] != 1 )
115 { 122 {
116 TRACE("mHeader unlocked..."); 123 pStream = hStream.get( iStream );
117 throw MyriadException( MyriadException::badVersion,
118 "We can only handle version 1 for now.");
119 } 124 }
120 if( buf[1] != 32 ) 125 if( (eMode&Truncate) )
121 { 126 {
122 TRACE("mHeader unlocked..."); 127 pStream->setSize( 0 );
123 throw MyriadException( MyriadException::invalidWordSize,
124 "We can only handle 32-bit words at the moment.");
125 } 128 }
126 sStore.read( &iBlockSize, 4 ); 129 return Bu::MyriadStream( *this, pStream, eMode );
127 int iStreams; 130}
128 sStore.read( &iStreams, 4 );
129
130 iBlocks = iSize/iBlockSize;
131 //sio << "Myriad: iSize=" << iSize << ", iBlockSize=" << iBlockSize
132 // << ", iBlocks=" << iBlocks << ", iStreams=" << iStreams << sio.nl;
133 131
134 int iHeaderSize = 14 + 8 + 4; 132Bu::Myriad::StreamId Bu::Myriad::allocate()
135 int iHeaderBlocks = 0; //blkDiv( iHeaderSize+4, iBlockSize ); 133{
134 Bu::MutexLocker l( mAccess );
136 135
137 while( iHeaderSize > iHeaderBlocks*iBlockSize ) 136 Stream *pStream = new Stream( *this, ++iLastUsedIndex, 0 );
138 { 137 mhStream.lock();
139 iHeaderBlocks = blkDiv( iHeaderSize+4, iBlockSize ); 138 hStream.insert( pStream->iStream, pStream );
140 iHeaderSize = 14 + 8 + 4*iHeaderBlocks; 139 mhStream.unlock();
141 } 140 bStructureChanged = true;
142 141
143 //sio << "Myriad: iHeaderSize=" << iHeaderSize 142 return pStream->iStream;
144 // << ", iHeaderBlocks=" << iHeaderBlocks << sio.nl; 143}
145 144
146 Stream *pFakeHdr = new Stream; 145void Bu::Myriad::erase( Bu::Myriad::StreamId iStream )
147 pFakeHdr->iId = 0; 146{
148 pFakeHdr->iSize = iHeaderSize; 147 // For now, let's prevent you from erasing a stream if it's open.
149 for( int j = 0; j < iHeaderBlocks; j++ ) 148 Bu::MutexLocker l( mhStream );
149 if( !hStream.has( iStream ) )
150 { 150 {
151 pFakeHdr->aBlocks.append( j ); 151 throw Bu::MyriadException( Bu::MyriadException::noSuchStream,
152 "No such stream exists.");
152 } 153 }
153 154 Stream *pStream = hStream.get( iStream );
154// sio << "Blocks: " << iBlocks << " (size = " << iSize << "/" << iBlockSize
155// << ")" << sio.nl;
156 Bu::BitString bsBlockUsed( iBlocks, false );
157 bsBlockUsed.clear();
158
159// bool bCanSkip = false; // Can skip around, post initial header stream i/o
160 MyriadStream *pIn = new MyriadStream( *this, pFakeHdr );
161 pIn->setPos( sStore.tell() );
162 for( int j = 0; j < iStreams; j++ )
163 { 155 {
164 aStreams.append( new Stream() ); 156 Bu::MutexLocker sl( pStream->mAccess );
165 Stream &s = *aStreams[j]; 157 if( pStream->iOpenCount > 0 )
166 pIn->read( &s.iId, 4 );
167 pIn->read( &s.iSize, 4 );
168 int iSBlocks = blkDiv(s.iSize, iBlockSize);
169 // sio << "Myriad: - Stream::iId=" << s.iId
170 // << ", Stream::iSize=" << s.iSize
171 // << ", Stream::aBlocks=" << iSBlocks
172 // << ", pIn->tell()=" << pIn->tell() << sio.nl;
173 for( int k = 0; k < iSBlocks; k++ )
174 { 158 {
175 int iBId; 159 throw Bu::MyriadException( Bu::MyriadException::streamOpen,
176 pIn->read( &iBId, 4 ); 160 "Cannot currently erase a stream while it is open.");
177 // sio << "Myriad: - iBId=" << iBId
178 // << ", iStartPos=" << iBId*iBlockSize
179 // << ", pIn->tell()=" << pIn->tell() << sio.nl;
180 s.aBlocks.append( iBId );
181 bsBlockUsed.setBit( iBId );
182 iUsed++;
183 if( (j == 0 && k == iHeaderBlocks-1) )
184 {
185 // sio << "Myriad: - End of prepartition, unlocking skipping."
186 // << sio.nl;
187// bCanSkip = true;
188 MyriadStream *pTmp = new MyriadStream( *this, aStreams[0] );
189 // sio << "Myriad - Position = " << pIn->tell() << sio.nl;
190 pTmp->setPos( pIn->tell() );
191 delete pIn;
192 delete pFakeHdr;
193 pIn = pTmp;
194 }
195 } 161 }
196 }
197 delete pIn;
198 162
199 for( int j = 0; j < iBlocks; j++ ) 163 for( Bu::Array<int32_t>::iterator i = pStream->aBlocks.begin(); i; i++ )
200 {
201 if( bsBlockUsed.getBit( j ) == false )
202 { 164 {
203// sio << "Preinitialized block " << j << " is free." << sio.nl; 165 releaseBlock( *i, false );
204 lFreeBlocks.append( j );
205 } 166 }
167 pStream->aBlocks.clear();
168 hStream.erase( iStream );
206 } 169 }
207// sio << "Myriad: Blocks used: " << bsBlockUsed.toString() << sio.nl; 170 delete pStream;
208 TRACE("mHeader unlocked...");
209} 171}
210 172
211void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) 173void Bu::Myriad::setSize( Bu::Myriad::StreamId iStream,
174 int32_t iNewSize )
212{ 175{
213 MutexLocker mLock( mHeader ); 176 Stream *pStream;
214 TRACE("mHeader locked.");
215 lFreeBlocks.clear();
216
217 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
218 { 177 {
219 delete *i; 178 Bu::MutexLocker l( mhStream );
220 } 179 pStream = hStream.get( iStream );
221 aStreams.clear();
222 iUsed = 0;
223
224 int iHeaderSize = 14 + 8 + 4;
225 int iHeaderBlocks = 0; //blkDiv( iHeaderSize+4, iBlockSize );
226 char cBuf = 1;
227 int iBuf = 0;
228
229 Stream *pStr = new Stream;
230 pStr->iId = 0;
231
232 while( iHeaderSize > iHeaderBlocks*iBlockSize )
233 {
234 iHeaderBlocks = blkDiv( iHeaderSize+4, iBlockSize );
235 iHeaderSize = 14 + 8 + 4*iHeaderBlocks;
236 } 180 }
181 pStream->setSize( iNewSize );
182}
237 183
238 iPreAllocate += iHeaderBlocks; 184int32_t Bu::Myriad::getSize( StreamId iStream ) const
185{
186 Bu::MutexLocker l( mhStream );
187 return hStream.get( iStream )->getSize();
188}
239 189
240 //sio << "Myriad: iHeaderSize=" << iHeaderSize << ", iBlockSize=" 190bool Bu::Myriad::exists( StreamId iStream ) const
241 // << iBlockSize << ", iHeaderBlocks=" << iHeaderBlocks << sio.nl; 191{
242 192 Bu::MutexLocker l( mhStream );
243// bsBlockUsed.setSize( iPreAllocate, true ); 193 return hStream.has( iStream );
244 iUsed++; 194}
245 195
246 char *pBlock = new char[iBlockSize]; 196Bu::String Bu::Myriad::getLocation() const
247 memset( pBlock, 0, iBlockSize ); 197{
248 for( int j = 0; j < iPreAllocate; j++ ) 198 Bu::MutexLocker l( mAccess );
249 { 199 Bu::MutexLocker l2( mBacking );
250 sStore.write( pBlock, iBlockSize ); 200 return Bu::String("myriad(%1,%2):%3")
251 } 201 .arg( 1 ).arg( iBlockSize ).arg( rBacking.getLocation() );
252 delete[] (char *)pBlock; 202}
253 203
254 sStore.setPos( 0 ); 204int32_t Bu::Myriad::getBlockSize() const
205{
206 Bu::MutexLocker l( mAccess );
207 return iBlockSize;
208}
255 209
256 // Magic number 210int32_t Bu::Myriad::getTotalBlocks() const
257 sStore.write( Myriad_MAGIC_CODE, 4 ); 211{
212 Bu::MutexLocker l( mAccess );
213 return iBlockCount;
214}
258 215
259 // Version (0) 216int32_t Bu::Myriad::getUsedBlocks() const
260 sStore.write( &cBuf, 1 ); 217{
218 Bu::MutexLocker l( mAccess );
219 return iBlockCount-lFreeBlocks.getSize();
220}
261 221
262 // Bits per int 222int32_t Bu::Myriad::getFreeBlocks() const
263 cBuf = 32; 223{
264 sStore.write( &cBuf, 1 ); 224 Bu::MutexLocker l( mAccess );
225 return lFreeBlocks.getSize();
226}
265 227
266 // The size of each block 228int32_t Bu::Myriad::getTotalStreams() const
267 sStore.write( &iBlockSize, 4 ); 229{
230 Bu::MutexLocker l( mhStream );
231 return hStream.getSize();
232}
268 233
269 iBuf = 1; 234int32_t Bu::Myriad::getTotalUsedBytes() const
270 // The number of streams 235{
271 sStore.write( &iBuf, 4 ); 236 Bu::MutexLocker l( mhStream );
272 237 int32_t iTotal = 0;
273 // Stream header 238 for( StreamHash::const_iterator i = hStream.begin(); i; i++ )
274 iBuf = 0;
275 sStore.write( &iBuf, 4 );
276 sStore.write( &iHeaderSize, 4 );
277 for( iBuf = 0; iBuf < iHeaderBlocks; iBuf++ )
278 { 239 {
279 sStore.write( &iBuf, 4 ); 240 iTotal += i.getValue()->getSize();
280 } 241 }
281 242
282 this->iBlockSize = iBlockSize; 243 return iTotal;
283 this->iBlocks = iPreAllocate; 244}
284
285 pStr->iSize = sStore.tell();
286// sio << "Myriad: Actual end of header stream = " << pStr->iSize << sio.nl;
287 245
288 pStr->iSize = iHeaderSize; 246int32_t Bu::Myriad::getTotalUnusedBytes(int32_t iAssumeBlockSize ) const
289 for( int j = 0; j < iHeaderBlocks; j++ ) 247{
248 if( iAssumeBlockSize < 0 )
290 { 249 {
291// sio << "Started block " << j << " is header." << sio.nl; 250 iAssumeBlockSize = getBlockSize();
292 pStr->aBlocks.append( j );
293// bsBlockUsed.setBit( j );
294 iUsed++;
295 } 251 }
296 for( int j = iHeaderBlocks; j < this->iBlocks; j++ ) 252 int32_t iTotal = 0;
297 { 253 {
298// sio << "Started block " << j << " is free." << sio.nl; 254 Bu::MutexLocker l( mhStream );
299 lFreeBlocks.append( j ); 255 for( StreamHash::const_iterator i = hStream.begin(); i; i++ )
256 {
257 if( (i.getValue()->getSize()%iAssumeBlockSize) > 0 )
258 iTotal += iBlockSize-(i.getValue()->getSize()%iAssumeBlockSize);
259 }
300 } 260 }
301 261
302 aStreams.append( pStr ); 262 {
263 Bu::MutexLocker l( mAccess );
264 iTotal += lFreeBlocks.getSize()*iBlockSize;
265 }
303 266
304 //sio << bsBlockUsed.toString() << " - " << pStr->aBlocks << sio.nl; 267 return iTotal;
268}
305 269
306 bHeaderChanged = true; 270Bu::Myriad::StreamIdList Bu::Myriad::getStreamList() const
307 //hStreams.insert( 0, BlockArray( 0 ) ); 271{
308 TRACE("mHeader unlocked..."); 272 mhStream.lock();
273 StreamIdList lIds = hStream.getKeys();
274 mhStream.unlock();
275 lIds.sort();
276 if( lIds.first() == 0 )
277 {
278 lIds.eraseFirst();
279 }
280 return lIds;
309} 281}
310 282
311void Bu::Myriad::updateHeader() 283Bu::BitString Bu::Myriad::buildBlockUseMap() const
312{ 284{
313 MutexLocker mLock( mHeader ); 285 Bu::MutexLocker l( mAccess );
314 TRACE("mHeader locked."); 286 Bu::BitString bsMap( iBlockCount );
287 bsMap.fill();
288 for( IndexList::const_iterator i = lFreeBlocks.begin(); i; i++ )
289 {
290 bsMap.setBit( *i, false );
291 }
292 return bsMap;
293}
315 294
316 if( bHeaderChanged == false ) 295Bu::Myriad::StreamIdArray Bu::Myriad::buildBlockMap() const
296{
297 Bu::MutexLocker l( mAccess );
298 StreamIdArray bm( iBlockCount );
299 for( int j = 0; j < iBlockCount; j++ )
317 { 300 {
318 TRACE("mHeader unlocked..."); 301 bm.append( -1 );
319 return;
320 } 302 }
321 if( !sStore.canWrite() ) 303 Bu::MutexLocker l2( mhStream );
304 for( StreamHash::const_iterator iStream = hStream.begin();
305 iStream; iStream++ )
322 { 306 {
323 TRACE("mHeader unlocked..."); 307 int32_t iId = iStream.getKey();
324 return; 308 Stream *pStream = iStream.getValue();
309 for( Bu::Array<int32_t>::const_iterator iBlock =
310 pStream->aBlocks.begin(); iBlock; iBlock++ )
311 {
312 bm[*iBlock] = iId;
313 }
325 } 314 }
315 return bm;
316}
326 317
327 char cBuf; 318void Bu::Myriad::sync()
328 int iBuf; 319{
320 writeHeader();
321}
329 322
330 //for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 323bool Bu::Myriad::loadMyriad()
331 //{ 324{
332 // sio << "Myriad: Stream " << Fmt(4) << (*i)->iId << ": " << (*i)->aBlocks << sio.nl; 325 //Bu::println("Load myriad!");
333 //} 326 char sMagicCode[4];
327 rBacking.setPos( 0 );
328 MyriadRead( sMagicCode, 4 );
329 if( memcmp( sMagicCode, Myriad_MAGIC_CODE, 4 ) )
330 {
331 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
332 "Backing stream does not seem to be a Myriad structure.");
333 }
334 uint8_t uVer;
335 uint8_t uBitsPerInt;
336 MyriadRead( &uVer, 1 );
337 if( uVer != 1 )
338 {
339 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
340 "Only version 1 myriad structures are supported.");
341 }
342 MyriadRead( &uBitsPerInt, 1 );
343 if( uBitsPerInt != 32 )
344 {
345 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
346 "Only 32 bits per int are supported at this time.");
347 }
348 MyriadRead( &iBlockSize, 4 );
334 349
335 // Compute the new size of the header. 350 iBlockCount = rBacking.getSize()/iBlockSize;
336 int iHeaderSize = 14 + 8*aStreams.getSize(); 351 if( (rBacking.getSize()%iBlockSize) != 0 )
337// sio << "Myriad: updateHeader: aStreams.getSize() = " << aStreams.getSize()
338// << sio.nl;
339 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
340 { 352 {
341 iHeaderSize += 4*(*i)->aBlocks.getSize(); 353 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
342// sio << "Myriad: updateHeader: (*i)->aBlocks.getSize() = " 354 "Backing stream is not cleanly divisibly by the block size.");
343// << (*i)->aBlocks.getSize() << sio.nl;
344 } 355 }
345 int iNewBlocks = blkDiv( iHeaderSize, iBlockSize ); 356
346 while( iNewBlocks > aStreams[0]->aBlocks.getSize() ) 357 Bu::Hash<int32_t,bool> hUnusedBlocks;
358 for( int32_t j = 0; j < iBlockCount; j++ )
347 { 359 {
348 int iBlock = findEmptyBlock(); 360 hUnusedBlocks.insert( j, true );
349// sio << "Myriad: updateHeader: Appending block " << iBlock
350// << " to header." << sio.nl;
351 aStreams[0]->aBlocks.append( iBlock );
352// bsBlockUsed.setBit( iBlock );
353 iUsed++;
354 iHeaderSize += 4;
355 iNewBlocks = blkDiv( iHeaderSize, iBlockSize );
356 } 361 }
357 aStreams[0]->iSize = iHeaderSize;
358// sio << "Myriad: updateHeader: iHeaderSize=" << iHeaderSize
359// << ", iNewBlocks=" << iNewBlocks << ", curBlocks="
360// << aStreams[0]->aBlocks.getSize() << sio.nl;
361 362
362 MyriadStream sHdr( *this, aStreams[0] ); 363 int iStreamCount;
363 sHdr.write( Myriad_MAGIC_CODE, 4 ); 364 MyriadRead( &iStreamCount, 4 );
365
366 //
367 // Read stream data -- Bootstrap the zero stream
368 //
369 StreamId iStream;
370 MyriadRead( &iStream, 4 );
371 if( iStream != 0 )
372 {
373 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
374 "The first stream defined must be the header/zero stream.");
375 }
376 iLastUsedIndex = iStream;
377 int32_t iHeaderStreamBytes;
378 MyriadRead( &iHeaderStreamBytes, 4 );
364 379
365 // Version (1) 380 Stream *pHeaderStream = new Stream( *this, iStream, iHeaderStreamBytes );
366 cBuf = 1; 381 hStream.insert( iStream, pHeaderStream );
367 sHdr.write( &cBuf, 1 ); 382 int iHeaderStreamBlocks = blkDiv(iHeaderStreamBytes, iBlockSize );
383 MyriadStream sHeader( *this, pHeaderStream, Read );
368 384
369 // Bits per int 385 // We need to read enough so that we can gurantee that we're within a block
370 cBuf = 32; 386 // that we have read the index to, plus one index.
371 sHdr.write( &cBuf, 1 ); 387 for( int32_t j = 0; j < iHeaderStreamBlocks; j++ )
388 {
389 int32_t iBlockIndex;
390 MyriadRead( &iBlockIndex, 4 );
391 hUnusedBlocks.erase( iBlockIndex );
392 pHeaderStream->aBlocks.append( iBlockIndex );
393 if( rBacking.tell()+4 <= (j+1)*iBlockSize )
394 break;
395 }
372 396
373 // The size of each block 397 // Bootstrap now using the header stream to read the rest of the data.
374 sHdr.write( &iBlockSize, 4 ); 398 sHeader.setPos( rBacking.tell() );
399 while( pHeaderStream->aBlocks.getSize() < iHeaderStreamBlocks )
400 {
401 int32_t iBlockIndex;
402 ReqRead( sHeader, &iBlockIndex, 4 );
403 hUnusedBlocks.erase( iBlockIndex );
404 pHeaderStream->aBlocks.append( iBlockIndex );
405 }
375 406
376 iBuf = aStreams.getSize(); 407 // Ok, now we can read the rest of the header in.
377 // The number of streams 408 for( int j = 1; j < iStreamCount; j++ )
378 sHdr.write( &iBuf, 4 );
379
380 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
381 { 409 {
382 sHdr.write( &(*i)->iId, 4 ); 410 int32_t iStreamBytes;
383 sHdr.write( &(*i)->iSize, 4 ); 411 ReqRead( sHeader, &iStream, 4 );
384 int iUsedBlocks = blkDiv( (*i)->iSize, iBlockSize ); 412 ReqRead( sHeader, &iStreamBytes, 4 );
385// for( BlockArray::iterator j = (*i)->aBlocks.begin(); j; j++ ) 413 Stream *pStream = new Stream( *this, iStream, iStreamBytes );
386 for( int j = 0; j < iUsedBlocks; j++ ) 414 int32_t iBlocks = blkDiv(iStreamBytes, iBlockSize );
415 for( int k = 0; k < iBlocks; k++ )
387 { 416 {
388 sHdr.write( &(*i)->aBlocks[j], 4 ); 417 int32_t iBlockIndex;
418 ReqRead( sHeader, &iBlockIndex, 4 );
419 hUnusedBlocks.erase( iBlockIndex );
420 pStream->aBlocks.append( iBlockIndex );
389 } 421 }
422 hStream.insert( iStream, pStream );
423 if( iLastUsedIndex < iStream )
424 iLastUsedIndex = iStream;
390 } 425 }
426
427 lFreeBlocks = hUnusedBlocks.getKeys();
428 //Bu::println("Free blocks: %1").arg( lFreeBlocks.getSize() );
391 429
392 bHeaderChanged = false; 430 bIsNewStream = false;
393 TRACE("mHeader unlocked..."); 431
432 return true;
394} 433}
395 434
396int Bu::Myriad::createStream( int iPreAllocate ) 435void Bu::Myriad::createMyriad( int32_t iBlockSize, int32_t iPreallocateBlocks )
397{ 436{
398 MutexLocker mLock( mHeader ); 437 if( iBlockSize < 8 )
399 TRACE("mHeader locked."); 438 {
439 throw Bu::MyriadException( Bu::MyriadException::invalidParameter,
440 "iBlockSize cannot be below 8");
441 }
442 if( rBacking.getSize() )
443 {
444 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
445 "Backing stream contains data, but not a myriad structure.");
446 }
447
448 // Start with the bytes for the file header and initial stream header
449 int iHeaderStreamBytes
450 = 14 // Base header
451 + 8; // Stream header
452
453 // Pick the block count that matches our current estimate for the header
454 // plus one block index.
455 int iHeaderStreamBlocks = blkDiv(iHeaderStreamBytes+4, iBlockSize );
400 456
401 Stream *pStr = new Stream(); 457 //Bu::println("Initial estimate: %1 bytes / %2 cur blocks, %3 computed blocks (%4 target bytes).").arg( iHeaderStreamBytes+(iHeaderStreamBlocks*4) ).arg( iHeaderStreamBlocks ).arg( blkDiv((iHeaderStreamBytes+(iHeaderStreamBlocks*4)), iBlockSize) ).arg( iHeaderStreamBlocks*iBlockSize );
402 pStr->iId = aStreams.last()->iId+1; 458 while( iHeaderStreamBytes+(iHeaderStreamBlocks*4)
403 //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate=" 459 > iHeaderStreamBlocks*iBlockSize )
404 // << iPreAllocate << sio.nl; 460 {
405 pStr->iSize = 0; 461 iHeaderStreamBlocks = blkDiv((iHeaderStreamBytes+((iHeaderStreamBlocks+1)*4)), iBlockSize);
406 aStreams.append( pStr ); 462 if( iHeaderStreamBlocks > 100 )
463 break;
464 //Bu::println(" Adjustment: %1 bytes / %2 cur blocks, %3 computed blocks (%4 target bytes).").arg( iHeaderStreamBytes+(iHeaderStreamBlocks*4) ).arg( iHeaderStreamBlocks ).arg( blkDiv((iHeaderStreamBytes+(iHeaderStreamBlocks*4)), iBlockSize) ).arg( iHeaderStreamBlocks*iBlockSize );
465 }
407 466
408 for( int j = 0; j < iPreAllocate; j++ ) 467 if( iPreallocateBlocks < iHeaderStreamBlocks )
409 { 468 {
410 int iFreeBlock = findEmptyBlock(); 469 iPreallocateBlocks = iHeaderStreamBlocks;
411// sio << "Myriad: Adding block " << iFreeBlock << sio.nl;
412 pStr->aBlocks.append( iFreeBlock );
413// bsBlockUsed.setBit( iFreeBlock );
414 iUsed++;
415 } 470 }
471 rBacking.setSize( iBlockSize*iPreallocateBlocks );
416 472
417 bHeaderChanged = true; 473 //
474 // Write Myriad header
475 //
476 uint8_t uVer = 1;
477 uint8_t uBpi = 32;
478 int32_t iStreamCount = 1;
479 rBacking.setPos( 0 );
480 rBacking.write( Myriad_MAGIC_CODE, 4 );
481 rBacking.write( &uVer, 1 );
482 rBacking.write( &uBpi, 1 );
483 rBacking.write( &iBlockSize, 4 );
484 rBacking.write( &iStreamCount, 4 );
418 485
419 TRACE("mHeader unlocked..."); 486 Stream *pHeadStream = new Stream( *this, 0, Bu::Myriad::ReadWrite );
420 return pStr->iId; 487 //
421} 488 // Write stream header
489 //
490 uint32_t uStreamId = 0;
491 uint32_t uStreamSize = iHeaderStreamBytes+iHeaderStreamBlocks*4;
492 rBacking.write( &uStreamId, 4 );
493 rBacking.write( &uStreamSize, 4 );
494 for( int iBlockIndex = 0; iBlockIndex < iHeaderStreamBlocks; iBlockIndex++ )
495 {
496 rBacking.write( &iBlockIndex, 4 );
497 pHeadStream->aBlocks.append( iBlockIndex );
498 }
499 rBacking.flush();
422 500
423int Bu::Myriad::createStreamWithId( int iId, int iPreAllocate ) 501 hStream.insert( pHeadStream->iStream, pHeadStream );
424{
425 MutexLocker mLock( mHeader );
426 TRACE("mHeader locked.");
427 502
428 try 503 for( int32_t j = iHeaderStreamBlocks; j < iPreallocateBlocks; j++ )
429 { 504 {
430 findStream( iId ); 505 lFreeBlocks.append( j );
431 TRACE("mHeader unlocked...");
432 throw MyriadException( MyriadException::streamExists,
433 "There is already a stream with the given id.");
434 } 506 }
435 catch( MyriadException &e ) 507 iLastUsedIndex = 0;
508 iBlockCount = iPreallocateBlocks;
509}
510
511void Bu::Myriad::writeHeader()
512{
513 Bu::MutexLocker l( mAccess );
514 if( !rBacking.isWritable() )
515 return;
516 //Bu::println("Writing stream breakdown:");
517 Bu::MemBuf mbHeader;
436 { 518 {
437 Stream *pStr = new Stream(); 519 Bu::MutexLocker l2( mhStream );
438 pStr->iId = iId; 520
439 //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate=" 521 int32_t iHdrStreamSize = __calcHeaderSize();
440 // << iPreAllocate << sio.nl; 522 // Maybe just do stream surgery here.
441 pStr->iSize = 0;
442 if( aStreams.last()->iId < iId )
443 {
444 aStreams.append( pStr );
445 }
446 else
447 { 523 {
448 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 524 Stream *psHeader = hStream.get( 0 );
525 Bu::MutexLocker l2( psHeader->mAccess );
526 int iNewBlocks = Bu::blkDiv( iHdrStreamSize, iBlockSize );
527 if( iHdrStreamSize < psHeader->iSize )
449 { 528 {
450 if( (*i)->iId > iId ) 529 while( psHeader->aBlocks.getSize() > iNewBlocks )
451 { 530 {
452 aStreams.insert( i, pStr ); 531 __releaseBlock( psHeader->aBlocks.last(), false );
453 break; 532 psHeader->aBlocks.eraseLast();
454 } 533 }
455 } 534 }
535 else if( iHdrStreamSize > psHeader->iSize )
536 {
537 while( psHeader->aBlocks.getSize() < iNewBlocks )
538 {
539 psHeader->aBlocks.append( __allocateBlock() );
540 }
541 }
542 psHeader->iSize = iHdrStreamSize;
456 } 543 }
457 544
458 for( int j = 0; j < iPreAllocate; j++ ) 545 //Bu::println("Computed header size: %1 bytes. Ver=%2, Bpi=%3, BlockSize=%4").arg( iHdrStreamSize ).arg( 1 ).arg( 32 ).arg( iBlockSize );
546
547 uint8_t uVer = 1;
548 uint8_t uBpi = 32;
549 int32_t iStreamCount = hStream.getSize();
550
551 mbHeader.write( Myriad_MAGIC_CODE, 4 );
552 mbHeader.write( &uVer, 1 );
553 mbHeader.write( &uBpi, 1 );
554 mbHeader.write( &iBlockSize, 4 );
555 mbHeader.write( &iStreamCount, 4 );
556 StreamHash::KeyList lStreamId = hStream.getKeys();
557 lStreamId.sort();
558 if( lStreamId.first() != 0 )
559 {
560 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
561 "There doesn't appear to be a zero (header) stream.");
562 }
563 for( StreamHash::KeyList::iterator i = lStreamId.begin(); i; i++ )
459 { 564 {
460 int iFreeBlock = findEmptyBlock(); 565 uint32_t uStreamId = *i;
461 // sio << "Myriad: Adding block " << iFreeBlock << sio.nl; 566 Stream *pStream = hStream.get( uStreamId );
462 pStr->aBlocks.append( iFreeBlock ); 567 uint32_t uStreamSize = pStream->getSize();
463// bsBlockUsed.setBit( iFreeBlock ); 568 mbHeader.write( &uStreamId, 4 );
464 iUsed++; 569 mbHeader.write( &uStreamSize, 4 );
570 int32_t iBlocks = Bu::blkDiv( uStreamSize, (uint32_t)iBlockSize );
571 Bu::Array<int32_t> aBlocks = pStream->getBlockList();
572
573 //Bu::println(" Stream %1 is %2 bytes %3 blocks (%4 blocks computed)").arg( *i ).arg( uStreamSize ).arg( aBlocks.getSize() ).arg( Bu::blkDiv( (int)uStreamSize, (int)iBlockSize ) );
574
575// for( Bu::Array<int32_t>::iterator i = aBlocks.begin(); i; i++ )
576 for( int j = 0; j < iBlocks; j++ )
577 {
578 mbHeader.write( &aBlocks[j], 4 );
579 }
465 } 580 }
466 581
467 bHeaderChanged = true;
468 582
469 TRACE("mHeader unlocked...");
470 return pStr->iId;
471 } 583 }
472 TRACE("mHeader unlocked...");
473}
474 584
475int Bu::Myriad::findEmptyBlock() 585 Bu::MyriadStream sHeader( *this, hStream.get( 0 ), Bu::Myriad::Write );
476{ 586 sHeader.write( mbHeader.getString() );
477 bHeaderChanged = true; 587 bStructureChanged = false;
478
479 if( lFreeBlocks.isEmpty() )
480 {
481 sStore.setSize( (iBlocks+1)*(Bu::size)iBlockSize );
482 return iBlocks++;
483 }
484 else
485 {
486 return lFreeBlocks.dequeue();
487 }
488} 588}
489 589
490void Bu::Myriad::deleteStream( int iId ) 590int32_t Bu::Myriad::__calcHeaderSize()
491{ 591{
492 MutexLocker mLock( mHeader ); 592 int32_t iHdrSize = 4+1+1+4+4;
493 TRACE("mHeader locked.");
494 593
495 if( iId < 0 ) 594 StreamHash::KeyList lStreamId = hStream.getKeys();
595 lStreamId.sort();
596 if( lStreamId.first() != 0 )
496 { 597 {
497 TRACE("mHeader unlocked..."); 598 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
498 throw MyriadException( MyriadException::invalidStreamId, 599 "There doesn't appear to be a zero (header) stream.");
499 "Invalid stream id.");
500 } 600 }
501 if( iId == 0 ) 601 for( StreamHash::KeyList::iterator i = lStreamId.begin(); i; i++ )
502 { 602 {
503 TRACE("mHeader unlocked..."); 603 iHdrSize += 4+4;
504 throw MyriadException( MyriadException::protectedStream, 604 int32_t iStreamSize = hStream.get( *i )->getSize();
505 "You cannot delete stream zero, it is protected."); 605 if( (*i) != 0 )
506 }
507 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
508 {
509 if( (*i)->iId == iId )
510 { 606 {
511 Stream *pStream = *i; 607 iHdrSize += Bu::blkDiv( iStreamSize, iBlockSize )*4;
512 for( BlockArray::iterator j = pStream->aBlocks.begin(); j; j++ )
513 {
514 lFreeBlocks.append( *j );
515// bsBlockUsed.setBit( *j, false );
516 iUsed--;
517 }
518 aStreams.erase( i );
519 bHeaderChanged = true;
520 delete pStream;
521 TRACE("mHeader unlocked...");
522 return;
523 } 608 }
524 } 609 }
525 TRACE("mHeader unlocked...");
526}
527 610
528Bu::Array<int> Bu::Myriad::getStreamIds() 611 //Bu::println("HeaderCalc:");
529{ 612 //Bu::println(" Base (no header stream): %1").arg( iHdrSize );
530 MutexLocker mLock( mHeader ); 613 int32_t iNewSize = iHdrSize;
531 TRACE("mHeader locked."); 614 int32_t iOldSize;
532 615
533 Bu::Array<int> aRet( aStreams.getSize() ); 616 do {
534 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 617 iOldSize = iNewSize;
535 { 618 iNewSize = iHdrSize + Bu::blkDiv(iNewSize, iBlockSize)*4;
536 aRet.append( (*i)->iId ); 619 //Bu::println(" Recomp: %1").arg( iNewSize );
537 } 620 } while( iOldSize != iNewSize );
538 621
539 TRACE("mHeader unlocked..."); 622 return iNewSize;
540 return aRet;
541} 623}
542 624
543int Bu::Myriad::getStreamSize( int iId ) 625int32_t Bu::Myriad::allocateBlock()
544{ 626{
545 MutexLocker mLock( mHeader ); 627 Bu::MutexLocker l( mAccess );
546 TRACE("mHeader locked."); 628 return __allocateBlock();
547
548 TRACE("mHeader unlocked...");
549 return findStream( iId )->iSize;
550} 629}
551 630
552bool Bu::Myriad::hasStream( int iId ) 631int32_t Bu::Myriad::__allocateBlock()
553{ 632{
554 MutexLocker mLock( mHeader ); 633 bStructureChanged = true;
555 TRACE("mHeader locked."); 634 if( lFreeBlocks.isEmpty() )
556
557 try
558 { 635 {
559 findStream( iId ); 636 // Increase the size of the backing stream
560 TRACE("mHeader unlocked..."); 637 int32_t iIndex = iBlockCount++;
561 return true; 638 rBacking.setSize( iBlockCount*iBlockSize );
562 }catch(...) 639 return iIndex;
640 }
641 else
563 { 642 {
564 TRACE("mHeader unlocked..."); 643 // Provide an existing free block.
565 return false; 644 return lFreeBlocks.peekPop();
566 } 645 }
567} 646}
568 647
569Bu::MyriadStream Bu::Myriad::openStream( int iId ) 648void Bu::Myriad::releaseBlock( int32_t iBlockId, bool bBlank )
570{ 649{
571 MutexLocker mLock( mHeader ); 650 Bu::MutexLocker l( mAccess );
572 TRACE("mHeader locked."); 651 __releaseBlock( iBlockId, bBlank );
573
574 TRACE("mHeader unlocked...");
575 //sio << "Myriad: Request to open stream: " << iId << sio.nl;
576 return MyriadStream( *this, findStream( iId ) );
577} 652}
578 653
579int Bu::Myriad::getNumStreams() 654void Bu::Myriad::__releaseBlock( int32_t iBlockId, bool bBlank )
580{ 655{
581 MutexLocker mLock( mHeader ); 656 bStructureChanged = true;
582 TRACE("mHeader locked."); 657 lFreeBlocks.append( iBlockId );
583 658 if( bBlank )
584 TRACE("mHeader unlocked..."); 659 {
585 return aStreams.getSize(); 660 blankBlock( iBlockId );
661 }
586} 662}
587 663
588int Bu::Myriad::getBlockSize() 664void Bu::Myriad::blankBlock( int32_t iBlockId )
589{ 665{
590 return iBlockSize; 666 Bu::MutexLocker l( mBacking );
667 rBacking.setPos( iBlockId*iBlockSize );
668 int32_t iChunk = std::min( iBlockSize, 4096 );
669 uint8_t *pChunk = new uint8_t[iChunk];
670 memset( pChunk, 0, iChunk );
671 int iLeft = iBlockSize;
672 while( iLeft > 0 )
673 {
674 int32_t iWrite = rBacking.write( pChunk, std::min( iChunk, iLeft ) );
675 iLeft -= iWrite;
676 }
677 delete[] pChunk;
591} 678}
592 679
593int Bu::Myriad::getNumBlocks() 680void Bu::Myriad::openStream( StreamId id )
594{ 681{
595 return iBlocks; 682 Bu::MutexLocker l( mhStream );
683 hStream.get( id )->open();
596} 684}
597 685
598int Bu::Myriad::getNumUsedBlocks() 686void Bu::Myriad::closeStream( StreamId id )
599{ 687{
600 return iUsed; 688 Bu::MutexLocker l( mhStream );
689 hStream.get( id )->close();
601} 690}
602 691
603Bu::size Bu::Myriad::getTotalUsedBytes() 692int32_t Bu::Myriad::blockRead( int32_t iBlock, int32_t iStart,
693 void *pTarget, int32_t iSize )
604{ 694{
605 MutexLocker mLock( mHeader ); 695 int32_t iUpperSize = iBlockSize - (iStart%iBlockSize);
606 TRACE("mHeader locked."); 696/* Bu::println("Max read within block: %1 vs %2 (start=%3, blocksize=%4)")
697 .arg( iUpperSize ).arg( iSize )
698 .arg( iStart ).arg( iBlockSize );
699*/
700 int32_t iAmnt = std::min( iSize, iUpperSize );
701 Bu::MutexLocker l( mBacking );
702 rBacking.setPos( iBlockSize*iBlock + iStart );
607 703
608 Bu::size iTotalSize = 0; 704 return rBacking.read( pTarget, iAmnt );
609 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
610 {
611 iTotalSize += (*i)->iSize;
612 }
613 TRACE("mHeader unlocked...");
614 return iTotalSize;
615} 705}
616 706
617Bu::size Bu::Myriad::getTotalUnusedBytes() 707int32_t Bu::Myriad::blockWrite( int32_t iBlock, int32_t iStart,
708 const void *pTarget, int32_t iSize )
618{ 709{
619 MutexLocker mLock( mHeader ); 710 int32_t iUpperSize = iBlockSize - (iStart%iBlockSize);
620 TRACE("mHeader locked."); 711/* Bu::println("Max write within block: %1 vs %2 (start=%3, blocksize=%4)")
712 .arg( iUpperSize ).arg( iSize )
713 .arg( iStart ).arg( iBlockSize );
714*/
715 int32_t iAmnt = std::min( iSize, iUpperSize );
716 Bu::MutexLocker l( mBacking );
717 rBacking.setPos( iBlock*iBlockSize + iStart );
621 718
622 Bu::size iTotalSize = (iBlocks-iUsed)*iBlockSize; 719 return rBacking.write( pTarget, iAmnt );
623 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
624 {
625 iTotalSize += iBlockSize - ((Bu::size)(*i)->iSize%iBlockSize);
626 }
627 TRACE("mHeader unlocked...");
628 return iTotalSize;
629} 720}
630 721
631Bu::size Bu::Myriad::getTotalUnusedBytes( int iFakeBlockSize ) 722/////////
632{ 723// Bu::Myriad::Stream
633 MutexLocker mLock( mHeader ); 724//
634 TRACE("mHeader locked.");
635 725
636 Bu::size iTotalSize = (iBlocks-iUsed)*iFakeBlockSize; 726Bu::Myriad::Stream::Stream( Bu::Myriad &rParent, Bu::Myriad::StreamId iStream,
637 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 727 int32_t iSize ) :
638 { 728 rParent( rParent ),
639 iTotalSize += iFakeBlockSize - ((*i)->iSize%iFakeBlockSize); 729 iStream( iStream ),
640 } 730 iSize( iSize ),
641 TRACE("mHeader unlocked..."); 731 iOpenCount( 0 )
642 return iTotalSize; 732{
643} 733}
644 734
645Bu::Myriad::Stream *Bu::Myriad::findStream( int iId ) 735Bu::Myriad::Stream::~Stream()
646{ 736{
647 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
648 {
649 if( (*i)->iId == iId )
650 return *i;
651 }
652
653 throw MyriadException( MyriadException::noSuchStream,
654 "The requested stream doesn't exist and cannot be opened." );
655
656 return NULL;
657} 737}
658 738
659Bu::Myriad::Block *Bu::Myriad::getBlock( int iBlock ) 739int32_t Bu::Myriad::Stream::getSize() const
660{ 740{
661// sio << "Myriad: Reading block " << iBlock << ", bytes " 741 Bu::MutexLocker l( mAccess );
662// << iBlockSize*iBlock << "-" << iBlockSize*(iBlock+1) << sio.nl; 742 return iSize;
663 Block *pBlock = new Block; 743}
664 pBlock->pData = new char[iBlockSize];
665 sStore.setPos( iBlockSize * (Bu::size)iBlock );
666 sStore.read( pBlock->pData, iBlockSize );
667 pBlock->bChanged = false;
668 pBlock->iBlockIndex = iBlock;
669
670 mActiveBlocks.lock();
671 TRACE("mHeader locked.");
672 hActiveBlocks.insert( iBlock, pBlock );
673 TRACE("mHeader unlocked...");
674 mActiveBlocks.unlock();
675 744
676 return pBlock; 745int32_t Bu::Myriad::Stream::getBlockSize() const
746{
747 Bu::MutexLocker l( mAccess );
748 return rParent.iBlockSize;
677} 749}
678 750
679void Bu::Myriad::releaseBlock( Bu::Myriad::Block *pBlock ) 751Bu::Myriad::StreamId Bu::Myriad::Stream::getStreamId() const
680{ 752{
681 if( pBlock == NULL ) 753 return iStream;
682 return; 754}
683// sio << "Myriad: Releasing block " << pBlock->iBlockIndex << sio.nl;
684 syncBlock( pBlock );
685 mActiveBlocks.lock();
686 TRACE("mHeader locked.");
687 hActiveBlocks.erase( pBlock->iBlockIndex );
688 TRACE("mHeader unlocked...");
689 mActiveBlocks.unlock();
690 755
691 delete[] pBlock->pData; 756int32_t Bu::Myriad::Stream::getOpenCount() const
692 delete pBlock; 757{
758 Bu::MutexLocker l( mAccess );
759 return iOpenCount;
693} 760}
694 761
695void Bu::Myriad::syncBlock( Block *pBlock ) 762void Bu::Myriad::Stream::setSize( int32_t iNewSize )
696{ 763{
697 if( pBlock->bChanged ) 764 // Two possible modes, shrink or grow.
765 Bu::MutexLocker l( mAccess );
766 int iNewBlocks = Bu::blkDiv( iNewSize, rParent.iBlockSize );
767 if( iNewSize < iSize )
698 { 768 {
699// sio << "Myriad: - Block changed, writing back to stream." << sio.nl; 769 // Shrink it
700 sStore.setPos( iBlockSize * (Bu::size)pBlock->iBlockIndex ); 770 while( aBlocks.getSize() > iNewBlocks )
701 sStore.write( pBlock->pData, iBlockSize ); 771 {
702 pBlock->bChanged = false; 772 rParent.releaseBlock( aBlocks.last(), false );
773 aBlocks.eraseLast();
774 }
775 iSize = iNewSize;
776 }
777 else if( iNewSize > iSize )
778 {
779 // Grow it
780 while( aBlocks.getSize() < iNewBlocks )
781 {
782 aBlocks.append( rParent.allocateBlock() );
783 }
784 iSize = iNewSize;
703 } 785 }
704} 786}
705 787
706int Bu::Myriad::streamAddBlock( Stream *pStream ) 788int32_t Bu::Myriad::Stream::read( int32_t iStart, void *pTarget,
789 int32_t iSize )
707{ 790{
708 MutexLocker mLock( mHeader ); 791 int32_t iRead = 0;
709 TRACE("mHeader locked."); 792 Bu::MutexLocker l( mAccess );
710 793
711 int iBlock = findEmptyBlock(); 794 if( iStart >= this->iSize )
712 pStream->aBlocks.append( iBlock ); 795 return 0;
713// bsBlockUsed.setBit( iBlock );
714// bHeaderChanged = true;
715 iUsed++;
716 TRACE("mHeader unlocked...");
717 return iBlock;
718}
719
720void Bu::Myriad::setStreamSize( Stream *pStream, long iSize )
721{
722 MutexLocker mLock( mHeader );
723 TRACE("mHeader locked.");
724 796
725 if( pStream->iSize == iSize ) 797 if( iStart+iSize >= this->iSize )
726 { 798 {
727 TRACE("mHeader unlocked..."); 799 iSize = this->iSize-iStart;
728 return;
729 } 800 }
730 else if( pStream->iSize > iSize ) 801
802 while( iSize > 0 )
731 { 803 {
732 // Shrink 804 int32_t iBlock = aBlocks[iStart/rParent.iBlockSize];
733 TRACE(Bu::String("Shrink stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->iSize).arg(iSize).end() ); 805 int32_t iChunkRead = rParent.blockRead(
734 for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; 806 iBlock, iStart%rParent.iBlockSize, pTarget, iSize
735 iNewSize-iBlockSize > iSize; iNewSize -= iBlockSize ) 807 );
736 { 808 if( iChunkRead == 0 )
737// if( bsBlockUsed.getBit( pStream->aBlocks.last() ) ) 809 break;
738 iUsed--; 810 iRead += iChunkRead;
739// else 811 iStart += iChunkRead;
740// sio << "Unused block used in stream? " << pStream->aBlocks.last() << sio.nl; 812 reinterpret_cast<ptrdiff_t &>(pTarget) += iChunkRead;
741 lFreeBlocks.enqueue( pStream->aBlocks.last() ); 813 iSize -= iChunkRead;
742// bsBlockUsed.setBit( pStream->aBlocks.last(), false );
743 pStream->aBlocks.eraseLast();
744 }
745 pStream->iSize = iSize;
746 bHeaderChanged = true;
747 } 814 }
748 else 815
816 return iRead;
817}
818
819int32_t Bu::Myriad::Stream::write( int32_t iStart, const void *pTarget,
820 int32_t iSize )
821{
822 int32_t iWrite = 0;
823 Bu::MutexLocker l( mAccess );
824 while( iSize > 0 )
749 { 825 {
750 // Grow 826 int32_t iBlockIdx = iStart/rParent.iBlockSize;
751 TRACE(Bu::String("Grow stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->iSize).arg(iSize).end() ); 827 while( iBlockIdx >= aBlocks.getSize() )
752 for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize;
753 iNewSize < iSize; iNewSize += iBlockSize )
754 { 828 {
755 //streamAddBlock( pStream ); 829 aBlocks.append( rParent.allocateBlock() );
756 int iBlock = findEmptyBlock();
757 pStream->aBlocks.append( iBlock );
758// bsBlockUsed.setBit( iBlock );
759// bHeaderChanged = true;
760 iUsed++;
761 } 830 }
762 pStream->iSize = iSize; 831 int32_t iBlock = aBlocks[iBlockIdx];
763 bHeaderChanged = true; 832 int32_t iChunkWrite = rParent.blockWrite(
833 iBlock, iStart%rParent.iBlockSize, pTarget, iSize
834 );
835 if( iChunkWrite == 0 )
836 break;
837 iWrite += iChunkWrite;
838 iStart += iChunkWrite;
839 reinterpret_cast<ptrdiff_t &>(pTarget) += iChunkWrite;
840 iSize -= iChunkWrite;
764 } 841 }
765 TRACE("mHeader unlocked..."); 842 if( this->iSize < iStart )
843 this->iSize = iStart;
844
845 return iWrite;
766} 846}
767 847
768void Bu::Myriad::headerChanged() 848Bu::String Bu::Myriad::Stream::getLocation() const
769{ 849{
770 bHeaderChanged = true; 850 Bu::MutexLocker l( mAccess );
851 return Bu::String("%1:stream %2")\
852 .arg( rParent.getLocation() ).arg( iStream );
771} 853}
772 854
773bool Bu::Myriad::isMyriad( Bu::Stream &sStore ) 855Bu::Array<int32_t> Bu::Myriad::Stream::getBlockList() const
774{ 856{
775 uint8_t uTmp; 857 Bu::MutexLocker l( mAccess );
776 858 return aBlocks.clone();
777 return isMyriad( sStore, uTmp );
778} 859}
779 860
780bool Bu::Myriad::isMyriad( Bu::Stream &sStore, uint8_t &uTmp ) 861void Bu::Myriad::Stream::open()
781{ 862{
782 sStore.setPos( 0 ); 863 Bu::MutexLocker l( mAccess );
783 864 iOpenCount++;
784 unsigned char buf[4];
785 if( sStore.read( buf, 4 ) < 4 )
786 throw MyriadException( MyriadException::emptyStream,
787 "Input stream appears to be empty.");
788 sStore.read( &uTmp, 1 );
789 sStore.setPos( 0 );
790 if( memcmp( buf, Myriad_MAGIC_CODE, 4 ) )
791 {
792 return false;
793 }
794 return true;
795} 865}
796 866
797const Bu::BitString Bu::Myriad::getBlocksUsed() const 867bool Bu::Myriad::Stream::close()
798{ 868{
799 Bu::BitString bs( iBlocks, false ); 869 Bu::MutexLocker l( mAccess );
800 for( int j = 0; j < iBlocks; j++ ) 870 return (bool)(--iOpenCount);
801 bs.setBit( j );
802 for( IndexList::const_iterator i = lFreeBlocks.begin(); i; i++ )
803 bs.setBit( *i, false );
804 return bs;
805} 871}
806 872