blob: 9d16f5d3417fd277b9491e38b13fb7a6a5cef2aa [file] [log] [blame]
Jake Slack03928ae2014-05-13 18:41:56 -07001//
2// ========================================================================
3// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4// ------------------------------------------------------------------------
5// All rights reserved. This program and the accompanying materials
6// are made available under the terms of the Eclipse Public License v1.0
7// and Apache License v2.0 which accompanies this distribution.
8//
9// The Eclipse Public License is available at
10// http://www.eclipse.org/legal/epl-v10.html
11//
12// The Apache License v2.0 is available at
13// http://www.opensource.org/licenses/apache2.0.php
14//
15// You may elect to redistribute this code under either of these licenses.
16// ========================================================================
17//
18
19package org.eclipse.jetty.io.nio;
20
21import java.io.IOException;
22import java.io.InterruptedIOException;
23import java.nio.channels.ClosedChannelException;
24import java.nio.channels.SelectableChannel;
25import java.nio.channels.SelectionKey;
26import java.nio.channels.SocketChannel;
27import java.util.Locale;
28
29import org.eclipse.jetty.io.AsyncEndPoint;
30import org.eclipse.jetty.io.Buffer;
31import org.eclipse.jetty.io.ConnectedEndPoint;
32import org.eclipse.jetty.io.Connection;
33import org.eclipse.jetty.io.EofException;
34import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
35import org.eclipse.jetty.util.log.Log;
36import org.eclipse.jetty.util.log.Logger;
37import org.eclipse.jetty.util.thread.Timeout.Task;
38
39/* ------------------------------------------------------------ */
40/**
41 * An Endpoint that can be scheduled by {@link SelectorManager}.
42 */
43public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
44{
45 public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
46
47 private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
48 private final SelectorManager.SelectSet _selectSet;
49 private final SelectorManager _manager;
50 private SelectionKey _key;
51 private final Runnable _handler = new Runnable()
52 {
53 public void run() { handle(); }
54 };
55
56 /** The desired value for {@link SelectionKey#interestOps()} */
57 private int _interestOps;
58
59 /**
60 * The connection instance is the handler for any IO activity on the endpoint.
61 * There is a different type of connection for HTTP, AJP, WebSocket and
62 * ProxyConnect. The connection may change for an SCEP as it is upgraded
63 * from HTTP to proxy connect or websocket.
64 */
65 private volatile AsyncConnection _connection;
66
67 private static final int STATE_NEEDS_DISPATCH=-1;
68 private static final int STATE_UNDISPATCHED=0;
69 private static final int STATE_DISPATCHED=1;
70 private static final int STATE_ASYNC=2;
71 private int _state;
72
73 private boolean _onIdle;
74
75 /** true if the last write operation succeed and wrote all offered bytes */
76 private volatile boolean _writable = true;
77
78
79 /** True if a thread has is blocked in {@link #blockReadable(long)} */
80 private boolean _readBlocked;
81
82 /** True if a thread has is blocked in {@link #blockWritable(long)} */
83 private boolean _writeBlocked;
84
85 /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
86 private boolean _open;
87
88 private volatile long _idleTimestamp;
89 private volatile boolean _checkIdle;
90
91 private boolean _interruptable;
92
93 private boolean _ishut;
94
95 /* ------------------------------------------------------------ */
/**
 * Creates an open, undispatched endpoint with idle checking enabled.
 *
 * @param channel the socket channel handed to the superclass
 * @param selectSet the select set (and, through it, the manager) of this endpoint
 * @param key the initial selection key
 * @param maxIdleTime the max idle time handed to the superclass
 * @throws IOException if the superclass constructor fails
 */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
    throws IOException
{
    super(channel, maxIdleTime);

    // collaborators
    _manager = selectSet.getManager();
    _selectSet = selectSet;
    _key = key;

    // initial lifecycle state
    _open = true;
    _onIdle = false;
    _state = STATE_UNDISPATCHED;

    setCheckForIdle(true);
}
110
111 /* ------------------------------------------------------------ */
112 public SelectionKey getSelectionKey()
113 {
114 synchronized (this)
115 {
116 return _key;
117 }
118 }
119
120 /* ------------------------------------------------------------ */
121 public SelectorManager getSelectManager()
122 {
123 return _manager;
124 }
125
126 /* ------------------------------------------------------------ */
127 public Connection getConnection()
128 {
129 return _connection;
130 }
131
132 /* ------------------------------------------------------------ */
133 public void setConnection(Connection connection)
134 {
135 Connection old=_connection;
136 _connection=(AsyncConnection)connection;
137 if (old!=null && old!=_connection)
138 _manager.endPointUpgraded(this,old);
139 }
140
141 /* ------------------------------------------------------------ */
142 public long getIdleTimestamp()
143 {
144 return _idleTimestamp;
145 }
146
147 /* ------------------------------------------------------------ */
148 /** Called by selectSet to schedule handling
149 *
150 */
151 public void schedule()
152 {
153 synchronized (this)
154 {
155 // If there is no key, then do nothing
156 if (_key == null || !_key.isValid())
157 {
158 _readBlocked=false;
159 _writeBlocked=false;
160 this.notifyAll();
161 return;
162 }
163
164 // If there are threads dispatched reading and writing
165 if (_readBlocked || _writeBlocked)
166 {
167 // assert _dispatched;
168 if (_readBlocked && _key.isReadable())
169 _readBlocked=false;
170 if (_writeBlocked && _key.isWritable())
171 _writeBlocked=false;
172
173 // wake them up is as good as a dispatched.
174 this.notifyAll();
175
176 // we are not interested in further selecting
177 _key.interestOps(0);
178 if (_state<STATE_DISPATCHED)
179 updateKey();
180 return;
181 }
182
183 // Remove writeable op
184 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
185 {
186 // Remove writeable op
187 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
188 _key.interestOps(_interestOps);
189 _writable = true; // Once writable is in ops, only removed with dispatch.
190 }
191
192 // If dispatched, then deregister interest
193 if (_state>=STATE_DISPATCHED)
194 _key.interestOps(0);
195 else
196 {
197 // other wise do the dispatch
198 dispatch();
199 if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0())
200 {
201 _key.interestOps(0);
202 }
203 }
204 }
205 }
206
207 /* ------------------------------------------------------------ */
208 public void asyncDispatch()
209 {
210 synchronized(this)
211 {
212 switch(_state)
213 {
214 case STATE_NEEDS_DISPATCH:
215 case STATE_UNDISPATCHED:
216 dispatch();
217 break;
218
219 case STATE_DISPATCHED:
220 case STATE_ASYNC:
221 _state=STATE_ASYNC;
222 break;
223 }
224 }
225 }
226
227 /* ------------------------------------------------------------ */
228 public void dispatch()
229 {
230 synchronized(this)
231 {
232 if (_state<=STATE_UNDISPATCHED)
233 {
234 if (_onIdle)
235 _state = STATE_NEEDS_DISPATCH;
236 else
237 {
238 _state = STATE_DISPATCHED;
239 boolean dispatched = _manager.dispatch(_handler);
240 if(!dispatched)
241 {
242 _state = STATE_NEEDS_DISPATCH;
243 LOG.warn("Dispatched Failed! "+this+" to "+_manager);
244 updateKey();
245 }
246 }
247 }
248 }
249 }
250
251 /* ------------------------------------------------------------ */
252 /**
253 * Called when a dispatched thread is no longer handling the endpoint.
254 * The selection key operations are updated.
255 * @return If false is returned, the endpoint has been redispatched and
256 * thread must keep handling the endpoint.
257 */
258 protected boolean undispatch()
259 {
260 synchronized (this)
261 {
262 switch(_state)
263 {
264 case STATE_ASYNC:
265 _state=STATE_DISPATCHED;
266 return false;
267
268 default:
269 _state=STATE_UNDISPATCHED;
270 updateKey();
271 return true;
272 }
273 }
274 }
275
276 /* ------------------------------------------------------------ */
277 public void cancelTimeout(Task task)
278 {
279 getSelectSet().cancelTimeout(task);
280 }
281
282 /* ------------------------------------------------------------ */
283 public void scheduleTimeout(Task task, long timeoutMs)
284 {
285 getSelectSet().scheduleTimeout(task,timeoutMs);
286 }
287
288 /* ------------------------------------------------------------ */
289 public void setCheckForIdle(boolean check)
290 {
291 if (check)
292 {
293 _idleTimestamp=System.currentTimeMillis();
294 _checkIdle=true;
295 }
296 else
297 _checkIdle=false;
298 }
299
300 /* ------------------------------------------------------------ */
301 public boolean isCheckForIdle()
302 {
303 return _checkIdle;
304 }
305
306 /* ------------------------------------------------------------ */
307 protected void notIdle()
308 {
309 _idleTimestamp=System.currentTimeMillis();
310 }
311
312 /* ------------------------------------------------------------ */
313 public void checkIdleTimestamp(long now)
314 {
315 if (isCheckForIdle() && _maxIdleTime>0)
316 {
317 final long idleForMs=now-_idleTimestamp;
318
319 if (idleForMs>_maxIdleTime)
320 {
321 // Don't idle out again until onIdleExpired task completes.
322 setCheckForIdle(false);
323 _manager.dispatch(new Runnable()
324 {
325 public void run()
326 {
327 try
328 {
329 onIdleExpired(idleForMs);
330 }
331 finally
332 {
333 setCheckForIdle(true);
334 }
335 }
336 });
337 }
338 }
339 }
340
341 /* ------------------------------------------------------------ */
342 public void onIdleExpired(long idleForMs)
343 {
344 try
345 {
346 synchronized (this)
347 {
348 _onIdle=true;
349 }
350
351 _connection.onIdleExpired(idleForMs);
352 }
353 finally
354 {
355 synchronized (this)
356 {
357 _onIdle=false;
358 if (_state==STATE_NEEDS_DISPATCH)
359 dispatch();
360 }
361 }
362 }
363
364 /* ------------------------------------------------------------ */
365 @Override
366 public int fill(Buffer buffer) throws IOException
367 {
368 int fill=super.fill(buffer);
369 if (fill>0)
370 notIdle();
371 return fill;
372 }
373
374 /* ------------------------------------------------------------ */
375 @Override
376 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
377 {
378 int l = super.flush(header, buffer, trailer);
379
380 // If there was something to write and it wasn't written, then we are not writable.
381 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
382 {
383 synchronized (this)
384 {
385 _writable=false;
386 if (_state<STATE_DISPATCHED)
387 updateKey();
388 }
389 }
390 else if (l>0)
391 {
392 _writable=true;
393 notIdle();
394 }
395 return l;
396 }
397
398 /* ------------------------------------------------------------ */
399 /*
400 */
401 @Override
402 public int flush(Buffer buffer) throws IOException
403 {
404 int l = super.flush(buffer);
405
406 // If there was something to write and it wasn't written, then we are not writable.
407 if (l==0 && buffer!=null && buffer.hasContent())
408 {
409 synchronized (this)
410 {
411 _writable=false;
412 if (_state<STATE_DISPATCHED)
413 updateKey();
414 }
415 }
416 else if (l>0)
417 {
418 _writable=true;
419 notIdle();
420 }
421
422 return l;
423 }
424
425 /* ------------------------------------------------------------ */
426 /*
427 * Allows thread to block waiting for further events.
428 */
429 @Override
430 public boolean blockReadable(long timeoutMs) throws IOException
431 {
432 synchronized (this)
433 {
434 if (isInputShutdown())
435 throw new EofException();
436
437 long now=_selectSet.getNow();
438 long end=now+timeoutMs;
439 boolean check=isCheckForIdle();
440 setCheckForIdle(true);
441 try
442 {
443 _readBlocked=true;
444 while (!isInputShutdown() && _readBlocked)
445 {
446 try
447 {
448 updateKey();
449 this.wait(timeoutMs>0?(end-now):10000);
450 }
451 catch (final InterruptedException e)
452 {
453 LOG.warn(e);
454 if (_interruptable)
455 throw new InterruptedIOException(){{this.initCause(e);}};
456 }
457 finally
458 {
459 now=_selectSet.getNow();
460 }
461
462 if (_readBlocked && timeoutMs>0 && now>=end)
463 return false;
464 }
465 }
466 finally
467 {
468 _readBlocked=false;
469 setCheckForIdle(check);
470 }
471 }
472 return true;
473 }
474
475 /* ------------------------------------------------------------ */
476 /*
477 * Allows thread to block waiting for further events.
478 */
479 @Override
480 public boolean blockWritable(long timeoutMs) throws IOException
481 {
482 synchronized (this)
483 {
484 if (isOutputShutdown())
485 throw new EofException();
486
487 long now=_selectSet.getNow();
488 long end=now+timeoutMs;
489 boolean check=isCheckForIdle();
490 setCheckForIdle(true);
491 try
492 {
493 _writeBlocked=true;
494 while (_writeBlocked && !isOutputShutdown())
495 {
496 try
497 {
498 updateKey();
499 this.wait(timeoutMs>0?(end-now):10000);
500 }
501 catch (final InterruptedException e)
502 {
503 LOG.warn(e);
504 if (_interruptable)
505 throw new InterruptedIOException(){{this.initCause(e);}};
506 }
507 finally
508 {
509 now=_selectSet.getNow();
510 }
511 if (_writeBlocked && timeoutMs>0 && now>=end)
512 return false;
513 }
514 }
515 finally
516 {
517 _writeBlocked=false;
518 setCheckForIdle(check);
519 }
520 }
521 return true;
522 }
523
524 /* ------------------------------------------------------------ */
525 /** Set the interruptable mode of the endpoint.
526 * If set to false (default), then interrupts are assumed to be spurious
527 * and blocking operations continue unless the endpoint has been closed.
528 * If true, then interrupts of blocking operations result in InterruptedIOExceptions
529 * being thrown.
530 * @param interupable
531 */
532 public void setInterruptable(boolean interupable)
533 {
534 synchronized (this)
535 {
536 _interruptable=interupable;
537 }
538 }
539
540 /* ------------------------------------------------------------ */
541 public boolean isInterruptable()
542 {
543 return _interruptable;
544 }
545
546 /* ------------------------------------------------------------ */
547 /**
548 * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite()
549 */
550 public void scheduleWrite()
551 {
552 if (_writable)
553 LOG.debug("Required scheduleWrite {}",this);
554
555 _writable=false;
556 updateKey();
557 }
558
559 /* ------------------------------------------------------------ */
560 public boolean isWritable()
561 {
562 return _writable;
563 }
564
565 /* ------------------------------------------------------------ */
566 public boolean hasProgressed()
567 {
568 return false;
569 }
570
571 /* ------------------------------------------------------------ */
572 /**
573 * Updates selection key. Adds operations types to the selection key as needed. No operations
574 * are removed as this is only done during dispatch. This method records the new key and
575 * schedules a call to doUpdateKey to do the keyChange
576 */
577 private void updateKey()
578 {
579 final boolean changed;
580 synchronized (this)
581 {
582 int current_ops=-1;
583 if (getChannel().isOpen())
584 {
585 boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended());
586 boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable);
587
588 _interestOps =
589 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0)
590 | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
591 try
592 {
593 current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
594 }
595 catch(Exception e)
596 {
597 _key=null;
598 LOG.ignore(e);
599 }
600 }
601 changed=_interestOps!=current_ops;
602 }
603
604 if(changed)
605 {
606 _selectSet.addChange(this);
607 _selectSet.wakeup();
608 }
609 }
610
611
612 /* ------------------------------------------------------------ */
613 /**
614 * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
615 */
616 void doUpdateKey()
617 {
618 synchronized (this)
619 {
620 if (getChannel().isOpen())
621 {
622 if (_interestOps>0)
623 {
624 if (_key==null || !_key.isValid())
625 {
626 SelectableChannel sc = (SelectableChannel)getChannel();
627 if (sc.isRegistered())
628 {
629 updateKey();
630 }
631 else
632 {
633 try
634 {
635 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
636 }
637 catch (Exception e)
638 {
639 LOG.ignore(e);
640 if (_key!=null && _key.isValid())
641 {
642 _key.cancel();
643 }
644
645 if (_open)
646 {
647 _selectSet.destroyEndPoint(this);
648 }
649 _open=false;
650 _key = null;
651 }
652 }
653 }
654 else
655 {
656 _key.interestOps(_interestOps);
657 }
658 }
659 else
660 {
661 if (_key!=null && _key.isValid())
662 _key.interestOps(0);
663 else
664 _key=null;
665 }
666 }
667 else
668 {
669 if (_key!=null && _key.isValid())
670 _key.cancel();
671
672 if (_open)
673 {
674 _open=false;
675 _selectSet.destroyEndPoint(this);
676 }
677 _key = null;
678 }
679 }
680 }
681
682 /* ------------------------------------------------------------ */
683 /*
684 */
685 protected void handle()
686 {
687 boolean dispatched=true;
688 try
689 {
690 while(dispatched)
691 {
692 try
693 {
694 while(true)
695 {
696 final AsyncConnection next = (AsyncConnection)_connection.handle();
697 if (next!=_connection)
698 {
699 LOG.debug("{} replaced {}",next,_connection);
700 Connection old=_connection;
701 _connection=next;
702 _manager.endPointUpgraded(this,old);
703 continue;
704 }
705 break;
706 }
707 }
708 catch (ClosedChannelException e)
709 {
710 LOG.ignore(e);
711 }
712 catch (EofException e)
713 {
714 LOG.debug("EOF", e);
715 try{close();}
716 catch(IOException e2){LOG.ignore(e2);}
717 }
718 catch (IOException e)
719 {
720 LOG.warn(e.toString());
721 try{close();}
722 catch(IOException e2){LOG.ignore(e2);}
723 }
724 catch (Throwable e)
725 {
726 LOG.warn("handle failed", e);
727 try{close();}
728 catch(IOException e2){LOG.ignore(e2);}
729 }
730 finally
731 {
732 if (!_ishut && isInputShutdown() && isOpen())
733 {
734 _ishut=true;
735 try
736 {
737 _connection.onInputShutdown();
738 }
739 catch(Throwable x)
740 {
741 LOG.warn("onInputShutdown failed", x);
742 try{close();}
743 catch(IOException e2){LOG.ignore(e2);}
744 }
745 finally
746 {
747 updateKey();
748 }
749 }
750 dispatched=!undispatch();
751 }
752 }
753 }
754 finally
755 {
756 if (dispatched)
757 {
758 dispatched=!undispatch();
759 while (dispatched)
760 {
761 LOG.warn("SCEP.run() finally DISPATCHED");
762 dispatched=!undispatch();
763 }
764 }
765 }
766 }
767
768 /* ------------------------------------------------------------ */
769 /*
770 * @see org.eclipse.io.nio.ChannelEndPoint#close()
771 */
772 @Override
773 public void close() throws IOException
774 {
775 // On unix systems there is a JVM issue that if you cancel before closing, it can
776 // cause the selector to block waiting for a channel to close and that channel can
777 // block waiting for the remote end. But on windows, if you don't cancel before a
778 // close, then the selector can block anyway!
779 // https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318
780 if (WORK_AROUND_JVM_BUG_6346658)
781 {
782 try
783 {
784 SelectionKey key = _key;
785 if (key!=null)
786 key.cancel();
787 }
788 catch (Throwable e)
789 {
790 LOG.ignore(e);
791 }
792 }
793
794 try
795 {
796 super.close();
797 }
798 catch (IOException e)
799 {
800 LOG.ignore(e);
801 }
802 finally
803 {
804 updateKey();
805 }
806 }
807
808 /* ------------------------------------------------------------ */
809 @Override
810 public String toString()
811 {
812 // Do NOT use synchronized (this)
813 // because it's very easy to deadlock when debugging is enabled.
814 // We do a best effort to print the right toString() and that's it.
815 SelectionKey key = _key;
816 String keyString = "";
817 if (key != null)
818 {
819 if (key.isValid())
820 {
821 if (key.isReadable())
822 keyString += "r";
823 if (key.isWritable())
824 keyString += "w";
825 }
826 else
827 {
828 keyString += "!";
829 }
830 }
831 else
832 {
833 keyString += "-";
834 }
835 return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
836 hashCode(),
837 _socket.getRemoteSocketAddress(),
838 _socket.getLocalSocketAddress(),
839 _state,
840 isOpen(),
841 isInputShutdown(),
842 isOutputShutdown(),
843 _readBlocked,
844 _writeBlocked,
845 _writable,
846 _interestOps,
847 keyString,
848 _connection);
849 }
850
851 /* ------------------------------------------------------------ */
852 public SelectSet getSelectSet()
853 {
854 return _selectSet;
855 }
856
857 /* ------------------------------------------------------------ */
858 /**
859 * Don't set the SoTimeout
860 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
861 */
862 @Override
863 public void setMaxIdleTime(int timeMs) throws IOException
864 {
865 _maxIdleTime=timeMs;
866 }
867
868}