blob: 225448bf561fda8f4ff081af48ec5964fb38e039 [file] [log] [blame]
Andrew Hsieh83760d22013-06-18 12:24:28 -07001"""Thread module emulating a subset of Java's threading model."""
2
3import sys as _sys
4
5try:
6 import thread
7except ImportError:
8 del _sys.modules[__name__]
9 raise
10
11import warnings
12
13from collections import deque as _deque
14from time import time as _time, sleep as _sleep
15from traceback import format_exc as _format_exc
16
17# Note regarding PEP 8 compliant aliases
18# This threading model was originally inspired by Java, and inherited
19# the convention of camelCase function and method names from that
20# language. While those names are not in any imminent danger of being
21# deprecated, starting with Python 2.6, the module now provides a
22# PEP 8 compliant alias for any such method name.
23# Using the new PEP 8 compliant names also facilitates substitution
24# with the multiprocessing module, which doesn't provide the old
25# Java inspired names.
26
27
28# Rename some stuff so "from threading import *" is safe
__all__ = [
    'activeCount', 'active_count', 'Condition', 'currentThread',
    'current_thread', 'enumerate', 'Event',
    'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
    'Timer', 'setprofile', 'settrace', 'local', 'stack_size',
]

# Keep private references to the low-level primitives we need, then drop the
# ``thread`` module name itself so it does not leak into this namespace.
_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
ThreadError = thread.error
del thread


# sys.exc_clear is used to work around the fact that except blocks
# don't fully clear the exception until 3.0.
warnings.filterwarnings(
    'ignore',
    category=DeprecationWarning,
    module='threading',
    message='sys.exc_clear',
)
45
46# Debug support (adapted from ihooks.py).
47# All the major classes here derive from _Verbose. We force that to
48# be a new-style class so that all the major classes here are new-style.
49# This helps debugging (type(instance) is more revealing for instances
50# of new-style classes).
51
_VERBOSE = False

if __debug__:

    class _Verbose(object):
        """Base class that gives subclasses an optional stderr trace."""

        def __init__(self, verbose=None):
            # No explicit choice: fall back to the module-wide switch.
            self.__verbose = _VERBOSE if verbose is None else verbose

        def _note(self, format, *args):
            """Write ``format % args`` to stderr, prefixed by the thread name.

            Does nothing unless this instance was created verbose.
            """
            if not self.__verbose:
                return
            message = format % args
            # Issue #4188: calling current_thread() can incur an infinite
            # recursion if it has to create a DummyThread on the fly.
            ident = _get_ident()
            try:
                name = _active[ident].name
            except KeyError:
                name = "<OS thread %d>" % ident
            _sys.stderr.write("%s: %s\n" % (name, message))

else:
    # Disable this when using "python -O"
    class _Verbose(object):
        """No-op stand-in used when assertions are compiled out."""

        def __init__(self, verbose=None):
            pass

        def _note(self, *args):
            pass
83
84# Support for profile and trace hooks
85
# Hooks installed in every thread started through this module.
_profile_hook = None
_trace_hook = None


def setprofile(func):
    """Install *func* as the profile function for threads started later.

    The function is remembered at module level; each thread started from the
    threading module passes it to sys.setprofile() before calling run().

    """
    global _profile_hook
    _profile_hook = func
98
def settrace(func):
    """Install *func* as the trace function for threads started later.

    The function is remembered at module level; each thread started from the
    threading module passes it to sys.settrace() before calling run().

    """
    global _trace_hook
    _trace_hook = func
108
109# Synchronization classes
110
# The plain (non-reentrant) lock is simply the low-level lock factory.
Lock = _allocate_lock


def RLock(*args, **kwargs):
    """Create and return a new reentrant lock.

    Only the thread that acquired a reentrant lock may release it. The owning
    thread may acquire it again without blocking, and must release it once
    for every acquisition.

    """
    lock = _RLock(*args, **kwargs)
    return lock
