Nicolas Noble | ddef246 | 2015-01-06 18:08:25 -0800 | [diff] [blame^] | 1 | """Run a group of subprocesses and then finish.""" |
| 2 | |
| 3 | import multiprocessing |
| 4 | import random |
| 5 | import subprocess |
| 6 | import sys |
| 7 | import threading |
| 8 | |
| 9 | # multiplicative factor to over subscribe CPU cores |
| 10 | # (many tests sleep for a long time) |
| 11 | _OVERSUBSCRIBE = 32 |
| 12 | _active_jobs = threading.Semaphore( |
| 13 | multiprocessing.cpu_count() * _OVERSUBSCRIBE) |
| 14 | _output_lock = threading.Lock() |
| 15 | |
| 16 | |
| 17 | def shuffle_iteratable(it): |
| 18 | """Return an iterable that randomly walks it""" |
| 19 | # take a random sampling from the passed in iterable |
| 20 | # we take an element with probablity 1/p and rapidly increase |
| 21 | # p as we take elements - this gives us a somewhat random set of values before |
| 22 | # we've seen all the values, but starts producing values without having to |
| 23 | # compute ALL of them at once, allowing tests to start a little earlier |
| 24 | nextit = [] |
| 25 | p = 1 |
| 26 | for val in it: |
| 27 | if random.randint(0, p) == 0: |
| 28 | p *= 2 |
| 29 | yield val |
| 30 | else: |
| 31 | nextit.append(val) |
| 32 | # after taking a random sampling, we shuffle the rest of the elements and |
| 33 | # yield them |
| 34 | random.shuffle(nextit) |
| 35 | for val in nextit: |
| 36 | yield val |
| 37 | |
| 38 | |
| 39 | class Jobset(object): |
| 40 | """Manages one run of jobs.""" |
| 41 | |
| 42 | def __init__(self, cmdlines): |
| 43 | self._cmdlines = shuffle_iteratable(cmdlines) |
| 44 | self._failures = 0 |
| 45 | |
| 46 | def _run_thread(self, cmdline): |
| 47 | try: |
| 48 | # start the process |
| 49 | p = subprocess.Popen(args=cmdline, |
| 50 | stderr=subprocess.STDOUT, |
| 51 | stdout=subprocess.PIPE) |
| 52 | stdout, _ = p.communicate() |
| 53 | # log output (under a lock) |
| 54 | _output_lock.acquire() |
| 55 | try: |
| 56 | if p.returncode != 0: |
| 57 | sys.stdout.write('\x1b[0G\x1b[2K\x1b[31mFAILED\x1b[0m: %s' |
| 58 | ' [ret=%d]\n' |
| 59 | '%s\n' % ( |
| 60 | ' '.join(cmdline), p.returncode, |
| 61 | stdout)) |
| 62 | self._failures += 1 |
| 63 | else: |
| 64 | sys.stdout.write('\x1b[0G\x1b[2K\x1b[32mPASSED\x1b[0m: %s' % |
| 65 | ' '.join(cmdline)) |
| 66 | sys.stdout.flush() |
| 67 | finally: |
| 68 | _output_lock.release() |
| 69 | finally: |
| 70 | _active_jobs.release() |
| 71 | |
| 72 | def run(self): |
| 73 | threads = [] |
| 74 | for cmdline in self._cmdlines: |
| 75 | # cap number of active jobs - release in _run_thread |
| 76 | _active_jobs.acquire() |
| 77 | t = threading.Thread(target=self._run_thread, |
| 78 | args=[cmdline]) |
| 79 | t.start() |
| 80 | threads.append(t) |
| 81 | for thread in threads: |
| 82 | thread.join() |
| 83 | return self._failures == 0 |
| 84 | |
| 85 | |
| 86 | def run(cmdlines): |
| 87 | return Jobset(cmdlines).run() |
| 88 | |