aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Buland <eichlan@xagasoft.com>2023-07-31 15:17:52 -0700
committerMike Buland <eichlan@xagasoft.com>2023-07-31 15:17:52 -0700
commita49af97abf091a32f6ec2c3985aa0890ded65d9c (patch)
tree9332acb37ccaa1552bf273d2383d4a0292deea14
parent9a7dde29dc1bc1f699508ad9c0335f4d7abf319f (diff)
downloadlibbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.tar.gz
libbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.tar.bz2
libbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.tar.xz
libbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.zip
Debugging yet, but the new server works.
It at least seems to!
-rw-r--r--src/stable/client.cpp18
-rw-r--r--src/stable/client.h9
-rw-r--r--src/stable/clientbuf.cpp10
-rw-r--r--src/stable/server.cpp194
-rw-r--r--src/stable/server.h29
-rw-r--r--src/stable/serverinterface.cpp29
-rw-r--r--src/stable/serverinterface.h40
-rw-r--r--src/unstable/protocolwebsocket.cpp5
8 files changed, 266 insertions, 68 deletions
diff --git a/src/stable/client.cpp b/src/stable/client.cpp
index d2d48d7..159d103 100644
--- a/src/stable/client.cpp
+++ b/src/stable/client.cpp
@@ -23,16 +23,21 @@
23#define BU_PROFILE_END( x ) (void)0 23#define BU_PROFILE_END( x ) (void)0
24#endif 24#endif
25 25
26Bu::Client::Client( class Bu::ClientLinkFactory *pfLink ) : 26Bu::Client::Client( int iId, const Bu::ServerInterface &rServer ) :
27 pProto( NULL ), 27 pProto( NULL ),
28 bWantsDisconnect( false ), 28 bWantsDisconnect( false ),
29 pfLink( pfLink ) 29 iId( iId ),
30 xServer( rServer )
30{ 31{
31} 32}
32 33
33Bu::Client::~Client() 34Bu::Client::~Client()
34{ 35{
35 delete pfLink; 36}
37
38int Bu::Client::getId() const
39{
40 return iId;
36} 41}
37 42
38void Bu::Client::processInput() 43void Bu::Client::processInput()
@@ -47,6 +52,11 @@ void Bu::Client::processInput()
47 } 52 }
48} 53}
49 54
55void Bu::Client::outputReady()
56{
57 xServer.outputReady( iId );
58}
59
50void Bu::Client::setProtocol( Protocol *pProto ) 60void Bu::Client::setProtocol( Protocol *pProto )
51{ 61{
52 Bu::MutexLocker l( mProto ); 62 Bu::MutexLocker l( mProto );
@@ -161,7 +171,7 @@ void Bu::Client::close()
161 171
162Bu::ClientLink *Bu::Client::getLink() 172Bu::ClientLink *Bu::Client::getLink()
163{ 173{
164 return pfLink->createLink( this ); 174 return NULL; //pfLink->createLink( this );
165} 175}
166 176
167void Bu::Client::onMessage( const Bu::String &sMsg ) 177void Bu::Client::onMessage( const Bu::String &sMsg )
diff --git a/src/stable/client.h b/src/stable/client.h
index abe807e..0ff7914 100644
--- a/src/stable/client.h
+++ b/src/stable/client.h
@@ -15,6 +15,7 @@
15#include "bu/clientbuf.h" 15#include "bu/clientbuf.h"
16#include "bu/mutex.h" 16#include "bu/mutex.h"
17#include "bu/readwritemutex.h" 17#include "bu/readwritemutex.h"
18#include "bu/serverinterface.h"
18 19
19#ifndef PROFILE_BU_SERVER 20#ifndef PROFILE_BU_SERVER
20// #define PROFILE_BU_SERVER 1 21// #define PROFILE_BU_SERVER 1
@@ -37,10 +38,13 @@ namespace Bu
37 { 38 {
38 friend class Server; 39 friend class Server;
39 public: 40 public:
40 Client( Bu::ClientLinkFactory *pfLink ); 41 Client( int iId, const Bu::ServerInterface &rServer );
41 virtual ~Client(); 42 virtual ~Client();
42 43
44 int getId() const;
45
43 void processInput(); 46 void processInput();
47 void outputReady();
44 48
45 Bu::size write( const Bu::String &sData ); 49 Bu::size write( const Bu::String &sData );
46 Bu::size write( const void *pData, Bu::size nBytes ); 50 Bu::size write( const void *pData, Bu::size nBytes );
@@ -116,9 +120,10 @@ namespace Bu
116 Bu::Protocol *pProto; 120 Bu::Protocol *pProto;
117 Bu::ClientBuf cbBuffer; 121 Bu::ClientBuf cbBuffer;
118 bool bWantsDisconnect; 122 bool bWantsDisconnect;
119 class Bu::ClientLinkFactory *pfLink;
120 mutable Bu::Mutex mProto; 123 mutable Bu::Mutex mProto;
121 mutable Bu::Mutex mDisconnect; 124 mutable Bu::Mutex mDisconnect;
125 int iId;
126 Bu::ServerInterface xServer;
122 }; 127 };
123} 128}
124 129
diff --git a/src/stable/clientbuf.cpp b/src/stable/clientbuf.cpp
index c6b310f..8c4afc4 100644
--- a/src/stable/clientbuf.cpp
+++ b/src/stable/clientbuf.cpp
@@ -175,19 +175,16 @@ void Bu::ClientBuf::ClientAccess::close()
175 175
176Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) 176Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes )
177{ 177{
178 Bu::println("ClientAccess::read( ptr, %1 )").arg( iBytes );
179 char *pBuf = (char *)pBufRaw; 178 char *pBuf = (char *)pBufRaw;
180 Bu::MutexLocker l( mAccess ); 179 Bu::MutexLocker l( mAccess );
181 // Read from QueueBuf first 180 // Read from QueueBuf first
182 Bu::size ps = qbPeek.read( pBuf, iBytes ); 181 Bu::size ps = qbPeek.read( pBuf, iBytes );
183 Bu::println("ClientAccess::read: attempted qbPeek, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) );
184 iBytes -= ps; 182 iBytes -= ps;
185 pBuf += ps; 183 pBuf += ps;
186 // Request space left? Try the client 184 // Request space left? Try the client
187 if( iBytes > 0 ) 185 if( iBytes > 0 )
188 { 186 {
189 ps += rBuf.accClientFiltered.read( pBuf, iBytes ); 187 ps += rBuf.accClientFiltered.read( pBuf, iBytes );
190 Bu::println("ClientAccess::read: attempted completion from socket buffer, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) );
191 } 188 }
192 return ps; 189 return ps;
193} 190}
@@ -195,26 +192,22 @@ Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes )
195Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes, 192Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes,
196 int iOffset ) 193 int iOffset )
197{ 194{
198 Bu::println("ClientAccess::peek( ptr, %1, %2 )").arg( iBytes ).arg( iOffset );
199 Bu::MutexLocker l( mAccess ); 195 Bu::MutexLocker l( mAccess );
200 // Do we have enough data in the peek buffer to handle this? 196 // Do we have enough data in the peek buffer to handle this?
201 if( qbPeek.getSize() < iBytes+iOffset ) 197 if( qbPeek.getSize() < iBytes+iOffset )
202 { 198 {
203 Bu::println("ClientAccess::peek: Insufficient buffered (have %1b, need %2b)").arg( qbPeek.getSize() ).arg( iBytes+iOffset );
204 // Nope, make an attempt to fill it in. 199 // Nope, make an attempt to fill it in.
205 int nDiff = (iBytes+iOffset)-qbPeek.getSize(); 200 int nDiff = (iBytes+iOffset)-qbPeek.getSize();
206 // We have to make our own buffer, since iBytes+nOffeset could be bigger 201 // We have to make our own buffer, since iBytes+nOffeset could be bigger
207 // than pData. 202 // than pData.
208 char *pTmp = new char[nDiff]; 203 char *pTmp = new char[nDiff];
209 Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff ); 204 Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff );
210 Bu::println("ClientAccess::peek: Tried to fill buffer, read %1 of needed %2\n>%3<").arg( ps ).arg( nDiff ).arg( Bu::String(pTmp, ps) );
211 if( ps > 0 ) 205 if( ps > 0 )
212 { 206 {
213 // Add the data read to the peek buffer. 207 // Add the data read to the peek buffer.
214 qbPeek.write( pTmp, ps ); 208 qbPeek.write( pTmp, ps );
215 } 209 }
216 delete[] pTmp; 210 delete[] pTmp;
217 Bu::println("ClientAccess::peek: buffer left with %1").arg( qbPeek.getSize() );
218 } 211 }
219 212
220 return qbPeek.peek( pData, iBytes, iOffset ); 213 return qbPeek.peek( pData, iBytes, iOffset );
@@ -234,7 +227,6 @@ Bu::size Bu::ClientBuf::ClientAccess::tell()
234 227
235void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) 228void Bu::ClientBuf::ClientAccess::seek( Bu::size offset )
236{ 229{
237 Bu::println("ClientAccess::seek( %1 )").arg( offset );
238 Bu::MutexLocker l( mAccess ); 230 Bu::MutexLocker l( mAccess );
239 // For this type of stream seek is basically a destructive skip. It's like 231 // For this type of stream seek is basically a destructive skip. It's like
240 // reading the data but with no output buffer. Let's remove data from the 232 // reading the data but with no output buffer. Let's remove data from the
@@ -242,7 +234,6 @@ void Bu::ClientBuf::ClientAccess::seek( Bu::size offset )
242 if( qbPeek.getSize() > 0 ) 234 if( qbPeek.getSize() > 0 )
243 { 235 {
244 Bu::size amount = Bu::buMin( qbPeek.getSize(), offset ); 236 Bu::size amount = Bu::buMin( qbPeek.getSize(), offset );
245 Bu::println("ClientAccess::seek: buffered: %1, amount: %2").arg( qbPeek.getSize() ).arg( amount );
246 qbPeek.seek( amount ); 237 qbPeek.seek( amount );
247 offset -= amount; 238 offset -= amount;
248 } 239 }
@@ -250,7 +241,6 @@ void Bu::ClientBuf::ClientAccess::seek( Bu::size offset )
250 // If there's offset left, then apply it to the underlying stream 241 // If there's offset left, then apply it to the underlying stream
251 if( offset > 0 ) 242 if( offset > 0 )
252 { 243 {
253 Bu::println("ClientAccess::seek: seeking remaining %1 in socket buffer").arg( offset );
254 rBuf.accClientFiltered.seek( offset ); 244 rBuf.accClientFiltered.seek( offset );
255 } 245 }
256} 246}
diff --git a/src/stable/server.cpp b/src/stable/server.cpp
index 7c44c11..cc89f64 100644
--- a/src/stable/server.cpp
+++ b/src/stable/server.cpp
@@ -8,11 +8,14 @@
8#include "bu/server.h" 8#include "bu/server.h"
9#include <errno.h> 9#include <errno.h>
10#include <unistd.h> 10#include <unistd.h>
11#include <sys/epoll.h>
12#include <sys/eventfd.h>
11#include "bu/serversocket.h" 13#include "bu/serversocket.h"
12#include "bu/client.h" 14#include "bu/client.h"
13#include "bu/socket.h" 15#include "bu/socket.h"
14#include "bu/config.h" 16#include "bu/config.h"
15#include "bu/mutexlocker.h" 17#include "bu/mutexlocker.h"
18#include "bu/serverinterface.h"
16 19
17#include "bu/sio.h" 20#include "bu/sio.h"
18 21
@@ -25,14 +28,48 @@
25#endif 28#endif
26 29
27#define RBS 1500 30#define RBS 1500
31#define MAX_EVENTS 20
32
33namespace Bu
34{
35 class __ServerCore
36 {
37 public:
38 __ServerCore()
39 {
40 fdRead = epoll_create( 1 );
41 fdWrite = epoll_create( 1 );
42 fdEvent = eventfd( 0, 0 );
43
44 struct epoll_event ev;
45 ev.events = EPOLLIN;
46 ev.data.fd = fdEvent;
47 epoll_ctl( fdRead, EPOLL_CTL_ADD, fdEvent, &ev );
48 epoll_ctl( fdWrite, EPOLL_CTL_ADD, fdEvent, &ev );
49 }
50
51 ~__ServerCore()
52 {
53 close( fdRead );
54 close( fdWrite );
55 close( fdEvent );
56 }
57
58 Server::fd fdRead;
59 Server::fd fdWrite;
60 Server::fd fdEvent;
61 };
62}
28 63
29Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : 64Bu::Server::Server( int iIoWorkers, int iClientWorkers ) :
65 pCore( new Bu::__ServerCore() ),
30 nTimeoutSec( 0 ), 66 nTimeoutSec( 0 ),
31 nTimeoutUSec( 0 ), 67 nTimeoutUSec( 0 ),
32 bAutoTick( false ) 68 bAutoTick( false ),
69 tMonitorWrite( *this ),
70 bRunning( true )
33{ 71{
34 BU_PROFILE_START("server"); 72 BU_PROFILE_START("server");
35 FD_ZERO( &fdActive );
36 73
37 if( iIoWorkers < 1 ) 74 if( iIoWorkers < 1 )
38 iIoWorkers = 1; 75 iIoWorkers = 1;
@@ -52,11 +89,14 @@ Bu::Server::Server( int iIoWorkers, int iClientWorkers ) :
52 lClientWorker.append( pWorker ); 89 lClientWorker.append( pWorker );
53 pWorker->start(); 90 pWorker->start();
54 } 91 }
92 tMonitorWrite.start();
55} 93}
56 94
57Bu::Server::~Server() 95Bu::Server::~Server()
58{ 96{
59 shutdown(); 97 shutdown();
98 tMonitorWrite.join();
99 delete pCore;
60 BU_PROFILE_START("server"); 100 BU_PROFILE_START("server");
61} 101}
62 102
@@ -68,7 +108,11 @@ void Bu::Server::addServerSocket( Bu::ServerSocket *pSocket )
68 throw Bu::ExceptionBase("Cannot get file descriptor from " 108 throw Bu::ExceptionBase("Cannot get file descriptor from "
69 "provided ServerSocket."); 109 "provided ServerSocket.");
70 } 110 }
71 FD_SET( iFd, &fdActive ); 111
112 struct epoll_event ev;
113 ev.events = EPOLLIN;
114 ev.data.fd = iFd;
115 epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFd, &ev );
72 hServers.insert( iFd, pSocket ); 116 hServers.insert( iFd, pSocket );
73} 117}
74 118
@@ -81,26 +125,22 @@ void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec )
81void Bu::Server::scan() 125void Bu::Server::scan()
82{ 126{
83 BU_PROFILE_START("scan"); 127 BU_PROFILE_START("scan");
84 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec };
85 128
86 fd_set fdRead = fdActive;
87 fd_set fdWrite /* = fdActive*/;
88 fd_set fdException = fdActive;
89
90 FD_ZERO( &fdWrite );
91 mClients.lock(); 129 mClients.lock();
92 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) 130 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
93 { 131 {
94 if( (*i)->hasOutput() ) 132 if( (*i)->hasOutput() )
95 { 133 {
96 Bu::println("Socket has output..."); 134 // Queue it for output here?
97 FD_SET( i.getKey(), &fdWrite );
98 } 135 }
99 } 136 }
100 mClients.unlock(); 137 mClients.unlock();
101 138
102 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, 139 struct epoll_event ev[MAX_EVENTS];
103 &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) 140
141 int iCount = epoll_wait( pCore->fdRead, ev, MAX_EVENTS, nTimeoutSec*1000+nTimeoutUSec/1000 );
142
143 if( iCount < 0 )
104 { 144 {
105 char buf[1024]; 145 char buf[1024];
106 strerror_r( errno, buf, 1024 ); 146 strerror_r( errno, buf, 1024 );
@@ -110,41 +150,29 @@ void Bu::Server::scan()
110 ); 150 );
111 } 151 }
112 152
113 evIoCycle.clear();
114 Bu::println("Cycle clear"); 153 Bu::println("Cycle clear");
115 154
116 for( int j = 0; j < FD_SETSIZE; j++ ) 155 for( int j = 0; j < iCount; j++ )
117 { 156 {
118 if( FD_ISSET( j, &fdRead ) ) 157 if( hServers.has( ev[j].data.fd ) )
119 { 158 {
120 if( hServers.has( j ) ) 159 Bu::ServerSocket *pSrv = hServers.get( ev[j].data.fd );
121 { 160 addClient( pSrv, pSrv->accept() );
122 Bu::ServerSocket *pSrv = hServers.get( j );
123 addClient( pSrv, pSrv->accept() );
124 }
125 else
126 {
127 evIoCycle.increment();
128 Bu::println("Increment (read)");
129 qIoEvent.enqueue( new Event( j, Event::Read ) );
130 }
131 } 161 }
132 if( FD_ISSET( j, &fdWrite ) ) 162 else
133 { 163 {
134 evIoCycle.increment(); 164 qIoEvent.enqueue( new Event( ev[j].data.fd, Event::Read ) );
135 Bu::println("Increment (write)");
136 qIoEvent.enqueue( new Event( j, Event::Write ) );
137 } 165 }
138 } 166 }
139 167
140 Bu::println("Waiting"); 168 Bu::println("Waiting");
141 while( evIoCycle.wait() > 0 ) { }
142 169
143 Bu::List<int> lDelete; 170 Bu::List<int> lDelete;
144 // Now we just try to write all the pending data on all the sockets. 171 // Now we just try to write all the pending data on all the sockets.
145 // this could be done better eventually, if we care about the socket 172 // this could be done better eventually, if we care about the socket
146 // wanting to accept writes (using a select). 173 // wanting to accept writes (using a select).
147 mClients.lock(); 174 mClients.lock();
175
148 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) 176 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
149 { 177 {
150 if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) 178 if( (*i)->wantsDisconnect() && !(*i)->hasOutput() )
@@ -174,10 +202,18 @@ void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket )
174 { 202 {
175 throw Bu::ExceptionBase("No file descriptor?"); 203 throw Bu::ExceptionBase("No file descriptor?");
176 } 204 }
177 FD_SET( iFdCli, &fdActive ); 205
206 struct epoll_event ev;
207 ev.events = EPOLLIN | EPOLLONESHOT;
208 ev.data.fd = iFdCli;
209 epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFdCli, &ev );
210
211 ev.events = 0;
212 epoll_ctl( pCore->fdWrite, EPOLL_CTL_ADD, iFdCli, &ev );
178 213
179 Client *pClient = new Client( 214 Client *pClient = new Client(
180 new SrvClientLinkFactory() 215 iFdCli,
216 ServerInterface( *this )
181 ); 217 );
182 { 218 {
183 Bu::MutexLocker l( mClients ); 219 Bu::MutexLocker l( mClients );
@@ -252,6 +288,11 @@ void Bu::Server::tick()
252void Bu::Server::shutdown() 288void Bu::Server::shutdown()
253{ 289{
254 { 290 {
291 int64_t iVal = 1;
292 write( pCore->fdEvent, &iVal, sizeof(int64_t) );
293 }
294
295 {
255 qIoEvent.stop(); 296 qIoEvent.stop();
256 qClientEvent.stop(); 297 qClientEvent.stop();
257 Bu::Server::Event *pEv; 298 Bu::Server::Event *pEv;
@@ -294,6 +335,24 @@ void Bu::Server::shutdown()
294 } 335 }
295 hClients.clear(); 336 hClients.clear();
296} 337}
338
339void Bu::Server::clientReadReady( fd iFd )
340{
341 // Re-arm
342 struct epoll_event ev;
343 ev.events = EPOLLIN | EPOLLONESHOT;
344 ev.data.fd = iFd;
345 epoll_ctl( pCore->fdRead, EPOLL_CTL_MOD, iFd, &ev );
346}
347
348void Bu::Server::clientWriteReady( fd iFd )
349{
350 // Re-arm
351 struct epoll_event ev;
352 ev.events = EPOLLOUT | EPOLLONESHOT;
353 ev.data.fd = iFd;
354 epoll_ctl( pCore->fdWrite, EPOLL_CTL_MOD, iFd, &ev );
355}
297 356
298void Bu::Server::closeClient( fd iSocket ) 357void Bu::Server::closeClient( fd iSocket )
299{ 358{
@@ -306,13 +365,60 @@ void Bu::Server::closeClient( fd iSocket )
306 hClients.erase( iSocket ); 365 hClients.erase( iSocket );
307 pSocket->close(); 366 pSocket->close();
308 hSockets.erase( iSocket ); 367 hSockets.erase( iSocket );
309 FD_CLR( iSocket, &fdActive ); 368
369 struct epoll_event ev;
370 epoll_ctl( pCore->fdRead, EPOLL_CTL_DEL, iSocket, &ev );
371 epoll_ctl( pCore->fdWrite, EPOLL_CTL_DEL, iSocket, &ev );
372
310 delete pClient; 373 delete pClient;
311 delete pSocket; 374 delete pSocket;
312 BU_PROFILE_END("closeClient"); 375 BU_PROFILE_END("closeClient");
313} 376}
314 377
315//////// 378////////
379// WriteMonitor
380////
381Bu::Server::WriteMonitor::WriteMonitor( Server &rSrv ) :
382 rSrv( rSrv )
383{
384}
385
386Bu::Server::WriteMonitor::~WriteMonitor()
387{
388}
389
390void Bu::Server::WriteMonitor::run()
391{
392 setName("busrv-writeMon");
393 struct epoll_event ev[MAX_EVENTS];
394 for(;;)
395 {
396 int iCount = epoll_wait( rSrv.pCore->fdWrite, ev, MAX_EVENTS, 5000 );
397 if( iCount < 0 )
398 {
399 // Bad error?
400 return;
401 }
402
403 for( int j = 0; j < iCount; j++ )
404 {
405 if( ev[j].data.fd == rSrv.pCore->fdEvent )
406 {
407 Bu::println("Bu::Server::WriteMonitor -> got event on fdEvent, exiting...");
408 return;
409 }
410 else
411 {
412 // Queue the write
413 rSrv.qIoEvent.enqueue(
414 new Event( ev[j].data.fd, Event::Write )
415 );
416 }
417 }
418 }
419}
420
421////////
316// Event 422// Event
317//// 423////
318 424
@@ -365,7 +471,6 @@ void Bu::Server::IoWorker::run()
365 if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) 471 if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) )
366 { 472 {
367 delete pEv; 473 delete pEv;
368 rSrv.evIoCycle.decrement();
369 continue; 474 continue;
370 } 475 }
371 476
@@ -384,9 +489,6 @@ void Bu::Server::IoWorker::run()
384 } 489 }
385 490
386 delete pEv; 491 delete pEv;
387
388 Bu::println("decrement");
389 rSrv.evIoCycle.decrement();
390 } 492 }
391} 493}
392 494
@@ -404,7 +506,6 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
404 try 506 try
405 { 507 {
406 iRead = pSocket->read( buf, RBS ); 508 iRead = pSocket->read( buf, RBS );
407 Bu::println("IoWorker::handleRead: read() -> %1").arg( iRead );
408 509
409 if( iRead == 0 ) 510 if( iRead == 0 )
410 { 511 {
@@ -421,7 +522,8 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
421 catch( Bu::ExceptionBase &e ) 522 catch( Bu::ExceptionBase &e )
422 { 523 {
423 Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() ); 524 Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() );
424 close( pSocket ); 525 pClient->disconnect();
526 //close( pSocket );
425 return; 527 return;
426 } 528 }
427 } 529 }
@@ -430,13 +532,16 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
430 if( iTotal == 0 ) 532 if( iTotal == 0 )
431 { 533 {
432 Bu::println("IoWorker::handleRead: read nothing, closing"); 534 Bu::println("IoWorker::handleRead: read nothing, closing");
433 close( pSocket ); 535 pClient->disconnect();
536 //close( pSocket );
434 } 537 }
435 else 538 else
436 { 539 {
437 Bu::Server::fd iFd; 540 Bu::Server::fd iFd;
438 pSocket->getFd( iFd ); 541 pSocket->getFd( iFd );
439 rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) ); 542 rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) );
543
544 rSrv.clientReadReady( iFd );
440 } 545 }
441} 546}
442 547
@@ -449,7 +554,6 @@ void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket )
449 int iAmnt = RBS; 554 int iAmnt = RBS;
450 iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); 555 iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt );
451 int iReal = pSocket->write( buf, iAmnt ); 556 int iReal = pSocket->write( buf, iAmnt );
452 Bu::println("IoWorker::handleWrite: Copy out: iAmnt=%1, iReal=%2\n>%3<").arg( iAmnt ).arg( iReal ).arg( Bu::String( buf, iReal ) );
453 pClient->cbBuffer.server().seek( iReal ); 557 pClient->cbBuffer.server().seek( iReal );
454 } 558 }
455} 559}
@@ -494,6 +598,10 @@ void Bu::Server::ClientWorker::run()
494 pClient->processInput(); 598 pClient->processInput();
495 Bu::println("Processing input complete."); 599 Bu::println("Processing input complete.");
496 Bu::println("*** ClientBuf: input: %1b, output: %2b").arg( pClient->getInputSize() ).arg( pClient->getOutputSize() ); 600 Bu::println("*** ClientBuf: input: %1b, output: %2b").arg( pClient->getInputSize() ).arg( pClient->getOutputSize() );
601 if( pClient->getOutputSize() > 0 )
602 {
603 rSrv.clientWriteReady( pClient->getId() );
604 }
497 delete pEv; 605 delete pEv;
498 } 606 }
499} 607}
diff --git a/src/stable/server.h b/src/stable/server.h
index e2b7d53..56ac29a 100644
--- a/src/stable/server.h
+++ b/src/stable/server.h
@@ -10,10 +10,6 @@
10 10
11#include <stdint.h> 11#include <stdint.h>
12 12
13#ifndef WIN32
14 #include <sys/select.h>
15#endif
16
17#include "bu/string.h" 13#include "bu/string.h"
18#include "bu/list.h" 14#include "bu/list.h"
19 15
@@ -62,6 +58,7 @@ namespace Bu
62 */ 58 */
63 class Server 59 class Server
64 { 60 {
61 friend class ServerInterface;
65 public: 62 public:
66 Server( int iIoWorkers=4, int iClientWorkers=8 ); 63 Server( int iIoWorkers=4, int iClientWorkers=8 );
67 virtual ~Server(); 64 virtual ~Server();
@@ -90,8 +87,12 @@ namespace Bu
90 87
91 void shutdown(); 88 void shutdown();
92 89
93 private: 90 protected:
91 void clientReadReady( fd iFd );
92 void clientWriteReady( fd iFd );
94 void closeClient( fd iSocket ); 93 void closeClient( fd iSocket );
94
95 private:
95 class SrvClientLink : public Bu::ClientLink 96 class SrvClientLink : public Bu::ClientLink
96 { 97 {
97 public: 98 public:
@@ -113,6 +114,19 @@ namespace Bu
113 virtual Bu::ClientLink *createLink( Bu::Client *pClient ); 114 virtual Bu::ClientLink *createLink( Bu::Client *pClient );
114 }; 115 };
115 116
117 class WriteMonitor : public Bu::Thread
118 {
119 public:
120 WriteMonitor( Server &rSrv );
121 virtual ~WriteMonitor();
122
123 protected:
124 virtual void run();
125
126 private:
127 Server &rSrv;
128 };
129
116 class Event 130 class Event
117 { 131 {
118 public: 132 public:
@@ -168,9 +182,9 @@ namespace Bu
168 }; 182 };
169 friend class Bu::Server::ClientWorker; 183 friend class Bu::Server::ClientWorker;
170 184
185 class __ServerCore *pCore;
171 int nTimeoutSec; 186 int nTimeoutSec;
172 int nTimeoutUSec; 187 int nTimeoutUSec;
173 fd_set fdActive;
174 typedef Hash<fd,ServerSocket *> SrvHash; 188 typedef Hash<fd,ServerSocket *> SrvHash;
175 SrvHash hServers; 189 SrvHash hServers;
176 typedef Hash<fd,Client *> ClientHash; 190 typedef Hash<fd,Client *> ClientHash;
@@ -188,7 +202,8 @@ namespace Bu
188 typedef List<ClientWorker *> ClientWorkerList; 202 typedef List<ClientWorker *> ClientWorkerList;
189 IoWorkerList lIoWorker; 203 IoWorkerList lIoWorker;
190 ClientWorkerList lClientWorker; 204 ClientWorkerList lClientWorker;
191 Bu::CounterEvent evIoCycle; 205 WriteMonitor tMonitorWrite;
206 bool bRunning;
192 }; 207 };
193} 208}
194 209
diff --git a/src/stable/serverinterface.cpp b/src/stable/serverinterface.cpp
new file mode 100644
index 0000000..0e3122e
--- /dev/null
+++ b/src/stable/serverinterface.cpp
@@ -0,0 +1,29 @@
1/*
2 * Copyright (C) 2007-2019 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#include "bu/serverinterface.h"
9#include "bu/server.h"
10
11Bu::ServerInterface::ServerInterface( Server &rSrv ) :
12 rSrv( rSrv )
13{
14}
15
16Bu::ServerInterface::ServerInterface( const ServerInterface &rSrc ) :
17 rSrv( rSrc.rSrv )
18{
19}
20
21Bu::ServerInterface::~ServerInterface()
22{
23}
24
25void Bu::ServerInterface::outputReady( int iClientId )
26{
27 rSrv.clientWriteReady( iClientId );
28}
29
diff --git a/src/stable/serverinterface.h b/src/stable/serverinterface.h
new file mode 100644
index 0000000..edc8335
--- /dev/null
+++ b/src/stable/serverinterface.h
@@ -0,0 +1,40 @@
1/*
2 * Copyright (C) 2007-2019 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#ifndef BU_SERVER_INTERFACE_H
9#define BU_SERVER_INTERFACE_H
10
11#include <stdint.h>
12
13namespace Bu
14{
15 class Server;
16
17 class ServerInterface
18 {
19 friend class Bu::Server;
20 private:
21 ServerInterface( Bu::Server &rSrv );
22 public:
23 ServerInterface( const Bu::ServerInterface &rSrc );
24 ~ServerInterface();
25
26 /**
27 * Clients will call this when they have filled the output buffer and
28 * are ready for that data to be sent. This only needs to be called
29 * when async output is generated, that is when output is generated not
30 * in the processInput method.
31 */
32 void outputReady( int iClientId );
33
34 private:
35 Bu::Server &rSrv;
36 };
37}
38
39#endif
40
diff --git a/src/unstable/protocolwebsocket.cpp b/src/unstable/protocolwebsocket.cpp
index fa2d882..2bec1ce 100644
--- a/src/unstable/protocolwebsocket.cpp
+++ b/src/unstable/protocolwebsocket.cpp
@@ -21,8 +21,8 @@
21 21
22#include <stdlib.h> 22#include <stdlib.h>
23 23
24//#define DEBUG( X ) { } (void)0 24#define DEBUG( X ) { } (void)0
25#define DEBUG( X ) { X; } (void)0 25//#define DEBUG( X ) { X; } (void)0
26 26
27Bu::ProtocolWebSocket::ProtocolWebSocket() : 27Bu::ProtocolWebSocket::ProtocolWebSocket() :
28 eStatus( stProtoId ) 28 eStatus( stProtoId )
@@ -120,6 +120,7 @@ void Bu::ProtocolWebSocket::writeMessage( const Bu::String &sData,
120 return; 120 return;
121 pClient->write( cHeader, idx ); 121 pClient->write( cHeader, idx );
122 pClient->write( sData ); 122 pClient->write( sData );
123 pClient->outputReady();
123} 124}
124 125
125bool Bu::ProtocolWebSocket::stateProtoId() 126bool Bu::ProtocolWebSocket::stateProtoId()