123
class _RLock(_Verbose):
    """Reentrant lock.

    Only the thread that acquired the lock may release it. The owner may
    acquire it again without blocking and must release it once per
    acquisition.
    """

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__block = _allocate_lock()   # the real, non-reentrant lock
        self.__owner = None               # ident of the owning thread
        self.__count = 0                  # recursion depth held by the owner

    def __repr__(self):
        holder = self.__owner
        try:
            # Prefer the owning thread's name when it is a known thread.
            holder = _active[holder].name
        except KeyError:
            pass
        return "<%s owner=%r count=%d>" % (
            self.__class__.__name__, holder, self.__count)

    def acquire(self, blocking=1):
        """Acquire the lock, blocking or non-blocking.

        If the calling thread already owns the lock, the recursion level is
        bumped and the call returns at once. Otherwise the call waits for
        the lock to become free (unless *blocking* is false), takes
        ownership and sets the recursion level to one.

        Returns a true value when the lock was obtained. With *blocking*
        false, returns a false value immediately instead of waiting.

        """
        me = _get_ident()
        if self.__owner == me:
            self.__count += 1
            if __debug__:
                self._note("%s.acquire(%s): recursive success", self, blocking)
            return 1
        rc = self.__block.acquire(blocking)
        if not rc:
            if __debug__:
                self._note("%s.acquire(%s): failure", self, blocking)
            return rc
        self.__owner = me
        self.__count = 1
        if __debug__:
            self._note("%s.acquire(%s): initial success", self, blocking)
        return rc

    __enter__ = acquire

    def release(self):
        """Release the lock, decrementing the recursion level.

        When the level reaches zero the lock becomes unowned and the
        underlying lock is released, letting one waiting thread proceed.
        Otherwise the calling thread keeps the lock.

        Raises RuntimeError if the calling thread does not own the lock.
        There is no return value.

        """
        if self.__owner != _get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        remaining = self.__count - 1
        self.__count = remaining
        if remaining:
            if __debug__:
                self._note("%s.release(): non-final release", self)
            return
        self.__owner = None
        self.__block.release()
        if __debug__:
            self._note("%s.release(): final release", self)

    def __exit__(self, t, v, tb):
        self.release()

    # Internal methods used by condition variables

    def _acquire_restore(self, count_owner):
        """Re-take the lock and restore a state saved by _release_save()."""
        saved_count, saved_owner = count_owner
        self.__block.acquire()
        self.__count = saved_count
        self.__owner = saved_owner
        if __debug__:
            self._note("%s._acquire_restore()", self)

    def _release_save(self):
        """Fully release the lock, returning the (count, owner) state."""
        if __debug__:
            self._note("%s._release_save()", self)
        state = (self.__count, self.__owner)
        self.__count = 0
        self.__owner = None
        self.__block.release()
        return state

    def _is_owned(self):
        """Return whether the calling thread currently owns the lock."""
        return self.__owner == _get_ident()
239
240
def Condition(*args, **kwargs):
    """Create and return a new condition variable object.

    A condition variable lets one or more threads wait until another thread
    notifies them.

    A non-None *lock* argument must be a Lock or RLock object and becomes the
    underlying lock. Otherwise a fresh RLock is created for that purpose.

    """
    condition = _Condition(*args, **kwargs)
    return condition
