diff options
Diffstat (limited to '')
-rw-r--r-- | src/unstable/itoserver.cpp | 312 |
1 files changed, 156 insertions, 156 deletions
diff --git a/src/unstable/itoserver.cpp b/src/unstable/itoserver.cpp index 7dbce6c..103bf90 100644 --- a/src/unstable/itoserver.cpp +++ b/src/unstable/itoserver.cpp | |||
@@ -13,121 +13,121 @@ | |||
13 | #include "bu/tcpsocket.h" | 13 | #include "bu/tcpsocket.h" |
14 | 14 | ||
15 | Bu::ItoServer::ItoServer() : | 15 | Bu::ItoServer::ItoServer() : |
16 | nTimeoutSec( 1 ), | 16 | nTimeoutSec( 1 ), |
17 | nTimeoutUSec( 0 ) | 17 | nTimeoutUSec( 0 ) |
18 | { | 18 | { |
19 | FD_ZERO( &fdActive ); | 19 | FD_ZERO( &fdActive ); |
20 | } | 20 | } |
21 | 21 | ||
22 | Bu::ItoServer::~ItoServer() | 22 | Bu::ItoServer::~ItoServer() |
23 | { | 23 | { |
24 | while( !qClientCleanup.isEmpty() ) | 24 | while( !qClientCleanup.isEmpty() ) |
25 | { | 25 | { |
26 | ItoClient *pCli = qClientCleanup.dequeue(); | 26 | ItoClient *pCli = qClientCleanup.dequeue(); |
27 | pCli->join(); | 27 | pCli->join(); |
28 | delete pCli; | 28 | delete pCli; |
29 | } | 29 | } |
30 | // TODO: Make sure here that each client has shutdown it's socket, and | 30 | // TODO: Make sure here that each client has shutdown it's socket, and |
31 | // maybe even written any extra data, we could put a timelimit on this... | 31 | // maybe even written any extra data, we could put a timelimit on this... |
32 | // anyway, it's not as clean as it could be right now. | 32 | // anyway, it's not as clean as it could be right now. |
33 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) | 33 | for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) |
34 | { | 34 | { |
35 | ItoClient *pCli = (*i); | 35 | ItoClient *pCli = (*i); |
36 | pCli->join(); | 36 | pCli->join(); |
37 | delete pCli; | 37 | delete pCli; |
38 | } | 38 | } |
39 | } | 39 | } |
40 | 40 | ||
41 | void Bu::ItoServer::addPort( int nPort, int nPoolSize ) | 41 | void Bu::ItoServer::addPort( int nPort, int nPoolSize ) |
42 | { | 42 | { |
43 | TcpServerSocket *s = new TcpServerSocket( nPort, nPoolSize ); | 43 | TcpServerSocket *s = new TcpServerSocket( nPort, nPoolSize ); |
44 | socket_t nSocket = s->getSocket(); | 44 | socket_t nSocket = s->getSocket(); |
45 | FD_SET( nSocket, &fdActive ); | 45 | FD_SET( nSocket, &fdActive ); |
46 | hServers.insert( nSocket, s ); | 46 | hServers.insert( nSocket, s ); |
47 | } | 47 | } |
48 | 48 | ||
49 | void Bu::ItoServer::addPort( const String &sAddr, int nPort, int nPoolSize ) | 49 | void Bu::ItoServer::addPort( const String &sAddr, int nPort, int nPoolSize ) |
50 | { | 50 | { |
51 | TcpServerSocket *s = new TcpServerSocket( sAddr, nPort, nPoolSize ); | 51 | TcpServerSocket *s = new TcpServerSocket( sAddr, nPort, nPoolSize ); |
52 | socket_t nSocket = s->getSocket(); | 52 | socket_t nSocket = s->getSocket(); |
53 | FD_SET( nSocket, &fdActive ); | 53 | FD_SET( nSocket, &fdActive ); |
54 | hServers.insert( nSocket, s ); | 54 | hServers.insert( nSocket, s ); |
55 | } | 55 | } |
56 | 56 | ||
57 | void Bu::ItoServer::setTimeout( int nTimeoutSec, int nTimeoutUSec ) | 57 | void Bu::ItoServer::setTimeout( int nTimeoutSec, int nTimeoutUSec ) |
58 | { | 58 | { |
59 | this->nTimeoutSec = nTimeoutSec; | 59 | this->nTimeoutSec = nTimeoutSec; |
60 | this->nTimeoutUSec = nTimeoutUSec; | 60 | this->nTimeoutUSec = nTimeoutUSec; |
61 | } | 61 | } |
62 | 62 | ||
63 | void Bu::ItoServer::addClient( socket_t nSocket, int nPort ) | 63 | void Bu::ItoServer::addClient( socket_t nSocket, int nPort ) |
64 | { | 64 | { |
65 | ItoClient *pC = new ItoClient( *this, nSocket, nPort, nTimeoutSec, | 65 | ItoClient *pC = new ItoClient( *this, nSocket, nPort, nTimeoutSec, |
66 | nTimeoutUSec ); | 66 | nTimeoutUSec ); |
67 | 67 | ||
68 | imClients.lock(); | 68 | imClients.lock(); |
69 | hClients.insert( nSocket, pC ); | 69 | hClients.insert( nSocket, pC ); |
70 | imClients.unlock(); | 70 | imClients.unlock(); |
71 | 71 | ||
72 | pC->start(); | 72 | pC->start(); |
73 | } | 73 | } |
74 | 74 | ||
75 | void Bu::ItoServer::run() | 75 | void Bu::ItoServer::run() |
76 | { | 76 | { |
77 | for(;;) | 77 | for(;;) |
78 | { | 78 | { |
79 | struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; | 79 | struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; |
80 | 80 | ||
81 | fd_set fdRead = fdActive; | 81 | fd_set fdRead = fdActive; |
82 | //fd_set fdWrite = fdActive; | 82 | //fd_set fdWrite = fdActive; |
83 | fd_set fdException = fdActive; | 83 | fd_set fdException = fdActive; |
84 | 84 | ||
85 | if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, NULL, &fdException, &xTimeout ) ) < 0 ) | 85 | if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, NULL, &fdException, &xTimeout ) ) < 0 ) |
86 | { | 86 | { |
87 | throw ExceptionBase("Error attempting to scan open connections."); | 87 | throw ExceptionBase("Error attempting to scan open connections."); |
88 | } | 88 | } |
89 | 89 | ||
90 | for( ServerHash::iterator i = hServers.begin(); i != hServers.end(); i++ ) | 90 | for( ServerHash::iterator i = hServers.begin(); i != hServers.end(); i++ ) |
91 | { | 91 | { |
92 | if( FD_ISSET( i.getKey(), &fdRead ) ) | 92 | if( FD_ISSET( i.getKey(), &fdRead ) ) |
93 | { | 93 | { |
94 | TcpServerSocket *pSrv = i.getValue(); | 94 | TcpServerSocket *pSrv = i.getValue(); |
95 | addClient( pSrv->accept(), pSrv->getPort() ); | 95 | addClient( pSrv->accept(), pSrv->getPort() ); |
96 | } | 96 | } |
97 | } | 97 | } |
98 | 98 | ||
99 | while( !qClientCleanup.isEmpty() ) | 99 | while( !qClientCleanup.isEmpty() ) |
100 | { | 100 | { |
101 | ItoClient *pCli = qClientCleanup.dequeue(); | 101 | ItoClient *pCli = qClientCleanup.dequeue(); |
102 | pCli->join(); | 102 | pCli->join(); |
103 | delete pCli; | 103 | delete pCli; |
104 | } | 104 | } |
105 | } | 105 | } |
106 | } | 106 | } |
107 | 107 | ||
108 | void Bu::ItoServer::clientCleanup( socket_t iSocket ) | 108 | void Bu::ItoServer::clientCleanup( socket_t iSocket ) |
109 | { | 109 | { |
110 | imClients.lock(); | 110 | imClients.lock(); |
111 | ItoClient *pCli = hClients.get( iSocket ); | 111 | ItoClient *pCli = hClients.get( iSocket ); |
112 | imClients.unlock(); | 112 | imClients.unlock(); |
113 | qClientCleanup.enqueue( pCli ); | 113 | qClientCleanup.enqueue( pCli ); |
114 | } | 114 | } |
115 | 115 | ||
116 | Bu::ItoServer::ItoClient::ItoClient( ItoServer &rSrv, Bu::ItoServer::socket_t iSocket, int iPort, | 116 | Bu::ItoServer::ItoClient::ItoClient( ItoServer &rSrv, Bu::ItoServer::socket_t iSocket, int iPort, |
117 | int nTimeoutSec, int nTimeoutUSec ) : | 117 | int nTimeoutSec, int nTimeoutUSec ) : |
118 | rSrv( rSrv ), | 118 | rSrv( rSrv ), |
119 | iSocket( iSocket ), | 119 | iSocket( iSocket ), |
120 | iPort( iPort ), | 120 | iPort( iPort ), |
121 | nTimeoutSec( nTimeoutSec ), | 121 | nTimeoutSec( nTimeoutSec ), |
122 | nTimeoutUSec( nTimeoutUSec ) | 122 | nTimeoutUSec( nTimeoutUSec ) |
123 | { | 123 | { |
124 | FD_ZERO( &fdActive ); | 124 | FD_ZERO( &fdActive ); |
125 | FD_SET( iSocket, &fdActive ); | 125 | FD_SET( iSocket, &fdActive ); |
126 | 126 | ||
127 | pClient = new Client( | 127 | pClient = new Client( |
128 | new Bu::TcpSocket( iSocket ), | 128 | new Bu::TcpSocket( iSocket ), |
129 | new SrvClientLinkFactory( rSrv ) | 129 | new SrvClientLinkFactory( rSrv ) |
130 | ); | 130 | ); |
131 | } | 131 | } |
132 | 132 | ||
133 | Bu::ItoServer::ItoClient::~ItoClient() | 133 | Bu::ItoServer::ItoClient::~ItoClient() |
@@ -136,67 +136,67 @@ Bu::ItoServer::ItoClient::~ItoClient() | |||
136 | 136 | ||
137 | void Bu::ItoServer::ItoClient::run() | 137 | void Bu::ItoServer::ItoClient::run() |
138 | { | 138 | { |
139 | imProto.lock(); | 139 | imProto.lock(); |
140 | rSrv.onNewConnection( pClient, iPort ); | 140 | rSrv.onNewConnection( pClient, iPort ); |
141 | pClient->processOutput(); | 141 | pClient->processOutput(); |
142 | imProto.unlock(); | 142 | imProto.unlock(); |
143 | 143 | ||
144 | for(;;) | 144 | for(;;) |
145 | { | 145 | { |
146 | struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; | 146 | struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; |
147 | 147 | ||
148 | fd_set fdRead = fdActive; | 148 | fd_set fdRead = fdActive; |
149 | fd_set fdWrite; | 149 | fd_set fdWrite; |
150 | fd_set fdException = fdActive; | 150 | fd_set fdException = fdActive; |
151 | 151 | ||
152 | FD_ZERO( &fdWrite ); | 152 | FD_ZERO( &fdWrite ); |
153 | if( pClient->hasOutput() ) | 153 | if( pClient->hasOutput() ) |
154 | FD_SET( iSocket, &fdWrite ); | 154 | FD_SET( iSocket, &fdWrite ); |
155 | 155 | ||
156 | if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, | 156 | if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, |
157 | &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) | 157 | &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) |
158 | { | 158 | { |
159 | throw ExceptionBase("Error attempting to scan open connections."); | 159 | throw ExceptionBase("Error attempting to scan open connections."); |
160 | } | 160 | } |
161 | 161 | ||
162 | while( !qMsg.isEmpty() ) | 162 | while( !qMsg.isEmpty() ) |
163 | { | 163 | { |
164 | imProto.lock(); | 164 | imProto.lock(); |
165 | Bu::String *pMsg = qMsg.dequeue(); | 165 | Bu::String *pMsg = qMsg.dequeue(); |
166 | pClient->onMessage( *pMsg ); | 166 | pClient->onMessage( *pMsg ); |
167 | delete pMsg; | 167 | delete pMsg; |
168 | pClient->processOutput(); | 168 | pClient->processOutput(); |
169 | imProto.unlock(); | 169 | imProto.unlock(); |
170 | } | 170 | } |
171 | 171 | ||
172 | if( FD_ISSET( iSocket, &fdRead ) ) | 172 | if( FD_ISSET( iSocket, &fdRead ) ) |
173 | { | 173 | { |
174 | imProto.lock(); | 174 | imProto.lock(); |
175 | pClient->processInput(); | 175 | pClient->processInput(); |
176 | imProto.unlock(); | 176 | imProto.unlock(); |
177 | if( !pClient->isOpen() ) | 177 | if( !pClient->isOpen() ) |
178 | { | 178 | { |
179 | imProto.lock(); | 179 | imProto.lock(); |
180 | rSrv.onClosedConnection( pClient ); | 180 | rSrv.onClosedConnection( pClient ); |
181 | imProto.unlock(); | 181 | imProto.unlock(); |
182 | 182 | ||
183 | rSrv.clientCleanup( iSocket ); | 183 | rSrv.clientCleanup( iSocket ); |
184 | 184 | ||
185 | return; | 185 | return; |
186 | } | 186 | } |
187 | } | 187 | } |
188 | 188 | ||
189 | if( FD_ISSET( iSocket, &fdWrite ) ) | 189 | if( FD_ISSET( iSocket, &fdWrite ) ) |
190 | { | 190 | { |
191 | imProto.lock(); | 191 | imProto.lock(); |
192 | pClient->processOutput(); | 192 | pClient->processOutput(); |
193 | imProto.unlock(); | 193 | imProto.unlock(); |
194 | } | 194 | } |
195 | } | 195 | } |
196 | } | 196 | } |
197 | 197 | ||
198 | Bu::ItoServer::SrvClientLink::SrvClientLink( ItoClient *pClient ) : | 198 | Bu::ItoServer::SrvClientLink::SrvClientLink( ItoClient *pClient ) : |
199 | pClient( pClient ) | 199 | pClient( pClient ) |
200 | { | 200 | { |
201 | } | 201 | } |
202 | 202 | ||
@@ -206,22 +206,22 @@ Bu::ItoServer::SrvClientLink::~SrvClientLink() | |||
206 | 206 | ||
207 | void Bu::ItoServer::SrvClientLink::sendMessage( const Bu::String &sMsg ) | 207 | void Bu::ItoServer::SrvClientLink::sendMessage( const Bu::String &sMsg ) |
208 | { | 208 | { |
209 | if( !pClient->imProto.trylock() ) | 209 | if( !pClient->imProto.trylock() ) |
210 | { | 210 | { |
211 | pClient->pClient->onMessage( sMsg ); | 211 | pClient->pClient->onMessage( sMsg ); |
212 | pClient->pClient->processOutput(); | 212 | pClient->pClient->processOutput(); |
213 | pClient->imProto.unlock(); | 213 | pClient->imProto.unlock(); |
214 | } | 214 | } |
215 | else | 215 | else |
216 | { | 216 | { |
217 | Bu::String *pMsg = new Bu::String( sMsg ); | 217 | Bu::String *pMsg = new Bu::String( sMsg ); |
218 | pClient->qMsg.enqueue( pMsg ); | 218 | pClient->qMsg.enqueue( pMsg ); |
219 | } | 219 | } |
220 | } | 220 | } |
221 | 221 | ||
222 | Bu::ItoServer::SrvClientLinkFactory::SrvClientLinkFactory( | 222 | Bu::ItoServer::SrvClientLinkFactory::SrvClientLinkFactory( |
223 | Bu::ItoServer &rSrv ) : | 223 | Bu::ItoServer &rSrv ) : |
224 | rSrv( rSrv ) | 224 | rSrv( rSrv ) |
225 | { | 225 | { |
226 | } | 226 | } |
227 | 227 | ||
@@ -230,12 +230,12 @@ Bu::ItoServer::SrvClientLinkFactory::~SrvClientLinkFactory() | |||
230 | } | 230 | } |
231 | 231 | ||
232 | Bu::ClientLink *Bu::ItoServer::SrvClientLinkFactory::createLink( | 232 | Bu::ClientLink *Bu::ItoServer::SrvClientLinkFactory::createLink( |
233 | Bu::Client *pClient ) | 233 | Bu::Client *pClient ) |
234 | { | 234 | { |
235 | rSrv.imClients.lock(); | 235 | rSrv.imClients.lock(); |
236 | ItoClient *pCli = rSrv.hClients.get( *pClient->getSocket() ); | 236 | ItoClient *pCli = rSrv.hClients.get( *pClient->getSocket() ); |
237 | rSrv.imClients.unlock(); | 237 | rSrv.imClients.unlock(); |
238 | 238 | ||
239 | return new SrvClientLink( pCli ); | 239 | return new SrvClientLink( pCli ); |
240 | } | 240 | } |
241 | 241 | ||