blob: e8aeb078b12b8f7a6643108c463a40cfe40932f5 [file] [log] [blame]
J. Duke319a3b92007-12-01 00:00:00 +00001/*
2 * Copyright 1999-2001 Sun Microsystems, Inc. All Rights Reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 *
8 * - Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 *
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * - Neither the name of Sun Microsystems nor the names of its
16 * contributors may be used to endorse or promote products derived
17 * from this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
20 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
24 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
26 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
27 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
28 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 */
31
32import java.io.*;
33import java.net.*;
34import java.lang.Byte;
35
36/**
37 * Simple Java "server" using the Poller class
38 * to multiplex on incoming connections. Note
39 * that handoff of events, via linked Q is not
40 * actually be a performance booster here, since
41 * the processing of events is cheaper than
42 * the overhead in scheduling/executing them.
43 * Although this demo does allow for concurrency
44 * in handling connections, it uses a rather
45 * primitive "gang scheduling" policy to keep
46 * the code simpler.
47 */
48
49public class PollingServer
50{
51 public final static int MAXCONN = 10000;
52 public final static int PORTNUM = 4444;
53 public final static int BYTESPEROP = 10;
54
55 /**
56 * This synchronization object protects access to certain
57 * data (bytesRead,eventsToProcess) by concurrent Consumer threads.
58 */
59 private final static Object eventSync = new Object();
60
61 private static InputStream[] instr = new InputStream[MAXCONN];
62 private static int[] mapping = new int[65535];
63 private static LinkedQueue linkedQ = new LinkedQueue();
64 private static int bytesRead = 0;
65 private static int bytesToRead;
66 private static int eventsToProcess=0;
67
68 public PollingServer(int concurrency) {
69 Socket[] sockArr = new Socket[MAXCONN];
70 long timestart, timestop;
71 short[] revents = new short[MAXCONN];
72 int[] fds = new int[MAXCONN];
73 int bytes;
74 Poller Mux;
75 int serverFd;
76 int totalConn=0;
77 int connects=0;
78
79 System.out.println ("Serv: Initializing port " + PORTNUM);
80 try {
81
82 ServerSocket skMain = new ServerSocket (PORTNUM);
83 /*
84 * Create the Poller object Mux, allow for up to MAXCONN
85 * sockets/filedescriptors to be polled.
86 */
87 Mux = new Poller(MAXCONN);
88 serverFd = Mux.add(skMain, Poller.POLLIN);
89
90 Socket ctrlSock = skMain.accept();
91
92 BufferedReader ctrlReader =
93 new BufferedReader(new InputStreamReader(ctrlSock.getInputStream()));
94 String ctrlString = ctrlReader.readLine();
95 bytesToRead = Integer.valueOf(ctrlString).intValue();
96 ctrlString = ctrlReader.readLine();
97 totalConn = Integer.valueOf(ctrlString).intValue();
98
99 System.out.println("Receiving " + bytesToRead + " bytes from " +
100 totalConn + " client connections");
101
102 timestart = System.currentTimeMillis();
103
104 /*
105 * Start the consumer threads to read data.
106 */
107 for (int consumerThread = 0;
108 consumerThread < concurrency; consumerThread++ ) {
109 new Consumer(consumerThread).start();
110 }
111
112 /*
113 * Take connections, read Data
114 */
115 int numEvents=0;
116
117 while ( bytesRead < bytesToRead ) {
118
119 int loopWaits=0;
120 while (eventsToProcess > 0) {
121 synchronized (eventSync) {
122 loopWaits++;
123 if (eventsToProcess <= 0) break;
124 try { eventSync.wait(); } catch (Exception e) {e.printStackTrace();};
125 }
126 }
127 if (loopWaits > 1)
128 System.out.println("Done waiting...loops = " + loopWaits +
129 " events " + numEvents +
130 " bytes read : " + bytesRead );
131
132 if (bytesRead >= bytesToRead) break; // may be done!
133
134 /*
135 * Wait for events
136 */
137 numEvents = Mux.waitMultiple(100, fds, revents);
138 synchronized (eventSync) {
139 eventsToProcess = numEvents;
140 }
141 /*
142 * Process all the events we got from Mux.waitMultiple
143 */
144 int cnt = 0;
145 while ( (cnt < numEvents) && (bytesRead < bytesToRead) ) {
146 int fd = fds[cnt];
147
148 if (revents[cnt] == Poller.POLLIN) {
149 if (fd == serverFd) {
150 /*
151 * New connection coming in on the ServerSocket
152 * Add the socket to the Mux, keep track of mapping
153 * the fdval returned by Mux.add to the connection.
154 */
155 sockArr[connects] = skMain.accept();
156 instr[connects] = sockArr[connects].getInputStream();
157 int fdval = Mux.add(sockArr[connects], Poller.POLLIN);
158 mapping[fdval] = connects;
159 synchronized(eventSync) {
160 eventsToProcess--; // just processed this one!
161 }
162 connects++;
163 } else {
164 /*
165 * We've got data from this client connection.
166 * Put it on the queue for the consumer threads to process.
167 */
168 linkedQ.put(new Integer(fd));
169 }
170 } else {
171 System.out.println("Got revents[" + cnt + "] == " + revents[cnt]);
172 }
173 cnt++;
174 }
175 }
176 timestop = System.currentTimeMillis();
177 System.out.println("Time for all reads (" + totalConn +
178 " sockets) : " + (timestop-timestart));
179
180 // Tell the client it can now go away
181 byte[] buff = new byte[BYTESPEROP];
182 ctrlSock.getOutputStream().write(buff,0,BYTESPEROP);
183
184 // Tell the cunsumer threads they can exit.
185 for (int cThread = 0; cThread < concurrency; cThread++ ) {
186 linkedQ.put(new Integer(-1));
187 }
188 } catch (Exception exc) { exc.printStackTrace(); }
189 }
190
191 /*
192 * main ... just check if a concurrency was specified
193 */
194 public static void main (String args[])
195 {
196 int concurrency;
197
198 if (args.length == 1)
199 concurrency = java.lang.Integer.valueOf(args[0]).intValue();
200 else
201 concurrency = Poller.getNumCPUs() + 1;
202 PollingServer server = new PollingServer(concurrency);
203 }
204
205 /*
206 * This class is for handling the Client data.
207 * The PollingServer spawns off a number of these based upon
208 * the number of CPUs (or concurrency argument).
209 * Each just loops grabbing events off the queue and
210 * processing them.
211 */
212 class Consumer extends Thread {
213 private int threadNumber;
214 public Consumer(int i) { threadNumber = i; }
215
216 public void run() {
217 byte[] buff = new byte[BYTESPEROP];
218 int bytes = 0;
219
220 InputStream instream;
221 while (bytesRead < bytesToRead) {
222 try {
223 Integer Fd = (Integer) linkedQ.take();
224 int fd = Fd.intValue();
225 if (fd == -1) break; /* got told we could exit */
226
227 /*
228 * We have to map the fd value returned from waitMultiple
229 * to the actual input stream associated with that fd.
230 * Take a look at how the Mux.add() was done to see how
231 * we stored that.
232 */
233 int map = mapping[fd];
234 instream = instr[map];
235 bytes = instream.read(buff,0,BYTESPEROP);
236 } catch (Exception e) { System.out.println(e.toString()); }
237
238 if (bytes > 0) {
239 /*
240 * Any real server would do some synchronized and some
241 * unsynchronized work on behalf of the client, and
242 * most likely send some data back...but this is a
243 * gross oversimplification.
244 */
245 synchronized(eventSync) {
246 bytesRead += bytes;
247 eventsToProcess--;
248 if (eventsToProcess <= 0) {
249 eventSync.notify();
250 }
251 }
252 }
253 }
254 }
255 }
256}