diff options
author | Mike Buland <eichlan@xagasoft.com> | 2023-07-31 15:17:52 -0700 |
---|---|---|
committer | Mike Buland <eichlan@xagasoft.com> | 2023-07-31 15:17:52 -0700 |
commit | a49af97abf091a32f6ec2c3985aa0890ded65d9c (patch) | |
tree | 9332acb37ccaa1552bf273d2383d4a0292deea14 /src | |
parent | 9a7dde29dc1bc1f699508ad9c0335f4d7abf319f (diff) | |
download | libbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.tar.gz libbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.tar.bz2 libbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.tar.xz libbu++-a49af97abf091a32f6ec2c3985aa0890ded65d9c.zip |
Debugging yet, but the new server works.
It at least seems to!
Diffstat (limited to 'src')
-rw-r--r-- | src/stable/client.cpp | 18 | ||||
-rw-r--r-- | src/stable/client.h | 9 | ||||
-rw-r--r-- | src/stable/clientbuf.cpp | 10 | ||||
-rw-r--r-- | src/stable/server.cpp | 194 | ||||
-rw-r--r-- | src/stable/server.h | 29 | ||||
-rw-r--r-- | src/stable/serverinterface.cpp | 29 | ||||
-rw-r--r-- | src/stable/serverinterface.h | 40 | ||||
-rw-r--r-- | src/unstable/protocolwebsocket.cpp | 5 |
8 files changed, 266 insertions, 68 deletions
diff --git a/src/stable/client.cpp b/src/stable/client.cpp index d2d48d7..159d103 100644 --- a/src/stable/client.cpp +++ b/src/stable/client.cpp | |||
@@ -23,16 +23,21 @@ | |||
23 | #define BU_PROFILE_END( x ) (void)0 | 23 | #define BU_PROFILE_END( x ) (void)0 |
24 | #endif | 24 | #endif |
25 | 25 | ||
26 | Bu::Client::Client( class Bu::ClientLinkFactory *pfLink ) : | 26 | Bu::Client::Client( int iId, const Bu::ServerInterface &rServer ) : |
27 | pProto( NULL ), | 27 | pProto( NULL ), |
28 | bWantsDisconnect( false ), | 28 | bWantsDisconnect( false ), |
29 | pfLink( pfLink ) | 29 | iId( iId ), |
30 | xServer( rServer ) | ||
30 | { | 31 | { |
31 | } | 32 | } |
32 | 33 | ||
33 | Bu::Client::~Client() | 34 | Bu::Client::~Client() |
34 | { | 35 | { |
35 | delete pfLink; | 36 | } |
37 | |||
38 | int Bu::Client::getId() const | ||
39 | { | ||
40 | return iId; | ||
36 | } | 41 | } |
37 | 42 | ||
38 | void Bu::Client::processInput() | 43 | void Bu::Client::processInput() |
@@ -47,6 +52,11 @@ void Bu::Client::processInput() | |||
47 | } | 52 | } |
48 | } | 53 | } |
49 | 54 | ||
55 | void Bu::Client::outputReady() | ||
56 | { | ||
57 | xServer.outputReady( iId ); | ||
58 | } | ||
59 | |||
50 | void Bu::Client::setProtocol( Protocol *pProto ) | 60 | void Bu::Client::setProtocol( Protocol *pProto ) |
51 | { | 61 | { |
52 | Bu::MutexLocker l( mProto ); | 62 | Bu::MutexLocker l( mProto ); |
@@ -161,7 +171,7 @@ void Bu::Client::close() | |||
161 | 171 | ||
162 | Bu::ClientLink *Bu::Client::getLink() | 172 | Bu::ClientLink *Bu::Client::getLink() |
163 | { | 173 | { |
164 | return pfLink->createLink( this ); | 174 | return NULL; //pfLink->createLink( this ); |
165 | } | 175 | } |
166 | 176 | ||
167 | void Bu::Client::onMessage( const Bu::String &sMsg ) | 177 | void Bu::Client::onMessage( const Bu::String &sMsg ) |
diff --git a/src/stable/client.h b/src/stable/client.h index abe807e..0ff7914 100644 --- a/src/stable/client.h +++ b/src/stable/client.h | |||
@@ -15,6 +15,7 @@ | |||
15 | #include "bu/clientbuf.h" | 15 | #include "bu/clientbuf.h" |
16 | #include "bu/mutex.h" | 16 | #include "bu/mutex.h" |
17 | #include "bu/readwritemutex.h" | 17 | #include "bu/readwritemutex.h" |
18 | #include "bu/serverinterface.h" | ||
18 | 19 | ||
19 | #ifndef PROFILE_BU_SERVER | 20 | #ifndef PROFILE_BU_SERVER |
20 | // #define PROFILE_BU_SERVER 1 | 21 | // #define PROFILE_BU_SERVER 1 |
@@ -37,10 +38,13 @@ namespace Bu | |||
37 | { | 38 | { |
38 | friend class Server; | 39 | friend class Server; |
39 | public: | 40 | public: |
40 | Client( Bu::ClientLinkFactory *pfLink ); | 41 | Client( int iId, const Bu::ServerInterface &rServer ); |
41 | virtual ~Client(); | 42 | virtual ~Client(); |
42 | 43 | ||
44 | int getId() const; | ||
45 | |||
43 | void processInput(); | 46 | void processInput(); |
47 | void outputReady(); | ||
44 | 48 | ||
45 | Bu::size write( const Bu::String &sData ); | 49 | Bu::size write( const Bu::String &sData ); |
46 | Bu::size write( const void *pData, Bu::size nBytes ); | 50 | Bu::size write( const void *pData, Bu::size nBytes ); |
@@ -116,9 +120,10 @@ namespace Bu | |||
116 | Bu::Protocol *pProto; | 120 | Bu::Protocol *pProto; |
117 | Bu::ClientBuf cbBuffer; | 121 | Bu::ClientBuf cbBuffer; |
118 | bool bWantsDisconnect; | 122 | bool bWantsDisconnect; |
119 | class Bu::ClientLinkFactory *pfLink; | ||
120 | mutable Bu::Mutex mProto; | 123 | mutable Bu::Mutex mProto; |
121 | mutable Bu::Mutex mDisconnect; | 124 | mutable Bu::Mutex mDisconnect; |
125 | int iId; | ||
126 | Bu::ServerInterface xServer; | ||
122 | }; | 127 | }; |
123 | } | 128 | } |
124 | 129 | ||
diff --git a/src/stable/clientbuf.cpp b/src/stable/clientbuf.cpp index c6b310f..8c4afc4 100644 --- a/src/stable/clientbuf.cpp +++ b/src/stable/clientbuf.cpp | |||
@@ -175,19 +175,16 @@ void Bu::ClientBuf::ClientAccess::close() | |||
175 | 175 | ||
176 | Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) | 176 | Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) |
177 | { | 177 | { |
178 | Bu::println("ClientAccess::read( ptr, %1 )").arg( iBytes ); | ||
179 | char *pBuf = (char *)pBufRaw; | 178 | char *pBuf = (char *)pBufRaw; |
180 | Bu::MutexLocker l( mAccess ); | 179 | Bu::MutexLocker l( mAccess ); |
181 | // Read from QueueBuf first | 180 | // Read from QueueBuf first |
182 | Bu::size ps = qbPeek.read( pBuf, iBytes ); | 181 | Bu::size ps = qbPeek.read( pBuf, iBytes ); |
183 | Bu::println("ClientAccess::read: attempted qbPeek, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) ); | ||
184 | iBytes -= ps; | 182 | iBytes -= ps; |
185 | pBuf += ps; | 183 | pBuf += ps; |
186 | // Request space left? Try the client | 184 | // Request space left? Try the client |
187 | if( iBytes > 0 ) | 185 | if( iBytes > 0 ) |
188 | { | 186 | { |
189 | ps += rBuf.accClientFiltered.read( pBuf, iBytes ); | 187 | ps += rBuf.accClientFiltered.read( pBuf, iBytes ); |
190 | Bu::println("ClientAccess::read: attempted completion from socket buffer, got %1\n>%2<").arg( ps ).arg( Bu::String(pBuf, ps) ); | ||
191 | } | 188 | } |
192 | return ps; | 189 | return ps; |
193 | } | 190 | } |
@@ -195,26 +192,22 @@ Bu::size Bu::ClientBuf::ClientAccess::read( void *pBufRaw, size iBytes ) | |||
195 | Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes, | 192 | Bu::size Bu::ClientBuf::ClientAccess::peek( void *pData, int iBytes, |
196 | int iOffset ) | 193 | int iOffset ) |
197 | { | 194 | { |
198 | Bu::println("ClientAccess::peek( ptr, %1, %2 )").arg( iBytes ).arg( iOffset ); | ||
199 | Bu::MutexLocker l( mAccess ); | 195 | Bu::MutexLocker l( mAccess ); |
200 | // Do we have enough data in the peek buffer to handle this? | 196 | // Do we have enough data in the peek buffer to handle this? |
201 | if( qbPeek.getSize() < iBytes+iOffset ) | 197 | if( qbPeek.getSize() < iBytes+iOffset ) |
202 | { | 198 | { |
203 | Bu::println("ClientAccess::peek: Insufficient buffered (have %1b, need %2b)").arg( qbPeek.getSize() ).arg( iBytes+iOffset ); | ||
204 | // Nope, make an attempt to fill it in. | 199 | // Nope, make an attempt to fill it in. |
205 | int nDiff = (iBytes+iOffset)-qbPeek.getSize(); | 200 | int nDiff = (iBytes+iOffset)-qbPeek.getSize(); |
206 | // We have to make our own buffer, since iBytes+nOffeset could be bigger | 201 | // We have to make our own buffer, since iBytes+nOffeset could be bigger |
207 | // than pData. | 202 | // than pData. |
208 | char *pTmp = new char[nDiff]; | 203 | char *pTmp = new char[nDiff]; |
209 | Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff ); | 204 | Bu::size ps = rBuf.accClientFiltered.read( pTmp, nDiff ); |
210 | Bu::println("ClientAccess::peek: Tried to fill buffer, read %1 of needed %2\n>%3<").arg( ps ).arg( nDiff ).arg( Bu::String(pTmp, ps) ); | ||
211 | if( ps > 0 ) | 205 | if( ps > 0 ) |
212 | { | 206 | { |
213 | // Add the data read to the peek buffer. | 207 | // Add the data read to the peek buffer. |
214 | qbPeek.write( pTmp, ps ); | 208 | qbPeek.write( pTmp, ps ); |
215 | } | 209 | } |
216 | delete[] pTmp; | 210 | delete[] pTmp; |
217 | Bu::println("ClientAccess::peek: buffer left with %1").arg( qbPeek.getSize() ); | ||
218 | } | 211 | } |
219 | 212 | ||
220 | return qbPeek.peek( pData, iBytes, iOffset ); | 213 | return qbPeek.peek( pData, iBytes, iOffset ); |
@@ -234,7 +227,6 @@ Bu::size Bu::ClientBuf::ClientAccess::tell() | |||
234 | 227 | ||
235 | void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) | 228 | void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) |
236 | { | 229 | { |
237 | Bu::println("ClientAccess::seek( %1 )").arg( offset ); | ||
238 | Bu::MutexLocker l( mAccess ); | 230 | Bu::MutexLocker l( mAccess ); |
239 | // For this type of stream seek is basically a destructive skip. It's like | 231 | // For this type of stream seek is basically a destructive skip. It's like |
240 | // reading the data but with no output buffer. Let's remove data from the | 232 | // reading the data but with no output buffer. Let's remove data from the |
@@ -242,7 +234,6 @@ void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) | |||
242 | if( qbPeek.getSize() > 0 ) | 234 | if( qbPeek.getSize() > 0 ) |
243 | { | 235 | { |
244 | Bu::size amount = Bu::buMin( qbPeek.getSize(), offset ); | 236 | Bu::size amount = Bu::buMin( qbPeek.getSize(), offset ); |
245 | Bu::println("ClientAccess::seek: buffered: %1, amount: %2").arg( qbPeek.getSize() ).arg( amount ); | ||
246 | qbPeek.seek( amount ); | 237 | qbPeek.seek( amount ); |
247 | offset -= amount; | 238 | offset -= amount; |
248 | } | 239 | } |
@@ -250,7 +241,6 @@ void Bu::ClientBuf::ClientAccess::seek( Bu::size offset ) | |||
250 | // If there's offset left, then apply it to the underlying stream | 241 | // If there's offset left, then apply it to the underlying stream |
251 | if( offset > 0 ) | 242 | if( offset > 0 ) |
252 | { | 243 | { |
253 | Bu::println("ClientAccess::seek: seeking remaining %1 in socket buffer").arg( offset ); | ||
254 | rBuf.accClientFiltered.seek( offset ); | 244 | rBuf.accClientFiltered.seek( offset ); |
255 | } | 245 | } |
256 | } | 246 | } |
diff --git a/src/stable/server.cpp b/src/stable/server.cpp index 7c44c11..cc89f64 100644 --- a/src/stable/server.cpp +++ b/src/stable/server.cpp | |||
@@ -8,11 +8,14 @@ | |||
8 | #include "bu/server.h" | 8 | #include "bu/server.h" |
9 | #include <errno.h> | 9 | #include <errno.h> |
10 | #include <unistd.h> | 10 | #include <unistd.h> |
11 | #include <sys/epoll.h> | ||
12 | #include <sys/eventfd.h> | ||
11 | #include "bu/serversocket.h" | 13 | #include "bu/serversocket.h" |
12 | #include "bu/client.h" | 14 | #include "bu/client.h" |
13 | #include "bu/socket.h" | 15 | #include "bu/socket.h" |
14 | #include "bu/config.h" | 16 | #include "bu/config.h" |
15 | #include "bu/mutexlocker.h" | 17 | #include "bu/mutexlocker.h" |
18 | #include "bu/serverinterface.h" | ||
16 | 19 | ||
17 | #include "bu/sio.h" | 20 | #include "bu/sio.h" |
18 | 21 | ||
@@ -25,14 +28,48 @@ | |||
25 | #endif | 28 | #endif |
26 | 29 | ||
27 | #define RBS 1500 | 30 | #define RBS 1500 |
31 | #define MAX_EVENTS 20 | ||
32 | |||
33 | namespace Bu | ||
34 | { | ||
35 | class __ServerCore | ||
36 | { | ||
37 | public: | ||
38 | __ServerCore() | ||
39 | { | ||
40 | fdRead = epoll_create( 1 ); | ||
41 | fdWrite = epoll_create( 1 ); | ||
42 | fdEvent = eventfd( 0, 0 ); | ||
43 | |||
44 | struct epoll_event ev; | ||
45 | ev.events = EPOLLIN; | ||
46 | ev.data.fd = fdEvent; | ||
47 | epoll_ctl( fdRead, EPOLL_CTL_ADD, fdEvent, &ev ); | ||
48 | epoll_ctl( fdWrite, EPOLL_CTL_ADD, fdEvent, &ev ); | ||
49 | } | ||
50 | |||
51 | ~__ServerCore() | ||
52 | { | ||
53 | close( fdRead ); | ||
54 | close( fdWrite ); | ||
55 | close( fdEvent ); | ||
56 | } | ||
57 | |||
58 | Server::fd fdRead; | ||
59 | Server::fd fdWrite; | ||
60 | Server::fd fdEvent; | ||
61 | }; | ||
62 | } | ||
28 | 63 | ||
29 | Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : | 64 | Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : |
65 | pCore( new Bu::__ServerCore() ), | ||
30 | nTimeoutSec( 0 ), | 66 | nTimeoutSec( 0 ), |
31 | nTimeoutUSec( 0 ), | 67 | nTimeoutUSec( 0 ), |
32 | bAutoTick( false ) | 68 | bAutoTick( false ), |
69 | tMonitorWrite( *this ), | ||
70 | bRunning( true ) | ||
33 | { | 71 | { |
34 | BU_PROFILE_START("server"); | 72 | BU_PROFILE_START("server"); |
35 | FD_ZERO( &fdActive ); | ||
36 | 73 | ||
37 | if( iIoWorkers < 1 ) | 74 | if( iIoWorkers < 1 ) |
38 | iIoWorkers = 1; | 75 | iIoWorkers = 1; |
@@ -52,11 +89,14 @@ Bu::Server::Server( int iIoWorkers, int iClientWorkers ) : | |||
52 | lClientWorker.append( pWorker ); | 89 | lClientWorker.append( pWorker ); |
53 | pWorker->start(); | 90 | pWorker->start(); |
54 | } | 91 | } |
92 | tMonitorWrite.start(); | ||
55 | } | 93 | } |
56 | 94 | ||
57 | Bu::Server::~Server() | 95 | Bu::Server::~Server() |
58 | { | 96 | { |
59 | shutdown(); | 97 | shutdown(); |
98 | tMonitorWrite.join(); | ||
99 | delete pCore; | ||
60 | BU_PROFILE_START("server"); | 100 | BU_PROFILE_START("server"); |
61 | } | 101 | } |
62 | 102 | ||
@@ -68,7 +108,11 @@ void Bu::Server::addServerSocket( Bu::ServerSocket *pSocket ) | |||
68 | throw Bu::ExceptionBase("Cannot get file descriptor from " | 108 | throw Bu::ExceptionBase("Cannot get file descriptor from " |
69 | "provided ServerSocket."); | 109 | "provided ServerSocket."); |
70 | } | 110 | } |
71 | FD_SET( iFd, &fdActive ); | 111 | |
112 | struct epoll_event ev; | ||
113 | ev.events = EPOLLIN; | ||
114 | ev.data.fd = iFd; | ||
115 | epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFd, &ev ); | ||
72 | hServers.insert( iFd, pSocket ); | 116 | hServers.insert( iFd, pSocket ); |
73 | } | 117 | } |
74 | 118 | ||
@@ -81,26 +125,22 @@ void Bu::Server::setTimeout( int nTimeoutSec, int nTimeoutUSec ) | |||
81 | void Bu::Server::scan() | 125 | void Bu::Server::scan() |
82 | { | 126 | { |
83 | BU_PROFILE_START("scan"); | 127 | BU_PROFILE_START("scan"); |
84 | struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; | ||
85 | 128 | ||
86 | fd_set fdRead = fdActive; | ||
87 | fd_set fdWrite /* = fdActive*/; | ||
88 | fd_set fdException = fdActive; | ||
89 | |||
90 | FD_ZERO( &fdWrite ); | ||
91 | mClients.lock(); | 129 | mClients.lock(); |
92 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 130 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) |
93 | { | 131 | { |
94 | if( (*i)->hasOutput() ) | 132 | if( (*i)->hasOutput() ) |
95 | { | 133 | { |
96 | Bu::println("Socket has output..."); | 134 | // Queue it for output here? |
97 | FD_SET( i.getKey(), &fdWrite ); | ||
98 | } | 135 | } |
99 | } | 136 | } |
100 | mClients.unlock(); | 137 | mClients.unlock(); |
101 | 138 | ||
102 | if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, | 139 | struct epoll_event ev[MAX_EVENTS]; |
103 | &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) | 140 | |
141 | int iCount = epoll_wait( pCore->fdRead, ev, MAX_EVENTS, nTimeoutSec*1000+nTimeoutUSec/1000 ); | ||
142 | |||
143 | if( iCount < 0 ) | ||
104 | { | 144 | { |
105 | char buf[1024]; | 145 | char buf[1024]; |
106 | strerror_r( errno, buf, 1024 ); | 146 | strerror_r( errno, buf, 1024 ); |
@@ -110,41 +150,29 @@ void Bu::Server::scan() | |||
110 | ); | 150 | ); |
111 | } | 151 | } |
112 | 152 | ||
113 | evIoCycle.clear(); | ||
114 | Bu::println("Cycle clear"); | 153 | Bu::println("Cycle clear"); |
115 | 154 | ||
116 | for( int j = 0; j < FD_SETSIZE; j++ ) | 155 | for( int j = 0; j < iCount; j++ ) |
117 | { | 156 | { |
118 | if( FD_ISSET( j, &fdRead ) ) | 157 | if( hServers.has( ev[j].data.fd ) ) |
119 | { | 158 | { |
120 | if( hServers.has( j ) ) | 159 | Bu::ServerSocket *pSrv = hServers.get( ev[j].data.fd ); |
121 | { | 160 | addClient( pSrv, pSrv->accept() ); |
122 | Bu::ServerSocket *pSrv = hServers.get( j ); | ||
123 | addClient( pSrv, pSrv->accept() ); | ||
124 | } | ||
125 | else | ||
126 | { | ||
127 | evIoCycle.increment(); | ||
128 | Bu::println("Increment (read)"); | ||
129 | qIoEvent.enqueue( new Event( j, Event::Read ) ); | ||
130 | } | ||
131 | } | 161 | } |
132 | if( FD_ISSET( j, &fdWrite ) ) | 162 | else |
133 | { | 163 | { |
134 | evIoCycle.increment(); | 164 | qIoEvent.enqueue( new Event( ev[j].data.fd, Event::Read ) ); |
135 | Bu::println("Increment (write)"); | ||
136 | qIoEvent.enqueue( new Event( j, Event::Write ) ); | ||
137 | } | 165 | } |
138 | } | 166 | } |
139 | 167 | ||
140 | Bu::println("Waiting"); | 168 | Bu::println("Waiting"); |
141 | while( evIoCycle.wait() > 0 ) { } | ||
142 | 169 | ||
143 | Bu::List<int> lDelete; | 170 | Bu::List<int> lDelete; |
144 | // Now we just try to write all the pending data on all the sockets. | 171 | // Now we just try to write all the pending data on all the sockets. |
145 | // this could be done better eventually, if we care about the socket | 172 | // this could be done better eventually, if we care about the socket |
146 | // wanting to accept writes (using a select). | 173 | // wanting to accept writes (using a select). |
147 | mClients.lock(); | 174 | mClients.lock(); |
175 | |||
148 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 176 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) |
149 | { | 177 | { |
150 | if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) | 178 | if( (*i)->wantsDisconnect() && !(*i)->hasOutput() ) |
@@ -174,10 +202,18 @@ void Bu::Server::addClient( const Bu::ServerSocket *pSrv, Bu::Socket *pSocket ) | |||
174 | { | 202 | { |
175 | throw Bu::ExceptionBase("No file descriptor?"); | 203 | throw Bu::ExceptionBase("No file descriptor?"); |
176 | } | 204 | } |
177 | FD_SET( iFdCli, &fdActive ); | 205 | |
206 | struct epoll_event ev; | ||
207 | ev.events = EPOLLIN | EPOLLONESHOT; | ||
208 | ev.data.fd = iFdCli; | ||
209 | epoll_ctl( pCore->fdRead, EPOLL_CTL_ADD, iFdCli, &ev ); | ||
210 | |||
211 | ev.events = 0; | ||
212 | epoll_ctl( pCore->fdWrite, EPOLL_CTL_ADD, iFdCli, &ev ); | ||
178 | 213 | ||
179 | Client *pClient = new Client( | 214 | Client *pClient = new Client( |
180 | new SrvClientLinkFactory() | 215 | iFdCli, |
216 | ServerInterface( *this ) | ||
181 | ); | 217 | ); |
182 | { | 218 | { |
183 | Bu::MutexLocker l( mClients ); | 219 | Bu::MutexLocker l( mClients ); |
@@ -252,6 +288,11 @@ void Bu::Server::tick() | |||
252 | void Bu::Server::shutdown() | 288 | void Bu::Server::shutdown() |
253 | { | 289 | { |
254 | { | 290 | { |
291 | int64_t iVal = 1; | ||
292 | write( pCore->fdEvent, &iVal, sizeof(int64_t) ); | ||
293 | } | ||
294 | |||
295 | { | ||
255 | qIoEvent.stop(); | 296 | qIoEvent.stop(); |
256 | qClientEvent.stop(); | 297 | qClientEvent.stop(); |
257 | Bu::Server::Event *pEv; | 298 | Bu::Server::Event *pEv; |
@@ -294,6 +335,24 @@ void Bu::Server::shutdown() | |||
294 | } | 335 | } |
295 | hClients.clear(); | 336 | hClients.clear(); |
296 | } | 337 | } |
338 | |||
339 | void Bu::Server::clientReadReady( fd iFd ) | ||
340 | { | ||
341 | // Re-arm | ||
342 | struct epoll_event ev; | ||
343 | ev.events = EPOLLIN | EPOLLONESHOT; | ||
344 | ev.data.fd = iFd; | ||
345 | epoll_ctl( pCore->fdRead, EPOLL_CTL_MOD, iFd, &ev ); | ||
346 | } | ||
347 | |||
348 | void Bu::Server::clientWriteReady( fd iFd ) | ||
349 | { | ||
350 | // Re-arm | ||
351 | struct epoll_event ev; | ||
352 | ev.events = EPOLLOUT | EPOLLONESHOT; | ||
353 | ev.data.fd = iFd; | ||
354 | epoll_ctl( pCore->fdWrite, EPOLL_CTL_MOD, iFd, &ev ); | ||
355 | } | ||
297 | 356 | ||
298 | void Bu::Server::closeClient( fd iSocket ) | 357 | void Bu::Server::closeClient( fd iSocket ) |
299 | { | 358 | { |
@@ -306,13 +365,60 @@ void Bu::Server::closeClient( fd iSocket ) | |||
306 | hClients.erase( iSocket ); | 365 | hClients.erase( iSocket ); |
307 | pSocket->close(); | 366 | pSocket->close(); |
308 | hSockets.erase( iSocket ); | 367 | hSockets.erase( iSocket ); |
309 | FD_CLR( iSocket, &fdActive ); | 368 | |
369 | struct epoll_event ev; | ||
370 | epoll_ctl( pCore->fdRead, EPOLL_CTL_DEL, iSocket, &ev ); | ||
371 | epoll_ctl( pCore->fdWrite, EPOLL_CTL_DEL, iSocket, &ev ); | ||
372 | |||
310 | delete pClient; | 373 | delete pClient; |
311 | delete pSocket; | 374 | delete pSocket; |
312 | BU_PROFILE_END("closeClient"); | 375 | BU_PROFILE_END("closeClient"); |
313 | } | 376 | } |
314 | 377 | ||
315 | //////// | 378 | //////// |
379 | // WriteMonitor | ||
380 | //// | ||
381 | Bu::Server::WriteMonitor::WriteMonitor( Server &rSrv ) : | ||
382 | rSrv( rSrv ) | ||
383 | { | ||
384 | } | ||
385 | |||
386 | Bu::Server::WriteMonitor::~WriteMonitor() | ||
387 | { | ||
388 | } | ||
389 | |||
390 | void Bu::Server::WriteMonitor::run() | ||
391 | { | ||
392 | setName("busrv-writeMon"); | ||
393 | struct epoll_event ev[MAX_EVENTS]; | ||
394 | for(;;) | ||
395 | { | ||
396 | int iCount = epoll_wait( rSrv.pCore->fdWrite, ev, MAX_EVENTS, 5000 ); | ||
397 | if( iCount < 0 ) | ||
398 | { | ||
399 | // Bad error? | ||
400 | return; | ||
401 | } | ||
402 | |||
403 | for( int j = 0; j < iCount; j++ ) | ||
404 | { | ||
405 | if( ev[j].data.fd == rSrv.pCore->fdEvent ) | ||
406 | { | ||
407 | Bu::println("Bu::Server::WriteMonitor -> got event on fdEvent, exiting..."); | ||
408 | return; | ||
409 | } | ||
410 | else | ||
411 | { | ||
412 | // Queue the write | ||
413 | rSrv.qIoEvent.enqueue( | ||
414 | new Event( ev[j].data.fd, Event::Write ) | ||
415 | ); | ||
416 | } | ||
417 | } | ||
418 | } | ||
419 | } | ||
420 | |||
421 | //////// | ||
316 | // Event | 422 | // Event |
317 | //// | 423 | //// |
318 | 424 | ||
@@ -365,7 +471,6 @@ void Bu::Server::IoWorker::run() | |||
365 | if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) | 471 | if( !rSrv.getClientAndSocket( pEv->getId(), pClient, pSocket ) ) |
366 | { | 472 | { |
367 | delete pEv; | 473 | delete pEv; |
368 | rSrv.evIoCycle.decrement(); | ||
369 | continue; | 474 | continue; |
370 | } | 475 | } |
371 | 476 | ||
@@ -384,9 +489,6 @@ void Bu::Server::IoWorker::run() | |||
384 | } | 489 | } |
385 | 490 | ||
386 | delete pEv; | 491 | delete pEv; |
387 | |||
388 | Bu::println("decrement"); | ||
389 | rSrv.evIoCycle.decrement(); | ||
390 | } | 492 | } |
391 | } | 493 | } |
392 | 494 | ||
@@ -404,7 +506,6 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
404 | try | 506 | try |
405 | { | 507 | { |
406 | iRead = pSocket->read( buf, RBS ); | 508 | iRead = pSocket->read( buf, RBS ); |
407 | Bu::println("IoWorker::handleRead: read() -> %1").arg( iRead ); | ||
408 | 509 | ||
409 | if( iRead == 0 ) | 510 | if( iRead == 0 ) |
410 | { | 511 | { |
@@ -421,7 +522,8 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
421 | catch( Bu::ExceptionBase &e ) | 522 | catch( Bu::ExceptionBase &e ) |
422 | { | 523 | { |
423 | Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() ); | 524 | Bu::println("IoWorker::handleRead: exception, closing: %1").arg( e.what() ); |
424 | close( pSocket ); | 525 | pClient->disconnect(); |
526 | //close( pSocket ); | ||
425 | return; | 527 | return; |
426 | } | 528 | } |
427 | } | 529 | } |
@@ -430,13 +532,16 @@ void Bu::Server::IoWorker::handleRead( Client *pClient, Socket *pSocket ) | |||
430 | if( iTotal == 0 ) | 532 | if( iTotal == 0 ) |
431 | { | 533 | { |
432 | Bu::println("IoWorker::handleRead: read nothing, closing"); | 534 | Bu::println("IoWorker::handleRead: read nothing, closing"); |
433 | close( pSocket ); | 535 | pClient->disconnect(); |
536 | //close( pSocket ); | ||
434 | } | 537 | } |
435 | else | 538 | else |
436 | { | 539 | { |
437 | Bu::Server::fd iFd; | 540 | Bu::Server::fd iFd; |
438 | pSocket->getFd( iFd ); | 541 | pSocket->getFd( iFd ); |
439 | rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) ); | 542 | rSrv.qClientEvent.enqueue( new Event( iFd, Event::Process ) ); |
543 | |||
544 | rSrv.clientReadReady( iFd ); | ||
440 | } | 545 | } |
441 | } | 546 | } |
442 | 547 | ||
@@ -449,7 +554,6 @@ void Bu::Server::IoWorker::handleWrite( Client *pClient, Socket *pSocket ) | |||
449 | int iAmnt = RBS; | 554 | int iAmnt = RBS; |
450 | iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); | 555 | iAmnt = pClient->cbBuffer.server().peek( buf, iAmnt ); |
451 | int iReal = pSocket->write( buf, iAmnt ); | 556 | int iReal = pSocket->write( buf, iAmnt ); |
452 | Bu::println("IoWorker::handleWrite: Copy out: iAmnt=%1, iReal=%2\n>%3<").arg( iAmnt ).arg( iReal ).arg( Bu::String( buf, iReal ) ); | ||
453 | pClient->cbBuffer.server().seek( iReal ); | 557 | pClient->cbBuffer.server().seek( iReal ); |
454 | } | 558 | } |
455 | } | 559 | } |
@@ -494,6 +598,10 @@ void Bu::Server::ClientWorker::run() | |||
494 | pClient->processInput(); | 598 | pClient->processInput(); |
495 | Bu::println("Processing input complete."); | 599 | Bu::println("Processing input complete."); |
496 | Bu::println("*** ClientBuf: input: %1b, output: %2b").arg( pClient->getInputSize() ).arg( pClient->getOutputSize() ); | 600 | Bu::println("*** ClientBuf: input: %1b, output: %2b").arg( pClient->getInputSize() ).arg( pClient->getOutputSize() ); |
601 | if( pClient->getOutputSize() > 0 ) | ||
602 | { | ||
603 | rSrv.clientWriteReady( pClient->getId() ); | ||
604 | } | ||
497 | delete pEv; | 605 | delete pEv; |
498 | } | 606 | } |
499 | } | 607 | } |
diff --git a/src/stable/server.h b/src/stable/server.h index e2b7d53..56ac29a 100644 --- a/src/stable/server.h +++ b/src/stable/server.h | |||
@@ -10,10 +10,6 @@ | |||
10 | 10 | ||
11 | #include <stdint.h> | 11 | #include <stdint.h> |
12 | 12 | ||
13 | #ifndef WIN32 | ||
14 | #include <sys/select.h> | ||
15 | #endif | ||
16 | |||
17 | #include "bu/string.h" | 13 | #include "bu/string.h" |
18 | #include "bu/list.h" | 14 | #include "bu/list.h" |
19 | 15 | ||
@@ -62,6 +58,7 @@ namespace Bu | |||
62 | */ | 58 | */ |
63 | class Server | 59 | class Server |
64 | { | 60 | { |
61 | friend class ServerInterface; | ||
65 | public: | 62 | public: |
66 | Server( int iIoWorkers=4, int iClientWorkers=8 ); | 63 | Server( int iIoWorkers=4, int iClientWorkers=8 ); |
67 | virtual ~Server(); | 64 | virtual ~Server(); |
@@ -90,8 +87,12 @@ namespace Bu | |||
90 | 87 | ||
91 | void shutdown(); | 88 | void shutdown(); |
92 | 89 | ||
93 | private: | 90 | protected: |
91 | void clientReadReady( fd iFd ); | ||
92 | void clientWriteReady( fd iFd ); | ||
94 | void closeClient( fd iSocket ); | 93 | void closeClient( fd iSocket ); |
94 | |||
95 | private: | ||
95 | class SrvClientLink : public Bu::ClientLink | 96 | class SrvClientLink : public Bu::ClientLink |
96 | { | 97 | { |
97 | public: | 98 | public: |
@@ -113,6 +114,19 @@ namespace Bu | |||
113 | virtual Bu::ClientLink *createLink( Bu::Client *pClient ); | 114 | virtual Bu::ClientLink *createLink( Bu::Client *pClient ); |
114 | }; | 115 | }; |
115 | 116 | ||
117 | class WriteMonitor : public Bu::Thread | ||
118 | { | ||
119 | public: | ||
120 | WriteMonitor( Server &rSrv ); | ||
121 | virtual ~WriteMonitor(); | ||
122 | |||
123 | protected: | ||
124 | virtual void run(); | ||
125 | |||
126 | private: | ||
127 | Server &rSrv; | ||
128 | }; | ||
129 | |||
116 | class Event | 130 | class Event |
117 | { | 131 | { |
118 | public: | 132 | public: |
@@ -168,9 +182,9 @@ namespace Bu | |||
168 | }; | 182 | }; |
169 | friend class Bu::Server::ClientWorker; | 183 | friend class Bu::Server::ClientWorker; |
170 | 184 | ||
185 | class __ServerCore *pCore; | ||
171 | int nTimeoutSec; | 186 | int nTimeoutSec; |
172 | int nTimeoutUSec; | 187 | int nTimeoutUSec; |
173 | fd_set fdActive; | ||
174 | typedef Hash<fd,ServerSocket *> SrvHash; | 188 | typedef Hash<fd,ServerSocket *> SrvHash; |
175 | SrvHash hServers; | 189 | SrvHash hServers; |
176 | typedef Hash<fd,Client *> ClientHash; | 190 | typedef Hash<fd,Client *> ClientHash; |
@@ -188,7 +202,8 @@ namespace Bu | |||
188 | typedef List<ClientWorker *> ClientWorkerList; | 202 | typedef List<ClientWorker *> ClientWorkerList; |
189 | IoWorkerList lIoWorker; | 203 | IoWorkerList lIoWorker; |
190 | ClientWorkerList lClientWorker; | 204 | ClientWorkerList lClientWorker; |
191 | Bu::CounterEvent evIoCycle; | 205 | WriteMonitor tMonitorWrite; |
206 | bool bRunning; | ||
192 | }; | 207 | }; |
193 | } | 208 | } |
194 | 209 | ||
diff --git a/src/stable/serverinterface.cpp b/src/stable/serverinterface.cpp new file mode 100644 index 0000000..0e3122e --- /dev/null +++ b/src/stable/serverinterface.cpp | |||
@@ -0,0 +1,29 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2007-2019 Xagasoft, All rights reserved. | ||
3 | * | ||
4 | * This file is part of the libbu++ library and is released under the | ||
5 | * terms of the license contained in the file LICENSE. | ||
6 | */ | ||
7 | |||
8 | #include "bu/serverinterface.h" | ||
9 | #include "bu/server.h" | ||
10 | |||
11 | Bu::ServerInterface::ServerInterface( Server &rSrv ) : | ||
12 | rSrv( rSrv ) | ||
13 | { | ||
14 | } | ||
15 | |||
16 | Bu::ServerInterface::ServerInterface( const ServerInterface &rSrc ) : | ||
17 | rSrv( rSrc.rSrv ) | ||
18 | { | ||
19 | } | ||
20 | |||
21 | Bu::ServerInterface::~ServerInterface() | ||
22 | { | ||
23 | } | ||
24 | |||
25 | void Bu::ServerInterface::outputReady( int iClientId ) | ||
26 | { | ||
27 | rSrv.clientWriteReady( iClientId ); | ||
28 | } | ||
29 | |||
diff --git a/src/stable/serverinterface.h b/src/stable/serverinterface.h new file mode 100644 index 0000000..edc8335 --- /dev/null +++ b/src/stable/serverinterface.h | |||
@@ -0,0 +1,40 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2007-2019 Xagasoft, All rights reserved. | ||
3 | * | ||
4 | * This file is part of the libbu++ library and is released under the | ||
5 | * terms of the license contained in the file LICENSE. | ||
6 | */ | ||
7 | |||
8 | #ifndef BU_SERVER_INTERFACE_H | ||
9 | #define BU_SERVER_INTERFACE_H | ||
10 | |||
11 | #include <stdint.h> | ||
12 | |||
13 | namespace Bu | ||
14 | { | ||
15 | class Server; | ||
16 | |||
17 | class ServerInterface | ||
18 | { | ||
19 | friend class Bu::Server; | ||
20 | private: | ||
21 | ServerInterface( Bu::Server &rSrv ); | ||
22 | public: | ||
23 | ServerInterface( const Bu::ServerInterface &rSrc ); | ||
24 | ~ServerInterface(); | ||
25 | |||
26 | /** | ||
27 | * Clients will call this when they have filled the output buffer and | ||
28 | * are ready for that data to be sent. This only needs to be called | ||
29 | * when async output is generated, that is when output is generated not | ||
30 | * in the processInput method. | ||
31 | */ | ||
32 | void outputReady( int iClientId ); | ||
33 | |||
34 | private: | ||
35 | Bu::Server &rSrv; | ||
36 | }; | ||
37 | } | ||
38 | |||
39 | #endif | ||
40 | |||
diff --git a/src/unstable/protocolwebsocket.cpp b/src/unstable/protocolwebsocket.cpp index fa2d882..2bec1ce 100644 --- a/src/unstable/protocolwebsocket.cpp +++ b/src/unstable/protocolwebsocket.cpp | |||
@@ -21,8 +21,8 @@ | |||
21 | 21 | ||
22 | #include <stdlib.h> | 22 | #include <stdlib.h> |
23 | 23 | ||
24 | //#define DEBUG( X ) { } (void)0 | 24 | #define DEBUG( X ) { } (void)0 |
25 | #define DEBUG( X ) { X; } (void)0 | 25 | //#define DEBUG( X ) { X; } (void)0 |
26 | 26 | ||
27 | Bu::ProtocolWebSocket::ProtocolWebSocket() : | 27 | Bu::ProtocolWebSocket::ProtocolWebSocket() : |
28 | eStatus( stProtoId ) | 28 | eStatus( stProtoId ) |
@@ -120,6 +120,7 @@ void Bu::ProtocolWebSocket::writeMessage( const Bu::String &sData, | |||
120 | return; | 120 | return; |
121 | pClient->write( cHeader, idx ); | 121 | pClient->write( cHeader, idx ); |
122 | pClient->write( sData ); | 122 | pClient->write( sData ); |
123 | pClient->outputReady(); | ||
123 | } | 124 | } |
124 | 125 | ||
125 | bool Bu::ProtocolWebSocket::stateProtoId() | 126 | bool Bu::ProtocolWebSocket::stateProtoId() |