blob: 9dd2773914a21ed2cd641e055f4b8de1d564a6d1 [file] [log] [blame]
Jake Slack03928ae2014-05-13 18:41:56 -07001//
2// ========================================================================
3// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4// ------------------------------------------------------------------------
5// All rights reserved. This program and the accompanying materials
6// are made available under the terms of the Eclipse Public License v1.0
7// and Apache License v2.0 which accompanies this distribution.
8//
9// The Eclipse Public License is available at
10// http://www.eclipse.org/legal/epl-v10.html
11//
12// The Apache License v2.0 is available at
13// http://www.opensource.org/licenses/apache2.0.php
14//
15// You may elect to redistribute this code under either of these licenses.
16// ========================================================================
17//
18
19package org.eclipse.jetty.websocket;
20
21import java.io.IOException;
22import java.io.UnsupportedEncodingException;
23import java.security.MessageDigest;
24import java.util.Collections;
25import java.util.List;
26
27import org.eclipse.jetty.io.AbstractConnection;
28import org.eclipse.jetty.io.AsyncEndPoint;
29import org.eclipse.jetty.io.Buffer;
30import org.eclipse.jetty.io.ByteArrayBuffer;
31import org.eclipse.jetty.io.Connection;
32import org.eclipse.jetty.io.EndPoint;
33import org.eclipse.jetty.util.B64Code;
34import org.eclipse.jetty.util.StringUtil;
35import org.eclipse.jetty.util.Utf8Appendable;
36import org.eclipse.jetty.util.Utf8StringBuilder;
37import org.eclipse.jetty.util.log.Log;
38import org.eclipse.jetty.util.log.Logger;
39import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
40import org.eclipse.jetty.websocket.WebSocket.OnControl;
41import org.eclipse.jetty.websocket.WebSocket.OnFrame;
42import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
43
44
45/* ------------------------------------------------------------ */
/**
 * <pre>
 *    0                   1                   2                   3
 *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 *   +-+-+-+-+-------+-+-------------+-------------------------------+
 *   |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
 *   |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
 *   |N|V|V|V|       |S|             |   (if payload len==126/127)   |
 *   | |1|2|3|       |K|             |                               |
 *   +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
 *   |     Extended payload length continued, if payload len == 127  |
 *   + - - - - - - - - - - - - - - - +-------------------------------+
 *   |                               |Masking-key, if MASK set to 1  |
 *   +-------------------------------+-------------------------------+
 *   | Masking-key (continued)       |          Payload Data         |
 *   +-------------------------------- - - - - - - - - - - - - - - - +
 *   :                     Payload Data continued ...                :
 *   + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
 *   |                     Payload Data continued ...                |
 *   +---------------------------------------------------------------+
 * </pre>
 */
