aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Buland <eichlan@xagasoft.com>2023-07-31 16:12:54 -0700
committerMike Buland <eichlan@xagasoft.com>2023-07-31 16:12:54 -0700
commitfc83cb3fa5b15e9fb20a9180a3880297cf398de7 (patch)
treea55a0174fdbfd77b9b325b089b1ec314284b5644
parenta49af97abf091a32f6ec2c3985aa0890ded65d9c (diff)
downloadlibbu++-fc83cb3fa5b15e9fb20a9180a3880297cf398de7.tar.gz
libbu++-fc83cb3fa5b15e9fb20a9180a3880297cf398de7.tar.bz2
libbu++-fc83cb3fa5b15e9fb20a9180a3880297cf398de7.tar.xz
libbu++-fc83cb3fa5b15e9fb20a9180a3880297cf398de7.zip
Issue found with recycled fds.
We don't always clean up instantly, but the system is reusing ids before we're ready.
-rw-r--r--src/stable/client.cpp2
-rw-r--r--src/stable/queuebuf.cpp1
-rw-r--r--src/stable/server.cpp57
-rw-r--r--src/stable/server.h2
4 files changed, 29 insertions, 33 deletions
diff --git a/src/stable/client.cpp b/src/stable/client.cpp
index 159d103..34dc798 100644
--- a/src/stable/client.cpp
+++ b/src/stable/client.cpp
@@ -13,8 +13,6 @@
13#include "bu/clientlinkfactory.h" 13#include "bu/clientlinkfactory.h"
14#include "bu/mutexlocker.h" 14#include "bu/mutexlocker.h"
15 15
16/** Read buffer size. */
17#define RBS (2000) // 1500 is the nominal MTU for ethernet, it's a good guess
18#ifdef PROFILE_BU_SERVER 16#ifdef PROFILE_BU_SERVER
19#define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x ) 17#define BU_PROFILE_START( x ) Bu::Profiler::getInstance().startEvent( x )
20#define BU_PROFILE_END( x ) Bu::Profiler::getInstance().endEvent( x ) 18#define BU_PROFILE_END( x ) Bu::Profiler::getInstance().endEvent( x )
diff --git a/src/stable/queuebuf.cpp b/src/stable/queuebuf.cpp
index f80f9d9..29da6ba 100644
--- a/src/stable/queuebuf.cpp
+++ b/src/stable/queuebuf.cpp
@@ -22,6 +22,7 @@ Bu::QueueBuf::~QueueBuf()
22{ 22{
23 for( BlockList::iterator i = lBlocks.begin(); i; i++ ) 23 for( BlockList::iterator i = lBlocks.begin(); i; i++ )
24 delete[] *i; 24 delete[] *i;
25 lBlocks.clear();
25} 26}
26 27
27void Bu::QueueBuf::close() 28void Bu::QueueBuf::close()
diff --git a/src/stable/server.cpp b/src/stable/server.cpp
index cc89f64..160b3eb 100644
--- a/src/stable/server.cpp
+++ b/src/stable/server.cpp
@@ -15,7 +15,6 @@
15#include "bu/socket.h" 15#include "bu/socket.h"
16#include "bu/config.h" 16#include "bu/config.h"
17#include "bu/mutexlocker.h" 17#include "bu/mutexlocker.h"
18#include "bu/serverinterface.h"
19 18
20#include "bu/sio.h" 19#include "bu/sio.h"
21 20
@@ -150,8 +149,6 @@ void Bu::Server::scan()
150 ); 149 );
151 } 150 }
152 151
153 Bu::println("Cycle clear");
154
155 for( int j = 0; j < iCount; j++ ) 152 for( int j = 0; j < iCount; j++ )
156 { 153 {
157 if( hServers.has( ev[j].data.fd ) ) 154 if( hServers.has( ev[j].data.fd ) )
@@ -165,8 +162,6 @@ void Bu::Server::scan()
165 } 162 }
166 } 163 }
167 164
168 Bu::println("Waiting");
169
170 Bu::List<int> lDelete; 165 Bu::List<int> lDelete;
171 // Now we just try to write all the pending data on all the sockets. 166 // Now we just try to write all the pending data on all the sockets.
172 // this could be done better eventually, if we care about the socket 167 // this could be done better eventually, if we care about the socket
@@ -203,24 +198,32 @@ void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket )
203 throw Bu::ExceptionBase("No file descriptor?"); 198 throw Bu::ExceptionBase("No file descriptor?");
204 } 199 }
205 200
206 struct epoll_event ev;
207 ev.events = EPOLLIN | EPOLLONESHOT;
208 ev.data.fd = iFdCli;
209 epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFdCli, &ev );
210
211 ev.events = 0;
212 epoll_ctl( pCore->fdWrite, EPOLL_CTL_ADD, iFdCli, &ev );
213
214 Client *pClient = new Client( 201 Client *pClient = new Client(
215 iFdCli, 202 iFdCli,
216 ServerInterface( *this ) 203 ServerInterface( *this )
217 ); 204 );
218 { 205 {
219 Bu::MutexLocker l( mClients ); 206 Bu::MutexLocker l( mClients );
207 if( hClients.has( iFdCli ) )
208 {
209 Bu::println("!!!!!!!!! hClients has %1 already.").arg( iFdCli );
210 }
211 if( hSockets.has( iFdCli ) )
212 {
213 Bu::println("!!!!!!!!! hSockets has %1 already.").arg( iFdCli );
214 }
220 hClients.insert( iFdCli, pClient ); 215 hClients.insert( iFdCli, pClient );
221 hSockets.insert( iFdCli, pSocket ); 216 hSockets.insert( iFdCli, pSocket );
222 } 217 }
223 218
219 struct epoll_event ev;
220 ev.events = EPOLLIN | EPOLLONESHOT;
221 ev.data.fd = iFdCli;
222 epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFdCli, &ev );
223
224 ev.events = 0;
225 epoll_ctl( pCore->fdWrite, EPOLL_CTL_ADD, iFdCli, &ev );
226
224 onNewConnection( pSrv, pClient, pSocket ); 227 onNewConnection( pSrv, pClient, pSocket );
225 BU_PROFILE_END("addClient"); 228 BU_PROFILE_END("addClient");
226} 229}
@@ -333,6 +336,7 @@ void Bu::Server::shutdown()
333 { 336 {
334 closeClient( *i ); 337 closeClient( *i );
335 } 338 }
339 Bu::println("Cleaning up clients. Erased %1, remaining: %2.").arg( lClients.getSize() ).arg( hClients.getSize() );
336 hClients.clear(); 340 hClients.clear();
337} 341}
338 342
@@ -358,6 +362,11 @@ void Bu::Server::closeClient( fd iSocket )
358{ 362{
359 Bu::MutexLocker l( mClients ); 363 Bu::MutexLocker l( mClients );
360 BU_PROFILE_START("closeClient"); 364 BU_PROFILE_START("closeClient");
365
366 struct epoll_event ev;
367 epoll_ctl( pCore->fdRead, EPOLL_CTL_DEL, iSocket, &ev );
368 epoll_ctl( pCore->fdWrite, EPOLL_CTL_DEL, iSocket, &ev );
369
361 Bu::Client *pClient = hClients.get( iSocket ); 370 Bu::Client *pClient = hClients.get( iSocket );
362 Bu::Socket *pSocket = hSockets.get( iSocket ); 371 Bu::Socket *pSocket = hSockets.get( iSocket );
363 onClosedConnection( pClient ); 372 onClosedConnection( pClient );
@@ -366,10 +375,6 @@ void Bu::Server::closeClient( fd iSocket )
366 pSocket->close(); 375 pSocket->close();
367 hSockets.erase( iSocket ); 376 hSockets.erase( iSocket );
368 377
369 struct epoll_event ev;
370 epoll_ctl( pCore->fdRead, EPOLL_CTL_DEL, iSocket, &ev );
371 epoll_ctl( pCore->fdWrite, EPOLL_CTL_DEL, iSocket, &ev );
372
373 delete pClient; 378 delete pClient;
374 delete pSocket; 379 delete pSocket;
375 BU_PROFILE_END("closeClient"); 380 BU_PROFILE_END("closeClient");
@@ -404,7 +409,7 @@ void Bu::Server::WriteMonitor::run()
404 { 409 {
405 if( ev[j].data.fd == rSrv.pCore->fdEvent ) 410 if( ev[j].data.fd == rSrv.pCore->fdEvent )
406 { 411 {
407 Bu::println("Bu::Server::WriteMonitor -> got event on fdEvent, exiting..."); 412 // got event on fdEvent, exiting...
408 return; 413 return;
409 } 414 }
410 else 415 else
@@ -498,8 +503,6 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
498 Bu::size iRead; 503 Bu::size iRead;
499 Bu::size iTotal=0; 504 Bu::size iTotal=0;
500 505
501 Bu::println("IoWorker::handleRead: starting");
502
503 BU_PROFILE_START("client.read"); 506 BU_PROFILE_START("client.read");
504 for(;;) 507 for(;;)
505 { 508 {
@@ -521,7 +524,6 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
521 } 524 }
522 catch( Bu::ExceptionBase &e ) 525 catch( Bu::ExceptionBase &e )
523 { 526 {
524 Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() );
525 pClient->disconnect(); 527 pClient->disconnect();
526 //close( pSocket ); 528 //close( pSocket );
527 return; 529 return;
@@ -531,7 +533,6 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
531 533
532 if( iTotal == 0 ) 534 if( iTotal == 0 )
533 { 535 {
534 Bu::println("IoWorker::handleRead: read nothing, closing");
535 pClient->disconnect(); 536 pClient->disconnect();
536 //close( pSocket ); 537 //close( pSocket );
537 } 538 }
@@ -547,9 +548,8 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket )
547 548
548void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) 549void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket )
549{ 550{
550 Bu::println("IoWorker::handleWrite() ");
551 char buf[RBS]; 551 char buf[RBS];
552 if( pClient->hasOutput() > 0 ) 552 while( pClient->hasOutput() > 0 )
553 { 553 {
554 int iAmnt = RBS; 554 int iAmnt = RBS;
555 iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); 555 iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt );
@@ -560,9 +560,9 @@ void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket )
560 560
561void Bu::Server::IoWorker::close( Bu::Socket *pSocket ) 561void Bu::Server::IoWorker::close( Bu::Socket *pSocket )
562{ 562{
563 Bu::Server::fd iFd; 563// Bu::Server::fd iFd;
564 pSocket->getFd( iFd ); 564// pSocket->getFd( iFd );
565 rSrv.closeClient( iFd ); 565// rSrv.closeClient( iFd );
566} 566}
567 567
568///////// 568/////////
@@ -594,10 +594,7 @@ void Bu::Server::ClientWorker::run()
594 continue; 594 continue;
595 } 595 }
596 596
597 Bu::println("Processing input...");
598 pClient->processInput(); 597 pClient->processInput();
599 Bu::println("Processing input complete.");
600 Bu::println("*** ClientBuf: input: %1b, output: %2b").arg( pClient->getInputSize() ).arg( pClient->getOutputSize() );
601 if( pClient->getOutputSize() > 0 ) 598 if( pClient->getOutputSize() > 0 )
602 { 599 {
603 rSrv.clientWriteReady( pClient->getId() ); 600 rSrv.clientWriteReady( pClient->getId() );
diff --git a/src/stable/server.h b/src/stable/server.h
index 56ac29a..a839968 100644
--- a/src/stable/server.h
+++ b/src/stable/server.h
@@ -23,7 +23,7 @@
23#include "bu/config.h" 23#include "bu/config.h"
24 24
25#ifndef PROFILE_BU_SERVER 25#ifndef PROFILE_BU_SERVER
26// #define PROFILE_BU_SERVER 1 26 #define PROFILE_BU_SERVER 1
27#endif 27#endif
28 28
29#ifdef PROFILE_BU_SERVER 29#ifdef PROFILE_BU_SERVER