diff options
| author | Mike Buland <eichlan@xagasoft.com> | 2023-07-31 15:17:52 -0700 |
|---|---|---|
| committer | Mike Buland <eichlan@xagasoft.com> | 2023-07-31 15:17:52 -0700 |
| commit | a49af97abf091a32f6ec2c3985aa0890ded65d9c (patch) | |
| tree | 9332acb37ccaa1552bf273d2383d4a0292deea14 /src/stable | |
| parent | 9a7dde29dc1bc1f699508ad9c0335f4d7abf319f (diff) | |
| download | libbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.tar.gz libbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.tar.bz2 libbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.tar.xz libbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.zip | |
Debugging yet, but the new server works.
It at least seems to!
Diffstat (limited to '')
| -rw-r--r-- | src/stable/client.cpp | 18 | ||||
| -rw-r--r-- | src/stable/client.h | 9 | ||||
| -rw-r--r-- | src/stable/clientbuf.cpp | 10 | ||||
| -rw-r--r-- | src/stable/server.cpp | 194 | ||||
| -rw-r--r-- | src/stable/server.h | 29 | ||||
| -rw-r--r-- | src/stable/serverinterface.cpp | 29 | ||||
| -rw-r--r-- | src/stable/serverinterface.h | 40 |
7 files changed, 263 insertions, 66 deletions
diff --git a/src/stable/client.cpp b/src/stable/client.cpp index d2d48d7..159d103 100644 --- a/src/stable/client.cpp +++ b/src/stable/client.cpp | |||
| @@ -23,16 +23,21 @@ | |||
| 23 | #define BU_PROFILE_END( x ) (void)0 | 23 | #define BU_PROFILE_END( x ) (void)0 |
| 24 | #endif | 24 | #endif |
| 25 | 25 | ||
| 26 | Bu::Client::Client( class Bu::ClientLinkFactory *pfLink ) : | 26 | Bu::Client::Client( int iId, const Bu::ServerInterface &rServer ) : |
| 27 | pProto( NULL ), | 27 | pProto( NULL ), |
| 28 | bWantsDisconnect( false ), | 28 | bWantsDisconnect( false ), |
| 29 | pfLink( pfLink ) | 29 | iId( iId ), |
| 30 | xServer( rServer ) | ||
| 30 | { | 31 | { |
| 31 | } | 32 | } |
| 32 | 33 | ||
| 33 | Bu::Client::~Client() | 34 | Bu::Client::~Client() |
| 34 | { | 35 | { |
| 35 | delete pfLink; | 36 | } |
| 37 | |||
| 38 | int Bu::Client::getId() const | ||
| 39 | { | ||
| 40 | return iId; | ||
| 36 | } | 41 | } |
| 37 | 42 | ||
| 38 | void Bu::Client::processInput() | 43 | void Bu::Client::processInput() |
| @@ -47,6 +52,11 @@ void Bu::Client::processInput() | |||
| 47 | } | 52 | } |
| 48 | } | 53 | } |
| 49 | 54 | ||
| 55 | void Bu::Client::outputReady() | ||
| 56 | { | ||
| 57 | xServer.outputReady( iId ); | ||
| 58 | } | ||
| 59 | |||
| 50 | void Bu::Client::setProtocol( Protocol *pProto ) | 60 | void Bu::Client::setProtocol( Protocol *pProto ) |
| 51 | { | 61 | { |
| 52 | Bu::MutexLocker l( mProto ); | 62 | Bu::MutexLocker l( mProto ); |
| @@ -161,7 +171,7 @@ void Bu::Client::close() | |||
| 161 | 171 | ||
| 162 | Bu::ClientLink *Bu::Client::getLink() | 172 | Bu::ClientLink *Bu::Client::getLink() |
| 163 | { | 173 | { |
| 164 | return pfLink->createLink( this ); | 174 | return NULL; //pfLink->createLink( this ); |
| 165 | } | 175 | } |
| 166 | 176 | ||
| 167 | void Bu::Client::onMessage( const Bu::String &sMsg ) | 177 | void Bu::Client::onMessage( const Bu::String &sMsg ) |
diff --git a/src/stable/client.h b/src/stable/client.h index abe807e..0ff7914 100644 --- a/src/stable/client.h +++ b/src/stable/client.h | |||
| @@ -15,6 +15,7 @@ | |||
| 15 | #include "bu/clientbuf.h" | 15 | #include "bu/clientbuf.h" |
| 16 | #include "bu/mutex.h" | 16 | #include "bu/mutex.h" |
| 17 | #include "bu/readwritemutex.h" | 17 | #include "bu/readwritemutex.h" |
| 18 | #include "bu/serverinterface.h" | ||
| 18 | 19 | ||
| 19 | #ifndef PROFILE_BU_SERVER | 20 | #ifndef PROFILE_BU_SERVER |
| 20 | // #define PROFILE_BU_SERVER 1 | 21 | // #define PROFILE_BU_SERVER 1 |
| @@ -37,10 +38,13 @@ namespace Bu | |||
| 37 | { | 38 | { |
| 38 | friend class Server; | 39 | friend class Server; |
| 39 | public: | 40 | public: |
| 40 | Client( Bu::ClientLinkFactory *pfLink ); | 41 | Client( int iId, const Bu::ServerInterface &rServer ); |
| 41 | virtual ~Client(); | 42 | virtual ~Client(); |
| 42 | 43 | ||
| 44 | int getId() const; | ||
| 45 | |||
| 43 | void processInput(); | 46 | void processInput(); |
| 47 | void outputReady(); | ||
| 44 | 48 | ||
| 45 | Bu::size write( const Bu::String &sData ); | 49 | Bu::size write( const Bu::String &sData ); |
| 46 | Bu::size write( const void *pData, Bu::size nBytes ); | 50 | Bu::size write( const void *pData, Bu::size nBytes ); |
| @@ -116,9 +120,10 @@ namespace Bu | |||
| 116 | Bu::Protocol *pProto; | 120 | Bu::Protocol *pProto; |
| 117 | Bu::ClientBuf cbBuffer; | 121 | Bu::ClientBuf cbBuffer; |
| 118 | bool bWantsDisconnect; | 122 | bool bWantsDisconnect; |
| 119 | class Bu::ClientLinkFactory *pfLink; | ||
| 120 | mutable Bu::Mutex mProto; | 123 | mutable Bu::Mutex mProto; |
| 121 | mutable Bu::Mutex mDisconnect; | 124 | mutable Bu::Mutex mDisconnect; |
| 125 | int iId; | ||
| 126 | Bu::ServerInterface xServer; | ||
| 122 | }; | 127 | }; |
| 123 | } | 128 | } |
| 124 | 129 | ||
diff --git a/src/stable/clientbuf.cpp b/src/stable/clientbuf.cpp index c6b310f..8c4afc4 100644 --- a/src/stable/clientbuf.cpp +++ b/src/stable/clientbuf.cpp | |||
| @@ -175,19 +175,16 @@ void Bu::ClientBuf::ClientAccess::close() | |||
| 175 | 175 | ||
| 176 | Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) | 176 | Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) |
| 177 | { | 177 | { |
| 178 | Bu::println("ClientAccess::read( ptr, %1 )").arg( iBytes ); | ||
| 179 | char *pBuf = (char *)pBufRaw; | 178 | char *pBuf = (char *)pBufRaw; |
| 180 | Bu::MutexLocker l( mAccess ); | 179 | Bu::MutexLocker l( mAccess ); |
| 181 | // Read from QueueBuf first | 180 | // Read from QueueBuf first |
| 182 | Bu::size ps = qbPeek.read( pBuf, iBytes ); | 181 | Bu::size ps = qbPeek.read( pBuf, iBytes ); |
| 183 | Bu::println("ClientAccess::read: attempted qbPeek, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) ); | ||
| 184 | iBytes -= ps; | 182 | iBytes -= ps; |
| 185 | pBuf += ps; | 183 | pBuf += ps; |
| 186 | // Request space left? Try the client | 184 | // Request space left? Try the client |
| 187 | if( iBytes > 0 ) | 185 | if( iBytes > 0 ) |
| 188 | { | 186 | { |
| 189 | ps += rBuf.accClientFiltered.read( pBuf, iBytes ); | 187 | ps += rBuf.accClientFiltered.read( pBuf, iBytes ); |
| 190 | Bu::println("ClientAccess::read: attempted completion from socket buffer, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) ); | ||
| 191 | } | 188 | } |
| 192 | return ps; | 189 | return ps; |
| 193 | } | 190 | } |
| @@ -195,26 +192,22 @@ Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) | |||
| 195 | Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes, | 192 | Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes, |
| 196 | int iOffset ) | 193 | int iOffset ) |
| 197 | { | 194 | { |
| 198 | Bu::println("ClientAccess::peek( ptr, %1, %2 )").arg( iBytes ).arg( iOffset ); | ||
| 199 | Bu::MutexLocker l( mAccess ); | 195 | Bu::MutexLocker l( mAccess ); |
| 200 | // Do we have enough data in the peek buffer to handle this? | 196 | // Do we have enough data in the peek buffer to handle this? |
| 201 | if( qbPeek.getSize() < iBytes+iOffset ) | 197 | if( qbPeek.getSize() < iBytes+iOffset ) |
| 202 | { | 198 | { |
| 203 | Bu::println("ClientAccess::peek: Insufficient buffered (have %1b, need %2b)").arg( qbPeek.getSize() ).arg( iBytes+iOffset ); | ||
| 204 | // Nope, make an attempt to fill it in. | 199 | // Nope, make an attempt to fill it in. |
| 205 | int nDiff = (iBytes+iOffset)-qbPeek.getSize(); | 200 | int nDiff = (iBytes+iOffset)-qbPeek.getSize(); |
| 206 | // We have to make our own buffer, since iBytes+nOffeset could be bigger | 201 | // We have to make our own buffer, since iBytes+nOffeset could be bigger |
| 207 | // than pData. | 202 | // than pData. |
| 208 | char *pTmp = new char[nDiff]; | 203 | char *pTmp = new char[nDiff]; |
| 209 | Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff ); | 204 | Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff ); |
| 210 | Bu::println("ClientAccess::peek: Tried to fill buffer, read %1 of needed %2\n>%3<").arg( ps ).arg( nDiff ).arg( Bu::String(pTmp, ps) ); | ||
| 211 | if( ps > 0 ) | 205 | if( ps > 0 ) |
| 212 | { | 206 | { |
| 213 | // Add the data read to the peek buffer. | 207 | // Add the data read to the peek buffer. |
| 214 | qbPeek.write( pTmp, ps ); | 208 | qbPeek.write( pTmp, ps ); |
| 215 | } | 209 | } |
| 216 | delete[] pTmp; | 210 | delete[] pTmp; |
| 217 | Bu::println("ClientAccess::peek: buffer left with %1").arg( qbPeek.getSize() ); | ||
| 218 | } | 211 | } |
| 219 | 212 | ||
| 220 | return qbPeek.peek( pData, iBytes, iOffset ); | 213 | return qbPeek.peek( pData, iBytes, iOffset ); |
| @@ -234,7 +227,6 @@ Bu::size Bu::ClientBuf::ClientAccess::tell() | |||
| 234 | 227 | ||
| 235 | void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) | 228 | void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) |
| 236 | { | 229 | { |
| 237 | Bu::println("ClientAccess::seek( %1 )").arg( offset ); | ||
| 238 | Bu::MutexLocker l( mAccess ); | 230 | Bu::MutexLocker l( mAccess ); |
| 239 | // For this type of stream seek is basically a destructive skip. It's like | 231 | // For this type of stream seek is basically a destructive skip. It's like |
| 240 | // reading the data but with no output buffer. Let's remove data from the | 232 | // reading the data but with no output buffer. Let's remove data from the |
| @@ -242,7 +234,6 @@ void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) | |||
| 242 | if( qbPeek.getSize() > 0 ) | 234 | if( qbPeek.getSize() > 0 ) |
| 243 | { | 235 | { |
| 244 | Bu::size amount = Bu::buMin( qbPeek.getSize(), offset ); | 236 | Bu::size amount = Bu::buMin( qbPeek.getSize(), offset ); |
| 245 | Bu::println("ClientAccess::seek: buffered: %1, amount: %2").arg( qbPeek.getSize() ).arg( amount ); | ||
| 246 | qbPeek.seek( amount ); | 237 | qbPeek.seek( amount ); |
| 247 | offset -= amount; | 238 | offset -= amount; |
| 248 | } | 239 | } |
| @@ -250,7 +241,6 @@ void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) | |||
| 250 | // If there's offset left, then apply it to the underlying stream | 241 | // If there's offset left, then apply it to the underlying stream |
| 251 | if( offset > 0 ) | 242 | if( offset > 0 ) |
| 252 | { | 243 | { |
| 253 | Bu::println("ClientAccess::seek: seeking remaining %1 in socket buffer").arg( offset ); | ||
| 254 | rBuf.accClientFiltered.seek( offset ); | 244 | rBuf.accClientFiltered.seek( offset ); |
| 255 | } | 245 | } |
| 256 | } | 246 | } |
diff --git a/src/stable/server.cpp b/src/stable/server.cpp index 7c44c11..cc89f64 100644 --- a/src/stable/server.cpp +++ b/src/stable/server.cpp | |||
| @@ -8,11 +8,14 @@ | |||
| 8 | #include "bu/server.h" | 8 | #include "bu/server.h" |
| 9 | #include <errno.h> | 9 | #include <errno.h> |
| 10 | #include <unistd.h> | 10 | #include <unistd.h> |
| 11 | #include <sys/epoll.h> | ||
| 12 | #include <sys/eventfd.h> | ||
| 11 | #include "bu/serversocket.h" | 13 | #include "bu/serversocket.h" |
| 12 | #include "bu/client.h" | 14 | #include "bu/client.h" |
| 13 | #include "bu/socket.h" | 15 | #include "bu/socket.h" |
| 14 | #include "bu/config.h" | 16 | #include "bu/config.h" |
| 15 | #include "bu/mutexlocker.h" | 17 | #include "bu/mutexlocker.h" |
| 18 | #include "bu/serverinterface.h" | ||
| 16 | 19 | ||
| 17 | #include "bu/sio.h" | 20 | #include "bu/sio.h" |
| 18 | 21 | ||
| @@ -25,14 +28,48 @@ | |||
| 25 | #endif | 28 | #endif |
| 26 | 29 | ||
| 27 | #define RBS 1500 | 30 | #define RBS 1500 |
| 31 | #define MAX_EVENTS 20 | ||
| 32 | |||
| 33 | namespace Bu | ||
| 34 | { | ||
| 35 | class __ServerCore | ||
| 36 | { | ||
| 37 | public: | ||
| 38 | __ServerCore() | ||
| 39 | { | ||
| 40 | fdRead = epoll_create( 1 ); | ||
| 41 | fdWrite = epoll_create( 1 ); | ||
| 42 | fdEvent = eventfd( 0, 0 ); | ||
| 43 | |||
| 44 | struct epoll_event ev; | ||
| 45 | ev.events = EPOLLIN; | ||
| 46 | ev.data.fd = fdEvent; | ||
| 47 | epoll_ctl( fdRead, EPOLL_CTL_ADD, fdEvent, &ev ); | ||
| 48 | epoll_ctl( fdWrite, EPOLL_CTL_ADD, fdEvent, &ev ); | ||
| 49 | } | ||
| 50 | |||
| 51 | ~__ServerCore() | ||
| 52 | { | ||
| 53 | close( fdRead ); | ||
| 54 | close( fdWrite ); | ||
| 55 | close( fdEvent ); | ||
| 56 | } | ||
| 57 | |||
| 58 | Server::fd fdRead; | ||
| 59 | Server::fd fdWrite; | ||
| 60 | Server::fd fdEvent; | ||
| 61 | }; | ||
| 62 | } | ||
| 28 | 63 | ||
| 29 | Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : | 64 | Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : |
| 65 | pCore( new Bu::__ServerCore() ), | ||
| 30 | nTimeoutSec( 0 ), | 66 | nTimeoutSec( 0 ), |
| 31 | nTimeoutUSec( 0 ), | 67 | nTimeoutUSec( 0 ), |
| 32 | bAutoTick( false ) | 68 | bAutoTick( false ), |
| 69 | tMonitorWrite( *this ), | ||
| 70 | bRunning( true ) | ||
| 33 | { | 71 | { |
| 34 | BU_PROFILE_START("server"); | 72 | BU_PROFILE_START("server"); |
| 35 | FD_ZERO( &fdActive ); | ||
| 36 | 73 | ||
| 37 | if( iIoWorkers < 1 ) | 74 | if( iIoWorkers < 1 ) |
| 38 | iIoWorkers = 1; | 75 | iIoWorkers = 1; |
| @@ -52,11 +89,14 @@ Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : | |||
| 52 | lClientWorker.append( pWorker ); | 89 | lClientWorker.append( pWorker ); |
| 53 | pWorker->start(); | 90 | pWorker->start(); |
| 54 | } | 91 | } |
| 92 | tMonitorWrite.start(); | ||
| 55 | } | 93 | } |
| 56 | 94 | ||
| 57 | Bu::Server::~Server() | 95 | Bu::Server::~Server() |
| 58 | { | 96 | { |
| 59 | shutdown(); | 97 | shutdown(); |
| 98 | tMonitorWrite.join(); | ||
| 99 | delete pCore; | ||
| 60 | BU_PROFILE_START("server"); | 100 | BU_PROFILE_START("server"); |
| 61 | } | 101 | } |
| 62 | 102 | ||
| @@ -68,7 +108,11 @@ void Bu::Server::addServerSocket( Bu::ServerSocket *pSocket ) | |||
| 68 | throw Bu::ExceptionBase("Cannot get file descriptor from " | 108 | throw Bu::ExceptionBase("Cannot get file descriptor from " |
| 69 | "provided ServerSocket."); | 109 | "provided ServerSocket."); |
| 70 | } | 110 | } |
| 71 | FD_SET( iFd, &fdActive ); | 111 | |
| 112 | struct epoll_event ev; | ||
| 113 | ev.events = EPOLLIN; | ||
| 114 | ev.data.fd = iFd; | ||
| 115 | epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFd, &ev ); | ||
| 72 | hServers.insert( iFd, pSocket ); | 116 | hServers.insert( iFd, pSocket ); |
| 73 | } | 117 | } |
| 74 | 118 | ||
| @@ -81,26 +125,22 @@ void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec ) | |||
| 81 | void Bu::Server::scan() | 125 | void Bu::Server::scan() |
| 82 | { | 126 | { |
| 83 | BU_PROFILE_START("scan"); | 127 | BU_PROFILE_START("scan"); |
| 84 | struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; | ||
| 85 | 128 | ||
| 86 | fd_set fdRead = fdActive; | ||
| 87 | fd_set fdWrite /* = fdActive*/; | ||
| 88 | fd_set fdException = fdActive; | ||
| 89 | |||
| 90 | FD_ZERO( &fdWrite ); | ||
| 91 | mClients.lock(); | 129 | mClients.lock(); |
| 92 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 130 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) |
| 93 | { | 131 | { |
| 94 | if( (*i)->hasOutput() ) | 132 | if( (*i)->hasOutput() ) |
| 95 | { | 133 | { |
| 96 | Bu::println("Socket has output..."); | 134 | // Queue it for output here? |
| 97 | FD_SET( i.getKey(), &fdWrite ); | ||
| 98 | } | 135 | } |
| 99 | } | 136 | } |
| 100 | mClients.unlock(); | 137 | mClients.unlock(); |
| 101 | 138 | ||
| 102 | if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, | 139 | struct epoll_event ev[MAX_EVENTS]; |
| 103 | &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) | 140 | |
| 141 | int iCount = epoll_wait( pCore->fdRead, ev, MAX_EVENTS, nTimeoutSec*1000+nTimeoutUSec/1000 ); | ||
| 142 | |||
| 143 | if( iCount < 0 ) | ||
| 104 | { | 144 | { |
| 105 | char buf[1024]; | 145 | char buf[1024]; |
| 106 | strerror_r( errno, buf, 1024 ); | 146 | strerror_r( errno, buf, 1024 ); |
| @@ -110,41 +150,29 @@ void Bu::Server::scan() | |||
| 110 | ); | 150 | ); |
| 111 | } | 151 | } |
| 112 | 152 | ||
| 113 | evIoCycle.clear(); | ||
| 114 | Bu::println("Cycle clear"); | 153 | Bu::println("Cycle clear"); |
| 115 | 154 | ||
| 116 | for( int j = 0; j < FD_SETSIZE; j++ ) | 155 | for( int j = 0; j < iCount; j++ ) |
| 117 | { | 156 | { |
| 118 | if( FD_ISSET( j, &fdRead ) ) | 157 | if( hServers.has( ev[j].data.fd ) ) |
| 119 | { | 158 | { |
| 120 | if( hServers.has( j ) ) | 159 | Bu::ServerSocket *pSrv = hServers.get( ev[j].data.fd ); |
| 121 | { | 160 | addClient( pSrv, pSrv->accept() ); |
| 122 | Bu::ServerSocket *pSrv = hServers.get( j ); | ||
| 123 | addClient( pSrv, pSrv->accept() ); | ||
| 124 | } | ||
| 125 | else | ||
| 126 | { | ||
| 127 | evIoCycle.increment(); | ||
| 128 | Bu::println("Increment (read)"); | ||
| 129 | qIoEvent.enqueue( new Event( j, Event::Read ) ); | ||
| 130 | } | ||
| 131 | } | 161 | } |
| 132 | if( FD_ISSET( j, &fdWrite ) ) | 162 | else |
| 133 | { | 163 | { |
| 134 | evIoCycle.increment(); | 164 | qIoEvent.enqueue( new Event( ev[j].data.fd, Event::Read ) ); |
| 135 | Bu::println("Increment (write)"); | ||
| 136 | qIoEvent.enqueue( new Event( j, Event::Write ) ); | ||
| 137 | } | 165 | } |
| 138 | } | 166 | } |
| 139 | 167 | ||
| 140 | Bu::println("Waiting"); | 168 | Bu::println("Waiting"); |
| 141 | while( evIoCycle.wait() > 0 ) { } | ||
| 142 | 169 | ||
| 143 | Bu::List<int> lDelete; | 170 | Bu::List<int> lDelete; |
| 144 | // Now we just try to write all the pending data on all the sockets. | 171 | // Now we just try to write all the pending data on all the sockets. |
| 145 | // this could be done better eventually, if we care about the socket | 172 | // this could be done better eventually, if we care about the socket |
| 146 | // wanting to accept writes (using a select). | 173 | // wanting to accept writes (using a select). |
| 147 | mClients.lock(); | 174 | mClients.lock(); |
| 175 | |||
| 148 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 176 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) |
| 149 | { | 177 | { |
| 150 | if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) | 178 | if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) |
| @@ -174,10 +202,18 @@ void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket ) | |||
| 174 | { | 202 | { |
| 175 | throw Bu::ExceptionBase("No file descriptor?"); | 203 | throw Bu::ExceptionBase("No file descriptor?"); |
| 176 | } | 204 | } |
| 177 | FD_SET( iFdCli, &fdActive ); | 205 | |
| 206 | struct epoll_event ev; | ||
| 207 | ev.events = EPOLLIN | EPOLLONESHOT; | ||
| 208 | ev.data.fd = iFdCli; | ||
| 209 | epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFdCli, &ev ); | ||
| 210 | |||
| 211 | ev.events = 0; | ||
| 212 | epoll_ctl( pCore->fdWrite, EPOLL_CTL_ADD, iFdCli, &ev ); | ||
| 178 | 213 | ||
| 179 | Client *pClient = new Client( | 214 | Client *pClient = new Client( |
| 180 | new SrvClientLinkFactory() | 215 | iFdCli, |
| 216 | ServerInterface( *this ) | ||
| 181 | ); | 217 | ); |
| 182 | { | 218 | { |
| 183 | Bu::MutexLocker l( mClients ); | 219 | Bu::MutexLocker l( mClients ); |
| @@ -252,6 +288,11 @@ void Bu::Server::tick() | |||
| 252 | void Bu::Server::shutdown() | 288 | void Bu::Server::shutdown() |
| 253 | { | 289 | { |
| 254 | { | 290 | { |
| 291 | int64_t iVal = 1; | ||
| 292 | write( pCore->fdEvent, &iVal, sizeof(int64_t) ); | ||
| 293 | } | ||
| 294 | |||
| 295 | { | ||
| 255 | qIoEvent.stop(); | 296 | qIoEvent.stop(); |
| 256 | qClientEvent.stop(); | 297 | qClientEvent.stop(); |
| 257 | Bu::Server::Event *pEv; | 298 | Bu::Server::Event *pEv; |
| @@ -294,6 +335,24 @@ void Bu::Server::shutdown() | |||
| 294 | } | 335 | } |
| 295 | hClients.clear(); | 336 | hClients.clear(); |
| 296 | } | 337 | } |
| 338 | |||
| 339 | void Bu::Server::clientReadReady( fd iFd ) | ||
| 340 | { | ||
| 341 | // Re-arm | ||
| 342 | struct epoll_event ev; | ||
| 343 | ev.events = EPOLLIN | EPOLLONESHOT; | ||
| 344 | ev.data.fd = iFd; | ||
| 345 | epoll_ctl( pCore->fdRead, EPOLL_CTL_MOD, iFd, &ev ); | ||
| 346 | } | ||
| 347 | |||
| 348 | void Bu::Server::clientWriteReady( fd iFd ) | ||
| 349 | { | ||
| 350 | // Re-arm | ||
| 351 | struct epoll_event ev; | ||
| 352 | ev.events = EPOLLOUT | EPOLLONESHOT; | ||
| 353 | ev.data.fd = iFd; | ||
| 354 | epoll_ctl( pCore->fdWrite, EPOLL_CTL_MOD, iFd, &ev ); | ||
| 355 | } | ||
| 297 | 356 | ||
| 298 | void Bu::Server::closeClient( fd iSocket ) | 357 | void Bu::Server::closeClient( fd iSocket ) |
| 299 | { | 358 | { |
| @@ -306,13 +365,60 @@ void Bu::Server::closeClient( fd iSocket ) | |||
| 306 | hClients.erase( iSocket ); | 365 | hClients.erase( iSocket ); |
| 307 | pSocket->close(); | 366 | pSocket->close(); |
| 308 | hSockets.erase( iSocket ); | 367 | hSockets.erase( iSocket ); |
| 309 | FD_CLR( iSocket, &fdActive ); | 368 | |
| 369 | struct epoll_event ev; | ||
| 370 | epoll_ctl( pCore->fdRead, EPOLL_CTL_DEL, iSocket, &ev ); | ||
| 371 | epoll_ctl( pCore->fdWrite, EPOLL_CTL_DEL, iSocket, &ev ); | ||
| 372 | |||
| 310 | delete pClient; | 373 | delete pClient; |
| 311 | delete pSocket; | 374 | delete pSocket; |
| 312 | BU_PROFILE_END("closeClient"); | 375 | BU_PROFILE_END("closeClient"); |
| 313 | } | 376 | } |
| 314 | 377 | ||
| 315 | //////// | 378 | //////// |
| 379 | // WriteMonitor | ||
| 380 | //// | ||
| 381 | Bu::Server::WriteMonitor::WriteMonitor( Server &rSrv ) : | ||
| 382 | rSrv( rSrv ) | ||
| 383 | { | ||
| 384 | } | ||
| 385 | |||
| 386 | Bu::Server::WriteMonitor::~WriteMonitor() | ||
| 387 | { | ||
| 388 | } | ||
| 389 | |||
| 390 | void Bu::Server::WriteMonitor::run() | ||
| 391 | { | ||
| 392 | setName("busrv-writeMon"); | ||
| 393 | struct epoll_event ev[MAX_EVENTS]; | ||
| 394 | for(;;) | ||
| 395 | { | ||
| 396 | int iCount = epoll_wait( rSrv.pCore->fdWrite, ev, MAX_EVENTS, 5000 ); | ||
| 397 | if( iCount < 0 ) | ||
| 398 | { | ||
| 399 | // Bad error? | ||
| 400 | return; | ||
| 401 | } | ||
| 402 | |||
| 403 | for( int j = 0; j < iCount; j++ ) | ||
| 404 | { | ||
| 405 | if( ev[j].data.fd == rSrv.pCore->fdEvent ) | ||
| 406 | { | ||
| 407 | Bu::println("Bu::Server::WriteMonitor -> got event on fdEvent, exiting..."); | ||
| 408 | return; | ||
| 409 | } | ||
| 410 | else | ||
| 411 | { | ||
| 412 | // Queue the write | ||
| 413 | rSrv.qIoEvent.enqueue( | ||
| 414 | new Event( ev[j].data.fd, Event::Write ) | ||
| 415 | ); | ||
| 416 | } | ||
| 417 | } | ||
| 418 | } | ||
| 419 | } | ||
| 420 | |||
| 421 | //////// | ||
| 316 | // Event | 422 | // Event |
| 317 | //// | 423 | //// |
| 318 | 424 | ||
| @@ -365,7 +471,6 @@ void Bu::Server::IoWorker::run() | |||
| 365 | if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) | 471 | if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) |
| 366 | { | 472 | { |
| 367 | delete pEv; | 473 | delete pEv; |
| 368 | rSrv.evIoCycle.decrement(); | ||
| 369 | continue; | 474 | continue; |
| 370 | } | 475 | } |
| 371 | 476 | ||
| @@ -384,9 +489,6 @@ void Bu::Server::IoWorker::run() | |||
| 384 | } | 489 | } |
| 385 | 490 | ||
| 386 | delete pEv; | 491 | delete pEv; |
| 387 | |||
| 388 | Bu::println("decrement"); | ||
| 389 | rSrv.evIoCycle.decrement(); | ||
| 390 | } | 492 | } |
| 391 | } | 493 | } |
| 392 | 494 | ||
| @@ -404,7 +506,6 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
| 404 | try | 506 | try |
| 405 | { | 507 | { |
| 406 | iRead = pSocket->read( buf, RBS ); | 508 | iRead = pSocket->read( buf, RBS ); |
| 407 | Bu::println("IoWorker::handleRead: read() -> %1").arg( iRead ); | ||
| 408 | 509 | ||
| 409 | if( iRead == 0 ) | 510 | if( iRead == 0 ) |
| 410 | { | 511 | { |
| @@ -421,7 +522,8 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
| 421 | catch( Bu::ExceptionBase &e ) | 522 | catch( Bu::ExceptionBase &e ) |
| 422 | { | 523 | { |
| 423 | Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() ); | 524 | Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() ); |
| 424 | close( pSocket ); | 525 | pClient->disconnect(); |
| 526 | //close( pSocket ); | ||
| 425 | return; | 527 | return; |
| 426 | } | 528 | } |
| 427 | } | 529 | } |
| @@ -430,13 +532,16 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
| 430 | if( iTotal == 0 ) | 532 | if( iTotal == 0 ) |
| 431 | { | 533 | { |
| 432 | Bu::println("IoWorker::handleRead: read nothing, closing"); | 534 | Bu::println("IoWorker::handleRead: read nothing, closing"); |
| 433 | close( pSocket ); | 535 | pClient->disconnect(); |
| 536 | //close( pSocket ); | ||
| 434 | } | 537 | } |
| 435 | else | 538 | else |
| 436 | { | 539 | { |
| 437 | Bu::Server::fd iFd; | 540 | Bu::Server::fd iFd; |
| 438 | pSocket->getFd( iFd ); | 541 | pSocket->getFd( iFd ); |
| 439 | rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) ); | 542 | rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) ); |
| 543 | |||
| 544 | rSrv.clientReadReady( iFd ); | ||
| 440 | } | 545 | } |
| 441 | } | 546 | } |
| 442 | 547 | ||
| @@ -449,7 +554,6 @@ void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) | |||
| 449 | int iAmnt = RBS; | 554 | int iAmnt = RBS; |
| 450 | iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); | 555 | iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); |
| 451 | int iReal = pSocket->write( buf, iAmnt ); | 556 | int iReal = pSocket->write( buf, iAmnt ); |
| 452 | Bu::println("IoWorker::handleWrite: Copy out: iAmnt=%1, iReal=%2\n>%3<").arg( iAmnt ).arg( iReal ).arg( Bu::String( buf, iReal ) ); | ||
| 453 | pClient->cbBuffer.server().seek( iReal ); | 557 | pClient->cbBuffer.server().seek( iReal ); |
| 454 | } | 558 | } |
| 455 | } | 559 | } |
| @@ -494,6 +598,10 @@ void Bu::Server::ClientWorker::run() | |||
| 494 | pClient->processInput(); | 598 | pClient->processInput(); |
| 495 | Bu::println("Processing input complete."); | 599 | Bu::println("Processing input complete."); |
| 496 | Bu::println("*** ClientBuf: input: %1b, output: %2b").arg( pClient->getInputSize() ).arg( pClient->getOutputSize() ); | 600 | Bu::println("*** ClientBuf: input: %1b, output: %2b").arg( pClient->getInputSize() ).arg( pClient->getOutputSize() ); |
| 601 | if( pClient->getOutputSize() > 0 ) | ||
| 602 | { | ||
| 603 | rSrv.clientWriteReady( pClient->getId() ); | ||
| 604 | } | ||
| 497 | delete pEv; | 605 | delete pEv; |
| 498 | } | 606 | } |
| 499 | } | 607 | } |
diff --git a/src/stable/server.h b/src/stable/server.h index e2b7d53..56ac29a 100644 --- a/src/stable/server.h +++ b/src/stable/server.h | |||
| @@ -10,10 +10,6 @@ | |||
| 10 | 10 | ||
| 11 | #include <stdint.h> | 11 | #include <stdint.h> |
| 12 | 12 | ||
| 13 | #ifndef WIN32 | ||
| 14 | #include <sys/select.h> | ||
| 15 | #endif | ||
| 16 | |||
| 17 | #include "bu/string.h" | 13 | #include "bu/string.h" |
| 18 | #include "bu/list.h" | 14 | #include "bu/list.h" |
| 19 | 15 | ||
| @@ -62,6 +58,7 @@ namespace Bu | |||
| 62 | */ | 58 | */ |
| 63 | class Server | 59 | class Server |
| 64 | { | 60 | { |
| 61 | friend class ServerInterface; | ||
| 65 | public: | 62 | public: |
| 66 | Server( int iIoWorkers=4, int iClientWorkers=8 ); | 63 | Server( int iIoWorkers=4, int iClientWorkers=8 ); |
| 67 | virtual ~Server(); | 64 | virtual ~Server(); |
| @@ -90,8 +87,12 @@ namespace Bu | |||
| 90 | 87 | ||
| 91 | void shutdown(); | 88 | void shutdown(); |
| 92 | 89 | ||
| 93 | private: | 90 | protected: |
| 91 | void clientReadReady( fd iFd ); | ||
| 92 | void clientWriteReady( fd iFd ); | ||
| 94 | void closeClient( fd iSocket ); | 93 | void closeClient( fd iSocket ); |
| 94 | |||
| 95 | private: | ||
| 95 | class SrvClientLink : public Bu::ClientLink | 96 | class SrvClientLink : public Bu::ClientLink |
| 96 | { | 97 | { |
| 97 | public: | 98 | public: |
| @@ -113,6 +114,19 @@ namespace Bu | |||
| 113 | virtual Bu::ClientLink *createLink( Bu::Client *pClient ); | 114 | virtual Bu::ClientLink *createLink( Bu::Client *pClient ); |
| 114 | }; | 115 | }; |
| 115 | 116 | ||
| 117 | class WriteMonitor : public Bu::Thread | ||
| 118 | { | ||
| 119 | public: | ||
| 120 | WriteMonitor( Server &rSrv ); | ||
| 121 | virtual ~WriteMonitor(); | ||
| 122 | |||
| 123 | protected: | ||
| 124 | virtual void run(); | ||
| 125 | |||
| 126 | private: | ||
| 127 | Server &rSrv; | ||
| 128 | }; | ||
| 129 | |||
| 116 | class Event | 130 | class Event |
| 117 | { | 131 | { |
| 118 | public: | 132 | public: |
| @@ -168,9 +182,9 @@ namespace Bu | |||
| 168 | }; | 182 | }; |
| 169 | friend class Bu::Server::ClientWorker; | 183 | friend class Bu::Server::ClientWorker; |
| 170 | 184 | ||
| 185 | class __ServerCore *pCore; | ||
| 171 | int nTimeoutSec; | 186 | int nTimeoutSec; |
| 172 | int nTimeoutUSec; | 187 | int nTimeoutUSec; |
| 173 | fd_set fdActive; | ||
| 174 | typedef Hash<fd,ServerSocket *> SrvHash; | 188 | typedef Hash<fd,ServerSocket *> SrvHash; |
| 175 | SrvHash hServers; | 189 | SrvHash hServers; |
| 176 | typedef Hash<fd,Client *> ClientHash; | 190 | typedef Hash<fd,Client *> ClientHash; |
| @@ -188,7 +202,8 @@ namespace Bu | |||
| 188 | typedef List<ClientWorker *> ClientWorkerList; | 202 | typedef List<ClientWorker *> ClientWorkerList; |
| 189 | IoWorkerList lIoWorker; | 203 | IoWorkerList lIoWorker; |
| 190 | ClientWorkerList lClientWorker; | 204 | ClientWorkerList lClientWorker; |
| 191 | Bu::CounterEvent evIoCycle; | 205 | WriteMonitor tMonitorWrite; |
| 206 | bool bRunning; | ||
| 192 | }; | 207 | }; |
| 193 | } | 208 | } |
| 194 | 209 | ||
diff --git a/src/stable/serverinterface.cpp b/src/stable/serverinterface.cpp new file mode 100644 index 0000000..0e3122e --- /dev/null +++ b/src/stable/serverinterface.cpp | |||
| @@ -0,0 +1,29 @@ | |||
| 1 | /* | ||
| 2 | * Copyright (C) 2007-2019 Xagasoft, All rights reserved. | ||
| 3 | * | ||
| 4 | * This file is part of the libbu++ library and is released under the | ||
| 5 | * terms of the license contained in the file LICENSE. | ||
| 6 | */ | ||
| 7 | |||
| 8 | #include "bu/serverinterface.h" | ||
| 9 | #include "bu/server.h" | ||
| 10 | |||
| 11 | Bu::ServerInterface::ServerInterface( Server &rSrv ) : | ||
| 12 | rSrv( rSrv ) | ||
| 13 | { | ||
| 14 | } | ||
| 15 | |||
| 16 | Bu::ServerInterface::ServerInterface( const ServerInterface &rSrc ) : | ||
| 17 | rSrv( rSrc.rSrv ) | ||
| 18 | { | ||
| 19 | } | ||
| 20 | |||
| 21 | Bu::ServerInterface::~ServerInterface() | ||
| 22 | { | ||
| 23 | } | ||
| 24 | |||
| 25 | void Bu::ServerInterface::outputReady( int iClientId ) | ||
| 26 | { | ||
| 27 | rSrv.clientWriteReady( iClientId ); | ||
| 28 | } | ||
| 29 | |||
diff --git a/src/stable/serverinterface.h b/src/stable/serverinterface.h new file mode 100644 index 0000000..edc8335 --- /dev/null +++ b/src/stable/serverinterface.h | |||
| @@ -0,0 +1,40 @@ | |||
| 1 | /* | ||
| 2 | * Copyright (C) 2007-2019 Xagasoft, All rights reserved. | ||
| 3 | * | ||
| 4 | * This file is part of the libbu++ library and is released under the | ||
| 5 | * terms of the license contained in the file LICENSE. | ||
| 6 | */ | ||
| 7 | |||
| 8 | #ifndef BU_SERVER_INTERFACE_H | ||
| 9 | #define BU_SERVER_INTERFACE_H | ||
| 10 | |||
| 11 | #include <stdint.h> | ||
| 12 | |||
| 13 | namespace Bu | ||
| 14 | { | ||
| 15 | class Server; | ||
| 16 | |||
| 17 | class ServerInterface | ||
| 18 | { | ||
| 19 | friend class Bu::Server; | ||
| 20 | private: | ||
| 21 | ServerInterface( Bu::Server &rSrv ); | ||
| 22 | public: | ||
| 23 | ServerInterface( const Bu::ServerInterface &rSrc ); | ||
| 24 | ~ServerInterface(); | ||
| 25 | |||
| 26 | /** | ||
| 27 | * Clients will call this when they have filled the output buffer and | ||
| 28 | * are ready for that data to be sent. This only needs to be called | ||
| 29 | * when async output is generated, that is when output is generated not | ||
| 30 | * in the processInput method. | ||
| 31 | */ | ||
| 32 | void outputReady( int iClientId ); | ||
| 33 | |||
| 34 | private: | ||
| 35 | Bu::Server &rSrv; | ||
| 36 | }; | ||
| 37 | } | ||
| 38 | |||
| 39 | #endif | ||
| 40 | |||