68public class WebSocketConnectionRFC6455 extends AbstractConnection implements WebSocketConnection
69{
70 private static final Logger LOG = Log.getLogger(WebSocketConnectionRFC6455.class);
71
72 final static byte OP_CONTINUATION = 0x00;
73 final static byte OP_TEXT = 0x01;
74 final static byte OP_BINARY = 0x02;
75 final static byte OP_EXT_DATA = 0x03;
76
77 final static byte OP_CONTROL = 0x08;
78 final static byte OP_CLOSE = 0x08;
79 final static byte OP_PING = 0x09;
80 final static byte OP_PONG = 0x0A;
81 final static byte OP_EXT_CTRL = 0x0B;
82
83 final static int CLOSE_NORMAL=1000;
84 final static int CLOSE_SHUTDOWN=1001;
85 final static int CLOSE_PROTOCOL=1002;
86 final static int CLOSE_BAD_DATA=1003;
87 final static int CLOSE_UNDEFINED=1004;
88 final static int CLOSE_NO_CODE=1005;
89 final static int CLOSE_NO_CLOSE=1006;
90 final static int CLOSE_BAD_PAYLOAD=1007;
91 final static int CLOSE_POLICY_VIOLATION=1008;
92 final static int CLOSE_MESSAGE_TOO_LARGE=1009;
93 final static int CLOSE_REQUIRED_EXTENSION=1010;
94 final static int CLOSE_SERVER_ERROR=1011;
95 final static int CLOSE_FAILED_TLS_HANDSHAKE=1015;
96
97 final static int FLAG_FIN=0x8;
98
99 // Per RFC 6455, section 1.3 - Opening Handshake - this version is "13"
100 final static int VERSION=13;
101
102 static boolean isLastFrame(byte flags)
103 {
104 return (flags&FLAG_FIN)!=0;
105 }
106
107 static boolean isControlFrame(byte opcode)
108 {
109 return (opcode&OP_CONTROL)!=0;
110 }
111
/** Bytes mixed into the digest computed by hashKey(String); set in the static initialiser. */
private static final byte[] MAGIC;

// Collaborators wired up in the constructor.
private final List<Extension> _extensions;
private final WebSocketParserRFC6455 _parser;
private final WebSocketGeneratorRFC6455 _generator;
private final WebSocketGenerator _outbound;

// The application websocket, plus typed views of it (null when the interface is not implemented).
private final WebSocket _webSocket;
private final OnFrame _onFrame;
private final OnBinaryMessage _onBinaryMessage;
private final OnTextMessage _onTextMessage;
private final OnControl _onControl;

private final String _protocol;
private final int _draft;

/** Context class loader captured at construction and installed while handle() runs. */
private final ClassLoader _context;

// Close state; the updates in onClose/closeIn/closeOut are made while synchronized on this.
private volatile int _closeCode;
private volatile String _closeMessage;
private volatile boolean _closedIn;
private volatile boolean _closedOut;

private int _maxTextMessageSize = -1;
private int _maxBinaryMessageSize = -1;

static
{
    try
    {
        MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
    }
    catch (UnsupportedEncodingException unsupported)
    {
        throw new RuntimeException(unsupported);
    }
}

/** The connection object handed to the application websocket and to extensions. */
private final WebSocket.FrameConnection _connection = new WSFrameConnection();
145
146
147 /* ------------------------------------------------------------ */
/**
 * Creates a connection by delegating to the nine-argument constructor with a
 * null mask generator.
 *
 * @throws IOException if the delegated constructor throws it
 */
public WebSocketConnectionRFC6455(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft)
    throws IOException
{
    this(websocket,
         endpoint,
         buffers,
         timestamp,
         maxIdleTime,
         protocol,
         extensions,
         draft,
         null);
}
153
154 /* ------------------------------------------------------------ */
155 public WebSocketConnectionRFC6455(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft, MaskGen maskgen)
156 throws IOException
157 {
158 super(endpoint,timestamp);
159
160 _context=Thread.currentThread().getContextClassLoader();
161
162 _draft=draft;
163 _endp.setMaxIdleTime(maxIdleTime);
164
165 _webSocket = websocket;
166 _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
167 _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
168 _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
169 _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
170 _generator = new WebSocketGeneratorRFC6455(buffers, _endp,maskgen);
171
172 _extensions=extensions;
173 WebSocketParser.FrameHandler frameHandler = new WSFrameHandler();
174 if (_extensions!=null)
175 {
176 int e=0;
177 for (Extension extension : _extensions)
178 {
179 extension.bind(
180 _connection,
181 e==extensions.size()-1? frameHandler :extensions.get(e+1),
182 e==0?_generator:extensions.get(e-1));
183 e++;
184 }
185 }
186
187 _outbound=(_extensions==null||_extensions.size()==0)?_generator:extensions.get(extensions.size()-1);
188 WebSocketParser.FrameHandler inbound = (_extensions == null || _extensions.size() == 0) ? frameHandler : extensions.get(0);
189
190 _parser = new WebSocketParserRFC6455(buffers, endpoint, inbound,maskgen==null);
191
192 _protocol=protocol;
193
194 }
195
196 /* ------------------------------------------------------------ */
197 public WebSocket.Connection getConnection()
198 {
199 return _connection;
200 }
201
202 /* ------------------------------------------------------------ */
203 public List<Extension> getExtensions()
204 {
205 if (_extensions==null)
206 return Collections.emptyList();
207
208 return _extensions;
209 }
210
211 /* ------------------------------------------------------------ */
212 public Connection handle() throws IOException
213 {
214 Thread current = Thread.currentThread();
215 ClassLoader oldcontext = current.getContextClassLoader();
216 current.setContextClassLoader(_context);
217 try
218 {
219 // handle the framing protocol
220 boolean progress=true;
221
222 while (progress)
223 {
224 int flushed=_generator.flushBuffer();
225 int filled=_parser.parseNext();
226
227 progress = flushed>0 || filled>0;
228 _endp.flush();
229
230 if (_endp instanceof AsyncEndPoint && ((AsyncEndPoint)_endp).hasProgressed())
231 progress=true;
232 }
233 }
234 catch(IOException e)
235 {
236 try
237 {
238 if (_endp.isOpen())
239 _endp.close();
240 }
241 catch(IOException e2)
242 {
243 LOG.ignore(e2);
244 }
245 throw e;
246 }
247 finally
248 {
249 current.setContextClassLoader(oldcontext);
250 _parser.returnBuffer();
251 _generator.returnBuffer();
252 if (_endp.isOpen())
253 {
254 if (_closedIn && _closedOut && _outbound.isBufferEmpty())
255 _endp.close();
256 else if (_endp.isInputShutdown() && !_closedIn)
257 closeIn(CLOSE_NO_CLOSE,null);
258 else
259 checkWriteable();
260 }
261 }
262 return this;
263 }
264
265 /* ------------------------------------------------------------ */
266 public void onInputShutdown() throws IOException
267 {
268 if (!_closedIn)
269 _endp.close();
270 }
271
272 /* ------------------------------------------------------------ */
273 public boolean isIdle()
274 {
275 return _parser.isBufferEmpty() && _outbound.isBufferEmpty();
276 }
277
278 /* ------------------------------------------------------------ */
279 @Override
280 public void onIdleExpired(long idleForMs)
281 {
282 closeOut(WebSocketConnectionRFC6455.CLOSE_NORMAL,"Idle for "+idleForMs+"ms > "+_endp.getMaxIdleTime()+"ms");
283 }
284
285 /* ------------------------------------------------------------ */
286 public boolean isSuspended()
287 {
288 return false;
289 }
290
291 /* ------------------------------------------------------------ */
292 public void onClose()
293 {
294 final boolean closed;
295 synchronized (this)
296 {
297 closed=_closeCode==0;
298 if (closed)
299 _closeCode=WebSocketConnectionRFC6455.CLOSE_NO_CLOSE;
300 }
301 if (closed)
302 _webSocket.onClose(WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"closed");
303 }
304
305 /* ------------------------------------------------------------ */
306 public void closeIn(int code,String message)
307 {
308 LOG.debug("ClosedIn {} {} {}",this,code,message);
309
310 final boolean closed_out;
311 final boolean tell_app;
312 synchronized (this)
313 {
314 closed_out=_closedOut;
315 _closedIn=true;
316 tell_app=_closeCode==0;
317 if (tell_app)
318 {
319 _closeCode=code;
320 _closeMessage=message;
321 }
322 }
323
324 try
325 {
326 if (!closed_out)
327 closeOut(code,message);
328 }
329 finally
330 {
331 if (tell_app)
332 _webSocket.onClose(code,message);
333 }
334 }
335
336 /* ------------------------------------------------------------ */
337 public void closeOut(int code,String message)
338 {
339 LOG.debug("ClosedOut {} {} {}",this,code,message);
340
341 final boolean closed_out;
342 final boolean tell_app;
343 synchronized (this)
344 {
345 closed_out=_closedOut;
346 _closedOut=true;
347 tell_app=_closeCode==0;
348 if (tell_app)
349 {
350 _closeCode=code;
351 _closeMessage=message;
352 }
353 }
354
355 try
356 {
357 if (tell_app)
358 _webSocket.onClose(code,message);
359 }
360 finally
361 {
362 try
363 {
364 if (!closed_out)
365 {
366 // Close code 1005/1006/1015 are never to be sent as a status over
367 // a Close control frame. Code<-1 also means no node.
368
369 if (code < 0 || (code == WebSocketConnectionRFC6455.CLOSE_NO_CODE) || (code == WebSocketConnectionRFC6455.CLOSE_NO_CLOSE)
370 || (code == WebSocketConnectionRFC6455.CLOSE_FAILED_TLS_HANDSHAKE))
371 {
372 code = -1;
373 }
374 else if (code == 0)
375 {
376 code = WebSocketConnectionRFC6455.CLOSE_NORMAL;
377 }
378
379 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
380 bytes[0]=(byte)(code/0x100);
381 bytes[1]=(byte)(code%0x100);
382 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionRFC6455.OP_CLOSE,bytes,0,code>0?bytes.length:0);
383 _outbound.flush();
384 }
385 }
386 catch(IOException e)
387 {
388 LOG.ignore(e);
389 }
390 }
391 }
392
393 public void shutdown()
394 {
395 final WebSocket.Connection connection = _connection;
396 if (connection != null)
397 connection.close(CLOSE_SHUTDOWN, null);
398 }
399
400 /* ------------------------------------------------------------ */
401 public void fillBuffersFrom(Buffer buffer)
402 {
403 _parser.fill(buffer);
404 }
405
406 /* ------------------------------------------------------------ */
407 private void checkWriteable()
408 {
409 if (!_outbound.isBufferEmpty() && _endp instanceof AsyncEndPoint)
410 {
411 ((AsyncEndPoint)_endp).scheduleWrite();
412 }
413 }
414
415 protected void onFrameHandshake()
416 {
417 if (_onFrame != null)
418 {
419 _onFrame.onHandshake(_connection);
420 }
421 }
422
423 protected void onWebSocketOpen()
424 {
425 _webSocket.onOpen(_connection);
426 }
427
428 /* ------------------------------------------------------------ */
429 private class WSFrameConnection implements WebSocket.FrameConnection
430 {
431 private volatile boolean _disconnecting;
432
433 /* ------------------------------------------------------------ */
434 public void sendMessage(String content) throws IOException
435 {
436 if (_closedOut)
437 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
438 byte[] data = content.getBytes(StringUtil.__UTF8);
439 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionRFC6455.OP_TEXT,data,0,data.length);
440 checkWriteable();
441 }
442
443 /* ------------------------------------------------------------ */
444 public void sendMessage(byte[] content, int offset, int length) throws IOException
445 {
446 if (_closedOut)
447 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
448 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionRFC6455.OP_BINARY,content,offset,length);
449 checkWriteable();
450 }
451
452 /* ------------------------------------------------------------ */
453 public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
454 {
455 if (_closedOut)
456 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
457 _outbound.addFrame(flags,opcode,content,offset,length);
458 checkWriteable();
459 }
460
461 /* ------------------------------------------------------------ */
462 public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException
463 {
464 // TODO: section 5.5 states that control frames MUST never be length > 125 bytes and MUST NOT be fragmented
465 if (_closedOut)
466 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
467 _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
468 checkWriteable();
469 }
470
471 /* ------------------------------------------------------------ */
472 public boolean isMessageComplete(byte flags)
473 {
474 return isLastFrame(flags);
475 }
476
477 /* ------------------------------------------------------------ */
478 public boolean isOpen()
479 {
480 return _endp!=null&&_endp.isOpen();
481 }
482
483 /* ------------------------------------------------------------ */
484 public void close(int code, String message)
485 {
486 if (_disconnecting)
487 return;
488 _disconnecting=true;
489 WebSocketConnectionRFC6455.this.closeOut(code,message);
490 }
491
492 /* ------------------------------------------------------------ */
493 public void setMaxIdleTime(int ms)
494 {
495 try
496 {
497 _endp.setMaxIdleTime(ms);
498 }
499 catch(IOException e)
500 {
501 LOG.warn(e);
502 }
503 }
504
505 /* ------------------------------------------------------------ */
506 public void setMaxTextMessageSize(int size)
507 {
508 _maxTextMessageSize=size;
509 }
510
511 /* ------------------------------------------------------------ */
512 public void setMaxBinaryMessageSize(int size)
513 {
514 _maxBinaryMessageSize=size;
515 }
516
517 /* ------------------------------------------------------------ */
518 public int getMaxIdleTime()
519 {
520 return _endp.getMaxIdleTime();
521 }
522
523 /* ------------------------------------------------------------ */
524 public int getMaxTextMessageSize()
525 {
526 return _maxTextMessageSize;
527 }
528
529 /* ------------------------------------------------------------ */
530 public int getMaxBinaryMessageSize()
531 {
532 return _maxBinaryMessageSize;
533 }
534
535 /* ------------------------------------------------------------ */
536 public String getProtocol()
537 {
538 return _protocol;
539 }
540
541 /* ------------------------------------------------------------ */
542 public byte binaryOpcode()
543 {
544 return OP_BINARY;
545 }
546
547 /* ------------------------------------------------------------ */
548 public byte textOpcode()
549 {
550 return OP_TEXT;
551 }
552
553 /* ------------------------------------------------------------ */
554 public byte continuationOpcode()
555 {
556 return OP_CONTINUATION;
557 }
558
559 /* ------------------------------------------------------------ */
560 public byte finMask()
561 {
562 return FLAG_FIN;
563 }
564
565 /* ------------------------------------------------------------ */
566 public boolean isControl(byte opcode)
567 {
568 return isControlFrame(opcode);
569 }
570
571 /* ------------------------------------------------------------ */
572 public boolean isText(byte opcode)
573 {
574 return opcode==OP_TEXT;
575 }
576
577 /* ------------------------------------------------------------ */
578 public boolean isBinary(byte opcode)
579 {
580 return opcode==OP_BINARY;
581 }
582
583 /* ------------------------------------------------------------ */
584 public boolean isContinuation(byte opcode)
585 {
586 return opcode==OP_CONTINUATION;
587 }
588
589 /* ------------------------------------------------------------ */
590 public boolean isClose(byte opcode)
591 {
592 return opcode==OP_CLOSE;
593 }
594
595 /* ------------------------------------------------------------ */
596 public boolean isPing(byte opcode)
597 {
598 return opcode==OP_PING;
599 }
600
601 /* ------------------------------------------------------------ */
602 public boolean isPong(byte opcode)
603 {
604 return opcode==OP_PONG;
605 }
606
607 /* ------------------------------------------------------------ */
608 public void disconnect()
609 {
610 close(CLOSE_NORMAL,null);
611 }
612
613 /* ------------------------------------------------------------ */
614 public void close()
615 {
616 close(CLOSE_NORMAL,null);
617 }
618
619 /* ------------------------------------------------------------ */
620 public void setAllowFrameFragmentation(boolean allowFragmentation)
621 {
622 _parser.setFakeFragments(allowFragmentation);
623 }
624
625 /* ------------------------------------------------------------ */
626 public boolean isAllowFrameFragmentation()
627 {
628 return _parser.isFakeFragments();
629 }
630
631 /* ------------------------------------------------------------ */
632 @Override
633 public String toString()
634 {
635 return String.format("%s@%x l(%s:%d)<->r(%s:%d)",
636 getClass().getSimpleName(),
637 hashCode(),
638 _endp.getLocalAddr(),
639 _endp.getLocalPort(),
640 _endp.getRemoteAddr(),
641 _endp.getRemotePort());
642 }
643 }
644
645 /* ------------------------------------------------------------ */
646 /* ------------------------------------------------------------ */
647 /* ------------------------------------------------------------ */
648 private class WSFrameHandler implements WebSocketParser.FrameHandler
649 {
650 private static final int MAX_CONTROL_FRAME_PAYLOAD = 125;
651 private final Utf8StringBuilder _utf8 = new Utf8StringBuilder(512); // TODO configure initial capacity
652 private ByteArrayBuffer _aggregate;
653 private byte _opcode=-1;
654
655 public void onFrame(final byte flags, final byte opcode, final Buffer buffer)
656 {
657 boolean lastFrame = isLastFrame(flags);
658
659 synchronized(WebSocketConnectionRFC6455.this)
660 {
661 // Ignore incoming after a close
662 if (_closedIn)
663 return;
664 }
665 try
666 {
667 byte[] array=buffer.array();
668
669 if (isControlFrame(opcode) && buffer.length()>MAX_CONTROL_FRAME_PAYLOAD)
670 {
671 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Control frame too large: " + buffer.length() + " > " + MAX_CONTROL_FRAME_PAYLOAD);
672 return;
673 }
674
675 // TODO: check extensions for RSV bit(s) meanings
676 if ((flags&0x7)!=0)
677 {
678 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"RSV bits set 0x"+Integer.toHexString(flags));
679 return;
680 }
681
682 // Ignore all frames after error close
683 if (_closeCode!=0 && _closeCode!=CLOSE_NORMAL && opcode!=OP_CLOSE)
684 {
685 return;
686 }
687
688 // Deliver frame if websocket is a FrameWebSocket
689 if (_onFrame!=null)
690 {
691 if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
692 return;
693 }
694
695 if (_onControl!=null && isControlFrame(opcode))
696 {
697 if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
698 return;
699 }
700
701 switch(opcode)
702 {
703 case WebSocketConnectionRFC6455.OP_CONTINUATION:
704 {
705 if (_opcode==-1)
706 {
707 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Bad Continuation");
708 return;
709 }
710
711 // If text, append to the message buffer
712 if (_onTextMessage!=null && _opcode==WebSocketConnectionRFC6455.OP_TEXT)
713 {
714 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
715 {
716 // If this is the last fragment, deliver the text buffer
717 if (lastFrame)
718 {
719 _opcode=-1;
720 String msg =_utf8.toString();
721 _utf8.reset();
722 _onTextMessage.onMessage(msg);
723 }
724 }
725 else
726 textMessageTooLarge();
727 }
728
729 if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
730 {
731 if (_aggregate!=null && checkBinaryMessageSize(_aggregate.length(),buffer.length()))
732 {
733 _aggregate.put(buffer);
734
735 // If this is the last fragment, deliver
736 if (lastFrame && _onBinaryMessage!=null)
737 {
738 try
739 {
740 _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
741 }
742 finally
743 {
744 _opcode=-1;
745 _aggregate.clear();
746 }
747 }
748 }
749 }
750 break;
751 }
752 case WebSocketConnectionRFC6455.OP_PING:
753 {
754 LOG.debug("PING {}",this);
755 if (!_closedOut)
756 {
757 _connection.sendControl(WebSocketConnectionRFC6455.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
758 }
759 break;
760 }
761
762 case WebSocketConnectionRFC6455.OP_PONG:
763 {
764 LOG.debug("PONG {}",this);
765 break;
766 }
767
768 case WebSocketConnectionRFC6455.OP_CLOSE:
769 {
770 int code=WebSocketConnectionRFC6455.CLOSE_NO_CODE;
771 String message=null;
772 if (buffer.length()>=2)
773 {
774 code=(0xff&buffer.array()[buffer.getIndex()])*0x100+(0xff&buffer.array()[buffer.getIndex()+1]);
775
776 // Validate close status codes.
777 if (code < WebSocketConnectionRFC6455.CLOSE_NORMAL ||
778 code == WebSocketConnectionRFC6455.CLOSE_UNDEFINED ||
779 code == WebSocketConnectionRFC6455.CLOSE_NO_CLOSE ||
780 code == WebSocketConnectionRFC6455.CLOSE_NO_CODE ||
781 ( code > 1011 && code <= 2999 ) ||
782 code >= 5000 )
783 {
784 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Invalid close code " + code);
785 return;
786 }
787
788 if (buffer.length()>2)
789 {
790 if(_utf8.append(buffer.array(),buffer.getIndex()+2,buffer.length()-2,_connection.getMaxTextMessageSize()))
791 {
792 message = _utf8.toString();
793 _utf8.reset();
794 }
795 }
796 }
797 else if(buffer.length() == 1)
798 {
799 // Invalid length. use status code 1002 (Protocol error)
800 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Invalid payload length of 1");
801 return;
802 }
803 closeIn(code,message);
804 break;
805 }
806
807 case WebSocketConnectionRFC6455.OP_TEXT:
808 {
809 if (_opcode!=-1)
810 {
811 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode));
812 return;
813 }
814
815 if(_onTextMessage!=null)
816 {
817 if (_connection.getMaxTextMessageSize()<=0)
818 {
819 // No size limit, so handle only final frames
820 if (lastFrame)
821 _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
822 else
823 {
824 LOG.warn("Frame discarded. Text aggregation disabled for {}",_endp);
825 errorClose(WebSocketConnectionRFC6455.CLOSE_POLICY_VIOLATION,"Text frame aggregation disabled");
826 }
827 }
828 // append bytes to message buffer (if they fit)
829 else if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
830 {
831 if (lastFrame)
832 {
833 String msg =_utf8.toString();
834 _utf8.reset();
835 _onTextMessage.onMessage(msg);
836 }
837 else
838 {
839 _opcode=WebSocketConnectionRFC6455.OP_TEXT;
840 }
841 }
842 else
843 textMessageTooLarge();
844 }
845 break;
846 }
847
848 case WebSocketConnectionRFC6455.OP_BINARY:
849 {
850 if (_opcode!=-1)
851 {
852 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode));
853 return;
854 }
855
856 if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length()))
857 {
858 if (lastFrame)
859 {
860 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
861 }
862 else if (_connection.getMaxBinaryMessageSize()>=0)
863 {
864 _opcode=opcode;
865 // TODO use a growing buffer rather than a fixed one.
866 if (_aggregate==null)
867 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
868 _aggregate.put(buffer);
869 }
870 else
871 {
872 LOG.warn("Frame discarded. Binary aggregation disabed for {}",_endp);
873 errorClose(WebSocketConnectionRFC6455.CLOSE_POLICY_VIOLATION,"Binary frame aggregation disabled");
874 }
875 }
876 break;
877 }
878
879 default:
880 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Bad opcode 0x"+Integer.toHexString(opcode));
881 break;
882 }
883 }
884 catch(Utf8Appendable.NotUtf8Exception notUtf8)
885 {
886 LOG.warn("NOTUTF8 - {} for {}",notUtf8,_endp, notUtf8);
887 LOG.debug(notUtf8);
888 errorClose(WebSocketConnectionRFC6455.CLOSE_BAD_PAYLOAD,"Invalid UTF-8");
889 }
890 catch(Throwable e)
891 {
892 LOG.warn("{} for {}",e,_endp, e);
893 LOG.debug(e);
894 errorClose(WebSocketConnectionRFC6455.CLOSE_SERVER_ERROR,"Internal Server Error: "+e);
895 }
896 }
897
898 private void errorClose(int code, String message)
899 {
900 _connection.close(code,message);
901
902 // Brutally drop the connection
903 try
904 {
905 _endp.close();
906 }
907 catch (IOException e)
908 {
909 LOG.warn(e.toString());
910 LOG.debug(e);
911 }
912 }
913
914 private boolean checkBinaryMessageSize(int bufferLen, int length)
915 {
916 int max = _connection.getMaxBinaryMessageSize();
917 if (max>0 && (bufferLen+length)>max)
918 {
919 LOG.warn("Binary message too large > {}B for {}",_connection.getMaxBinaryMessageSize(),_endp);
920 _connection.close(WebSocketConnectionRFC6455.CLOSE_MESSAGE_TOO_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
921 _opcode=-1;
922 if (_aggregate!=null)
923 _aggregate.clear();
924 return false;
925 }
926 return true;
927 }
928
929 private void textMessageTooLarge()
930 {
931 LOG.warn("Text message too large > {} chars for {}",_connection.getMaxTextMessageSize(),_endp);
932 _connection.close(WebSocketConnectionRFC6455.CLOSE_MESSAGE_TOO_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
933
934 _opcode=-1;
935 _utf8.reset();
936 }
937
938 public void close(int code,String message)
939 {
940 if (code!=CLOSE_NORMAL)
941 LOG.warn("Close: "+code+" "+message);
942 _connection.close(code,message);
943 }
944
945 @Override
946 public String toString()
947 {
948 return WebSocketConnectionRFC6455.this.toString()+"FH";
949 }
950 }
951
952 /* ------------------------------------------------------------ */
953 public static String hashKey(String key)
954 {
955 try
956 {
957 MessageDigest md = MessageDigest.getInstance("SHA1");
958 md.update(key.getBytes("UTF-8"));
959 md.update(MAGIC);
960 return new String(B64Code.encode(md.digest()));
961 }
962 catch (Exception e)
963 {
964 throw new RuntimeException(e);
965 }
966 }
967
968 /* ------------------------------------------------------------ */
969 @Override
970 public String toString()
971 {
972 return String.format("%s p=%s g=%s", getClass().getSimpleName(), _parser, _generator);
973 }
974}