blob: dff6a2fc725c97d2345fa98175fd7a0565a9f09c [file] [log] [blame]
Jake Slack03928ae2014-05-13 18:41:56 -07001//
2// ========================================================================
3// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4// ------------------------------------------------------------------------
5// All rights reserved. This program and the accompanying materials
6// are made available under the terms of the Eclipse Public License v1.0
7// and Apache License v2.0 which accompanies this distribution.
8//
9// The Eclipse Public License is available at
10// http://www.eclipse.org/legal/epl-v10.html
11//
12// The Apache License v2.0 is available at
13// http://www.opensource.org/licenses/apache2.0.php
14//
15// You may elect to redistribute this code under either of these licenses.
16// ========================================================================
17//
18
19package org.eclipse.jetty.websocket;
20
21import java.io.IOException;
22import java.net.InetSocketAddress;
23import java.net.ProtocolException;
24import java.net.SocketAddress;
25import java.net.URI;
26import java.nio.channels.ByteChannel;
27import java.nio.channels.SocketChannel;
28import java.util.List;
29import java.util.Map;
30import java.util.concurrent.ConcurrentHashMap;
31import java.util.concurrent.CopyOnWriteArrayList;
32import java.util.concurrent.CountDownLatch;
33import java.util.concurrent.ExecutionException;
34import java.util.concurrent.Future;
35import java.util.concurrent.TimeUnit;
36import java.util.concurrent.TimeoutException;
37
38import org.eclipse.jetty.util.IO;
39import org.eclipse.jetty.util.log.Logger;
40
41
42/* ------------------------------------------------------------ */
43/**
44 * <p>{@link WebSocketClient} allows to create multiple connections to multiple destinations
45 * that can speak the websocket protocol.</p>
46 * <p>When creating websocket connections, {@link WebSocketClient} accepts a {@link WebSocket}
47 * object (to receive events from the server), and returns a {@link WebSocket.Connection} to
48 * send data to the server.</p>
49 * <p>Example usage is as follows:</p>
50 * <pre>
51 * WebSocketClientFactory factory = new WebSocketClientFactory();
52 * factory.start();
53 *
54 * WebSocketClient client = factory.newWebSocketClient();
55 * // Configure the client
56 *
57 * WebSocket.Connection connection = client.open(new URI("ws://127.0.0.1:8080/"), new WebSocket.OnTextMessage()
58 * {
59 * public void onOpen(Connection connection)
60 * {
61 * // open notification
62 * }
63 *
64 * public void onClose(int closeCode, String message)
65 * {
66 * // close notification
67 * }
68 *
69 * public void onMessage(String data)
70 * {
71 * // handle incoming message
72 * }
73 * }).get(5, TimeUnit.SECONDS);
74 *
75 * connection.sendMessage("Hello World");
76 * </pre>
77 */
78public class WebSocketClient
79{
80 private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClient.class.getName());
81
82 private final WebSocketClientFactory _factory;
83 private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
84 private final List<String> _extensions=new CopyOnWriteArrayList<String>();
85 private String _origin;
86 private String _protocol;
87 private int _maxIdleTime=-1;
88 private int _maxTextMessageSize=16*1024;
89 private int _maxBinaryMessageSize=-1;
90 private MaskGen _maskGen;
91 private SocketAddress _bindAddress;
92
93 /* ------------------------------------------------------------ */
94 /**
95 * <p>Creates a WebSocketClient from a private WebSocketClientFactory.</p>
96 * <p>This can be wasteful of resources if many clients are created.</p>
97 *
98 * @deprecated Use {@link WebSocketClientFactory#newWebSocketClient()}
99 * @throws Exception if the private WebSocketClientFactory fails to start
100 */
101 @Deprecated
102 public WebSocketClient() throws Exception
103 {
104 _factory=new WebSocketClientFactory();
105 _factory.start();
106 _maskGen=_factory.getMaskGen();
107 }
108
109 /* ------------------------------------------------------------ */
110 /**
111 * <p>Creates a WebSocketClient with shared WebSocketClientFactory.</p>
112 *
113 * @param factory the shared {@link WebSocketClientFactory}
114 */
115 public WebSocketClient(WebSocketClientFactory factory)
116 {
117 _factory=factory;
118 _maskGen=_factory.getMaskGen();
119 }
120
121 /* ------------------------------------------------------------ */
122 /**
123 * @return The WebSocketClientFactory this client was created with.
124 */
125 public WebSocketClientFactory getFactory()
126 {
127 return _factory;
128 }
129
130 /* ------------------------------------------------------------ */
131 /**
132 * @return the address to bind the socket channel to
133 * @see #setBindAddress(SocketAddress)
134 */
135 public SocketAddress getBindAddress()
136 {
137 return _bindAddress;
138 }
139
140 /* ------------------------------------------------------------ */
141 /**
142 * @param bindAddress the address to bind the socket channel to
143 * @see #getBindAddress()
144 */
145 public void setBindAddress(SocketAddress bindAddress)
146 {
147 this._bindAddress = bindAddress;
148 }
149
150 /* ------------------------------------------------------------ */
151 /**
152 * @return The maxIdleTime in ms for connections opened by this client,
153 * or -1 if the default from {@link WebSocketClientFactory#getSelectorManager()} is used.
154 * @see #setMaxIdleTime(int)
155 */
156 public int getMaxIdleTime()
157 {
158 return _maxIdleTime;
159 }
160
161 /* ------------------------------------------------------------ */
162 /**
163 * @param maxIdleTime The max idle time in ms for connections opened by this client
164 * @see #getMaxIdleTime()
165 */
166 public void setMaxIdleTime(int maxIdleTime)
167 {
168 _maxIdleTime=maxIdleTime;
169 }
170
171 /* ------------------------------------------------------------ */
172 /**
173 * @return The subprotocol string for connections opened by this client.
174 * @see #setProtocol(String)
175 */
176 public String getProtocol()
177 {
178 return _protocol;
179 }
180
181 /* ------------------------------------------------------------ */
182 /**
183 * @param protocol The subprotocol string for connections opened by this client.
184 * @see #getProtocol()
185 */
186 public void setProtocol(String protocol)
187 {
188 _protocol = protocol;
189 }
190
191 /* ------------------------------------------------------------ */
192 /**
193 * @return The origin URI of the client
194 * @see #setOrigin(String)
195 */
196 public String getOrigin()
197 {
198 return _origin;
199 }
200
201 /* ------------------------------------------------------------ */
202 /**
203 * @param origin The origin URI of the client (eg "http://example.com")
204 * @see #getOrigin()
205 */
206 public void setOrigin(String origin)
207 {
208 _origin = origin;
209 }
210
211 /* ------------------------------------------------------------ */
212 /**
213 * <p>Returns the map of the cookies that are sent during the initial HTTP handshake
214 * that upgrades to the websocket protocol.</p>
215 * @return The read-write cookie map
216 */
217 public Map<String,String> getCookies()
218 {
219 return _cookies;
220 }
221
222 /* ------------------------------------------------------------ */
223 /**
224 * @return The list of websocket protocol extensions
225 */
226 public List<String> getExtensions()
227 {
228 return _extensions;
229 }
230
231 /* ------------------------------------------------------------ */
232 /**
233 * @return the mask generator to use, or null if not mask generator should be used
234 * @see #setMaskGen(MaskGen)
235 */
236 public MaskGen getMaskGen()
237 {
238 return _maskGen;
239 }
240
241 /* ------------------------------------------------------------ */
242 /**
243 * @param maskGen the mask generator to use, or null if not mask generator should be used
244 * @see #getMaskGen()
245 */
246 public void setMaskGen(MaskGen maskGen)
247 {
248 _maskGen = maskGen;
249 }
250
251 /* ------------------------------------------------------------ */
252 /**
253 * @return The initial maximum text message size (in characters) for a connection
254 */
255 public int getMaxTextMessageSize()
256 {
257 return _maxTextMessageSize;
258 }
259
260 /* ------------------------------------------------------------ */
261 /**
262 * Set the initial maximum text message size for a connection. This can be changed by
263 * the application calling {@link WebSocket.Connection#setMaxTextMessageSize(int)}.
264 * @param maxTextMessageSize The default maximum text message size (in characters) for a connection
265 */
266 public void setMaxTextMessageSize(int maxTextMessageSize)
267 {
268 _maxTextMessageSize = maxTextMessageSize;
269 }
270
271 /* ------------------------------------------------------------ */
272 /**
273 * @return The initial maximum binary message size (in bytes) for a connection
274 */
275 public int getMaxBinaryMessageSize()
276 {
277 return _maxBinaryMessageSize;
278 }
279
280 /* ------------------------------------------------------------ */
281 /**
282 * Set the initial maximum binary message size for a connection. This can be changed by
283 * the application calling {@link WebSocket.Connection#setMaxBinaryMessageSize(int)}.
284 * @param maxBinaryMessageSize The default maximum binary message size (in bytes) for a connection
285 */
286 public void setMaxBinaryMessageSize(int maxBinaryMessageSize)
287 {
288 _maxBinaryMessageSize = maxBinaryMessageSize;
289 }
290
291 /* ------------------------------------------------------------ */
292 /**
293 * <p>Opens a websocket connection to the URI and blocks until the connection is accepted or there is an error.</p>
294 *
295 * @param uri The URI to connect to.
296 * @param websocket The {@link WebSocket} instance to handle incoming events.
297 * @param maxConnectTime The interval to wait for a successful connection
298 * @param units the units of the maxConnectTime
299 * @return A {@link WebSocket.Connection}
300 * @throws IOException if the connection fails
301 * @throws InterruptedException if the thread is interrupted
302 * @throws TimeoutException if the timeout elapses before the connection is completed
303 * @see #open(URI, WebSocket)
304 */
305 public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
306 {
307 try
308 {
309 return open(uri,websocket).get(maxConnectTime,units);
310 }
311 catch (ExecutionException e)
312 {
313 Throwable cause = e.getCause();
314 if (cause instanceof IOException)
315 throw (IOException)cause;
316 if (cause instanceof Error)
317 throw (Error)cause;
318 if (cause instanceof RuntimeException)
319 throw (RuntimeException)cause;
320 throw new RuntimeException(cause);
321 }
322 }
323
324 /* ------------------------------------------------------------ */
325 /**
326 * <p>Asynchronously opens a websocket connection and returns a {@link Future} to obtain the connection.</p>
327 * <p>The caller must call {@link Future#get(long, TimeUnit)} if they wish to impose a connect timeout on the open.</p>
328 *
329 * @param uri The URI to connect to.
330 * @param websocket The {@link WebSocket} instance to handle incoming events.
331 * @return A {@link Future} to the {@link WebSocket.Connection}
332 * @throws IOException if the connection fails
333 * @see #open(URI, WebSocket, long, TimeUnit)
334 */
335 public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
336 {
337 if (!_factory.isStarted())
338 throw new IllegalStateException("Factory !started");
339
340 InetSocketAddress address = toSocketAddress(uri);
341
342 SocketChannel channel = null;
343 try
344 {
345 channel = SocketChannel.open();
346 if (_bindAddress != null)
347 channel.socket().bind(_bindAddress);
348 channel.socket().setTcpNoDelay(true);
349
350 WebSocketFuture holder = new WebSocketFuture(websocket,uri,this,channel);
351
352 channel.configureBlocking(false);
353 channel.connect(address);
354 _factory.getSelectorManager().register(channel,holder);
355
356 return holder;
357 }
358 catch (RuntimeException e)
359 {
360 // close the channel (prevent connection leak)
361 IO.close(channel);
362
363 // rethrow
364 throw e;
365 }
366 catch(IOException e)
367 {
368 // close the channel (prevent connection leak)
369 IO.close(channel);
370
371 // rethrow
372 throw e;
373 }
374 }
375
376 public static InetSocketAddress toSocketAddress(URI uri)
377 {
378 String scheme = uri.getScheme();
379 if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
380 throw new IllegalArgumentException("Bad WebSocket scheme: " + scheme);
381 int port = uri.getPort();
382 if (port == 0)
383 throw new IllegalArgumentException("Bad WebSocket port: " + port);
384 if (port < 0)
385 port = "ws".equals(scheme) ? 80 : 443;
386
387 return new InetSocketAddress(uri.getHost(), port);
388 }
389
390 /* ------------------------------------------------------------ */
391 /** The Future Websocket Connection.
392 */
393 static class WebSocketFuture implements Future<WebSocket.Connection>
394 {
395 final WebSocket _websocket;
396 final URI _uri;
397 final WebSocketClient _client;
398 final CountDownLatch _done = new CountDownLatch(1);
399 ByteChannel _channel;
400 WebSocketConnection _connection;
401 Throwable _exception;
402
403 private WebSocketFuture(WebSocket websocket, URI uri, WebSocketClient client, ByteChannel channel)
404 {
405 _websocket=websocket;
406 _uri=uri;
407 _client=client;
408 _channel=channel;
409 }
410
411 public void onConnection(WebSocketConnection connection)
412 {
413 try
414 {
415 _client.getFactory().addConnection(connection);
416
417 connection.getConnection().setMaxTextMessageSize(_client.getMaxTextMessageSize());
418 connection.getConnection().setMaxBinaryMessageSize(_client.getMaxBinaryMessageSize());
419
420 WebSocketConnection con;
421 synchronized (this)
422 {
423 if (_channel!=null)
424 _connection=connection;
425 con=_connection;
426 }
427
428 if (con!=null)
429 {
430 if (_websocket instanceof WebSocket.OnFrame)
431 ((WebSocket.OnFrame)_websocket).onHandshake((WebSocket.FrameConnection)con.getConnection());
432
433 _websocket.onOpen(con.getConnection());
434 }
435 }
436 finally
437 {
438 _done.countDown();
439 }
440 }
441
442 public void handshakeFailed(Throwable ex)
443 {
444 try
445 {
446 ByteChannel channel=null;
447 synchronized (this)
448 {
449 if (_channel!=null)
450 {
451 channel=_channel;
452 _channel=null;
453 _exception=ex;
454 }
455 }
456
457 if (channel!=null)
458 {
459 if (ex instanceof ProtocolException)
460 closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_PROTOCOL,ex.getMessage());
461 else
462 closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,ex.getMessage());
463 }
464 }
465 finally
466 {
467 _done.countDown();
468 }
469 }
470
471 public Map<String,String> getCookies()
472 {
473 return _client.getCookies();
474 }
475
476 public String getProtocol()
477 {
478 return _client.getProtocol();
479 }
480
481 public WebSocket getWebSocket()
482 {
483 return _websocket;
484 }
485
486 public URI getURI()
487 {
488 return _uri;
489 }
490
491 public int getMaxIdleTime()
492 {
493 return _client.getMaxIdleTime();
494 }
495
496 public String getOrigin()
497 {
498 return _client.getOrigin();
499 }
500
501 public MaskGen getMaskGen()
502 {
503 return _client.getMaskGen();
504 }
505
506 @Override
507 public String toString()
508 {
509 return "[" + _uri + ","+_websocket+"]@"+hashCode();
510 }
511
512 public boolean cancel(boolean mayInterruptIfRunning)
513 {
514 try
515 {
516 ByteChannel channel=null;
517 synchronized (this)
518 {
519 if (_connection==null && _exception==null && _channel!=null)
520 {
521 channel=_channel;
522 _channel=null;
523 }
524 }
525
526 if (channel!=null)
527 {
528 closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"cancelled");
529 return true;
530 }
531 return false;
532 }
533 finally
534 {
535 _done.countDown();
536 }
537 }
538
539 public boolean isCancelled()
540 {
541 synchronized (this)
542 {
543 return _channel==null && _connection==null;
544 }
545 }
546
547 public boolean isDone()
548 {
549 synchronized (this)
550 {
551 return _connection!=null && _exception==null;
552 }
553 }
554
555 public org.eclipse.jetty.websocket.WebSocket.Connection get() throws InterruptedException, ExecutionException
556 {
557 try
558 {
559 return get(Long.MAX_VALUE,TimeUnit.SECONDS);
560 }
561 catch(TimeoutException e)
562 {
563 throw new IllegalStateException("The universe has ended",e);
564 }
565 }
566
567 public org.eclipse.jetty.websocket.WebSocket.Connection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
568 TimeoutException
569 {
570 _done.await(timeout,unit);
571 ByteChannel channel=null;
572 org.eclipse.jetty.websocket.WebSocket.Connection connection=null;
573 Throwable exception;
574 synchronized (this)
575 {
576 exception=_exception;
577 if (_connection==null)
578 {
579 exception=_exception;
580 channel=_channel;
581 _channel=null;
582 }
583 else
584 connection=_connection.getConnection();
585 }
586
587 if (channel!=null)
588 closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"timeout");
589 if (exception!=null)
590 throw new ExecutionException(exception);
591 if (connection!=null)
592 return connection;
593 throw new TimeoutException();
594 }
595
596 private void closeChannel(ByteChannel channel,int code, String message)
597 {
598 try
599 {
600 _websocket.onClose(code,message);
601 }
602 catch(Exception e)
603 {
604 __log.warn(e);
605 }
606
607 try
608 {
609 channel.close();
610 }
611 catch(IOException e)
612 {
613 __log.debug(e);
614 }
615 }
616 }
617}