Added timeout to parallel subcommands.
Tested by running parallel tests for the following situations:
        without timeout specified
        with timeout specified and timing out
        with timeout specified and not timing out

Signed-off-by: Colby Ranger <cranger@google.com>



git-svn-id: http://test.kernel.org/svn/autotest/trunk@1176 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/server/subcommand.py b/server/subcommand.py
index dbeb8b6..b1e1b27 100644
--- a/server/subcommand.py
+++ b/server/subcommand.py
@@ -1,24 +1,31 @@
 __author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
 
-import sys, os, subprocess, traceback
+import sys, os, subprocess, traceback, time, utils
 from common.error import *
 
 
-def parallel(tasklist):
+def parallel(tasklist, timeout=None):
 	"""Run an set of predefined subcommands in parallel"""
 	pids = []
 	error = False
 	for task in tasklist:
 		task.fork_start()
+
+	remaining_timeout = None
+	if timeout:
+		endtime = time.time() + timeout
+
 	for task in tasklist:
-		status = task.fork_waitfor()
+		if timeout:
+			remaining_timeout = max(endtime - time.time(), 1)
+		status = task.fork_waitfor(remaining_timeout)
 		if status != 0:
 			error = True
 	if error:
 		raise AutoservError('One or more subcommands failed')
 
 
-def parallel_simple(function, arglist, log=True):
+def parallel_simple(function, arglist, log=True, timeout=None):
 	"""Each element in the arglist used to create a subcommand object,
 	where that arg is used both as a subdir name, and a single argument
 	to pass to "function".
@@ -36,9 +43,9 @@
 		if log:
 			subdir = str(arg)
 		else:
-			subdir = None 
+			subdir = None
 		subcommands.append(subcommand(function, args, subdir))
-	parallel(subcommands)
+	parallel(subcommands, timeout)
 
 
 def _where_art_thy_filehandles():
@@ -154,8 +161,25 @@
 		os._exit(0)
 
 
-	def fork_waitfor(self):
-		(pid, status) = os.waitpid(self.pid, 0)
+	def fork_waitfor(self, timeout=None):
+		if not timeout:
+			(pid, status) = os.waitpid(self.pid, 0)
+		else:
+			pid = None
+			start_time = time.time()
+			while time.time() <= start_time + timeout:
+				(pid, status) = os.waitpid(self.pid, os.WNOHANG)
+				if pid:
+					break
+				time.sleep(1)
+
+			if not pid:
+				utils.nuke_pid(self.pid)
+				print "subcommand failed pid %d" % self.pid
+				print "%s(%s)" % (self.func, self.args)
+				print "timeout after %ds" % timeout
+				print
+				return None
 
 		if status != 0:
 			print "subcommand failed pid %d" % pid
diff --git a/server/utils.py b/server/utils.py
index 2dc7c90..9c534fb 100644
--- a/server/utils.py
+++ b/server/utils.py
@@ -145,6 +145,34 @@
 		       time.sleep(1)
 
 
+def nuke_pid(pid):
+       """Kill child process pid with an escalating series of signals.
+
+       Sends SIGTERM, then SIGKILL, waiting up to 5 seconds after each
+       for the child to exit; raises AutoservRunError if it survives both.
+       """
+       signal_queue = [signal.SIGTERM, signal.SIGKILL]
+       for sig in signal_queue:
+	       try:
+		       os.kill(pid, sig)
+	       # The process may have died before we could kill it.
+	       except OSError:
+		       pass
+
+	       try:
+		       for i in range(5):
+			       if os.waitpid(pid, os.WNOHANG)[0] == pid:
+				       return
+			       time.sleep(1)
+		       # Still alive: escalate, and only give up after the last signal.
+		       if sig == signal_queue[-1]:
+			       raise AutoservRunError('Could not kill pid %d'
+				       % pid, None)
+	       # the process died before we join it.
+	       except OSError:
+		       pass
+
+
 def _process_output(pipe, fbuffer, teefile=None, use_os_read=True):
 	if use_os_read:
 		data = os.read(pipe.fileno(), 1024)