blob: d81a9745d3ebe9c76569736f025d326aa50a4d3e [file] [log] [blame]
__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
import sys, os, subprocess, traceback, time, signal, pickle
from autotest_lib.client.common_lib import error, utils
def parallel(tasklist, timeout=None, return_results=False):
    """Run a set of predefined subcommands in parallel.

    Every task is started with fork_start() first; the tasks are then
    waited for, one after another, in list order.

    tasklist: objects offering fork_start(), fork_waitfor(timeout) and,
    once started, a readable result_pickle file (e.g. subcommand).

    timeout: overall budget in seconds shared by all tasks. Each task is
    still given at least one second. None (or 0) means wait forever.

    If return_results is True instead of an exception being raised on
    any error a list of the results/exceptions from the functions is
    returned. A task that produced no readable result (for instance
    because it was killed on timeout) contributes the exception raised
    while reading its result.

    Raises error.AutoservError if a task failed and return_results is
    False.
    """
    run_error = False
    for task in tasklist:
        task.fork_start()

    remaining_timeout = None
    if timeout:
        endtime = time.time() + timeout

    results = []
    for task in tasklist:
        if timeout:
            # Never hand out less than a second, even past the deadline.
            remaining_timeout = max(endtime - time.time(), 1)
        try:
            status = task.fork_waitfor(remaining_timeout)
        except error.AutoservSubcommandError:
            run_error = True
        else:
            # A timed out task reports None, which also counts as failure.
            if status != 0:
                run_error = True

        # The pickle comes from our own forked child, not from an
        # untrusted source.
        try:
            try:
                results.append(pickle.load(task.result_pickle))
            except (EOFError, pickle.UnpicklingError):
                # The child died before writing a complete result. Keep
                # going so the remaining children are still reaped, and
                # hand the exception back in place of the result.
                run_error = True
                results.append(sys.exc_info()[1])
        finally:
            task.result_pickle.close()

    if return_results:
        return results
    elif run_error:
        raise error.AutoservError('One or more subcommands failed')
def parallel_simple(function, arglist, log=True, timeout=None):
    """Call function once per element of arglist, in parallel.

    One subcommand object is built for each element. The element is the
    single argument passed to "function" and, when log is true, its
    string form is also used as the subdir name for that subcommand.
    All subcommand objects are then executed in parallel, sharing the
    given timeout.

    A one-element arglist is run directly in this process instead, and
    timeout is not applied in that case."""
    # Bypass the multithreading if only one machine.
    if len(arglist) == 1:
        only_arg = arglist[0]
        function(only_arg)
        return

    tasks = []
    for item in arglist:
        logdir = None
        if log:
            logdir = str(item)
        tasks.append(subcommand(function, [item], logdir))
    parallel(tasks, timeout)
def _where_art_thy_filehandles():
    """Debugging aid: list this process's open file descriptors on the tty."""
    my_pid = os.getpid()
    listing_command = "ls -l /proc/%d/fd >> /dev/tty" % my_pid
    os.system(listing_command)
def _print_to_tty(string):
    """Write string, followed by a newline, straight to /dev/tty.

    This bypasses sys.stdout, so it keeps working after stdout has been
    redirected.
    """
    tty = open('/dev/tty', 'w')
    try:
        tty.write(string + '\n')
    finally:
        # Close explicitly rather than relying on garbage collection.
        tty.close()
