blob: 00b2bf95cee21aa1e25f695e40e638d8cc61f21a [file] [log] [blame]
mblighbd1eb202008-07-29 22:09:29 +00001import os, sys
2
3
class ParallelError(Exception):
    """Raised when one or more parallel jobs fail or cannot be scheduled.

    Attributes:
        str: the full, human readable message (also used as the exception
             message itself).
        errors: the individual error descriptions that make up the message.
    """

    def __init__(self, str, errors):
        # Initialise the base class first so that args/str() carry the
        # message, then expose the pieces as plain attributes.
        Exception.__init__(self, str)
        self.errors = errors
        self.str = str
9
10
class ParallelExecute(object):
    """Run functions in forked child processes, honouring dependencies.

    Every function is executed in its own child process created with
    os.fork().  A function is only started once all of the functions it
    depends on have exited with status 0, and no more than max_procs
    children are kept running at the same time.
    """

    def __init__(self, functions, max_simultaneous_procs=20):
        """\
        This takes in a dictionary of functions which map to a set of
        functions that they depend on.

        functions: This is either a list of or dictionary of functions to
                   be run.  If it's a dictionary, the value should be a set
                   of other functions this function is dependent on.  If it's
                   a list (or tuple or anything iterable that returns a
                   single element each iteration), then it's assumed that
                   there are no dependencies.  The functions are used as
                   dictionary keys, so they must be hashable.  The argument
                   itself is never modified; a private copy is kept.

        max_simultaneous_procs: Throttle the number of processes we have
                                running at once.  With a value below 1
                                nothing can ever be started.

        Raises KeyError if a function depends on something that is not
        itself one of the functions to be run.
        """
        if isinstance(functions, dict):
            # _run() and run_until_completion() consume this mapping and its
            # dependency sets, so copy both levels rather than destroying
            # the caller's data.
            functions = dict((fn, set(deps))
                             for fn, deps in functions.items())
        else:
            functions = dict((fn, set()) for fn in functions)

        # Reverse mapping: function -> functions that are waiting on it.
        dependents = dict((fn, []) for fn in functions)
        for fn, deps in functions.items():
            for dep in deps:
                dependents[dep].append(fn)

        self.max_procs = max_simultaneous_procs
        self.functions = functions      # not yet started -> unmet deps
        self.dependents = dependents
        self.pid_map = {}               # pid of running child -> function
        self.ready_to_run = []          # functions whose deps are all met


    def _flush_output(self):
        """Best-effort flush of sys.stdout and sys.stderr."""
        for stream in (sys.stdout, sys.stderr):
            try:
                stream.flush()
            except (AttributeError, ValueError, IOError):
                # Stream is missing or already closed; there is nothing
                # buffered that we could still deliver.
                pass


    def _run(self, function):
        """Start function in a forked child and record the child's pid.

        In the parent this returns as soon as the child has been forked.
        The child never returns from this method: it exits with status 0
        if function() returned normally, with the requested status if
        function() called sys.exit(), and with status 1 (after printing a
        traceback) if function() raised anything else.
        """
        import traceback

        self.functions.pop(function)

        # Anything still sitting in the stdio buffers would be inherited by
        # the child and written a second time when the child flushes.
        self._flush_output()

        pid = os.fork()
        if pid:
            self.pid_map[pid] = function
            return

        # Child process.  It must terminate here no matter what: letting an
        # exception (including SystemExit) unwind further would execute the
        # parent's except/finally blocks in a second process.
        exit_code = 1
        try:
            try:
                function()
                exit_code = 0
            except SystemExit:
                code = sys.exc_info()[1].code
                if code is None:
                    exit_code = 0
                elif isinstance(code, int):
                    exit_code = code & 0xFF
                else:
                    # Same convention as the interpreter: print the object
                    # and report failure.
                    sys.stderr.write("%s\n" % (code,))
            except:
                # Top-level boundary of the child: report and fail.
                traceback.print_exc()
        finally:
            try:
                # os._exit() skips the normal interpreter shutdown, so
                # buffered output has to be flushed by hand.
                self._flush_output()
            finally:
                os._exit(exit_code)


    def run_until_completion(self):
        """Run every function, respecting dependencies and the process cap.

        Blocks until all started children have exited.  Functions that
        depend (directly or indirectly) on a failed function are never
        started.

        Raises ParallelError if any child exited with a non-zero status
        (one "<name> failed" entry per child), or with a single
        "Deadlock detected" entry if nothing failed but some functions
        could never be started (e.g. circular dependencies).
        """
        for fn, deps in self.functions.items():
            if not deps:
                self.ready_to_run.append(fn)

        errors = []
        while self.pid_map or self.ready_to_run:
            max_allowed = self.max_procs - len(self.pid_map)
            max_able = len(self.ready_to_run)
            for _ in range(min(max_allowed, max_able)):
                self._run(self.ready_to_run.pop())

            # Handle one proc that's finished.
            pid, status = os.wait()
            if pid not in self.pid_map:
                # os.wait() reports any child of this process; this one
                # was not started by us, so it is none of our business.
                continue

            fn = self.pid_map.pop(pid)
            if status != 0:
                # Not every callable has a __name__ (e.g. partials).
                errors.append("%s failed" % getattr(fn, '__name__', repr(fn)))
                continue

            # fn succeeded: release everything that was waiting only on it.
            for dependent in self.dependents[fn]:
                remaining = self.functions[dependent]
                remaining.remove(fn)
                if not remaining:
                    self.ready_to_run.append(dependent)

        # Functions left over without any failure to blame can only mean
        # that their dependencies can never be satisfied.
        if self.functions and not errors:
            errors.append("Deadlock detected")

        if errors:
            msg = "Errors occurred during execution:"
            msg = '\n'.join([msg] + errors)
            raise ParallelError(msg, errors)
88
89
def redirect_io(log_file='/dev/null'):
    """Detach this process's standard streams from wherever they point now.

    stdin (fd 0) is always connected to /dev/null.  stdout (fd 1) and
    stderr (fd 2) are both connected to log_file, which is created if it
    does not exist and is opened write-only without truncating it or
    seeking to its end.  Finally sys.stdin, sys.stdout and sys.stderr are
    rebound to new file objects wrapping descriptors 0, 1 and 2.

    log_file: path of the file that receives stdout and stderr.

    Raises OSError if either file cannot be opened or a descriptor cannot
    be duplicated.
    """
    # Always redirect stdin.
    in_fd = os.open('/dev/null', os.O_RDONLY)
    # If fd 0 was closed on entry, os.open() hands back 0 itself: stdin is
    # then already in place and closing in_fd would close it again.
    if in_fd != 0:
        try:
            os.dup2(in_fd, 0)
        finally:
            os.close(in_fd)

    out_fd = os.open(log_file, os.O_WRONLY | os.O_CREAT)
    try:
        # Duplicating a descriptor onto itself is a no-op, so this is safe
        # even when out_fd already is 1 or 2.
        os.dup2(out_fd, 2)
        os.dup2(out_fd, 1)
    finally:
        # Same reasoning as for stdin: only close the temporary descriptor
        # when it is not one of the streams we have just set up.
        if out_fd not in (1, 2):
            os.close(out_fd)

    sys.stdin = os.fdopen(0, 'r')
    sys.stdout = os.fdopen(1, 'w')
    sys.stderr = os.fdopen(2, 'w')