253
class _Condition(_Verbose):
    """Condition variable: lets threads wait until another thread notifies.

    Each waiting thread parks on its own freshly allocated lock, kept in a
    private list of waiters; notify() releases those locks to wake the
    waiters up.
    """

    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self.__waiters = []

    def __enter__(self):
        return self.__lock.__enter__()

    def __exit__(self, *args):
        return self.__lock.__exit__(*args)

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

    def _release_save(self):
        self.__lock.release()           # No state to save

    def _acquire_restore(self, x):
        self.__lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by current_thread.
        # This method is called only if __lock doesn't have _is_owned().
        if self.__lock.acquire(0):
            self.__lock.release()
            return False
        else:
            return True

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.

        The underlying lock is released, the caller blocks until notify() or
        notifyAll() wakes it (or until *timeout* seconds, possibly
        fractional, have passed when *timeout* is not None), and the lock is
        then re-acquired before returning.

        With an RLock as the underlying lock, its internal
        _release_save()/_acquire_restore() interface is used so that the lock
        is really unlocked even when held recursively, and the recursion
        level is restored afterwards.

        Raises RuntimeError if the calling thread has not acquired the lock.
        There is no return value.

        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # Balancing act:  We can't afford a pure busy loop, so we
                # have to sleep; but if we sleep the whole timeout time,
                # we'll be unresponsive.  The scheme here sleeps very
                # little at first, longer as time goes on, but never longer
                # than 20 times per second (or the timeout time remaining).
                endtime = _time() + timeout
                delay = 0.0005 # 500 us -> initial delay of 1 ms
                while True:
                    gotit = waiter.acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    delay = min(delay * 2, remaining, .05)
                    _sleep(delay)
                if __debug__:
                    if gotit:
                        self._note("%s.wait(%s): got it", self, timeout)
                    else:
                        self._note("%s.wait(%s): timed out", self, timeout)
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                # We were not notified (timeout, or the wait was interrupted
                # by an exception).  Drop our waiter so that a later notify()
                # is not spent on a thread that is no longer waiting.
                try:
                    self.__waiters.remove(waiter)
                except ValueError:
                    pass

    def notify(self, n=1):
        """Wake up one or more threads waiting on this condition, if any.

        At most *n* waiting threads are woken; the call does nothing when no
        thread is waiting.

        Raises RuntimeError if the calling thread has not acquired the lock.

        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self.__waiters
        woken = all_waiters[:n]
        if not woken:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        if __debug__:
            # Report how many waiters are really woken, which may be fewer
            # than the n that was asked for.
            count = len(woken)
            self._note("%s.notify(): notifying %d waiter%s", self, count,
                       "" if count == 1 else "s")
        for waiter in woken:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass

    def notifyAll(self):
        """Wake up all threads waiting on this condition.

        Raises RuntimeError if the calling thread has not acquired the lock.

        """
        self.notify(len(self.__waiters))

    notify_all = notifyAll
409
410
def Semaphore(*args, **kwargs):
    """Create and return a new semaphore.

    A semaphore keeps a counter equal to its initial value plus the number
    of release() calls minus the number of acquire() calls. acquire() blocks
    whenever proceeding would make the counter negative. The initial value
    defaults to 1.

    """
    semaphore = _Semaphore(*args, **kwargs)
    return semaphore
421
class _Semaphore(_Verbose):
    """Counting semaphore.

    The counter equals the initial value plus the number of release() calls
    minus the number of acquire() calls. acquire() blocks whenever
    proceeding would make the counter negative. The initial value defaults
    to 1.

    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1, verbose=None):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Acquire the semaphore, decrementing the internal counter by one.

        If the counter is above zero it is decremented and the call returns
        True at once. If it is zero, the call waits until another thread
        calls release(); each release() wakes one blocked acquire(), in no
        guaranteed order.

        With *blocking* false the call never waits: it returns False when it
        would have had to block, and otherwise behaves as above.

        """
        with self.__cond:
            while self.__value == 0:
                if not blocking:
                    return False
                if __debug__:
                    self._note("%s.acquire(%s): blocked waiting, value=%s",
                               self, blocking, self.__value)
                self.__cond.wait()
            self.__value -= 1
            if __debug__:
                self._note("%s.acquire: success, value=%s",
                           self, self.__value)
            return True

    __enter__ = acquire

    def release(self):
        """Release the semaphore, incrementing the internal counter by one.

        If another thread is blocked waiting for the counter to rise above
        zero, it is woken up.

        """
        with self.__cond:
            self.__value += 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self.__value)
            self.__cond.notify()

    def __exit__(self, t, v, tb):
        self.release()
494
495
def BoundedSemaphore(*args, **kwargs):
    """Create and return a new bounded semaphore.

    A bounded semaphore behaves like a regular semaphore, but additionally
    refuses to let its counter exceed the initial value: releasing it too
    many times raises ValueError, since that usually signals a bug. Such
    semaphores typically guard resources with limited capacity.

    The counter equals the initial value plus the number of release() calls
    minus the number of acquire() calls, and acquire() blocks whenever
    proceeding would make it negative. The initial value defaults to 1.

    """
    semaphore = _BoundedSemaphore(*args, **kwargs)
    return semaphore
513
class _BoundedSemaphore(_Semaphore):
    """A bounded semaphore checks to make sure its current value doesn't exceed
    its initial value. If it does, ValueError is raised. In most situations
    semaphores are used to guard resources with limited capacity.
    """

    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        # Upper bound that the counter may never exceed.
        self._initial_value = value

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.

        When the counter is zero on entry and another thread is waiting for it
        to become larger than zero again, wake up that thread.

        If the number of releases exceeds the number of acquires,
        raise a ValueError.

        """
        # The bound check and the increment must happen under the same lock.
        # Checking first and only then delegating to _Semaphore.release()
        # would let two concurrent releases both pass the check and push the
        # counter above its initial value.
        with self._Semaphore__cond:
            if self._Semaphore__value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._Semaphore__value += 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self._Semaphore__value)
            self._Semaphore__cond.notify()
537
538
def Event(*args, **kwargs):
    """Create and return a new event object.

    An event holds a flag that set() turns true and clear() turns false;
    wait() blocks until the flag is true.

    """
    event = _Event(*args, **kwargs)
    return event
548
class _Event(_Verbose):
    """An event manages a flag that can be set to true with the set() method
    and reset to false with the clear() method. The wait() method blocks
    until the flag is true.

    """

    # After Tim Peters' event class (without is_posted())

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = False

    def _reset_internal_locks(self):
        # private!  called by Thread._reset_internal_locks by _after_fork()
        # Rebuild the condition around a plain Lock, exactly as __init__
        # does; calling __init__() with no argument would silently swap the
        # underlying lock for an RLock.
        self.__cond.__init__(Lock())

    def isSet(self):
        'Return true if and only if the internal flag is true.'
        return self.__flag

    is_set = isSet

    def set(self):
        """Set the internal flag to true.

        All threads waiting for the flag to become true are awakened. Threads
        that call wait() once the flag is true will not block at all.

        """
        with self.__cond:
            self.__flag = True
            self.__cond.notify_all()

    def clear(self):
        """Reset the internal flag to false.

        Subsequently, threads calling wait() will block until set() is called to
        set the internal flag to true again.

        """
        with self.__cond:
            self.__flag = False

    def wait(self, timeout=None):
        """Block until the internal flag is true.

        If the internal flag is true on entry, return immediately. Otherwise,
        block until another thread calls set() to set the flag to true, or until
        the optional timeout occurs.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).

        This method returns the internal flag on exit, so it will always return
        True except if a timeout is given and the operation times out.

        """
        with self.__cond:
            if not self.__flag:
                self.__cond.wait(timeout)
            return self.__flag