def _redirect_stream(fd, output):
newfd = os.open(output, os.O_WRONLY | os.O_CREAT | os.O_APPEND)
os.dup2(newfd, fd)
os.close(newfd)
if fd == 1:
sys.stdout = os.fdopen(fd, 'w')
if fd == 2:
sys.stderr = os.fdopen(fd, 'w')
def _redirect_stream_tee(fd, output, tag):
"""Use the low-level fork & pipe operations here to get a fd,
not a filehandle. This ensures that we get both the
filehandle and fd for stdout/stderr redirected correctly."""
r, w = os.pipe()
pid = os.fork()
if pid: # Parent
os.dup2(w, fd)
os.close(r)
os.close(w)
if fd == 1:
sys.stdout = os.fdopen(fd, 'w', 1)
if fd == 2:
sys.stderr = os.fdopen(fd, 'w', 1)
return
else: # Child
signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
os.close(w)
log = open(output, 'a')
f = os.fdopen(r, 'r')
for line in iter(f.readline, ''):
# Tee straight to file
log.write(line)
log.flush()
# Prepend stdout with the tag
print tag + ' : ' + line,
sys.stdout.flush()
log.close()
os._exit(0)
class subcommand(object):
    """A function call that is executed in a forked child process.

    fork_start() forks; the child runs func(*args), pickles the return
    value (or the exception that was raised) into a pipe and exits. The
    parent reads that pickle from self.result_pickle and collects the
    exit status with poll(), wait() or fork_waitfor().

    Note: single-argument print(...) is used throughout because it
    behaves the same as the Python 2 print statement.
    """

    # Hook lists live on the class, so they are shared by all subcommands.
    fork_hooks, join_hooks = [], []

    def __init__(self, func, args, subdir = None, stdprint = True):
        # func(args) - the subcommand to run
        # subdir     - the subdirectory to log results in
        # stdprint   - whether to print results to stdout/stderr
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
            self.stdout = os.path.join(self.debug, 'stdout')
            self.stderr = os.path.join(self.debug, 'stderr')
        else:
            # No log directory: any redirected output is discarded.
            self.subdir = None
            self.debug = '/dev/null'
            self.stdout = '/dev/null'
            self.stderr = '/dev/null'

        self.func = func
        self.args = args
        self.lambda_function = lambda: func(*args)
        self.pid = None              # child pid, set by fork_start()
        self.stdprint = stdprint
        self.returncode = None       # exit status, None until collected

    @classmethod
    def register_fork_hook(cls, hook):
        """ Register a function to be called from the child process after
        forking. The hook is called with the subcommand as its argument. """
        cls.fork_hooks.append(hook)

    @classmethod
    def register_join_hook(cls, hook):
        """ Register a function to be called from the child process
        just before the child process terminates (joins to the parent).
        The hook is called with the subcommand as its argument. """
        cls.join_hooks.append(hook)

    def redirect_output(self):
        """Set up stdout/stderr for the child process.

        With stdprint set and a subdir, both streams are teed to the log
        files and echoed with the subdir name as tag; with stdprint set
        and no subdir they are left alone. Without stdprint they are
        redirected to the log files (or /dev/null) only.
        """
        if self.stdprint:
            if self.subdir:
                tag = os.path.basename(self.subdir)
                _redirect_stream_tee(1, self.stdout, tag)
                _redirect_stream_tee(2, self.stderr, tag)
        else:
            _redirect_stream(1, self.stdout)
            _redirect_stream(2, self.stderr)

    def fork_start(self):
        """Fork and run the subcommand in the child.

        In the parent this records self.pid, opens self.result_pickle on
        the read end of the result pipe and returns. The child never
        returns from this method: it always leaves through os._exit(),
        with status 0 if func succeeded and 1 otherwise.

        Note that the child writes its result before exiting, so a result
        larger than the pipe can hold makes the child wait until the
        parent reads self.result_pickle.
        """
        sys.stdout.flush()
        sys.stderr.flush()
        r, w = os.pipe()
        self.returncode = None
        self.pid = os.fork()

        if self.pid:                            # I am the parent
            os.close(w)
            # Pickle data is binary, so read it back in binary mode.
            self.result_pickle = os.fdopen(r, 'rb')
            return

        # We are the child from this point on. Never return. The outer
        # try/finally guarantees os._exit() whatever happens, so that a
        # failure can never leak back into the parent's code path.
        exit_code = 1
        try:
            try:
                os.close(r)
                signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
                if self.subdir:
                    os.chdir(self.subdir)
                self.redirect_output()

                for hook in self.fork_hooks:
                    hook(self)
                result = self.lambda_function()
                os.write(w, pickle.dumps(result))
                exit_code = 0
            except:
                # Deliberately catches everything (including SystemExit
                # and KeyboardInterrupt): this is the process boundary,
                # and the parent is told about the failure instead.
                traceback.print_exc()
                os.write(w, pickle.dumps(sys.exc_info()[1]))

            for hook in self.join_hooks:
                hook(self)
        finally:
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(exit_code)

    def _handle_exitstatus(self, sts):
        """
        This is partially borrowed from subprocess.Popen.

        Decode the wait status sts into self.returncode (negative signal
        number if the child was killed by a signal). On a non-zero
        returncode a failure report including the stderr log is printed
        and error.AutoservSubcommandError is raised.
        """
        if os.WIFSIGNALED(sts):
            self.returncode = -os.WTERMSIG(sts)
        elif os.WIFEXITED(sts):
            self.returncode = os.WEXITSTATUS(sts)
        else:
            # Should never happen
            raise RuntimeError("Unknown child exit status!")

        if self.returncode != 0:
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("rc=%d" % self.returncode)
            print("")
            if os.path.exists(self.stderr):
                stderr_log = open(self.stderr)
                try:
                    # Lines keep their own newline; write them verbatim.
                    for line in stderr_log.readlines():
                        sys.stdout.write(line)
                finally:
                    stderr_log.close()
            print("\n--------------------------------------------\n")
            raise error.AutoservSubcommandError(self.func, self.returncode)

    def poll(self):
        """
        This is borrowed from subprocess.Popen.

        Check, without blocking, whether the child has exited. Returns
        self.returncode, which stays None while the child is running.
        """
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    self._handle_exitstatus(sts)
            except os.error:
                # Best effort, as in subprocess.Popen: a failing waitpid
                # simply leaves returncode unset.
                pass
        return self.returncode

    def wait(self):
        """
        This is borrowed from subprocess.Popen.

        Block until the child exits, then return self.returncode.
        """
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, 0)
            self._handle_exitstatus(sts)
        return self.returncode

    def fork_waitfor(self, timeout=None):
        """Wait for the child, for at most timeout seconds if one is given.

        Returns the child's returncode. If the timeout expires first the
        child is killed with utils.nuke_pid(), a report is printed and
        None is returned. With no (or a zero) timeout this simply blocks
        in wait().
        """
        if not timeout:
            return self.wait()
        else:
            start_time = time.time()
            # Poll once a second until the deadline passes.
            while time.time() <= start_time + timeout:
                self.poll()
                if self.returncode is not None:
                    return self.returncode
                time.sleep(1)

            utils.nuke_pid(self.pid)
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("timeout after %ds" % timeout)
            print("")
            return None