diff options
Diffstat (limited to 'src/stable/server.cpp')
-rw-r--r-- | src/stable/server.cpp | 194 |
1 files changed, 151 insertions, 43 deletions
diff --git a/src/stable/server.cpp b/src/stable/server.cpp index 7c44c11..cc89f64 100644 --- a/src/stable/server.cpp +++ b/src/stable/server.cpp | |||
@@ -8,11 +8,14 @@ | |||
8 | #include "bu/server.h" | 8 | #include "bu/server.h" |
9 | #include <errno.h> | 9 | #include <errno.h> |
10 | #include <unistd.h> | 10 | #include <unistd.h> |
11 | #include <sys/epoll.h> | ||
12 | #include <sys/eventfd.h> | ||
11 | #include "bu/serversocket.h" | 13 | #include "bu/serversocket.h" |
12 | #include "bu/client.h" | 14 | #include "bu/client.h" |
13 | #include "bu/socket.h" | 15 | #include "bu/socket.h" |
14 | #include "bu/config.h" | 16 | #include "bu/config.h" |
15 | #include "bu/mutexlocker.h" | 17 | #include "bu/mutexlocker.h" |
18 | #include "bu/serverinterface.h" | ||
16 | 19 | ||
17 | #include "bu/sio.h" | 20 | #include "bu/sio.h" |
18 | 21 | ||
@@ -25,14 +28,48 @@ | |||
25 | #endif | 28 | #endif |
26 | 29 | ||
27 | #define RBS 1500 | 30 | #define RBS 1500 |
31 | #define MAX_EVENTS 20 | ||
32 | |||
33 | namespace Bu | ||
34 | { | ||
35 | class __ServerCore | ||
36 | { | ||
37 | public: | ||
38 | __ServerCore() | ||
39 | { | ||
40 | fdRead = epoll_create( 1 ); | ||
41 | fdWrite = epoll_create( 1 ); | ||
42 | fdEvent = eventfd( 0, 0 ); | ||
43 | |||
44 | struct epoll_event ev; | ||
45 | ev.events = EPOLLIN; | ||
46 | ev.data.fd = fdEvent; | ||
47 | epoll_ctl( fdRead, EPOLL_CTL_ADD, fdEvent, &ev ); | ||
48 | epoll_ctl( fdWrite, EPOLL_CTL_ADD, fdEvent, &ev ); | ||
49 | } | ||
50 | |||
51 | ~__ServerCore() | ||
52 | { | ||
53 | close( fdRead ); | ||
54 | close( fdWrite ); | ||
55 | close( fdEvent ); | ||
56 | } | ||
57 | |||
58 | Server::fd fdRead; | ||
59 | Server::fd fdWrite; | ||
60 | Server::fd fdEvent; | ||
61 | }; | ||
62 | } | ||
28 | 63 | ||
29 | Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : | 64 | Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : |
65 | pCore( new Bu::__ServerCore() ), | ||
30 | nTimeoutSec( 0 ), | 66 | nTimeoutSec( 0 ), |
31 | nTimeoutUSec( 0 ), | 67 | nTimeoutUSec( 0 ), |
32 | bAutoTick( false ) | 68 | bAutoTick( false ), |
69 | tMonitorWrite( *this ), | ||
70 | bRunning( true ) | ||
33 | { | 71 | { |
34 | BU_PROFILE_START("server"); | 72 | BU_PROFILE_START("server"); |
35 | FD_ZERO( &fdActive ); | ||
36 | 73 | ||
37 | if( iIoWorkers < 1 ) | 74 | if( iIoWorkers < 1 ) |
38 | iIoWorkers = 1; | 75 | iIoWorkers = 1; |
@@ -52,11 +89,14 @@ Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : | |||
52 | lClientWorker.append( pWorker ); | 89 | lClientWorker.append( pWorker ); |
53 | pWorker->start(); | 90 | pWorker->start(); |
54 | } | 91 | } |
92 | tMonitorWrite.start(); | ||
55 | } | 93 | } |
56 | 94 | ||
57 | Bu::Server::~Server() | 95 | Bu::Server::~Server() |
58 | { | 96 | { |
59 | shutdown(); | 97 | shutdown(); |
98 | tMonitorWrite.join(); | ||
99 | delete pCore; | ||
60 | BU_PROFILE_START("server"); | 100 | BU_PROFILE_START("server"); |
61 | } | 101 | } |
62 | 102 | ||
@@ -68,7 +108,11 @@ void Bu::Server::addServerSocket( Bu::ServerSocket *pSocket ) | |||
68 | throw Bu::ExceptionBase("Cannot get file descriptor from " | 108 | throw Bu::ExceptionBase("Cannot get file descriptor from " |
69 | "provided ServerSocket."); | 109 | "provided ServerSocket."); |
70 | } | 110 | } |
71 | FD_SET( iFd, &fdActive ); | 111 | |
112 | struct epoll_event ev; | ||
113 | ev.events = EPOLLIN; | ||
114 | ev.data.fd = iFd; | ||
115 | epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFd, &ev ); | ||
72 | hServers.insert( iFd, pSocket ); | 116 | hServers.insert( iFd, pSocket ); |
73 | } | 117 | } |
74 | 118 | ||
@@ -81,26 +125,22 @@ void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec ) | |||
81 | void Bu::Server::scan() | 125 | void Bu::Server::scan() |
82 | { | 126 | { |
83 | BU_PROFILE_START("scan"); | 127 | BU_PROFILE_START("scan"); |
84 | struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; | ||
85 | 128 | ||
86 | fd_set fdRead = fdActive; | ||
87 | fd_set fdWrite /* = fdActive*/; | ||
88 | fd_set fdException = fdActive; | ||
89 | |||
90 | FD_ZERO( &fdWrite ); | ||
91 | mClients.lock(); | 129 | mClients.lock(); |
92 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 130 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) |
93 | { | 131 | { |
94 | if( (*i)->hasOutput() ) | 132 | if( (*i)->hasOutput() ) |
95 | { | 133 | { |
96 | Bu::println("Socket has output..."); | 134 | // Queue it for output here? |
97 | FD_SET( i.getKey(), &fdWrite ); | ||
98 | } | 135 | } |
99 | } | 136 | } |
100 | mClients.unlock(); | 137 | mClients.unlock(); |
101 | 138 | ||
102 | if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, | 139 | struct epoll_event ev[MAX_EVENTS]; |
103 | &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) | 140 | |
141 | int iCount = epoll_wait( pCore->fdRead, ev, MAX_EVENTS, nTimeoutSec*1000+nTimeoutUSec/1000 ); | ||
142 | |||
143 | if( iCount < 0 ) | ||
104 | { | 144 | { |
105 | char buf[1024]; | 145 | char buf[1024]; |
106 | strerror_r( errno, buf, 1024 ); | 146 | strerror_r( errno, buf, 1024 ); |
@@ -110,41 +150,29 @@ void Bu::Server::scan() | |||
110 | ); | 150 | ); |
111 | } | 151 | } |
112 | 152 | ||
113 | evIoCycle.clear(); | ||
114 | Bu::println("Cycle clear"); | 153 | Bu::println("Cycle clear"); |
115 | 154 | ||
116 | for( int j = 0; j < FD_SETSIZE; j++ ) | 155 | for( int j = 0; j < iCount; j++ ) |
117 | { | 156 | { |
118 | if( FD_ISSET( j, &fdRead ) ) | 157 | if( hServers.has( ev[j].data.fd ) ) |
119 | { | 158 | { |
120 | if( hServers.has( j ) ) | 159 | Bu::ServerSocket *pSrv = hServers.get( ev[j].data.fd ); |
121 | { | 160 | addClient( pSrv, pSrv->accept() ); |
122 | Bu::ServerSocket *pSrv = hServers.get( j ); | ||
123 | addClient( pSrv, pSrv->accept() ); | ||
124 | } | ||
125 | else | ||
126 | { | ||
127 | evIoCycle.increment(); | ||
128 | Bu::println("Increment (read)"); | ||
129 | qIoEvent.enqueue( new Event( j, Event::Read ) ); | ||
130 | } | ||
131 | } | 161 | } |
132 | if( FD_ISSET( j, &fdWrite ) ) | 162 | else |
133 | { | 163 | { |
134 | evIoCycle.increment(); | 164 | qIoEvent.enqueue( new Event( ev[j].data.fd, Event::Read ) ); |
135 | Bu::println("Increment (write)"); | ||
136 | qIoEvent.enqueue( new Event( j, Event::Write ) ); | ||
137 | } | 165 | } |
138 | } | 166 | } |
139 | 167 | ||
140 | Bu::println("Waiting"); | 168 | Bu::println("Waiting"); |
141 | while( evIoCycle.wait() > 0 ) { } | ||
142 | 169 | ||
143 | Bu::List<int> lDelete; | 170 | Bu::List<int> lDelete; |
144 | // Now we just try to write all the pending data on all the sockets. | 171 | // Now we just try to write all the pending data on all the sockets. |
145 | // this could be done better eventually, if we care about the socket | 172 | // this could be done better eventually, if we care about the socket |
146 | // wanting to accept writes (using a select). | 173 | // wanting to accept writes (using a select). |
147 | mClients.lock(); | 174 | mClients.lock(); |
175 | |||
148 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 176 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) |
149 | { | 177 | { |
150 | if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) | 178 | if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) |
@@ -174,10 +202,18 @@ void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket ) | |||
174 | { | 202 | { |
175 | throw Bu::ExceptionBase("No file descriptor?"); | 203 | throw Bu::ExceptionBase("No file descriptor?"); |
176 | } | 204 | } |
177 | FD_SET( iFdCli, &fdActive ); | 205 | |
206 | struct epoll_event ev; | ||
207 | ev.events = EPOLLIN | EPOLLONESHOT; | ||
208 | ev.data.fd = iFdCli; | ||
209 | epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFdCli, &ev ); | ||
210 | |||
211 | ev.events = 0; | ||
212 | epoll_ctl( pCore->fdWrite, EPOLL_CTL_ADD, iFdCli, &ev ); | ||
178 | 213 | ||
179 | Client *pClient = new Client( | 214 | Client *pClient = new Client( |
180 | new SrvClientLinkFactory() | 215 | iFdCli, |
216 | ServerInterface( *this ) | ||
181 | ); | 217 | ); |
182 | { | 218 | { |
183 | Bu::MutexLocker l( mClients ); | 219 | Bu::MutexLocker l( mClients ); |
@@ -252,6 +288,11 @@ void Bu::Server::tick() | |||
252 | void Bu::Server::shutdown() | 288 | void Bu::Server::shutdown() |
253 | { | 289 | { |
254 | { | 290 | { |
291 | int64_t iVal = 1; | ||
292 | write( pCore->fdEvent, &iVal, sizeof(int64_t) ); | ||
293 | } | ||
294 | |||
295 | { | ||
255 | qIoEvent.stop(); | 296 | qIoEvent.stop(); |
256 | qClientEvent.stop(); | 297 | qClientEvent.stop(); |
257 | Bu::Server::Event *pEv; | 298 | Bu::Server::Event *pEv; |
@@ -294,6 +335,24 @@ void Bu::Server::shutdown() | |||
294 | } | 335 | } |
295 | hClients.clear(); | 336 | hClients.clear(); |
296 | } | 337 | } |
338 | |||
339 | void Bu::Server::clientReadReady( fd iFd ) | ||
340 | { | ||
341 | // Re-arm | ||
342 | struct epoll_event ev; | ||
343 | ev.events = EPOLLIN | EPOLLONESHOT; | ||
344 | ev.data.fd = iFd; | ||
345 | epoll_ctl( pCore->fdRead, EPOLL_CTL_MOD, iFd, &ev ); | ||
346 | } | ||
347 | |||
348 | void Bu::Server::clientWriteReady( fd iFd ) | ||
349 | { | ||
350 | // Re-arm | ||
351 | struct epoll_event ev; | ||
352 | ev.events = EPOLLOUT | EPOLLONESHOT; | ||
353 | ev.data.fd = iFd; | ||
354 | epoll_ctl( pCore->fdWrite, EPOLL_CTL_MOD, iFd, &ev ); | ||
355 | } | ||
297 | 356 | ||
298 | void Bu::Server::closeClient( fd iSocket ) | 357 | void Bu::Server::closeClient( fd iSocket ) |
299 | { | 358 | { |
@@ -306,13 +365,60 @@ void Bu::Server::closeClient( fd iSocket ) | |||
306 | hClients.erase( iSocket ); | 365 | hClients.erase( iSocket ); |
307 | pSocket->close(); | 366 | pSocket->close(); |
308 | hSockets.erase( iSocket ); | 367 | hSockets.erase( iSocket ); |
309 | FD_CLR( iSocket, &fdActive ); | 368 | |
369 | struct epoll_event ev; | ||
370 | epoll_ctl( pCore->fdRead, EPOLL_CTL_DEL, iSocket, &ev ); | ||
371 | epoll_ctl( pCore->fdWrite, EPOLL_CTL_DEL, iSocket, &ev ); | ||
372 | |||
310 | delete pClient; | 373 | delete pClient; |
311 | delete pSocket; | 374 | delete pSocket; |
312 | BU_PROFILE_END("closeClient"); | 375 | BU_PROFILE_END("closeClient"); |
313 | } | 376 | } |
314 | 377 | ||
315 | //////// | 378 | //////// |
379 | // WriteMonitor | ||
380 | //// | ||
381 | Bu::Server::WriteMonitor::WriteMonitor( Server &rSrv ) : | ||
382 | rSrv( rSrv ) | ||
383 | { | ||
384 | } | ||
385 | |||
386 | Bu::Server::WriteMonitor::~WriteMonitor() | ||
387 | { | ||
388 | } | ||
389 | |||
390 | void Bu::Server::WriteMonitor::run() | ||
391 | { | ||
392 | setName("busrv-writeMon"); | ||
393 | struct epoll_event ev[MAX_EVENTS]; | ||
394 | for(;;) | ||
395 | { | ||
396 | int iCount = epoll_wait( rSrv.pCore->fdWrite, ev, MAX_EVENTS, 5000 ); | ||
397 | if( iCount < 0 ) | ||
398 | { | ||
399 | // Bad error? | ||
400 | return; | ||
401 | } | ||
402 | |||
403 | for( int j = 0; j < iCount; j++ ) | ||
404 | { | ||
405 | if( ev[j].data.fd == rSrv.pCore->fdEvent ) | ||
406 | { | ||
407 | Bu::println("Bu::Server::WriteMonitor -> got event on fdEvent, exiting..."); | ||
408 | return; | ||
409 | } | ||
410 | else | ||
411 | { | ||
412 | // Queue the write | ||
413 | rSrv.qIoEvent.enqueue( | ||
414 | new Event( ev[j].data.fd, Event::Write ) | ||
415 | ); | ||
416 | } | ||
417 | } | ||
418 | } | ||
419 | } | ||
420 | |||
421 | //////// | ||
316 | // Event | 422 | // Event |
317 | //// | 423 | //// |
318 | 424 | ||
@@ -365,7 +471,6 @@ void Bu::Server::IoWorker::run() | |||
365 | if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) | 471 | if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) |
366 | { | 472 | { |
367 | delete pEv; | 473 | delete pEv; |
368 | rSrv.evIoCycle.decrement(); | ||
369 | continue; | 474 | continue; |
370 | } | 475 | } |
371 | 476 | ||
@@ -384,9 +489,6 @@ void Bu::Server::IoWorker::run() | |||
384 | } | 489 | } |
385 | 490 | ||
386 | delete pEv; | 491 | delete pEv; |
387 | |||
388 | Bu::println("decrement"); | ||
389 | rSrv.evIoCycle.decrement(); | ||
390 | } | 492 | } |
391 | } | 493 | } |
392 | 494 | ||
@@ -404,7 +506,6 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
404 | try | 506 | try |
405 | { | 507 | { |
406 | iRead = pSocket->read( buf, RBS ); | 508 | iRead = pSocket->read( buf, RBS ); |
407 | Bu::println("IoWorker::handleRead: read() -> %1").arg( iRead ); | ||
408 | 509 | ||
409 | if( iRead == 0 ) | 510 | if( iRead == 0 ) |
410 | { | 511 | { |
@@ -421,7 +522,8 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
421 | catch( Bu::ExceptionBase &e ) | 522 | catch( Bu::ExceptionBase &e ) |
422 | { | 523 | { |
423 | Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() ); | 524 | Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() ); |
424 | close( pSocket ); | 525 | pClient->disconnect(); |
526 | //close( pSocket ); | ||
425 | return; | 527 | return; |
426 | } | 528 | } |
427 | } | 529 | } |
@@ -430,13 +532,16 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
430 | if( iTotal == 0 ) | 532 | if( iTotal == 0 ) |
431 | { | 533 | { |
432 | Bu::println("IoWorker::handleRead: read nothing, closing"); | 534 | Bu::println("IoWorker::handleRead: read nothing, closing"); |
433 | close( pSocket ); | 535 | pClient->disconnect(); |
536 | //close( pSocket ); | ||
434 | } | 537 | } |
435 | else | 538 | else |
436 | { | 539 | { |
437 | Bu::Server::fd iFd; | 540 | Bu::Server::fd iFd; |
438 | pSocket->getFd( iFd ); | 541 | pSocket->getFd( iFd ); |
439 | rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) ); | 542 | rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) ); |
543 | |||
544 | rSrv.clientReadReady( iFd ); | ||
440 | } | 545 | } |
441 | } | 546 | } |
442 | 547 | ||
@@ -449,7 +554,6 @@ void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) | |||
449 | int iAmnt = RBS; | 554 | int iAmnt = RBS; |
450 | iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); | 555 | iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); |
451 | int iReal = pSocket->write( buf, iAmnt ); | 556 | int iReal = pSocket->write( buf, iAmnt ); |
452 | Bu::println("IoWorker::handleWrite: Copy out: iAmnt=%1, iReal=%2\n>%3<").arg( iAmnt ).arg( iReal ).arg( Bu::String( buf, iReal ) ); | ||
453 | pClient->cbBuffer.server().seek( iReal ); | 557 | pClient->cbBuffer.server().seek( iReal ); |
454 | } | 558 | } |
455 | } | 559 | } |
@@ -494,6 +598,10 @@ void Bu::Server::ClientWorker::run() | |||
494 | pClient->processInput(); | 598 | pClient->processInput(); |
495 | Bu::println("Processing input complete."); | 599 | Bu::println("Processing input complete."); |
496 | Bu::println("*** ClientBuf: input: %1b, output: %2b").arg( pClient->getInputSize() ).arg( pClient->getOutputSize() ); | 600 | Bu::println("*** ClientBuf: input: %1b, output: %2b").arg( pClient->getInputSize() ).arg( pClient->getOutputSize() ); |
601 | if( pClient->getOutputSize() > 0 ) | ||
602 | { | ||
603 | rSrv.clientWriteReady( pClient->getId() ); | ||
604 | } | ||
497 | delete pEv; | 605 | delete pEv; |
498 | } | 606 | } |
499 | } | 607 | } |