summaryrefslogtreecommitdiff
path: root/src/unstable/itoserver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/unstable/itoserver.cpp')
-rw-r--r--src/unstable/itoserver.cpp242
1 files changed, 242 insertions, 0 deletions
diff --git a/src/unstable/itoserver.cpp b/src/unstable/itoserver.cpp
new file mode 100644
index 0000000..c7165e2
--- /dev/null
+++ b/src/unstable/itoserver.cpp
@@ -0,0 +1,242 @@
1/*
2 * Copyright (C) 2007-2011 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#include "bu/itoserver.h"
9#include <errno.h>
10#include "bu/tcpserversocket.h"
11#include "bu/client.h"
12#include "bu/tcpsocket.h"
13
14#include "bu/config.h"
15
16Bu::ItoServer::ItoServer() :
17 nTimeoutSec( 1 ),
18 nTimeoutUSec( 0 )
19{
20 FD_ZERO( &fdActive );
21}
22
23Bu::ItoServer::~ItoServer()
24{
25 while( !qClientCleanup.isEmpty() )
26 {
27 ItoClient *pCli = qClientCleanup.dequeue();
28 pCli->join();
29 delete pCli;
30 }
31 // TODO: Make sure here that each client has shutdown it's socket, and
32 // maybe even written any extra data, we could put a timelimit on this...
33 // anyway, it's not as clean as it could be right now.
34 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
35 {
36 ItoClient *pCli = (*i);
37 pCli->join();
38 delete pCli;
39 }
40}
41
42void Bu::ItoServer::addPort( int nPort, int nPoolSize )
43{
44 TcpServerSocket *s = new TcpServerSocket( nPort, nPoolSize );
45 int nSocket = s->getSocket();
46 FD_SET( nSocket, &fdActive );
47 hServers.insert( nSocket, s );
48}
49
50void Bu::ItoServer::addPort( const String &sAddr, int nPort, int nPoolSize )
51{
52 TcpServerSocket *s = new TcpServerSocket( sAddr, nPort, nPoolSize );
53 int nSocket = s->getSocket();
54 FD_SET( nSocket, &fdActive );
55 hServers.insert( nSocket, s );
56}
57
58void Bu::ItoServer::setTimeout( int nTimeoutSec, int nTimeoutUSec )
59{
60 this->nTimeoutSec = nTimeoutSec;
61 this->nTimeoutUSec = nTimeoutUSec;
62}
63
64void Bu::ItoServer::addClient( int nSocket, int nPort )
65{
66 ItoClient *pC = new ItoClient( *this, nSocket, nPort, nTimeoutSec,
67 nTimeoutUSec );
68
69 imClients.lock();
70 hClients.insert( nSocket, pC );
71 imClients.unlock();
72
73 pC->start();
74}
75
76void Bu::ItoServer::run()
77{
78 for(;;)
79 {
80 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec };
81
82 fd_set fdRead = fdActive;
83 //fd_set fdWrite = fdActive;
84 fd_set fdException = fdActive;
85
86 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, NULL, &fdException, &xTimeout ) ) < 0 )
87 {
88 throw ExceptionBase("Error attempting to scan open connections.");
89 }
90
91 for( ServerHash::iterator i = hServers.begin(); i != hServers.end(); i++ )
92 {
93 if( FD_ISSET( i.getKey(), &fdRead ) )
94 {
95 TcpServerSocket *pSrv = i.getValue();
96 addClient( pSrv->accept(), pSrv->getPort() );
97 }
98 }
99
100 while( !qClientCleanup.isEmpty() )
101 {
102 ItoClient *pCli = qClientCleanup.dequeue();
103 pCli->join();
104 delete pCli;
105 }
106 }
107}
108
109void Bu::ItoServer::clientCleanup( int iSocket )
110{
111 imClients.lock();
112 ItoClient *pCli = hClients.get( iSocket );
113 imClients.unlock();
114 qClientCleanup.enqueue( pCli );
115}
116
117Bu::ItoServer::ItoClient::ItoClient( ItoServer &rSrv, int iSocket, int iPort,
118 int nTimeoutSec, int nTimeoutUSec ) :
119 rSrv( rSrv ),
120 iSocket( iSocket ),
121 iPort( iPort ),
122 nTimeoutSec( nTimeoutSec ),
123 nTimeoutUSec( nTimeoutUSec )
124{
125 FD_ZERO( &fdActive );
126 FD_SET( iSocket, &fdActive );
127
128 pClient = new Client(
129 new Bu::TcpSocket( iSocket ),
130 new SrvClientLinkFactory( rSrv )
131 );
132}
133
134Bu::ItoServer::ItoClient::~ItoClient()
135{
136}
137
138void Bu::ItoServer::ItoClient::run()
139{
140 imProto.lock();
141 rSrv.onNewConnection( pClient, iPort );
142 pClient->processOutput();
143 imProto.unlock();
144
145 for(;;)
146 {
147 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec };
148
149 fd_set fdRead = fdActive;
150 fd_set fdWrite;
151 fd_set fdException = fdActive;
152
153 FD_ZERO( &fdWrite );
154 if( pClient->hasOutput() )
155 FD_SET( iSocket, &fdWrite );
156
157 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE,
158 &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 )
159 {
160 throw ExceptionBase("Error attempting to scan open connections.");
161 }
162
163 while( !qMsg.isEmpty() )
164 {
165 imProto.lock();
166 Bu::String *pMsg = qMsg.dequeue();
167 pClient->onMessage( *pMsg );
168 delete pMsg;
169 pClient->processOutput();
170 imProto.unlock();
171 }
172
173 if( FD_ISSET( iSocket, &fdRead ) )
174 {
175 imProto.lock();
176 pClient->processInput();
177 imProto.unlock();
178 if( !pClient->isOpen() )
179 {
180 imProto.lock();
181 rSrv.onClosedConnection( pClient );
182 imProto.unlock();
183
184 rSrv.clientCleanup( iSocket );
185
186 return;
187 }
188 }
189
190 if( FD_ISSET( iSocket, &fdWrite ) )
191 {
192 imProto.lock();
193 pClient->processOutput();
194 imProto.unlock();
195 }
196 }
197}
198
199Bu::ItoServer::SrvClientLink::SrvClientLink( ItoClient *pClient ) :
200 pClient( pClient )
201{
202}
203
204Bu::ItoServer::SrvClientLink::~SrvClientLink()
205{
206}
207
208void Bu::ItoServer::SrvClientLink::sendMessage( const Bu::String &sMsg )
209{
210 if( !pClient->imProto.trylock() )
211 {
212 pClient->pClient->onMessage( sMsg );
213 pClient->pClient->processOutput();
214 pClient->imProto.unlock();
215 }
216 else
217 {
218 Bu::String *pMsg = new Bu::String( sMsg );
219 pClient->qMsg.enqueue( pMsg );
220 }
221}
222
223Bu::ItoServer::SrvClientLinkFactory::SrvClientLinkFactory(
224 Bu::ItoServer &rSrv ) :
225 rSrv( rSrv )
226{
227}
228
229Bu::ItoServer::SrvClientLinkFactory::~SrvClientLinkFactory()
230{
231}
232
233Bu::ClientLink *Bu::ItoServer::SrvClientLinkFactory::createLink(
234 Bu::Client *pClient )
235{
236 rSrv.imClients.lock();
237 ItoClient *pCli = rSrv.hClients.get( *pClient->getSocket() );
238 rSrv.imClients.unlock();
239
240 return new SrvClientLink( pCli );
241}
242