aboutsummaryrefslogtreecommitdiff
path: root/src/stable
diff options
context:
space:
mode:
authorMike Buland <mike@xagasoft.com>2024-08-27 13:37:36 -0700
committerMike Buland <mike@xagasoft.com>2024-08-27 13:37:36 -0700
commitf1e3f25d9b7a12cdedb99e4cb0bfa66157a1a972 (patch)
treec8414b8040cdcd38bd98471d96a01908cdef49ad /src/stable
parentcaee572ff94822ca2ed354fcb79ca04ed9adf388 (diff)
downloadlibbu++-f1e3f25d9b7a12cdedb99e4cb0bfa66157a1a972.tar.gz
libbu++-f1e3f25d9b7a12cdedb99e4cb0bfa66157a1a972.tar.bz2
libbu++-f1e3f25d9b7a12cdedb99e4cb0bfa66157a1a972.tar.xz
libbu++-f1e3f25d9b7a12cdedb99e4cb0bfa66157a1a972.zip
Making progress.
Diffstat (limited to '')
-rw-r--r--src/stable/myriad.cpp930
-rw-r--r--src/stable/myriad.h287
-rw-r--r--src/stable/myriadstream.cpp257
-rw-r--r--src/stable/myriadstream.h49
4 files changed, 372 insertions, 1151 deletions
diff --git a/src/stable/myriad.cpp b/src/stable/myriad.cpp
index c606369..5278ac5 100644
--- a/src/stable/myriad.cpp
+++ b/src/stable/myriad.cpp
@@ -1,25 +1,18 @@
1/*
2 * Copyright (C) 2007-2023 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#include "bu/config.h"
9#include "bu/myriad.h" 1#include "bu/myriad.h"
10#include "bu/stream.h"
11#include "bu/myriadstream.h" 2#include "bu/myriadstream.h"
3
12#include "bu/mutexlocker.h" 4#include "bu/mutexlocker.h"
13#include <stdio.h> 5#include "bu/util.h"
14 6
15#include "bu/sio.h" 7#include "bu/sio.h"
16using Bu::sio;
17using Bu::Fmt;
18 8
19#define Myriad_MAGIC_CODE ((unsigned char *)"\x0a\xd3\xfa\x84") 9#define Myriad_MAGIC_CODE ((unsigned char *)"\x0a\xd3\xfa\x84")
20 10
21// #define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(sStore.getLocation()).arg(x) 11#define MyriadRead( target, size ) if( rBacking.read( target, size ) < size ) \
22#define TRACE( x ) (void)0 12{ \
13 throw Bu::MyriadException( Bu::MyriadException::invalidFormat, \
14 "Insufficent data reading myriad data from backing stream."); \
15} (void)0
23 16
24namespace Bu 17namespace Bu
25{ 18{
@@ -29,803 +22,324 @@ namespace Bu
29 } 22 }
30} 23}
31 24
32Bu::Myriad::Myriad( Bu::Stream &sStore, int iBlockSize, int iPreallocate ) : 25Bu::Myriad::Myriad( Bu::Stream &rBacking, int iBlockSize,
33 sStore( sStore ), 26 int iPreallocateBlocks ) :
27 rBacking( rBacking ),
34 iBlockSize( iBlockSize ), 28 iBlockSize( iBlockSize ),
35 iBlocks( 0 ), 29 iBlockCount( 0 ),
36 iUsed( 0 ), 30 bIsNewStream( true )
37 bHeaderChanged( false )
38{ 31{
39 try 32 if( !rBacking.isSeekable() )
40 { 33 {
41 initialize(); 34 throw Bu::MyriadException( Bu::MyriadException::invalidBackingStream,
35 "Myriad backing stream must be random access (seekable).");
42 } 36 }
43 catch( Bu::MyriadException &e ) 37 if( !loadMyriad() )
44 { 38 {
45 if( e.getErrorCode() == MyriadException::emptyStream ) 39 createMyriad( iBlockSize, iPreallocateBlocks );
46 {
47 initialize( iBlockSize, iPreallocate );
48 }
49 else
50 {
51 throw;
52 }
53 } 40 }
54} 41}
55 42
56Bu::Myriad::~Myriad() 43Bu::Myriad::~Myriad()
57{ 44{
58 mActiveBlocks.lock();
59 TRACE("mActiveBlocks locked.");
60 if( !hActiveBlocks.isEmpty() )
61 {
62 sio << "Bu::Myriad::~Myriad(): Error: There are "
63 << hActiveBlocks.getSize() << " unsynced blocks!" << sio.nl;
64 }
65 TRACE("mActiveBlocks unlocking...");
66 mActiveBlocks.unlock();
67 sync();
68
69 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
70 {
71 delete *i;
72 }
73} 45}
74 46
75void Bu::Myriad::sync() 47Bu::MyriadStream Bu::Myriad::create( Bu::Myriad::Mode /*eMode*/,
48 int32_t /*iPreallocateBytes*/ )
76{ 49{
77 updateHeader(); 50 return Bu::MyriadStream( *this, NULL, (Mode)0 );
78
79 mActiveBlocks.lock();
80 TRACE("mActiveBlocks locked.");
81 for( BlockHash::iterator i = hActiveBlocks.begin(); i; i++ )
82 {
83 if( (*i)->bChanged )
84 {
85 syncBlock( *i );
86 }
87 }
88 TRACE("mActiveBlocks unlocked...");
89 mActiveBlocks.unlock();
90} 51}
91 52
92void Bu::Myriad::initialize() 53Bu::MyriadStream Bu::Myriad::open( Bu::Myriad::StreamId iStream,
54 Bu::Myriad::Mode eMode )
93{ 55{
94 MutexLocker mLock( mHeader ); 56 Bu::MutexLocker l( mAccess );
95 TRACE("mHeader locked."); 57 if( !hStream.has( iStream ) )
96 lFreeBlocks.clear();
97 sStore.setPosEnd( 0 );
98 Bu::size iSize = sStore.tell();
99 sStore.setPos( 0 );
100
101 unsigned char buf[4];
102 if( sStore.read( buf, 4 ) < 4 )
103 {
104 TRACE("mHeader unlocked...");
105 throw MyriadException( MyriadException::emptyStream,
106 "Input stream appears to be empty.");
107 }
108 if( memcmp( buf, Myriad_MAGIC_CODE, 4 ) )
109 {
110 TRACE("mHeader unlocked...");
111 throw MyriadException( MyriadException::invalidFormat,
112 "Stream does not appear to be a valid Myriad format.");
113 }
114 sStore.read( buf, 2 );
115 if( buf[0] != 1 )
116 {
117 TRACE("mHeader unlocked...");
118 throw MyriadException( MyriadException::badVersion,
119 "We can only handle version 1 for now.");
120 }
121 if( buf[1] != 32 )
122 {
123 TRACE("mHeader unlocked...");
124 throw MyriadException( MyriadException::invalidWordSize,
125 "We can only handle 32-bit words at the moment.");
126 }
127 sStore.read( &iBlockSize, 4 );
128 int iStreams;
129 sStore.read( &iStreams, 4 );
130
131 iBlocks = iSize/iBlockSize;
132 //sio << "Myriad: iSize=" << iSize << ", iBlockSize=" << iBlockSize
133 // << ", iBlocks=" << iBlocks << ", iStreams=" << iStreams << sio.nl;
134
135 int iHeaderSize = 14 + 8 + 4;
136 int iHeaderBlocks = 0; //blkDiv( iHeaderSize+4, iBlockSize );
137
138 while( iHeaderSize > iHeaderBlocks*iBlockSize )
139 {
140 iHeaderBlocks = blkDiv( iHeaderSize+4, iBlockSize );
141 iHeaderSize = 14 + 8 + 4*iHeaderBlocks;
142 }
143
144 //sio << "Myriad: iHeaderSize=" << iHeaderSize
145 // << ", iHeaderBlocks=" << iHeaderBlocks << sio.nl;
146
147 Stream *pFakeHdr = new Stream;
148 pFakeHdr->iId = 0;
149 pFakeHdr->setSize( iHeaderSize );
150 for( int j = 0; j < iHeaderBlocks; j++ )
151 {
152 pFakeHdr->aBlocks.append( j );
153 }
154
155// sio << "Blocks: " << iBlocks << " (size = " << iSize << "/" << iBlockSize
156// << ")" << sio.nl;
157 Bu::BitString bsBlockUsed( iBlocks, false );
158 bsBlockUsed.clear();
159
160// bool bCanSkip = false; // Can skip around, post initial header stream i/o
161 MyriadStream *pIn = new MyriadStream( *this, pFakeHdr );
162 pIn->setPos( sStore.tell() );
163 for( int j = 0; j < iStreams; j++ )
164 { 58 {
165 int iSizeTmp; 59 throw Bu::MyriadException( MyriadException::noSuchStream,
166 aStreams.append( new Stream() ); 60 "No such stream.");
167 Stream &s = *aStreams[j];
168 pIn->read( &s.iId, 4 );
169 pIn->read( &iSizeTmp, 4 );
170 s.setSize( iSizeTmp );
171 int iSBlocks = blkDiv(s.getSize(), iBlockSize);
172 // sio << "Myriad: - Stream::iId=" << s.iId
173 // << ", Stream::iSize=" << s.iSize
174 // << ", Stream::aBlocks=" << iSBlocks
175 // << ", pIn->tell()=" << pIn->tell() << sio.nl;
176 for( int k = 0; k < iSBlocks; k++ )
177 {
178 int iBId;
179 pIn->read( &iBId, 4 );
180 // sio << "Myriad: - iBId=" << iBId
181 // << ", iStartPos=" << iBId*iBlockSize
182 // << ", pIn->tell()=" << pIn->tell() << sio.nl;
183 s.aBlocks.append( iBId );
184 bsBlockUsed.setBit( iBId );
185 iUsed++;
186 if( (j == 0 && k == iHeaderBlocks-1) )
187 {
188 // sio << "Myriad: - End of prepartition, unlocking skipping."
189 // << sio.nl;
190// bCanSkip = true;
191 MyriadStream *pTmp = new MyriadStream( *this, aStreams[0] );
192 // sio << "Myriad - Position = " << pIn->tell() << sio.nl;
193 pTmp->setPos( pIn->tell() );
194 delete pIn;
195 delete pFakeHdr;
196 pIn = pTmp;
197 }
198 }
199 } 61 }
200 delete pIn;
201
202 for( int j = 0; j < iBlocks; j++ )
203 { 62 {
204 if( bsBlockUsed.getBit( j ) == false ) 63 Bu::MutexLocker l2( mBacking );
64 if( (eMode&Write) && rBacking.isWritable() )
205 { 65 {
206// sio << "Preinitialized block " << j << " is free." << sio.nl; 66 throw Bu::MyriadException( MyriadException::badMode,
207 lFreeBlocks.append( j ); 67 "Backing stream does not support writing.");
208 } 68 }
209 } 69 }
210// sio << "Myriad: Blocks used: " << bsBlockUsed.toString() << sio.nl; 70 return Bu::MyriadStream( *this, hStream.get( iStream ), eMode );
211 TRACE("mHeader unlocked...");
212} 71}
213 72
214void Bu::Myriad::initialize( int iBlockSize, int iPreAllocate ) 73bool Bu::Myriad::erase( Bu::Myriad::StreamId /*iStream*/ )
215{ 74{
216 MutexLocker mLock( mHeader ); 75 return false;
217 TRACE("mHeader locked.");
218 lFreeBlocks.clear();
219
220 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
221 {
222 delete *i;
223 }
224 aStreams.clear();
225 iUsed = 0;
226
227 int iHeaderSize = 14 + 8 + 4;
228 int iHeaderBlocks = 0; //blkDiv( iHeaderSize+4, iBlockSize );
229 char cBuf = 1;
230 int iBuf = 0;
231
232 Stream *pStr = new Stream;
233 pStr->iId = 0;
234
235 while( iHeaderSize > iHeaderBlocks*iBlockSize )
236 {
237 iHeaderBlocks = blkDiv( iHeaderSize+4, iBlockSize );
238 iHeaderSize = 14 + 8 + 4*iHeaderBlocks;
239 }
240
241 iPreAllocate += iHeaderBlocks;
242
243 //sio << "Myriad: iHeaderSize=" << iHeaderSize << ", iBlockSize="
244 // << iBlockSize << ", iHeaderBlocks=" << iHeaderBlocks << sio.nl;
245
246// bsBlockUsed.setSize( iPreAllocate, true );
247 iUsed++;
248
249 char *pBlock = new char[iBlockSize];
250 memset( pBlock, 0, iBlockSize );
251 for( int j = 0; j < iPreAllocate; j++ )
252 {
253 sStore.write( pBlock, iBlockSize );
254 }
255 delete[] (char *)pBlock;
256
257 sStore.setPos( 0 );
258
259 // Magic number
260 sStore.write( Myriad_MAGIC_CODE, 4 );
261
262 // Version (0)
263 sStore.write( &cBuf, 1 );
264
265 // Bits per int
266 cBuf = 32;
267 sStore.write( &cBuf, 1 );
268
269 // The size of each block
270 sStore.write( &iBlockSize, 4 );
271
272 iBuf = 1;
273 // The number of streams
274 sStore.write( &iBuf, 4 );
275
276 // Stream header
277 iBuf = 0;
278 sStore.write( &iBuf, 4 );
279 sStore.write( &iHeaderSize, 4 );
280 for( iBuf = 0; iBuf < iHeaderBlocks; iBuf++ )
281 {
282 sStore.write( &iBuf, 4 );
283 }
284
285 this->iBlockSize = iBlockSize;
286 this->iBlocks = iPreAllocate;
287
288 pStr->setSize( sStore.tell() );
289// sio << "Myriad: Actual end of header stream = " << pStr->iSize << sio.nl;
290
291 pStr->setSize( iHeaderSize );
292 for( int j = 0; j < iHeaderBlocks; j++ )
293 {
294// sio << "Started block " << j << " is header." << sio.nl;
295 pStr->aBlocks.append( j );
296// bsBlockUsed.setBit( j );
297 iUsed++;
298 }
299 for( int j = iHeaderBlocks; j < this->iBlocks; j++ )
300 {
301// sio << "Started block " << j << " is free." << sio.nl;
302 lFreeBlocks.append( j );
303 }
304
305 aStreams.append( pStr );
306
307 //sio << bsBlockUsed.toString() << " - " << pStr->aBlocks << sio.nl;
308
309 bHeaderChanged = true;
310 //hStreams.insert( 0, BlockArray( 0 ) );
311 TRACE("mHeader unlocked...");
312}
313
314void Bu::Myriad::Stream::setSize( int iNewSize )
315{
316 MutexLocker l( mStream );
317 iSize = iNewSize;
318}
319
320void Bu::Myriad::Stream::growTo( int iNewSize )
321{
322 MutexLocker l( mStream );
323 if( iNewSize < iSize )
324 return;
325 iSize = iNewSize;
326} 76}
327 77
328int Bu::Myriad::Stream::getSize() const 78bool Bu::Myriad::setSize( Bu::Myriad::StreamId /*iStream*/,
79 int32_t /*iNewSize*/ )
329{ 80{
330 MutexLocker l( mStream ); 81 return false;
331 return iSize;
332} 82}
333 83
334void Bu::Myriad::updateHeader() 84bool Bu::Myriad::loadMyriad()
335{ 85{
336 MutexLocker mLock( mHeader ); 86 Bu::println("Load myriad!");
337 TRACE("mHeader locked."); 87 char sMagicCode[4];
338 88 rBacking.setPos( 0 );
339 if( bHeaderChanged == false ) 89 MyriadRead( sMagicCode, 4 );
90 if( memcmp( sMagicCode, Myriad_MAGIC_CODE, 4 ) )
340 { 91 {
341 TRACE("mHeader unlocked..."); 92 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
342 return; 93 "Backing stream does not seem to be a Myriad structure.");
343 } 94 }
344 if( !sStore.canWrite() ) 95 uint8_t uVer;
96 uint8_t uBitsPerInt;
97 MyriadRead( &uVer, 1 );
98 if( uVer != 1 )
345 { 99 {
346 TRACE("mHeader unlocked..."); 100 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
347 return; 101 "Only version 1 myriad structures are supported.");
348 } 102 }
349 103 MyriadRead( &uBitsPerInt, 1 );
350 char cBuf; 104 if( uBitsPerInt != 32 )
351 int iBuf;
352
353 //for( StreamArray::iterator i = aStreams.begin(); i; i++ )
354 //{
355 // sio << "Myriad: Stream " << Fmt(4) << (*i)->iId << ": " << (*i)->aBlocks << sio.nl;
356 //}
357
358 // Compute the new size of the header.
359 int iHeaderSize = 14 + 8*aStreams.getSize();
360// sio << "Myriad: updateHeader: aStreams.getSize() = " << aStreams.getSize()
361// << sio.nl;
362 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
363 { 105 {
364 iHeaderSize += 4*(*i)->aBlocks.getSize(); 106 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
365// sio << "Myriad: updateHeader: (*i)->aBlocks.getSize() = " 107 "Only 32 bits per int are supported at this time.");
366// << (*i)->aBlocks.getSize() << sio.nl;
367 } 108 }
368 int iNewBlocks = blkDiv( iHeaderSize, iBlockSize ); 109 MyriadRead( &iBlockSize, 4 );
369 while( iNewBlocks > aStreams[0]->aBlocks.getSize() ) 110 int iStreamCount;
111 MyriadRead( &iStreamCount, 4 );
112
113 //
114 // Read stream data -- Bootstrap the zero stream
115 //
116 StreamId iStream;
117 MyriadRead( &iStream, 4 );
118 if( iStream != 0 )
370 { 119 {
371 int iBlock = findEmptyBlock(); 120 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
372// sio << "Myriad: updateHeader: Appending block " << iBlock 121 "The first stream defined must be the header/zero stream.");
373// << " to header." << sio.nl;
374 aStreams[0]->aBlocks.append( iBlock );
375// bsBlockUsed.setBit( iBlock );
376 iUsed++;
377 iHeaderSize += 4;
378 iNewBlocks = blkDiv( iHeaderSize, iBlockSize );
379 } 122 }
380 aStreams[0]->setSize( iHeaderSize ); 123 int32_t iHeaderStreamBytes;
381// sio << "Myriad: updateHeader: iHeaderSize=" << iHeaderSize 124 MyriadRead( &iHeaderStreamBytes, 4 );
382// << ", iNewBlocks=" << iNewBlocks << ", curBlocks="
383// << aStreams[0]->aBlocks.getSize() << sio.nl;
384 125
385 MyriadStream sHdr( *this, aStreams[0] ); 126 Stream *pHeaderStream = new Stream( *this, iStream, iHeaderStreamBytes );
386 sHdr.write( Myriad_MAGIC_CODE, 4 ); 127 int iHeaderStreamBlocks = blkDiv(iHeaderStreamBytes+4, iBlockSize );
387 128
388 // Version (1) 129 while( iHeaderStreamBytes+(iHeaderStreamBlocks*4)
389 cBuf = 1; 130 > iHeaderStreamBlocks*iBlockSize )
390 sHdr.write( &cBuf, 1 );
391
392 // Bits per int
393 cBuf = 32;
394 sHdr.write( &cBuf, 1 );
395
396 // The size of each block
397 sHdr.write( &iBlockSize, 4 );
398
399 iBuf = aStreams.getSize();
400 // The number of streams
401 sHdr.write( &iBuf, 4 );
402
403 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
404 { 131 {
405 int iSizeTmp; 132 iHeaderStreamBlocks = blkDiv(
406 sHdr.write( &(*i)->iId, 4 ); 133 (iHeaderStreamBytes+((iHeaderStreamBlocks+1)*4)), iBlockSize
407 sHdr.write( &iSizeTmp, 4 ); 134 );
408 (*i)->setSize( iSizeTmp );
409 int iUsedBlocks = blkDiv( iSizeTmp, iBlockSize );
410// for( BlockArray::iterator j = (*i)->aBlocks.begin(); j; j++ )
411 for( int j = 0; j < iUsedBlocks; j++ )
412 {
413 sHdr.write( &(*i)->aBlocks[j], 4 );
414 }
415 } 135 }
416
417 bHeaderChanged = false;
418 TRACE("mHeader unlocked...");
419}
420
421int Bu::Myriad::createStream( int iPreAllocate )
422{
423 MutexLocker mLock( mHeader );
424 TRACE("mHeader locked.");
425
426 Stream *pStr = new Stream();
427 pStr->iId = aStreams.last()->iId+1;
428 //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate="
429 // << iPreAllocate << sio.nl;
430 pStr->setSize( 0 );
431 aStreams.append( pStr );
432 136
433 for( int j = 0; j < iPreAllocate; j++ ) 137 for( int32_t j = 0; j < iHeaderStreamBlocks; j++ )
434 { 138 {
435 int iFreeBlock = findEmptyBlock(); 139 int32_t iBlockIndex;
436// sio << "Myriad: Adding block " << iFreeBlock << sio.nl; 140 MyriadRead( &iBlockIndex, 4 );
437 pStr->aBlocks.append( iFreeBlock ); 141 pHeaderStream->aBlocks.append( iBlockIndex );
438// bsBlockUsed.setBit( iFreeBlock );
439 iUsed++;
440 } 142 }
441 143
442 bHeaderChanged = true; 144 // Bootstrap now using the header stream to read the rest of the data.
443 145
444 TRACE("mHeader unlocked..."); 146 return true;
445 return pStr->iId;
446} 147}
447 148
448int Bu::Myriad::createStreamWithId( int iId, int iPreAllocate ) 149void Bu::Myriad::createMyriad( int32_t iBlockSize, int32_t iPreallocateBlocks )
449{ 150{
450 MutexLocker mLock( mHeader ); 151 if( iBlockSize < 8 )
451 TRACE("mHeader locked.");
452
453 try
454 { 152 {
455 findStream( iId ); 153 throw Bu::MyriadException( Bu::MyriadException::invalidParameter,
456 TRACE("mHeader unlocked..."); 154 "iBlockSize cannot be below 8");
457 throw MyriadException( MyriadException::streamExists,
458 "There is already a stream with the given id.");
459 } 155 }
460 catch( MyriadException &e ) 156 if( rBacking.getSize() )
461 { 157 {
462 Stream *pStr = new Stream(); 158 throw Bu::MyriadException( Bu::MyriadException::invalidFormat,
463 pStr->iId = iId; 159 "Backing stream contains data, but not a myriad structure.");
464 //sio << "Myriad: New stream id=" << pStr->iId << ", iPreAllocate="
465 // << iPreAllocate << sio.nl;
466 pStr->setSize( 0 );
467 if( aStreams.last()->iId < iId )
468 {
469 aStreams.append( pStr );
470 }
471 else
472 {
473 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
474 {
475 if( (*i)->iId > iId )
476 {
477 aStreams.insert( i, pStr );
478 break;
479 }
480 }
481 }
482
483 for( int j = 0; j < iPreAllocate; j++ )
484 {
485 int iFreeBlock = findEmptyBlock();
486 // sio << "Myriad: Adding block " << iFreeBlock << sio.nl;
487 pStr->aBlocks.append( iFreeBlock );
488// bsBlockUsed.setBit( iFreeBlock );
489 iUsed++;
490 }
491
492 bHeaderChanged = true;
493
494 TRACE("mHeader unlocked...");
495 return pStr->iId;
496 }
497 TRACE("mHeader unlocked...");
498} 160}
499 161/*
500int Bu::Myriad::findEmptyBlock() 162 struct {
501{ 163 char sMagicCode[4];
502 bHeaderChanged = true; 164 uint8_t uVer;
503 165 uint8_t uBitsPerInt;
504 if( lFreeBlocks.isEmpty() ) 166 uint32_t uBlockSize;
505 { 167 uint32_t uStreamCount;
506 sStore.setSize( (iBlocks+1)*(Bu::size)iBlockSize ); 168 } sHeader;
507 return iBlocks++; 169
170 struct {
171 uint32_t uStreamId;
172 uint32_t uStreamSize;
173 } sStreamHeader;
174
175 Bu::println("sHeader = %1, sStreamHeader = %2").arg( sizeof(sHeader) ).arg( sizeof(sStreamHeader) );
176*/
177
178 // Start with the bytes for the file header and initial stream header
179 int iHeaderStreamBytes
180 = 14 // Base header
181 + 8; // Stream header
182
183 // Pick the block count that matches our current estimate for the header
184 // plus one block index.
185 int iHeaderStreamBlocks = blkDiv(iHeaderStreamBytes+4, iBlockSize );
186
187 Bu::println("Initial estimate: %1 bytes / %2 cur blocks, %3 computed blocks (%4 target bytes).")
188 .arg( iHeaderStreamBytes+(iHeaderStreamBlocks*4) )
189 .arg( iHeaderStreamBlocks )
190 .arg( blkDiv((iHeaderStreamBytes+(iHeaderStreamBlocks*4)), iBlockSize) )
191 .arg( iHeaderStreamBlocks*iBlockSize );
192 while( iHeaderStreamBytes+(iHeaderStreamBlocks*4)
193 > iHeaderStreamBlocks*iBlockSize )
194 {
195 iHeaderStreamBlocks = blkDiv((iHeaderStreamBytes+((iHeaderStreamBlocks+1)*4)), iBlockSize);
196 if( iHeaderStreamBlocks > 100 )
197 break;
198 Bu::println(" Adjustment: %1 bytes / %2 cur blocks, %3 computed blocks (%4 target bytes).")
199 .arg( iHeaderStreamBytes+(iHeaderStreamBlocks*4) )
200 .arg( iHeaderStreamBlocks )
201 .arg( blkDiv((iHeaderStreamBytes+(iHeaderStreamBlocks*4)), iBlockSize) )
202 .arg( iHeaderStreamBlocks*iBlockSize );
203 }
204
205 if( iPreallocateBlocks > iHeaderStreamBlocks )
206 {
207 rBacking.setSize( iBlockSize*iPreallocateBlocks );
508 } 208 }
509 else 209 else
510 { 210 {
511 return lFreeBlocks.dequeue(); 211 rBacking.setSize( iBlockSize*iHeaderStreamBlocks );
512 } 212 }
513}
514 213
515void Bu::Myriad::deleteStream( int iId ) 214 //
516{ 215 // Write Myriad header
517 MutexLocker mLock( mHeader ); 216 //
518 TRACE("mHeader locked."); 217 uint8_t uVer = 1;
218 uint8_t uBpi = 32;
219 int32_t iStreamCount = 1;
220 rBacking.setPos( 0 );
221 rBacking.write( Myriad_MAGIC_CODE, 4 );
222 rBacking.write( &uVer, 1 );
223 rBacking.write( &uBpi, 1 );
224 rBacking.write( &iBlockSize, 4 );
225 rBacking.write( &iStreamCount, 4 );
519 226
520 if( iId < 0 ) 227 Stream *pHeadStream = new Stream( *this, 0, Bu::Myriad::ReadWrite );
228 //
229 // Write stream header
230 //
231 uint32_t uStreamId = 0;
232 uint32_t uStreamSize = iHeaderStreamBytes+iHeaderStreamBlocks*4;
233 rBacking.write( &uStreamId, 4 );
234 rBacking.write( &uStreamSize, 4 );
235 for( int iBlockIndex = 0; iBlockIndex < iHeaderStreamBlocks; iBlockIndex++ )
521 { 236 {
522 TRACE("mHeader unlocked..."); 237 rBacking.write( &iBlockIndex, 4 );
523 throw MyriadException( MyriadException::invalidStreamId, 238 pHeadStream->aBlocks.append( iBlockIndex );
524 "Invalid stream id.");
525 } 239 }
526 if( iId == 0 ) 240 rBacking.flush();
527 {
528 TRACE("mHeader unlocked...");
529 throw MyriadException( MyriadException::protectedStream,
530 "You cannot delete stream zero, it is protected.");
531 }
532 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
533 {
534 if( (*i)->iId == iId )
535 {
536 Stream *pStream = *i;
537 for( BlockArray::iterator j = pStream->aBlocks.begin(); j; j++ )
538 {
539 lFreeBlocks.append( *j );
540// bsBlockUsed.setBit( *j, false );
541 iUsed--;
542 }
543 aStreams.erase( i );
544 bHeaderChanged = true;
545 delete pStream;
546 TRACE("mHeader unlocked...");
547 return;
548 }
549 }
550 TRACE("mHeader unlocked...");
551}
552 241
553Bu::Array<int> Bu::Myriad::getStreamIds() 242 hStream.insert( pHeadStream->iStream, pHeadStream );
554{
555 MutexLocker mLock( mHeader );
556 TRACE("mHeader locked.");
557 243
558 Bu::Array<int> aRet( aStreams.getSize() ); 244 for( int32_t j = iHeaderStreamBlocks; j < iPreallocateBlocks; j++ )
559 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
560 { 245 {
561 aRet.append( (*i)->iId ); 246 lFreeBlocks.append( j );
562 } 247 }
563
564 TRACE("mHeader unlocked...");
565 return aRet;
566}
567
568int Bu::Myriad::getStreamSize( int iId )
569{
570 MutexLocker mLock( mHeader );
571 TRACE("mHeader locked.");
572
573 TRACE("mHeader unlocked...");
574 return findStream( iId )->getSize();
575} 248}
576 249
577bool Bu::Myriad::hasStream( int iId ) 250int32_t Bu::Myriad::allocateBlock()
578{ 251{
579 MutexLocker mLock( mHeader ); 252 Bu::MutexLocker l( mAccess );
580 TRACE("mHeader locked."); 253 if( lFreeBlocks.isEmpty() )
581
582 try
583 {
584 findStream( iId );
585 TRACE("mHeader unlocked...");
586 return true;
587 }catch(...)
588 { 254 {
589 TRACE("mHeader unlocked..."); 255 // Increase the size of the backing stream
590 return false; 256 int32_t iIndex = iBlockCount++;
257 rBacking.setSize( iBlockCount*iBlockSize );
258 return iIndex;
591 } 259 }
592} 260 else
593
594Bu::MyriadStream Bu::Myriad::openStream( int iId )
595{
596 MutexLocker mLock( mHeader );
597 TRACE("mHeader locked.");
598
599 TRACE("mHeader unlocked...");
600 //sio << "Myriad: Request to open stream: " << iId << sio.nl;
601 return MyriadStream( *this, findStream( iId ) );
602}
603
604int Bu::Myriad::getNumStreams()
605{
606 MutexLocker mLock( mHeader );
607 TRACE("mHeader locked.");
608
609 TRACE("mHeader unlocked...");
610 return aStreams.getSize();
611}
612
613int Bu::Myriad::getBlockSize()
614{
615 return iBlockSize;
616}
617
618int Bu::Myriad::getNumBlocks()
619{
620 return iBlocks;
621}
622
623int Bu::Myriad::getNumUsedBlocks()
624{
625 return iUsed;
626}
627
628Bu::size Bu::Myriad::getTotalUsedBytes()
629{
630 MutexLocker mLock( mHeader );
631 TRACE("mHeader locked.");
632
633 Bu::size iTotalSize = 0;
634 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
635 { 261 {
636 iTotalSize += (*i)->getSize(); 262 // Provide an existing free block.
263 return lFreeBlocks.peekPop();
637 } 264 }
638 TRACE("mHeader unlocked...");
639 return iTotalSize;
640} 265}
641 266
642Bu::size Bu::Myriad::getTotalUnusedBytes() 267void Bu::Myriad::openStream( StreamId id )
643{ 268{
644 MutexLocker mLock( mHeader ); 269 Bu::MutexLocker l( mAccess );
645 TRACE("mHeader locked."); 270 hStream.get( id )->open();
646
647 Bu::size iTotalSize = (iBlocks-iUsed)*iBlockSize;
648 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
649 {
650 iTotalSize += iBlockSize - ((Bu::size)(*i)->getSize()%iBlockSize);
651 }
652 TRACE("mHeader unlocked...");
653 return iTotalSize;
654} 271}
655 272
656Bu::size Bu::Myriad::getTotalUnusedBytes( int iFakeBlockSize ) 273void Bu::Myriad::closeStream( StreamId id )
657{ 274{
658 MutexLocker mLock( mHeader ); 275 Bu::MutexLocker l( mAccess );
659 TRACE("mHeader locked."); 276 hStream.get( id )->close();
660
661 Bu::size iTotalSize = (iBlocks-iUsed)*iFakeBlockSize;
662 for( StreamArray::iterator i = aStreams.begin(); i; i++ )
663 {
664 iTotalSize += iFakeBlockSize - ((*i)->getSize()%iFakeBlockSize);
665 }
666 TRACE("mHeader unlocked...");
667 return iTotalSize;
668} 277}
669 278
670Bu::Myriad::Stream *Bu::Myriad::findStream( int iId ) 279int32_t Bu::Myriad::blockRead( int32_t iStart, void *pTarget, int32_t iSize )
671{ 280{
672 for( StreamArray::iterator i = aStreams.begin(); i; i++ ) 281 int32_t iUpperSize = iBlockSize - (iStart%iBlockSize);
673 { 282 Bu::println("Max size within block: %1 vs %2 (start=%3, blocksize=%4)")
674 if( (*i)->iId == iId ) 283 .arg( iUpperSize ).arg( iSize )
675 return *i; 284 .arg( iStart ).arg( iBlockSize );
676 }
677 285
678 throw MyriadException( MyriadException::noSuchStream, 286 int32_t iAmnt = std::min( iSize, iUpperSize );
679 "The requested stream doesn't exist and cannot be opened." ); 287 Bu::MutexLocker l( mBacking );
288 rBacking.setPos( iStart );
680 289
681 return NULL; 290 return rBacking.read( pTarget, iAmnt );
682} 291}
683 292
684Bu::Myriad::Block *Bu::Myriad::getBlock( int iBlock ) 293/////////
685{ 294// Bu::Myriad::Stream
686// sio << "Myriad: Reading block " << iBlock << ", bytes " 295//
687// << iBlockSize*iBlock << "-" << iBlockSize*(iBlock+1) << sio.nl;
688 Block *pBlock = new Block;
689 pBlock->pData = new char[iBlockSize];
690 sStore.setPos( iBlockSize * (Bu::size)iBlock );
691 sStore.read( pBlock->pData, iBlockSize );
692 pBlock->bChanged = false;
693 pBlock->iBlockIndex = iBlock;
694
695 mActiveBlocks.lock();
696 TRACE("mHeader locked.");
697 hActiveBlocks.insert( iBlock, pBlock );
698 TRACE("mHeader unlocked...");
699 mActiveBlocks.unlock();
700
701 return pBlock;
702}
703 296
704void Bu::Myriad::releaseBlock( Bu::Myriad::Block *pBlock ) 297Bu::Myriad::Stream::Stream( Bu::Myriad &rParent, Bu::Myriad::StreamId iStream,
298 int32_t iSize ) :
299 rParent( rParent ),
300 iStream( iStream ),
301 iSize( iSize ),
302 iOpenCount( 0 ),
303 bStructureChanged( false )
705{ 304{
706 if( pBlock == NULL )
707 return;
708// sio << "Myriad: Releasing block " << pBlock->iBlockIndex << sio.nl;
709 syncBlock( pBlock );
710 mActiveBlocks.lock();
711 TRACE("mHeader locked.");
712 hActiveBlocks.erase( pBlock->iBlockIndex );
713 TRACE("mHeader unlocked...");
714 mActiveBlocks.unlock();
715
716 delete[] pBlock->pData;
717 delete pBlock;
718} 305}
719 306
720void Bu::Myriad::syncBlock( Block *pBlock ) 307Bu::Myriad::Stream::~Stream()
721{ 308{
722 if( pBlock->bChanged )
723 {
724// sio << "Myriad: - Block changed, writing back to stream." << sio.nl;
725 sStore.setPos( iBlockSize * (Bu::size)pBlock->iBlockIndex );
726 sStore.write( pBlock->pData, iBlockSize );
727 pBlock->bChanged = false;
728 }
729}
730
731int Bu::Myriad::streamAddBlock( Stream *pStream )
732{
733 MutexLocker mLock( mHeader );
734 TRACE("mHeader locked.");
735
736 int iBlock = findEmptyBlock();
737 pStream->aBlocks.append( iBlock );
738// bsBlockUsed.setBit( iBlock );
739// bHeaderChanged = true;
740 iUsed++;
741 TRACE("mHeader unlocked...");
742 return iBlock;
743} 309}
744 310
745void Bu::Myriad::setStreamSize( Stream *pStream, long iSize ) 311int32_t Bu::Myriad::Stream::read( int32_t iStart, void *pTarget,
312 int32_t iSize )
746{ 313{
747 MutexLocker mLock( mHeader ); 314 int32_t iPos = iStart;
748 TRACE("mHeader locked."); 315 int32_t iRead = 0;
749 316 Bu::MutexLocker l( mAccess );
750 if( pStream->getSize() == iSize ) 317 while( iStart > 0 )
751 {
752 TRACE("mHeader unlocked...");
753 return;
754 }
755 else if( pStream->getSize() > iSize )
756 { 318 {
757 // Shrink 319 int32_t iBlock = aBlocks[iStart/rParent.iBlockSize];
758 TRACE(Bu::String("Shrink stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->getSize()).arg(iSize).end() ); 320 int32_t iOffset = iPos % rParent.iBlockSize;
759 for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize; 321 int32_t iChunkRead = rParent.blockRead(
760 iNewSize-iBlockSize > iSize; iNewSize -= iBlockSize ) 322 iBlock*rParent.iBlockSize+iOffset, pTarget, iSize
761 { 323 );
762// if( bsBlockUsed.getBit( pStream->aBlocks.last() ) ) 324 if( iChunkRead == 0 )
763 iUsed--; 325 break;
764// else 326 iRead += iChunkRead;
765// sio << "Unused block used in stream? " << pStream->aBlocks.last() << sio.nl; 327 reinterpret_cast<ptrdiff_t &>(pTarget) += iChunkRead;
766 lFreeBlocks.enqueue( pStream->aBlocks.last() ); 328 iSize -= iChunkRead;
767// bsBlockUsed.setBit( pStream->aBlocks.last(), false );
768 pStream->aBlocks.eraseLast();
769 }
770 pStream->setSize( iSize );
771 bHeaderChanged = true;
772 } 329 }
773 else
774 {
775 // Grow
776 TRACE(Bu::String("Grow stream %1 from %2 to %3").arg(pStream->iId).arg(pStream->getSize()).arg(iSize).end() );
777 for( int iNewSize = pStream->aBlocks.getSize()*iBlockSize;
778 iNewSize < iSize; iNewSize += iBlockSize )
779 {
780 //streamAddBlock( pStream );
781 int iBlock = findEmptyBlock();
782 pStream->aBlocks.append( iBlock );
783// bsBlockUsed.setBit( iBlock );
784// bHeaderChanged = true;
785 iUsed++;
786 }
787 pStream->setSize( iSize );
788 bHeaderChanged = true;
789 }
790 TRACE("mHeader unlocked...");
791}
792 330
793void Bu::Myriad::headerChanged() 331 return iRead;
794{
795 bHeaderChanged = true;
796} 332}
797 333
798bool Bu::Myriad::isMyriad( Bu::Stream &sStore ) 334void Bu::Myriad::Stream::open()
799{ 335{
800 uint8_t uTmp; 336 Bu::MutexLocker l( mAccess );
801 337 iOpenCount++;
802 return isMyriad( sStore, uTmp );
803}
804
805bool Bu::Myriad::isMyriad( Bu::Stream &sStore, uint8_t &uTmp )
806{
807 sStore.setPos( 0 );
808
809 unsigned char buf[4];
810 if( sStore.read( buf, 4 ) < 4 )
811 throw MyriadException( MyriadException::emptyStream,
812 "Input stream appears to be empty.");
813 sStore.read( &uTmp, 1 );
814 sStore.setPos( 0 );
815 if( memcmp( buf, Myriad_MAGIC_CODE, 4 ) )
816 {
817 return false;
818 }
819 return true;
820} 338}
821 339
822const Bu::BitString Bu::Myriad::getBlocksUsed() const 340bool Bu::Myriad::Stream::close()
823{ 341{
824 Bu::BitString bs( iBlocks, false ); 342 Bu::MutexLocker l( mAccess );
825 for( int j = 0; j < iBlocks; j++ ) 343 return (bool)(--iOpenCount);
826 bs.setBit( j );
827 for( IndexList::const_iterator i = lFreeBlocks.begin(); i; i++ )
828 bs.setBit( *i, false );
829 return bs;
830} 344}
831 345
diff --git a/src/stable/myriad.h b/src/stable/myriad.h
index e63c29f..07b4a1d 100644
--- a/src/stable/myriad.h
+++ b/src/stable/myriad.h
@@ -1,24 +1,14 @@
1/*
2 * Copyright (C) 2007-2023 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#ifndef BU_MYRIAD_H 1#ifndef BU_MYRIAD_H
9#define BU_MYRIAD_H 2#define BU_MYRIAD_H
10 3
11#include <stdint.h> 4#include "bu/stream.h"
12#include "bu/bitstring.h"
13#include "bu/exceptionbase.h" 5#include "bu/exceptionbase.h"
6#include "bu/mutex.h"
14#include "bu/array.h" 7#include "bu/array.h"
15#include "bu/hash.h" 8#include "bu/hash.h"
16#include "bu/mutex.h"
17#include "bu/extratypes.h"
18 9
19namespace Bu 10namespace Bu
20{ 11{
21 class Stream;
22 class MyriadStream; 12 class MyriadStream;
23 13
24 subExceptionDeclBegin( MyriadException ) 14 subExceptionDeclBegin( MyriadException )
@@ -31,212 +21,127 @@ namespace Bu
31 noSuchStream, 21 noSuchStream,
32 streamExists, 22 streamExists,
33 invalidStreamId, 23 invalidStreamId,
34 protectedStream 24 protectedStream,
25 invalidParameter,
26 invalidBackingStream,
27 badMode,
35 }; 28 };
36 subExceptionDeclEnd(); 29 subExceptionDeclEnd();
37 30
38 /**
39 * Myriad block-allocated stream multiplexing system. This is a system for
40 * creating streams that contain other streams in a flexible and lightweight
41 * manner. Basically, you can create a file (or any other stream) that can
42 * store any number of flexible, growing streams. The streams within the
43 * Myriad stream are automatically numbered, not named. This works more
44 * or less like a filesystem, but without the extra layer for managing
45 * file and directory links. This would actually be very easy to add
46 * on top of Myriad, but is not required.
47 *
48 * Header format is as follows:
49 *
50 * MMMMvBssssSSSS*
51 * M = Magic number (0AD3FA84)
52 * v = version number
53 * B = Bits per int
54 * s = Blocksize in bytes
55 * S = Number of Streams
56 *
57 * The * represents the Stream headers, one per stream, as follows:
58 * IIIIssss$
59 * I = Id number of the stream
60 * s = size of stream in bytes
61 *
62 * The $ represents the Block headers, one per used block, as follows:
63 * IIII
64 * I = Index of the block
65 *
66 * The stream/block data is interleaved in the header, so all blocks stored
67 * with one stream are together. The block headers are in order, and the
68 * data in them is required to be "solid" you cannot fill partial blocks
69 * mid-way through a stream.
70 *
71 * The initial block starts with the nids header, and is both the zero block
72 * and the zero stream. For now, the minimum block size is the size needed
73 * to store the base header, the zero stream header, and the first two
74 * blocks of the zero stream, so 30 bytes. Since it's reccomended to use
75 * a size that will fit evenly into filesystem blocks, then a size of 32 is
76 * probably the smallest reccomended size because all powers of two equal
77 * to or greater than 32 are evenly divisible by 32.
78 *
79 * I have had a thought that if the block size were smaller than 42 bytes
80 * the header would consume the first N blocks where N * block size is
81 * enough space to house the initial header, the first stream header, and
82 * the first N block headers. This, of course, causes you to hit an
83 * infinite header if the block size is small enough.
84 */
85 class Myriad 31 class Myriad
86 { 32 {
87 friend class MyriadStream;
88 public: 33 public:
89 /** 34 typedef int32_t StreamId;
90 * Create a Myriad object that uses the given stream to store data. 35 enum Mode {
91 * This stream must be random access. The block size and preallocate 36 None = 0x00,
92 * values passed in are values that will be used if the given stream 37
93 * is empty. In that case the stream will be "formatted" for myriad 38 // Flags
94 * with the specified block size. If there is already a viable Myriad 39 Read = 0x01, ///< Open file for reading
95 * format present in the stream, then the blocksize and preallocate 40 Write = 0x02, ///< Open file for writing
96 * values will be ignored and the values from the stream will be used 41 Create = 0x04, ///< Create file if it doesn't exist
97 * instead. If the stream doesn't appear to be Myriad formatted an 42 Truncate = 0x08, ///< Truncate file if it does exist
98 * exception will be thrown. 43 Append = 0x10, ///< Always append on every write
99 */ 44 NonBlock = 0x20, ///< Open file in non-blocking mode
100 Myriad( Bu::Stream &sStore, int iBlockSize=512, int iPreallocate=8 ); 45 Exclusive = 0x44, ///< Create file, if it exists then fail
101 virtual ~Myriad(); 46
102 47 // Helpful mixes
103 /** 48 ReadWrite = 0x03, ///< Open for reading and writing
104 * Destroy whatever data may be in the base stream and create a new 49 WriteNew = 0x0E ///< Create a file (or truncate) for writing.
105 * Myriad system there with the given blocksize. Use this with care, 50 /// Same as Write|Create|Truncate
106 * it will destroy anything that was already in the stream, and 51 };
107 * generally, should not ever have to be used.
108 */
109 void initialize( int iBlockSize, int iPreAllocate=1 );
110
111 /**
112 * Create a new stream within the Myriad system. The ID of the new
113 * stream is returned.
114 */
115 int createStream( int iPreAllocate=1 );
116
117 /**
118 * Create a new stream within the Myriad system with a given id. The
119 * id that you provide will be the new id of the stream unless it's
120 * already used, in which case an error is thrown. This is primarilly
121 * useful when copying an old Myriad file into a new one.
122 */
123 int createStreamWithId( int iId, int iPreAllocate=1 );
124
125 /**
126 * Delete a stream that's already within the Myriad.
127 */
128 void deleteStream( int iId );
129 52
53 public:
130 /** 54 /**
131 * Return a new Stream object assosiated with the given stream ID. 55 * Open existing Myriad stream, or initialize a new one if it doesn't
56 * exist.
57 *
58 * Myriad format V0
59 * 0 - 3: Myriad_MAGIC_CODE (0ad3fa84)
60 * 4 - 4: Version Id (1)
61 * 5 - 5: Bits per integer (32)
62 * 6 - 9: Block size in bytes.
63 * 10 - 13: Number of streams.
64 * 14 - ...: Stream Data
65 *
66 * Stream Data:
67 * 0 - 3: Stream Id
68 * 4 - 7: Size of stream in bytes
69 * 8 - ...: List of blocks in stream (4 bytes per block
132 */ 70 */
133 MyriadStream openStream( int iId ); 71 Myriad( Bu::Stream &rBacking, int32_t iBlockSize=-1, int32_t iPreallocateBlocks=-1 );
134 72 virtual ~Myriad();
135 Bu::Array<int> getStreamIds();
136 int getStreamSize( int iId );
137 bool hasStream( int iId );
138 73
139 int getNumStreams(); 74 MyriadStream create( Mode eMode, int32_t iPreallocateBytes=-1 );
140 int getBlockSize(); 75 MyriadStream open( StreamId iStream, Mode eMode );
141 int getNumBlocks(); 76 bool erase( StreamId iStream );
142 int getNumUsedBlocks(); 77 bool setSize( StreamId iStream, int32_t iNewSize );
143 Bu::size getTotalUsedBytes();
144 Bu::size getTotalUnusedBytes();
145 Bu::size getTotalUnusedBytes( int iFakeBlockSize );
146 78
147 /** 79 private:
148 * Syncronize the header data, etc. with the storage stream. It's not 80 bool loadMyriad();
149 * a bad idea to call this periodically. 81 void createMyriad( int32_t iBlockSize, int32_t iPreallocateBlocks );
150 */ 82 int32_t allocateBlock();
151 void sync();
152 83
84 void openStream( StreamId id );
85 void closeStream( StreamId id );
153 /** 86 /**
154 * Read the first few bytes from the given stream and return true/false 87 * Block restricted read, it will not read past the end of the block
155 * depending on weather or not it's a Myriad stream. This will throw 88 * that iStart places it in.
156 * an exception if the stream is empty, or is not random access.
157 */
158 static bool isMyriad( Bu::Stream &sStore, uint8_t &uVer );
159
160 /**
161 * Read the first few bytes from the given stream and return true/false
162 * depending on weather or not it's a Myriad stream. This will throw
163 * an exception if the stream is empty, or is not random access.
164 */ 89 */
165 static bool isMyriad( Bu::Stream &sStore ); 90 int32_t blockRead( int32_t iStart, void *pTarget, int32_t iSize );
166 91
167 const Bu::BitString getBlocksUsed() const; 92 public:
168
169 private:
170 /** 93 /**
171 * Initialize this object based on the data already in the assosiated 94 * Bridge/communication/tracking class for individual Myriad streams.
172 * stream. This will be called automatically for you if you forget, 95 * Not for general use, this is used by Myriad and MyriadStream to
173 * but if you want to pre-initialize for some reason, just call this 96 * control access.
174 * once before you actually start doing anything with your Myriad.
175 */ 97 */
176 void initialize();
177
178 enum
179 {
180 blockUnused = 0xFFFFFFFFUL
181 };
182
183 typedef Bu::Array<int> BlockArray;
184 class Stream 98 class Stream
185 { 99 {
186 public: 100 friend Bu::Myriad;
187 void setSize( int iNewSize );
188 void growTo( int iNewSize );
189 int getSize() const;
190
191 int iId;
192 BlockArray aBlocks;
193
194 private: 101 private:
195 int iSize; 102 Stream( Myriad &rParent, StreamId iStream, int32_t iSize );
196 mutable Bu::Mutex mStream; 103 virtual ~Stream();
197 };
198 typedef Bu::Array<Stream *> StreamArray;
199 104
200 class Block
201 {
202 public: 105 public:
203 char *pData; 106 int32_t read( int32_t iStart, void *pTarget, int32_t iSize );
204 bool bChanged; 107
205 int iBlockIndex; 108 /**
206 }; 109 * Doesn't actually open, just increments the open counter.
207 110 * If the open counter is non-zero then at least one stream has
208 void updateHeader(); 111 * a lock on this stream.
209 int findEmptyBlock(); 112 */
210 113 void open();
211 /** 114
212 *@todo Change this to use a binary search, it's nicer. 115 /**
213 */ 116 * Doesn't actually close, just decrements the open counter.
214 Stream *findStream( int iId ); 117 *@returns true if there are still handles open, false if no
118 * streams have a lock.
119 */
120 bool close();
215 121
216 Block *getBlock( int iBlock ); 122 private:
217 void releaseBlock( Block *pBlock ); 123 mutable Bu::Mutex mAccess;
218 void syncBlock( Block *pBlock ); 124 Myriad &rParent;
125 StreamId iStream;
126 int32_t iSize;
127 Bu::Array<int32_t> aBlocks;
128 int32_t iOpenCount;
129 bool bStructureChanged;
130 };
219 131
220 int streamAddBlock( Stream *pStream ); 132 private:
221 void setStreamSize( Stream *pStream, long iSize );
222 133
223 void headerChanged(); 134 typedef Bu::Hash<StreamId, Stream *> StreamHash;
224 135
225 private: 136 typedef Bu::List<int32_t> IndexList;
226 Bu::Stream &sStore; 137 Bu::Mutex mAccess;
227 int iBlockSize; 138 mutable Bu::Mutex mBacking;
228 int iBlocks; 139 Bu::Stream &rBacking;
229 int iUsed; 140 int32_t iBlockSize;
230 typedef Bu::List<int> IndexList; 141 int32_t iBlockCount;
142 bool bIsNewStream;
143 StreamHash hStream;
231 IndexList lFreeBlocks; 144 IndexList lFreeBlocks;
232// Bu::BitString bsBlockUsed;
233 StreamArray aStreams;
234 typedef Bu::Hash<int, Block *> BlockHash;
235 BlockHash hActiveBlocks;
236 bool bHeaderChanged;
237
238 Bu::Mutex mHeader;
239 Bu::Mutex mActiveBlocks;
240 }; 145 };
241}; 146};
242 147
diff --git a/src/stable/myriadstream.cpp b/src/stable/myriadstream.cpp
index 3c78bb0..cbbd4fe 100644
--- a/src/stable/myriadstream.cpp
+++ b/src/stable/myriadstream.cpp
@@ -1,254 +1,79 @@
1/*
2 * Copyright (C) 2007-2023 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#include "bu/myriadstream.h" 1#include "bu/myriadstream.h"
9 2
10#include <string.h> 3#include "bu/mutexlocker.h"
11
12// #define MYRIAD_STREAM_DEBUG 1
13
14#ifdef MYRIAD_STREAM_DEBUG
15#include "bu/sio.h"
16
17using Bu::sio;
18using Bu::Fmt;
19#endif
20#include "bu/sio.h"
21
22// #define TRACE( x ) Bu::println("%1:%2: %3: %4 - %5").arg(__FILE__).arg( __LINE__ ).arg(__PRETTY_FUNCTION__).arg(rMyriad.sStore.getLocation()).arg(x)
23#define TRACE( x ) (void)0
24 4
25Bu::MyriadStream::MyriadStream( Bu::Myriad &rMyriad, 5Bu::MyriadStream::MyriadStream( Bu::Myriad &rMyriad,
26 Bu::Myriad::Stream *pStream ) : 6 Bu::Myriad::Stream *pStream, Bu::Myriad::Mode eMode ) :
27 rMyriad( rMyriad ), 7 rMyriad( rMyriad ),
28 pStream( pStream ), 8 pStream( pStream ),
29 pCurBlock( NULL ), 9 eMode( eMode )
30 iPos( 0 )
31{ 10{
32#ifdef MYRIAD_STREAM_DEBUG 11 if( (eMode&Bu::Myriad::ReadWrite) == 0 )
33 sio << "MyriadStream: " << __LINE__ << ": Created, iId=" << pStream->iId << ", iSize=" 12 {
34 << pStream->iSize << sio.nl; 13 throw Bu::MyriadException( Bu::MyriadException::invalidParameter,
35#endif 14 "MyriadStream must be opened Read or Write or both.");
36 //pCurBlock = rMyriad.newBlock(); 15 }
37 //rMyriad.getBlock( uStream, pCurBlock ); 16 Bu::MutexLocker l( mAccess );
38 //uSize = pCurBlock->uBytesUsed; 17 pStream->open();
39} 18}
40 19
41Bu::MyriadStream::~MyriadStream() 20Bu::MyriadStream::~MyriadStream()
42{ 21{
43 if( pCurBlock ) 22 close();
44 rMyriad.releaseBlock( pCurBlock );
45 //rMyriad.updateStreamSize( uStream, uSize );
46 //rMyriad.deleteBlock( pCurBlock );
47} 23}
48 24
49void Bu::MyriadStream::close() 25void Bu::MyriadStream::close()
50{ 26{
51} 27 Bu::MutexLocker l( mAccess );
52 28 if( eMode )
53Bu::size Bu::MyriadStream::read( void *pBuf, Bu::size nBytes )
54{
55#ifdef MYRIAD_STREAM_DEBUG
56 sio << "MyriadStream: read: " << __LINE__ << ": Started, asked to read " << nBytes << "b."
57 << sio.nl;
58#endif
59 if( nBytes > (Bu::size)pStream->getSize()-iPos )
60 nBytes = pStream->getSize()-iPos;
61 if( nBytes <= 0 )
62 return 0;
63 int iLeft = nBytes;
64#ifdef MYRIAD_STREAM_DEBUG
65 sio << "MyriadStream: read: " << __LINE__ << ": Started, going to read " << nBytes << "b."
66 << sio.nl;
67#endif
68 if( pCurBlock == NULL )
69 { 29 {
70#ifdef MYRIAD_STREAM_DEBUG 30 pStream->close();
71 sio << "MyriadStream: read: " << __LINE__ << ": No block loaded, loading initial block." 31 eMode = Bu::Myriad::None;
72 << sio.nl;
73#endif
74 pCurBlock = rMyriad.getBlock(
75 pStream->aBlocks[iPos/rMyriad.iBlockSize]
76 );
77 } 32 }
78 while( iLeft > 0 ) 33}
79 {
80 int iCurBlock = pStream->aBlocks[iPos/rMyriad.iBlockSize];
81 if( pCurBlock->iBlockIndex != iCurBlock )
82 {
83#ifdef MYRIAD_STREAM_DEBUG
84 sio << "MyriadStream: read: " << __LINE__ << ": Loading new block " << iCurBlock << "."
85 << sio.nl;
86#endif
87 rMyriad.releaseBlock( pCurBlock );
88 pCurBlock = rMyriad.getBlock( iCurBlock );
89 }
90 34
91 int iAmnt = Bu::buMin( 35Bu::size Bu::MyriadStream::read( void *pBuf, size iBytes )
92 Bu::buMin( 36{
93 rMyriad.iBlockSize - iPos%rMyriad.iBlockSize,
94 iLeft
95 ),
96 pStream->getSize()-iPos
97 );
98#ifdef MYRIAD_STREAM_DEBUG
99 sio << "MyriadStream: read: " << __LINE__ << ": Copying out bytes: "
100 << iPos << "(" << (iPos%rMyriad.iBlockSize) << ")+"
101 << iAmnt
102 << ", " << iLeft << "b left." << sio.nl;
103#endif
104 memcpy(
105 pBuf,
106 pCurBlock->pData+(iPos%rMyriad.iBlockSize),
107 iAmnt
108 );
109 iPos += iAmnt;
110 pBuf = &((char *)pBuf)[iAmnt];
111 iLeft -= iAmnt;
112 }
113 return nBytes;
114} 37}
115 38
116Bu::size Bu::MyriadStream::write( const void *pBuf, Bu::size nBytes ) 39Bu::String Bu::MyriadStream::readLine()
117{ 40{
118 if( nBytes <= 0 ) 41}
119 return 0;
120 42
121#ifdef MYRIAD_STREAM_DEBUG 43Bu::String Bu::MyriadStream::readAll()
122 sio << "MyriadStream: write: " << __LINE__ << ": Started, asked to write " << nBytes << "b." 44{
123 << sio.nl; 45}
124#endif
125 if( nBytes <= 0 )
126 return 0;
127 int iLeft = nBytes;
128 /*
129 if( pCurBlock == NULL )
130 {
131#ifdef MYRIAD_STREAM_DEBUG
132 sio << "MyriadStream: write: No block loaded, loading initial block."
133 << sio.nl;
134#endif
135 pCurBlock = rMyriad.getBlock(
136 pStream->aBlocks[iPos/rMyriad.iBlockSize]
137 );
138 }*/
139
140 while( iLeft > 0 )
141 {
142 int iCurBlock;
143 if( iPos/rMyriad.iBlockSize < pStream->aBlocks.getSize() )
144 {
145 iCurBlock = pStream->aBlocks[iPos/rMyriad.iBlockSize];
146 }
147 else
148 {
149 iCurBlock = rMyriad.streamAddBlock( pStream );
150#ifdef MYRIAD_STREAM_DEBUG
151 sio << "MyriadStream: write: " << __LINE__ << ": New block allocated and appended: "
152 << iCurBlock << "." << sio.nl;
153 46
154#endif 47Bu::size Bu::MyriadStream::write( const void *pBuf, size iBytes )
155 } 48{
156 if( !pCurBlock || pCurBlock->iBlockIndex != iCurBlock ) 49}
157 {
158#ifdef MYRIAD_STREAM_DEBUG
159 sio << "MyriadStream: write: " << __LINE__ << ": Loading new block " << iCurBlock << "."
160 << sio.nl;
161#endif
162 rMyriad.releaseBlock( pCurBlock );
163 pCurBlock = rMyriad.getBlock( iCurBlock );
164 }
165 pCurBlock->bChanged = true;
166
167 // There are two main writing modes when it comes down to it.
168 // Overwrite mode and append mode. Append is what pretty much always
169 // happens when creating a new stream.
170 if( iPos < pStream->getSize() )
171 {
172 int iAmnt = Bu::buMin(
173 Bu::buMin(
174 rMyriad.iBlockSize - iPos%rMyriad.iBlockSize,
175 iLeft
176 ),
177 pStream->getSize()-iPos
178 );
179#ifdef MYRIAD_STREAM_DEBUG
180 sio << "MyriadStream: write (ovr): " << __LINE__ << ": Copying in bytes: "
181 << (iPos%rMyriad.iBlockSize) << "+"
182 << iAmnt
183 << ", " << iLeft << "b left." << sio.nl;
184#endif
185 memcpy(
186 pCurBlock->pData+(iPos%rMyriad.iBlockSize),
187 pBuf,
188 iAmnt
189 );
190 iPos += iAmnt;
191 pBuf = &((char *)pBuf)[iAmnt];
192 iLeft -= iAmnt;
193 }
194 else
195 {
196 int iAmnt = Bu::buMin(
197 rMyriad.iBlockSize - iPos%rMyriad.iBlockSize,
198 iLeft
199 );
200#ifdef MYRIAD_STREAM_DEBUG
201 sio << "MyriadStream: write (app): " << __LINE__ << ": Copying in bytes: "
202 << (iPos%rMyriad.iBlockSize) << "+"
203 << iAmnt
204 << ", " << iLeft << "b left." << sio.nl;
205#endif
206 memcpy(
207 pCurBlock->pData+(iPos%rMyriad.iBlockSize),
208 pBuf,
209 iAmnt
210 );
211 iPos += iAmnt;
212 TRACE(Bu::String("Stream=%1 - pStream->iSize(%2) += iAmnt(%3)").arg(pStream->iId).arg( pStream->getSize() ).arg(iAmnt).end());
213 pStream->growTo( pStream->getSize()+iAmnt );
214 TRACE(Bu::String("Stream=%1 - pStream->iSize = %2").arg(pStream->iId).arg( pStream->getSize() ).end());
215 rMyriad.headerChanged();
216 pBuf = &((char *)pBuf)[iAmnt];
217 iLeft -= iAmnt;
218 }
219 }
220 50
221 return nBytes; 51Bu::size Bu::MyriadStream::write( const Bu::String &sBuf )
52{
222} 53}
223 54
224Bu::size Bu::MyriadStream::tell() 55Bu::size Bu::MyriadStream::tell()
225{ 56{
226 return iPos;
227} 57}
228 58
229void Bu::MyriadStream::seek( Bu::size offset ) 59void Bu::MyriadStream::seek( size offset )
230{ 60{
231 iPos += offset;
232} 61}
233 62
234void Bu::MyriadStream::setPos( Bu::size pos ) 63void Bu::MyriadStream::setPos( size pos )
235{ 64{
236 iPos = pos;
237} 65}
238 66
239void Bu::MyriadStream::setPosEnd( Bu::size pos ) 67void Bu::MyriadStream::setPosEnd( size pos )
240{ 68{
241 iPos = pStream->getSize()-pos;
242} 69}
243 70
244bool Bu::MyriadStream::isEos() 71bool Bu::MyriadStream::isEos()
245{ 72{
246 return iPos >= pStream->getSize();
247} 73}
248 74
249bool Bu::MyriadStream::isOpen() 75bool Bu::MyriadStream::isOpen()
250{ 76{
251 return true;
252} 77}
253 78
254void Bu::MyriadStream::flush() 79void Bu::MyriadStream::flush()
@@ -257,59 +82,45 @@ void Bu::MyriadStream::flush()
257 82
258bool Bu::MyriadStream::canRead() 83bool Bu::MyriadStream::canRead()
259{ 84{
260 return true;
261} 85}
262 86
263bool Bu::MyriadStream::canWrite() 87bool Bu::MyriadStream::canWrite()
264{ 88{
265 return true;
266} 89}
267 90
268bool Bu::MyriadStream::isReadable() 91bool Bu::MyriadStream::isReadable()
269{ 92{
270 return true;
271} 93}
272 94
273bool Bu::MyriadStream::isWritable() 95bool Bu::MyriadStream::isWritable()
274{ 96{
275 return true;
276} 97}
277 98
278bool Bu::MyriadStream::isSeekable() 99bool Bu::MyriadStream::isSeekable()
279{ 100{
280 return true;
281} 101}
282 102
283bool Bu::MyriadStream::isBlocking() 103bool Bu::MyriadStream::isBlocking()
284{ 104{
285 return true;
286} 105}
287 106
288void Bu::MyriadStream::setBlocking( bool /*bBlocking*/ ) 107void Bu::MyriadStream::setBlocking( bool bBlocking )
289{ 108{
290} 109}
291 110
292void Bu::MyriadStream::setSize( Bu::size iSize ) 111void Bu::MyriadStream::setSize( size iSize )
293{ 112{
294 if( iSize < 0 )
295 iSize = 0;
296 rMyriad.setStreamSize( pStream, iSize );
297 if( iPos > iSize )
298 iPos = iSize;
299} 113}
300 114
301Bu::size Bu::MyriadStream::getSize() const 115Bu::size Bu::MyriadStream::getSize() const
302{ 116{
303 return pStream->getSize();
304} 117}
305 118
306Bu::size Bu::MyriadStream::getBlockSize() const 119Bu::size Bu::MyriadStream::getBlockSize() const
307{ 120{
308 return rMyriad.getBlockSize();
309} 121}
310 122
311Bu::String Bu::MyriadStream::getLocation() const 123Bu::String getLocation() const
312{ 124{
313 return Bu::String("%1").arg( pStream->iId );
314} 125}
315 126
diff --git a/src/stable/myriadstream.h b/src/stable/myriadstream.h
index a94a9a2..87192a9 100644
--- a/src/stable/myriadstream.h
+++ b/src/stable/myriadstream.h
@@ -1,38 +1,31 @@
1/*
2 * Copyright (C) 2007-2023 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#ifndef BU_MYRIAD_STREAM_H 1#ifndef BU_MYRIAD_STREAM_H
9#define BU_MYRIAD_STREAM_H 2#define BU_MYRIAD_STREAM_H
10 3
11#include "bu/stream.h" 4#include "bu/stream.h"
5#include "bu/myriadstream.h"
12#include "bu/myriad.h" 6#include "bu/myriad.h"
13 7
14namespace Bu 8namespace Bu
15{ 9{
16 class MyriadStream : public Bu::Stream 10 class MyriadStream : public Bu::Stream
17 { 11 {
18 friend class Myriad; 12 friend class Myriad;
19 private: 13 private:
20 /** 14 MyriadStream( Bu::Myriad &rMyriad, Bu::Myriad::Stream *pStream,
21 * These can only be created by the Myriad class. 15 Bu::Myriad::Mode eMode );
22 */
23 MyriadStream( Myriad &rMyriad, Myriad::Stream *pStream );
24
25 public:
26 virtual ~MyriadStream(); 16 virtual ~MyriadStream();
27 17
18 public:
28 virtual void close(); 19 virtual void close();
29 virtual Bu::size read( void *pBuf, Bu::size nBytes ); 20 virtual size read( void *pBuf, size iBytes );
30 virtual Bu::size write( const void *pBuf, Bu::size nBytes ); 21 virtual Bu::String readLine();
31 using Stream::write; 22 virtual Bu::String readAll();
32 virtual Bu::size tell(); 23 virtual size write( const void *pBuf, size iBytes );
33 virtual void seek( Bu::size offset ); 24 virtual size write( const Bu::String &sBuf );
34 virtual void setPos( Bu::size pos ); 25 virtual size tell();
35 virtual void setPosEnd( Bu::size pos ); 26 virtual void seek( size offset );
27 virtual void setPos( size pos );
28 virtual void setPosEnd( size pos );
36 virtual bool isEos(); 29 virtual bool isEos();
37 virtual bool isOpen(); 30 virtual bool isOpen();
38 virtual void flush(); 31 virtual void flush();
@@ -43,18 +36,16 @@ namespace Bu
43 virtual bool isSeekable(); 36 virtual bool isSeekable();
44 virtual bool isBlocking(); 37 virtual bool isBlocking();
45 virtual void setBlocking( bool bBlocking=true ); 38 virtual void setBlocking( bool bBlocking=true );
46 virtual void setSize( Bu::size iSize ); 39 virtual void setSize( size iSize );
47
48 virtual size getSize() const; 40 virtual size getSize() const;
49 virtual size getBlockSize() const; 41 virtual size getBlockSize() const;
50 virtual Bu::String getLocation() const; 42 virtual Bu::String getLocation() const;
51 43
52 private: 44 private:
53 Myriad &rMyriad; 45 mutable Bu::Mutex mAccess;
54 Myriad::Stream *pStream; 46 Bu::Myriad &rMyriad;
55 Myriad::Block *pCurBlock; 47 Bu::Myriad::Stream *pStream;
56 int iBlockSize; 48 Bu::Myriad::Mode eMode;
57 int iPos;
58 }; 49 };
59}; 50};
60 51