aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Buland <eichlan@xagasoft.com>2007-09-11 04:05:26 +0000
committerMike Buland <eichlan@xagasoft.com>2007-09-11 04:05:26 +0000
commitc82dc43edc9fd913e8ddb20bebe778781ec0d6f7 (patch)
tree101382ab930a2fa895a0ca3ea9e781ac71a1a625
parentace50b182f318b96a87505aa3d6b509959d49544 (diff)
downloadlibbu++-c82dc43edc9fd913e8ddb20bebe778781ec0d6f7.tar.gz
libbu++-c82dc43edc9fd913e8ddb20bebe778781ec0d6f7.tar.bz2
libbu++-c82dc43edc9fd913e8ddb20bebe778781ec0d6f7.tar.xz
libbu++-c82dc43edc9fd913e8ddb20bebe778781ec0d6f7.zip
Everything seems to work with the new Bu::ItoServer class, it operates very,
very similarly to the Bu::Server class, except that every incoming connection gets it's own thread. This functionality may have to be tuned later, to allow for maintaining a pool of connections as an option, but this is fine for now.
Diffstat (limited to '')
-rw-r--r--build.conf2
-rw-r--r--src/itoserver.cpp178
-rw-r--r--src/itoserver.h89
-rw-r--r--src/tests/itoserver.cpp69
-rw-r--r--src/trace.h10
5 files changed, 347 insertions, 1 deletions
diff --git a/build.conf b/build.conf
index 36f1f8b..2f21860 100644
--- a/build.conf
+++ b/build.conf
@@ -40,7 +40,7 @@ filesIn("src/tests") filter regexp("^src/tests/(.*)\\.cpp$", "tests/{re:1}"):
40 set "LDFLAGS" += "-L. -lbu++", 40 set "LDFLAGS" += "-L. -lbu++",
41 input "src/{target}.cpp" 41 input "src/{target}.cpp"
42 42
43["tests/itoqueue1", "tests/itoqueue2", "tests/socketblock"]: 43["tests/itoqueue1", "tests/itoqueue2", "tests/socketblock", "tests/itoserver"]:
44 set "LDFLAGS" += "-lpthread" 44 set "LDFLAGS" += "-lpthread"
45 45
46filesIn("src/unit") filter regexp("^src/unit/(.*)\\.cpp$", "unit/{re:1}"): 46filesIn("src/unit") filter regexp("^src/unit/(.*)\\.cpp$", "unit/{re:1}"):
diff --git a/src/itoserver.cpp b/src/itoserver.cpp
new file mode 100644
index 0000000..0337057
--- /dev/null
+++ b/src/itoserver.cpp
@@ -0,0 +1,178 @@
1#include "bu/itoserver.h"
2#include <errno.h>
3#include "bu/serversocket.h"
4#include "bu/client.h"
5#include "bu/socket.h"
6#include "osx_compatibility.h"
7
8Bu::ItoServer::ItoServer() :
9 nTimeoutSec( 1 ),
10 nTimeoutUSec( 0 )
11{
12 FD_ZERO( &fdActive );
13}
14
15Bu::ItoServer::~ItoServer()
16{
17}
18
19void Bu::ItoServer::addPort( int nPort, int nPoolSize )
20{
21 ServerSocket *s = new ServerSocket( nPort, nPoolSize );
22 int nSocket = s->getSocket();
23 FD_SET( nSocket, &fdActive );
24 hServers.insert( nSocket, s );
25}
26
27void Bu::ItoServer::addPort( const FString &sAddr, int nPort, int nPoolSize )
28{
29 ServerSocket *s = new ServerSocket( sAddr, nPort, nPoolSize );
30 int nSocket = s->getSocket();
31 FD_SET( nSocket, &fdActive );
32 hServers.insert( nSocket, s );
33}
34
35void Bu::ItoServer::setTimeout( int nTimeoutSec, int nTimeoutUSec )
36{
37 this->nTimeoutSec = nTimeoutSec;
38 this->nTimeoutUSec = nTimeoutUSec;
39}
40/*
41void Bu::ItoServer::scan()
42{
43 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec };
44
45 fd_set fdRead = fdActive;
46 fd_set fdWrite = fdActive;
47 fd_set fdException = fdActive;
48
49 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, NULL, &fdException, &xTimeout ) ) < 0 )
50 {
51 throw ExceptionBase("Error attempting to scan open connections.");
52 }
53
54 for( int j = 0; j < FD_SETSIZE; j++ )
55 {
56 if( FD_ISSET( j, &fdRead ) )
57 {
58 if( hServers.has( j ) )
59 {
60 ServerSocket *pSrv = hServers.get( j );
61 addClient( pSrv->accept(), pSrv->getPort() );
62 }
63 else
64 {
65 Client *pClient = hClients.get( j );
66 pClient->processInput();
67 if( !pClient->isOpen() )
68 {
69 onClosedConnection( pClient );
70 hClients.erase( j );
71 FD_CLR( j, &fdActive );
72 }
73 }
74 }
75 }
76
77 // Now we just try to write all the pending data on all the sockets.
78 // this could be done better eventually, if we care about the socket
79 // wanting to accept writes (using a select).
80 for( ClientHash::iterator i = hClients.begin(); i != hClients.end(); i++ )
81 {
82 (*i)->processOutput();
83 }
84}
85*/
86void Bu::ItoServer::addClient( int nSocket, int nPort )
87{
88 ItoClient *pC = new ItoClient( *this, nSocket, nPort, nTimeoutSec,
89 nTimeoutUSec );
90 pC->start();
91
92}
93
94void *Bu::ItoServer::run()
95{
96 for(;;)
97 {
98 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec };
99
100 fd_set fdRead = fdActive;
101 fd_set fdWrite = fdActive;
102 fd_set fdException = fdActive;
103
104 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, NULL, &fdException, &xTimeout ) ) < 0 )
105 {
106 throw ExceptionBase("Error attempting to scan open connections.");
107 }
108
109 for( ServerHash::iterator i = hServers.begin(); i != hServers.end(); i++ )
110 {
111 if( FD_ISSET( i.getKey(), &fdRead ) )
112 {
113 ServerSocket *pSrv = i.getValue();
114 addClient( pSrv->accept(), pSrv->getPort() );
115 }
116 }
117 }
118
119 return NULL;
120}
121
122Bu::ItoServer::ItoClient::ItoClient( ItoServer &rSrv, int iSocket, int iPort,
123 int nTimeoutSec, int nTimeoutUSec ) :
124 rSrv( rSrv ),
125 iSocket( iSocket ),
126 iPort( iPort ),
127 nTimeoutSec( nTimeoutSec ),
128 nTimeoutUSec( nTimeoutUSec )
129{
130 FD_ZERO( &fdActive );
131 FD_SET( iSocket, &fdActive );
132
133 pClient = new Client(
134 new Bu::Socket( iSocket )
135 );
136
137}
138
139Bu::ItoServer::ItoClient::~ItoClient()
140{
141}
142
143void *Bu::ItoServer::ItoClient::run()
144{
145 rSrv.onNewConnection( pClient, iPort );
146
147 for(;;)
148 {
149 struct timeval xTimeout = { nTimeoutSec, nTimeoutUSec };
150
151 fd_set fdRead = fdActive;
152 fd_set fdException = fdActive;
153
154 if( TEMP_FAILURE_RETRY( select( FD_SETSIZE, &fdRead, NULL, &fdException, &xTimeout ) ) < 0 )
155 {
156 throw ExceptionBase("Error attempting to scan open connections.");
157 }
158
159 if( FD_ISSET( iSocket, &fdRead ) )
160 {
161 pClient->processInput();
162 if( !pClient->isOpen() )
163 {
164 rSrv.onClosedConnection( pClient );
165
166 return NULL;
167 }
168 }
169
170 // Now we just try to write all the pending data on the socket.
171 // this could be done better eventually, if we care about the socket
172 // wanting to accept writes (using a select).
173 pClient->processOutput();
174 }
175
176 return NULL;
177}
178
diff --git a/src/itoserver.h b/src/itoserver.h
new file mode 100644
index 0000000..19c34ca
--- /dev/null
+++ b/src/itoserver.h
@@ -0,0 +1,89 @@
1#ifndef BU_ITO_SERVER_H
2#define BU_ITO_SERVER_H
3
4#include <stdint.h>
5
6#include "bu/fstring.h"
7#include "bu/list.h"
8#include "bu/ito.h"
9#include "bu/itomutex.h"
10#include "bu/set.h"
11
12namespace Bu
13{
14 class ServerSocket;
15 class Socket;
16 class Client;
17
18 /**
19 * Core of a network server. This class is distinct from a ServerSocket in
20 * that a ServerSocket is one listening socket, nothing more. Socket will
21 * manage a pool of both ServerSockets and connected Sockets along with
22 * their protocols and buffers.
23 *
24 * To start serving on a new port, use the addPort functions. Each call to
25 * addPort creates a new ServerSocket, starts it listening, and adds it to
26 * the server pool.
27 *
28 * All of the real work is done by scan, which will wait for up
29 * to the timeout set by setTimeout before returning if there is no data
30 * pending. scan should probably be called in some sort of tight
31 * loop, possibly in it's own thread, or in the main control loop.
32 *
33 * In order to use a Server you must subclass it and implement the pure
34 * virtual functions. These allow you to receive notification of events
35 * happening within the server itself, and actually makes it useful.
36 */
37 class ItoServer : public Ito
38 {
39 public:
40 ItoServer();
41 virtual ~ItoServer();
42
43 void addPort( int nPort, int nPoolSize=40 );
44 void addPort( const FString &sAddr, int nPort, int nPoolSize=40 );
45
46 //void scan();
47 void setTimeout( int nTimeoutSec, int nTimeoutUSec=0 );
48
49 void addClient( int nSocket, int nPort );
50
51 virtual void onNewConnection( Client *pClient, int nPort )=0;
52 virtual void onClosedConnection( Client *pClient )=0;
53
54 virtual void *run();
55
56 private:
57 class ItoClient : public Ito
58 {
59 public:
60 ItoClient( ItoServer &rSrv, int nSocket, int nPort,
61 int nTimeoutSec, int nTimeoutUSec );
62 virtual ~ItoClient();
63
64 virtual void *run();
65
66 private:
67 ItoServer &rSrv;
68 Client *pClient;
69 fd_set fdActive;
70 int iSocket;
71 int iPort;
72 int nTimeoutSec;
73 int nTimeoutUSec;
74 };
75
76 int nTimeoutSec;
77 int nTimeoutUSec;
78 fd_set fdActive;
79 typedef Hash<int,ServerSocket *> ServerHash;
80 ServerHash hServers;
81 //typedef Bu::Set<ItoClient *> ClientSet;
82 //ClientSet sClients;
83 //typedef Hash<int,Client *> ClientHash;
84 //ClientHash hClients;
85 ItoMutex im;
86 };
87}
88
89#endif
diff --git a/src/tests/itoserver.cpp b/src/tests/itoserver.cpp
new file mode 100644
index 0000000..4f5c644
--- /dev/null
+++ b/src/tests/itoserver.cpp
@@ -0,0 +1,69 @@
1#include "bu/itoserver.h"
2#include "bu/protocol.h"
3#include "bu/client.h"
4
5#define BU_TRACE
6#include "bu/trace.h"
7
8class ProtocolEcho : public Bu::Protocol
9{
10public:
11 ProtocolEcho()
12 {
13 TRACE();
14 }
15 virtual ~ProtocolEcho()
16 {
17 TRACE();
18 }
19
20 virtual void onNewConnection( Bu::Client *pClient )
21 {
22 TRACE();
23 // Huh...
24 }
25
26 virtual void onNewData( Bu::Client *pClient )
27 {
28 TRACE();
29 pClient->write( pClient->getInput().getStr(), pClient->getInputSize() );
30 pClient->seek( pClient->getInputSize() );
31 }
32};
33
34class TestServer : public Bu::ItoServer
35{
36public:
37 TestServer()
38 {
39 TRACE();
40 }
41 virtual ~TestServer()
42 {
43 TRACE();
44 }
45
46 virtual void onNewConnection( Bu::Client *pClient, int iPort )
47 {
48 TRACE();
49 pClient->setProtocol( new ProtocolEcho() );
50 }
51
52 virtual void onClosedConnection( Bu::Client *pClient )
53 {
54 TRACE();
55 delete pClient->getProtocol();
56 }
57};
58
59int main()
60{
61 TRACE();
62
63 TestServer ts;
64
65 ts.addPort( 5555 );
66 ts.start();
67 ts.join();
68}
69
diff --git a/src/trace.h b/src/trace.h
new file mode 100644
index 0000000..d7a8485
--- /dev/null
+++ b/src/trace.h
@@ -0,0 +1,10 @@
1#ifndef BU_TRACE_H
2#define BU_TRACE_H
3
4#ifdef BU_TRACE
5# define TRACE() printf("trace: %s\n", __PRETTY_FUNCTION__ )
6#else
7# define TRACE() {}
8#endif
9
10#endif