Add a --forever flag to continuously run tests as things change.
	Change on 2015/01/07 by ctiller <ctiller@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=83451760
diff --git a/tools/run_tests/jobset.py b/tools/run_tests/jobset.py
index 0890cc5..d3a46b6 100755
--- a/tools/run_tests/jobset.py
+++ b/tools/run_tests/jobset.py
@@ -4,14 +4,11 @@
 import random
 import subprocess
 import sys
-import threading
+import tempfile
+import time
 
-# multiplicative factor to over subscribe CPU cores
-# (many tests sleep for a long time)
-_OVERSUBSCRIBE = 32
-_active_jobs = threading.Semaphore(
-    multiprocessing.cpu_count() * _OVERSUBSCRIBE)
-_output_lock = threading.Lock()
+
+_MAX_JOBS = 16 * multiprocessing.cpu_count()
 
 
 def shuffle_iteratable(it):
@@ -25,7 +22,7 @@
   p = 1
   for val in it:
     if random.randint(0, p) == 0:
-      p *= 2
+      p = min(p*2, 100)
       yield val
     else:
       nextit.append(val)
@@ -36,53 +33,107 @@
     yield val
 
 
+_SUCCESS = object()
+_FAILURE = object()
+_RUNNING = object()
+_KILLED = object()
+
+
+class Job(object):
+  """Manages one job."""
+
+  def __init__(self, cmdline):
+    self._cmdline = ' '.join(cmdline)
+    self._tempfile = tempfile.TemporaryFile()
+    self._process = subprocess.Popen(args=cmdline,
+                                     stderr=subprocess.STDOUT,
+                                     stdout=self._tempfile)
+    self._state = _RUNNING
+    sys.stdout.write('\x1b[0G\x1b[2K\x1b[33mSTART\x1b[0m: %s' %
+                     self._cmdline)
+    sys.stdout.flush()
+
+  def state(self):
+    """Poll current state of the job. Prints messages at completion."""
+    if self._state == _RUNNING and self._process.poll() is not None:
+      if self._process.returncode != 0:
+        self._state = _FAILURE
+        self._tempfile.seek(0)
+        stdout = self._tempfile.read()
+        sys.stdout.write('\x1b[0G\x1b[2K\x1b[31mFAILED\x1b[0m: %s'
+                         ' [ret=%d]\n'
+                         '%s\n' % (
+                             self._cmdline, self._process.returncode, stdout))
+        sys.stdout.flush()
+      else:
+        self._state = _SUCCESS
+        sys.stdout.write('\x1b[0G\x1b[2K\x1b[32mPASSED\x1b[0m: %s' %
+                         self._cmdline)
+        sys.stdout.flush()
+    return self._state
+
+  def kill(self):
+    if self._state == _RUNNING:
+      self._state = _KILLED
+      self._process.terminate()
+
+
 class Jobset(object):
   """Manages one run of jobs."""
 
-  def __init__(self, cmdlines):
-    self._cmdlines = shuffle_iteratable(cmdlines)
+  def __init__(self, check_cancelled):
+    self._running = set()
+    self._check_cancelled = check_cancelled
+    self._cancelled = False
     self._failures = 0
 
-  def _run_thread(self, cmdline):
-    try:
-      # start the process
-      p = subprocess.Popen(args=cmdline,
-                           stderr=subprocess.STDOUT,
-                           stdout=subprocess.PIPE)
-      stdout, _ = p.communicate()
-      # log output (under a lock)
-      _output_lock.acquire()
-      try:
-        if p.returncode != 0:
-          sys.stdout.write('\x1b[0G\x1b[2K\x1b[31mFAILED\x1b[0m: %s'
-                           ' [ret=%d]\n'
-                           '%s\n' % (
-                               ' '.join(cmdline), p.returncode,
-                               stdout))
-          self._failures += 1
-        else:
-          sys.stdout.write('\x1b[0G\x1b[2K\x1b[32mPASSED\x1b[0m: %s' %
-                           ' '.join(cmdline))
-        sys.stdout.flush()
-      finally:
-        _output_lock.release()
-    finally:
-      _active_jobs.release()
+  def start(self, cmdline):
+    """Start a job. Return True on success, False on failure."""
+    while len(self._running) >= _MAX_JOBS:
+      if self.cancelled(): return False
+      self.reap()
+    if self.cancelled(): return False
+    self._running.add(Job(cmdline))
+    return True
 
