# Very rudimentary test of threading module

# Create a bunch of threads, let each do some work, wait until all are done
4
5from test.test_support import verbose
6import random
7import dummy_threading as _threading
8import time
9
10
11class TestThread(_threading.Thread):
Tim Petersf2715e02003-02-19 02:35:07 +000012
Guido van Rossumad50ca92002-12-30 22:30:22 +000013 def run(self):
14 global running
15 delay = random.random() * 2
16 if verbose:
17 print 'task', self.getName(), 'will run for', delay, 'sec'
18 sema.acquire()
19 mutex.acquire()
20 running = running + 1
21 if verbose:
22 print running, 'tasks are running'
23 mutex.release()
24 time.sleep(delay)
25 if verbose:
26 print 'task', self.getName(), 'done'
27 mutex.acquire()
28 running = running - 1
29 if verbose:
30 print self.getName(), 'is finished.', running, 'tasks are running'
31 mutex.release()
32 sema.release()
33
def starttasks():
    """Create and start ``numtasks`` TestThread workers.

    Each worker is named "<thread N>" (N counting from 0), appended to the
    module-level ``threads`` list, and then started.
    """
    for index in range(numtasks):
        worker = TestThread(name="<thread %d>" % index)
        # Record the worker before starting it, as the caller joins on
        # everything found in ``threads``.
        threads.append(worker)
        worker.start()
39
40
41def test_main():
42 # This takes about n/3 seconds to run (about n/3 clumps of tasks, times
43 # about 1 second per clump).
44 global numtasks
45 numtasks = 10
46
47 # no more than 3 of the 10 can run at once
48 global sema
49 sema = _threading.BoundedSemaphore(value=3)
50 global mutex
51 mutex = _threading.RLock()
52 global running
53 running = 0
54
55 global threads
56 threads = []
Tim Petersf2715e02003-02-19 02:35:07 +000057
Guido van Rossumad50ca92002-12-30 22:30:22 +000058 starttasks()
59
60 if verbose:
61 print 'waiting for all tasks to complete'
62 for t in threads:
63 t.join()
64 if verbose:
65 print 'all tasks done'
66
67
68
# Allow the test to be run directly as a script.
if __name__ == '__main__':
    test_main()