blob: e97bd99be0b98f33ddd5169088b9a290e78de998 [file] [log] [blame]
# A parallelized "find(1)" using the thread module (SGI only for now).
# (Blame attribution: Guido van Rossum, commit 1b789f9, 1993-12-17 14:45:06 +0000)

# This demonstrates the use of a work queue and worker threads.
# It really does do more stats/sec when using multiple threads,
# although the improvement is only about 20-30 percent.

# I'm too lazy to write a command line parser for the full find(1)
# command line syntax, so the predicate it searches for is wired in;
# see function selector() below. (It currently searches for files with
# group or world write permission.)

# Usage: parfind.py [-w nworkers] [directory] ...
# Default nworkers is 4; the maximum appears to be 8 (on Irix 4.0.2).
14
15
import sys
import getopt
import string
import time
import os
from stat import *
try:
    import thread
except ImportError:
    # Python 3 renamed the low-level threading module to _thread.
    import _thread as thread
23
24
# Work queue class. Usage:
#       wq = WorkQ()
#       wq.addwork(func, (arg1, arg2, ...))     # one or more calls
#       wq.run(nworkers)
# The work is done when wq.run() completes.
# The function calls executed by the workers may add more work.
# Don't use keyboard interrupts!

class WorkQ:

    """Queue of (func, args) jobs executed by a pool of worker threads.

    Each job runs as func(*args) and may itself call addwork() to queue
    more jobs.  run() returns once no work is queued and no job is still
    running.

    Invariants (self.work, self.busy and self.signaled are only read or
    modified while self.mutex is held):

    - self.work is the list of jobs ready to be taken;
    - self.busy is the number of jobs being executed;
    - self.signaled is true iff self.todo has been released and no worker
      has consumed that release yet; while it is false, self.todo is
      locked;
    - self.signaled is only set while there is queued work or while the
      queue is finished (no work, nobody busy), so a worker woken through
      self.todo finds one of those two states.
    """

    def __init__(self):
        self.mutex = thread.allocate_lock()
        self.todo = thread.allocate_lock()
        self.todo.acquire()         # nothing to do yet
        self.signaled = False
        self.work = []
        self.busy = 0

    def _signal(self):
        """Release self.todo unless a release is already outstanding.

        The caller must hold self.mutex.  Tracking this explicitly keeps
        two threads that both decide a worker should wake up from
        releasing the lock twice (releasing an unlocked lock is an error).
        """
        if not self.signaled:
            self.signaled = True
            self.todo.release()

    def addwork(self, job, *rest):
        """Queue a job, given as addwork(func, args) or addwork((func, args)).

        Raises TypeError for a null job.
        """
        if rest:
            # Early Python packed addwork(func, args) into a single tuple
            # argument; support that calling form explicitly.
            job = (job,) + rest
        if not job:
            raise TypeError('cannot add null job')
        func, args = job
        self.mutex.acquire()
        try:
            self.work.append((func, args))
            self._signal()
        finally:
            self.mutex.release()

    def _getwork(self):
        """Wait for a job and claim it.

        Returns a (func, args) pair, or None once the queue is finished
        (no work queued and no running job that could add more).
        """
        while True:
            self.todo.acquire()
            self.mutex.acquire()
            try:
                self.signaled = False       # this thread consumed the release
                if self.work:
                    job = self.work.pop(0)
                    self.busy = self.busy + 1
                elif self.busy == 0:
                    job = None              # finished
                else:
                    # Not expected given the invariants; wait again rather
                    # than quit while running jobs may still add work.
                    continue
                # Wake another worker if it can make progress too: more
                # jobs are queued, or it must learn that we are finished.
                if self.work or self.busy == 0:
                    self._signal()
                return job
            finally:
                self.mutex.release()

    def _donework(self):
        """Record that a job finished; wake a worker if that ends the run."""
        self.mutex.acquire()
        try:
            self.busy = self.busy - 1
            if self.busy == 0 and not self.work:
                self._signal()
        finally:
            self.mutex.release()

    def _worker(self):
        """Run jobs until the queue is finished."""
        while True:
            job = self._getwork()
            if job is None:
                break
            func, args = job
            try:
                func(*args)
            finally:
                # Always account for the job, even if it raised; otherwise
                # busy never drops to 0 and every worker waits forever.
                self._donework()

    def run(self, nworkers):
        """Process all work with nworkers threads (the caller counts as one).

        Returns when no work is queued and no job is running.
        """
        if not self.work:
            return      # Nothing to do
        for i in range(nworkers - 1):
            thread.start_new_thread(self._worker, ())
        # This returns only once the queue is finished.  Each worker that
        # sees the finished state passes the wake-up on, so the spawned
        # threads exit as well; re-acquiring self.todo here would strand them.
        self._worker()
