blob: 11fb65aada5ba44eb8d5088638927190059b6840 [file] [log] [blame]
# A parallelized "find(1)" using the thread module.

# This demonstrates the use of a work queue and worker threads.
# It really does do more stats/sec when using multiple threads,
# although the improvement is only about 20-30 percent.

# I'm too lazy to write a command line parser for the full find(1)
# command line syntax, so the predicate it searches for is wired in;
# see function selector() below.  (It currently searches for files with
# group or world write permission.)

# Usage: parfind.py [-w nworkers] [directory] ...
# Default nworkers is 4, maximum appears to be 8 (on Irix 4.0.2)
14
15
16import sys
17import getopt
18import string
19import time
20import os
21from stat import *
22import thread
23
24
# Work queue class.  Usage:
#       wq = WorkQ()
#       wq.addwork(func, (arg1, arg2, ...))     # one or more calls
#       wq.run(nworkers)
# The work is done when wq.run() completes.
# The function calls executed by the workers may add more work.
# Don't use keyboard interrupts!
32
class WorkQ:

    """A work queue served by a pool of worker threads.

    Usage:
        wq = WorkQ()
        wq.addwork(func, (arg1, arg2, ...))     # one or more calls
        wq.run(nworkers)

    The work is done when wq.run() returns.  The jobs executed by the
    workers may themselves call addwork() to queue more work.
    """

    # Invariants:
    #
    # - busy and work are only read or modified while mutex is held
    # - len(work) is the number of jobs ready to be taken
    # - busy is the number of jobs being done
    # - todo is locked iff there is no work and somebody is busy

    def __init__(self):
        self.mutex = thread.allocate_lock()     # guards work and busy
        self.todo = thread.allocate_lock()      # workers wait on this
        # Start with todo locked so that workers block until the first
        # addwork() call unlocks it.
        self.todo.acquire()
        self.work = []          # queued (func, args) jobs, oldest first
        self.busy = 0           # number of jobs currently executing

    def addwork(self, func, args):
        """Queue the call func(*args) to be executed by a worker."""
        job = (func, args)
        self.mutex.acquire()
        try:
            self.work.append(job)
            # The queue just went from empty to non-empty: wake a waiting
            # worker.  The length must be tested while mutex is held;
            # testing it after releasing mutex races with other threads
            # adding or taking work, which can lose the wake-up (deadlock)
            # or release todo twice.
            if len(self.work) == 1:
                self.todo.release()
        finally:
            self.mutex.release()

    def _getwork(self):
        """Block until a job is available and return it as (func, args).

        Returns None once all work is finished (nothing queued and
        nobody busy).
        """
        self.todo.acquire()
        self.mutex.acquire()
        try:
            if self.busy == 0 and len(self.work) == 0:
                # All done: pass the wake-up on so the next waiting
                # worker can see this and exit too.
                self.todo.release()
                return None
            job = self.work.pop(0)
            self.busy = self.busy + 1
            # More jobs remain: leave todo unlocked for the next worker.
            # Like addwork(), this is decided while mutex is held.
            if len(self.work) > 0:
                self.todo.release()
            return job
        finally:
            self.mutex.release()

    def _donework(self):
        """Record that a job taken by _getwork() has finished."""
        self.mutex.acquire()
        try:
            self.busy = self.busy - 1
            if self.busy == 0 and len(self.work) == 0:
                # Last job finished with nothing queued: unlock todo so
                # the waiting workers can observe termination.
                self.todo.release()
        finally:
            self.mutex.release()

    def _worker(self):
        """Worker loop: take and execute jobs until all work is done."""
        time.sleep(0.00001)     # Let other threads run
        while True:
            job = self._getwork()
            if job is None:
                break
            func, args = job
            try:
                func(*args)
            finally:
                # Always balance the busy count, even if the job raised.
                # Otherwise busy never returns to 0 and the remaining
                # workers wait on todo forever.
                self._donework()

    def run(self, nworkers):
        """Execute all queued work with nworkers threads; return when done.

        The calling thread acts as one of the workers, so nworkers - 1
        additional threads are started.  Returns immediately if nothing
        has been queued.  An exception raised by a job executed in the
        calling thread propagates out of run().
        """
        if not self.work:
            return              # Nothing to do
        for i in range(nworkers - 1):
            thread.start_new_thread(self._worker, ())
        self._worker()
        # Re-lock todo, which the finished workers left unlocked.
        # NOTE(review): started threads are never joined; any that have
        # not yet re-checked todo stay blocked in _getwork().
        self.todo.acquire()


# Main program

def main():
    """Search the directories named on the command line in parallel.

    Command line: [-w nworkers] [directory] ...
    nworkers defaults to 4 and the directory list defaults to the
    current directory.  Matching paths are reported by find(); the
    elapsed wall-clock time is written to stderr.
    """
    # (A leftover debugging line used to append "/tmp" to sys.argv here,
    # which always searched /tmp and made the os.curdir default dead.)
    nworkers = 4
    # 'w:' declares -w as taking an argument; the old spec '-w:' also
    # declared a meaningless '-' option letter.
    opts, args = getopt.getopt(sys.argv[1:], 'w:')
    for opt, arg in opts:
        if opt == '-w':
            nworkers = int(arg)
    if not args:
        args = [os.curdir]

    wq = WorkQ()
    for top in args:
        wq.addwork(find, (top, selector, wq))

    t1 = time.time()
    wq.run(nworkers)
    t2 = time.time()

    sys.stderr.write('Total time %r sec.\n' % (t2 - t1))


# The predicate -- defines what files we look for.
# Feel free to change this to suit your purpose.

def selector(dir, name, fullname, stat):
    """Return true if the entry is group- or world-writable.

    find() calls this with the directory, the entry name, the full path
    and the entry's os.lstat() result.  Only the mode bits of stat are
    examined here; the other arguments are available to alternative
    predicates.
    """
    # S_IWGRP | S_IWOTH == 0o022: the group-write and other-write bits.
    return (stat[ST_MODE] & (S_IWGRP | S_IWOTH)) != 0


# The find procedure -- calls wq.addwork() for subdirectories.

def find(dir, pred, wq):
    """List one directory and queue each of its subdirectories on wq.

    pred(dir, name, fullname, stat) is called for every entry of dir,
    with stat being the entry's os.lstat() result.  The full path of
    each entry for which pred returns true is written to stdout.  Every
    subdirectory that is not a mount point is queued on wq as another
    find() job, so the walk fans out across the worker threads.  Because
    lstat() describes a symbolic link itself, links to directories are
    not followed.  Entries that cannot be listed or stat'ed are reported
    on stderr and skipped.
    """
    try:
        names = os.listdir(dir)
    except os.error as msg:
        # Diagnostics go to stderr so they don't mix with matching paths.
        sys.stderr.write('%r : %s\n' % (dir, msg))
        return
    for name in names:
        if name in (os.curdir, os.pardir):
            continue
        fullname = os.path.join(dir, name)
        try:
            stat = os.lstat(fullname)
        except os.error as msg:
            sys.stderr.write('%r : %s\n' % (fullname, msg))
            continue
        if pred(dir, name, fullname, stat):
            # Write the line in a single call, so lines from concurrent
            # workers are less likely to be interleaved.
            sys.stdout.write(fullname + '\n')
        if S_ISDIR(stat[ST_MODE]) and not os.path.ismount(fullname):
            wq.addwork(find, (fullname, pred, wq))
Guido van Rossum1b789f91993-12-17 14:45:06 +0000150
151
152# Call the main program
153
154main()