blob: ded573a4c4f7d5a4ba857e2b6850fd289b4ae5ce [file] [log] [blame]
Guido van Rossume7b146f2000-02-04 15:28:42 +00001"""Proposed new threading module, emulating a subset of Java's threading model."""
Guido van Rossum7f5013a1998-04-09 22:01:42 +00002
3import sys
4import time
5import thread
6import traceback
7import StringIO
8
9# Rename some stuff so "from threading import *" is safe
10
11_sys = sys
12del sys
13
14_time = time.time
15_sleep = time.sleep
16del time
17
18_start_new_thread = thread.start_new_thread
19_allocate_lock = thread.allocate_lock
20_get_ident = thread.get_ident
21del thread
22
23_print_exc = traceback.print_exc
24del traceback
25
26_StringIO = StringIO.StringIO
27del StringIO
28
29
30# Debug support (adapted from ihooks.py)
31
32_VERBOSE = 0
33
34if __debug__:
35
36 class _Verbose:
37
38 def __init__(self, verbose=None):
39 if verbose is None:
40 verbose = _VERBOSE
41 self.__verbose = verbose
42
43 def _note(self, format, *args):
44 if self.__verbose:
45 format = format % args
46 format = "%s: %s\n" % (
47 currentThread().getName(), format)
48 _sys.stderr.write(format)
49
50else:
51 # Disable this when using "python -O"
52 class _Verbose:
53 def __init__(self, verbose=None):
54 pass
55 def _note(self, *args):
56 pass
57
58
59# Synchronization classes
60
61Lock = _allocate_lock
62
63def RLock(*args, **kwargs):
64 return apply(_RLock, args, kwargs)
65
66class _RLock(_Verbose):
67
68 def __init__(self, verbose=None):
69 _Verbose.__init__(self, verbose)
70 self.__block = _allocate_lock()
71 self.__owner = None
72 self.__count = 0
73
74 def __repr__(self):
75 return "<%s(%s, %d)>" % (
76 self.__class__.__name__,
77 self.__owner and self.__owner.getName(),
78 self.__count)
79
80 def acquire(self, blocking=1):
81 me = currentThread()
82 if self.__owner is me:
83 self.__count = self.__count + 1
84 if __debug__:
85 self._note("%s.acquire(%s): recursive success", self, blocking)
86 return 1
87 rc = self.__block.acquire(blocking)
88 if rc:
89 self.__owner = me
90 self.__count = 1
91 if __debug__:
92 self._note("%s.acquire(%s): initial succes", self, blocking)
93 else:
94 if __debug__:
95 self._note("%s.acquire(%s): failure", self, blocking)
96 return rc
97
98 def release(self):
99 me = currentThread()
100 assert self.__owner is me, "release() of un-acquire()d lock"
101 self.__count = count = self.__count - 1
102 if not count:
103 self.__owner = None
104 self.__block.release()
105 if __debug__:
106 self._note("%s.release(): final release", self)
107 else:
108 if __debug__:
109 self._note("%s.release(): non-final release", self)
110
111 # Internal methods used by condition variables
112
113 def _acquire_restore(self, (count, owner)):
114 self.__block.acquire()
115 self.__count = count
116 self.__owner = owner
117 if __debug__:
118 self._note("%s._acquire_restore()", self)
119
120 def _release_save(self):
121 if __debug__:
122 self._note("%s._release_save()", self)
123 count = self.__count
124 self.__count = 0
125 owner = self.__owner
126 self.__owner = None
127 self.__block.release()
128 return (count, owner)
129
130 def _is_owned(self):
131 return self.__owner is currentThread()
132
133
134def Condition(*args, **kwargs):
135 return apply(_Condition, args, kwargs)
136
137class _Condition(_Verbose):
138
139 def __init__(self, lock=None, verbose=None):
140 _Verbose.__init__(self, verbose)
141 if lock is None:
142 lock = RLock()
143 self.__lock = lock
144 # Export the lock's acquire() and release() methods
145 self.acquire = lock.acquire
146 self.release = lock.release
147 # If the lock defines _release_save() and/or _acquire_restore(),
148 # these override the default implementations (which just call
149 # release() and acquire() on the lock). Ditto for _is_owned().
150 try:
151 self._release_save = lock._release_save
152 except AttributeError:
153 pass
154 try:
155 self._acquire_restore = lock._acquire_restore
156 except AttributeError:
157 pass
158 try:
159 self._is_owned = lock._is_owned
160 except AttributeError:
161 pass
162 self.__waiters = []
163
164 def __repr__(self):
165 return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
166
167 def _release_save(self):
168 self.__lock.release() # No state to save
169
170 def _acquire_restore(self, x):
171 self.__lock.acquire() # Ignore saved state
172
173 def _is_owned(self):
174 if self.__lock.acquire(0):
175 self.__lock.release()
176 return 0
177 else:
178 return 1
179
180 def wait(self, timeout=None):
181 me = currentThread()
182 assert self._is_owned(), "wait() of un-acquire()d lock"
183 waiter = _allocate_lock()
184 waiter.acquire()
185 self.__waiters.append(waiter)
186 saved_state = self._release_save()
187 if timeout is None:
188 waiter.acquire()
189 if __debug__:
190 self._note("%s.wait(): got it", self)
191 else:
192 endtime = _time() + timeout
193 delay = 0.000001 # 1 usec
194 while 1:
195 gotit = waiter.acquire(0)
196 if gotit or _time() >= endtime:
197 break
198 _sleep(delay)
199 if delay < 1.0:
200 delay = delay * 2.0
201 if not gotit:
202 if __debug__:
203 self._note("%s.wait(%s): timed out", self, timeout)
204 try:
205 self.__waiters.remove(waiter)
206 except ValueError:
207 pass
208 else:
209 if __debug__:
210 self._note("%s.wait(%s): got it", self, timeout)
211 self._acquire_restore(saved_state)
212
213 def notify(self, n=1):
214 me = currentThread()
215 assert self._is_owned(), "notify() of un-acquire()d lock"
216 __waiters = self.__waiters
217 waiters = __waiters[:n]
218 if not waiters:
219 if __debug__:
220 self._note("%s.notify(): no waiters", self)
221 return
222 self._note("%s.notify(): notifying %d waiter%s", self, n,
223 n!=1 and "s" or "")
224 for waiter in waiters:
225 waiter.release()
226 try:
227 __waiters.remove(waiter)
228 except ValueError:
229 pass
230
231 def notifyAll(self):
232 self.notify(len(self.__waiters))
233
234
235def Semaphore(*args, **kwargs):
236 return apply(_Semaphore, args, kwargs)
237
238class _Semaphore(_Verbose):
239
Andrew M. Kuchling39d3bfc2000-02-29 00:10:24 +0000240 # After Tim Peters' semaphore class, but not quite the same (no maximum)
Guido van Rossum7f5013a1998-04-09 22:01:42 +0000241
242 def __init__(self, value=1, verbose=None):
243 assert value >= 0, "Semaphore initial value must be >= 0"
244 _Verbose.__init__(self, verbose)
245 self.__cond = Condition(Lock())
246 self.__value = value
247
248 def acquire(self, blocking=1):
249 rc = 0
250 self.__cond.acquire()
251 while self.__value == 0:
252 if not blocking:
253 break
254 self.__cond.wait()
255 else:
256 self.__value = self.__value - 1
257 rc = 1
258 self.__cond.release()
259 return rc
260
261 def release(self):
262 self.__cond.acquire()
263 self.__value = self.__value + 1
264 self.__cond.notify()
265 self.__cond.release()
266
267
268def Event(*args, **kwargs):
269 return apply(_Event, args, kwargs)
270
271class _Event(_Verbose):
272
273 # After Tim Peters' event class (without is_posted())
274
275 def __init__(self, verbose=None):
276 _Verbose.__init__(self, verbose)
277 self.__cond = Condition(Lock())
278 self.__flag = 0
279
280 def isSet(self):
281 return self.__flag
282
283 def set(self):
284 self.__cond.acquire()
285 self.__flag = 1
286 self.__cond.notifyAll()
287 self.__cond.release()
288
289 def clear(self):
290 self.__cond.acquire()
291 self.__flag = 0
292 self.__cond.release()
293
294 def wait(self, timeout=None):
295 self.__cond.acquire()
296 if not self.__flag:
297 self.__cond.wait(timeout)
298 self.__cond.release()
299
300
301# Helper to generate new thread names
302_counter = 0
303def _newname(template="Thread-%d"):
304 global _counter
305 _counter = _counter + 1
306 return template % _counter
307
308# Active thread administration
309_active_limbo_lock = _allocate_lock()
310_active = {}
311_limbo = {}
312
313
314# Main class for threads
315
316class Thread(_Verbose):
317
318 __initialized = 0
319
320 def __init__(self, group=None, target=None, name=None,
321 args=(), kwargs={}, verbose=None):
Guido van Rossum5a43e1a1998-06-09 19:04:26 +0000322 assert group is None, "group argument must be None for now"
Guido van Rossum7f5013a1998-04-09 22:01:42 +0000323 _Verbose.__init__(self, verbose)
324 self.__target = target
325 self.__name = str(name or _newname())
326 self.__args = args
327 self.__kwargs = kwargs
328 self.__daemonic = self._set_daemon()
329 self.__started = 0
330 self.__stopped = 0
331 self.__block = Condition(Lock())
332 self.__initialized = 1
333
334 def _set_daemon(self):
335 # Overridden in _MainThread and _DummyThread
336 return currentThread().isDaemon()
337
338 def __repr__(self):
339 assert self.__initialized, "Thread.__init__() was not called"
340 status = "initial"
341 if self.__started:
342 status = "started"
343 if self.__stopped:
344 status = "stopped"
345 if self.__daemonic:
346 status = status + " daemon"
347 return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
348
349 def start(self):
350 assert self.__initialized, "Thread.__init__() not called"
351 assert not self.__started, "thread already started"
352 if __debug__:
353 self._note("%s.start(): starting thread", self)
354 _active_limbo_lock.acquire()
355 _limbo[self] = self
356 _active_limbo_lock.release()
357 _start_new_thread(self.__bootstrap, ())
358 self.__started = 1
359 _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack)
360
361 def run(self):
362 if self.__target:
363 apply(self.__target, self.__args, self.__kwargs)
364
365 def __bootstrap(self):
366 try:
367 self.__started = 1
368 _active_limbo_lock.acquire()
369 _active[_get_ident()] = self
370 del _limbo[self]
371 _active_limbo_lock.release()
372 if __debug__:
373 self._note("%s.__bootstrap(): thread started", self)
374 try:
375 self.run()
376 except SystemExit:
377 if __debug__:
378 self._note("%s.__bootstrap(): raised SystemExit", self)
379 except:
380 if __debug__:
381 self._note("%s.__bootstrap(): unhandled exception", self)
382 s = _StringIO()
383 _print_exc(file=s)
384 _sys.stderr.write("Exception in thread %s:\n%s\n" %
385 (self.getName(), s.getvalue()))
386 else:
387 if __debug__:
388 self._note("%s.__bootstrap(): normal return", self)
389 finally:
390 self.__stop()
391 self.__delete()
392
393 def __stop(self):
394 self.__block.acquire()
395 self.__stopped = 1
396 self.__block.notifyAll()
397 self.__block.release()
398
399 def __delete(self):
400 _active_limbo_lock.acquire()
401 del _active[_get_ident()]
402 _active_limbo_lock.release()
403
404 def join(self, timeout=None):
405 assert self.__initialized, "Thread.__init__() not called"
Guido van Rossum5a43e1a1998-06-09 19:04:26 +0000406 assert self.__started, "cannot join thread before it is started"
Guido van Rossum7f5013a1998-04-09 22:01:42 +0000407 assert self is not currentThread(), "cannot join current thread"
408 if __debug__:
409 if not self.__stopped:
410 self._note("%s.join(): waiting until thread stops", self)
411 self.__block.acquire()
412 if timeout is None:
Guido van Rossum5a43e1a1998-06-09 19:04:26 +0000413 while not self.__stopped:
Guido van Rossum7f5013a1998-04-09 22:01:42 +0000414 self.__block.wait()
415 if __debug__:
416 self._note("%s.join(): thread stopped", self)
417 else:
Guido van Rossumb39e4611998-05-29 17:47:10 +0000418 deadline = _time() + timeout
Guido van Rossum7f5013a1998-04-09 22:01:42 +0000419 while not self.__stopped:
Guido van Rossumb39e4611998-05-29 17:47:10 +0000420 delay = deadline - _time()
Guido van Rossum7f5013a1998-04-09 22:01:42 +0000421 if delay <= 0:
422 if __debug__:
423 self._note("%s.join(): timed out", self)
424 break
425 self.__block.wait(delay)
426 else:
427 if __debug__:
428 self._note("%s.join(): thread stopped", self)
429 self.__block.release()
430
431 def getName(self):
432 assert self.__initialized, "Thread.__init__() not called"
433 return self.__name
434
435 def setName(self, name):
436 assert self.__initialized, "Thread.__init__() not called"
437 self.__name = str(name)
438
439 def isAlive(self):
440 assert self.__initialized, "Thread.__init__() not called"
441 return self.__started and not self.__stopped
442
443 def isDaemon(self):
444 assert self.__initialized, "Thread.__init__() not called"
445 return self.__daemonic
446
447 def setDaemon(self, daemonic):
448 assert self.__initialized, "Thread.__init__() not called"
449 assert not self.__started, "cannot set daemon status of active thread"
450 self.__daemonic = daemonic
451
452
453# Special thread class to represent the main thread
454# This is garbage collected through an exit handler
455
456class _MainThread(Thread):
457
458 def __init__(self):
459 Thread.__init__(self, name="MainThread")
460 self._Thread__started = 1
461 _active_limbo_lock.acquire()
462 _active[_get_ident()] = self
463 _active_limbo_lock.release()
464 try:
465 self.__oldexitfunc = _sys.exitfunc
466 except AttributeError:
467 self.__oldexitfunc = None
468 _sys.exitfunc = self.__exitfunc
469
470 def _set_daemon(self):
471 return 0
472
473 def __exitfunc(self):
474 self._Thread__stop()
475 t = _pickSomeNonDaemonThread()
476 if t:
477 if __debug__:
478 self._note("%s: waiting for other threads", self)
479 while t:
480 t.join()
481 t = _pickSomeNonDaemonThread()
482 if self.__oldexitfunc:
483 if __debug__:
484 self._note("%s: calling exit handler", self)
485 self.__oldexitfunc()
486 if __debug__:
487 self._note("%s: exiting", self)
488 self._Thread__delete()
489
490def _pickSomeNonDaemonThread():
491 for t in enumerate():
492 if not t.isDaemon() and t.isAlive():
493 return t
494 return None
495
496
497# Dummy thread class to represent threads not started here.
498# These aren't garbage collected when they die,
499# nor can they be waited for.
500# Their purpose is to return *something* from currentThread().
501# They are marked as daemon threads so we won't wait for them
502# when we exit (conform previous semantics).
503
504class _DummyThread(Thread):
505
506 def __init__(self):
507 Thread.__init__(self, name=_newname("Dummy-%d"))
Guido van Rossum8e7eaa81999-09-29 15:26:52 +0000508 self._Thread__started = 1
Guido van Rossum7f5013a1998-04-09 22:01:42 +0000509 _active_limbo_lock.acquire()
510 _active[_get_ident()] = self
511 _active_limbo_lock.release()
512
513 def _set_daemon(self):
514 return 1
515
516 def join(self):
517 assert 0, "cannot join a dummy thread"
518
519
520# Global API functions
521
522def currentThread():
523 try:
524 return _active[_get_ident()]
525 except KeyError:
526 print "currentThread(): no current thread for", _get_ident()
527 return _DummyThread()
528
529def activeCount():
530 _active_limbo_lock.acquire()
531 count = len(_active) + len(_limbo)
532 _active_limbo_lock.release()
533 return count
534
535def enumerate():
536 _active_limbo_lock.acquire()
537 active = _active.values() + _limbo.values()
538 _active_limbo_lock.release()
539 return active
540
541
542# Create the main thread object
543
544_MainThread()
545
546
547# Self-test code
548
549def _test():
550
Guido van Rossumb26a1b41998-05-20 17:05:52 +0000551 import random
Guido van Rossum7f5013a1998-04-09 22:01:42 +0000552
553 class BoundedQueue(_Verbose):
554
555 def __init__(self, limit):
556 _Verbose.__init__(self)
557 self.mon = RLock()
558 self.rc = Condition(self.mon)
559 self.wc = Condition(self.mon)
560 self.limit = limit
561 self.queue = []
562
563 def put(self, item):
564 self.mon.acquire()
565 while len(self.queue) >= self.limit:
566 self._note("put(%s): queue full", item)
567 self.wc.wait()
568 self.queue.append(item)
569 self._note("put(%s): appended, length now %d",
570 item, len(self.queue))
571 self.rc.notify()
572 self.mon.release()
573
574 def get(self):
575 self.mon.acquire()
576 while not self.queue:
577 self._note("get(): queue empty")
578 self.rc.wait()
579 item = self.queue[0]
580 del self.queue[0]
581 self._note("get(): got %s, %d left", item, len(self.queue))
582 self.wc.notify()
583 self.mon.release()
584 return item
585
586 class ProducerThread(Thread):
587
588 def __init__(self, queue, quota):
589 Thread.__init__(self, name="Producer")
590 self.queue = queue
591 self.quota = quota
592
593 def run(self):
Guido van Rossumb26a1b41998-05-20 17:05:52 +0000594 from random import random
Guido van Rossum7f5013a1998-04-09 22:01:42 +0000595 counter = 0
596 while counter < self.quota:
597 counter = counter + 1
598 self.queue.put("%s.%d" % (self.getName(), counter))
599 _sleep(random() * 0.00001)
600
601
602 class ConsumerThread(Thread):
603
604 def __init__(self, queue, count):
605 Thread.__init__(self, name="Consumer")
606 self.queue = queue
607 self.count = count
608
609 def run(self):
610 while self.count > 0:
611 item = self.queue.get()
612 print item
613 self.count = self.count - 1
614
615 import time
616
617 NP = 3
618 QL = 4
619 NI = 5
620
621 Q = BoundedQueue(QL)
622 P = []
623 for i in range(NP):
624 t = ProducerThread(Q, NI)
625 t.setName("Producer-%d" % (i+1))
626 P.append(t)
627 C = ConsumerThread(Q, NI*NP)
628 for t in P:
629 t.start()
630 _sleep(0.000001)
631 C.start()
632 for t in P:
633 t.join()
634 C.join()
635
636if __name__ == '__main__':
637 _test()