96
97
# Main program

def main():
    """Search the directories named on the command line in parallel.

    Usage: parfind.py [-w nworkers] [directory] ...
    nworkers defaults to 4 and the directory list to the current
    directory.  Elapsed wall-clock time is reported on stderr.  Bad
    options print a usage message and exit with status 2.
    """
    usage = 'usage: %s [-w nworkers] [directory] ...\n' % sys.argv[0]
    nworkers = 4
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'w:')
        for opt, arg in opts:
            if opt == '-w':
                nworkers = int(arg)
    except (getopt.GetoptError, ValueError) as msg:
        sys.stderr.write('%s\n%s' % (msg, usage))
        sys.exit(2)
    if not args:
        args = [os.curdir]

    wq = WorkQ()
    for top in args:
        wq.addwork(find, (top, selector, wq))

    # time.millitimer() is not available in current Pythons; use
    # time.time() and report whole milliseconds as before.
    t1 = time.time()
    wq.run(nworkers)
    t2 = time.time()

    sys.stderr.write('Total time %d msec.\n' % int(round((t2 - t1) * 1000)))
118
119
# The predicate -- defines what files we look for.
# Feel free to change this to suit your purpose.

def selector(dir, name, fullname, stat):
    """Return True for files that are group- or world-writable.

    dir, name and fullname locate the entry (unused by this predicate);
    stat is the os.lstat() result that find() passes in.
    """
    # S_IWGRP | S_IWOTH is the old 0022 mask; the named constants also
    # parse on Python 3, where the literal 0022 is a syntax error.
    return (stat[ST_MODE] & (S_IWGRP | S_IWOTH)) != 0
126
127
# The find procedure -- calls wq.addwork() for subdirectories

def find(dir, pred, wq):
    """List directory dir, report matches and queue its subdirectories.

    For each entry, pred(dir, name, fullname, st) is called with
    st = os.lstat(fullname); when it is true, fullname is written to
    stdout on its own line.  Every subdirectory that is not a mount point
    gets a find() job of its own via wq.addwork().  Because lstat() is
    used, symbolic links are never treated as directories.  Listing or
    stat errors are reported on stderr, and the entry is skipped.
    """
    try:
        names = os.listdir(dir)
    except OSError as msg:
        sys.stderr.write('%r : %s\n' % (dir, msg))
        return
    for name in names:
        if name in (os.curdir, os.pardir):
            continue
        fullname = os.path.join(dir, name)
        try:
            st = os.lstat(fullname)
        except OSError as msg:
            sys.stderr.write('%r : %s\n' % (fullname, msg))
            continue
        if pred(dir, name, fullname, st):
            # Write each line with a single call instead of a
            # multi-argument print, so output from concurrent workers
            # is less likely to interleave within a line.
            sys.stdout.write(fullname + '\n')
        if S_ISDIR(st[ST_MODE]) and not os.path.ismount(fullname):
            wq.addwork(find, (fullname, pred, wq))
149
150
# Call the main program when run as a script, but not when the module is
# imported (e.g. to reuse WorkQ).

if __name__ == '__main__':
    main()