diff options
| author | Mike Buland <eichlan@xagasoft.com> | 2023-07-31 15:17:52 -0700 |
|---|---|---|
| committer | Mike Buland <eichlan@xagasoft.com> | 2023-07-31 15:17:52 -0700 |
| commit | a49af97abf091a32f6ec2c3985aa0890ded65d9c (patch) | |
| tree | 9332acb37ccaa1552bf273d2383d4a0292deea14 /src/stable/server.cpp | |
| parent | 9a7dde29dc1bc1f699508ad9c0335f4d7abf319f (diff) | |
| download | libbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.tar.gz libbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.tar.bz2 libbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.tar.xz libbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.zip | |
Debugging yet, but the new server works.
It at least seems to!
Diffstat (limited to 'src/stable/server.cpp')
| -rw-r--r-- | src/stable/server.cpp | 194 |
1 files changed, 151 insertions, 43 deletions
diff --git a/src/stable/server.cpp b/src/stable/server.cpp index 7c44c11..cc89f64 100644 --- a/src/stable/server.cpp +++ b/src/stable/server.cpp | |||
| @@ -8,11 +8,14 @@ | |||
| 8 | #include "bu/server.h" | 8 | #include "bu/server.h" |
| 9 | #include <errno.h> | 9 | #include <errno.h> |
| 10 | #include <unistd.h> | 10 | #include <unistd.h> |
| 11 | #include <sys/epoll.h> | ||
| 12 | #include <sys/eventfd.h> | ||
| 11 | #include "bu/serversocket.h" | 13 | #include "bu/serversocket.h" |
| 12 | #include "bu/client.h" | 14 | #include "bu/client.h" |
| 13 | #include "bu/socket.h" | 15 | #include "bu/socket.h" |
| 14 | #include "bu/config.h" | 16 | #include "bu/config.h" |
| 15 | #include "bu/mutexlocker.h" | 17 | #include "bu/mutexlocker.h" |
| 18 | #include "bu/serverinterface.h" | ||
| 16 | 19 | ||
| 17 | #include "bu/sio.h" | 20 | #include "bu/sio.h" |
| 18 | 21 | ||
| @@ -25,14 +28,48 @@ | |||
| 25 | #endif | 28 | #endif |
| 26 | 29 | ||
| 27 | #define RBS 1500 | 30 | #define RBS 1500 |
| 31 | #define MAX_EVENTS 20 | ||
| 32 | |||
| 33 | namespace Bu | ||
| 34 | { | ||
| 35 | class __ServerCore | ||
| 36 | { | ||
| 37 | public: | ||
| 38 | __ServerCore() | ||
| 39 | { | ||
| 40 | fdRead = epoll_create( 1 ); | ||
| 41 | fdWrite = epoll_create( 1 ); | ||
| 42 | fdEvent = eventfd( 0, 0 ); | ||
| 43 | |||
| 44 | struct epoll_event ev; | ||
| 45 | ev.events = EPOLLIN; | ||
| 46 | ev.data.fd = fdEvent; | ||
| 47 | epoll_ctl( fdRead, EPOLL_CTL_ADD, fdEvent, &ev ); | ||
| 48 | epoll_ctl( fdWrite, EPOLL_CTL_ADD, fdEvent, &ev ); | ||
| 49 | } | ||
| 50 | |||
| 51 | ~__ServerCore() | ||
| 52 | { | ||
| 53 | close( fdRead ); | ||
| 54 | close( fdWrite ); | ||
| 55 | close( fdEvent ); | ||
| 56 | } | ||
| 57 | |||
| 58 | Server::fd fdRead; | ||
| 59 | Server::fd fdWrite; | ||
| 60 | Server::fd fdEvent; | ||
| 61 | }; | ||
| 62 | } | ||
| 28 | 63 | ||
| 29 | Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : | 64 | Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : |
| 65 | pCore( new Bu::__ServerCore() ), | ||
| 30 | nTimeoutSec( 0 ), | 66 | nTimeoutSec( 0 ), |
| 31 | nTimeoutUSec( 0 ), | 67 | nTimeoutUSec( 0 ), |
| 32 | bAutoTick( false ) | 68 | bAutoTick( false ), |
| 69 | tMonitorWrite( *this ), | ||
| 70 | bRunning( true ) | ||
| 33 | { | 71 | { |
| 34 | BU_PROFILE_START("server"); | 72 | BU_PROFILE_START("server"); |
| 35 | FD_ZERO( &fdActive ); | ||
| 36 | 73 | ||
| 37 | if( iIoWorkers < 1 ) | 74 | if( iIoWorkers < 1 ) |
| 38 | iIoWorkers = 1; | 75 | iIoWorkers = 1; |
| @@ -52,11 +89,14 @@ Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : | |||
| 52 | lClientWorker.append( pWorker ); | 89 | lClientWorker.append( pWorker ); |
| 53 | pWorker->start(); | 90 | pWorker->start(); |
| 54 | } | 91 | } |
| 92 | tMonitorWrite.start(); | ||
| 55 | } | 93 | } |
| 56 | 94 | ||
| 57 | Bu::Server::~Server() | 95 | Bu::Server::~Server() |
| 58 | { | 96 | { |
| 59 | shutdown(); | 97 | shutdown(); |
| 98 | tMonitorWrite.join(); | ||
| 99 | delete pCore; | ||
| 60 | BU_PROFILE_START("server"); | 100 | BU_PROFILE_START("server"); |
| 61 | } | 101 | } |
| 62 | 102 | ||
| @@ -68,7 +108,11 @@ void Bu::Server::addServerSocket( Bu::ServerSocket *pSocket ) | |||
| 68 | throw Bu::ExceptionBase("Cannot get file descriptor from " | 108 | throw Bu::ExceptionBase("Cannot get file descriptor from " |
| 69 | "provided ServerSocket."); | 109 | "provided ServerSocket."); |
| 70 | } | 110 | } |
| 71 | FD_SET( iFd, &fdActive ); | 111 | |
| 112 | struct epoll_event ev; | ||
| 113 | ev.events = EPOLLIN; | ||
| 114 | ev.data.fd = iFd; | ||
| 115 | epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFd, &ev ); | ||
| 72 | hServers.insert( iFd, pSocket ); | 116 | hServers.insert( iFd, pSocket ); |
| 73 | } | 117 | } |
| 74 | 118 | ||
| @@ -81,26 +125,22 @@ void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec ) | |||
| 81 | void Bu::Server::scan() | 125 | void Bu::Server::scan() |
| 82 | { | 126 | { |
| 83 | BU_PROFILE_START("scan"); | 127 | BU_PROFILE_START("scan"); |
| 84 | struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; | ||
| 85 | 128 | ||
| 86 | fd_set fdRead = fdActive; | ||
| 87 | fd_set fdWrite /* = fdActive*/; | ||
| 88 | fd_set fdException = fdActive; | ||
| 89 | |||
| 90 | FD_ZERO( &fdWrite ); | ||
| 91 | mClients.lock(); | 129 | mClients.lock(); |
| 92 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 130 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) |
| 93 | { | 131 | { |
| 94 | if( (*i)->hasOutput() ) | 132 | if( (*i)->hasOutput() ) |
| 95 | { | 133 | { |
| 96 | Bu::println("Socket has output..."); | 134 | // Queue it for output here? |
| 97 | FD_SET( i.getKey(), &fdWrite ); | ||
| 98 | } | 135 | } |
| 99 | } | 136 | } |
| 100 | mClients.unlock(); | 137 | mClients.unlock(); |
| 101 | 138 | ||
| 102 | if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, | 139 | struct epoll_event ev[MAX_EVENTS]; |
| 103 | &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) | 140 | |
| 141 | int iCount = epoll_wait( pCore->fdRead, ev, MAX_EVENTS, nTimeoutSec*1000+nTimeoutUSec/1000 ); | ||
| 142 | |||
| 143 | if( iCount < 0 ) | ||
| 104 | { | 144 | { |
| 105 | char buf[1024]; | 145 | char buf[1024]; |
| 106 | strerror_r( errno, buf, 1024 ); | 146 | strerror_r( errno, buf, 1024 ); |
| @@ -110,41 +150,29 @@ void Bu::Server::scan() | |||
| 110 | ); | 150 | ); |
| 111 | } | 151 | } |
| 112 | 152 | ||
| 113 | evIoCycle.clear(); | ||
| 114 | Bu::println("Cycle clear"); | 153 | Bu::println("Cycle clear"); |
| 115 | 154 | ||
| 116 | for( int j = 0; j < FD_SETSIZE; j++ ) | 155 | for( int j = 0; j < iCount; j++ ) |
| 117 | { | 156 | { |
| 118 | if( FD_ISSET( j, &fdRead ) ) | 157 | if( hServers.has( ev[j].data.fd ) ) |
| 119 | { | 158 | { |
| 120 | if( hServers.has( j ) ) | 159 | Bu::ServerSocket *pSrv = hServers.get( ev[j].data.fd ); |
| 121 | { | 160 | addClient( pSrv, pSrv->accept() ); |
| 122 | Bu::ServerSocket *pSrv = hServers.get( j ); | ||
| 123 | addClient( pSrv, pSrv->accept() ); | ||
| 124 | } | ||
| 125 | else | ||
| 126 | { | ||
| 127 | evIoCycle.increment(); | ||
| 128 | Bu::println("Increment (read)"); | ||
| 129 | qIoEvent.enqueue( new Event( j, Event::Read ) ); | ||
| 130 | } | ||
| 131 | } | 161 | } |
| 132 | if( FD_ISSET( j, &fdWrite ) ) | 162 | else |
| 133 | { | 163 | { |
| 134 | evIoCycle.increment(); | 164 | qIoEvent.enqueue( new Event( ev[j].data.fd, Event::Read ) ); |
| 135 | Bu::println("Increment (write)"); | ||
| 136 | qIoEvent.enqueue( new Event( j, Event::Write ) ); | ||
| 137 | } | 165 | } |
| 138 | } | 166 | } |
| 139 | 167 | ||
| 140 | Bu::println("Waiting"); | 168 | Bu::println("Waiting"); |
| 141 | while( evIoCycle.wait() > 0 ) { } | ||
| 142 | 169 | ||
| 143 | Bu::List<int> lDelete; | 170 | Bu::List<int> lDelete; |
| 144 | // Now we just try to write all the pending data on all the sockets. | 171 | // Now we just try to write all the pending data on all the sockets. |
| 145 | // this could be done better eventually, if we care about the socket | 172 | // this could be done better eventually, if we care about the socket |
| 146 | // wanting to accept writes (using a select). | 173 | // wanting to accept writes (using a select). |
| 147 | mClients.lock(); | 174 | mClients.lock(); |
| 175 | |||
| 148 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 176 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) |
| 149 | { | 177 | { |
| 150 | if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) | 178 | if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) |
| @@ -174,10 +202,18 @@ void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket ) | |||
| 174 | { | 202 | { |
| 175 | throw Bu::ExceptionBase("No file descriptor?"); | 203 | throw Bu::ExceptionBase("No file descriptor?"); |
| 176 | } | 204 | } |
| 177 | FD_SET( iFdCli, &fdActive ); | 205 | |
| 206 | struct epoll_event ev; | ||
| 207 | ev.events = EPOLLIN | EPOLLONESHOT; | ||
| 208 | ev.data.fd = iFdCli; | ||
| 209 | epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFdCli, &ev ); | ||
| 210 | |||
| 211 | ev.events = 0; | ||
| 212 | epoll_ctl( pCore->fdWrite, EPOLL_CTL_ADD, iFdCli, &ev ); | ||
| 178 | 213 | ||
| 179 | Client *pClient = new Client( | 214 | Client *pClient = new Client( |
| 180 | new SrvClientLinkFactory() | 215 | iFdCli, |
| 216 | ServerInterface( *this ) | ||
| 181 | ); | 217 | ); |
| 182 | { | 218 | { |
| 183 | Bu::MutexLocker l( mClients ); | 219 | Bu::MutexLocker l( mClients ); |
| @@ -252,6 +288,11 @@ void Bu::Server::tick() | |||
| 252 | void Bu::Server::shutdown() | 288 | void Bu::Server::shutdown() |
| 253 | { | 289 | { |
| 254 | { | 290 | { |
| 291 | int64_t iVal = 1; | ||
| 292 | write( pCore->fdEvent, &iVal, sizeof(int64_t) ); | ||
| 293 | } | ||
| 294 | |||
| 295 | { | ||
| 255 | qIoEvent.stop(); | 296 | qIoEvent.stop(); |
| 256 | qClientEvent.stop(); | 297 | qClientEvent.stop(); |
| 257 | Bu::Server::Event *pEv; | 298 | Bu::Server::Event *pEv; |
| @@ -294,6 +335,24 @@ void Bu::Server::shutdown() | |||
| 294 | } | 335 | } |
| 295 | hClients.clear(); | 336 | hClients.clear(); |
| 296 | } | 337 | } |
| 338 | |||
| 339 | void Bu::Server::clientReadReady( fd iFd ) | ||
| 340 | { | ||
| 341 | // Re-arm | ||
| 342 | struct epoll_event ev; | ||
| 343 | ev.events = EPOLLIN | EPOLLONESHOT; | ||
| 344 | ev.data.fd = iFd; | ||
| 345 | epoll_ctl( pCore->fdRead, EPOLL_CTL_MOD, iFd, &ev ); | ||
| 346 | } | ||
| 347 | |||
| 348 | void Bu::Server::clientWriteReady( fd iFd ) | ||
| 349 | { | ||
| 350 | // Re-arm | ||
| 351 | struct epoll_event ev; | ||
| 352 | ev.events = EPOLLOUT | EPOLLONESHOT; | ||
| 353 | ev.data.fd = iFd; | ||
| 354 | epoll_ctl( pCore->fdWrite, EPOLL_CTL_MOD, iFd, &ev ); | ||
| 355 | } | ||
| 297 | 356 | ||
| 298 | void Bu::Server::closeClient( fd iSocket ) | 357 | void Bu::Server::closeClient( fd iSocket ) |
| 299 | { | 358 | { |
| @@ -306,13 +365,60 @@ void Bu::Server::closeClient( fd iSocket ) | |||
| 306 | hClients.erase( iSocket ); | 365 | hClients.erase( iSocket ); |
| 307 | pSocket->close(); | 366 | pSocket->close(); |
| 308 | hSockets.erase( iSocket ); | 367 | hSockets.erase( iSocket ); |
| 309 | FD_CLR( iSocket, &fdActive ); | 368 | |
| 369 | struct epoll_event ev; | ||
| 370 | epoll_ctl( pCore->fdRead, EPOLL_CTL_DEL, iSocket, &ev ); | ||
| 371 | epoll_ctl( pCore->fdWrite, EPOLL_CTL_DEL, iSocket, &ev ); | ||
| 372 | |||
| 310 | delete pClient; | 373 | delete pClient; |
| 311 | delete pSocket; | 374 | delete pSocket; |
| 312 | BU_PROFILE_END("closeClient"); | 375 | BU_PROFILE_END("closeClient"); |
| 313 | } | 376 | } |
| 314 | 377 | ||
| 315 | //////// | 378 | //////// |
| 379 | // WriteMonitor | ||
| 380 | //// | ||
| 381 | Bu::Server::WriteMonitor::WriteMonitor( Server &rSrv ) : | ||
| 382 | rSrv( rSrv ) | ||
| 383 | { | ||
| 384 | } | ||
| 385 | |||
| 386 | Bu::Server::WriteMonitor::~WriteMonitor() | ||
| 387 | { | ||
| 388 | } | ||
| 389 | |||
| 390 | void Bu::Server::WriteMonitor::run() | ||
| 391 | { | ||
| 392 | setName("busrv-writeMon"); | ||
| 393 | struct epoll_event ev[MAX_EVENTS]; | ||
| 394 | for(;;) | ||
| 395 | { | ||
| 396 | int iCount = epoll_wait( rSrv.pCore->fdWrite, ev, MAX_EVENTS, 5000 ); | ||
| 397 | if( iCount < 0 ) | ||
| 398 | { | ||
| 399 | // Bad error? | ||
| 400 | return; | ||
| 401 | } | ||
| 402 | |||
| 403 | for( int j = 0; j < iCount; j++ ) | ||
| 404 | { | ||
| 405 | if( ev[j].data.fd == rSrv.pCore->fdEvent ) | ||
| 406 | { | ||
| 407 | Bu::println("Bu::Server::WriteMonitor -> got event on fdEvent, exiting..."); | ||
| 408 | return; | ||
| 409 | } | ||
| 410 | else | ||
| 411 | { | ||
| 412 | // Queue the write | ||
| 413 | rSrv.qIoEvent.enqueue( | ||
| 414 | new Event( ev[j].data.fd, Event::Write ) | ||
| 415 | ); | ||
| 416 | } | ||
| 417 | } | ||
| 418 | } | ||
| 419 | } | ||
| 420 | |||
| 421 | //////// | ||
| 316 | // Event | 422 | // Event |
| 317 | //// | 423 | //// |
| 318 | 424 | ||
| @@ -365,7 +471,6 @@ void Bu::Server::IoWorker::run() | |||
| 365 | if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) | 471 | if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) |
| 366 | { | 472 | { |
| 367 | delete pEv; | 473 | delete pEv; |
| 368 | rSrv.evIoCycle.decrement(); | ||
| 369 | continue; | 474 | continue; |
| 370 | } | 475 | } |
| 371 | 476 | ||
| @@ -384,9 +489,6 @@ void Bu::Server::IoWorker::run() | |||
| 384 | } | 489 | } |
| 385 | 490 | ||
| 386 | delete pEv; | 491 | delete pEv; |
| 387 | |||
| 388 | Bu::println("decrement"); | ||
| 389 | rSrv.evIoCycle.decrement(); | ||
| 390 | } | 492 | } |
| 391 | } | 493 | } |
| 392 | 494 | ||
| @@ -404,7 +506,6 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
| 404 | try | 506 | try |
| 405 | { | 507 | { |
| 406 | iRead = pSocket->read( buf, RBS ); | 508 | iRead = pSocket->read( buf, RBS ); |
| 407 | Bu::println("IoWorker::handleRead: read() -> %1").arg( iRead ); | ||
| 408 | 509 | ||
| 409 | if( iRead == 0 ) | 510 | if( iRead == 0 ) |
| 410 | { | 511 | { |
| @@ -421,7 +522,8 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
| 421 | catch( Bu::ExceptionBase &e ) | 522 | catch( Bu::ExceptionBase &e ) |
| 422 | { | 523 | { |
| 423 | Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() ); | 524 | Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() ); |
| 424 | close( pSocket ); | 525 | pClient->disconnect(); |
| 526 | //close( pSocket ); | ||
| 425 | return; | 527 | return; |
| 426 | } | 528 | } |
| 427 | } | 529 | } |
| @@ -430,13 +532,16 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
| 430 | if( iTotal == 0 ) | 532 | if( iTotal == 0 ) |
| 431 | { | 533 | { |
| 432 | Bu::println("IoWorker::handleRead: read nothing, closing"); | 534 | Bu::println("IoWorker::handleRead: read nothing, closing"); |
| 433 | close( pSocket ); | 535 | pClient->disconnect(); |
| 536 | //close( pSocket ); | ||
| 434 | } | 537 | } |
| 435 | else | 538 | else |
| 436 | { | 539 | { |
| 437 | Bu::Server::fd iFd; | 540 | Bu::Server::fd iFd; |
| 438 | pSocket->getFd( iFd ); | 541 | pSocket->getFd( iFd ); |
| 439 | rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) ); | 542 | rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) ); |
| 543 | |||
| 544 | rSrv.clientReadReady( iFd ); | ||
| 440 | } | 545 | } |
| 441 | } | 546 | } |
| 442 | 547 | ||
| @@ -449,7 +554,6 @@ void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) | |||
| 449 | int iAmnt = RBS; | 554 | int iAmnt = RBS; |
| 450 | iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); | 555 | iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); |
| 451 | int iReal = pSocket->write( buf, iAmnt ); | 556 | int iReal = pSocket->write( buf, iAmnt ); |
| 452 | Bu::println("IoWorker::handleWrite: Copy out: iAmnt=%1, iReal=%2\n>%3<").arg( iAmnt ).arg( iReal ).arg( Bu::String( buf, iReal ) ); | ||
| 453 | pClient->cbBuffer.server().seek( iReal ); | 557 | pClient->cbBuffer.server().seek( iReal ); |
| 454 | } | 558 | } |
| 455 | } | 559 | } |
| @@ -494,6 +598,10 @@ void Bu::Server::ClientWorker::run() | |||
| 494 | pClient->processInput(); | 598 | pClient->processInput(); |
| 495 | Bu::println("Processing input complete."); | 599 | Bu::println("Processing input complete."); |
| 496 | Bu::println("*** ClientBuf: input: %1b, output: %2b").arg( pClient->getInputSize() ).arg( pClient->getOutputSize() ); | 600 | Bu::println("*** ClientBuf: input: %1b, output: %2b").arg( pClient->getInputSize() ).arg( pClient->getOutputSize() ); |
| 601 | if( pClient->getOutputSize() > 0 ) | ||
| 602 | { | ||
| 603 | rSrv.clientWriteReady( pClient->getId() ); | ||
| 604 | } | ||
| 497 | delete pEv; | 605 | delete pEv; |
| 498 | } | 606 | } |
| 499 | } | 607 | } |