622
623# Helper to generate new thread names
_counter = 0


def _newname(template="Thread-%d"):
    """Return *template* filled in with the next value of a global counter."""
    global _counter
    _counter += 1
    return template % _counter
629
630# Active thread administration
# One lock guards both registries below.
_active_limbo_lock = _allocate_lock()
# Maps thread id to Thread object.
_active = {}
# Threads that have been start()ed but have not yet begun running.
_limbo = {}
634
635
636# Main class for threads
637
638class Thread(_Verbose):
639 """A class that represents a thread of control.
640
641 This class can be safely subclassed in a limited fashion.
642
643 """
644 __initialized = False
645 # Need to store a reference to sys.exc_info for printing
646 # out exceptions when a thread tries to use a global var. during interp.
647 # shutdown and thus raises an exception about trying to perform some
648 # operation on/with a NoneType
649 __exc_info = _sys.exc_info
650 # Keep sys.exc_clear too to clear the exception just before
651 # allowing .join() to return.
652 __exc_clear = _sys.exc_clear
653
def __init__(self, group=None, target=None, name=None,
             args=(), kwargs=None, verbose=None):
    """Set up a thread object; always call this with keyword arguments.

    *group* must be None; it is reserved for a future ThreadGroup class.

    *target* is the callable that run() invokes. The default, None, means
    nothing is called.

    *name* is the thread name. By default a unique name of the form
    "Thread-N" is generated, N being a small decimal number.

    *args* is the argument tuple passed to the target. Defaults to ().

    *kwargs* is a dictionary of keyword arguments passed to the target.
    Defaults to {}.

    A subclass that overrides the constructor must invoke the base class
    constructor (Thread.__init__()) before doing anything else to the
    thread.

    """
    assert group is None, "group argument must be None for now"
    _Verbose.__init__(self, verbose)
    self.__target = target
    self.__name = str(name or _newname())
    self.__args = args
    # A fresh dict per thread; a shared default would leak between threads.
    self.__kwargs = {} if kwargs is None else kwargs
    self.__daemonic = self._set_daemon()
    self.__ident = None
    self.__started = Event()
    self.__stopped = False
    self.__block = Condition(Lock())
    self.__initialized = True
    # sys.stderr is not stored in the class like
    # sys.exc_info since it can be changed between instances
    self.__stderr = _sys.stderr
694
def _reset_internal_locks(self):
    # private!  Called by _after_fork() to reset our internal locks as
    # they may be in an invalid state leading to a deadlock or crash.
    if hasattr(self, '_Thread__block'):  # DummyThread deletes self.__block
        # Rebuild the condition around a plain Lock, matching what
        # __init__ creates; a bare __init__() would swap in an RLock.
        self.__block.__init__(Lock())
    self.__started._reset_internal_locks()
701
@property
def _block(self):
    """Expose the private condition guarding the stopped flag.

    Only here because a unittest uses it.
    """
    return self.__block
706
def _set_daemon(self):
    """Return the initial daemon flag, taken from the creating thread."""
    # Overridden in _MainThread and _DummyThread
    creator = current_thread()
    return creator.daemon
710
def __repr__(self):
    """Return ``<ClassName(name, status)>`` describing the thread state."""
    assert self.__initialized, "Thread.__init__() was not called"
    # "stopped" wins over "started", which wins over "initial".
    if self.__stopped:
        state = "stopped"
    elif self.__started.is_set():
        state = "started"
    else:
        state = "initial"
    parts = [state]
    if self.__daemonic:
        parts.append("daemon")
    if self.__ident is not None:
        parts.append("%s" % self.__ident)
    status = " ".join(parts)
    return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
723
def start(self):
    """Start the thread's activity.

    Arranges for the object's run() method to be invoked in a separate
    thread of control, and returns once that thread has signalled that it
    has started.

    Raises RuntimeError if the constructor was never run, or if start() is
    called more than once on the same thread object.

    """
    if not self.__initialized:
        raise RuntimeError("thread.__init__() not called")
    if self.__started.is_set():
        raise RuntimeError("threads can only be started once")
    if __debug__:
        self._note("%s.start(): starting thread", self)
    # Register the thread as "in limbo" until its bootstrap code moves it
    # into the table of active threads.
    with _active_limbo_lock:
        _limbo[self] = self
    try:
        _start_new_thread(self.__bootstrap, ())
    except Exception:
        # The OS thread never came to life: undo the registration.
        with _active_limbo_lock:
            del _limbo[self]
        raise
    self.__started.wait()
