aboutsummaryrefslogtreecommitdiff
path: root/src/unstable
diff options
context:
space:
mode:
Diffstat (limited to 'src/unstable')
-rw-r--r--src/unstable/itoserver.cpp241
-rw-r--r--src/unstable/itoserver.h147
2 files changed, 0 insertions, 388 deletions
diff --git a/src/unstable/itoserver.cpp b/src/unstable/itoserver.cpp
deleted file mode 100644
index 9e9d5e0..0000000
--- a/src/unstable/itoserver.cpp
+++ /dev/null
@@ -1,241 +0,0 @@
1/*
2 * Copyright (C) 2007-2019 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#include "bu/config.h"
9#include "bu/itoserver.h"
10#include <errno.h>
11#include "bu/tcpserversocket.h"
12#include "bu/client.h"
13#include "bu/tcpsocket.h"
14
15Bu::ItoServer::ItoServer() :
16 nTimeoutSec( 1 ),
17 nTimeoutUSec( 0 )
18{
19 FD_ZERO( &fdActive );
20}
21
22Bu::ItoServer::~ItoServer()
23{
24 while( !qClientCleanup.isEmpty() )
25 {
26 ItoClient *pCli = qClientCleanup.dequeue();
27 pCli->join();
28 delete pCli;
29 }
30 // TODO: Make sure here that each client has shutdown it's socket, and
31 // maybe even written any extra data, we could put a timelimit on this...
32 // anyway, it's not as clean as it could be right now.
33 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
34 {
35 ItoClient *pCli = (*i);
36 pCli->join();
37 delete pCli;
38 }
39}
40
41void Bu::ItoServer::addPort( int nPort, int nPoolSize )
42{
43 TcpServerSocket *s = new TcpServerSocket( nPort, nPoolSize );
44 socket_t nSocket = s->getSocket();
45 FD_SET( nSocket, &fdActive );
46 hServers.insert( nSocket, s );
47}
48
49void Bu::ItoServer::addPort( const String &sAddr, int nPort, int nPoolSize )
50{
51 TcpServerSocket *s = new TcpServerSocket( sAddr, nPort, nPoolSize );
52 socket_t nSocket = s->getSocket();
53 FD_SET( nSocket, &fdActive );
54 hServers.insert( nSocket, s );
55}
56
57void Bu::ItoServer::setTimeout( int nTimeoutSec, int nTimeoutUSec )
58{
59 this->nTimeoutSec = nTimeoutSec;
60 this->nTimeoutUSec = nTimeoutUSec;
61}
62
63void Bu::ItoServer::addClient( socket_t nSocket, int nPort )
64{
65 ItoClient *pC = new ItoClient( *this, nSocket, nPort, nTimeoutSec,
66 nTimeoutUSec );
67
68 imClients.lock();
69 hClients.insert( nSocket, pC );
70 imClients.unlock();
71
72 pC->start();
73}
74
75void Bu::ItoServer::run()
76{
77 for(;;)
78 {
79 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec };
80
81 fd_set fdRead = fdActive;
82 //fd_set fdWrite = fdActive;
83 fd_set fdException = fdActive;
84
85 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, NULL, &fdException, &xTimeout ) ) < 0 )
86 {
87 throw ExceptionBase("Error attempting to scan open connections.");
88 }
89
90 for( ServerHash::iterator i = hServers.begin(); i != hServers.end(); i++ )
91 {
92 if( FD_ISSET( i.getKey(), &fdRead ) )
93 {
94 TcpServerSocket *pSrv = i.getValue();
95 addClient( pSrv->accept(), pSrv->getPort() );
96 }
97 }
98
99 while( !qClientCleanup.isEmpty() )
100 {
101 ItoClient *pCli = qClientCleanup.dequeue();
102 pCli->join();
103 delete pCli;
104 }
105 }
106}
107
108void Bu::ItoServer::clientCleanup( socket_t iSocket )
109{
110 imClients.lock();
111 ItoClient *pCli = hClients.get( iSocket );
112 imClients.unlock();
113 qClientCleanup.enqueue( pCli );
114}
115
116Bu::ItoServer::ItoClient::ItoClient( ItoServer &rSrv, Bu::ItoServer::socket_t iSocket, int iPort,
117 int nTimeoutSec, int nTimeoutUSec ) :
118 rSrv( rSrv ),
119 iSocket( iSocket ),
120 iPort( iPort ),
121 nTimeoutSec( nTimeoutSec ),
122 nTimeoutUSec( nTimeoutUSec )
123{
124 FD_ZERO( &fdActive );
125 FD_SET( iSocket, &fdActive );
126
127 pClient = new Client(
128 new Bu::TcpSocket( iSocket ),
129 new SrvClientLinkFactory( rSrv )
130 );
131}
132
133Bu::ItoServer::ItoClient::~ItoClient()
134{
135}
136
137void Bu::ItoServer::ItoClient::run()
138{
139 imProto.lock();
140 rSrv.onNewConnection( pClient, iPort );
141 pClient->processOutput();
142 imProto.unlock();
143
144 for(;;)
145 {
146 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec };
147
148 fd_set fdRead = fdActive;
149 fd_set fdWrite;
150 fd_set fdException = fdActive;
151
152 FD_ZERO( &fdWrite );
153 if( pClient->hasOutput() )
154 FD_SET( iSocket, &fdWrite );
155
156 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE,
157 &fdRead, &fdWrite, &fdException, &xTimeout ) ) < 0 )
158 {
159 throw ExceptionBase("Error attempting to scan open connections.");
160 }
161
162 while( !qMsg.isEmpty() )
163 {
164 imProto.lock();
165 Bu::String *pMsg = qMsg.dequeue();
166 pClient->onMessage( *pMsg );
167 delete pMsg;
168 pClient->processOutput();
169 imProto.unlock();
170 }
171
172 if( FD_ISSET( iSocket, &fdRead ) )
173 {
174 imProto.lock();
175 pClient->processInput();
176 imProto.unlock();
177 if( !pClient->isOpen() )
178 {
179 imProto.lock();
180 rSrv.onClosedConnection( pClient );
181 imProto.unlock();
182
183 rSrv.clientCleanup( iSocket );
184
185 return;
186 }
187 }
188
189 if( FD_ISSET( iSocket, &fdWrite ) )
190 {
191 imProto.lock();
192 pClient->processOutput();
193 imProto.unlock();
194 }
195 }
196}
197
198Bu::ItoServer::SrvClientLink::SrvClientLink( ItoClient *pClient ) :
199 pClient( pClient )
200{
201}
202
203Bu::ItoServer::SrvClientLink::~SrvClientLink()
204{
205}
206
207void Bu::ItoServer::SrvClientLink::sendMessage( const Bu::String &sMsg )
208{
209 if( !pClient->imProto.trylock() )
210 {
211 pClient->pClient->onMessage( sMsg );
212 pClient->pClient->processOutput();
213 pClient->imProto.unlock();
214 }
215 else
216 {
217 Bu::String *pMsg = new Bu::String( sMsg );
218 pClient->qMsg.enqueue( pMsg );
219 }
220}
221
222Bu::ItoServer::SrvClientLinkFactory::SrvClientLinkFactory(
223 Bu::ItoServer &rSrv ) :
224 rSrv( rSrv )
225{
226}
227
228Bu::ItoServer::SrvClientLinkFactory::~SrvClientLinkFactory()
229{
230}
231
232Bu::ClientLink *Bu::ItoServer::SrvClientLinkFactory::createLink(
233 Bu::Client *pClient )
234{
235 rSrv.imClients.lock();
236 ItoClient *pCli = rSrv.hClients.get( *pClient->getSocket() );
237 rSrv.imClients.unlock();
238
239 return new SrvClientLink( pCli );
240}
241
diff --git a/src/unstable/itoserver.h b/src/unstable/itoserver.h
deleted file mode 100644
index f5e4a71..0000000
--- a/src/unstable/itoserver.h
+++ /dev/null
@@ -1,147 +0,0 @@
1/*
2 * Copyright (C) 2007-2019 Xagasoft, All rights reserved.
3 *
4 * This file is part of the libbu++ library and is released under the
5 * terms of the license contained in the file LICENSE.
6 */
7
8#ifndef BU_ITO_SERVER_H
9#define BU_ITO_SERVER_H
10
11#include <stdint.h>
12
13#ifndef WIN32
14 #include <sys/select.h>
15#endif
16
17#include "bu/string.h"
18#include "bu/list.h"
19#include "bu/thread.h"
20#include "bu/mutex.h"
21#include "bu/synchroqueue.h"
22#include "bu/hash.h"
23
24#include "bu/clientlink.h"
25#include "bu/clientlinkfactory.h"
26
27namespace Bu
28{
29 class TcpServerSocket;
30 class TcpSocket;
31 class Client;
32
33 /**
34 * Core of a network server. This class is distinct from a ServerSocket in
35 * that a ServerSocket is one listening socket, nothing more. Socket will
36 * manage a pool of both ServerSockets and connected Sockets along with
37 * their protocols and buffers.
38 *
39 * To start serving on a new port, use the addPort functions. Each call to
40 * addPort creates a new ServerSocket, starts it listening, and adds it to
41 * the server pool.
42 *
43 * All of the real work is done by scan, which will wait for up
44 * to the timeout set by setTimeout before returning if there is no data
45 * pending. scan should probably be called in some sort of tight
46 * loop, possibly in it's own thread, or in the main control loop.
47 *
48 * In order to use a Server you must subclass it and implement the pure
49 * virtual functions. These allow you to receive notification of events
50 * happening within the server itself, and actually makes it useful.
51 *@ingroup Threading Serving
52 */
53 class ItoServer : public Thread
54 {
55 friend class ItoClient;
56 friend class SrvClientLinkFactory;
57 public:
58 ItoServer();
59 virtual ~ItoServer();
60
61#ifdef WIN32
62 typedef unsigned int socket_t;
63#else
64 typedef int socket_t;
65#endif
66
67 void addPort( int nPort, int nPoolSize=40 );
68 void addPort( const String &sAddr, int nPort, int nPoolSize=40 );
69
70 //void scan();
71 void setTimeout( int nTimeoutSec, int nTimeoutUSec=0 );
72
73 void addClient( socket_t nSocket, int nPort );
74
75 virtual void onNewConnection( Client *pClient, int nPort )=0;
76 virtual void onClosedConnection( Client *pClient )=0;
77
78 protected:
79 virtual void run();
80
81 private:
82 class SrvClientLink;
83 class ItoClient : public Thread
84 {
85 friend class Bu::ItoServer::SrvClientLink;
86 public:
87 ItoClient( ItoServer &rSrv, socket_t nSocket, int nPort,
88 int nTimeoutSec, int nTimeoutUSec );
89 virtual ~ItoClient();
90
91 typedef SynchroQueue<Bu::String *> StringQueue;
92 StringQueue qMsg;
93
94 protected:
95 virtual void run();
96
97 private:
98 ItoServer &rSrv;
99 Client *pClient;
100 fd_set fdActive;
101 socket_t iSocket;
102 int iPort;
103 int nTimeoutSec;
104 int nTimeoutUSec;
105 Mutex imProto;
106 };
107
108 class SrvClientLink : public Bu::ClientLink
109 {
110 public:
111 SrvClientLink( ItoClient *pClient );
112 virtual ~SrvClientLink();
113
114 virtual void sendMessage( const Bu::String &sMsg );
115
116 private:
117 ItoClient *pClient;
118 };
119
120 class SrvClientLinkFactory : public Bu::ClientLinkFactory
121 {
122 public:
123 SrvClientLinkFactory( ItoServer &rSrv );
124 virtual ~SrvClientLinkFactory();
125
126 virtual Bu::ClientLink *createLink( Bu::Client *pClient );
127
128 private:
129 ItoServer &rSrv;
130 };
131
132 int nTimeoutSec;
133 int nTimeoutUSec;
134 fd_set fdActive;
135 typedef Hash<socket_t,TcpServerSocket *> ServerHash;
136 ServerHash hServers;
137 typedef Hash<socket_t,ItoClient *> ClientHash;
138 typedef SynchroQueue<ItoClient *> ClientQueue;
139 ClientHash hClients;
140 ClientQueue qClientCleanup;
141 Mutex imClients;
142
143 void clientCleanup( socket_t iSocket );
144 };
145}
146
147#endif