aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/experimental/fastcgi.cpp81
-rw-r--r--src/experimental/fastcgi.h18
-rw-r--r--src/experimental/httpget.h4
-rw-r--r--src/stable/client.cpp184
-rw-r--r--src/stable/client.h38
-rw-r--r--src/stable/clientbuf.cpp465
-rw-r--r--src/stable/clientbuf.h179
-rw-r--r--src/stable/multiserver.cpp27
-rw-r--r--src/stable/multiserver.h8
-rw-r--r--src/stable/server.cpp342
-rw-r--r--src/stable/server.h97
-rw-r--r--src/stable/serversocket.cpp10
-rw-r--r--src/stable/serversocket.h44
-rw-r--r--src/stable/serversockettcp.cpp (renamed from src/stable/tcpserversocket.cpp)101
-rw-r--r--src/stable/serversockettcp.h (renamed from src/stable/tcpserversocket.h)28
-rw-r--r--src/stable/socket.cpp10
-rw-r--r--src/stable/socket.h33
-rw-r--r--src/stable/sockettcp.cpp (renamed from src/stable/tcpsocket.cpp)168
-rw-r--r--src/stable/sockettcp.h (renamed from src/stable/tcpsocket.h)26
-rw-r--r--src/stable/synchroqueue.h100
-rw-r--r--src/unstable/itoserver.cpp241
-rw-r--r--src/unstable/itoserver.h147
22 files changed, 1484 insertions, 867 deletions
diff --git a/src/experimental/fastcgi.cpp b/src/experimental/fastcgi.cpp
index ac946b0..7e91bd1 100644
--- a/src/experimental/fastcgi.cpp
+++ b/src/experimental/fastcgi.cpp
@@ -24,14 +24,14 @@ Bu::FastCgi::FastCgi() :
24 pSrv( NULL ), 24 pSrv( NULL ),
25 bRunning( true ) 25 bRunning( true )
26{ 26{
27 pSrv = new Bu::TcpServerSocket( (Bu::TcpServerSocket::socket_t)STDIN_FILENO, false ); 27 pSrv = new Bu::ServerSocketTcp( (Bu::ServerSocketTcp::socket_t)STDIN_FILENO, false );
28} 28}
29 29
30Bu::FastCgi::FastCgi( int iPort ) : 30Bu::FastCgi::FastCgi( int iPort ) :
31 pSrv( NULL ), 31 pSrv( NULL ),
32 bRunning( true ) 32 bRunning( true )
33{ 33{
34 pSrv = new Bu::TcpServerSocket( iPort ); 34 pSrv = new Bu::ServerSocketTcp( iPort );
35} 35}
36 36
37Bu::FastCgi::~FastCgi() 37Bu::FastCgi::~FastCgi()
@@ -64,63 +64,63 @@ bool Bu::FastCgi::isEmbedded()
64#endif 64#endif
65} 65}
66 66
67void Bu::FastCgi::read( Bu::TcpSocket &s, Bu::FastCgi::Record &r ) 67void Bu::FastCgi::read( Bu::Socket *pSock, Bu::FastCgi::Record &r )
68{ 68{
69 int iRead = s.read( &r, sizeof(Record) ); 69 int iRead = pSock->read( &r, sizeof(Record) );
70 if( iRead != sizeof(Record) ) 70 if( iRead != sizeof(Record) )
71 throw Bu::TcpSocketException("Hey, the size %d is wrong for Record. (%s)", 71 throw Bu::ExceptionBase("Hey, the size %d is wrong for Record. (%pSock)",
72 iRead, strerror( errno ) ); 72 iRead, strerror( errno ) );
73 r.uRequestId = ntohs( r.uRequestId ); 73 r.uRequestId = ntohs( r.uRequestId );
74 r.uContentLength = ntohs( r.uContentLength ); 74 r.uContentLength = ntohs( r.uContentLength );
75} 75}
76 76
77void Bu::FastCgi::write( Bu::TcpSocket &s, Bu::FastCgi::Record r ) 77void Bu::FastCgi::write( Bu::Socket *pSock, Bu::FastCgi::Record r )
78{ 78{
79// sio << "Out -> " << r << sio.nl; 79// sio << "Out -> " << r << sio.nl;
80 r.uRequestId = htons( r.uRequestId ); 80 r.uRequestId = htons( r.uRequestId );
81 r.uContentLength = htons( r.uContentLength ); 81 r.uContentLength = htons( r.uContentLength );
82 s.write( &r, sizeof(Record) ); 82 pSock->write( &r, sizeof(Record) );
83} 83}
84 84
85void Bu::FastCgi::read( Bu::TcpSocket &s, Bu::FastCgi::BeginRequestBody &b ) 85void Bu::FastCgi::read( Bu::Socket *pSock, Bu::FastCgi::BeginRequestBody &b )
86{ 86{
87 s.read( &b, sizeof(BeginRequestBody) ); 87 pSock->read( &b, sizeof(BeginRequestBody) );
88 b.uRole = ntohs( b.uRole ); 88 b.uRole = ntohs( b.uRole );
89} 89}
90 90
91void Bu::FastCgi::write( Bu::TcpSocket &s, Bu::FastCgi::EndRequestBody b ) 91void Bu::FastCgi::write( Bu::Socket *pSock, Bu::FastCgi::EndRequestBody b )
92{ 92{
93 b.uStatus = htonl( b.uStatus ); 93 b.uStatus = htonl( b.uStatus );
94 s.write( &b, sizeof(b) ); 94 pSock->write( &b, sizeof(b) );
95} 95}
96 96
97uint32_t Bu::FastCgi::readLen( Bu::TcpSocket &s, uint16_t &uRead ) 97uint32_t Bu::FastCgi::readLen( Bu::Socket *pSock, uint16_t &uRead )
98{ 98{
99 uint8_t uByte[4]; 99 uint8_t uByte[4];
100 s.read( uByte, 1 ); 100 pSock->read( uByte, 1 );
101 uRead++; 101 uRead++;
102 if( uByte[0] >> 7 == 0 ) 102 if( uByte[0] >> 7 == 0 )
103 return uByte[0]; 103 return uByte[0];
104 104
105 s.read( uByte+1, 3 ); 105 pSock->read( uByte+1, 3 );
106 uRead += 3; 106 uRead += 3;
107 return ((uByte[0]&0x7f)<<24)|(uByte[1]<<16)|(uByte[2]<<8)|(uByte[3]); 107 return ((uByte[0]&0x7f)<<24)|(uByte[1]<<16)|(uByte[2]<<8)|(uByte[3]);
108} 108}
109 109
110void Bu::FastCgi::readPair( Bu::TcpSocket &s, StrHash &hParams, uint16_t &uRead ) 110void Bu::FastCgi::readPair( Bu::Socket *pSock, StrHash &hParams, uint16_t &uRead )
111{ 111{
112 uint32_t uName = readLen( s, uRead ); 112 uint32_t uName = readLen( pSock, uRead );
113 uint32_t uValue = readLen( s, uRead ); 113 uint32_t uValue = readLen( pSock, uRead );
114 uRead += uName + uValue; 114 uRead += uName + uValue;
115 unsigned char *sName = new unsigned char[uName]; 115 unsigned char *sName = new unsigned char[uName];
116 s.read( sName, uName ); 116 pSock->read( sName, uName );
117 Bu::String fsName( (char *)sName, uName ); 117 Bu::String fsName( (char *)sName, uName );
118 delete[] sName; 118 delete[] sName;
119 119
120 if( uValue > 0 ) 120 if( uValue > 0 )
121 { 121 {
122 unsigned char *sValue = new unsigned char[uValue]; 122 unsigned char *sValue = new unsigned char[uValue];
123 s.read( sValue, uValue ); 123 pSock->read( sValue, uValue );
124 Bu::String fsValue( (char *)sValue, uValue ); 124 Bu::String fsValue( (char *)sValue, uValue );
125 hParams.insert( fsName, fsValue ); 125 hParams.insert( fsName, fsValue );
126 delete[] sValue; 126 delete[] sValue;
@@ -158,13 +158,12 @@ void Bu::FastCgi::run()
158 bRunning = true; 158 bRunning = true;
159 while( bRunning ) 159 while( bRunning )
160 { 160 {
161 int iSock = pSrv->accept( 5 ); 161 Bu::Socket *pSock = pSrv->accept( 5 );
162 if( iSock < 0 ) 162 if( pSock == NULL )
163 continue; 163 continue;
164 164
165 Bu::TcpSocket s( iSock ); 165 pSock->setBlocking( true );
166 s.setBlocking( true ); 166// sio << "Got connection, blocking? " << pSock->isBlocking() << sio.nl;
167// sio << "Got connection, blocking? " << s.isBlocking() << sio.nl;
168 try 167 try
169 { 168 {
170 for(;;) 169 for(;;)
@@ -173,11 +172,11 @@ void Bu::FastCgi::run()
173 memset( &r, 0, sizeof(r) ); 172 memset( &r, 0, sizeof(r) );
174// try 173// try
175// { 174// {
176 read( s, r ); 175 read( pSock, r );
177// } 176// }
178// catch( Bu::ExceptionBase &e ) 177// catch( Bu::ExceptionBase &e )
179// { 178// {
180// sio << "Error: " << e.what() << ", " << s.isOpen() << 179// sio << "Error: " << e.what() << ", " << pSock->isOpen() <<
181// sio.nl; 180// sio.nl;
182// continue; 181// continue;
183// } 182// }
@@ -192,7 +191,7 @@ void Bu::FastCgi::run()
192 if( r.uContentLength > 0 ) 191 if( r.uContentLength > 0 )
193 { 192 {
194 char *buf = new char[r.uContentLength]; 193 char *buf = new char[r.uContentLength];
195 sio << " (read " << s.read( buf, r.uContentLength ) 194 sio << " (read " << pSock->read( buf, r.uContentLength )
196 << "/" << r.uContentLength << "):" << sio.nl; 195 << "/" << r.uContentLength << "):" << sio.nl;
197 sio.write( buf, r.uContentLength ); 196 sio.write( buf, r.uContentLength );
198 sio << sio.nl << sio.nl; 197 sio << sio.nl << sio.nl;
@@ -215,7 +214,7 @@ void Bu::FastCgi::run()
215// sio << "Begin Request."; 214// sio << "Begin Request.";
216 { 215 {
217 BeginRequestBody b; 216 BeginRequestBody b;
218 read( s, b ); 217 read( pSock, b );
219 if( pChan != NULL ) 218 if( pChan != NULL )
220 { 219 {
221 sio << "Error!!!" << sio.nl; 220 sio << "Error!!!" << sio.nl;
@@ -236,7 +235,7 @@ void Bu::FastCgi::run()
236 uint16_t uUsed = 0; 235 uint16_t uUsed = 0;
237 while( uUsed < r.uContentLength ) 236 while( uUsed < r.uContentLength )
238 { 237 {
239 readPair( s, pChan->hParams, uUsed ); 238 readPair( pSock, pChan->hParams, uUsed );
240 } 239 }
241 } 240 }
242 break; 241 break;
@@ -253,7 +252,7 @@ void Bu::FastCgi::run()
253 int iTotal = 0; 252 int iTotal = 0;
254 do 253 do
255 { 254 {
256 size_t iRead = s.read( 255 size_t iRead = pSock->read(
257 buf, r.uContentLength-iTotal ); 256 buf, r.uContentLength-iTotal );
258 iTotal += iRead; 257 iTotal += iRead;
259// sio << " (read " << iRead << " " << iTotal 258// sio << " (read " << iRead << " " << iTotal
@@ -273,7 +272,7 @@ void Bu::FastCgi::run()
273 else 272 else
274 { 273 {
275 char *buf = new char[r.uContentLength]; 274 char *buf = new char[r.uContentLength];
276 s.read( buf, r.uContentLength ); 275 pSock->read( buf, r.uContentLength );
277 pChan->sData.append( buf, r.uContentLength ); 276 pChan->sData.append( buf, r.uContentLength );
278 delete[] buf; 277 delete[] buf;
279 } 278 }
@@ -323,12 +322,12 @@ void Bu::FastCgi::run()
323 if( iSize > 65528 ) 322 if( iSize > 65528 )
324 iSize = 65528; 323 iSize = 65528;
325 rOut.uContentLength = iSize; 324 rOut.uContentLength = iSize;
326 write( s, rOut ); 325 write( pSock, rOut );
327 s.write( sStdOut.getStr()+iPos, iSize ); 326 pSock->write( sStdOut.getStr()+iPos, iSize );
328 } 327 }
329 } 328 }
330 rOut.uContentLength = 0; 329 rOut.uContentLength = 0;
331 write( s, rOut ); 330 write( pSock, rOut );
332 331
333 rOut.uType = typeStdErr; 332 rOut.uType = typeStdErr;
334 if( sStdErr.getSize() > 0 ) 333 if( sStdErr.getSize() > 0 )
@@ -340,21 +339,21 @@ void Bu::FastCgi::run()
340 if( iSize > 65528 ) 339 if( iSize > 65528 )
341 iSize = 65528; 340 iSize = 65528;
342 rOut.uContentLength = iSize; 341 rOut.uContentLength = iSize;
343 write( s, rOut ); 342 write( pSock, rOut );
344 s.write( sStdErr.getStr()+iPos, iSize ); 343 pSock->write( sStdErr.getStr()+iPos, iSize );
345 } 344 }
346 } 345 }
347 rOut.uContentLength = 0; 346 rOut.uContentLength = 0;
348 write( s, rOut ); 347 write( pSock, rOut );
349 348
350 rOut.uType = typeEndRequest; 349 rOut.uType = typeEndRequest;
351 rOut.uContentLength = 8; 350 rOut.uContentLength = 8;
352 write( s, rOut ); 351 write( pSock, rOut );
353 352
354 EndRequestBody b; 353 EndRequestBody b;
355 memset( &b, 0, sizeof(b) ); 354 memset( &b, 0, sizeof(b) );
356 b.uStatus = iRet; 355 b.uStatus = iRet;
357 write( s, b ); 356 write( pSock, b );
358 357
359 delete pChan; 358 delete pChan;
360 aChannel[r.uRequestId-1] = NULL; 359 aChannel[r.uRequestId-1] = NULL;
@@ -362,10 +361,10 @@ void Bu::FastCgi::run()
362 } 361 }
363 } 362 }
364 } 363 }
365 catch( Bu::TcpSocketException &e ) 364 catch( Bu::ExceptionBase &e )
366 { 365 {
367// sio << "Bu::SocketException: " << e.what() << sio.nl << 366// sio << "Bu::SocketException: " << e.what() << sio.nl <<
368// "\tSocket open: " << s.isOpen() << sio.nl; 367// "\tSocket open: " << pSock->isOpen() << sio.nl;
369 } 368 }
370 } 369 }
371} 370}
diff --git a/src/experimental/fastcgi.h b/src/experimental/fastcgi.h
index 081156e..693ff82 100644
--- a/src/experimental/fastcgi.h
+++ b/src/experimental/fastcgi.h
@@ -11,8 +11,8 @@
11#include "bu/string.h" 11#include "bu/string.h"
12#include "bu/hash.h" 12#include "bu/hash.h"
13#include "bu/array.h" 13#include "bu/array.h"
14#include "bu/tcpsocket.h" 14#include "bu/sockettcp.h"
15#include "bu/tcpserversocket.h" 15#include "bu/serversockettcp.h"
16 16
17namespace Bu 17namespace Bu
18{ 18{
@@ -111,18 +111,18 @@ namespace Bu
111 virtual void onUninit() { }; 111 virtual void onUninit() { };
112 112
113 private: 113 private:
114 void read( Bu::TcpSocket &s, Record &r ); 114 void read( Bu::Socket *pSock, Record &r );
115 void read( Bu::TcpSocket &s, BeginRequestBody &b ); 115 void read( Bu::Socket *pSock, BeginRequestBody &b );
116 uint32_t readLen( Bu::TcpSocket &s, uint16_t &uUsed ); 116 uint32_t readLen( Bu::Socket *pSock, uint16_t &uUsed );
117 void readPair( Bu::TcpSocket &s, StrHash &hParams, uint16_t &uUsed ); 117 void readPair( Bu::Socket *pSock, StrHash &hParams, uint16_t &uUsed );
118 118
119 void write( Bu::TcpSocket &s, Record r ); 119 void write( Bu::Socket *pSock, Record r );
120 void write( Bu::TcpSocket &s, EndRequestBody b ); 120 void write( Bu::Socket *pSock, EndRequestBody b );
121 121
122 bool hasChannel( int iChan ); 122 bool hasChannel( int iChan );
123 123
124 private: 124 private:
125 Bu::TcpServerSocket *pSrv; 125 Bu::ServerSocket *pSrv;
126 bool bRunning; 126 bool bRunning;
127 Bu::Array<Channel *> aChannel; 127 Bu::Array<Channel *> aChannel;
128 }; 128 };
diff --git a/src/experimental/httpget.h b/src/experimental/httpget.h
index f4bf639..a1a8b42 100644
--- a/src/experimental/httpget.h
+++ b/src/experimental/httpget.h
@@ -11,7 +11,7 @@
11#include "bu/stream.h" 11#include "bu/stream.h"
12#include "bu/string.h" 12#include "bu/string.h"
13#include "bu/url.h" 13#include "bu/url.h"
14#include "bu/tcpsocket.h" 14#include "bu/sockettcp.h"
15#include "bu/hash.h" 15#include "bu/hash.h"
16 16
17namespace Bu 17namespace Bu
@@ -56,7 +56,7 @@ namespace Bu
56 private: 56 private:
57 Bu::Url uSrc; 57 Bu::Url uSrc;
58 Bu::String sMethod; 58 Bu::String sMethod;
59 Bu::TcpSocket sSrv; 59 Bu::SocketTcp sSrv;
60 typedef Bu::Hash<Bu::String,Bu::String> MimeHash; 60 typedef Bu::Hash<Bu::String,Bu::String> MimeHash;
61 MimeHash hMimeIn; 61 MimeHash hMimeIn;
62 MimeHash hMimeOut; 62 MimeHash hMimeOut;
diff --git a/src/stable/client.cpp b/src/stable/client.cpp
index 56c5094..d2d48d7 100644
--- a/src/stable/client.cpp
+++ b/src/stable/client.cpp
@@ -6,7 +6,6 @@
6 */ 6 */
7 7
8#include "bu/client.h" 8#include "bu/client.h"
9#include "bu/tcpsocket.h"
10#include <stdlib.h> 9#include <stdlib.h>
11#include <errno.h> 10#include <errno.h>
12#include "bu/protocol.h" 11#include "bu/protocol.h"
@@ -24,72 +23,23 @@
24#define BU_PROFILE_END( x ) (void)0 23#define BU_PROFILE_END( x ) (void)0
25#endif 24#endif
26 25
27Bu::Client::Client( Bu::TcpSocket *pSocket, 26Bu::Client::Client( class Bu::ClientLinkFactory *pfLink ) :
28 class Bu::ClientLinkFactory *pfLink ) :
29 pTopStream( pSocket ),
30 pSocket( pSocket ),
31 pProto( NULL ), 27 pProto( NULL ),
32 bWantsDisconnect( false ), 28 bWantsDisconnect( false ),
33 pfLink( pfLink ) 29 pfLink( pfLink )
34{ 30{
35 Bu::ReadWriteMutex::WriteLocker lGlobal( mGlobal );
36 lFilts.prepend( pSocket );
37} 31}
38 32
39Bu::Client::~Client() 33Bu::Client::~Client()
40{ 34{
41 Bu::ReadWriteMutex::WriteLocker lGlobal( mGlobal );
42 for( FilterList::iterator i = lFilts.begin(); i; i++ )
43 {
44 delete *i;
45 }
46 pTopStream = pSocket = NULL;
47 delete pfLink; 35 delete pfLink;
48} 36}
49 37
50void Bu::Client::processInput() 38void Bu::Client::processInput()
51{ 39{
52 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 40 Bu::MutexLocker l( mProto );
53 mRead.lock();
54 char buf[RBS];
55 Bu::size nRead, nTotal=0;
56 41
57 BU_PROFILE_START("client.read"); 42 if( pProto && getInputSize() > 0 )
58 for(;;)
59 {
60 try
61 {
62 nRead = pTopStream->read( buf, RBS );
63
64 if( nRead == 0 )
65 {
66 break;
67 }
68 else
69 {
70 nTotal += nRead;
71 qbRead.write( buf, nRead );
72 if( !pTopStream->canRead() )
73 break;
74 }
75 }
76 catch( Bu::TcpSocketException &e )
77 {
78 pTopStream->close();
79 bWantsDisconnect = true;
80 break;
81 }
82 }
83 BU_PROFILE_END("client.read");
84 mRead.unlock();
85
86 if( nTotal == 0 )
87 {
88 pTopStream->close();
89 bWantsDisconnect = true;
90 }
91
92 if( pProto && nTotal )
93 { 43 {
94 BU_PROFILE_START("client.process"); 44 BU_PROFILE_START("client.process");
95 pProto->onNewData( this ); 45 pProto->onNewData( this );
@@ -97,197 +47,133 @@ void Bu::Client::processInput()
97 } 47 }
98} 48}
99 49
100void Bu::Client::processOutput()
101{
102 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal );
103 mWrite.lock();
104 char buf[RBS];
105 if( qbWrite.getSize() > 0 )
106 {
107 int nAmnt = RBS;
108 nAmnt = qbWrite.peek( buf, nAmnt );
109 int nReal = pTopStream->write( buf, nAmnt );
110 qbWrite.seek( nReal );
111 pTopStream->flush();
112 }
113 mWrite.unlock();
114}
115
116void Bu::Client::setProtocol( Protocol *pProto ) 50void Bu::Client::setProtocol( Protocol *pProto )
117{ 51{
118 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 52 Bu::MutexLocker l( mProto );
119 this->pProto = pProto; 53 this->pProto = pProto;
120 this->pProto->onNewConnection( this ); 54 this->pProto->onNewConnection( this );
121} 55}
122 56
123Bu::Protocol *Bu::Client::getProtocol() 57Bu::Protocol *Bu::Client::getProtocol()
124{ 58{
125 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 59 Bu::MutexLocker l( mProto );
126 return pProto; 60 return pProto;
127} 61}
128 62
129void Bu::Client::clearProtocol() 63void Bu::Client::clearProtocol()
130{ 64{
131 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 65 Bu::MutexLocker l( mProto );
132 pProto = NULL; 66 pProto = NULL;
133} 67}
134/*
135Bu::String &Bu::Client::getInput()
136{
137 return sReadBuf;
138}
139
140Bu::String &Bu::Client::getOutput()
141{
142 return sWriteBuf;
143}
144*/
145 68
146bool Bu::Client::isOpen() 69bool Bu::Client::isOpen()
147{ 70{
148 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 71 return true;
149 if( !pTopStream ) return false;
150 return pTopStream->isOpen();
151} 72}
152 73
153Bu::size Bu::Client::write( const Bu::String &sData ) 74Bu::size Bu::Client::write( const Bu::String &sData )
154{ 75{
155 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 76 return cbBuffer.client().write( sData.getStr(), sData.getSize() );
156 Bu::MutexLocker l( mWrite );
157 return qbWrite.write( sData.getStr(), sData.getSize() );
158} 77}
159 78
160Bu::size Bu::Client::write( const void *pData, Bu::size nBytes ) 79Bu::size Bu::Client::write( const void *pData, Bu::size nBytes )
161{ 80{
162 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 81 return cbBuffer.client().write( pData, nBytes );
163 Bu::MutexLocker l( mWrite );
164 return qbWrite.write( pData, nBytes );
165} 82}
166 83
167Bu::size Bu::Client::write( int8_t nData ) 84Bu::size Bu::Client::write( int8_t nData )
168{ 85{
169 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 86 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
170 Bu::MutexLocker l( mWrite );
171 return qbWrite.write( (const char *)&nData, sizeof(nData) );
172} 87}
173 88
174Bu::size Bu::Client::write( int16_t nData ) 89Bu::size Bu::Client::write( int16_t nData )
175{ 90{
176 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 91 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
177 Bu::MutexLocker l( mWrite );
178 return qbWrite.write( (const char *)&nData, sizeof(nData) );
179} 92}
180 93
181Bu::size Bu::Client::write( int32_t nData ) 94Bu::size Bu::Client::write( int32_t nData )
182{ 95{
183 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 96 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
184 Bu::MutexLocker l( mWrite );
185 return qbWrite.write( (const char *)&nData, sizeof(nData) );
186} 97}
187 98
188Bu::size Bu::Client::write( int64_t nData ) 99Bu::size Bu::Client::write( int64_t nData )
189{ 100{
190 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 101 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
191 Bu::MutexLocker l( mWrite );
192 return qbWrite.write( (const char *)&nData, sizeof(nData) );
193} 102}
194 103
195Bu::size Bu::Client::write( uint8_t nData ) 104Bu::size Bu::Client::write( uint8_t nData )
196{ 105{
197 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 106 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
198 Bu::MutexLocker l( mWrite );
199 return qbWrite.write( (const char *)&nData, sizeof(nData) );
200} 107}
201 108
202Bu::size Bu::Client::write( uint16_t nData ) 109Bu::size Bu::Client::write( uint16_t nData )
203{ 110{
204 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 111 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
205 Bu::MutexLocker l( mWrite );
206 return qbWrite.write( (const char *)&nData, sizeof(nData) );
207} 112}
208 113
209Bu::size Bu::Client::write( uint32_t nData ) 114Bu::size Bu::Client::write( uint32_t nData )
210{ 115{
211 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 116 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
212 Bu::MutexLocker l( mWrite );
213 return qbWrite.write( (const char *)&nData, sizeof(nData) );
214} 117}
215 118
216Bu::size Bu::Client::write( uint64_t nData ) 119Bu::size Bu::Client::write( uint64_t nData )
217{ 120{
218 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 121 return cbBuffer.client().write( (const char *)&nData, sizeof(nData) );
219 Bu::MutexLocker l( mWrite );
220 return qbWrite.write( (const char *)&nData, sizeof(nData) );
221} 122}
222 123
223Bu::size Bu::Client::read( void *pData, Bu::size nBytes ) 124Bu::size Bu::Client::read( void *pData, Bu::size nBytes )
224{ 125{
225 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 126 return cbBuffer.client().read( pData, nBytes );
226 Bu::MutexLocker l( mWrite );
227 return qbRead.read( pData, nBytes );
228} 127}
229 128
129
230Bu::size Bu::Client::peek( void *pData, int nBytes, int nOffset ) 130Bu::size Bu::Client::peek( void *pData, int nBytes, int nOffset )
231{ 131{
232 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 132 return cbBuffer.client().peek( pData, nBytes, nOffset );
233 Bu::MutexLocker l( mWrite );
234 return qbRead.peek( pData, nBytes, nOffset );
235} 133}
236 134
135
237Bu::size Bu::Client::getInputSize() 136Bu::size Bu::Client::getInputSize()
238{ 137{
239 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 138 return cbBuffer.client().getSize();
240 Bu::MutexLocker l( mWrite );
241 return qbRead.getSize();
242} 139}
243 140
244Bu::size Bu::Client::getOutputSize() 141Bu::size Bu::Client::getOutputSize()
245{ 142{
246 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 143 return cbBuffer.server().getSize();
247 Bu::MutexLocker l( mWrite );
248 return qbWrite.getSize();
249}
250
251const Bu::TcpSocket *Bu::Client::getSocket() const
252{
253 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal );
254 return pSocket;
255} 144}
256 145
257void Bu::Client::disconnect() 146void Bu::Client::disconnect()
258{ 147{
259 Bu::ReadWriteMutex::WriteLocker lGlobal( mGlobal ); 148 Bu::MutexLocker l( mDisconnect );
260 bWantsDisconnect = true; 149 bWantsDisconnect = true;
261} 150}
262 151
263bool Bu::Client::wantsDisconnect() 152bool Bu::Client::wantsDisconnect()
264{ 153{
265 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 154 Bu::MutexLocker l( mDisconnect );
266 return bWantsDisconnect; 155 return bWantsDisconnect;
267} 156}
268 157
269void Bu::Client::close() 158void Bu::Client::close()
270{ 159{
271 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal );
272 pTopStream->close();
273} 160}
274 161
275Bu::ClientLink *Bu::Client::getLink() 162Bu::ClientLink *Bu::Client::getLink()
276{ 163{
277 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal );
278 return pfLink->createLink( this ); 164 return pfLink->createLink( this );
279} 165}
280 166
281void Bu::Client::onMessage( const Bu::String &sMsg ) 167void Bu::Client::onMessage( const Bu::String &sMsg )
282{ 168{
283 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 169 Bu::MutexLocker l( mProto );
284 if( pProto ) 170 if( pProto )
285 pProto->onMessage( this, sMsg ); 171 pProto->onMessage( this, sMsg );
286} 172}
287 173
288void Bu::Client::tick() 174void Bu::Client::tick()
289{ 175{
290 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 176 Bu::MutexLocker l( mProto );
291 if( pProto ) 177 if( pProto )
292 pProto->onTick( this ); 178 pProto->onTick( this );
293} 179}
@@ -299,9 +185,7 @@ Bu::size Bu::Client::tell()
299 185
300void Bu::Client::seek( Bu::size offset ) 186void Bu::Client::seek( Bu::size offset )
301{ 187{
302 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 188 return cbBuffer.client().seek( offset );
303 Bu::MutexLocker l( mRead );
304 return qbRead.seek( offset );
305} 189}
306 190
307void Bu::Client::setPos( Bu::size ) 191void Bu::Client::setPos( Bu::size )
@@ -321,15 +205,11 @@ bool Bu::Client::isEos()
321 205
322void Bu::Client::flush() 206void Bu::Client::flush()
323{ 207{
324 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal );
325 processOutput();
326} 208}
327 209
328bool Bu::Client::canRead() 210bool Bu::Client::canRead()
329{ 211{
330 Bu::ReadWriteMutex::ReadLocker lGlobal( mGlobal ); 212 return cbBuffer.client().getSize() > 0;
331 Bu::MutexLocker l( mRead );
332 return qbRead.getSize() > 0;
333} 213}
334 214
335bool Bu::Client::canWrite() 215bool Bu::Client::canWrite()
@@ -374,11 +254,13 @@ Bu::size Bu::Client::getSize() const
374 254
375Bu::size Bu::Client::getBlockSize() const 255Bu::size Bu::Client::getBlockSize() const
376{ 256{
377 return pSocket->getBlockSize(); 257 return 0;
258 //return pSocket->getBlockSize();
378} 259}
379 260
380Bu::String Bu::Client::getLocation() const 261Bu::String Bu::Client::getLocation() const
381{ 262{
382 return pSocket->getLocation(); 263 return "???";
264 //return pSocket->getLocation();
383} 265}
384 266
diff --git a/src/stable/client.h b/src/stable/client.h
index abde682..4cd8e3f 100644
--- a/src/stable/client.h
+++ b/src/stable/client.h
@@ -12,7 +12,7 @@
12 12
13#include "bu/config.h" 13#include "bu/config.h"
14#include "bu/string.h" 14#include "bu/string.h"
15#include "bu/queuebuf.h" 15#include "bu/clientbuf.h"
16#include "bu/mutex.h" 16#include "bu/mutex.h"
17#include "bu/readwritemutex.h" 17#include "bu/readwritemutex.h"
18 18
@@ -28,7 +28,6 @@ namespace Bu
28{ 28{
29 class Protocol; 29 class Protocol;
30 class Stream; 30 class Stream;
31 class TcpSocket;
32 class ClientLinkFactory; 31 class ClientLinkFactory;
33 32
34 /** 33 /**
@@ -36,15 +35,13 @@ namespace Bu
36 */ 35 */
37 class Client : public Bu::Stream 36 class Client : public Bu::Stream
38 { 37 {
38 friend class Server;
39 public: 39 public:
40 Client( Bu::TcpSocket *pSocket, Bu::ClientLinkFactory *pfLink); 40 Client( Bu::ClientLinkFactory *pfLink );
41 virtual ~Client(); 41 virtual ~Client();
42 42
43 void processInput(); 43 void processInput();
44 void processOutput();
45 44
46 //Bu::String &getInput();
47 //Bu::String &getOutput();
48 Bu::size write( const Bu::String &sData ); 45 Bu::size write( const Bu::String &sData );
49 Bu::size write( const void *pData, Bu::size nBytes ); 46 Bu::size write( const void *pData, Bu::size nBytes );
50 Bu::size write( int8_t nData ); 47 Bu::size write( int8_t nData );
@@ -57,7 +54,7 @@ namespace Bu
57 Bu::size write( uint64_t nData ); 54 Bu::size write( uint64_t nData );
58 Bu::size read( void *pData, Bu::size nBytes ); 55 Bu::size read( void *pData, Bu::size nBytes );
59 Bu::size peek( void *pData, int nBytes, int nOffset=0 ); 56 Bu::size peek( void *pData, int nBytes, int nOffset=0 );
60// void seek( int nBytes ); 57 void seek( int nBytes );
61 Bu::size getInputSize(); 58 Bu::size getInputSize();
62 Bu::size getOutputSize(); 59 Bu::size getOutputSize();
63 60
@@ -69,8 +66,6 @@ namespace Bu
69 void close(); 66 void close();
70 void tick(); 67 void tick();
71 68
72 const Bu::TcpSocket *getSocket() const;
73
74 void disconnect(); 69 void disconnect();
75 bool wantsDisconnect(); 70 bool wantsDisconnect();
76 71
@@ -78,31 +73,22 @@ namespace Bu
78 73
79 void onMessage( const Bu::String &sMsg ); 74 void onMessage( const Bu::String &sMsg );
80 75
81 bool hasOutput() { return qbWrite.getSize() > 0; } 76 bool hasOutput() { return cbBuffer.server().getSize() > 0; }
82 bool hasInput() { return qbRead.getSize() > 0; } 77 bool hasInput() { return cbBuffer.client().getSize() > 0; }
83 78
84 template<typename filter> 79 template<typename filter>
85 void pushFilter() 80 void pushFilter()
86 { 81 {
87 filter *pFlt = new filter( *pTopStream );
88 pTopStream = pFlt;
89 lFilts.prepend( pFlt );
90 } 82 }
91 83
92 template<typename filter, typename p1t> 84 template<typename filter, typename p1t>
93 void pushFilter( p1t p1 ) 85 void pushFilter( p1t p1 )
94 { 86 {
95 filter *pFlt = new filter( *pTopStream, p1 );
96 pTopStream = pFlt;
97 lFilts.prepend( pFlt );
98 } 87 }
99 88
100 template<typename filter, typename p1t, typename p2t> 89 template<typename filter, typename p1t, typename p2t>
101 void pushFilter( p1t p1, p2t p2 ) 90 void pushFilter( p1t p1, p2t p2 )
102 { 91 {
103 filter *pFlt = new filter( *pTopStream, p1, p2 );
104 pTopStream = pFlt;
105 lFilts.prepend( pFlt );
106 } 92 }
107 93
108 /* 94 /*
@@ -128,18 +114,12 @@ namespace Bu
128 virtual Bu::String getLocation() const; 114 virtual Bu::String getLocation() const;
129 115
130 private: 116 private:
131 typedef Bu::List<Bu::Stream *> FilterList;
132 FilterList lFilts;
133 Bu::Stream *pTopStream;
134 Bu::TcpSocket *pSocket;
135 Bu::Protocol *pProto; 117 Bu::Protocol *pProto;
136 Bu::QueueBuf qbRead; 118 Bu::ClientBuf cbBuffer;
137 Bu::QueueBuf qbWrite;
138 bool bWantsDisconnect; 119 bool bWantsDisconnect;
139 class Bu::ClientLinkFactory *pfLink; 120 class Bu::ClientLinkFactory *pfLink;
140 Bu::Mutex mRead; 121 mutable Bu::Mutex mProto;
141 Bu::Mutex mWrite; 122 mutable Bu::Mutex mDisconnect;
142 mutable Bu::ReadWriteMutex mGlobal;
143 }; 123 };
144} 124}
145 125
diff --git a/src/stable/clientbuf.cpp b/src/stable/clientbuf.cpp
new file mode 100644
index 0000000..493e577
--- /dev/null
+++ b/src/stable/clientbuf.cpp
@@ -0,0 +1,465 @@
1#include "bu/clientbuf.h"
2
3#include "bu/mutexlocker.h"
4
5Bu::ClientBuf::ClientBuf() :
6 accClientRaw( *this ),
7 accServer( *this ),
8 accClientFiltered( &accClient ),
9 accClient( *this )
10{
11}
12
13Bu::ClientBuf::~ClientBuf()
14{
15}
16
17Bu::ClientBuf::ClientAccess &Bu::ClientBuf::client()
18{
19 return accClient;
20}
21
22Bu::ClientBuf::ServerAccess &Bu::ClientBuf::server()
23{
24 return accServer;
25}
26
27/////////
28// ClientAccessRaw
29///
30
31Bu::ClientBuf::ClientAccessRaw::ClientAccessRaw( Bu::ClientBuf &rBuf ) :
32 rBuf( rBuf )
33{
34}
35
36Bu::ClientBuf::ClientAccessRaw::~ClientAccessRaw()
37{
38}
39
40void Bu::ClientBuf::ClientAccessRaw::close()
41{
42 // Roughly meaningless
43}
44
45Bu::size Bu::ClientBuf::ClientAccessRaw::read( void *pBuf, size iBytes )
46{
47 Bu::MutexLocker l( rBuf.mInput );
48 return rBuf.qbInput.read( pBuf, iBytes );
49}
50
51Bu::size Bu::ClientBuf::ClientAccessRaw::write( const void *pBuf, size iBytes )
52{
53 Bu::MutexLocker l( rBuf.mOutput );
54 return rBuf.qbOutput.write( pBuf, iBytes );
55}
56
57Bu::size Bu::ClientBuf::ClientAccessRaw::tell()
58{
59 Bu::MutexLocker l( rBuf.mInput );
60 return rBuf.qbInput.tell();
61}
62
63void Bu::ClientBuf::ClientAccessRaw::seek( Bu::size offset )
64{
65 Bu::MutexLocker l( rBuf.mInput );
66 rBuf.qbInput.seek( offset );
67}
68
69void Bu::ClientBuf::ClientAccessRaw::setPos( Bu::size )
70{
71}
72
73void Bu::ClientBuf::ClientAccessRaw::setPosEnd( Bu::size )
74{
75}
76
77bool Bu::ClientBuf::ClientAccessRaw::isEos()
78{
79 Bu::MutexLocker l( rBuf.mInput );
80 return rBuf.qbInput.isEos();
81}
82
83bool Bu::ClientBuf::ClientAccessRaw::isOpen()
84{
85 Bu::MutexLocker l( rBuf.mInput );
86 return rBuf.qbInput.isOpen();
87}
88
89void Bu::ClientBuf::ClientAccessRaw::flush()
90{
91 Bu::MutexLocker l( rBuf.mOutput );
92 return rBuf.qbOutput.flush();
93}
94
95bool Bu::ClientBuf::ClientAccessRaw::canRead()
96{
97 Bu::MutexLocker l( rBuf.mInput );
98 return rBuf.qbInput.canRead();
99}
100
101bool Bu::ClientBuf::ClientAccessRaw::canWrite()
102{
103 Bu::MutexLocker l( rBuf.mOutput );
104 return rBuf.qbOutput.canWrite();
105}
106
107bool Bu::ClientBuf::ClientAccessRaw::isReadable()
108{
109 Bu::MutexLocker l( rBuf.mInput );
110 return rBuf.qbInput.isReadable();
111}
112
113bool Bu::ClientBuf::ClientAccessRaw::isWritable()
114{
115 Bu::MutexLocker l( rBuf.mOutput );
116 return rBuf.qbOutput.isWritable();
117}
118
119bool Bu::ClientBuf::ClientAccessRaw::isSeekable()
120{
121 Bu::MutexLocker l( rBuf.mInput );
122 return rBuf.qbInput.isSeekable();
123}
124
125bool Bu::ClientBuf::ClientAccessRaw::isBlocking()
126{
127 return false;
128}
129
130void Bu::ClientBuf::ClientAccessRaw::setBlocking( bool )
131{
132}
133
134void Bu::ClientBuf::ClientAccessRaw::setSize( Bu::size )
135{
136 return;
137}
138
139Bu::size Bu::ClientBuf::ClientAccessRaw::getSize() const
140{
141 Bu::MutexLocker l( rBuf.mInput );
142 return rBuf.qbInput.getSize();
143}
144
145Bu::size Bu::ClientBuf::ClientAccessRaw::getBlockSize() const
146{
147 Bu::MutexLocker l( rBuf.mInput );
148 return rBuf.qbInput.getBlockSize();
149}
150
151Bu::String Bu::ClientBuf::ClientAccessRaw::getLocation() const
152{
153 return "ClientBuf";
154}
155
156/////////
157// ClientAccess
158///
159
160Bu::ClientBuf::ClientAccess::ClientAccess( Bu::ClientBuf &rBuf ) :
161 rBuf( rBuf )
162{
163}
164
165Bu::ClientBuf::ClientAccess::~ClientAccess()
166{
167}
168
169void Bu::ClientBuf::ClientAccess::close()
170{
171 // Roughly meaningless
172}
173
174Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes )
175{
176 char *pBuf = (char *)pBufRaw;
177 Bu::MutexLocker l( mAccess );
178 // Read from QueueBuf first
179 Bu::size ps = qbPeek.read( pBuf, iBytes );
180 iBytes -= ps;
181 pBuf += ps;
182 // Request space left? Try the client
183 if( iBytes > 0 )
184 {
185 ps += rBuf.accClientFiltered.read( pBuf, iBytes );
186 }
187 return ps;
188}
189
190Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes,
191 int iOffset )
192{
193 Bu::MutexLocker l( mAccess );
194 // Do we have enough data in the peek buffer to handle this?
195 if( qbPeek.getSize() < iBytes+iOffset )
196 {
197 // Nope, make an attempt to fill it in.
198 int nDiff = iBytes-qbPeek.getSize();
199 // We have to make our own buffer, since iBytes+nOffeset could be bigger
200 // than pData.
201 char *pTmp = new char[nDiff];
202 Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff );
203 if( ps > 0 )
204 {
205 // Add the data read to the peek buffer.
206 qbPeek.write( pTmp, ps );
207 }
208 delete[] pTmp;
209 }
210
211 return qbPeek.peek( pData, iBytes, iOffset );
212}
213
214Bu::size Bu::ClientBuf::ClientAccess::write( const void *pBuf, size iBytes )
215{
216 Bu::MutexLocker l( mAccess );
217 return rBuf.accClientFiltered.write( pBuf, iBytes );
218}
219
220Bu::size Bu::ClientBuf::ClientAccess::tell()
221{
222 Bu::MutexLocker l( mAccess );
223 return rBuf.accClientFiltered.tell() + qbPeek.getSize();
224}
225
226void Bu::ClientBuf::ClientAccess::seek( Bu::size offset )
227{
228 Bu::MutexLocker l( mAccess );
229 // For this type of stream seek is basically a destructive skip. It's like
230 // reading the data but with no output buffer. Let's remove data from the
231 // peek buffer first.
232 if( qbPeek.getSize() > 0 )
233 {
234 Bu::size amount = Bu::buMax( qbPeek.getSize(), offset );
235 qbPeek.seek( amount );
236 offset -= amount;
237 }
238
239 // If there's offset left, then apply it to the underlying stream
240 if( offset > 0 )
241 {
242 rBuf.accClientFiltered.seek( offset );
243 }
244}
245
246void Bu::ClientBuf::ClientAccess::setPos( Bu::size )
247{
248}
249
250void Bu::ClientBuf::ClientAccess::setPosEnd( Bu::size )
251{
252}
253
254bool Bu::ClientBuf::ClientAccess::isEos()
255{
256 Bu::MutexLocker l( mAccess );
257 return rBuf.accClientFiltered.isEos();
258}
259
260bool Bu::ClientBuf::ClientAccess::isOpen()
261{
262 Bu::MutexLocker l( mAccess );
263 return rBuf.accClientFiltered.isOpen();
264}
265
266void Bu::ClientBuf::ClientAccess::flush()
267{
268 Bu::MutexLocker l( mAccess );
269 return rBuf.accClientFiltered.flush();
270}
271
272bool Bu::ClientBuf::ClientAccess::canRead()
273{
274 Bu::MutexLocker l( mAccess );
275 return rBuf.accClientFiltered.canRead();
276}
277
278bool Bu::ClientBuf::ClientAccess::canWrite()
279{
280 Bu::MutexLocker l( mAccess );
281 return rBuf.accClientFiltered.canWrite();
282}
283
284bool Bu::ClientBuf::ClientAccess::isReadable()
285{
286 Bu::MutexLocker l( mAccess );
287 return rBuf.accClientFiltered.isReadable();
288}
289
290bool Bu::ClientBuf::ClientAccess::isWritable()
291{
292 Bu::MutexLocker l( mAccess );
293 return rBuf.accClientFiltered.isWritable();
294}
295
296bool Bu::ClientBuf::ClientAccess::isSeekable()
297{
298 Bu::MutexLocker l( mAccess );
299 return rBuf.accClientFiltered.isSeekable();
300}
301
302bool Bu::ClientBuf::ClientAccess::isBlocking()
303{
304 return false;
305}
306
307void Bu::ClientBuf::ClientAccess::setBlocking( bool )
308{
309}
310
311void Bu::ClientBuf::ClientAccess::setSize( Bu::size )
312{
313 return;
314}
315
316Bu::size Bu::ClientBuf::ClientAccess::getSize() const
317{
318 Bu::MutexLocker l( mAccess );
319 return rBuf.accClientFiltered.getSize() + qbPeek.getSize();
320}
321
322Bu::size Bu::ClientBuf::ClientAccess::getBlockSize() const
323{
324 Bu::MutexLocker l( mAccess );
325 return rBuf.accClientFiltered.getBlockSize();
326}
327
328Bu::String Bu::ClientBuf::ClientAccess::getLocation() const
329{
330 return "ClientBuf";
331}
332
333/////////
334// ServerAccess
335///
336
337Bu::ClientBuf::ServerAccess::ServerAccess( Bu::ClientBuf &rBuf ) :
338 rBuf( rBuf )
339{
340}
341
342Bu::ClientBuf::ServerAccess::~ServerAccess()
343{
344}
345
346void Bu::ClientBuf::ServerAccess::close()
347{
348}
349
350Bu::size Bu::ClientBuf::ServerAccess::read( void *pBuf, size iBytes )
351{
352 Bu::MutexLocker l( rBuf.mOutput );
353 return rBuf.qbOutput.read( pBuf, iBytes );
354}
355
356Bu::size Bu::ClientBuf::ServerAccess::peek( void *pData, int iBytes, int iOffset )
357{
358 Bu::MutexLocker l( rBuf.mOutput );
359 return rBuf.qbOutput.peek( pData, iBytes, iOffset );
360}
361
362Bu::size Bu::ClientBuf::ServerAccess::write( const void *pBuf, size iBytes )
363{
364 Bu::MutexLocker l( rBuf.mInput );
365 return rBuf.qbInput.write( pBuf, iBytes );
366}
367
368Bu::size Bu::ClientBuf::ServerAccess::tell()
369{
370 Bu::MutexLocker l( rBuf.mOutput );
371 return rBuf.qbOutput.tell();
372}
373
374void Bu::ClientBuf::ServerAccess::seek( Bu::size offset )
375{
376 Bu::MutexLocker l( rBuf.mOutput );
377 rBuf.qbOutput.seek( offset );
378}
379
380void Bu::ClientBuf::ServerAccess::setPos( Bu::size )
381{
382}
383
384void Bu::ClientBuf::ServerAccess::setPosEnd( Bu::size )
385{
386}
387
388bool Bu::ClientBuf::ServerAccess::isEos()
389{
390 Bu::MutexLocker l( rBuf.mOutput );
391 return rBuf.qbOutput.isEos();
392}
393
394bool Bu::ClientBuf::ServerAccess::isOpen()
395{
396 Bu::MutexLocker l( rBuf.mOutput );
397 return rBuf.qbOutput.isOpen();
398}
399
400void Bu::ClientBuf::ServerAccess::flush()
401{
402 Bu::MutexLocker l( rBuf.mInput );
403 return rBuf.qbInput.flush();
404}
405
406bool Bu::ClientBuf::ServerAccess::canRead()
407{
408 Bu::MutexLocker l( rBuf.mOutput );
409 return rBuf.qbOutput.canRead();
410}
411
412bool Bu::ClientBuf::ServerAccess::canWrite()
413{
414 Bu::MutexLocker l( rBuf.mInput );
415 return rBuf.qbInput.canWrite();
416}
417
418bool Bu::ClientBuf::ServerAccess::isReadable()
419{
420 Bu::MutexLocker l( rBuf.mOutput );
421 return rBuf.qbOutput.isReadable();
422}
423
424bool Bu::ClientBuf::ServerAccess::isWritable()
425{
426 Bu::MutexLocker l( rBuf.mInput );
427 return rBuf.qbInput.isWritable();
428}
429
430bool Bu::ClientBuf::ServerAccess::isSeekable()
431{
432 Bu::MutexLocker l( rBuf.mOutput );
433 return rBuf.qbOutput.isSeekable();
434}
435
436bool Bu::ClientBuf::ServerAccess::isBlocking()
437{
438 return false;
439}
440
441void Bu::ClientBuf::ServerAccess::setBlocking( bool )
442{
443}
444
445void Bu::ClientBuf::ServerAccess::setSize( Bu::size )
446{
447 return;
448}
449
450Bu::size Bu::ClientBuf::ServerAccess::getSize() const
451{
452 Bu::MutexLocker l( rBuf.mOutput );
453 return rBuf.qbOutput.getSize();
454}
455
456Bu::size Bu::ClientBuf::ServerAccess::getBlockSize() const
457{
458 Bu::MutexLocker l( rBuf.mOutput );
459 return rBuf.qbOutput.getBlockSize();
460}
461
462Bu::String Bu::ClientBuf::ServerAccess::getLocation() const
463{
464 return "ClientBuf";
465}
diff --git a/src/stable/clientbuf.h b/src/stable/clientbuf.h
new file mode 100644
index 0000000..7781b6a
--- /dev/null
+++ b/src/stable/clientbuf.h
@@ -0,0 +1,179 @@
1/*
2 * Copyright (C) 2007-2019 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#ifndef BU_CLIENT_BUF_H
9#define BU_CLIENT_BUF_H
10
11#include "bu/stream.h"
12#include "bu/streamstack.h"
13#include "bu/queuebuf.h"
14#include "bu/mutex.h"
15
16namespace Bu
17{
18 /**
19 * ClientBuf represents two buffered I/O channels, input and output. Each
20 * is stored in its own Bu::QueueBuf. One side is client oriented and can
21 * have filters applied ala Bu::StreamStack. The other is server oriented
22 * and is accesesed raw. Both must be accessed via accessor objects.
23 *
24 * Direct access to the buffers isn't allowed, instead you must select a
25 * perspective. client() provides an accessor where reading consumes data
26 * coming from the socket and writing provides data to write to the socket.
27 * server() works in reverse, writing is writing data from the socket that
28 * we have just read, reading is reading data that should be written to the
29 * socket.
30 *
31 * client() provides a filtered access view of the data, while server() is
32 * unfiltered.
33 */
34 class ClientBuf
35 {
36 public:
37 class ClientAccess : public Bu::Stream
38 {
39 friend class Bu::ClientBuf;
40 private:
41 ClientAccess( Bu::ClientBuf &rBuf );
42
43 public:
44 virtual ~ClientAccess();
45
46 virtual void close();
47 virtual size read( void *pBuf, size iBytes );
48 virtual size peek( void *pData, int iBytes, int iOffset=0 );
49 virtual size write( const void *pBuf, size iBytes );
50 using Stream::write;
51 virtual size tell();
52 virtual void seek( size offset );
53 virtual void setPos( size pos );
54 virtual void setPosEnd( size pos );
55 virtual bool isEos();
56 virtual bool isOpen();
57 virtual void flush();
58 virtual bool canRead();
59 virtual bool canWrite();
60 virtual bool isReadable();
61 virtual bool isWritable();
62 virtual bool isSeekable();
63 virtual bool isBlocking();
64 virtual void setBlocking( bool bBlocking=true );
65 virtual void setSize( size iSize );
66 virtual size getSize() const;
67 virtual size getBlockSize() const;
68 virtual Bu::String getLocation() const;
69
70 private:
71 Bu::ClientBuf &rBuf;
72 Bu::QueueBuf qbPeek;
73 mutable Bu::Mutex mAccess;
74 };
75
76 class ServerAccess : public Bu::Stream
77 {
78 friend class Bu::ClientBuf;
79 private:
80 ServerAccess( Bu::ClientBuf &rBuf );
81
82 public:
83 virtual ~ServerAccess();
84
85 virtual void close();
86 virtual size read( void *pBuf, size iBytes );
87 virtual size peek( void *pData, int iBytes, int iOffset=0 );
88 virtual size write( const void *pBuf, size iBytes );
89 using Stream::write;
90 virtual size tell();
91 virtual void seek( size offset );
92 virtual void setPos( size pos );
93 virtual void setPosEnd( size pos );
94 virtual bool isEos();
95 virtual bool isOpen();
96 virtual void flush();
97 virtual bool canRead();
98 virtual bool canWrite();
99 virtual bool isReadable();
100 virtual bool isWritable();
101 virtual bool isSeekable();
102 virtual bool isBlocking();
103 virtual void setBlocking( bool bBlocking=true );
104 virtual void setSize( size iSize );
105 virtual size getSize() const;
106 virtual size getBlockSize() const;
107 virtual Bu::String getLocation() const;
108
109 private:
110 Bu::ClientBuf &rBuf;
111 };
112
113 private:
114 class ClientAccessRaw : public Bu::Stream
115 {
116 friend class Bu::ClientBuf;
117 private:
118 ClientAccessRaw( Bu::ClientBuf &rBuf );
119
120 public:
121 virtual ~ClientAccessRaw();
122
123 virtual void close();
124 virtual size read( void *pBuf, size iBytes );
125 virtual size write( const void *pBuf, size iBytes );
126 using Stream::write;
127 virtual size tell();
128 virtual void seek( size offset );
129 virtual void setPos( size pos );
130 virtual void setPosEnd( size pos );
131 virtual bool isEos();
132 virtual bool isOpen();
133 virtual void flush();
134 virtual bool canRead();
135 virtual bool canWrite();
136 virtual bool isReadable();
137 virtual bool isWritable();
138 virtual bool isSeekable();
139 virtual bool isBlocking();
140 virtual void setBlocking( bool bBlocking=true );
141 virtual void setSize( size iSize );
142 virtual size getSize() const;
143 virtual size getBlockSize() const;
144 virtual Bu::String getLocation() const;
145
146 private:
147 Bu::ClientBuf &rBuf;
148 };
149
150 public:
151 ClientBuf();
152 virtual ~ClientBuf();
153
154 /**
155 * Access the client access side. Use directly, do NOT copy or store.
156 */
157 ClientAccess &client();
158
159 /**
160 * Access the server access side. Use directly, do NOT copy or store.
161 */
162 ServerAccess &server();
163
164 private:
165 ClientAccessRaw accClientRaw;
166 ServerAccess accServer;
167 Bu::StreamStack accClientFiltered;
168 ClientAccess accClient;
169 Bu::QueueBuf qbOutput;
170 Bu::QueueBuf qbInput;
171 Bu::Mutex mOutput;
172 Bu::Mutex mInput;
173 friend class Bu::ClientBuf::ClientAccess;
174 friend class Bu::ClientBuf::ClientAccessRaw;
175 friend class Bu::ClientBuf::ServerAccess;
176 };
177}
178
179#endif
diff --git a/src/stable/multiserver.cpp b/src/stable/multiserver.cpp
index e0386a7..697725c 100644
--- a/src/stable/multiserver.cpp
+++ b/src/stable/multiserver.cpp
@@ -8,6 +8,7 @@
8#include "bu/multiserver.h" 8#include "bu/multiserver.h"
9#include "bu/protocol.h" 9#include "bu/protocol.h"
10#include "bu/client.h" 10#include "bu/client.h"
11#include "bu/serversocket.h"
11 12
12#include "bu/config.h" 13#include "bu/config.h"
13 14
@@ -21,23 +22,23 @@ Bu::MultiServer::~MultiServer()
21{ 22{
22} 23}
23 24
24void Bu::MultiServer::addProtocol( Bu::Protocol *(*proc)(), int iPort, 25void Bu::MultiServer::addProtocol( Bu::Protocol *(*proc)(),
25 int nPoolSize ) 26 Bu::ServerSocket *pServer )
26{ 27{
27 hProtos[iPort] = proc; 28 addServerSocket( pServer );
28 addPort( iPort, nPoolSize ); 29 int iFd;
30 pServer->getFd( iFd );
31 hProtos.insert( iFd, proc );
29} 32}
30 33
31void Bu::MultiServer::addProtocol( Protocol *(*proc)(), const String &sAddr, 34void Bu::MultiServer::onNewConnection( const Bu::ServerSocket *pSrv,
32 int iPort, int nPoolSize ) 35 Client *pClient, Bu::Socket * /*pSocket*/ )
33{ 36{
34 hProtos[iPort] = proc; 37 int iFd;
35 addPort( sAddr, iPort, nPoolSize ); 38 if( pSrv->getFd( iFd ) )
36} 39 {
37 40 pClient->setProtocol( hProtos.get( iFd )() );
38void Bu::MultiServer::onNewConnection( Bu::Client *pClient, int nPort ) 41 }
39{
40 pClient->setProtocol( hProtos.get( nPort )() );
41} 42}
42 43
43void Bu::MultiServer::onClosedConnection( Bu::Client *pClient ) 44void Bu::MultiServer::onClosedConnection( Bu::Client *pClient )
diff --git a/src/stable/multiserver.h b/src/stable/multiserver.h
index b12aa90..2490427 100644
--- a/src/stable/multiserver.h
+++ b/src/stable/multiserver.h
@@ -15,6 +15,7 @@ namespace Bu
15{ 15{
16 class Protocol; 16 class Protocol;
17 class Client; 17 class Client;
18 class ServerSocket;
18 19
19 template<class T> 20 template<class T>
20 Protocol *genProtocol() 21 Protocol *genProtocol()
@@ -28,9 +29,7 @@ namespace Bu
28 MultiServer(); 29 MultiServer();
29 virtual ~MultiServer(); 30 virtual ~MultiServer();
30 31
31 void addProtocol( Protocol *(*proc)(), int iPort, int nPoolSize=40 ); 32 void addProtocol( Protocol *(*proc)(), ServerSocket *pServer );
32 void addProtocol( Protocol *(*proc)(), const String &sAddr, int iPort,
33 int nPoolSize=40 );
34 33
35 void scan() 34 void scan()
36 { 35 {
@@ -42,7 +41,8 @@ namespace Bu
42 Server::setTimeout( nTimeoutSec, nTimeoutUSec ); 41 Server::setTimeout( nTimeoutSec, nTimeoutUSec );
43 } 42 }
44 43
45 virtual void onNewConnection( Client *pClient, int nPort ); 44 virtual void onNewConnection( const Bu::ServerSocket *pSrv,
45 Client *pClient, Bu::Socket *pSocket );
46 virtual void onClosedConnection( Client *pClient ); 46 virtual void onClosedConnection( Client *pClient );
47 47
48 void shutdown(); 48 void shutdown();
diff --git a/src/stable/server.cpp b/src/stable/server.cpp
index 0552510..592230d 100644
--- a/src/stable/server.cpp
+++ b/src/stable/server.cpp
@@ -8,10 +8,11 @@
8#include "bu/server.h" 8#include "bu/server.h"
9#include <errno.h> 9#include <errno.h>
10#include <unistd.h> 10#include <unistd.h>
11#include "bu/tcpserversocket.h" 11#include "bu/serversocket.h"
12#include "bu/client.h" 12#include "bu/client.h"
13#include "bu/tcpsocket.h" 13#include "bu/socket.h"
14#include "bu/config.h" 14#include "bu/config.h"
15#include "bu/mutexlocker.h"
15 16
16#ifdef PROFILE_BU_SERVER 17#ifdef PROFILE_BU_SERVER
17#define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x ) 18#define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x )
@@ -21,13 +22,34 @@
21#define BU_PROFILE_END( x ) (void)0 22#define BU_PROFILE_END( x ) (void)0
22#endif 23#endif
23 24
24Bu::Server::Server() : 25#define RBS 1500
26
27Bu::Server::Server( int iIoWorkers, int iClientWorkers ) :
25 nTimeoutSec( 0 ), 28 nTimeoutSec( 0 ),
26 nTimeoutUSec( 0 ), 29 nTimeoutUSec( 0 ),
27 bAutoTick( false ) 30 bAutoTick( false )
28{ 31{
29 BU_PROFILE_START("server"); 32 BU_PROFILE_START("server");
30 FD_ZERO( &fdActive ); 33 FD_ZERO( &fdActive );
34
35 if( iIoWorkers < 1 )
36 iIoWorkers = 1;
37 if( iClientWorkers < 1 )
38 iClientWorkers = 1;
39
40 for( int j = 0; j < iIoWorkers; j++ )
41 {
42 IoWorker *pWorker = new IoWorker( *this, qIoEvent, qClientEvent );
43 lIoWorker.append( pWorker );
44 pWorker->start();
45 }
46
47 for( int j = 0; j < iClientWorkers; j++ )
48 {
49 ClientWorker *pWorker = new ClientWorker( *this, qClientEvent );
50 lClientWorker.append( pWorker );
51 pWorker->start();
52 }
31} 53}
32 54
33Bu::Server::~Server() 55Bu::Server::~Server()
@@ -36,20 +58,16 @@ Bu::Server::~Server()
36 BU_PROFILE_START("server"); 58 BU_PROFILE_START("server");
37} 59}
38 60
39void Bu::Server::addPort( int nPort, int nPoolSize ) 61void Bu::Server::addServerSocket( Bu::ServerSocket *pSocket )
40{ 62{
41 TcpServerSocket *s = new TcpServerSocket( nPort, nPoolSize ); 63 fd iFd;
42 socket_t nSocket = s->getSocket(); 64 if( !pSocket->getFd( iFd ) )
43 FD_SET( nSocket, &fdActive ); 65 {
44 hServers.insert( nSocket, s ); 66 throw Bu::ExceptionBase("Cannot get file descriptor from "
45} 67 "provided ServerSocket.");
46 68 }
47void Bu::Server::addPort( const String &sAddr, int nPort, int nPoolSize ) 69 FD_SET( iFd, &fdActive );
48{ 70 hServers.insert( iFd, pSocket );
49 TcpServerSocket *s = new TcpServerSocket( sAddr, nPort, nPoolSize );
50 socket_t nSocket = s->getSocket();
51 FD_SET( nSocket, &fdActive );
52 hServers.insert( nSocket, s );
53} 71}
54 72
55void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec ) 73void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec )
@@ -68,11 +86,13 @@ void Bu::Server::scan()
68 fd_set fdException = fdActive; 86 fd_set fdException = fdActive;
69 87
70 FD_ZERO( &fdWrite ); 88 FD_ZERO( &fdWrite );
89 mClients.lock();
71 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) 90 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
72 { 91 {
73 if( (*i)->hasOutput() ) 92 if( (*i)->hasOutput() )
74 FD_SET( i.getKey(), &fdWrite ); 93 FD_SET( i.getKey(), &fdWrite );
75 } 94 }
95 mClients.unlock();
76 96
77 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, 97 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE,
78 &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) 98 &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 )
@@ -91,42 +111,17 @@ void Bu::Server::scan()
91 { 111 {
92 if( hServers.has( j ) ) 112 if( hServers.has( j ) )
93 { 113 {
94 TcpServerSocket *pSrv = hServers.get( j ); 114 Bu::ServerSocket *pSrv = hServers.get( j );
95 addClient( pSrv->accept(), pSrv->getPort() ); 115 addClient( pSrv, pSrv->accept() );
96 } 116 }
97 else 117 else
98 { 118 {
99 Client *pClient = hClients.get( j ); 119 qIoEvent.enqueue( new Event( j, Event::Read ) );
100 BU_PROFILE_START("processInput");
101 pClient->processInput();
102 BU_PROFILE_END("processInput");
103 if( !pClient->isOpen() )
104 {
105 closeClient( j );
106 }
107 } 120 }
108 } 121 }
109 if( FD_ISSET( j, &fdWrite ) ) 122 if( FD_ISSET( j, &fdWrite ) )
110 { 123 {
111 try 124 qIoEvent.enqueue( new Event( j, Event::Write ) );
112 {
113 Client *pClient = hClients.get( j );
114 try
115 {
116 BU_PROFILE_START("processOutput");
117 pClient->processOutput();
118 BU_PROFILE_END("processOutput");
119 }
120 catch( Bu::TcpSocketException &e )
121 {
122 closeClient( j );
123 }
124 }
125 catch( Bu::HashException &e )
126 {
127 // Do nothing, I guess, the client is already dead...
128 // TODO: Someday, we may want to handle this more graceully.
129 }
130 } 125 }
131 } 126 }
132 127
@@ -134,6 +129,7 @@ void Bu::Server::scan()
134 // Now we just try to write all the pending data on all the sockets. 129 // Now we just try to write all the pending data on all the sockets.
135 // this could be done better eventually, if we care about the socket 130 // this could be done better eventually, if we care about the socket
136 // wanting to accept writes (using a select). 131 // wanting to accept writes (using a select).
132 mClients.lock();
137 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) 133 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
138 { 134 {
139 if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) 135 if( (*i)->wantsDisconnect() && !(*i)->hasOutput() )
@@ -141,6 +137,7 @@ void Bu::Server::scan()
141 lDelete.append( i.getKey() ); 137 lDelete.append( i.getKey() );
142 } 138 }
143 } 139 }
140 mClients.unlock();
144 141
145 for( Bu::List<int>::iterator i = lDelete.begin(); i != lDelete.end(); i++ ) 142 for( Bu::List<int>::iterator i = lDelete.begin(); i != lDelete.end(); i++ )
146 { 143 {
@@ -153,21 +150,47 @@ void Bu::Server::scan()
153 BU_PROFILE_END("scan"); 150 BU_PROFILE_END("scan");
154} 151}
155 152
156void Bu::Server::addClient( socket_t nSocket, int nPort ) 153void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket )
157{ 154{
158 BU_PROFILE_START("addClient"); 155 BU_PROFILE_START("addClient");
159 FD_SET( nSocket, &fdActive ); 156 int iFdSrv;
157 int iFdCli;
158 if( !pSrv->getFd( iFdSrv ) || !pSrv->getFd( iFdCli ) )
159 {
160 throw Bu::ExceptionBase("No file descriptor?");
161 }
162 FD_SET( iFdCli, &fdActive );
160 163
161 Client *c = new Client( 164 Client *pClient = new Client(
162 new Bu::TcpSocket( nSocket ),
163 new SrvClientLinkFactory() 165 new SrvClientLinkFactory()
164 ); 166 );
165 hClients.insert( nSocket, c ); 167 {
168 Bu::MutexLocker l( mClients );
169 hClients.insert( iFdCli, pClient );
170 hSockets.insert( iFdCli, pSocket );
171 }
166 172
167 onNewConnection( c, nPort ); 173 onNewConnection( pSrv, pClient, pSocket );
168 BU_PROFILE_END("addClient"); 174 BU_PROFILE_END("addClient");
169} 175}
170 176
177Bu::Client *Bu::Server::getClient( fd iId )
178{
179 Bu::MutexLocker l( mClients );
180 return hClients.get( iId );
181}
182
183bool Bu::Server::getClientAndSocket( fd iId, Bu::Client *&pClient,
184 Bu::Socket *&pSocket )
185{
186 Bu::MutexLocker l( mClients );
187 if( !hClients.has( iId ) || !hSockets.has( iId ) )
188 return false;
189 pClient = hClients.get( iId );
190 pSocket = hSockets.get( iId );
191 return true;
192}
193
171Bu::Server::SrvClientLink::SrvClientLink( Bu::Client *pClient ) : 194Bu::Server::SrvClientLink::SrvClientLink( Bu::Client *pClient ) :
172 pClient( pClient ) 195 pClient( pClient )
173{ 196{
@@ -203,14 +226,45 @@ void Bu::Server::setAutoTick( bool bEnable )
203 226
204void Bu::Server::tick() 227void Bu::Server::tick()
205{ 228{
229 mClients.lock();
206 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) 230 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
207 { 231 {
208 (*i)->tick(); 232 (*i)->tick();
209 } 233 }
234 mClients.unlock();
210} 235}
211 236
212void Bu::Server::shutdown() 237void Bu::Server::shutdown()
213{ 238{
239 {
240 qIoEvent.stop();
241 qClientEvent.stop();
242 Bu::Server::Event *pEv;
243 while( (pEv = qIoEvent.drain()) != NULL )
244 {
245 delete pEv;
246 }
247 while( (pEv = qClientEvent.drain()) != NULL )
248 {
249 delete pEv;
250 }
251
252 Bu::MutexLocker l( mWorkers );
253 for( IoWorkerList::iterator i = lIoWorker.begin(); i; i++ )
254 {
255 (*i)->join();
256 delete *i;
257 }
258 lIoWorker.clear();
259 for( ClientWorkerList::iterator i = lClientWorker.begin();
260 i; i++ )
261 {
262 (*i)->join();
263 delete *i;
264 }
265 lClientWorker.clear();
266 }
267
214 for( SrvHash::iterator i = hServers.begin(); i != hServers.end(); i++ ) 268 for( SrvHash::iterator i = hServers.begin(); i != hServers.end(); i++ )
215 { 269 {
216 delete *i; 270 delete *i;
@@ -218,23 +272,201 @@ void Bu::Server::shutdown()
218 272
219 hServers.clear(); 273 hServers.clear();
220 274
221 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) 275 ClientHash::KeyList lClients = hClients.getKeys();
276 for( ClientHash::KeyList::iterator i = lClients.begin(); i; i++ )
222 { 277 {
223 closeClient( i.getKey() ); 278 closeClient( *i );
224 } 279 }
225
226 hClients.clear(); 280 hClients.clear();
227} 281}
228 282
229void Bu::Server::closeClient( socket_t iSocket ) 283void Bu::Server::closeClient( fd iSocket )
230{ 284{
231 BU_PROFILE_START("closeClient"); 285 BU_PROFILE_START("closeClient");
232 Bu::Client *pClient = hClients.get( iSocket ); 286 Bu::Client *pClient = hClients.get( iSocket );
287 Bu::Socket *pSocket = hSockets.get( iSocket );
233 onClosedConnection( pClient ); 288 onClosedConnection( pClient );
234 pClient->close(); 289 pClient->close();
235 hClients.erase( iSocket ); 290 hClients.erase( iSocket );
291 pSocket->close();
292 hSockets.erase( iSocket );
236 FD_CLR( iSocket, &fdActive ); 293 FD_CLR( iSocket, &fdActive );
237 delete pClient; 294 delete pClient;
295 delete pSocket;
238 BU_PROFILE_END("closeClient"); 296 BU_PROFILE_END("closeClient");
239} 297}
240 298
299////////
300// Event
301////
302
303Bu::Server::Event::Event( fd iId, Operation eOp ) :
304 iId( iId ),
305 eOp( eOp )
306{
307}
308
309Bu::Server::Event::~Event()
310{
311}
312
313Bu::Server::fd Bu::Server::Event::getId() const
314{
315 return iId;
316}
317
318Bu::Server::Event::Operation Bu::Server::Event::getOperation() const
319{
320 return eOp;
321}
322
323/////////
324// IoWorker
325////
326
327Bu::Server::IoWorker::IoWorker( Bu::Server &rSrv,
328 Bu::Server::EventQueue &qIoEvent,
329 Bu::Server::EventQueue &qClientEvent ) :
330 rSrv( rSrv ),
331 qIoEvent( qIoEvent ),
332 qClientEvent( qClientEvent )
333{
334}
335
336Bu::Server::IoWorker::~IoWorker()
337{
338}
339
340void Bu::Server::IoWorker::run()
341{
342 while( qIoEvent.isRunning() )
343 {
344 Event *pEv = qIoEvent.dequeue();
345 if( pEv == NULL )
346 continue;
347
348 Client *pClient;
349 Socket *pSocket;
350 if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) )
351 {
352 delete pEv;
353 continue;
354 }
355
356 switch( pEv->getOperation() )
357 {
358 case Event::Read:
359 handleRead( pClient, pSocket );
360 break;
361
362 case Event::Write:
363 handleWrite( pClient, pSocket );
364 break;
365
366 case Event::Process:
367 break;
368 }
369
370 delete pEv;
371 }
372}
373
374void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
375{
376 char buf[RBS];
377 Bu::size iRead;
378 Bu::size iTotal=0;
379
380 BU_PROFILE_START("client.read");
381 for(;;)
382 {
383 try
384 {
385 iRead = pSocket->read( buf, RBS );
386
387 if( iRead == 0 )
388 {
389 break;
390 }
391 else
392 {
393 iTotal += iRead;
394 pClient->cbBuffer.server().write( buf, iRead );
395 if( !pSocket->canRead() )
396 break;
397 }
398 }
399 catch( Bu::ExceptionBase &e )
400 {
401 close( pSocket );
402 break;
403 }
404 }
405 BU_PROFILE_END("client.read");
406
407 if( iTotal == 0 )
408 {
409 close( pSocket );
410 }
411 else
412 {
413 Bu::Server::fd iFd;
414 pSocket->getFd( iFd );
415 qClientEvent.enqueue( new Event( iFd, Event::Process ) );
416 }
417}
418
419void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket )
420{
421 char buf[RBS];
422 if( pClient->hasOutput() > 0 )
423 {
424 int iAmnt = RBS;
425 iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt );
426 int iReal = pSocket->write( buf, iAmnt );
427 pClient->cbBuffer.server().seek( iReal );
428 }
429}
430
431void Bu::Server::IoWorker::close( Bu::Socket *pSocket )
432{
433 Bu::Server::fd iFd;
434 pSocket->getFd( iFd );
435 rSrv.closeClient( iFd );
436}
437
438/////////
439// ClientWorker
440////
441
442Bu::Server::ClientWorker::ClientWorker( Bu::Server &rSrv,
443 Bu::Server::EventQueue &qEvent ) :
444 rSrv( rSrv ),
445 qEvent( qEvent )
446{
447}
448
449Bu::Server::ClientWorker::~ClientWorker()
450{
451}
452
453void Bu::Server::ClientWorker::run()
454{
455 while( qEvent.isRunning() )
456 {
457 Event *pEv = qEvent.dequeue();
458 if( pEv == NULL )
459 continue;
460
461 Client *pClient = rSrv.getClient( pEv->getId() );
462 if( pClient == NULL )
463 {
464 delete pEv;
465 continue;
466 }
467
468 pClient->processInput();
469 delete pEv;
470 }
471}
472
diff --git a/src/stable/server.h b/src/stable/server.h
index d3f0903..d66d9d5 100644
--- a/src/stable/server.h
+++ b/src/stable/server.h
@@ -20,6 +20,8 @@
20#include "bu/clientlink.h" 20#include "bu/clientlink.h"
21#include "bu/clientlinkfactory.h" 21#include "bu/clientlinkfactory.h"
22#include "bu/hash.h" 22#include "bu/hash.h"
23#include "bu/synchroqueue.h"
24#include "bu/thread.h"
23 25
24#include "bu/config.h" 26#include "bu/config.h"
25 27
@@ -33,8 +35,8 @@
33 35
34namespace Bu 36namespace Bu
35{ 37{
36 class TcpServerSocket; 38 class ServerSocket;
37 class TcpSocket; 39 class Socket;
38 class Client; 40 class Client;
39 41
40 /** 42 /**
@@ -60,33 +62,35 @@ namespace Bu
60 class Server 62 class Server
61 { 63 {
62 public: 64 public:
63 Server(); 65 Server( int iIoWorkers=4, int iClientWorkers=8 );
64 virtual ~Server(); 66 virtual ~Server();
65 67
66#ifdef WIN32 68#ifdef WIN32
67 typedef unsigned int socket_t; 69 typedef unsigned int fd;
68#else 70#else
69 typedef int socket_t; 71 typedef int fd;
70#endif 72#endif
71 73
72 void addPort( int nPort, int nPoolSize=40 ); 74 void addServerSocket( Bu::ServerSocket *pSocket );
73 void addPort( const String &sAddr, int nPort, int nPoolSize=40 );
74 75
75 virtual void scan(); 76 virtual void scan();
76 void setTimeout( int nTimeoutSec, int nTimeoutUSec=0 ); 77 void setTimeout( int nTimeoutSec, int nTimeoutUSec=0 );
77 78
78 void addClient( socket_t nSocket, int nPort ); 79 void addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket );
80 Bu::Client *getClient( fd iId );
81 bool getClientAndSocket( fd iId, Bu::Client *&pClient,
82 Bu::Socket *&pSocket );
79 83
80 void setAutoTick( bool bEnable=true ); 84 void setAutoTick( bool bEnable=true );
81 void tick(); 85 void tick();
82 86
83 virtual void onNewConnection( Client *pClient, int nPort )=0; 87 virtual void onNewConnection( const Bu::ServerSocket *pSrv, Client *pClient, Bu::Socket *pSocket )=0;
84 virtual void onClosedConnection( Client *pClient )=0; 88 virtual void onClosedConnection( Client *pClient )=0;
85 89
86 void shutdown(); 90 void shutdown();
87 91
88 private: 92 private:
89 void closeClient( socket_t iSocket ); 93 void closeClient( fd iSocket );
90 class SrvClientLink : public Bu::ClientLink 94 class SrvClientLink : public Bu::ClientLink
91 { 95 {
92 public: 96 public:
@@ -108,14 +112,83 @@ namespace Bu
108 virtual Bu::ClientLink *createLink( Bu::Client *pClient ); 112 virtual Bu::ClientLink *createLink( Bu::Client *pClient );
109 }; 113 };
110 114
115 class Event
116 {
117 public:
118 enum Operation
119 {
120 Read,
121 Write,
122 Process
123 };
124 Event( fd iId, Operation eOp );
125 ~Event();
126
127 fd getId() const;
128 Operation getOperation() const;
129
130 private:
131 fd iId;
132 Operation eOp;
133 };
134
135 typedef Bu::SynchroQueue<Event *> EventQueue;
136
137 class IoWorker : public Bu::Thread
138 {
139 public:
140 IoWorker( Server &rSrv, EventQueue &qIoEvent,
141 EventQueue &qClientEvent );
142 virtual ~IoWorker();
143
144 protected:
145 virtual void run();
146
147 private:
148 void handleRead( Client *pClient, Socket *pSocket );
149 void handleWrite( Client *pClient, Socket *pSocket );
150 void close( Socket *pSocket );
151
152 private:
153 Server &rSrv;
154 EventQueue &qIoEvent;
155 EventQueue &qClientEvent;
156 };
157
158 class ClientWorker : public Bu::Thread
159 {
160 public:
161 ClientWorker( Server &rSrv, EventQueue &qEvent );
162 virtual ~ClientWorker();
163
164 protected:
165 virtual void run();
166
167 private:
168 Server &rSrv;
169 EventQueue &qEvent;
170 };
171
111 int nTimeoutSec; 172 int nTimeoutSec;
112 int nTimeoutUSec; 173 int nTimeoutUSec;
113 fd_set fdActive; 174 fd_set fdActive;
114 typedef Hash<socket_t,TcpServerSocket *> SrvHash; 175 typedef Hash<fd,ServerSocket *> SrvHash;
115 SrvHash hServers; 176 SrvHash hServers;
116 typedef Hash<socket_t,Client *> ClientHash; 177 typedef Hash<fd,Client *> ClientHash;
178 typedef Hash<fd,Socket *> SocketHash;
117 ClientHash hClients; 179 ClientHash hClients;
180 SocketHash hSockets;
118 bool bAutoTick; 181 bool bAutoTick;
182 Bu::Mutex mClients;
183 Bu::Mutex mScan;
184 Bu::Mutex mWorkers;
185
186 EventQueue qIoEvent;
187 EventQueue qClientEvent;
188 typedef List<IoWorker *> IoWorkerList;
189 typedef List<ClientWorker *> ClientWorkerList;
190 IoWorkerList lIoWorker;
191 ClientWorkerList lClientWorker;
119 }; 192 };
120} 193}
121 194
diff --git a/src/stable/serversocket.cpp b/src/stable/serversocket.cpp
new file mode 100644
index 0000000..f0384b7
--- /dev/null
+++ b/src/stable/serversocket.cpp
@@ -0,0 +1,10 @@
1#include "bu/serversocket.h"
2
3Bu::ServerSocket::ServerSocket()
4{
5}
6
7Bu::ServerSocket::~ServerSocket()
8{
9}
10
diff --git a/src/stable/serversocket.h b/src/stable/serversocket.h
new file mode 100644
index 0000000..cb2591d
--- /dev/null
+++ b/src/stable/serversocket.h
@@ -0,0 +1,44 @@
1/*
2 * Copyright (C) 2007-2019 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#ifndef BU_SERVER_SOCKET_H
9#define BU_SERVER_SOCKET_H
10
11#include <stdint.h>
12
13namespace Bu
14{
15 class Socket;
16
17 /**
18 * Abstract representation of a server socket of some kind. Maybe socket
19 * isn't strictly accurate. This could be a tcp/ip socket, a named
20 * filesystem based socket, etc.
21 *
22 *@ingroup Serving
23 */
24 class ServerSocket
25 {
26 public:
27 ServerSocket();
28 virtual ~ServerSocket();
29
30 /**
31 * Accept a new connection, returning a connected Bu::Socket object.
32 */
33 virtual Bu::Socket *accept( int nTimeoutSec=0, int nTimeoutUSec=0 )=0;
34
35 /**
36 * Provide the contained file descriptor. Return false if there is no
37 * internal file descriptor as such. If the return value is true, then
38 * rFdOut will be set to the file descriptor.
39 */
40 virtual bool getFd( int &rFdOut ) const=0;
41 };
42}
43
44#endif
diff --git a/src/stable/tcpserversocket.cpp b/src/stable/serversockettcp.cpp
index b1e3461..c3750b2 100644
--- a/src/stable/tcpserversocket.cpp
+++ b/src/stable/serversockettcp.cpp
@@ -23,12 +23,13 @@
23#include <sys/types.h> 23#include <sys/types.h>
24//#include <termios.h> 24//#include <termios.h>
25#include <fcntl.h> 25#include <fcntl.h>
26#include "bu/tcpserversocket.h" 26#include "bu/serversockettcp.h"
27#include "bu/sockettcp.h"
27 28
28namespace Bu { subExceptionDef( TcpServerSocketException ) } 29namespace Bu { subExceptionDef( ServerSocketTcpException ) }
29 30
30Bu::TcpServerSocket::TcpServerSocket( int nPort, int nPoolSize ) : 31Bu::ServerSocketTcp::ServerSocketTcp( int iPort, int nPoolSize ) :
31 nPort( nPort ) 32 iPort( iPort )
32{ 33{
33#ifdef WIN32 34#ifdef WIN32
34 Bu::Winsock2::getInstance(); 35 Bu::Winsock2::getInstance();
@@ -39,7 +40,7 @@ Bu::TcpServerSocket::TcpServerSocket( int nPort, int nPoolSize ) :
39 40
40 /* Give the socket a name. */ 41 /* Give the socket a name. */
41 name.sin_family = AF_INET; 42 name.sin_family = AF_INET;
42 name.sin_port = bu_htons( nPort ); 43 name.sin_port = bu_htons( iPort );
43 44
44 // I think this specifies who we will accept connections from, 45 // I think this specifies who we will accept connections from,
45 // a good thing to make configurable later on 46 // a good thing to make configurable later on
@@ -48,8 +49,8 @@ Bu::TcpServerSocket::TcpServerSocket( int nPort, int nPoolSize ) :
48 startServer( name, nPoolSize ); 49 startServer( name, nPoolSize );
49} 50}
50 51
51Bu::TcpServerSocket::TcpServerSocket(const String &sAddr,int nPort, int nPoolSize) : 52Bu::ServerSocketTcp::ServerSocketTcp(const String &sAddr,int iPort, int nPoolSize) :
52 nPort( nPort ) 53 iPort( iPort )
53{ 54{
54#ifdef WIN32 55#ifdef WIN32
55 Bu::Winsock2::getInstance(); 56 Bu::Winsock2::getInstance();
@@ -61,7 +62,7 @@ Bu::TcpServerSocket::TcpServerSocket(const String &sAddr,int nPort, int nPoolSiz
61 /* Give the socket a name. */ 62 /* Give the socket a name. */
62 name.sin_family = AF_INET; 63 name.sin_family = AF_INET;
63 64
64 name.sin_port = bu_htons( nPort ); 65 name.sin_port = bu_htons( iPort );
65 66
66#ifdef WIN32 67#ifdef WIN32
67 name.sin_addr.s_addr = bu_inet_addr( sAddr.getStr() ); 68 name.sin_addr.s_addr = bu_inet_addr( sAddr.getStr() );
@@ -72,9 +73,9 @@ Bu::TcpServerSocket::TcpServerSocket(const String &sAddr,int nPort, int nPoolSiz
72 startServer( name, nPoolSize ); 73 startServer( name, nPoolSize );
73} 74}
74 75
75Bu::TcpServerSocket::TcpServerSocket( socket_t nServer, bool bInit, int nPoolSize ) : 76Bu::ServerSocketTcp::ServerSocketTcp( socket_t iSocket, bool bInit, int nPoolSize ) :
76 nServer( nServer ), 77 iSocket( iSocket ),
77 nPort( 0 ) 78 iPort( 0 )
78{ 79{
79#ifdef WIN32 80#ifdef WIN32
80 Bu::Winsock2::getInstance(); 81 Bu::Winsock2::getInstance();
@@ -84,56 +85,56 @@ Bu::TcpServerSocket::TcpServerSocket( socket_t nServer, bool bInit, int nPoolSiz
84 { 85 {
85 struct sockaddr name; 86 struct sockaddr name;
86 socklen_t namelen = sizeof(name); 87 socklen_t namelen = sizeof(name);
87 getpeername( nServer, &name, &namelen ); 88 getpeername( iSocket, &name, &namelen );
88 89
89 initServer( *((sockaddr_in *)&name), nPoolSize ); 90 initServer( *((sockaddr_in *)&name), nPoolSize );
90 } 91 }
91 else 92 else
92 { 93 {
93 FD_ZERO( &fdActive ); 94 FD_ZERO( &fdActive );
94 FD_SET( nServer, &fdActive ); 95 FD_SET( iSocket, &fdActive );
95 } 96 }
96} 97}
97 98
98Bu::TcpServerSocket::TcpServerSocket( const TcpServerSocket &rSrc ) 99Bu::ServerSocketTcp::ServerSocketTcp( const ServerSocketTcp &rSrc )
99{ 100{
100#ifdef WIN32 101#ifdef WIN32
101 Bu::Winsock2::getInstance(); 102 Bu::Winsock2::getInstance();
102#endif 103#endif
103 104
104 nServer = dup( rSrc.nServer ); 105 iSocket = dup( rSrc.iSocket );
105 nPort = rSrc.nPort; 106 iPort = rSrc.iPort;
106 FD_ZERO( &fdActive ); 107 FD_ZERO( &fdActive );
107 FD_SET( nServer, &fdActive ); 108 FD_SET( iSocket, &fdActive );
108} 109}
109 110
110Bu::TcpServerSocket::~TcpServerSocket() 111Bu::ServerSocketTcp::~ServerSocketTcp()
111{ 112{
112#ifdef WIN32 113#ifdef WIN32
113 if( nServer != INVALID_SOCKET ) 114 if( iSocket != INVALID_SOCKET )
114#else 115#else
115 if( nServer > -1 ) 116 if( iSocket > -1 )
116#endif 117#endif
117 ::close( nServer ); 118 ::close( iSocket );
118} 119}
119 120
120void Bu::TcpServerSocket::startServer( struct sockaddr_in &name, int nPoolSize ) 121void Bu::ServerSocketTcp::startServer( struct sockaddr_in &name, int nPoolSize )
121{ 122{
122 /* Create the socket. */ 123 /* Create the socket. */
123 nServer = bu_socket( PF_INET, SOCK_STREAM, 0 ); 124 iSocket = bu_socket( PF_INET, SOCK_STREAM, 0 );
124 125
125#ifdef WIN32 126#ifdef WIN32
126 if( nServer == INVALID_SOCKET ) 127 if( iSocket == INVALID_SOCKET )
127#else 128#else
128 if( nServer < 0 ) 129 if( iSocket < 0 )
129#endif 130#endif
130 { 131 {
131 throw Bu::TcpServerSocketException("Couldn't create a listen socket."); 132 throw Bu::ServerSocketTcpException("Couldn't create a listen socket.");
132 } 133 }
133 134
134 int opt = 1; 135 int opt = 1;
135 bu_setsockopt( 136 bu_setsockopt(
136 nServer, 137 iSocket,
137 SOL_SOCKET, 138 SOL_SOCKET,
138 SO_REUSEADDR, 139 SO_REUSEADDR,
139 (char *)&opt, 140 (char *)&opt,
@@ -143,31 +144,26 @@ void Bu::TcpServerSocket::startServer( struct sockaddr_in &name, int nPoolSize )
143 initServer( name, nPoolSize ); 144 initServer( name, nPoolSize );
144} 145}
145 146
146void Bu::TcpServerSocket::initServer( struct sockaddr_in &name, int nPoolSize ) 147void Bu::ServerSocketTcp::initServer( struct sockaddr_in &name, int nPoolSize )
147{ 148{
148 if( bu_bind( nServer, (struct sockaddr *) &name, sizeof(name) ) < 0 ) 149 if( bu_bind( iSocket, (struct sockaddr *) &name, sizeof(name) ) < 0 )
149 { 150 {
150 throw Bu::TcpServerSocketException("Couldn't bind to the listen socket."); 151 throw Bu::ServerSocketTcpException("Couldn't bind to the listen socket.");
151 } 152 }
152 153
153 if( bu_listen( nServer, nPoolSize ) < 0 ) 154 if( bu_listen( iSocket, nPoolSize ) < 0 )
154 { 155 {
155 throw Bu::TcpServerSocketException( 156 throw Bu::ServerSocketTcpException(
156 "Couldn't begin listening to the server socket." 157 "Couldn't begin listening to the server socket."
157 ); 158 );
158 } 159 }
159 160
160 FD_ZERO( &fdActive ); 161 FD_ZERO( &fdActive );
161 /* Initialize the set of active sockets. */ 162 /* Initialize the set of active sockets. */
162 FD_SET( nServer, &fdActive ); 163 FD_SET( iSocket, &fdActive );
163} 164}
164 165
165int Bu::TcpServerSocket::getSocket() 166Bu::Socket *Bu::ServerSocketTcp::accept( int nTimeoutSec, int nTimeoutUSec )
166{
167 return nServer;
168}
169
170int Bu::TcpServerSocket::accept( int nTimeoutSec, int nTimeoutUSec )
171{ 167{
172 fd_set fdRead = fdActive; 168 fd_set fdRead = fdActive;
173 169
@@ -177,14 +173,14 @@ int Bu::TcpServerSocket::accept( int nTimeoutSec, int nTimeoutUSec )
177 xT.tv_usec = nTimeoutUSec; 173 xT.tv_usec = nTimeoutUSec;
178 174
179 if( TEMP_FAILURE_RETRY( 175 if( TEMP_FAILURE_RETRY(
180 bu_select( nServer+1, &fdRead, NULL, NULL, &xT )) < 0 ) 176 bu_select( iSocket+1, &fdRead, NULL, NULL, &xT )) < 0 )
181 { 177 {
182 throw Bu::TcpServerSocketException( 178 throw Bu::ServerSocketTcpException(
183 "Error scanning for new connections: %s", strerror( errno ) 179 "Error scanning for new connections: %s", strerror( errno )
184 ); 180 );
185 } 181 }
186 182
187 if( FD_ISSET( nServer, &fdRead ) ) 183 if( FD_ISSET( iSocket, &fdRead ) )
188 { 184 {
189 struct sockaddr_in clientname; 185 struct sockaddr_in clientname;
190 socklen_t size; 186 socklen_t size;
@@ -192,23 +188,23 @@ int Bu::TcpServerSocket::accept( int nTimeoutSec, int nTimeoutUSec )
192 188
193 size = sizeof( clientname ); 189 size = sizeof( clientname );
194#ifdef WIN32 190#ifdef WIN32
195 nClient = bu_accept( nServer, (struct sockaddr *)&clientname, &size); 191 nClient = bu_accept( iSocket, (struct sockaddr *)&clientname, &size);
196#else /* not-WIN32 */ 192#else /* not-WIN32 */
197#ifdef __CYGWIN__ 193#ifdef __CYGWIN__
198 nClient = ::accept( nServer, (struct sockaddr *)&clientname, 194 nClient = ::accept( iSocket, (struct sockaddr *)&clientname,
199 (int *)&size 195 (int *)&size
200 ); 196 );
201#else /* not-cygwin */ 197#else /* not-cygwin */
202#ifdef __APPLE__ 198#ifdef __APPLE__
203 nClient = ::accept( nServer, (struct sockaddr *)&clientname, (socklen_t*)&size ); 199 nClient = ::accept( iSocket, (struct sockaddr *)&clientname, (socklen_t*)&size );
204#else /* linux */ 200#else /* linux */
205 nClient = ::accept( nServer, (struct sockaddr *)&clientname, &size ); 201 nClient = ::accept( iSocket, (struct sockaddr *)&clientname, &size );
206#endif /* __APPLE__ */ 202#endif /* __APPLE__ */
207#endif /* __CYGWIN__ */ 203#endif /* __CYGWIN__ */
208#endif /* WIN32 */ 204#endif /* WIN32 */
209 if( nClient < 0 ) 205 if( nClient < 0 )
210 { 206 {
211 throw Bu::TcpServerSocketException( 207 throw Bu::ServerSocketTcpException(
212 "Error accepting a new connection: %s", strerror( errno ) 208 "Error accepting a new connection: %s", strerror( errno )
213 ); 209 );
214 } 210 }
@@ -227,7 +223,7 @@ int Bu::TcpServerSocket::accept( int nTimeoutSec, int nTimeoutUSec )
227 flags |= O_NONBLOCK; 223 flags |= O_NONBLOCK;
228 if( fcntl( nClient, F_SETFL, flags ) < 0) 224 if( fcntl( nClient, F_SETFL, flags ) < 0)
229 { 225 {
230 throw Bu::TcpServerSocketException( 226 throw Bu::ServerSocketTcpException(
231 "Error setting option on client socket: %s", 227 "Error setting option on client socket: %s",
232 strerror( errno ) 228 strerror( errno )
233 ); 229 );
@@ -244,14 +240,15 @@ int Bu::TcpServerSocket::accept( int nTimeoutSec, int nTimeoutUSec )
244#endif 240#endif
245 } 241 }
246 242
247 return nClient; 243 return new SocketTcp( nClient );
248 } 244 }
249 245
250 return -1; 246 return NULL;
251} 247}
252 248
253int Bu::TcpServerSocket::getPort() 249bool Bu::ServerSocketTcp::getFd( int &rFdOut ) const
254{ 250{
255 return nPort; 251 rFdOut = iSocket;
252 return true;
256} 253}
257 254
diff --git a/src/stable/tcpserversocket.h b/src/stable/serversockettcp.h
index d15d7bd..8e43c76 100644
--- a/src/stable/tcpserversocket.h
+++ b/src/stable/serversockettcp.h
@@ -5,12 +5,13 @@
5 * terms of the license contained in the file LICENSE. 5 * terms of the license contained in the file LICENSE.
6 */ 6 */
7 7
8#ifndef BU_TCP_SERVER_SOCKET_H 8#ifndef BU_SERVER_SOCKET_TCP_H
9#define BU_TCP_SERVER_SOCKET_H 9#define BU_SERVER_SOCKET_TCP_H
10 10
11#include <stdint.h> 11#include <stdint.h>
12#include "bu/string.h" 12#include "bu/string.h"
13#include "bu/exceptionbase.h" 13#include "bu/exceptionbase.h"
14#include "bu/serversocket.h"
14 15
15#ifdef WIN32 16#ifdef WIN32
16 #include <Winsock2.h> 17 #include <Winsock2.h>
@@ -20,7 +21,7 @@
20 21
21namespace Bu 22namespace Bu
22{ 23{
23 subExceptionDecl( TcpServerSocketException ); 24 subExceptionDecl( ServerSocketTcpException );
24 25
25 /** 26 /**
26 * A single tcp/ip server socket. When created the server socket will bind 27 * A single tcp/ip server socket. When created the server socket will bind
@@ -34,7 +35,7 @@ namespace Bu
34 * 35 *
35 *@ingroup Serving 36 *@ingroup Serving
36 */ 37 */
37 class TcpServerSocket 38 class ServerSocketTcp : public ServerSocket
38 { 39 {
39 public: 40 public:
40#ifdef WIN32 41#ifdef WIN32
@@ -42,23 +43,22 @@ namespace Bu
42#else 43#else
43 typedef int socket_t; 44 typedef int socket_t;
44#endif 45#endif
45 TcpServerSocket( int nPort, int nPoolSize=40 ); 46 ServerSocketTcp( int iPort, int nPoolSize=40 );
46 TcpServerSocket( const String &sAddr, int nPort, int nPoolSize=40 ); 47 ServerSocketTcp( const String &sAddr, int iPort, int nPoolSize=40 );
47 TcpServerSocket( socket_t nSocket, bool bInit, int nPoolSize=40 ); 48 ServerSocketTcp( socket_t nSocket, bool bInit, int nPoolSize=40 );
48 TcpServerSocket( const TcpServerSocket &rSrc ); 49 ServerSocketTcp( const ServerSocketTcp &rSrc );
49 virtual ~TcpServerSocket(); 50 virtual ~ServerSocketTcp();
50 51
51 int accept( int nTimeoutSec=0, int nTimeoutUSec=0 ); 52 virtual Bu::Socket *accept( int nTimeoutSec=0, int nTimeoutUSec=0 );
52 int getSocket(); 53 virtual bool getFd( int &rFdOut ) const;
53 int getPort();
54 54
55 private: 55 private:
56 void startServer( struct sockaddr_in &name, int nPoolSize ); 56 void startServer( struct sockaddr_in &name, int nPoolSize );
57 void initServer( struct sockaddr_in &name, int nPoolSize ); 57 void initServer( struct sockaddr_in &name, int nPoolSize );
58 58
59 fd_set fdActive; 59 fd_set fdActive;
60 socket_t nServer; 60 socket_t iSocket;
61 int nPort; 61 int iPort;
62 }; 62 };
63} 63}
64 64
diff --git a/src/stable/socket.cpp b/src/stable/socket.cpp
new file mode 100644
index 0000000..95c8773
--- /dev/null
+++ b/src/stable/socket.cpp
@@ -0,0 +1,10 @@
1#include "bu/socket.h"
2
3Bu::Socket::Socket()
4{
5}
6
7Bu::Socket::~Socket()
8{
9}
10
diff --git a/src/stable/socket.h b/src/stable/socket.h
new file mode 100644
index 0000000..360b3aa
--- /dev/null
+++ b/src/stable/socket.h
@@ -0,0 +1,33 @@
1/*
2 * Copyright (C) 2007-2019 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#ifndef BU_SOCKET_H
9#define BU_SOCKET_H
10
11#include <stdint.h>
12
13#include "bu/config.h"
14#include "bu/stream.h"
15
16namespace Bu
17{
18 /**
19 *
20 *@ingroup Serving
21 *@ingroup Streams
22 */
23 class Socket : public Stream
24 {
25 public:
26 Socket();
27 virtual ~Socket();
28
29 virtual bool getFd( int &rFdOut ) const=0;
30 };
31}
32
33#endif
diff --git a/src/stable/tcpsocket.cpp b/src/stable/sockettcp.cpp
index d036063..d61f92f 100644
--- a/src/stable/tcpsocket.cpp
+++ b/src/stable/sockettcp.cpp
@@ -14,7 +14,7 @@
14#include <sys/time.h> 14#include <sys/time.h>
15#include <errno.h> 15#include <errno.h>
16#include <fcntl.h> 16#include <fcntl.h>
17#include "bu/tcpsocket.h" 17#include "bu/sockettcp.h"
18 18
19#include "bu/config.h" 19#include "bu/config.h"
20 20
@@ -29,10 +29,10 @@
29 29
30#define RBS (1024*2) 30#define RBS (1024*2)
31 31
32namespace Bu { subExceptionDef( TcpSocketException ) } 32namespace Bu { subExceptionDef( SocketTcpException ) }
33 33
34Bu::TcpSocket::TcpSocket( handle nTcpSocket ) : 34Bu::SocketTcp::SocketTcp( handle nSocketTcp ) :
35 nTcpSocket( nTcpSocket ), 35 nSocketTcp( nSocketTcp ),
36 bActive( true ), 36 bActive( true ),
37 bBlocking( true ) 37 bBlocking( true )
38{ 38{
@@ -42,9 +42,9 @@ Bu::TcpSocket::TcpSocket( handle nTcpSocket ) :
42 setAddress(); 42 setAddress();
43} 43}
44 44
45Bu::TcpSocket::TcpSocket( const Bu::String &sAddr, int nPort, int nTimeout, 45Bu::SocketTcp::SocketTcp( const Bu::String &sAddr, int nPort, int nTimeout,
46 bool bBlocking ) : 46 bool bBlocking ) :
47 nTcpSocket( 0 ), 47 nSocketTcp( 0 ),
48 bActive( false ), 48 bActive( false ),
49 bBlocking( true ) 49 bBlocking( true )
50{ 50{
@@ -53,12 +53,12 @@ Bu::TcpSocket::TcpSocket( const Bu::String &sAddr, int nPort, int nTimeout,
53#endif 53#endif
54 54
55 /* Create the socket. */ 55 /* Create the socket. */
56 nTcpSocket = bu_socket( PF_INET, SOCK_STREAM, 0 ); 56 nSocketTcp = bu_socket( PF_INET, SOCK_STREAM, 0 );
57 57
58#ifdef WIN32 58#ifdef WIN32
59 if( nTcpSocket == INVALID_SOCKET ) 59 if( nSocketTcp == INVALID_SOCKET )
60#else 60#else
61 if( nTcpSocket < 0 ) 61 if( nSocketTcp < 0 )
62#endif 62#endif
63 { 63 {
64 throw ExceptionBase("Couldn't create socket.\n"); 64 throw ExceptionBase("Couldn't create socket.\n");
@@ -83,12 +83,12 @@ Bu::TcpSocket::TcpSocket( const Bu::String &sAddr, int nPort, int nTimeout,
83 sAddr.getStr(), ibuf, &aiHints, &pAddr )) != 0 ) 83 sAddr.getStr(), ibuf, &aiHints, &pAddr )) != 0 )
84 { 84 {
85 close(); 85 close();
86 throw Bu::TcpSocketException("Couldn't resolve hostname %s (%s).\n", 86 throw Bu::SocketTcpException("Couldn't resolve hostname %s (%s).\n",
87 sAddr.getStr(), bu_gai_strerror(ret)); 87 sAddr.getStr(), bu_gai_strerror(ret));
88 } 88 }
89 89
90 bu_connect( 90 bu_connect(
91 nTcpSocket, 91 nSocketTcp,
92 pAddr->ai_addr, 92 pAddr->ai_addr,
93 pAddr->ai_addrlen 93 pAddr->ai_addrlen
94 ); 94 );
@@ -106,17 +106,17 @@ Bu::TcpSocket::TcpSocket( const Bu::String &sAddr, int nPort, int nTimeout,
106 int retval; 106 int retval;
107 107
108 FD_ZERO(&rfds); 108 FD_ZERO(&rfds);
109 FD_SET(nTcpSocket, &rfds); 109 FD_SET(nSocketTcp, &rfds);
110 FD_ZERO(&wfds); 110 FD_ZERO(&wfds);
111 FD_SET(nTcpSocket, &wfds); 111 FD_SET(nSocketTcp, &wfds);
112 FD_ZERO(&efds); 112 FD_ZERO(&efds);
113 FD_SET(nTcpSocket, &efds); 113 FD_SET(nSocketTcp, &efds);
114 114
115 struct timeval tv; 115 struct timeval tv;
116 tv.tv_sec = nTimeout; 116 tv.tv_sec = nTimeout;
117 tv.tv_usec = 0; 117 tv.tv_usec = 0;
118 118
119 retval = bu_select( nTcpSocket+1, &rfds, &wfds, &efds, &tv ); 119 retval = bu_select( nSocketTcp+1, &rfds, &wfds, &efds, &tv );
120 120
121 if( retval == 0 ) 121 if( retval == 0 )
122 { 122 {
@@ -130,49 +130,49 @@ Bu::TcpSocket::TcpSocket( const Bu::String &sAddr, int nPort, int nTimeout,
130 setBlocking( bBlocking ); 130 setBlocking( bBlocking );
131} 131}
132 132
133Bu::TcpSocket::~TcpSocket() 133Bu::SocketTcp::~SocketTcp()
134{ 134{
135 close(); 135 close();
136} 136}
137 137
138void Bu::TcpSocket::close() 138void Bu::SocketTcp::close()
139{ 139{
140 if( bActive ) 140 if( bActive )
141 { 141 {
142#ifndef WIN32 142#ifndef WIN32
143 fsync( nTcpSocket ); 143 fsync( nSocketTcp );
144#endif 144#endif
145#ifdef WIN32 145#ifdef WIN32
146 #ifndef SHUT_RDWR 146 #ifndef SHUT_RDWR
147 #define SHUT_RDWR (SD_BOTH) 147 #define SHUT_RDWR (SD_BOTH)
148 #endif 148 #endif
149#endif 149#endif
150 bu_shutdown( nTcpSocket, SHUT_RDWR ); 150 bu_shutdown( nSocketTcp, SHUT_RDWR );
151 ::close( nTcpSocket ); 151 ::close( nSocketTcp );
152 } 152 }
153 bActive = false; 153 bActive = false;
154} 154}
155 155
156Bu::size Bu::TcpSocket::read( void *pBuf, Bu::size nBytes ) 156Bu::size Bu::SocketTcp::read( void *pBuf, Bu::size nBytes )
157{ 157{
158 fd_set rfds; 158 fd_set rfds;
159 FD_ZERO(&rfds); 159 FD_ZERO(&rfds);
160 FD_SET(nTcpSocket, &rfds); 160 FD_SET(nSocketTcp, &rfds);
161 struct timeval tv = {0, 0}; 161 struct timeval tv = {0, 0};
162 if( bu_select( nTcpSocket+1, &rfds, NULL, NULL, &tv ) < 0 ) 162 if( bu_select( nSocketTcp+1, &rfds, NULL, NULL, &tv ) < 0 )
163 { 163 {
164 int iErr = errno; 164 int iErr = errno;
165 close(); 165 close();
166 throw TcpSocketException( TcpSocketException::cRead, strerror(iErr) ); 166 throw SocketTcpException( SocketTcpException::cRead, strerror(iErr) );
167 } 167 }
168 if( FD_ISSET( nTcpSocket, &rfds ) || bBlocking ) 168 if( FD_ISSET( nSocketTcp, &rfds ) || bBlocking )
169 { 169 {
170 int nRead = TEMP_FAILURE_RETRY( 170 int nRead = TEMP_FAILURE_RETRY(
171 bu_recv( nTcpSocket, (char *) pBuf, nBytes, 0 ) ); 171 bu_recv( nSocketTcp, (char *) pBuf, nBytes, 0 ) );
172 if( nRead == 0 && nBytes > 0 ) 172 if( nRead == 0 && nBytes > 0 )
173 { 173 {
174 close(); 174 close();
175 throw TcpSocketException( TcpSocketException::cClosed, "TcpSocket closed."); 175 throw SocketTcpException( SocketTcpException::cClosed, "SocketTcp closed.");
176 } 176 }
177 if( nRead < 0 ) 177 if( nRead < 0 )
178 { 178 {
@@ -184,14 +184,14 @@ Bu::size Bu::TcpSocket::read( void *pBuf, Bu::size nBytes )
184 if( errno == ENETRESET || errno == ECONNRESET ) 184 if( errno == ENETRESET || errno == ECONNRESET )
185 { 185 {
186 close(); 186 close();
187 throw TcpSocketException( TcpSocketException::cClosed, 187 throw SocketTcpException( SocketTcpException::cClosed,
188 strerror(errno) ); 188 strerror(errno) );
189 } 189 }
190 if( errno == EAGAIN ) 190 if( errno == EAGAIN )
191 return 0; 191 return 0;
192 int iErr = errno; 192 int iErr = errno;
193 close(); 193 close();
194 throw TcpSocketException( TcpSocketException::cRead, strerror(iErr) ); 194 throw SocketTcpException( SocketTcpException::cRead, strerror(iErr) );
195#endif 195#endif
196 } 196 }
197 return nRead; 197 return nRead;
@@ -199,7 +199,7 @@ Bu::size Bu::TcpSocket::read( void *pBuf, Bu::size nBytes )
199 return 0; 199 return 0;
200} 200}
201 201
202Bu::size Bu::TcpSocket::read( void *pBuf, Bu::size nBytes, 202Bu::size Bu::SocketTcp::read( void *pBuf, Bu::size nBytes,
203 uint32_t nSec, uint32_t nUSec ) 203 uint32_t nSec, uint32_t nUSec )
204{ 204{
205 struct timeval tv; 205 struct timeval tv;
@@ -207,7 +207,7 @@ Bu::size Bu::TcpSocket::read( void *pBuf, Bu::size nBytes,
207 207
208 fd_set rfds; 208 fd_set rfds;
209 FD_ZERO(&rfds); 209 FD_ZERO(&rfds);
210 FD_SET(nTcpSocket, &rfds); 210 FD_SET(nSocketTcp, &rfds);
211 211
212#ifdef WIN32 212#ifdef WIN32
213 DWORD dwStart = GetTickCount(); 213 DWORD dwStart = GetTickCount();
@@ -224,7 +224,7 @@ Bu::size Bu::TcpSocket::read( void *pBuf, Bu::size nBytes,
224 { 224 {
225 tv.tv_sec = nSec; 225 tv.tv_sec = nSec;
226 tv.tv_usec = nUSec; 226 tv.tv_usec = nUSec;
227 bu_select( nTcpSocket+1, &rfds, NULL, NULL, &tv ); 227 bu_select( nSocketTcp+1, &rfds, NULL, NULL, &tv );
228 nRead += read( ((char *)pBuf)+nRead, nBytes-nRead ); 228 nRead += read( ((char *)pBuf)+nRead, nBytes-nRead );
229 if( nRead >= nBytes ) 229 if( nRead >= nBytes )
230 break; 230 break;
@@ -243,13 +243,13 @@ Bu::size Bu::TcpSocket::read( void *pBuf, Bu::size nBytes,
243 return nRead; 243 return nRead;
244} 244}
245 245
246Bu::size Bu::TcpSocket::write( const void *pBuf, Bu::size nBytes ) 246Bu::size Bu::SocketTcp::write( const void *pBuf, Bu::size nBytes )
247{ 247{
248//#ifdef WIN32 248//#ifdef WIN32
249 int nWrote = TEMP_FAILURE_RETRY( 249 int nWrote = TEMP_FAILURE_RETRY(
250 bu_send( nTcpSocket, (const char *) pBuf, nBytes, 0 ) ); 250 bu_send( nSocketTcp, (const char *) pBuf, nBytes, 0 ) );
251//#else 251//#else
252// int nWrote = TEMP_FAILURE_RETRY( ::write( nTcpSocket, pBuf, nBytes ) ); 252// int nWrote = TEMP_FAILURE_RETRY( ::write( nSocketTcp, pBuf, nBytes ) );
253//#endif 253//#endif
254 if( nWrote < 0 ) 254 if( nWrote < 0 )
255 { 255 {
@@ -260,19 +260,19 @@ Bu::size Bu::TcpSocket::write( const void *pBuf, Bu::size nBytes )
260#else 260#else
261 if( errno == EAGAIN ) return 0; 261 if( errno == EAGAIN ) return 0;
262#endif 262#endif
263 throw TcpSocketException( TcpSocketException::cWrite, strerror(errno) ); 263 throw SocketTcpException( SocketTcpException::cWrite, strerror(errno) );
264 } 264 }
265 return nWrote; 265 return nWrote;
266} 266}
267 267
268Bu::size Bu::TcpSocket::write( const void *pBuf, Bu::size nBytes, uint32_t nSec, uint32_t nUSec ) 268Bu::size Bu::SocketTcp::write( const void *pBuf, Bu::size nBytes, uint32_t nSec, uint32_t nUSec )
269{ 269{
270 struct timeval tv; 270 struct timeval tv;
271 Bu::size nWrote = 0; 271 Bu::size nWrote = 0;
272 272
273 fd_set wfds; 273 fd_set wfds;
274 FD_ZERO(&wfds); 274 FD_ZERO(&wfds);
275 FD_SET(nTcpSocket, &wfds); 275 FD_SET(nSocketTcp, &wfds);
276 276
277#ifdef WIN32 277#ifdef WIN32
278 DWORD dwStart = GetTickCount(); 278 DWORD dwStart = GetTickCount();
@@ -289,7 +289,7 @@ Bu::size Bu::TcpSocket::write( const void *pBuf, Bu::size nBytes, uint32_t nSec,
289 { 289 {
290 tv.tv_sec = nSec; 290 tv.tv_sec = nSec;
291 tv.tv_usec = nUSec; 291 tv.tv_usec = nUSec;
292 bu_select( nTcpSocket+1, NULL, &wfds, NULL, &tv ); 292 bu_select( nSocketTcp+1, NULL, &wfds, NULL, &tv );
293 nWrote += write( ((char *)pBuf)+nWrote, nBytes-nWrote ); 293 nWrote += write( ((char *)pBuf)+nWrote, nBytes-nWrote );
294 if( nWrote >= nBytes ) 294 if( nWrote >= nBytes )
295 break; 295 break;
@@ -308,101 +308,101 @@ Bu::size Bu::TcpSocket::write( const void *pBuf, Bu::size nBytes, uint32_t nSec,
308 return nWrote; 308 return nWrote;
309} 309}
310 310
311Bu::size Bu::TcpSocket::tell() 311Bu::size Bu::SocketTcp::tell()
312{ 312{
313 throw UnsupportedException(); 313 throw UnsupportedException();
314} 314}
315 315
316void Bu::TcpSocket::seek( Bu::size ) 316void Bu::SocketTcp::seek( Bu::size )
317{ 317{
318 throw UnsupportedException(); 318 throw UnsupportedException();
319} 319}
320 320
321void Bu::TcpSocket::setPos( Bu::size ) 321void Bu::SocketTcp::setPos( Bu::size )
322{ 322{
323 throw UnsupportedException(); 323 throw UnsupportedException();
324} 324}
325 325
326void Bu::TcpSocket::setPosEnd( Bu::size ) 326void Bu::SocketTcp::setPosEnd( Bu::size )
327{ 327{
328 throw UnsupportedException(); 328 throw UnsupportedException();
329} 329}
330 330
331bool Bu::TcpSocket::isEos() 331bool Bu::SocketTcp::isEos()
332{ 332{
333 return !bActive; 333 return !bActive;
334} 334}
335 335
336bool Bu::TcpSocket::canRead() 336bool Bu::SocketTcp::canRead()
337{ 337{
338 fd_set rfds; 338 fd_set rfds;
339 FD_ZERO(&rfds); 339 FD_ZERO(&rfds);
340 FD_SET(nTcpSocket, &rfds); 340 FD_SET(nSocketTcp, &rfds);
341 struct timeval tv = { 0, 0 }; 341 struct timeval tv = { 0, 0 };
342 int retval = bu_select( nTcpSocket+1, &rfds, NULL, NULL, &tv ); 342 int retval = bu_select( nSocketTcp+1, &rfds, NULL, NULL, &tv );
343 if( retval == -1 ) 343 if( retval == -1 )
344 throw TcpSocketException( 344 throw SocketTcpException(
345 TcpSocketException::cBadRead, 345 SocketTcpException::cBadRead,
346 "Bad Read error" 346 "Bad Read error"
347 ); 347 );
348 348
349 if( !FD_ISSET( nTcpSocket, &rfds ) ) 349 if( !FD_ISSET( nSocketTcp, &rfds ) )
350 return false; 350 return false;
351 return true; 351 return true;
352} 352}
353 353
354bool Bu::TcpSocket::canWrite() 354bool Bu::SocketTcp::canWrite()
355{ 355{
356 fd_set wfds; 356 fd_set wfds;
357 FD_ZERO(&wfds); 357 FD_ZERO(&wfds);
358 FD_SET(nTcpSocket, &wfds); 358 FD_SET(nSocketTcp, &wfds);
359 struct timeval tv = { 0, 0 }; 359 struct timeval tv = { 0, 0 };
360 int retval = bu_select( nTcpSocket+1, NULL, &wfds, NULL, &tv ); 360 int retval = bu_select( nSocketTcp+1, NULL, &wfds, NULL, &tv );
361 if( retval == -1 ) 361 if( retval == -1 )
362 throw TcpSocketException( 362 throw SocketTcpException(
363 TcpSocketException::cBadRead, 363 SocketTcpException::cBadRead,
364 "Bad Read error" 364 "Bad Read error"
365 ); 365 );
366 if( !FD_ISSET( nTcpSocket, &wfds ) ) 366 if( !FD_ISSET( nSocketTcp, &wfds ) )
367 return false; 367 return false;
368 return true; 368 return true;
369} 369}
370 370
371bool Bu::TcpSocket::isReadable() 371bool Bu::SocketTcp::isReadable()
372{ 372{
373 return true; 373 return true;
374} 374}
375 375
376bool Bu::TcpSocket::isWritable() 376bool Bu::SocketTcp::isWritable()
377{ 377{
378 return true; 378 return true;
379} 379}
380 380
381bool Bu::TcpSocket::isSeekable() 381bool Bu::SocketTcp::isSeekable()
382{ 382{
383 return false; 383 return false;
384} 384}
385 385
386bool Bu::TcpSocket::isBlocking() 386bool Bu::SocketTcp::isBlocking()
387{ 387{
388#ifndef WIN32 388#ifndef WIN32
389 return ((fcntl( nTcpSocket, F_GETFL, 0 ) & O_NONBLOCK) != O_NONBLOCK); 389 return ((fcntl( nSocketTcp, F_GETFL, 0 ) & O_NONBLOCK) != O_NONBLOCK);
390#else 390#else
391 return false; 391 return false;
392#endif 392#endif
393} 393}
394 394
395void Bu::TcpSocket::setBlocking( bool bBlocking ) 395void Bu::SocketTcp::setBlocking( bool bBlocking )
396{ 396{
397 this->bBlocking = bBlocking; 397 this->bBlocking = bBlocking;
398#ifndef WIN32 398#ifndef WIN32
399 if( bBlocking ) 399 if( bBlocking )
400 { 400 {
401 fcntl( nTcpSocket, F_SETFL, fcntl( nTcpSocket, F_GETFL, 0 ) & (~O_NONBLOCK) ); 401 fcntl( nSocketTcp, F_SETFL, fcntl( nSocketTcp, F_GETFL, 0 ) & (~O_NONBLOCK) );
402 } 402 }
403 else 403 else
404 { 404 {
405 fcntl( nTcpSocket, F_SETFL, fcntl( nTcpSocket, F_GETFL, 0 ) | O_NONBLOCK ); 405 fcntl( nSocketTcp, F_SETFL, fcntl( nSocketTcp, F_GETFL, 0 ) | O_NONBLOCK );
406 } 406 }
407#else 407#else
408 u_long iMode; 408 u_long iMode;
@@ -416,67 +416,73 @@ void Bu::TcpSocket::setBlocking( bool bBlocking )
416 // socket based on the numerical value of iMode. 416 // socket based on the numerical value of iMode.
417 // If iMode = 0, blocking is enabled; 417 // If iMode = 0, blocking is enabled;
418 // If iMode != 0, non-blocking mode is enabled. 418 // If iMode != 0, non-blocking mode is enabled.
419 bu_ioctlsocket(nTcpSocket, FIONBIO, &iMode); 419 bu_ioctlsocket(nSocketTcp, FIONBIO, &iMode);
420#endif 420#endif
421} 421}
422 422
423void Bu::TcpSocket::setSize( Bu::size ) 423void Bu::SocketTcp::setSize( Bu::size )
424{ 424{
425} 425}
426 426
427void Bu::TcpSocket::flush() 427void Bu::SocketTcp::flush()
428{ 428{
429} 429}
430 430
431bool Bu::TcpSocket::isOpen() 431bool Bu::SocketTcp::isOpen()
432{ 432{
433 return bActive; 433 return bActive;
434} 434}
435 435
436void Bu::TcpSocket::setAddress() 436void Bu::SocketTcp::setAddress()
437{ 437{
438 struct sockaddr_in addr; 438 struct sockaddr_in addr;
439 socklen_t len = sizeof(addr); 439 socklen_t len = sizeof(addr);
440 addr.sin_family = AF_INET; 440 addr.sin_family = AF_INET;
441 bu_getpeername( nTcpSocket, (sockaddr *)(&addr), &len ); 441 bu_getpeername( nSocketTcp, (sockaddr *)(&addr), &len );
442 sAddress = bu_inet_ntoa( addr.sin_addr ); 442 sAddress = bu_inet_ntoa( addr.sin_addr );
443} 443}
444 444
445Bu::String Bu::TcpSocket::getAddress() const 445Bu::String Bu::SocketTcp::getAddress() const
446{ 446{
447 return sAddress; 447 return sAddress;
448} 448}
449 449
450Bu::TcpSocket::operator Bu::TcpSocket::handle() const 450Bu::SocketTcp::operator Bu::SocketTcp::handle() const
451{ 451{
452 return nTcpSocket; 452 return nSocketTcp;
453} 453}
454 454
455Bu::TcpSocket::handle Bu::TcpSocket::getHandle() const 455Bu::SocketTcp::handle Bu::SocketTcp::getHandle() const
456{ 456{
457 return nTcpSocket; 457 return nSocketTcp;
458} 458}
459 459
460Bu::TcpSocket::handle Bu::TcpSocket::takeHandle() 460Bu::SocketTcp::handle Bu::SocketTcp::takeHandle()
461{ 461{
462 handle nRet = nTcpSocket; 462 handle nRet = nSocketTcp;
463 bActive = false; 463 bActive = false;
464 nTcpSocket = 0; 464 nSocketTcp = 0;
465 return nRet; 465 return nRet;
466} 466}
467 467
468Bu::size Bu::TcpSocket::getSize() const 468Bu::size Bu::SocketTcp::getSize() const
469{ 469{
470 throw UnsupportedException(); 470 throw UnsupportedException();
471} 471}
472 472
473Bu::size Bu::TcpSocket::getBlockSize() const 473Bu::size Bu::SocketTcp::getBlockSize() const
474{ 474{
475 return 1500; //TODO: Fix this, it's stupid. 475 return 1500; //TODO: Fix this, it's stupid.
476} 476}
477 477
478Bu::String Bu::TcpSocket::getLocation() const 478Bu::String Bu::SocketTcp::getLocation() const
479{ 479{
480 return getAddress(); 480 return getAddress();
481} 481}
482 482
483bool Bu::SocketTcp::getFd( int &rFdOut ) const
484{
485 rFdOut = nSocketTcp;
486 return true;
487}
488
diff --git a/src/stable/tcpsocket.h b/src/stable/sockettcp.h
index 69cc4fd..3fc14ef 100644
--- a/src/stable/tcpsocket.h
+++ b/src/stable/sockettcp.h
@@ -5,19 +5,19 @@
5 * terms of the license contained in the file LICENSE. 5 * terms of the license contained in the file LICENSE.
6 */ 6 */
7 7
8#ifndef BU_TCP_SOCKET_H 8#ifndef BU_SOCKET_TCP_H
9#define BU_TCP_SOCKET_H 9#define BU_SOCKET_TCP_H
10 10
11#include <stdint.h> 11#include <stdint.h>
12 12
13#include "bu/config.h" 13#include "bu/config.h"
14#include "bu/stream.h" 14#include "bu/socket.h"
15#include "bu/string.h" 15#include "bu/string.h"
16#include "bu/exceptionbase.h" 16#include "bu/exceptionbase.h"
17 17
18namespace Bu 18namespace Bu
19{ 19{
20 subExceptionDeclBegin( TcpSocketException ); 20 subExceptionDeclBegin( SocketTcpException );
21 enum { 21 enum {
22 cRead, 22 cRead,
23 cWrite, 23 cWrite,
@@ -41,23 +41,23 @@ namespace Bu
41 * Please note that there is a condition that will occur eventually (at 41 * Please note that there is a condition that will occur eventually (at
42 * least on *nix systems) that will trigger a SIGPIPE condition. This 42 * least on *nix systems) that will trigger a SIGPIPE condition. This
43 * will terminate your program immediately unless handled properly. Most 43 * will terminate your program immediately unless handled properly. Most
44 * people doing any connections with TcpSocket will want to put this in 44 * people doing any connections with SocketTcp will want to put this in
45 * their program somewhere before they use it: 45 * their program somewhere before they use it:
46 *@code 46 *@code
47 #include <signal.h> 47 #include <signal.h>
48 ... 48 ...
49 ... 49 ...
50 ... 50 ...
51 sigset( SIGPIPE, SIG_IGN ); // do this before you use a Bu::TcpSocket 51 sigset( SIGPIPE, SIG_IGN ); // do this before you use a Bu::SocketTcp
52 @endcode 52 @endcode
53 * When this is done, Bu::TcpSocket will simply throw a broken pipe 53 * When this is done, Bu::SocketTcp will simply throw a broken pipe
54 * exception just like every other error condition, allowing your program 54 * exception just like every other error condition, allowing your program
55 * to handle it sanely. 55 * to handle it sanely.
56 * 56 *
57 *@ingroup Serving 57 *@ingroup Serving
58 *@ingroup Streams 58 *@ingroup Streams
59 */ 59 */
60 class TcpSocket : public Stream 60 class SocketTcp : public Socket
61 { 61 {
62 public: 62 public:
63#ifdef WIN32 63#ifdef WIN32
@@ -66,10 +66,10 @@ namespace Bu
66 typedef int handle; 66 typedef int handle;
67#endif 67#endif
68 68
69 TcpSocket( handle nTcpSocket ); 69 SocketTcp( handle nSocketTcp );
70 TcpSocket( const String &sAddr, int nPort, int nTimeout=30, 70 SocketTcp( const String &sAddr, int nPort, int nTimeout=30,
71 bool bBlocking=true ); 71 bool bBlocking=true );
72 virtual ~TcpSocket(); 72 virtual ~SocketTcp();
73 73
74 virtual void close(); 74 virtual void close();
75 virtual size read( void *pBuf, size nBytes ); 75 virtual size read( void *pBuf, size nBytes );
@@ -111,10 +111,12 @@ namespace Bu
111 virtual size getBlockSize() const; 111 virtual size getBlockSize() const;
112 virtual Bu::String getLocation() const; 112 virtual Bu::String getLocation() const;
113 113
114 virtual bool getFd( int &rFdOut ) const;
115
114 private: 116 private:
115 void setAddress(); 117 void setAddress();
116 118
117 handle nTcpSocket; 119 handle nSocketTcp;
118 120
119 bool bActive; 121 bool bActive;
120 bool bBlocking; 122 bool bBlocking;
diff --git a/src/stable/synchroqueue.h b/src/stable/synchroqueue.h
index 1c39c2c..b10ec33 100644
--- a/src/stable/synchroqueue.h
+++ b/src/stable/synchroqueue.h
@@ -44,7 +44,8 @@ namespace Bu
44 SynchroQueue() : 44 SynchroQueue() :
45 pStart( NULL ), 45 pStart( NULL ),
46 pEnd( NULL ), 46 pEnd( NULL ),
47 nSize( 0 ) 47 nSize( 0 ),
48 bRunning( true )
48 { 49 {
49 } 50 }
50 51
@@ -76,6 +77,14 @@ namespace Bu
76 */ 77 */
77 void enqueue( T pData ) 78 void enqueue( T pData )
78 { 79 {
80 mRunning.lock();
81 if( !bRunning )
82 {
83 mRunning.unlock();
84 throw Bu::ExceptionBase("SynchoQueue is stopped.");
85 }
86 mRunning.unlock();
87
79 cBlock.lock(); 88 cBlock.lock();
80 89
81 if( pStart == NULL ) 90 if( pStart == NULL )
@@ -119,6 +128,14 @@ namespace Bu
119 */ 128 */
120 T dequeue( bool bBlock=false ) 129 T dequeue( bool bBlock=false )
121 { 130 {
131 mRunning.lock();
132 if( !bRunning )
133 {
134 mRunning.unlock();
135 return T();
136 }
137 mRunning.unlock();
138
122 cBlock.lock(); 139 cBlock.lock();
123 if( pStart == NULL ) 140 if( pStart == NULL )
124 { 141 {
@@ -129,8 +146,18 @@ namespace Bu
129 if( pStart == NULL ) 146 if( pStart == NULL )
130 { 147 {
131 cBlock.unlock(); 148 cBlock.unlock();
132 return NULL; 149 return T();
133 } 150 }
151
152 mRunning.lock();
153 if( !bRunning )
154 {
155 mRunning.unlock();
156 cBlock.unlock();
157 return T();
158 }
159 mRunning.unlock();
160
134 T pTmp = pStart->pData; 161 T pTmp = pStart->pData;
135 Item *pDel = pStart; 162 Item *pDel = pStart;
136 pStart = pStart->pNext; 163 pStart = pStart->pNext;
@@ -143,7 +170,7 @@ namespace Bu
143 } 170 }
144 171
145 cBlock.unlock(); 172 cBlock.unlock();
146 return NULL; 173 return T();
147 } 174 }
148 else 175 else
149 { 176 {
@@ -171,6 +198,14 @@ namespace Bu
171 */ 198 */
172 T dequeue( int nSec, int nUSec ) 199 T dequeue( int nSec, int nUSec )
173 { 200 {
201 mRunning.lock();
202 if( !bRunning )
203 {
204 mRunning.unlock();
205 return T();
206 }
207 mRunning.unlock();
208
174 cBlock.lock(); 209 cBlock.lock();
175 if( pStart == NULL ) 210 if( pStart == NULL )
176 { 211 {
@@ -179,8 +214,17 @@ namespace Bu
179 if( pStart == NULL ) 214 if( pStart == NULL )
180 { 215 {
181 cBlock.unlock(); 216 cBlock.unlock();
182 return NULL; 217 return T();
218 }
219
220 mRunning.lock();
221 if( !bRunning )
222 {
223 mRunning.unlock();
224 cBlock.unlock();
225 return T();
183 } 226 }
227 mRunning.unlock();
184 228
185 T pTmp = pStart->pData; 229 T pTmp = pStart->pData;
186 Item *pDel = pStart; 230 Item *pDel = pStart;
@@ -203,6 +247,35 @@ namespace Bu
203 return pTmp; 247 return pTmp;
204 } 248 }
205 } 249 }
250
251 T drain()
252 {
253 mRunning.lock();
254 if( bRunning )
255 {
256 mRunning.unlock();
257 return NULL;
258 }
259 mRunning.unlock();
260
261 cBlock.lock();
262 if( pStart == NULL )
263 {
264 cBlock.unlock();
265 return T();
266 }
267 else
268 {
269 T pTmp = pStart->pData;
270 Item *pDel = pStart;
271 pStart = pStart->pNext;
272 delete pDel;
273 nSize--;
274
275 cBlock.unlock();
276 return pTmp;
277 }
278 }
206 279
207 /** 280 /**
208 * Checks to see if the queue has data in it or not. Note that there 281 * Checks to see if the queue has data in it or not. Note that there
@@ -235,12 +308,31 @@ namespace Bu
235 cBlock.unlock(); 308 cBlock.unlock();
236 } 309 }
237 310
311 void stop()
312 {
313 mRunning.lock();
314 bRunning = false;
315 mRunning.unlock();
316 unblockAll();
317 }
318
319 bool isRunning() const
320 {
321 bool bRet;
322 mRunning.lock();
323 bRet = bRunning;
324 mRunning.unlock();
325 return bRet;
326 }
327
238 private: 328 private:
239 Item *pStart; /**< The start of the queue, the next element to dequeue. */ 329 Item *pStart; /**< The start of the queue, the next element to dequeue. */
240 Item *pEnd; /**< The end of the queue, the last element to dequeue. */ 330 Item *pEnd; /**< The end of the queue, the last element to dequeue. */
241 long nSize; /**< The number of items in the queue. */ 331 long nSize; /**< The number of items in the queue. */
242 332
243 Condition cBlock; /**< The condition for blocking dequeues. */ 333 Condition cBlock; /**< The condition for blocking dequeues. */
334 mutable Mutex mRunning;
335 bool bRunning;
244 }; 336 };
245} 337}
246 338
diff --git a/src/unstable/itoserver.cpp b/src/unstable/itoserver.cpp
deleted file mode 100644
index 9e9d5e0..0000000
--- a/src/unstable/itoserver.cpp
+++ /dev/null
@@ -1,241 +0,0 @@
1/*
2 * Copyright (C) 2007-2019 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#include "bu/config.h"
9#include "bu/itoserver.h"
10#include <errno.h>
11#include "bu/tcpserversocket.h"
12#include "bu/client.h"
13#include "bu/tcpsocket.h"
14
15Bu::ItoServer::ItoServer() :
16 nTimeoutSec( 1 ),
17 nTimeoutUSec( 0 )
18{
19 FD_ZERO( &fdActive );
20}
21
22Bu::ItoServer::~ItoServer()
23{
24 while( !qClientCleanup.isEmpty() )
25 {
26 ItoClient *pCli = qClientCleanup.dequeue();
27 pCli->join();
28 delete pCli;
29 }
30 // TODO: Make sure here that each client has shutdown it's socket, and
31 // maybe even written any extra data, we could put a timelimit on this...
32 // anyway, it's not as clean as it could be right now.
33 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
34 {
35 ItoClient *pCli = (*i);
36 pCli->join();
37 delete pCli;
38 }
39}
40
41void Bu::ItoServer::addPort( int nPort, int nPoolSize )
42{
43 TcpServerSocket *s = new TcpServerSocket( nPort, nPoolSize );
44 socket_t nSocket = s->getSocket();
45 FD_SET( nSocket, &fdActive );
46 hServers.insert( nSocket, s );
47}
48
49void Bu::ItoServer::addPort( const String &sAddr, int nPort, int nPoolSize )
50{
51 TcpServerSocket *s = new TcpServerSocket( sAddr, nPort, nPoolSize );
52 socket_t nSocket = s->getSocket();
53 FD_SET( nSocket, &fdActive );
54 hServers.insert( nSocket, s );
55}
56
57void Bu::ItoServer::setTimeout( int nTimeoutSec, int nTimeoutUSec )
58{
59 this->nTimeoutSec = nTimeoutSec;
60 this->nTimeoutUSec = nTimeoutUSec;
61}
62
63void Bu::ItoServer::addClient( socket_t nSocket, int nPort )
64{
65 ItoClient *pC = new ItoClient( *this, nSocket, nPort, nTimeoutSec,
66 nTimeoutUSec );
67
68 imClients.lock();
69 hClients.insert( nSocket, pC );
70 imClients.unlock();
71
72 pC->start();
73}
74
75void Bu::ItoServer::run()
76{
77 for(;;)
78 {
79 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec };
80
81 fd_set fdRead = fdActive;
82 //fd_set fdWrite = fdActive;
83 fd_set fdException = fdActive;
84
85 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, NULL, &fdException, &xTimeout ) ) < 0 )
86 {
87 throw ExceptionBase("Error attempting to scan open connections.");
88 }
89
90 for( ServerHash::iterator i = hServers.begin(); i != hServers.end(); i++ )
91 {
92 if( FD_ISSET( i.getKey(), &fdRead ) )
93 {
94 TcpServerSocket *pSrv = i.getValue();
95 addClient( pSrv->accept(), pSrv->getPort() );
96 }
97 }
98
99 while( !qClientCleanup.isEmpty() )
100 {
101 ItoClient *pCli = qClientCleanup.dequeue();
102 pCli->join();
103 delete pCli;
104 }
105 }
106}
107
108void Bu::ItoServer::clientCleanup( socket_t iSocket )
109{
110 imClients.lock();
111 ItoClient *pCli = hClients.get( iSocket );
112 imClients.unlock();
113 qClientCleanup.enqueue( pCli );
114}
115
116Bu::ItoServer::ItoClient::ItoClient( ItoServer &rSrv, Bu::ItoServer::socket_t iSocket, int iPort,
117 int nTimeoutSec, int nTimeoutUSec ) :
118 rSrv( rSrv ),
119 iSocket( iSocket ),
120 iPort( iPort ),
121 nTimeoutSec( nTimeoutSec ),
122 nTimeoutUSec( nTimeoutUSec )
123{
124 FD_ZERO( &fdActive );
125 FD_SET( iSocket, &fdActive );
126
127 pClient = new Client(
128 new Bu::TcpSocket( iSocket ),
129 new SrvClientLinkFactory( rSrv )
130 );
131}
132
133Bu::ItoServer::ItoClient::~ItoClient()
134{
135}
136
137void Bu::ItoServer::ItoClient::run()
138{
139 imProto.lock();
140 rSrv.onNewConnection( pClient, iPort );
141 pClient->processOutput();
142 imProto.unlock();
143
144 for(;;)
145 {
146 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec };
147
148 fd_set fdRead = fdActive;
149 fd_set fdWrite;
150 fd_set fdException = fdActive;
151
152 FD_ZERO( &fdWrite );
153 if( pClient->hasOutput() )
154 FD_SET( iSocket, &fdWrite );
155
156 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE,
157 &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 )
158 {
159 throw ExceptionBase("Error attempting to scan open connections.");
160 }
161
162 while( !qMsg.isEmpty() )
163 {
164 imProto.lock();
165 Bu::String *pMsg = qMsg.dequeue();
166 pClient->onMessage( *pMsg );
167 delete pMsg;
168 pClient->processOutput();
169 imProto.unlock();
170 }
171
172 if( FD_ISSET( iSocket, &fdRead ) )
173 {
174 imProto.lock();
175 pClient->processInput();
176 imProto.unlock();
177 if( !pClient->isOpen() )
178 {
179 imProto.lock();
180 rSrv.onClosedConnection( pClient );
181 imProto.unlock();
182
183 rSrv.clientCleanup( iSocket );
184
185 return;
186 }
187 }
188
189 if( FD_ISSET( iSocket, &fdWrite ) )
190 {
191 imProto.lock();
192 pClient->processOutput();
193 imProto.unlock();
194 }
195 }
196}
197
198Bu::ItoServer::SrvClientLink::SrvClientLink( ItoClient *pClient ) :
199 pClient( pClient )
200{
201}
202
203Bu::ItoServer::SrvClientLink::~SrvClientLink()
204{
205}
206
207void Bu::ItoServer::SrvClientLink::sendMessage( const Bu::String &sMsg )
208{
209 if( !pClient->imProto.trylock() )
210 {
211 pClient->pClient->onMessage( sMsg );
212 pClient->pClient->processOutput();
213 pClient->imProto.unlock();
214 }
215 else
216 {
217 Bu::String *pMsg = new Bu::String( sMsg );
218 pClient->qMsg.enqueue( pMsg );
219 }
220}
221
222Bu::ItoServer::SrvClientLinkFactory::SrvClientLinkFactory(
223 Bu::ItoServer &rSrv ) :
224 rSrv( rSrv )
225{
226}
227
228Bu::ItoServer::SrvClientLinkFactory::~SrvClientLinkFactory()
229{
230}
231
232Bu::ClientLink *Bu::ItoServer::SrvClientLinkFactory::createLink(
233 Bu::Client *pClient )
234{
235 rSrv.imClients.lock();
236 ItoClient *pCli = rSrv.hClients.get( *pClient->getSocket() );
237 rSrv.imClients.unlock();
238
239 return new SrvClientLink( pCli );
240}
241
diff --git a/src/unstable/itoserver.h b/src/unstable/itoserver.h
deleted file mode 100644
index f5e4a71..0000000
--- a/src/unstable/itoserver.h
+++ /dev/null
@@ -1,147 +0,0 @@
1/*
2 * Copyright (C) 2007-2019 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#ifndef BU_ITO_SERVER_H
9#define BU_ITO_SERVER_H
10
11#include <stdint.h>
12
13#ifndef WIN32
14 #include <sys/select.h>
15#endif
16
17#include "bu/string.h"
18#include "bu/list.h"
19#include "bu/thread.h"
20#include "bu/mutex.h"
21#include "bu/synchroqueue.h"
22#include "bu/hash.h"
23
24#include "bu/clientlink.h"
25#include "bu/clientlinkfactory.h"
26
27namespace Bu
28{
29 class TcpServerSocket;
30 class TcpSocket;
31 class Client;
32
33 /**
34 * Core of a network server. This class is distinct from a ServerSocket in
35 * that a ServerSocket is one listening socket, nothing more. Socket will
36 * manage a pool of both ServerSockets and connected Sockets along with
37 * their protocols and buffers.
38 *
39 * To start serving on a new port, use the addPort functions. Each call to
40 * addPort creates a new ServerSocket, starts it listening, and adds it to
41 * the server pool.
42 *
43 * All of the real work is done by scan, which will wait for up
44 * to the timeout set by setTimeout before returning if there is no data
45 * pending. scan should probably be called in some sort of tight
46 * loop, possibly in it's own thread, or in the main control loop.
47 *
48 * In order to use a Server you must subclass it and implement the pure
49 * virtual functions. These allow you to receive notification of events
50 * happening within the server itself, and actually makes it useful.
51 *@ingroup Threading Serving
52 */
53 class ItoServer : public Thread
54 {
55 friend class ItoClient;
56 friend class SrvClientLinkFactory;
57 public:
58 ItoServer();
59 virtual ~ItoServer();
60
61#ifdef WIN32
62 typedef unsigned int socket_t;
63#else
64 typedef int socket_t;
65#endif
66
67 void addPort( int nPort, int nPoolSize=40 );
68 void addPort( const String &sAddr, int nPort, int nPoolSize=40 );
69
70 //void scan();
71 void setTimeout( int nTimeoutSec, int nTimeoutUSec=0 );
72
73 void addClient( socket_t nSocket, int nPort );
74
75 virtual void onNewConnection( Client *pClient, int nPort )=0;
76 virtual void onClosedConnection( Client *pClient )=0;
77
78 protected:
79 virtual void run();
80
81 private:
82 class SrvClientLink;
83 class ItoClient : public Thread
84 {
85 friend class Bu::ItoServer::SrvClientLink;
86 public:
87 ItoClient( ItoServer &rSrv, socket_t nSocket, int nPort,
88 int nTimeoutSec, int nTimeoutUSec );
89 virtual ~ItoClient();
90
91 typedef SynchroQueue<Bu::String *> StringQueue;
92 StringQueue qMsg;
93
94 protected:
95 virtual void run();
96
97 private:
98 ItoServer &rSrv;
99 Client *pClient;
100 fd_set fdActive;
101 socket_t iSocket;
102 int iPort;
103 int nTimeoutSec;
104 int nTimeoutUSec;
105 Mutex imProto;
106 };
107
108 class SrvClientLink : public Bu::ClientLink
109 {
110 public:
111 SrvClientLink( ItoClient *pClient );
112 virtual ~SrvClientLink();
113
114 virtual void sendMessage( const Bu::String &sMsg );
115
116 private:
117 ItoClient *pClient;
118 };
119
120 class SrvClientLinkFactory : public Bu::ClientLinkFactory
121 {
122 public:
123 SrvClientLinkFactory( ItoServer &rSrv );
124 virtual ~SrvClientLinkFactory();
125
126 virtual Bu::ClientLink *createLink( Bu::Client *pClient );
127
128 private:
129 ItoServer &rSrv;
130 };
131
132 int nTimeoutSec;
133 int nTimeoutUSec;
134 fd_set fdActive;
135 typedef Hash<socket_t,TcpServerSocket *> ServerHash;
136 ServerHash hServers;
137 typedef Hash<socket_t,ItoClient *> ClientHash;
138 typedef SynchroQueue<ItoClient *> ClientQueue;
139 ClientHash hClients;
140 ClientQueue qClientCleanup;
141 Mutex imClients;
142
143 void clientCleanup( socket_t iSocket );
144 };
145}
146
147#endif