aboutsummaryrefslogtreecommitdiff
path: root/src/stable/server.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/stable/server.cpp')
-rw-r--r--src/stable/server.cpp194
1 files changed, 151 insertions, 43 deletions
diff --git a/src/stable/server.cpp b/src/stable/server.cpp
index 7c44c11..cc89f64 100644
--- a/src/stable/server.cpp
+++ b/src/stable/server.cpp
@@ -8,11 +8,14 @@
8#include "bu/server.h" 8#include "bu/server.h"
9#include <errno.h> 9#include <errno.h>
10#include <unistd.h> 10#include <unistd.h>
11#include <sys/epoll.h>
12#include <sys/eventfd.h>
11#include "bu/serversocket.h" 13#include "bu/serversocket.h"
12#include "bu/client.h" 14#include "bu/client.h"
13#include "bu/socket.h" 15#include "bu/socket.h"
14#include "bu/config.h" 16#include "bu/config.h"
15#include "bu/mutexlocker.h" 17#include "bu/mutexlocker.h"
18#include "bu/serverinterface.h"
16 19
17#include "bu/sio.h" 20#include "bu/sio.h"
18 21
@@ -25,14 +28,48 @@
25#endif 28#endif
26 29
27#define RBS 1500 30#define RBS 1500
31#define MAX_EVENTS 20
32
33namespace Bu
34{
35 class __ServerCore
36 {
37 public:
38 __ServerCore()
39 {
40 fdRead = epoll_create( 1 );
41 fdWrite = epoll_create( 1 );
42 fdEvent = eventfd( 0, 0 );
43
44 struct epoll_event ev;
45 ev.events = EPOLLIN;
46 ev.data.fd = fdEvent;
47 epoll_ctl( fdRead, EPOLL_CTL_ADD, fdEvent, &ev );
48 epoll_ctl( fdWrite, EPOLL_CTL_ADD, fdEvent, &ev );
49 }
50
51 ~__ServerCore()
52 {
53 close( fdRead );
54 close( fdWrite );
55 close( fdEvent );
56 }
57
58 Server::fd fdRead;
59 Server::fd fdWrite;
60 Server::fd fdEvent;
61 };
62}
28 63
29Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : 64Bu::Server::Server( int iIoWorkers, int iClientWorkers ) :
65 pCore( new Bu::__ServerCore() ),
30 nTimeoutSec( 0 ), 66 nTimeoutSec( 0 ),
31 nTimeoutUSec( 0 ), 67 nTimeoutUSec( 0 ),
32 bAutoTick( false ) 68 bAutoTick( false ),
69 tMonitorWrite( *this ),
70 bRunning( true )
33{ 71{
34 BU_PROFILE_START("server"); 72 BU_PROFILE_START("server");
35 FD_ZERO( &fdActive );
36 73
37 if( iIoWorkers < 1 ) 74 if( iIoWorkers < 1 )
38 iIoWorkers = 1; 75 iIoWorkers = 1;
@@ -52,11 +89,14 @@ Bu::Server::Server( int iIoWorkers, int iClientWorkers ) :
52 lClientWorker.append( pWorker ); 89 lClientWorker.append( pWorker );
53 pWorker->start(); 90 pWorker->start();
54 } 91 }
92 tMonitorWrite.start();
55} 93}
56 94
57Bu::Server::~Server() 95Bu::Server::~Server()
58{ 96{
59 shutdown(); 97 shutdown();
98 tMonitorWrite.join();
99 delete pCore;
60 BU_PROFILE_START("server"); 100 BU_PROFILE_START("server");
61} 101}
62 102
@@ -68,7 +108,11 @@ void Bu::Server::addServerSocket( Bu::ServerSocket *pSocket )
68 throw Bu::ExceptionBase("Cannot get file descriptor from " 108 throw Bu::ExceptionBase("Cannot get file descriptor from "
69 "provided ServerSocket."); 109 "provided ServerSocket.");
70 } 110 }
71 FD_SET( iFd, &fdActive ); 111
112 struct epoll_event ev;
113 ev.events = EPOLLIN;
114 ev.data.fd = iFd;
115 epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFd, &ev );
72 hServers.insert( iFd, pSocket ); 116 hServers.insert( iFd, pSocket );
73} 117}
74 118
@@ -81,26 +125,22 @@ void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec )
81void Bu::Server::scan() 125void Bu::Server::scan()
82{ 126{
83 BU_PROFILE_START("scan"); 127 BU_PROFILE_START("scan");
84 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec };
85 128
86 fd_set fdRead = fdActive;
87 fd_set fdWrite /* = fdActive*/;
88 fd_set fdException = fdActive;
89
90 FD_ZERO( &fdWrite );
91 mClients.lock(); 129 mClients.lock();
92 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) 130 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
93 { 131 {
94 if( (*i)->hasOutput() ) 132 if( (*i)->hasOutput() )
95 { 133 {
96 Bu::println("Socket has output..."); 134 // Queue it for output here?
97 FD_SET( i.getKey(), &fdWrite );
98 } 135 }
99 } 136 }
100 mClients.unlock(); 137 mClients.unlock();
101 138
102 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, 139 struct epoll_event ev[MAX_EVENTS];
103 &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) 140
141 int iCount = epoll_wait( pCore->fdRead, ev, MAX_EVENTS, nTimeoutSec*1000+nTimeoutUSec/1000 );
142
143 if( iCount < 0 )
104 { 144 {
105 char buf[1024]; 145 char buf[1024];
106 strerror_r( errno, buf, 1024 ); 146 strerror_r( errno, buf, 1024 );
@@ -110,41 +150,29 @@ void Bu::Server::scan()
110 ); 150 );
111 } 151 }
112 152
113 evIoCycle.clear();
114 Bu::println("Cycle clear"); 153 Bu::println("Cycle clear");
115 154
116 for( int j = 0; j < FD_SETSIZE; j++ ) 155 for( int j = 0; j < iCount; j++ )
117 { 156 {
118 if( FD_ISSET( j, &fdRead ) ) 157 if( hServers.has( ev[j].data.fd ) )
119 { 158 {
120 if( hServers.has( j ) ) 159 Bu::ServerSocket *pSrv = hServers.get( ev[j].data.fd );
121 { 160 addClient( pSrv, pSrv->accept() );
122 Bu::ServerSocket *pSrv = hServers.get( j );
123 addClient( pSrv, pSrv->accept() );
124 }
125 else
126 {
127 evIoCycle.increment();
128 Bu::println("Increment (read)");
129 qIoEvent.enqueue( new Event( j, Event::Read ) );
130 }
131 } 161 }
132 if( FD_ISSET( j, &fdWrite ) ) 162 else
133 { 163 {
134 evIoCycle.increment(); 164 qIoEvent.enqueue( new Event( ev[j].data.fd, Event::Read ) );
135 Bu::println("Increment (write)");
136 qIoEvent.enqueue( new Event( j, Event::Write ) );
137 } 165 }
138 } 166 }
139 167
140 Bu::println("Waiting"); 168 Bu::println("Waiting");
141 while( evIoCycle.wait() > 0 ) { }
142 169
143 Bu::List<int> lDelete; 170 Bu::List<int> lDelete;
144 // Now we just try to write all the pending data on all the sockets. 171 // Now we just try to write all the pending data on all the sockets.
145 // this could be done better eventually, if we care about the socket 172 // this could be done better eventually, if we care about the socket
146 // wanting to accept writes (using a select). 173 // wanting to accept writes (using a select).
147 mClients.lock(); 174 mClients.lock();
175
148 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) 176 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
149 { 177 {
150 if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) 178 if( (*i)->wantsDisconnect() && !(*i)->hasOutput() )
@@ -174,10 +202,18 @@ void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket )
174 { 202 {
175 throw Bu::ExceptionBase("No file descriptor?"); 203 throw Bu::ExceptionBase("No file descriptor?");
176 } 204 }
177 FD_SET( iFdCli, &fdActive ); 205
206 struct epoll_event ev;
207 ev.events = EPOLLIN | EPOLLONESHOT;
208 ev.data.fd = iFdCli;
209 epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFdCli, &ev );
210
211 ev.events = 0;
212 epoll_ctl( pCore->fdWrite, EPOLL_CTL_ADD, iFdCli, &ev );
178 213
179 Client *pClient = new Client( 214 Client *pClient = new Client(
180 new SrvClientLinkFactory() 215 iFdCli,
216 ServerInterface( *this )
181 ); 217 );
182 { 218 {
183 Bu::MutexLocker l( mClients ); 219 Bu::MutexLocker l( mClients );
@@ -252,6 +288,11 @@ void Bu::Server::tick()
252void Bu::Server::shutdown() 288void Bu::Server::shutdown()
253{ 289{
254 { 290 {
291 int64_t iVal = 1;
292 write( pCore->fdEvent, &iVal, sizeof(int64_t) );
293 }
294
295 {
255 qIoEvent.stop(); 296 qIoEvent.stop();
256 qClientEvent.stop(); 297 qClientEvent.stop();
257 Bu::Server::Event *pEv; 298 Bu::Server::Event *pEv;
@@ -294,6 +335,24 @@ void Bu::Server::shutdown()
294 } 335 }
295 hClients.clear(); 336 hClients.clear();
296} 337}
338
339void Bu::Server::clientReadReady( fd iFd )
340{
341 // Re-arm
342 struct epoll_event ev;
343 ev.events = EPOLLIN | EPOLLONESHOT;
344 ev.data.fd = iFd;
345 epoll_ctl( pCore->fdRead, EPOLL_CTL_MOD, iFd, &ev );
346}
347
348void Bu::Server::clientWriteReady( fd iFd )
349{
350 // Re-arm
351 struct epoll_event ev;
352 ev.events = EPOLLOUT | EPOLLONESHOT;
353 ev.data.fd = iFd;
354 epoll_ctl( pCore->fdWrite, EPOLL_CTL_MOD, iFd, &ev );
355}
297 356
298void Bu::Server::closeClient( fd iSocket ) 357void Bu::Server::closeClient( fd iSocket )
299{ 358{
@@ -306,13 +365,60 @@ void Bu::Server::closeClient( fd iSocket )
306 hClients.erase( iSocket ); 365 hClients.erase( iSocket );
307 pSocket->close(); 366 pSocket->close();
308 hSockets.erase( iSocket ); 367 hSockets.erase( iSocket );
309 FD_CLR( iSocket, &fdActive ); 368
369 struct epoll_event ev;
370 epoll_ctl( pCore->fdRead, EPOLL_CTL_DEL, iSocket, &ev );
371 epoll_ctl( pCore->fdWrite, EPOLL_CTL_DEL, iSocket, &ev );
372
310 delete pClient; 373 delete pClient;
311 delete pSocket; 374 delete pSocket;
312 BU_PROFILE_END("closeClient"); 375 BU_PROFILE_END("closeClient");
313} 376}
314 377
315//////// 378////////
379// WriteMonitor
380////
381Bu::Server::WriteMonitor::WriteMonitor( Server &rSrv ) :
382 rSrv( rSrv )
383{
384}
385
386Bu::Server::WriteMonitor::~WriteMonitor()
387{
388}
389
390void Bu::Server::WriteMonitor::run()
391{
392 setName("busrv-writeMon");
393 struct epoll_event ev[MAX_EVENTS];
394 for(;;)
395 {
396 int iCount = epoll_wait( rSrv.pCore->fdWrite, ev, MAX_EVENTS, 5000 );
397 if( iCount < 0 )
398 {
399 // Bad error?
400 return;
401 }
402
403 for( int j = 0; j < iCount; j++ )
404 {
405 if( ev[j].data.fd == rSrv.pCore->fdEvent )
406 {
407 Bu::println("Bu::Server::WriteMonitor -> got event on fdEvent, exiting...");
408 return;
409 }
410 else
411 {
412 // Queue the write
413 rSrv.qIoEvent.enqueue(
414 new Event( ev[j].data.fd, Event::Write )
415 );
416 }
417 }
418 }
419}
420
421////////
316// Event 422// Event
317//// 423////
318 424
@@ -365,7 +471,6 @@ void Bu::Server::IoWorker::run()
365 if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) 471 if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) )
366 { 472 {
367 delete pEv; 473 delete pEv;
368 rSrv.evIoCycle.decrement();
369 continue; 474 continue;
370 } 475 }
371 476
@@ -384,9 +489,6 @@ void Bu::Server::IoWorker::run()
384 } 489 }
385 490
386 delete pEv; 491 delete pEv;
387
388 Bu::println("decrement");
389 rSrv.evIoCycle.decrement();
390 } 492 }
391} 493}
392 494
@@ -404,7 +506,6 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
404 try 506 try
405 { 507 {
406 iRead = pSocket->read( buf, RBS ); 508 iRead = pSocket->read( buf, RBS );
407 Bu::println("IoWorker::handleRead: read() -> %1").arg( iRead );
408 509
409 if( iRead == 0 ) 510 if( iRead == 0 )
410 { 511 {
@@ -421,7 +522,8 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
421 catch( Bu::ExceptionBase &e ) 522 catch( Bu::ExceptionBase &e )
422 { 523 {
423 Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() ); 524 Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() );
424 close( pSocket ); 525 pClient->disconnect();
526 //close( pSocket );
425 return; 527 return;
426 } 528 }
427 } 529 }
@@ -430,13 +532,16 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
430 if( iTotal == 0 ) 532 if( iTotal == 0 )
431 { 533 {
432 Bu::println("IoWorker::handleRead: read nothing, closing"); 534 Bu::println("IoWorker::handleRead: read nothing, closing");
433 close( pSocket ); 535 pClient->disconnect();
536 //close( pSocket );
434 } 537 }
435 else 538 else
436 { 539 {
437 Bu::Server::fd iFd; 540 Bu::Server::fd iFd;
438 pSocket->getFd( iFd ); 541 pSocket->getFd( iFd );
439 rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) ); 542 rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) );
543
544 rSrv.clientReadReady( iFd );
440 } 545 }
441} 546}
442 547
@@ -449,7 +554,6 @@ void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket )
449 int iAmnt = RBS; 554 int iAmnt = RBS;
450 iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); 555 iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt );
451 int iReal = pSocket->write( buf, iAmnt ); 556 int iReal = pSocket->write( buf, iAmnt );
452 Bu::println("IoWorker::handleWrite: Copy out: iAmnt=%1, iReal=%2\n>%3<").arg( iAmnt ).arg( iReal ).arg( Bu::String( buf, iReal ) );
453 pClient->cbBuffer.server().seek( iReal ); 557 pClient->cbBuffer.server().seek( iReal );
454 } 558 }
455} 559}
@@ -494,6 +598,10 @@ void Bu::Server::ClientWorker::run()
494 pClient->processInput(); 598 pClient->processInput();
495 Bu::println("Processing input complete."); 599 Bu::println("Processing input complete.");
496 Bu::println("*** ClientBuf: input: %1b, output: %2b").arg( pClient->getInputSize() ).arg( pClient->getOutputSize() ); 600 Bu::println("*** ClientBuf: input: %1b, output: %2b").arg( pClient->getInputSize() ).arg( pClient->getOutputSize() );
601 if( pClient->getOutputSize() > 0 )
602 {
603 rSrv.clientWriteReady( pClient->getId() );
604 }
497 delete pEv; 605 delete pEv;
498 } 606 }
499} 607}