aboutsummaryrefslogtreecommitdiff
path: root/src/unstable/itoserver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/unstable/itoserver.cpp')
-rw-r--r--src/unstable/itoserver.cpp312
1 files changed, 156 insertions, 156 deletions
diff --git a/src/unstable/itoserver.cpp b/src/unstable/itoserver.cpp
index 7dbce6c..103bf90 100644
--- a/src/unstable/itoserver.cpp
+++ b/src/unstable/itoserver.cpp
@@ -13,121 +13,121 @@
13#include "bu/tcpsocket.h" 13#include "bu/tcpsocket.h"
14 14
15Bu::ItoServer::ItoServer() : 15Bu::ItoServer::ItoServer() :
16 nTimeoutSec( 1 ), 16 nTimeoutSec( 1 ),
17 nTimeoutUSec( 0 ) 17 nTimeoutUSec( 0 )
18{ 18{
19 FD_ZERO( &fdActive ); 19 FD_ZERO( &fdActive );
20} 20}
21 21
22Bu::ItoServer::~ItoServer() 22Bu::ItoServer::~ItoServer()
23{ 23{
24 while( !qClientCleanup.isEmpty() ) 24 while( !qClientCleanup.isEmpty() )
25 { 25 {
26 ItoClient *pCli = qClientCleanup.dequeue(); 26 ItoClient *pCli = qClientCleanup.dequeue();
27 pCli->join(); 27 pCli->join();
28 delete pCli; 28 delete pCli;
29 } 29 }
30 // TODO: Make sure here that each client has shutdown it's socket, and 30 // TODO: Make sure here that each client has shutdown it's socket, and
31 // maybe even written any extra data, we could put a timelimit on this... 31 // maybe even written any extra data, we could put a timelimit on this...
32 // anyway, it's not as clean as it could be right now. 32 // anyway, it's not as clean as it could be right now.
33 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ ) 33 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
34 { 34 {
35 ItoClient *pCli = (*i); 35 ItoClient *pCli = (*i);
36 pCli->join(); 36 pCli->join();
37 delete pCli; 37 delete pCli;
38 } 38 }
39} 39}
40 40
41void Bu::ItoServer::addPort( int nPort, int nPoolSize ) 41void Bu::ItoServer::addPort( int nPort, int nPoolSize )
42{ 42{
43 TcpServerSocket *s = new TcpServerSocket( nPort, nPoolSize ); 43 TcpServerSocket *s = new TcpServerSocket( nPort, nPoolSize );
44 socket_t nSocket = s->getSocket(); 44 socket_t nSocket = s->getSocket();
45 FD_SET( nSocket, &fdActive ); 45 FD_SET( nSocket, &fdActive );
46 hServers.insert( nSocket, s ); 46 hServers.insert( nSocket, s );
47} 47}
48 48
49void Bu::ItoServer::addPort( const String &sAddr, int nPort, int nPoolSize ) 49void Bu::ItoServer::addPort( const String &sAddr, int nPort, int nPoolSize )
50{ 50{
51 TcpServerSocket *s = new TcpServerSocket( sAddr, nPort, nPoolSize ); 51 TcpServerSocket *s = new TcpServerSocket( sAddr, nPort, nPoolSize );
52 socket_t nSocket = s->getSocket(); 52 socket_t nSocket = s->getSocket();
53 FD_SET( nSocket, &fdActive ); 53 FD_SET( nSocket, &fdActive );
54 hServers.insert( nSocket, s ); 54 hServers.insert( nSocket, s );
55} 55}
56 56
57void Bu::ItoServer::setTimeout( int nTimeoutSec, int nTimeoutUSec ) 57void Bu::ItoServer::setTimeout( int nTimeoutSec, int nTimeoutUSec )
58{ 58{
59 this->nTimeoutSec = nTimeoutSec; 59 this->nTimeoutSec = nTimeoutSec;
60 this->nTimeoutUSec = nTimeoutUSec; 60 this->nTimeoutUSec = nTimeoutUSec;
61} 61}
62 62
63void Bu::ItoServer::addClient( socket_t nSocket, int nPort ) 63void Bu::ItoServer::addClient( socket_t nSocket, int nPort )
64{ 64{
65 ItoClient *pC = new ItoClient( *this, nSocket, nPort, nTimeoutSec, 65 ItoClient *pC = new ItoClient( *this, nSocket, nPort, nTimeoutSec,
66 nTimeoutUSec ); 66 nTimeoutUSec );
67 67
68 imClients.lock(); 68 imClients.lock();
69 hClients.insert( nSocket, pC ); 69 hClients.insert( nSocket, pC );
70 imClients.unlock(); 70 imClients.unlock();
71 71
72 pC->start(); 72 pC->start();
73} 73}
74 74
75void Bu::ItoServer::run() 75void Bu::ItoServer::run()
76{ 76{
77 for(;;) 77 for(;;)
78 { 78 {
79 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; 79 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec };
80 80
81 fd_set fdRead = fdActive; 81 fd_set fdRead = fdActive;
82 //fd_set fdWrite = fdActive; 82 //fd_set fdWrite = fdActive;
83 fd_set fdException = fdActive; 83 fd_set fdException = fdActive;
84 84
85 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, NULL, &fdException, &xTimeout ) ) < 0 ) 85 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, NULL, &fdException, &xTimeout ) ) < 0 )
86 { 86 {
87 throw ExceptionBase("Error attempting to scan open connections."); 87 throw ExceptionBase("Error attempting to scan open connections.");
88 } 88 }
89 89
90 for( ServerHash::iterator i = hServers.begin(); i != hServers.end(); i++ ) 90 for( ServerHash::iterator i = hServers.begin(); i != hServers.end(); i++ )
91 { 91 {
92 if( FD_ISSET( i.getKey(), &fdRead ) ) 92 if( FD_ISSET( i.getKey(), &fdRead ) )
93 { 93 {
94 TcpServerSocket *pSrv = i.getValue(); 94 TcpServerSocket *pSrv = i.getValue();
95 addClient( pSrv->accept(), pSrv->getPort() ); 95 addClient( pSrv->accept(), pSrv->getPort() );
96 } 96 }
97 } 97 }
98 98
99 while( !qClientCleanup.isEmpty() ) 99 while( !qClientCleanup.isEmpty() )
100 { 100 {
101 ItoClient *pCli = qClientCleanup.dequeue(); 101 ItoClient *pCli = qClientCleanup.dequeue();
102 pCli->join(); 102 pCli->join();
103 delete pCli; 103 delete pCli;
104 } 104 }
105 } 105 }
106} 106}
107 107
108void Bu::ItoServer::clientCleanup( socket_t iSocket ) 108void Bu::ItoServer::clientCleanup( socket_t iSocket )
109{ 109{
110 imClients.lock(); 110 imClients.lock();
111 ItoClient *pCli = hClients.get( iSocket ); 111 ItoClient *pCli = hClients.get( iSocket );
112 imClients.unlock(); 112 imClients.unlock();
113 qClientCleanup.enqueue( pCli ); 113 qClientCleanup.enqueue( pCli );
114} 114}
115 115
116Bu::ItoServer::ItoClient::ItoClient( ItoServer &rSrv, Bu::ItoServer::socket_t iSocket, int iPort, 116Bu::ItoServer::ItoClient::ItoClient( ItoServer &rSrv, Bu::ItoServer::socket_t iSocket, int iPort,
117 int nTimeoutSec, int nTimeoutUSec ) : 117 int nTimeoutSec, int nTimeoutUSec ) :
118 rSrv( rSrv ), 118 rSrv( rSrv ),
119 iSocket( iSocket ), 119 iSocket( iSocket ),
120 iPort( iPort ), 120 iPort( iPort ),
121 nTimeoutSec( nTimeoutSec ), 121 nTimeoutSec( nTimeoutSec ),
122 nTimeoutUSec( nTimeoutUSec ) 122 nTimeoutUSec( nTimeoutUSec )
123{ 123{
124 FD_ZERO( &fdActive ); 124 FD_ZERO( &fdActive );
125 FD_SET( iSocket, &fdActive ); 125 FD_SET( iSocket, &fdActive );
126 126
127 pClient = new Client( 127 pClient = new Client(
128 new Bu::TcpSocket( iSocket ), 128 new Bu::TcpSocket( iSocket ),
129 new SrvClientLinkFactory( rSrv ) 129 new SrvClientLinkFactory( rSrv )
130 ); 130 );
131} 131}
132 132
133Bu::ItoServer::ItoClient::~ItoClient() 133Bu::ItoServer::ItoClient::~ItoClient()
@@ -136,67 +136,67 @@ Bu::ItoServer::ItoClient::~ItoClient()
136 136
137void Bu::ItoServer::ItoClient::run() 137void Bu::ItoServer::ItoClient::run()
138{ 138{
139 imProto.lock(); 139 imProto.lock();
140 rSrv.onNewConnection( pClient, iPort ); 140 rSrv.onNewConnection( pClient, iPort );
141 pClient->processOutput(); 141 pClient->processOutput();
142 imProto.unlock(); 142 imProto.unlock();
143 143
144 for(;;) 144 for(;;)
145 { 145 {
146 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec }; 146 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec };
147 147
148 fd_set fdRead = fdActive; 148 fd_set fdRead = fdActive;
149 fd_set fdWrite; 149 fd_set fdWrite;
150 fd_set fdException = fdActive; 150 fd_set fdException = fdActive;
151 151
152 FD_ZERO( &fdWrite ); 152 FD_ZERO( &fdWrite );
153 if( pClient->hasOutput() ) 153 if( pClient->hasOutput() )
154 FD_SET( iSocket, &fdWrite ); 154 FD_SET( iSocket, &fdWrite );
155 155
156 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, 156 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE,
157 &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 ) 157 &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 )
158 { 158 {
159 throw ExceptionBase("Error attempting to scan open connections."); 159 throw ExceptionBase("Error attempting to scan open connections.");
160 } 160 }
161 161
162 while( !qMsg.isEmpty() ) 162 while( !qMsg.isEmpty() )
163 { 163 {
164 imProto.lock(); 164 imProto.lock();
165 Bu::String *pMsg = qMsg.dequeue(); 165 Bu::String *pMsg = qMsg.dequeue();
166 pClient->onMessage( *pMsg ); 166 pClient->onMessage( *pMsg );
167 delete pMsg; 167 delete pMsg;
168 pClient->processOutput(); 168 pClient->processOutput();
169 imProto.unlock(); 169 imProto.unlock();
170 } 170 }
171 171
172 if( FD_ISSET( iSocket, &fdRead ) ) 172 if( FD_ISSET( iSocket, &fdRead ) )
173 { 173 {
174 imProto.lock(); 174 imProto.lock();
175 pClient->processInput(); 175 pClient->processInput();
176 imProto.unlock(); 176 imProto.unlock();
177 if( !pClient->isOpen() ) 177 if( !pClient->isOpen() )
178 { 178 {
179 imProto.lock(); 179 imProto.lock();
180 rSrv.onClosedConnection( pClient ); 180 rSrv.onClosedConnection( pClient );
181 imProto.unlock(); 181 imProto.unlock();
182 182
183 rSrv.clientCleanup( iSocket ); 183 rSrv.clientCleanup( iSocket );
184 184
185 return; 185 return;
186 } 186 }
187 } 187 }
188 188
189 if( FD_ISSET( iSocket, &fdWrite ) ) 189 if( FD_ISSET( iSocket, &fdWrite ) )
190 { 190 {
191 imProto.lock(); 191 imProto.lock();
192 pClient->processOutput(); 192 pClient->processOutput();
193 imProto.unlock(); 193 imProto.unlock();
194 } 194 }
195 } 195 }
196} 196}
197 197
198Bu::ItoServer::SrvClientLink::SrvClientLink( ItoClient *pClient ) : 198Bu::ItoServer::SrvClientLink::SrvClientLink( ItoClient *pClient ) :
199 pClient( pClient ) 199 pClient( pClient )
200{ 200{
201} 201}
202 202
@@ -206,22 +206,22 @@ Bu::ItoServer::SrvClientLink::~SrvClientLink()
206 206
207void Bu::ItoServer::SrvClientLink::sendMessage( const Bu::String &sMsg ) 207void Bu::ItoServer::SrvClientLink::sendMessage( const Bu::String &sMsg )
208{ 208{
209 if( !pClient->imProto.trylock() ) 209 if( !pClient->imProto.trylock() )
210 { 210 {
211 pClient->pClient->onMessage( sMsg ); 211 pClient->pClient->onMessage( sMsg );
212 pClient->pClient->processOutput(); 212 pClient->pClient->processOutput();
213 pClient->imProto.unlock(); 213 pClient->imProto.unlock();
214 } 214 }
215 else 215 else
216 { 216 {
217 Bu::String *pMsg = new Bu::String( sMsg ); 217 Bu::String *pMsg = new Bu::String( sMsg );
218 pClient->qMsg.enqueue( pMsg ); 218 pClient->qMsg.enqueue( pMsg );
219 } 219 }
220} 220}
221 221
222Bu::ItoServer::SrvClientLinkFactory::SrvClientLinkFactory( 222Bu::ItoServer::SrvClientLinkFactory::SrvClientLinkFactory(
223 Bu::ItoServer &rSrv ) : 223 Bu::ItoServer &rSrv ) :
224 rSrv( rSrv ) 224 rSrv( rSrv )
225{ 225{
226} 226}
227 227
@@ -230,12 +230,12 @@ Bu::ItoServer::SrvClientLinkFactory::~SrvClientLinkFactory()
230} 230}
231 231
232Bu::ClientLink *Bu::ItoServer::SrvClientLinkFactory::createLink( 232Bu::ClientLink *Bu::ItoServer::SrvClientLinkFactory::createLink(
233 Bu::Client *pClient ) 233 Bu::Client *pClient )
234{ 234{
235 rSrv.imClients.lock(); 235 rSrv.imClients.lock();
236 ItoClient *pCli = rSrv.hClients.get( *pClient->getSocket() ); 236 ItoClient *pCli = rSrv.hClients.get( *pClient->getSocket() );
237 rSrv.imClients.unlock(); 237 rSrv.imClients.unlock();
238 238
239 return new SrvClientLink( pCli ); 239 return new SrvClientLink( pCli );
240} 240}
241 241