-  def run(self):
-    threads = []
-    for cmdline in self._cmdlines:
-      # cap number of active jobs - release in _run_thread
-      _active_jobs.acquire()
-      t = threading.Thread(target=self._run_thread,
-                           args=[cmdline])
-      t.start()
-      threads.append(t)
-    for thread in threads:
-      thread.join()
-    return self._failures == 0
+  def reap(self):
+    """Collect the dead jobs."""
+    while self._running:
+      dead = set()
+      for job in self._running:
+        st = job.state()
+        if st == _RUNNING: continue
+        if st == _FAILURE: self._failures += 1
+        dead.add(job)
+      for job in dead:
+        self._running.remove(job)
+      if not dead: return
+      time.sleep(0.1)
+
+  def cancelled(self):
+    """Poll for cancellation."""
+    if self._cancelled: return True
+    if not self._check_cancelled(): return False
+    for job in self._running:
+      job.kill()
+    self._cancelled = True
+    return True
+
+  def finish(self):
+    while self._running:
+      if self.cancelled(): pass  # poll cancellation
+      self.reap()
+    return not self.cancelled() and self._failures == 0
 
 
-def run(cmdlines):
-  return Jobset(cmdlines).run()
+def _never_cancelled():
+  return False
+
+
+def run(cmdlines, check_cancelled=_never_cancelled):
+  js = Jobset(check_cancelled)
+  for cmdline in shuffle_iteratable(cmdlines):
+    if not js.start(cmdline):
+      break
+  return js.finish()
 
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index ee61f33..9234682 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -6,8 +6,10 @@
 import itertools
 import multiprocessing
 import sys
+import time
 
 import jobset
+import watch_dirs
 
 # flags required for make for each configuration
 _CONFIGS = ['dbg', 'opt', 'tsan', 'msan', 'asan']
@@ -20,6 +22,10 @@
                   default=['all'])
 argp.add_argument('-t', '--test-filter', nargs='*', default=['*'])
 argp.add_argument('-n', '--runs_per_test', default=1, type=int)
+argp.add_argument('-f', '--forever',
+                  default=False,
+                  action='store_const',
+                  const=True)
 args = argp.parse_args()
 
 # grab config
@@ -29,21 +35,38 @@
                for x in args.config)]
 filters = args.test_filter
 runs_per_test = args.runs_per_test
+forever = args.forever
 
-# build latest, sharing cpu between the various makes
-if not jobset.run(
-    ['make',
-     '-j', '%d' % max(multiprocessing.cpu_count() / len(configs), 1),
-     'buildtests_c',
-     'CONFIG=%s' % cfg]
-    for cfg in configs):
-  sys.exit(1)
 
-# run all the tests
-jobset.run([x]
-           for x in itertools.chain.from_iterable(
-               itertools.chain.from_iterable(itertools.repeat(
-                   glob.glob('bins/%s/%s_test' % (config, filt)),
-                   runs_per_test))
-               for config in configs
-               for filt in filters))
+def _build_and_run(check_cancelled):
+  """Do one pass of building & running tests."""
+  # build latest, sharing cpu between the various makes
+  if not jobset.run(
+      (['make',
+        '-j', '%d' % max(multiprocessing.cpu_count() / len(configs), 1),
+        'buildtests_c',
+        'CONFIG=%s' % cfg]
+       for cfg in configs), check_cancelled):
+    sys.exit(1)
+
+  # run all the tests
+  jobset.run(([x]
+              for x in itertools.chain.from_iterable(
+                  itertools.chain.from_iterable(itertools.repeat(
+                      glob.glob('bins/%s/%s_test' % (config, filt)),
+                      runs_per_test))
+                  for config in configs
+                  for filt in filters)), check_cancelled)
+
+
+if forever:
+  while True:
+    dw = watch_dirs.DirWatcher(['src', 'include', 'test'])
+    initial_time = dw.most_recent_change()
+    have_files_changed = lambda: dw.most_recent_change() != initial_time
+    _build_and_run(have_files_changed)
+    while not have_files_changed():
+      time.sleep(1)
+else:
+  _build_and_run(lambda: False)
+
diff --git a/tools/run_tests/watch_dirs.py b/tools/run_tests/watch_dirs.py
new file mode 100755
index 0000000..8ebbb27
--- /dev/null
+++ b/tools/run_tests/watch_dirs.py
@@ -0,0 +1,46 @@
+"""Helper to watch a set of directories for modifications."""
+
+import os
+import threading
+import time
+
+
+class DirWatcher(object):
+  """Helper to watch a set of directories for modifications."""
+
+  def __init__(self, paths):
+    if isinstance(paths, basestring):
+      paths = [paths]
+    self._mu = threading.Lock()
+    self._done = False
+    self.paths = list(paths)
+    self.lastrun = time.time()
+    self._cache = self._calculate()
+
+  def _calculate(self):
+    """Walk over all subscribed paths, check most recent mtime."""
+    most_recent_change = None
+    for path in self.paths:
+      if not os.path.exists(path):
+        continue
+      if not os.path.isdir(path):
+        continue
+      for root, _, files in os.walk(path):
+        for f in files:
+          st = os.stat(os.path.join(root, f))
+          if most_recent_change is None:
+            most_recent_change = st.st_mtime
+          else:
+            most_recent_change = max(most_recent_change, st.st_mtime)
+    return most_recent_change
+
+  def most_recent_change(self):
+    self._mu.acquire()
+    try:
+      if time.time() - self.lastrun > 1:
+        self._cache = self._calculate()
+        self.lastrun = time.time()
+      return self._cache
+    finally:
+      self._mu.release()
+