blob: da53c0365b9400581931b83324d0cc8267597114 [file] [log] [blame]
Jake Slack03928ae2014-05-13 18:41:56 -07001//
2// ========================================================================
3// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4// ------------------------------------------------------------------------
5// All rights reserved. This program and the accompanying materials
6// are made available under the terms of the Eclipse Public License v1.0
7// and Apache License v2.0 which accompanies this distribution.
8//
9// The Eclipse Public License is available at
10// http://www.eclipse.org/legal/epl-v10.html
11//
12// The Apache License v2.0 is available at
13// http://www.opensource.org/licenses/apache2.0.php
14//
15// You may elect to redistribute this code under either of these licenses.
16// ========================================================================
17//
18
19
20package org.eclipse.jetty.util.thread;
21
22import java.io.IOException;
23import java.util.ArrayList;
24import java.util.Arrays;
25import java.util.List;
26import java.util.concurrent.ArrayBlockingQueue;
27import java.util.concurrent.BlockingQueue;
28import java.util.concurrent.ConcurrentLinkedQueue;
29import java.util.concurrent.Executor;
30import java.util.concurrent.RejectedExecutionException;
31import java.util.concurrent.TimeUnit;
32import java.util.concurrent.atomic.AtomicInteger;
33import java.util.concurrent.atomic.AtomicLong;
34
35import org.eclipse.jetty.util.BlockingArrayQueue;
36import org.eclipse.jetty.util.component.AbstractLifeCycle;
37import org.eclipse.jetty.util.component.AggregateLifeCycle;
38import org.eclipse.jetty.util.component.Dumpable;
39import org.eclipse.jetty.util.component.LifeCycle;
40import org.eclipse.jetty.util.log.Log;
41import org.eclipse.jetty.util.log.Logger;
42import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
43
44public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable
45{
46 private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
47
48 private final AtomicInteger _threadsStarted = new AtomicInteger();
49 private final AtomicInteger _threadsIdle = new AtomicInteger();
50 private final AtomicLong _lastShrink = new AtomicLong();
51 private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
52 private final Object _joinLock = new Object();
53 private BlockingQueue<Runnable> _jobs;
54 private String _name;
55 private int _maxIdleTimeMs=60000;
56 private int _maxThreads=254;
57 private int _minThreads=8;
58 private int _maxQueued=-1;
59 private int _priority=Thread.NORM_PRIORITY;
60 private boolean _daemon=false;
61 private int _maxStopTime=100;
62 private boolean _detailedDump=false;
63
64 /* ------------------------------------------------------------------- */
65 /** Construct
66 */
67 public QueuedThreadPool()
68 {
69 _name="qtp"+super.hashCode();
70 }
71
72 /* ------------------------------------------------------------------- */
73 /** Construct
74 */
75 public QueuedThreadPool(int maxThreads)
76 {
77 this();
78 setMaxThreads(maxThreads);
79 }
80
81 /* ------------------------------------------------------------------- */
82 /** Construct
83 */
84 public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
85 {
86 this();
87 _jobs=jobQ;
88 _jobs.clear();
89 }
90
91
92 /* ------------------------------------------------------------ */
93 @Override
94 protected void doStart() throws Exception
95 {
96 super.doStart();
97 _threadsStarted.set(0);
98
99 if (_jobs==null)
100 {
101 _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
102 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
103 }
104
105 int threads=_threadsStarted.get();
106 while (isRunning() && threads<_minThreads)
107 {
108 startThread(threads);
109 threads=_threadsStarted.get();
110 }
111 }
112
113 /* ------------------------------------------------------------ */
114 @Override
115 protected void doStop() throws Exception
116 {
117 super.doStop();
118 long start=System.currentTimeMillis();
119
120 // let jobs complete naturally for a while
121 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
122 Thread.sleep(1);
123
124 // kill queued jobs and flush out idle jobs
125 _jobs.clear();
126 Runnable noop = new Runnable(){public void run(){}};
127 for (int i=_threadsIdle.get();i-->0;)
128 _jobs.offer(noop);
129 Thread.yield();
130
131 // interrupt remaining threads
132 if (_threadsStarted.get()>0)
133 for (Thread thread : _threads)
134 thread.interrupt();
135
136 // wait for remaining threads to die
137 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
138 {
139 Thread.sleep(1);
140 }
141 Thread.yield();
142 int size=_threads.size();
143 if (size>0)
144 {
145 LOG.warn(size+" threads could not be stopped");
146
147 if (size==1 || LOG.isDebugEnabled())
148 {
149 for (Thread unstopped : _threads)
150 {
151 LOG.info("Couldn't stop "+unstopped);
152 for (StackTraceElement element : unstopped.getStackTrace())
153 {
154 LOG.info(" at "+element);
155 }
156 }
157 }
158 }
159
160 synchronized (_joinLock)
161 {
162 _joinLock.notifyAll();
163 }
164 }
165
166 /* ------------------------------------------------------------ */
167 /**
168 * Delegated to the named or anonymous Pool.
169 */
170 public void setDaemon(boolean daemon)
171 {
172 _daemon=daemon;
173 }
174
175 /* ------------------------------------------------------------ */
176 /** Set the maximum thread idle time.
177 * Threads that are idle for longer than this period may be
178 * stopped.
179 * Delegated to the named or anonymous Pool.
180 * @see #getMaxIdleTimeMs
181 * @param maxIdleTimeMs Max idle time in ms.
182 */
183 public void setMaxIdleTimeMs(int maxIdleTimeMs)
184 {
185 _maxIdleTimeMs=maxIdleTimeMs;
186 }
187
188 /* ------------------------------------------------------------ */
189 /**
190 * @param stopTimeMs maximum total time that stop() will wait for threads to die.
191 */
192 public void setMaxStopTimeMs(int stopTimeMs)
193 {
194 _maxStopTime = stopTimeMs;
195 }
196
197 /* ------------------------------------------------------------ */
198 /** Set the maximum number of threads.
199 * Delegated to the named or anonymous Pool.
200 * @see #getMaxThreads
201 * @param maxThreads maximum number of threads.
202 */
203 public void setMaxThreads(int maxThreads)
204 {
205 _maxThreads=maxThreads;
206 if (_minThreads>_maxThreads)
207 _minThreads=_maxThreads;
208 }
209
210 /* ------------------------------------------------------------ */
211 /** Set the minimum number of threads.
212 * Delegated to the named or anonymous Pool.
213 * @see #getMinThreads
214 * @param minThreads minimum number of threads
215 */
216 public void setMinThreads(int minThreads)
217 {
218 _minThreads=minThreads;
219
220 if (_minThreads>_maxThreads)
221 _maxThreads=_minThreads;
222
223 int threads=_threadsStarted.get();
224 while (isStarted() && threads<_minThreads)
225 {
226 startThread(threads);
227 threads=_threadsStarted.get();
228 }
229 }
230
231 /* ------------------------------------------------------------ */
232 /**
233 * @param name Name of the BoundedThreadPool to use when naming Threads.
234 */
235 public void setName(String name)
236 {
237 if (isRunning())
238 throw new IllegalStateException("started");
239 _name= name;
240 }
241
242 /* ------------------------------------------------------------ */
243 /** Set the priority of the pool threads.
244 * @param priority the new thread priority.
245 */
246 public void setThreadsPriority(int priority)
247 {
248 _priority=priority;
249 }
250
251 /* ------------------------------------------------------------ */
252 /**
253 * @return maximum queue size
254 */
255 public int getMaxQueued()
256 {
257 return _maxQueued;
258 }
259
260 /* ------------------------------------------------------------ */
261 /**
262 * @param max job queue size
263 */
264 public void setMaxQueued(int max)
265 {
266 if (isRunning())
267 throw new IllegalStateException("started");
268 _maxQueued=max;
269 }
270
271 /* ------------------------------------------------------------ */
272 /** Get the maximum thread idle time.
273 * Delegated to the named or anonymous Pool.
274 * @see #setMaxIdleTimeMs
275 * @return Max idle time in ms.
276 */
277 public int getMaxIdleTimeMs()
278 {
279 return _maxIdleTimeMs;
280 }
281
282 /* ------------------------------------------------------------ */
283 /**
284 * @return maximum total time that stop() will wait for threads to die.
285 */
286 public int getMaxStopTimeMs()
287 {
288 return _maxStopTime;
289 }
290
291 /* ------------------------------------------------------------ */
292 /** Set the maximum number of threads.
293 * Delegated to the named or anonymous Pool.
294 * @see #setMaxThreads
295 * @return maximum number of threads.
296 */
297 public int getMaxThreads()
298 {
299 return _maxThreads;
300 }
301
302 /* ------------------------------------------------------------ */
303 /** Get the minimum number of threads.
304 * Delegated to the named or anonymous Pool.
305 * @see #setMinThreads
306 * @return minimum number of threads.
307 */
308 public int getMinThreads()
309 {
310 return _minThreads;
311 }
312
313 /* ------------------------------------------------------------ */
314 /**
315 * @return The name of the BoundedThreadPool.
316 */
317 public String getName()
318 {
319 return _name;
320 }
321
322 /* ------------------------------------------------------------ */
323 /** Get the priority of the pool threads.
324 * @return the priority of the pool threads.
325 */
326 public int getThreadsPriority()
327 {
328 return _priority;
329 }
330
331 /* ------------------------------------------------------------ */
332 /**
333 * Delegated to the named or anonymous Pool.
334 */
335 public boolean isDaemon()
336 {
337 return _daemon;
338 }
339
340 /* ------------------------------------------------------------ */
341 public boolean isDetailedDump()
342 {
343 return _detailedDump;
344 }
345
346 /* ------------------------------------------------------------ */
347 public void setDetailedDump(boolean detailedDump)
348 {
349 _detailedDump = detailedDump;
350 }
351
352 /* ------------------------------------------------------------ */
353 public boolean dispatch(Runnable job)
354 {
355 if (isRunning())
356 {
357 final int jobQ = _jobs.size();
358 final int idle = getIdleThreads();
359 if(_jobs.offer(job))
360 {
361 // If we had no idle threads or the jobQ is greater than the idle threads
362 if (idle==0 || jobQ>idle)
363 {
364 int threads=_threadsStarted.get();
365 if (threads<_maxThreads)
366 startThread(threads);
367 }
368 return true;
369 }
370 }
371 LOG.debug("Dispatched {} to stopped {}",job,this);
372 return false;
373 }
374
375 /* ------------------------------------------------------------ */
376 public void execute(Runnable job)
377 {
378 if (!dispatch(job))
379 throw new RejectedExecutionException();
380 }
381
382 /* ------------------------------------------------------------ */
383 /**
384 * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
385 */
386 public void join() throws InterruptedException
387 {
388 synchronized (_joinLock)
389 {
390 while (isRunning())
391 _joinLock.wait();
392 }
393
394 while (isStopping())
395 Thread.sleep(1);
396 }
397
398 /* ------------------------------------------------------------ */
399 /**
400 * @return The total number of threads currently in the pool
401 */
402 public int getThreads()
403 {
404 return _threadsStarted.get();
405 }
406
407 /* ------------------------------------------------------------ */
408 /**
409 * @return The number of idle threads in the pool
410 */
411 public int getIdleThreads()
412 {
413 return _threadsIdle.get();
414 }
415
416 /* ------------------------------------------------------------ */
417 /**
418 * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
419 */
420 public boolean isLowOnThreads()
421 {
422 return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
423 }
424
425 /* ------------------------------------------------------------ */
426 private boolean startThread(int threads)
427 {
428 final int next=threads+1;
429 if (!_threadsStarted.compareAndSet(threads,next))
430 return false;
431
432 boolean started=false;
433 try
434 {
435 Thread thread=newThread(_runnable);
436 thread.setDaemon(_daemon);
437 thread.setPriority(_priority);
438 thread.setName(_name+"-"+thread.getId());
439 _threads.add(thread);
440
441 thread.start();
442 started=true;
443 }
444 finally
445 {
446 if (!started)
447 _threadsStarted.decrementAndGet();
448 }
449 return started;
450 }
451
452 /* ------------------------------------------------------------ */
453 protected Thread newThread(Runnable runnable)
454 {
455 return new Thread(runnable);
456 }
457
458
459 /* ------------------------------------------------------------ */
460 public String dump()
461 {
462 return AggregateLifeCycle.dump(this);
463 }
464
465 /* ------------------------------------------------------------ */
466 public void dump(Appendable out, String indent) throws IOException
467 {
468 List<Object> dump = new ArrayList<Object>(getMaxThreads());
469 for (final Thread thread: _threads)
470 {
471 final StackTraceElement[] trace=thread.getStackTrace();
472 boolean inIdleJobPoll=false;
473 // trace can be null on early java 6 jvms
474 if (trace != null)
475 {
476 for (StackTraceElement t : trace)
477 {
478 if ("idleJobPoll".equals(t.getMethodName()))
479 {
480 inIdleJobPoll = true;
481 break;
482 }
483 }
484 }
485 final boolean idle=inIdleJobPoll;
486
487 if (_detailedDump)
488 {
489 dump.add(new Dumpable()
490 {
491 public void dump(Appendable out, String indent) throws IOException
492 {
493 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
494 if (!idle)
495 AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
496 }
497
498 public String dump()
499 {
500 return null;
501 }
502 });
503 }
504 else
505 {
506 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":""));
507 }
508 }
509
510 AggregateLifeCycle.dumpObject(out,this);
511 AggregateLifeCycle.dump(out,indent,dump);
512
513 }
514
515
516 /* ------------------------------------------------------------ */
517 @Override
518 public String toString()
519 {
520 return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
521 }
522
523 /* ------------------------------------------------------------ */
524 private Runnable idleJobPoll() throws InterruptedException
525 {
526 return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
527 }
528
529 /* ------------------------------------------------------------ */
530 private Runnable _runnable = new Runnable()
531 {
532 public void run()
533 {
534 boolean shrink=false;
535 try
536 {
537 Runnable job=_jobs.poll();
538 while (isRunning())
539 {
540 // Job loop
541 while (job!=null && isRunning())
542 {
543 runJob(job);
544 job=_jobs.poll();
545 }
546
547 // Idle loop
548 try
549 {
550 _threadsIdle.incrementAndGet();
551
552 while (isRunning() && job==null)
553 {
554 if (_maxIdleTimeMs<=0)
555 job=_jobs.take();
556 else
557 {
558 // maybe we should shrink?
559 final int size=_threadsStarted.get();
560 if (size>_minThreads)
561 {
562 long last=_lastShrink.get();
563 long now=System.currentTimeMillis();
564 if (last==0 || (now-last)>_maxIdleTimeMs)
565 {
566 shrink=_lastShrink.compareAndSet(last,now) &&
567 _threadsStarted.compareAndSet(size,size-1);
568 if (shrink)
569 return;
570 }
571 }
572 job=idleJobPoll();
573 }
574 }
575 }
576 finally
577 {
578 _threadsIdle.decrementAndGet();
579 }
580 }
581 }
582 catch(InterruptedException e)
583 {
584 LOG.ignore(e);
585 }
586 catch(Exception e)
587 {
588 LOG.warn(e);
589 }
590 finally
591 {
592 if (!shrink)
593 _threadsStarted.decrementAndGet();
594 _threads.remove(Thread.currentThread());
595 }
596 }
597 };
598
599 /* ------------------------------------------------------------ */
600 /**
601 * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
602 * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
603 *
604 * @param job the job to run
605 */
606 protected void runJob(Runnable job)
607 {
608 job.run();
609 }
610
611 /* ------------------------------------------------------------ */
612 /**
613 * @return the job queue
614 */
615 protected BlockingQueue<Runnable> getQueue()
616 {
617 return _jobs;
618 }
619
620 /* ------------------------------------------------------------ */
621 /**
622 * @param id The thread ID to stop.
623 * @return true if the thread was found and stopped.
624 * @deprecated Use {@link #interruptThread(long)} in preference
625 */
626 @Deprecated
627 public boolean stopThread(long id)
628 {
629 for (Thread thread: _threads)
630 {
631 if (thread.getId()==id)
632 {
633 thread.stop();
634 return true;
635 }
636 }
637 return false;
638 }
639
640 /* ------------------------------------------------------------ */
641 /**
642 * @param id The thread ID to interrupt.
643 * @return true if the thread was found and interrupted.
644 */
645 public boolean interruptThread(long id)
646 {
647 for (Thread thread: _threads)
648 {
649 if (thread.getId()==id)
650 {
651 thread.interrupt();
652 return true;
653 }
654 }
655 return false;
656 }
657
658 /* ------------------------------------------------------------ */
659 /**
660 * @param id The thread ID to interrupt.
661 * @return true if the thread was found and interrupted.
662 */
663 public String dumpThread(long id)
664 {
665 for (Thread thread: _threads)
666 {
667 if (thread.getId()==id)
668 {
669 StringBuilder buf = new StringBuilder();
670 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
671 for (StackTraceElement element : thread.getStackTrace())
672 buf.append(" at ").append(element.toString()).append('\n');
673 return buf.toString();
674 }
675 }
676 return null;
677 }
678}