749
def run(self):
    """Perform the thread's activity.

    Subclasses may override this method. The standard implementation calls
    the target passed to the constructor, if any, with the positional and
    keyword arguments taken from the constructor's args and kwargs.

    """
    try:
        if self.__target:
            self.__target(*self.__args, **self.__kwargs)
    finally:
        # Avoid a refcycle if the thread is running a function with
        # an argument that has a member that points to the thread.
        del self.__target
        del self.__args
        del self.__kwargs
766
def __bootstrap(self):
    """Run __bootstrap_inner(), hiding errors caused by interpreter teardown."""
    # A daemon thread may wake up at an unfortunate moment during
    # interpreter cleanup, find the world around it destroyed, and raise
    # some random exception *** while trying to report the exception in
    # __bootstrap_inner() ***.  Such exceptions help nobody and confuse
    # users, so they are swallowed -- but only when the world really does
    # look destroyed (the module global _sys is None), so that failures in
    # __bootstrap_inner() during normal operation are still reported, and
    # only for daemonic threads: a non-daemonic thread ending up here means
    # something else is wrong.
    try:
        self.__bootstrap_inner()
    except:
        if not self.__daemonic or _sys is not None:
            raise
        return
786
def _set_ident(self):
    """Record the identifier of the calling thread as this thread's ident."""
    ident = _get_ident()
    self.__ident = ident
789
def __bootstrap_inner(self):
    """Register the thread, call run(), report failures and unregister."""
    try:
        self._set_ident()
        self.__started.set()
        # Move ourselves from the limbo table to the active table.
        with _active_limbo_lock:
            _active[self.__ident] = self
            del _limbo[self]
        if __debug__:
            self._note("%s.__bootstrap(): thread started", self)

        if _trace_hook:
            self._note("%s.__bootstrap(): registering trace hook", self)
            _sys.settrace(_trace_hook)
        if _profile_hook:
            self._note("%s.__bootstrap(): registering profile hook", self)
            _sys.setprofile(_profile_hook)

        try:
            self.run()
        except SystemExit:
            if __debug__:
                self._note("%s.__bootstrap(): raised SystemExit", self)
        except:
            if __debug__:
                self._note("%s.__bootstrap(): unhandled exception", self)
            # If sys.stderr is no more (most likely from interpreter
            # shutdown) use self.__stderr.  Otherwise still use sys (as in
            # _sys) in case sys.stderr was redefined since the creation of
            # self.
            if _sys:
                report = "Exception in thread %s:\n%s\n" % (
                    self.name, _format_exc())
                _sys.stderr.write(report)
            else:
                # Do the best job possible w/o a huge amt. of code to
                # approximate a traceback (code ideas from
                # Lib/traceback.py)
                exc_type, exc_value, exc_tb = self.__exc_info()
                try:
                    print >>self.__stderr, (
                        "Exception in thread " + self.name +
                        " (most likely raised during interpreter shutdown):")
                    print >>self.__stderr, (
                        "Traceback (most recent call last):")
                    while exc_tb:
                        frame_code = exc_tb.tb_frame.f_code
                        print >>self.__stderr, (
                            ' File "%s", line %s, in %s' %
                            (frame_code.co_filename,
                             exc_tb.tb_lineno,
                             frame_code.co_name))
                        exc_tb = exc_tb.tb_next
                    print >>self.__stderr, ("%s: %s" % (exc_type, exc_value))
                # Make sure that exc_tb gets deleted since it is a memory
                # hog; deleting everything else is just for thoroughness
                finally:
                    del exc_type, exc_value, exc_tb
        else:
            if __debug__:
                self._note("%s.__bootstrap(): normal return", self)
        finally:
            # Prevent a race in
            # test_threading.test_no_refcycle_through_target when
            # the exception keeps the target alive past when we
            # assert that it's dead.
            self.__exc_clear()
    finally:
        with _active_limbo_lock:
            self.__stop()
            try:
                # We don't call self.__delete() because it also
                # grabs _active_limbo_lock.
                del _active[_get_ident()]
            except:
                pass
863
def __stop(self):
    """Mark the thread as stopped and wake every thread blocked in join()."""
    # DummyThreads delete self.__block, but they have no waiters to
    # notify anyway (join() is forbidden on them).
    if not hasattr(self, '_Thread__block'):
        return
    # Use the condition as a context manager so that its lock is released
    # even if notify_all() raises; a leaked lock would hang later join()s.
    with self.__block:
        self.__stopped = True
        self.__block.notify_all()
