aboutsummaryrefslogtreecommitdiff
path: root/src/unstable/itoserver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/unstable/itoserver.cpp')
-rw-r--r--src/unstable/itoserver.cpp241
1 files changed, 0 insertions, 241 deletions
diff --git a/src/unstable/itoserver.cpp b/src/unstable/itoserver.cpp
deleted file mode 100644
index 9e9d5e0..0000000
--- a/src/unstable/itoserver.cpp
+++ /dev/null
@@ -1,241 +0,0 @@
1/*
2 * Copyright (C) 2007-2019 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#include "bu/config.h"
9#include "bu/itoserver.h"
10#include <errno.h>
11#include "bu/tcpserversocket.h"
12#include "bu/client.h"
13#include "bu/tcpsocket.h"
14
15Bu::ItoServer::ItoServer() :
16 nTimeoutSec( 1 ),
17 nTimeoutUSec( 0 )
18{
19 FD_ZERO( &fdActive );
20}
21
22Bu::ItoServer::~ItoServer()
23{
24 while( !qClientCleanup.isEmpty() )
25 {
26 ItoClient *pCli = qClientCleanup.dequeue();
27 pCli->join();
28 delete pCli;
29 }
30 // TODO: Make sure here that each client has shutdown it's socket, and
31 // maybe even written any extra data, we could put a timelimit on this...
32 // anyway, it's not as clean as it could be right now.
33 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
34 {
35 ItoClient *pCli = (*i);
36 pCli->join();
37 delete pCli;
38 }
39}
40
41void Bu::ItoServer::addPort( int nPort, int nPoolSize )
42{
43 TcpServerSocket *s = new TcpServerSocket( nPort, nPoolSize );
44 socket_t nSocket = s->getSocket();
45 FD_SET( nSocket, &fdActive );
46 hServers.insert( nSocket, s );
47}
48
49void Bu::ItoServer::addPort( const String &sAddr, int nPort, int nPoolSize )
50{
51 TcpServerSocket *s = new TcpServerSocket( sAddr, nPort, nPoolSize );
52 socket_t nSocket = s->getSocket();
53 FD_SET( nSocket, &fdActive );
54 hServers.insert( nSocket, s );
55}
56
57void Bu::ItoServer::setTimeout( int nTimeoutSec, int nTimeoutUSec )
58{
59 this->nTimeoutSec = nTimeoutSec;
60 this->nTimeoutUSec = nTimeoutUSec;
61}
62
63void Bu::ItoServer::addClient( socket_t nSocket, int nPort )
64{
65 ItoClient *pC = new ItoClient( *this, nSocket, nPort, nTimeoutSec,
66 nTimeoutUSec );
67
68 imClients.lock();
69 hClients.insert( nSocket, pC );
70 imClients.unlock();
71
72 pC->start();
73}
74
75void Bu::ItoServer::run()
76{
77 for(;;)
78 {
79 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec };
80
81 fd_set fdRead = fdActive;
82 //fd_set fdWrite = fdActive;
83 fd_set fdException = fdActive;
84
85 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, NULL, &fdException, &xTimeout ) ) < 0 )
86 {
87 throw ExceptionBase("Error attempting to scan open connections.");
88 }
89
90 for( ServerHash::iterator i = hServers.begin(); i != hServers.end(); i++ )
91 {
92 if( FD_ISSET( i.getKey(), &fdRead ) )
93 {
94 TcpServerSocket *pSrv = i.getValue();
95 addClient( pSrv->accept(), pSrv->getPort() );
96 }
97 }
98
99 while( !qClientCleanup.isEmpty() )
100 {
101 ItoClient *pCli = qClientCleanup.dequeue();
102 pCli->join();
103 delete pCli;
104 }
105 }
106}
107
108void Bu::ItoServer::clientCleanup( socket_t iSocket )
109{
110 imClients.lock();
111 ItoClient *pCli = hClients.get( iSocket );
112 imClients.unlock();
113 qClientCleanup.enqueue( pCli );
114}
115
116Bu::ItoServer::ItoClient::ItoClient( ItoServer &rSrv, Bu::ItoServer::socket_t iSocket, int iPort,
117 int nTimeoutSec, int nTimeoutUSec ) :
118 rSrv( rSrv ),
119 iSocket( iSocket ),
120 iPort( iPort ),
121 nTimeoutSec( nTimeoutSec ),
122 nTimeoutUSec( nTimeoutUSec )
123{
124 FD_ZERO( &fdActive );
125 FD_SET( iSocket, &fdActive );
126
127 pClient = new Client(
128 new Bu::TcpSocket( iSocket ),
129 new SrvClientLinkFactory( rSrv )
130 );
131}
132
133Bu::ItoServer::ItoClient::~ItoClient()
134{
135}
136
137void Bu::ItoServer::ItoClient::run()
138{
139 imProto.lock();
140 rSrv.onNewConnection( pClient, iPort );
141 pClient->processOutput();
142 imProto.unlock();
143
144 for(;;)
145 {
146 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec };
147
148 fd_set fdRead = fdActive;
149 fd_set fdWrite;
150 fd_set fdException = fdActive;
151
152 FD_ZERO( &fdWrite );
153 if( pClient->hasOutput() )
154 FD_SET( iSocket, &fdWrite );
155
156 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE,
157 &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 )
158 {
159 throw ExceptionBase("Error attempting to scan open connections.");
160 }
161
162 while( !qMsg.isEmpty() )
163 {
164 imProto.lock();
165 Bu::String *pMsg = qMsg.dequeue();
166 pClient->onMessage( *pMsg );
167 delete pMsg;
168 pClient->processOutput();
169 imProto.unlock();
170 }
171
172 if( FD_ISSET( iSocket, &fdRead ) )
173 {
174 imProto.lock();
175 pClient->processInput();
176 imProto.unlock();
177 if( !pClient->isOpen() )
178 {
179 imProto.lock();
180 rSrv.onClosedConnection( pClient );
181 imProto.unlock();
182
183 rSrv.clientCleanup( iSocket );
184
185 return;
186 }
187 }
188
189 if( FD_ISSET( iSocket, &fdWrite ) )
190 {
191 imProto.lock();
192 pClient->processOutput();
193 imProto.unlock();
194 }
195 }
196}
197
198Bu::ItoServer::SrvClientLink::SrvClientLink( ItoClient *pClient ) :
199 pClient( pClient )
200{
201}
202
203Bu::ItoServer::SrvClientLink::~SrvClientLink()
204{
205}
206
207void Bu::ItoServer::SrvClientLink::sendMessage( const Bu::String &sMsg )
208{
209 if( !pClient->imProto.trylock() )
210 {
211 pClient->pClient->onMessage( sMsg );
212 pClient->pClient->processOutput();
213 pClient->imProto.unlock();
214 }
215 else
216 {
217 Bu::String *pMsg = new Bu::String( sMsg );
218 pClient->qMsg.enqueue( pMsg );
219 }
220}
221
222Bu::ItoServer::SrvClientLinkFactory::SrvClientLinkFactory(
223 Bu::ItoServer &rSrv ) :
224 rSrv( rSrv )
225{
226}
227
228Bu::ItoServer::SrvClientLinkFactory::~SrvClientLinkFactory()
229{
230}
231
232Bu::ClientLink *Bu::ItoServer::SrvClientLinkFactory::createLink(
233 Bu::Client *pClient )
234{
235 rSrv.imClients.lock();
236 ItoClient *pCli = rSrv.hClients.get( *pClient->getSocket() );
237 rSrv.imClients.unlock();
238
239 return new SrvClientLink( pCli );
240}
241