aboutsummaryrefslogtreecommitdiff
path: root/src/stable/myriad.cpp
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/stable/myriad.cpp930
1 files changed, 222 insertions, 708 deletions
diff --git a/src/stable/myriad.cpp b/src/stable/myriad.cpp
index c606369..5278ac5 100644
--- a/src/stable/myriad.cpp
+++ b/src/stable/myriad.cpp
@@ -1,25 +1,18 @@
1/*
2 * Copyright (C) 2007-2023 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#include "bu/config.h"
9#include "bu/myriad.h" 1#include "bu/myriad.h"
10#include "bu/stream.h"
11#include "bu/myriadstream.h" 2#include "bu/myriadstream.h"
3
12#include "bu/mutexlocker.h" 4#include "bu/mutexlocker.h"
13#include <stdio.h> 5#include "bu/util.h"
14 6
15#include "bu/sio.h" 7#include "bu/sio.h"
16using Bu::sio;
17using Bu::Fmt;
18 8
19#define Myriad_MAGIC_CODE ((unsigned char *)"\x0a\xd3\xfa\x84") 9#define Myriad_MAGIC_CODE ((unsigned char *)"\x0a\xd3\xfa\x84")
20 10
21// #define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(sStore.getLocation()).arg(x) 11#define MyriadRead( target, size ) if( rBacking.read( target, size ) < size ) \
22#define TRACE( x ) (void)0 12{ \
13 throw Bu::MyriadException( Bu::MyriadException::invalidFormat, \
14 "Insufficent data reading myriad data from backing stream."); \
15} (void)0
23 16
24namespace Bu 17namespace Bu
25{ 18{
@@ -29,803 +22,324 @@ namespace Bu
29 } 22 }
30} 23}
31 24
32Bu::Myriad::Myriad( Bu::Stream &sStore, int iBlockSize, int iPreallocate ) : 25Bu::Myriad::Myriad( Bu::Stream &rBacking, int iBlockSize,
33 sStore( sStore ), 26 int iPreallocateBlocks ) :
27 rBacking( rBacking ),
34 iBlockSize( iBlockSize ), 28 iBlockSize( iBlockSize ),
35 iBlocks( 0 ), 29 iBlockCount( 0 ),
36 iUsed( 0 ), 30 bIsNewStream( true )
37 bHeaderChanged( false )
38{ 31{
39 try 32 if( !rBacking.isSeekable() )
40 { 33 {
41 initialize(); 34 throw Bu::MyriadException( Bu::MyriadException::invalidBackingStream,
35 "Myriad backing stream must be random access (seekable).");
42 } 36 }
43 catch( Bu::MyriadException &e ) 37 if( !loadMyriad() )
44 { 38 {
45 if( e.getErrorCode() == MyriadException::emptyStream ) 39 createMyriad( iBlockSize, iPreallocateBlocks );
46 {
47 initialize( iBlockSize, iPreallocate );
48 }
49 else
50 {
51 throw;
52 }
53 } 40 }
54} 41}
55 42
56Bu::Myriad::~Myriad() 43Bu::Myriad::~Myriad()
57{ 44{
58 mActiveBlocks.lock();
59 TRACE("mActiveBlocks locked.");
60 if( !hActiveBlocks.isEmpty() )
61 {
62 sio << "Bu::Myriad::~Myriad(): Error: There are "
63 << hActiveBlocks.getSize() << " unsynced blocks!" << sio.nl;
64 }
65 TRACE("mActiveBlocks unlocking...");
66 mActiveBlocks.unlock();
67 sync();
68
69 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
70 {
71 delete *i;
72 }
73} 45}
74 46
75void Bu::Myriad::sync() 47Bu::MyriadStream Bu::Myriad::create( Bu::Myriad::Mode /*eMode*/,
48 int32_t /*iPreallocateBytes*/ )
76{ 49{
77 updateHeader(); 50 return Bu::MyriadStream( *this, NULL, (Mode)0 );
78
79 mActiveBlocks.lock();
80 TRACE("mActiveBlocks locked.");
81 for( BlockHash::iterator i = hActiveBlocks.begin(); i; i++ )
82 {
83 if( (*i)->bChanged )
84 {
85 syncBlock( *i );
86 }
87 }
88 TRACE("mActiveBlocks unlocked...");
89 mActiveBlocks.unlock();
90} 51}
91 52
92void Bu::Myriad::initialize() 53Bu::MyriadStream Bu::Myriad::open( Bu::Myriad::StreamId iStream,
54 Bu::Myriad::Mode eMode )
93{ 55{
94 MutexLocker mLock( mHeader ); 56 Bu::MutexLocker l( mAccess );
95 TRACE("mHeader locked."); 57 if( !hStream.has( iStream ) )
96 lFreeBlocks.clear();
97 sStore.setPosEnd( 0 );
98 Bu::size iSize = sStore.tell();
99 sStore.setPos( 0 );
100
101 unsigned char buf[4];
102 if( sStore.read( buf, 4 ) < 4 )
103 {
104 TRACE("mHeader unlocked...");
105 throw MyriadException( MyriadException::emptyStream,
106 "Input stream appears to be empty.");
107 }
108 if( memcmp( buf, Myriad_MAGIC_CODE, 4 ) )
109 {
110 TRACE("mHeader unlocked...");
111 throw MyriadException( MyriadException::invalidFormat,
112 "Stream does not appear to be a valid Myriad format.");
113 }
114 sStore.read( buf, 2 );
115 if( buf[0] != 1 )
116 {
117 TRACE("mHeader unlocked...");
118 throw MyriadException( MyriadException::badVersion,
119 "We can only handle version 1 for now.");
120 }
121 if( buf[1] != 32 )
122 {
123 TRACE("mHeader unlocked...");
124 throw MyriadException( MyriadException::invalidWordSize,
125 "We can only handle 32-bit words at the moment.");
126 }
127 sStore.read( &iBlockSize, 4 );
128 int iStreams;
129 sStore.read( &iStreams, 4 );
130
131 iBlocks = iSize/iBlockSize;
132 //sio << "Myriad: iSize=" << iSize << ", iBlockSize=" << iBlockSize
133 // << ", iBlocks=" << iBlocks << ", iStreams=" << iStreams << sio.nl;
134
135 int iHeaderSize = 14 + 8 + 4;
136 int iHeaderBlocks = 0; //blkDiv( iHeaderSize+4, iBlockSize );
137
138 while( iHeaderSize > iHeaderBlocks*iBlockSize )
139 {
140 iHeaderBlocks = blkDiv( iHeaderSize+4, iBlockSize );
141 iHeaderSize = 14 + 8 + 4*iHeaderBlocks;
142 }
143
144 //sio << "Myriad: iHeaderSize=" << iHeaderSize
145 // << ", iHeaderBlocks=" << iHeaderBlocks << sio.nl;
146
147 Stream *pFakeHdr = new Stream;
148 pFakeHdr->iId = 0;
149 pFakeHdr->setSize( iHeaderSize );
150 for( int j = 0; j < iHeaderBlocks; j++ )
151 {
152 pFakeHdr->aBlocks.append( j );
153 }
154
155// sio << "Blocks: " << iBlocks << " (size = " << iSize << "/" << iBlockSize
156// << ")" << sio.nl;
157 Bu::BitString bsBlockUsed( iBlocks, false );
158 bsBlockUsed.clear();
159
160// bool bCanSkip = false; // Can skip around, post initial header stream i/o
161 MyriadStream *pIn = new MyriadStream( *this, pFakeHdr );
162 pIn->setPos( sStore.tell() );
163 for( int j = 0; j < iStreams; j++ )
164 { 58 {
165 int iSizeTmp; 59 throw Bu::MyriadException( MyriadException::noSuchStream,
166 aStreams.append( new Stream() ); 60 "No such stream.");
167 Stream &s = *aStreams[j];
168 pIn->read( &s.iId, 4 );
169 pIn->read( &iSizeTmp, 4 );
170 s.setSize( iSizeTmp );
171 int iSBlocks = blkDiv(s.getSize(), iBlockSize);
172 // sio << "Myriad: - Stream::iId=" << s.iId
173 // << ", Stream::iSize=" << s.iSize
174 // << ", Stream::aBlocks=" << iSBlocks
175 // << ", pIn->tell()=" << pIn->tell() << sio.nl;
176 for( int k = 0; k < iSBlocks; k++ )
177 {
178 int iBId;
179 pIn->read( &iBId, 4 );
180 // sio << "Myriad: - iBId=" << iBId
181 // << ", iStartPos=" << iBId*iBlockSize
182 // << ", pIn->tell()=" << pIn->tell() << sio.nl;
183 s.aBlocks.append( iBId );
184 bsBlockUsed.setBit( iBId );
185 iUsed++;
186 if( (j == 0 && k == iHeaderBlocks-1) )
187 {
188 // sio << "Myriad: - End of prepartition, unlocking skipping."
189 // << sio.nl;
190// bCanSkip = true;
191 MyriadStream *pTmp = new MyriadStream( *this, aStreams[0] );
192 // sio << "Myriad - Position = " << pIn->tell() << sio.nl;
193 pTmp->setPos( pIn->tell() );
194 delete pIn;
195 delete pFakeHdr;
196 pIn = pTmp;
197 }
198 }
199 } 61 }
200 delete pIn;
201
202 for( int j = 0; j < iBlocks; j++ )
203 { 62 {
204 if( bsBlockUsed.getBit( j ) == false ) 63 Bu::MutexLocker l2( mBacking );
64 if( (eMode&Write) && rBacking.isWritable() )
205 { 65 {
206// sio << "Preinitialized block " << j << " is free." << sio.nl; 66 throw Bu::MyriadException( MyriadException::badMode,
207 lFreeBlocks.append( j ); 67 "Backing stream does not support writing.");
208 } 68 }
209 } 69 }
210// sio << "Myriad: Blocks used: " << bsBlockUsed.toString() << sio.nl; 70 return Bu::MyriadStream( *this, hStream.get( iStream ), eMode );
211 TRACE("mHeader unlocked...");
212} 71}
213 72
214void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) 73bool Bu::Myriad::erase( Bu::Myriad::StreamId /*iStream*/ )
215{ 74{
216 MutexLocker mLock( mHeader ); 75 return false;
217 TRACE("mHeader locked.");
218 lFreeBlocks.clear();
219
220 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
221 {
222 delete *i;
223 }
224 aStreams.clear();
225 iUsed = 0;
226
227 int iHeaderSize = 14 + 8 + 4;
228 int iHeaderBlocks = 0; //blkDiv( iHeaderSize+4, iBlockSize );
229 char cBuf = 1;
230 int iBuf = 0;
231
232 Stream *pStr = new Stream;
233 pStr->iId = 0;
234
235 while( iHeaderSize > iHeaderBlocks*iBlockSize )
236 {
237 iHeaderBlocks = blkDiv( iHeaderSize+4, iBlockSize );
238 iHeaderSize = 14 + 8 + 4*iHeaderBlocks;
239 }
240
241 iPreAllocate += iHeaderBlocks;
242
243 //sio << "Myriad: iHeaderSize=" << iHeaderSize << ", iBlockSize="
244 // << iBlockSize << ", iHeaderBlocks=" << iHeaderBlocks << sio.nl;
245
246// bsBlockUsed.setSize( iPreAllocate, true );
247 iUsed++;
248
249 char *pBlock = new char[iBlockSize];
250 memset( pBlock, 0, iBlockSize );
251 for( int j = 0; j < iPreAllocate; j++ )
252 {
253 sStore.write( pBlock, iBlockSize );
254 }
255 delete[] (char *)pBlock;
256
257 sStore.setPos( 0 );
258
259 // Magic number
260 sStore.write( Myriad_MAGIC_CODE, 4 );
261
262 // Version (0)
263 sStore.write( &cBuf, 1 );
264
265 // Bits per int
266 cBuf = 32;
267 sStore.write( &cBuf, 1 );
268
269 // The size of each block
270 sStore.write( &iBlockSize, 4 );
271
272 iBuf = 1;
273 // The number of streams
274 sStore.write( &iBuf, 4 );
275
276 // Stream header
277 iBuf = 0;
278 sStore.write( &iBuf, 4 );
279 sStore.write( &iHeaderSize, 4 );
280 for( iBuf = 0; iBuf < iHeaderBlocks; iBuf++ )
281 {
282 sStore.write( &iBuf, 4 );
283 }
284
285 this->iBlockSize = iBlockSize;
286 this->iBlocks = iPreAllocate;
287
288 pStr->setSize( sStore.tell() );
289// sio << "Myriad: Actual end of header stream = " << pStr->iSize << sio.nl;
290
291 pStr->setSize( iHeaderSize );
292 for( int j = 0; j < iHeaderBlocks; j++ )
293 {
294// sio << "Started block " << j << " is header." << sio.nl;
295 pStr->aBlocks.append( j );
296// bsBlockUsed.setBit( j );
297 iUsed++;
298 }
299 for( int j = iHeaderBlocks; j < this->iBlocks; j++ )
300 {
301// sio << "Started block " << j << " is free." << sio.nl;
302 lFreeBlocks.append( j );
303 }
304
305 aStreams.append( pStr );
306
307 //sio << bsBlockUsed.toString() << " - " << pStr->aBlocks << sio.nl;
308
309 bHeaderChanged = true;
310 //hStreams.insert( 0, BlockArray( 0 ) );
311 TRACE("mHeader unlocked...");
312}
313
314void Bu::Myriad::Stream::setSize( int iNewSize )
315{
316 MutexLocker l( mStream );
317 iSize = iNewSize;
318}
319
320void Bu::Myriad::Stream::growTo( int iNewSize )
321{
322 MutexLocker l( mStream );
323 if( iNewSize < iSize )
324 return;
325 iSize = iNewSize;
326} 76}
327 77
328int Bu::Myriad::Stream::getSize() const 78bool Bu::Myriad::setSize( Bu::Myriad::StreamId /*iStream*/,
79 int32_t /*iNewSize*/ )
329{ 80{
330 MutexLocker l( mStream ); 81 return false;
331 return iSize;
332} 82}
333 83
334void Bu::Myriad::updateHeader() 84bool Bu::Myriad::loadMyriad()
335{ 85{
336 MutexLocker mLock( mHeader ); 86 Bu::println("Load myriad!");
337 TRACE("mHeader locked."); 87 char sMagicCode[4];
338 88 rBacking.setPos( 0 );
339 if( bHeaderChanged == false ) 89 MyriadRead( sMagicCode, 4 );
90 if( memcmp( sMagicCode, Myriad_MAGIC_CODE, 4 ) )
340 { 91 {
341 TRACE("mHeader unlocked..."); 92 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
342 return; 93 "Backing stream does not seem to be a Myriad structure.");
343 } 94 }
344 if( !sStore.canWrite() ) 95 uint8_t uVer;
96 uint8_t uBitsPerInt;
97 MyriadRead( &uVer, 1 );
98 if( uVer != 1 )
345 { 99 {
346 TRACE("mHeader unlocked..."); 100 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
347 return; 101 "Only version 1 myriad structures are supported.");
348 } 102 }
349 103 MyriadRead( &uBitsPerInt, 1 );
350 char cBuf; 104 if( uBitsPerInt != 32 )
351 int iBuf;
352
353 //for( StreamArray::iterator i = aStreams.begin(); i; i++ )
354 //{
355 // sio << "Myriad: Stream " << Fmt(4) << (*i)->iId << ": " << (*i)->aBlocks << sio.nl;
356 //}
357
358 // Compute the new size of the header.
359 int iHeaderSize = 14 + 8*aStreams.getSize();
360// sio << "Myriad: updateHeader: aStreams.getSize() = " << aStreams.getSize()
361// << sio.nl;
362 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
363 { 105 {
364 iHeaderSize += 4*(*i)->aBlocks.getSize(); 106 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
365// sio << "Myriad: updateHeader: (*i)->aBlocks.getSize() = " 107 "Only 32 bits per int are supported at this time.");
366// << (*i)->aBlocks.getSize() << sio.nl;
367 } 108 }
368 int iNewBlocks = blkDiv( iHeaderSize, iBlockSize ); 109 MyriadRead( &iBlockSize, 4 );
369 while( iNewBlocks > aStreams[0]->aBlocks.getSize() ) 110 int iStreamCount;
111 MyriadRead( &iStreamCount, 4 );
112
113 //
114 // Read stream data -- Bootstrap the zero stream
115 //
116 StreamId iStream;
117 MyriadRead( &iStream, 4 );
118 if( iStream != 0 )
370 { 119 {
371 int iBlock = findEmptyBlock(); 120 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
372// sio << "Myriad: updateHeader: Appending block " << iBlock 121 "The first stream defined must be the header/zero stream.");
373// << " to header." << sio.nl;
374 aStreams[0]->aBlocks.append( iBlock );
375// bsBlockUsed.setBit( iBlock );
376 iUsed++;
377 iHeaderSize += 4;
378 iNewBlocks = blkDiv( iHeaderSize, iBlockSize );
379 } 122 }
380 aStreams[0]->setSize( iHeaderSize ); 123 int32_t iHeaderStreamBytes;
381// sio << "Myriad: updateHeader: iHeaderSize=" << iHeaderSize 124 MyriadRead( &iHeaderStreamBytes, 4 );
382// << ", iNewBlocks=" << iNewBlocks << ", curBlocks="
383// << aStreams[0]->aBlocks.getSize() << sio.nl;
384 125
385 MyriadStream sHdr( *this, aStreams[0] ); 126 Stream *pHeaderStream = new Stream( *this, iStream, iHeaderStreamBytes );
386 sHdr.write( Myriad_MAGIC_CODE, 4 ); 127 int iHeaderStreamBlocks = blkDiv(iHeaderStreamBytes+4, iBlockSize );
387 128
388 // Version (1) 129 while( iHeaderStreamBytes+(iHeaderStreamBlocks*4)
389 cBuf = 1; 130 > iHeaderStreamBlocks*iBlockSize )
390 sHdr.write( &cBuf, 1 );
391
392 // Bits per int
393 cBuf = 32;
394 sHdr.write( &cBuf, 1 );
395
396 // The size of each block
397 sHdr.write( &iBlockSize, 4 );
398
399 iBuf = aStreams.getSize();
400 // The number of streams
401 sHdr.write( &iBuf, 4 );
402
403 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
404 { 131 {
405 int iSizeTmp; 132 iHeaderStreamBlocks = blkDiv(
406 sHdr.write( &(*i)->iId, 4 ); 133 (iHeaderStreamBytes+((iHeaderStreamBlocks+1)*4)), iBlockSize
407 sHdr.write( &iSizeTmp, 4 ); 134 );
408 (*i)->setSize( iSizeTmp );
409 int iUsedBlocks = blkDiv( iSizeTmp, iBlockSize );
410// for( BlockArray::iterator j = (*i)->aBlocks.begin(); j; j++ )
411 for( int j = 0; j < iUsedBlocks; j++ )
412 {
413 sHdr.write( &(*i)->aBlocks[j], 4 );
414 }
415 } 135 }
416
417 bHeaderChanged = false;
418 TRACE("mHeader unlocked...");
419}
420
421int Bu::Myriad::createStream( int iPreAllocate )
422{
423 MutexLocker mLock( mHeader );
424 TRACE("mHeader locked.");
425
426 Stream *pStr = new Stream();
427 pStr->iId = aStreams.last()->iId+1;
428 //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate="
429 // << iPreAllocate << sio.nl;
430 pStr->setSize( 0 );
431 aStreams.append( pStr );
432 136
433 for( int j = 0; j < iPreAllocate; j++ ) 137 for( int32_t j = 0; j < iHeaderStreamBlocks; j++ )
434 { 138 {
435 int iFreeBlock = findEmptyBlock(); 139 int32_t iBlockIndex;
436// sio << "Myriad: Adding block " << iFreeBlock << sio.nl; 140 MyriadRead( &iBlockIndex, 4 );
437 pStr->aBlocks.append( iFreeBlock ); 141 pHeaderStream->aBlocks.append( iBlockIndex );
438// bsBlockUsed.setBit( iFreeBlock );
439 iUsed++;
440 } 142 }
441 143
442 bHeaderChanged = true; 144 // Bootstrap now using the header stream to read the rest of the data.
443 145
444 TRACE("mHeader unlocked..."); 146 return true;
445 return pStr->iId;
446} 147}
447 148
448int Bu::Myriad::createStreamWithId( int iId, int iPreAllocate ) 149void Bu::Myriad::createMyriad( int32_t iBlockSize, int32_t iPreallocateBlocks )
449{ 150{
450 MutexLocker mLock( mHeader ); 151 if( iBlockSize < 8 )
451 TRACE("mHeader locked.");
452
453 try
454 { 152 {
455 findStream( iId ); 153 throw Bu::MyriadException( Bu::MyriadException::invalidParameter,
456 TRACE("mHeader unlocked..."); 154 "iBlockSize cannot be below 8");
457 throw MyriadException( MyriadException::streamExists,
458 "There is already a stream with the given id.");
459 } 155 }
460 catch( MyriadException &e ) 156 if( rBacking.getSize() )
461 { 157 {
462 Stream *pStr = new Stream(); 158 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
463 pStr->iId = iId; 159 "Backing stream contains data, but not a myriad structure.");
464 //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate="
465 // << iPreAllocate << sio.nl;
466 pStr->setSize( 0 );
467 if( aStreams.last()->iId < iId )
468 {
469 aStreams.append( pStr );
470 }
471 else
472 {
473 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
474 {
475 if( (*i)->iId > iId )
476 {
477 aStreams.insert( i, pStr );
478 break;
479 }
480 }
481 }
482
483 for( int j = 0; j < iPreAllocate; j++ )
484 {
485 int iFreeBlock = findEmptyBlock();
486 // sio << "Myriad: Adding block " << iFreeBlock << sio.nl;
487 pStr->aBlocks.append( iFreeBlock );
488// bsBlockUsed.setBit( iFreeBlock );
489 iUsed++;
490 }
491
492 bHeaderChanged = true;
493
494 TRACE("mHeader unlocked...");
495 return pStr->iId;
496 }
497 TRACE("mHeader unlocked...");
498} 160}
499 161/*
500int Bu::Myriad::findEmptyBlock() 162 struct {
501{ 163 char sMagicCode[4];
502 bHeaderChanged = true; 164 uint8_t uVer;
503 165 uint8_t uBitsPerInt;
504 if( lFreeBlocks.isEmpty() ) 166 uint32_t uBlockSize;
505 { 167 uint32_t uStreamCount;
506 sStore.setSize( (iBlocks+1)*(Bu::size)iBlockSize ); 168 } sHeader;
507 return iBlocks++; 169
170 struct {
171 uint32_t uStreamId;
172 uint32_t uStreamSize;
173 } sStreamHeader;
174
175 Bu::println("sHeader = %1, sStreamHeader = %2").arg( sizeof(sHeader) ).arg( sizeof(sStreamHeader) );
176*/
177
178 // Start with the bytes for the file header and initial stream header
179 int iHeaderStreamBytes
180 = 14 // Base header
181 + 8; // Stream header
182
183 // Pick the block count that matches our current estimate for the header
184 // plus one block index.
185 int iHeaderStreamBlocks = blkDiv(iHeaderStreamBytes+4, iBlockSize );
186
187 Bu::println("Initial estimate: %1 bytes / %2 cur blocks, %3 computed blocks (%4 target bytes).")
188 .arg( iHeaderStreamBytes+(iHeaderStreamBlocks*4) )
189 .arg( iHeaderStreamBlocks )
190 .arg( blkDiv((iHeaderStreamBytes+(iHeaderStreamBlocks*4)), iBlockSize) )
191 .arg( iHeaderStreamBlocks*iBlockSize );
192 while( iHeaderStreamBytes+(iHeaderStreamBlocks*4)
193 > iHeaderStreamBlocks*iBlockSize )
194 {
195 iHeaderStreamBlocks = blkDiv((iHeaderStreamBytes+((iHeaderStreamBlocks+1)*4)), iBlockSize);
196 if( iHeaderStreamBlocks > 100 )
197 break;
198 Bu::println(" Adjustment: %1 bytes / %2 cur blocks, %3 computed blocks (%4 target bytes).")
199 .arg( iHeaderStreamBytes+(iHeaderStreamBlocks*4) )
200 .arg( iHeaderStreamBlocks )
201 .arg( blkDiv((iHeaderStreamBytes+(iHeaderStreamBlocks*4)), iBlockSize) )
202 .arg( iHeaderStreamBlocks*iBlockSize );
203 }
204
205 if( iPreallocateBlocks > iHeaderStreamBlocks )
206 {
207 rBacking.setSize( iBlockSize*iPreallocateBlocks );
508 } 208 }
509 else 209 else
510 { 210 {
511 return lFreeBlocks.dequeue(); 211 rBacking.setSize( iBlockSize*iHeaderStreamBlocks );
512 } 212 }
513}
514 213
515void Bu::Myriad::deleteStream( int iId ) 214 //
516{ 215 // Write Myriad header
517 MutexLocker mLock( mHeader ); 216 //
518 TRACE("mHeader locked."); 217 uint8_t uVer = 1;
218 uint8_t uBpi = 32;
219 int32_t iStreamCount = 1;
220 rBacking.setPos( 0 );
221 rBacking.write( Myriad_MAGIC_CODE, 4 );
222 rBacking.write( &uVer, 1 );
223 rBacking.write( &uBpi, 1 );
224 rBacking.write( &iBlockSize, 4 );
225 rBacking.write( &iStreamCount, 4 );
519 226
520 if( iId < 0 ) 227 Stream *pHeadStream = new Stream( *this, 0, Bu::Myriad::ReadWrite );
228 //
229 // Write stream header
230 //
231 uint32_t uStreamId = 0;
232 uint32_t uStreamSize = iHeaderStreamBytes+iHeaderStreamBlocks*4;
233 rBacking.write( &uStreamId, 4 );
234 rBacking.write( &uStreamSize, 4 );
235 for( int iBlockIndex = 0; iBlockIndex < iHeaderStreamBlocks; iBlockIndex++ )
521 { 236 {
522 TRACE("mHeader unlocked..."); 237 rBacking.write( &iBlockIndex, 4 );
523 throw MyriadException( MyriadException::invalidStreamId, 238 pHeadStream->aBlocks.append( iBlockIndex );
524 "Invalid stream id.");
525 } 239 }
526 if( iId == 0 ) 240 rBacking.flush();
527 {
528 TRACE("mHeader unlocked...");
529 throw MyriadException( MyriadException::protectedStream,
530 "You cannot delete stream zero, it is protected.");
531 }
532 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
533 {
534 if( (*i)->iId == iId )
535 {
536 Stream *pStream = *i;
537 for( BlockArray::iterator j = pStream->aBlocks.begin(); j; j++ )
538 {
539 lFreeBlocks.append( *j );
540// bsBlockUsed.setBit( *j, false );
541 iUsed--;
542 }
543 aStreams.erase( i );
544 bHeaderChanged = true;
545 delete pStream;
546 TRACE("mHeader unlocked...");
547 return;
548 }
549 }
550 TRACE("mHeader unlocked...");
551}
552 241
553Bu::Array<int> Bu::Myriad::getStreamIds() 242 hStream.insert( pHeadStream->iStream, pHeadStream );
554{
555 MutexLocker mLock( mHeader );
556 TRACE("mHeader locked.");
557 243
558 Bu::Array<int> aRet( aStreams.getSize() ); 244 for( int32_t j = iHeaderStreamBlocks; j < iPreallocateBlocks; j++ )
559 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
560 { 245 {
561 aRet.append( (*i)->iId ); 246 lFreeBlocks.append( j );
562 } 247 }
563
564 TRACE("mHeader unlocked...");
565 return aRet;
566}
567
568int Bu::Myriad::getStreamSize( int iId )
569{
570 MutexLocker mLock( mHeader );
571 TRACE("mHeader locked.");
572
573 TRACE("mHeader unlocked...");
574 return findStream( iId )->getSize();
575} 248}
576 249
577bool Bu::Myriad::hasStream( int iId ) 250int32_t Bu::Myriad::allocateBlock()
578{ 251{
579 MutexLocker mLock( mHeader ); 252 Bu::MutexLocker l( mAccess );
580 TRACE("mHeader locked."); 253 if( lFreeBlocks.isEmpty() )
581
582 try
583 {
584 findStream( iId );
585 TRACE("mHeader unlocked...");
586 return true;
587 }catch(...)
588 { 254 {
589 TRACE("mHeader unlocked..."); 255 // Increase the size of the backing stream
590 return false; 256 int32_t iIndex = iBlockCount++;
257 rBacking.setSize( iBlockCount*iBlockSize );
258 return iIndex;
591 } 259 }
592} 260 else
593
594Bu::MyriadStream Bu::Myriad::openStream( int iId )
595{
596 MutexLocker mLock( mHeader );
597 TRACE("mHeader locked.");
598
599 TRACE("mHeader unlocked...");
600 //sio << "Myriad: Request to open stream: " << iId << sio.nl;
601 return MyriadStream( *this, findStream( iId ) );
602}
603
604int Bu::Myriad::getNumStreams()
605{
606 MutexLocker mLock( mHeader );
607 TRACE("mHeader locked.");
608
609 TRACE("mHeader unlocked...");
610 return aStreams.getSize();
611}
612
613int Bu::Myriad::getBlockSize()
614{
615 return iBlockSize;
616}
617
618int Bu::Myriad::getNumBlocks()
619{
620 return iBlocks;
621}
622
623int Bu::Myriad::getNumUsedBlocks()
624{
625 return iUsed;
626}
627
628Bu::size Bu::Myriad::getTotalUsedBytes()
629{
630 MutexLocker mLock( mHeader );
631 TRACE("mHeader locked.");
632
633 Bu::size iTotalSize = 0;
634 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
635 { 261 {
636 iTotalSize += (*i)->getSize(); 262 // Provide an existing free block.
263 return lFreeBlocks.peekPop();
637 } 264 }
638 TRACE("mHeader unlocked...");
639 return iTotalSize;
640} 265}
641 266
642Bu::size Bu::Myriad::getTotalUnusedBytes() 267void Bu::Myriad::openStream( StreamId id )
643{ 268{
644 MutexLocker mLock( mHeader ); 269 Bu::MutexLocker l( mAccess );
645 TRACE("mHeader locked."); 270 hStream.get( id )->open();
646
647 Bu::size iTotalSize = (iBlocks-iUsed)*iBlockSize;
648 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
649 {
650 iTotalSize += iBlockSize - ((Bu::size)(*i)->getSize()%iBlockSize);
651 }
652 TRACE("mHeader unlocked...");
653 return iTotalSize;
654} 271}
655 272
656Bu::size Bu::Myriad::getTotalUnusedBytes( int iFakeBlockSize ) 273void Bu::Myriad::closeStream( StreamId id )
657{ 274{
658 MutexLocker mLock( mHeader ); 275 Bu::MutexLocker l( mAccess );
659 TRACE("mHeader locked."); 276 hStream.get( id )->close();
660
661 Bu::size iTotalSize = (iBlocks-iUsed)*iFakeBlockSize;
662 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
663 {
664 iTotalSize += iFakeBlockSize - ((*i)->getSize()%iFakeBlockSize);
665 }
666 TRACE("mHeader unlocked...");
667 return iTotalSize;
668} 277}
669 278
670Bu::Myriad::Stream *Bu::Myriad::findStream( int iId ) 279int32_t Bu::Myriad::blockRead( int32_t iStart, void *pTarget, int32_t iSize )
671{ 280{
672 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 281 int32_t iUpperSize = iBlockSize - (iStart%iBlockSize);
673 { 282 Bu::println("Max size within block: %1 vs %2 (start=%3, blocksize=%4)")
674 if( (*i)->iId == iId ) 283 .arg( iUpperSize ).arg( iSize )
675 return *i; 284 .arg( iStart ).arg( iBlockSize );
676 }
677 285
678 throw MyriadException( MyriadException::noSuchStream, 286 int32_t iAmnt = std::min( iSize, iUpperSize );
679 "The requested stream doesn't exist and cannot be opened." ); 287 Bu::MutexLocker l( mBacking );
288 rBacking.setPos( iStart );
680 289
681 return NULL; 290 return rBacking.read( pTarget, iAmnt );
682} 291}
683 292
684Bu::Myriad::Block *Bu::Myriad::getBlock( int iBlock ) 293/////////
685{ 294// Bu::Myriad::Stream
686// sio << "Myriad: Reading block " << iBlock << ", bytes " 295//
687// << iBlockSize*iBlock << "-" << iBlockSize*(iBlock+1) << sio.nl;
688 Block *pBlock = new Block;
689 pBlock->pData = new char[iBlockSize];
690 sStore.setPos( iBlockSize * (Bu::size)iBlock );
691 sStore.read( pBlock->pData, iBlockSize );
692 pBlock->bChanged = false;
693 pBlock->iBlockIndex = iBlock;
694
695 mActiveBlocks.lock();
696 TRACE("mHeader locked.");
697 hActiveBlocks.insert( iBlock, pBlock );
698 TRACE("mHeader unlocked...");
699 mActiveBlocks.unlock();
700
701 return pBlock;
702}
703 296
704void Bu::Myriad::releaseBlock( Bu::Myriad::Block *pBlock ) 297Bu::Myriad::Stream::Stream( Bu::Myriad &rParent, Bu::Myriad::StreamId iStream,
298 int32_t iSize ) :
299 rParent( rParent ),
300 iStream( iStream ),
301 iSize( iSize ),
302 iOpenCount( 0 ),
303 bStructureChanged( false )
705{ 304{
706 if( pBlock == NULL )
707 return;
708// sio << "Myriad: Releasing block " << pBlock->iBlockIndex << sio.nl;
709 syncBlock( pBlock );
710 mActiveBlocks.lock();
711 TRACE("mHeader locked.");
712 hActiveBlocks.erase( pBlock->iBlockIndex );
713 TRACE("mHeader unlocked...");
714 mActiveBlocks.unlock();
715
716 delete[] pBlock->pData;
717 delete pBlock;
718} 305}
719 306
720void Bu::Myriad::syncBlock( Block *pBlock ) 307Bu::Myriad::Stream::~Stream()
721{ 308{
722 if( pBlock->bChanged )
723 {
724// sio << "Myriad: - Block changed, writing back to stream." << sio.nl;
725 sStore.setPos( iBlockSize * (Bu::size)pBlock->iBlockIndex );
726 sStore.write( pBlock->pData, iBlockSize );
727 pBlock->bChanged = false;
728 }
729}
730
731int Bu::Myriad::streamAddBlock( Stream *pStream )
732{
733 MutexLocker mLock( mHeader );
734 TRACE("mHeader locked.");
735
736 int iBlock = findEmptyBlock();
737 pStream->aBlocks.append( iBlock );
738// bsBlockUsed.setBit( iBlock );
739// bHeaderChanged = true;
740 iUsed++;
741 TRACE("mHeader unlocked...");
742 return iBlock;
743} 309}
744 310
745void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) 311int32_t Bu::Myriad::Stream::read( int32_t iStart, void *pTarget,
312 int32_t iSize )
746{ 313{
747 MutexLocker mLock( mHeader ); 314 int32_t iPos = iStart;
748 TRACE("mHeader locked."); 315 int32_t iRead = 0;
749 316 Bu::MutexLocker l( mAccess );
750 if( pStream->getSize() == iSize ) 317 while( iStart > 0 )
751 {
752 TRACE("mHeader unlocked...");
753 return;
754 }
755 else if( pStream->getSize() > iSize )
756 { 318 {
757 // Shrink 319 int32_t iBlock = aBlocks[iStart/rParent.iBlockSize];
758 TRACE(Bu::String("Shrink stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->getSize()).arg(iSize).end() ); 320 int32_t iOffset = iPos % rParent.iBlockSize;
759 for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; 321 int32_t iChunkRead = rParent.blockRead(
760 iNewSize-iBlockSize > iSize; iNewSize -= iBlockSize ) 322 iBlock*rParent.iBlockSize+iOffset, pTarget, iSize
761 { 323 );
762// if( bsBlockUsed.getBit( pStream->aBlocks.last() ) ) 324 if( iChunkRead == 0 )
763 iUsed--; 325 break;
764// else 326 iRead += iChunkRead;
765// sio << "Unused block used in stream? " << pStream->aBlocks.last() << sio.nl; 327 reinterpret_cast<ptrdiff_t &>(pTarget) += iChunkRead;
766 lFreeBlocks.enqueue( pStream->aBlocks.last() ); 328 iSize -= iChunkRead;
767// bsBlockUsed.setBit( pStream->aBlocks.last(), false );
768 pStream->aBlocks.eraseLast();
769 }
770 pStream->setSize( iSize );
771 bHeaderChanged = true;
772 } 329 }
773 else
774 {
775 // Grow
776 TRACE(Bu::String("Grow stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->getSize()).arg(iSize).end() );
777 for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize;
778 iNewSize < iSize; iNewSize += iBlockSize )
779 {
780 //streamAddBlock( pStream );
781 int iBlock = findEmptyBlock();
782 pStream->aBlocks.append( iBlock );
783// bsBlockUsed.setBit( iBlock );
784// bHeaderChanged = true;
785 iUsed++;
786 }
787 pStream->setSize( iSize );
788 bHeaderChanged = true;
789 }
790 TRACE("mHeader unlocked...");
791}
792 330
793void Bu::Myriad::headerChanged() 331 return iRead;
794{
795 bHeaderChanged = true;
796} 332}
797 333
798bool Bu::Myriad::isMyriad( Bu::Stream &sStore ) 334void Bu::Myriad::Stream::open()
799{ 335{
800 uint8_t uTmp; 336 Bu::MutexLocker l( mAccess );
801 337 iOpenCount++;
802 return isMyriad( sStore, uTmp );
803}
804
805bool Bu::Myriad::isMyriad( Bu::Stream &sStore, uint8_t &uTmp )
806{
807 sStore.setPos( 0 );
808
809 unsigned char buf[4];
810 if( sStore.read( buf, 4 ) < 4 )
811 throw MyriadException( MyriadException::emptyStream,
812 "Input stream appears to be empty.");
813 sStore.read( &uTmp, 1 );
814 sStore.setPos( 0 );
815 if( memcmp( buf, Myriad_MAGIC_CODE, 4 ) )
816 {
817 return false;
818 }
819 return true;
820} 338}
821 339
822const Bu::BitString Bu::Myriad::getBlocksUsed() const 340bool Bu::Myriad::Stream::close()
823{ 341{
824 Bu::BitString bs( iBlocks, false ); 342 Bu::MutexLocker l( mAccess );
825 for( int j = 0; j < iBlocks; j++ ) 343 return (bool)(--iOpenCount);
826 bs.setBit( j );
827 for( IndexList::const_iterator i = lFreeBlocks.begin(); i; i++ )
828 bs.setBit( *i, false );
829 return bs;
830} 344}
831 345