873
def __delete(self):
    "Remove current thread from the dict of currently running threads."

    # Notes about running with dummy_thread:
    #
    # No exception may escape when dummy_thread is in use (i.e. when this
    # module is serving as dummy_threading).  dummy_thread.get_ident()
    # always returns -1 because there is only one thread, so len(_active)
    # is always <= 1 here and every Thread instance created overwrites
    # whichever thread is currently registered in _active.
    #
    # 'threading' always creates a _MainThread instance, which is
    # overwritten the instant another Thread is created, since both get
    # the key -1.  When that _MainThread later tries to clean itself up
    # through this method at exit, it gets a KeyError if another Thread
    # instance was created.
    #
    # So a KeyError here is a red herring under dummy_threading, and only
    # then; in every other case it is propagated.

    try:
        with _active_limbo_lock:
            del _active[_get_ident()]
            # There must not be any python code between the previous line
            # and after the lock is released.  Otherwise a tracing function
            # could try to acquire the lock again in the same thread, (in
            # current_thread()), and would block.
    except KeyError:
        if 'dummy_threading' in _sys.modules:
            return
        raise
908
def join(self, timeout=None):
    """Wait until the thread terminates.

    The calling thread blocks until the thread whose join() is called has
    terminated, normally or through an unhandled exception, or until the
    optional timeout expires.

    *timeout*, when given and not None, is a floating point number of
    seconds (or fractions thereof). join() always returns None, so call
    isAlive() afterwards to find out whether a timeout happened: if the
    thread is still alive, the join() call timed out.

    Without a timeout the call blocks until the thread terminates.

    A thread can be join()ed many times.

    Raises RuntimeError when joining the current thread (that would
    deadlock) and when joining a thread that has not been started.

    """
    if not self.__initialized:
        raise RuntimeError("Thread.__init__() not called")
    if not self.__started.is_set():
        raise RuntimeError("cannot join thread before it is started")
    if self is current_thread():
        raise RuntimeError("cannot join current thread")

    if __debug__:
        if not self.__stopped:
            self._note("%s.join(): waiting until thread stops", self)
    with self.__block:
        if timeout is None:
            while not self.__stopped:
                self.__block.wait()
            if __debug__:
                self._note("%s.join(): thread stopped", self)
            return
        deadline = _time() + timeout
        while not self.__stopped:
            remaining = deadline - _time()
            if remaining <= 0:
                if __debug__:
                    self._note("%s.join(): timed out", self)
                return
            self.__block.wait(remaining)
        if __debug__:
            self._note("%s.join(): thread stopped", self)
964
@property
def name(self):
    """The thread's name: a label used for identification only.

    The name carries no meaning of its own and several threads may share
    the same one.  Its initial value is set by the constructor.

    """
    assert self.__initialized, "Thread.__init__() not called"
    label = self.__name
    return label

@name.setter
def name(self, name):
    assert self.__initialized, "Thread.__init__() not called"
    # Whatever is assigned is stored as its str() form.
    label = str(name)
    self.__name = label
980
@property
def ident(self):
    """The thread's identifier, or None if the thread was never started.

    The identifier is a nonzero integer (see thread.get_ident()).  It can
    be reused for a new thread once this one has exited, and it stays
    readable on this object even after the thread has exited.

    """
    assert self.__initialized, "Thread.__init__() not called"
    thread_id = self.__ident
    return thread_id
992
def isAlive(self):
    """Tell whether the thread is currently alive.

    The answer is True from just before run() starts until just after
    run() terminates.  The module function enumerate() returns a list of
    all alive threads.

    """
    assert self.__initialized, "Thread.__init__() not called"
    if not self.__started.is_set():
        # Never started, so it cannot be alive.
        return False
    return not self.__stopped

is_alive = isAlive
1005
@property
def daemon(self):
    """A boolean value indicating whether this thread is a daemon thread (True) or not (False).

    This must be set before start() is called, otherwise RuntimeError is
    raised.  Its initial value is inherited from the creating thread; the
    main thread is not a daemon thread and therefore all threads created in
    the main thread default to daemon = False.

    The entire Python program exits when no alive non-daemon threads are
    left.

    """
    assert self.__initialized, "Thread.__init__() not called"
    return self.__daemonic

@daemon.setter
def daemon(self, daemonic):
    # The flag may only change while the thread is still unstarted.
    # Raises RuntimeError if Thread.__init__() was never called or if the
    # thread has already been started.
    if not self.__initialized:
        raise RuntimeError("Thread.__init__() not called")
    if self.__started.is_set():
        raise RuntimeError("cannot set daemon status of active thread")
    self.__daemonic = daemonic
