"""Run a group of subprocesses and then finish."""

import multiprocessing
import random
import subprocess
import sys
import threading

# multiplicative factor to over-subscribe CPU cores
# (many tests spend most of their time sleeping, so the CPUs are rarely all busy)
_OVERSUBSCRIBE = 32
# caps the number of concurrently running jobs: acquired in Jobset.run()
# before spawning a worker thread, released in Jobset._run_thread() when the
# subprocess finishes
_active_jobs = threading.Semaphore(
    multiprocessing.cpu_count() * _OVERSUBSCRIBE)
# serializes status output so lines from concurrent jobs do not interleave
_output_lock = threading.Lock()


def shuffle_iteratable(it):
  """Return an iterable that walks the values of it in a random order."""
  # take a random sampling from the passed in iterable:
  # each element is taken with probability 1/(p+1), and p doubles every time we
  # take one - this gives us a somewhat random set of values before we've seen
  # all of them, but starts producing values without having to compute ALL of
  # them at once, allowing tests to start a little earlier
  nextit = []
  p = 1
  for val in it:
    if random.randint(0, p) == 0:
      p *= 2
      yield val
    else:
      nextit.append(val)
  # after taking a random sampling, we shuffle the rest of the elements and
  # yield them
  random.shuffle(nextit)
  for val in nextit:
    yield val

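# Illustration only (not used by the original module): iterating
# shuffle_iteratable(range(10)) yields each value exactly once - a few early
# values picked by the 1/(p+1) sampling above, then the rest in shuffled order,
# e.g. [0, 3, 9, 5, 1, 8, 2, 7, 6, 4]; the exact order varies from run to run.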

class Jobset(object):
  """Manages one run of jobs."""

  def __init__(self, cmdlines):
    self._cmdlines = shuffle_iteratable(cmdlines)
    self._failures = 0

  def _run_thread(self, cmdline):
    try:
      # start the process
      p = subprocess.Popen(args=cmdline,
                           stderr=subprocess.STDOUT,
                           stdout=subprocess.PIPE)
      stdout, _ = p.communicate()
      # log output (under a lock, so concurrent jobs don't garble each other)
      _output_lock.acquire()
      try:
        if p.returncode != 0:
          # \x1b[0G moves the cursor to the start of the line, \x1b[2K erases
          # it (wiping the previous status), \x1b[31m...\x1b[0m prints in red
          sys.stdout.write('\x1b[0G\x1b[2K\x1b[31mFAILED\x1b[0m: %s'
                           ' [ret=%d]\n'
                           '%s\n' % (
                               ' '.join(cmdline), p.returncode,
                               stdout))
          self._failures += 1
        else:
          # green status with no trailing newline, so the next status line
          # overwrites it
          sys.stdout.write('\x1b[0G\x1b[2K\x1b[32mPASSED\x1b[0m: %s' %
                           ' '.join(cmdline))
        sys.stdout.flush()
      finally:
        _output_lock.release()
    finally:
      # free a job slot for run() to hand out (acquired there before starting us)
      _active_jobs.release()

  def run(self):
    threads = []
    for cmdline in self._cmdlines:
      # cap the number of active jobs - the semaphore is released in
      # _run_thread when the subprocess finishes
      _active_jobs.acquire()
      t = threading.Thread(target=self._run_thread,
                           args=[cmdline])
      t.start()
      threads.append(t)
    for thread in threads:
      thread.join()
    return self._failures == 0


def run(cmdlines):
  """Run each command line in cmdlines concurrently; return True if all pass."""
  return Jobset(cmdlines).run()
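
# A minimal usage sketch, not part of the original module: the command lines
# below are placeholder POSIX utilities chosen purely for illustration; real
# callers pass the argv lists of the test binaries they actually want to run.
if __name__ == '__main__':
  # 'false' is included deliberately to exercise the FAILED path; run() only
  # returns True when every job exits with status 0
  all_passed = run([
      ['echo', 'job one'],
      ['sleep', '1'],
      ['false'],
  ])
  sys.exit(0 if all_passed else 1)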