1029
def isDaemon(self):
    """Return the daemon flag; method spelling of the daemon property."""
    daemonic = self.daemon
    return daemonic
1032
def setDaemon(self, daemonic):
    """Set the daemon flag; method spelling of assigning to daemon."""
    self.daemon = daemonic
1035
def getName(self):
    """Return the thread's name; method spelling of the name property."""
    label = self.name
    return label
1038
def setName(self, name):
    """Set the thread's name; method spelling of assigning to name."""
    self.name = name
1041
1042# The timer class was contributed by Itamar Shtull-Trauring
1043
def Timer(*args, **kwargs):
    """Factory function that builds and returns a timer object.

    A timer calls a function once a given number of seconds has passed:

            t = Timer(30.0, f, args=[], kwargs={})
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    All arguments are forwarded unchanged to the _Timer constructor.

    """
    timer = _Timer(*args, **kwargs)
    return timer
1055
class _Timer(Thread):
    """Call a function after a specified number of seconds:

            t = Timer(30.0, f, args=[], kwargs={})
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """Set up a timer that calls function(*args, **kwargs).

        interval -- number of seconds run() waits before making the call
        function -- the callable to invoke
        args     -- positional arguments for the call (default: none)
        kwargs   -- keyword arguments for the call (default: none)

        """
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        # Give every timer its own containers.  Literal [] / {} defaults
        # would be one shared object for all timers created without
        # explicit arguments, so mutating t.args on one would leak into
        # the others.
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        """Wait for the interval, then call the function unless cancelled."""
        # wait() returns early if cancel() sets the event in the meantime.
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()
1082
1083# Special thread class to represent the main thread
1084# This is garbage collected through an exit handler
1085
class _MainThread(Thread):
    """Special Thread subclass that represents the main thread."""

    def __init__(self):
        Thread.__init__(self, name="MainThread")
        # The main thread is not started through start(), so flag it as
        # started, record its identifier and register it as active here.
        self._Thread__started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        # The main thread is never a daemon thread.
        return False

    def _exitfunc(self):
        # Exit handler: stop this thread object, wait for every remaining
        # alive non-daemon thread, then delete this thread object.
        self._Thread__stop()
        pending = _pickSomeNonDaemonThread()
        if pending:
            if __debug__:
                self._note("%s: waiting for other threads", self)
        while pending:
            pending.join()
            pending = _pickSomeNonDaemonThread()
        if __debug__:
            self._note("%s: exiting", self)
        self._Thread__delete()
1110
def _pickSomeNonDaemonThread():
    """Return one alive non-daemon thread from enumerate(), or None."""
    candidates = (t for t in enumerate() if not t.daemon and t.is_alive())
    return next(candidates, None)
1116
1117
# Dummy thread class to represent threads not started here.
# These aren't garbage collected when they die, nor can they be waited for.
# If they invoke anything in threading.py that calls current_thread(), they
# leave an entry in the _active dict forever after.
# Their purpose is to return *something* from current_thread().
# They are marked as daemon threads so we won't wait for them
# when we exit (this conforms to the previous semantics).
1125
class _DummyThread(Thread):
    """Stand-in Thread object for a thread that was not started here.

    Instances exist only so that current_thread() has something to
    return.  They are flagged as daemon threads and cannot be joined.

    """

    def __init__(self):
        Thread.__init__(self, name=_newname("Dummy-%d"))

        # Thread.__block consumes an OS-level locking primitive, which
        # can never be used by a _DummyThread.  Since a _DummyThread
        # instance is immortal, that's bad, so release this resource.
        del self._Thread__block

        # The underlying thread is already running: flag it as started,
        # record its identifier and register it as active.
        self._Thread__started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        # Dummy threads are always daemonic.
        return True

    def join(self, timeout=None):
        """Always fail: a dummy thread cannot be joined.

        Raises AssertionError.  The exception is raised explicitly rather
        than through an assert statement, because asserts are removed
        under -O and join() would then silently return None.

        """
        raise AssertionError("cannot join a dummy thread")
1146
1147
1148# Global API functions
1149
def currentThread():
    """Return the Thread object for the caller's thread of control.

    A thread of control that was not created through the threading module
    has no registered Thread object; for it, a dummy thread object with
    limited functionality is returned instead.

    """
    ident = _get_ident()
    try:
        thread_obj = _active[ident]
    except KeyError:
        # Unknown thread of control: hand back a dummy stand-in.
        thread_obj = _DummyThread()
    return thread_obj

current_thread = currentThread
1164
def activeCount():
    """Return how many Thread objects are currently alive.

    The result equals the length of the list that enumerate() returns.

    """
    with _active_limbo_lock:
        n_active = len(_active)
        n_limbo = len(_limbo)
    return n_active + n_limbo

active_count = activeCount
1176
def _enumerate():
    # Lock-free twin of enumerate(); for internal use only.
    threads = _active.values()
    threads = threads + _limbo.values()
    return threads
1180
def enumerate():
    """Return a list of every Thread object that is currently alive.

    Daemonic threads, dummy thread objects created by current_thread(),
    and the main thread are all included.  Terminated threads and threads
    that have not yet been started are not.

    """
    with _active_limbo_lock:
        alive = _active.values() + _limbo.values()
    return alive
1191
1192from thread import stack_size
1193
1194# Create the main thread object,
1195# and make it available for the interpreter
1196# (Py_Main) as threading._shutdown.
1197
1198_shutdown = _MainThread()._exitfunc
1199
1200# get thread-local implementation, either from the thread
1201# module, or from the python fallback
1202
1203try:
1204 from thread import _local as local
1205except ImportError:
1206 from _threading_local import local
1207
1208
def _after_fork():
    """Discard threading state that must not survive a fork.

    Called by Python/ceval.c:PyEval_ReInitThreads, which in turn is called
    from PyOS_AfterFork.

    """
    # The fork may have happened while another (non-forked) thread held
    # _active_limbo_lock, so start over with a fresh lock.
    # http://bugs.python.org/issue874900
    global _active_limbo_lock
    _active_limbo_lock = _allocate_lock()

    # fork() only copied the current thread; every other entry is dropped.
    survivor = current_thread()
    kept = {}
    with _active_limbo_lock:
        for t in _active.itervalues():
            # Any lock/condition variable may be currently locked or in an
            # invalid state, so reinitialize them where the thread object
            # supports it.
            if hasattr(t, '_reset_internal_locks'):
                t._reset_internal_locks()
            if t is not survivor:
                # All the others are already stopped.
                t._Thread__stop()
                continue
            # There is only one active thread.  Its ident may have
            # changed, so store the new value and re-key the entry.
            new_ident = _get_ident()
            t._Thread__ident = new_ident
            kept[new_ident] = t

        _limbo.clear()
        _active.clear()
        _active.update(kept)
        assert len(_active) == 1
1242
1243
1244# Self-test code
1245
def _test():
    """Self-test: feed items from three producers to one consumer.

    Builds a bounded queue of QL slots, starts NP producer threads that
    each put NI items and one consumer thread that prints all NP*NI of
    them, then joins every thread.

    """

    class BoundedQueue(_Verbose):
        """FIFO holding at most *limit* items; put() and get() block."""

        def __init__(self, limit):
            _Verbose.__init__(self)
            self.mon = RLock()
            # Both conditions share the one monitor lock.
            self.rc = Condition(self.mon)   # notified after an item is added
            self.wc = Condition(self.mon)   # notified after an item is removed
            self.limit = limit
            self.queue = _deque()

        def put(self, item):
            # "with" releases the monitor even if wait() or _note() raises;
            # a bare acquire()/release() pair would leave it locked.
            with self.mon:
                while len(self.queue) >= self.limit:
                    self._note("put(%s): queue full", item)
                    self.wc.wait()
                self.queue.append(item)
                self._note("put(%s): appended, length now %d",
                           item, len(self.queue))
                self.rc.notify()

        def get(self):
            with self.mon:
                while not self.queue:
                    self._note("get(): queue empty")
                    self.rc.wait()
                item = self.queue.popleft()
                self._note("get(): got %s, %d left", item, len(self.queue))
                self.wc.notify()
            return item

    class ProducerThread(Thread):
        """Put *quota* items named "<thread name>.<n>" on the queue."""

        def __init__(self, queue, quota):
            Thread.__init__(self, name="Producer")
            self.queue = queue
            self.quota = quota

        def run(self):
            from random import random
            for counter in range(1, self.quota + 1):
                self.queue.put("%s.%d" % (self.name, counter))
                _sleep(random() * 0.00001)

    class ConsumerThread(Thread):
        """Take *count* items off the queue, printing each one."""

        def __init__(self, queue, count):
            Thread.__init__(self, name="Consumer")
            self.queue = queue
            self.count = count

        def run(self):
            while self.count > 0:
                item = self.queue.get()
                # Parenthesised single argument: prints the same text
                # whether print is a statement or a function.
                print(item)
                self.count = self.count - 1

    NP = 3      # number of producer threads
    QL = 4      # queue limit
    NI = 5      # items per producer

    Q = BoundedQueue(QL)
    P = []
    for i in range(NP):
        t = ProducerThread(Q, NI)
        t.name = ("Producer-%d" % (i+1))
        P.append(t)
    C = ConsumerThread(Q, NI*NP)
    for t in P:
        t.start()
        _sleep(0.000001)
    C.start()
    for t in P:
        t.join()
    C.join()
1327
1328if __name__ == '__main__':
1329 _test()