blob: e9ba9c3c2f9bf03405dfbbbb1d4e20819bf2f2df [file] [log] [blame]
mblighb0fab822007-07-25 16:40:19 +00001__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
mbligha57cc922009-08-24 22:04:19 +00003import sys, os, subprocess, time, signal, cPickle, logging
jadmanskid93d7d22008-05-29 21:37:29 +00004
5from autotest_lib.client.common_lib import error, utils
mblighb0fab822007-07-25 16:40:19 +00006
7
showard75cdfee2009-06-10 17:40:41 +00008# entry points that use subcommand must set this to their logging manager
9# to get log redirection for subcommands
10logging_manager_object = None
11
12
def parallel(tasklist, timeout=None, return_results=False):
    """
    Run a set of predefined subcommands in parallel.

    All tasks are started first (fork_start), then they are waited for in
    list order (fork_waitfor) and the pickled result or exception of each
    task is read from its result_pickle file object.

    @param tasklist: A list of subcommand instances to execute.
    @param timeout: Number of seconds after which the commands should timeout.
            The deadline is shared by all tasks, but every task is still
            waited for at least one second.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the tasks is
            returned.  [default: False]

    @returns None, or (when return_results is True) a list with one entry
            per task, in tasklist order.  A task that produced no result at
            all is represented by an EOFError instance.
    @raises error.AutoservError: if return_results is False and at least one
            task failed.
    """
    for task in tasklist:
        task.fork_start()

    endtime = None
    if timeout:
        endtime = time.time() + timeout

    run_error = False
    results = []
    for task in tasklist:
        remaining_timeout = None
        if endtime is not None:
            # Never hand out less than a second, even past the deadline.
            remaining_timeout = max(endtime - time.time(), 1)

        status = None
        try:
            status = task.fork_waitfor(timeout=remaining_timeout)
        except error.AutoservSubcommandError:
            run_error = True
        else:
            # fork_waitfor() returns None when the task timed out.
            if status != 0:
                run_error = True

        try:
            result = cPickle.load(task.result_pickle)
        except EOFError:
            # The child went away without writing anything to the pipe (for
            # example it was killed on timeout).  Report that as this task's
            # outcome instead of aborting the collection of the other tasks.
            run_error = True
            result = EOFError('no result received from %s (wait status: %r)'
                              % (task, status))
        finally:
            task.result_pickle.close()
        results.append(result)

    if return_results:
        return results
    elif run_error:
        details = ['task: %s returned/raised: %r\n' % (task, result)
                   for task, result in zip(tasklist, results)]
        message = 'One or more subcommands failed:\n' + ''.join(details)
        raise error.AutoservError(message)
mblighb0fab822007-07-25 16:40:19 +000053
54
def parallel_simple(function, arglist, log=True, timeout=None,
                    return_results=False):
    """
    Each element in the arglist used to create a subcommand object,
    where that arg is used both as a subdir name, and a single argument
    to pass to "function".

    We create a subcommand object for each element in the list,
    then execute those subcommand objects in parallel.

    NOTE: As an optimization, if len(arglist) == 1 a subcommand is not used;
    the function is called directly in the current process.

    @param function: A callable to run in parallel once per arg in arglist.
    @param arglist: A list of single arguments to be used one per subcommand;
            typically a list of machine names.
    @param log: If True, output will be written to output in a subdirectory
            named after each subcommand's arg.
    @param timeout: Number of seconds after which the commands should timeout.
            Not applied when the single-element shortcut is taken.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the function
            called on each arg is returned.  [default: False]

    @returns None or a list of results/exceptions.
    """
    if not arglist:
        logging.warning('parallel_simple was called with an empty arglist, '
                        'did you forget to pass in a list of machines?')
        # Nothing to run: same outcome as running zero subcommands.
        if return_results:
            return []
        return None

    # Bypass the multithreading if only one machine.
    if len(arglist) == 1:
        arg = arglist[0]
        if not return_results:
            # Exceptions propagate to the caller unchanged.
            function(arg)
            return None
        try:
            return [function(arg)]
        except Exception as e:
            return [e]

    subcommands = []
    for arg in arglist:
        subdir = str(arg) if log else None
        subcommands.append(subcommand(function, [arg], subdir))
    return parallel(subcommands, timeout, return_results=return_results)
mblighb0fab822007-07-25 16:40:19 +0000104
105
class subcommand(object):
    """
    A function call that is executed in a forked child process.

    The child runs func(*args) and sends the pickled return value, or the
    pickled exception if the call failed, back to the parent through a pipe.
    After fork_start() the parent can read that pipe via self.result_pickle.
    """
    # The hook lists live on the class, so they are shared by all instances.
    fork_hooks, join_hooks = [], []

    def __init__(self, func, args, subdir=None):
        """
        @param func: The callable to run in the child process.
        @param args: Sequence of positional arguments; called as func(*args).
        @param subdir: Optional directory to log results in.  It is created
                if missing, together with a 'debug' directory inside it.
        """
        if subdir:
            self.subdir = os.path.abspath(subdir)
            self._ensure_dir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            self._ensure_dir(self.debug)
        else:
            self.subdir = None
            self.debug = None

        self.func = func
        self.args = args
        self.lambda_function = lambda: func(*args)
        self.pid = None
        self.returncode = None
        # Set by fork_start() in the parent: read end of the result pipe.
        self.result_pickle = None


    @staticmethod
    def _ensure_dir(path):
        """ Create the directory path unless it already exists. """
        try:
            os.mkdir(path)
        except OSError:
            # Another process may have created it in the meantime; only a
            # directory that is really missing is an error.
            if not os.path.isdir(path):
                raise


    @staticmethod
    def _pickle_failure(exc):
        """ Pickle exc, or a plain Exception describing it if exc itself
        cannot be pickled. """
        try:
            return cPickle.dumps(exc, cPickle.HIGHEST_PROTOCOL)
        except Exception:
            substitute = Exception('unpicklable %s: %r'
                                   % (type(exc).__name__, exc))
            return cPickle.dumps(substitute, cPickle.HIGHEST_PROTOCOL)


    def __str__(self):
        return str('subcommand(func=%s, args=%s, subdir=%s)' %
                   (self.func, self.args, self.subdir))


    @classmethod
    def register_fork_hook(cls, hook):
        """ Register a function to be called from the child process after
        forking. """
        cls.fork_hooks.append(hook)


    @classmethod
    def register_join_hook(cls, hook):
        """ Register a function to be called from the child process
        just before the child process terminates (joins to the parent). """
        cls.join_hooks.append(hook)


    def redirect_output(self):
        """ Tee the logging output into this subcommand's debug directory.

        Does nothing unless a subdir was given and the module-level
        logging_manager_object has been set. """
        if self.subdir and logging_manager_object:
            tag = os.path.basename(self.subdir)
            logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag)


    def fork_start(self):
        """
        Fork a child process that runs the function and reports its outcome.

        In the parent this sets self.pid and self.result_pickle (a binary
        file object on the read end of the result pipe) and returns.  The
        child never returns from this method: it always ends in os._exit(),
        with exit code 0 if the function succeeded and its result was sent,
        1 otherwise.

        NOTE(review): the child writes the complete pickle before exiting;
        if the parent only reads after waiting for the child, a result
        larger than the pipe buffer presumably blocks both - TODO confirm.
        """
        sys.stdout.flush()
        sys.stderr.flush()
        r, w = os.pipe()
        self.returncode = None
        self.pid = os.fork()

        if self.pid:                            # I am the parent
            os.close(w)
            # The payload is a binary pickle, so read it in binary mode.
            self.result_pickle = os.fdopen(r, 'rb')
            return

        # We are the child from this point on. Never return: whatever goes
        # wrong below, the outer finally terminates the process so that the
        # child can never continue running the parent's code.
        exit_code = 1
        try:
            os.close(r)
            signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler

            failed = False
            try:
                if self.subdir:
                    os.chdir(self.subdir)
                    self.redirect_output()
                for hook in self.fork_hooks:
                    hook(self)
                result = self.lambda_function()
                payload = cPickle.dumps(result, cPickle.HIGHEST_PROTOCOL)
            except BaseException as e:
                # BaseException on purpose: SystemExit or KeyboardInterrupt
                # raised by the function must be reported too, not unwind
                # through the caller's stack inside the child.
                logging.exception('function failed')
                failed = True
                payload = self._pickle_failure(e)

            os.write(w, payload)
            os.close(w)
            if not failed:
                exit_code = 0

            # Failing join hooks do not change the exit code.
            for hook in self.join_hooks:
                hook(self)
        finally:
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(exit_code)


    def _handle_exitstatus(self, sts):
        """
        This is partially borrowed from subprocess.Popen.

        Decode the wait status into self.returncode (negative signal number
        if the child was killed by a signal).  For a non-zero return code a
        failure report, including the contents of the debug directory's
        autoserv.stderr if present, is printed.

        @param sts: The status value returned by os.waitpid().
        @raises error.AutoservSubcommandError: if the return code is not 0.
        @raises RuntimeError: if the status is neither an exit nor a signal.
        """
        if os.WIFSIGNALED(sts):
            self.returncode = -os.WTERMSIG(sts)
        elif os.WIFEXITED(sts):
            self.returncode = os.WEXITSTATUS(sts)
        else:
            # Should never happen
            raise RuntimeError("Unknown child exit status!")

        if self.returncode != 0:
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("rc=%d" % self.returncode)
            print("")
            if self.debug:
                stderr_file = os.path.join(self.debug, 'autoserv.stderr')
                if os.path.exists(stderr_file):
                    with open(stderr_file) as stderr_log:
                        for line in stderr_log:
                            sys.stdout.write(line)
            print("\n--------------------------------------------\n")
            raise error.AutoservSubcommandError(self.func, self.returncode)


    def poll(self):
        """
        This is borrowed from subprocess.Popen.

        Check without blocking whether the child has terminated.

        @returns The return code, or None if the child is still running.
        @raises error.AutoservSubcommandError: if the child has failed.
        """
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
            except os.error:
                # Deliberately ignored, as in the Popen code this is based
                # on; the return code simply stays unknown.
                pass
            else:
                if pid == self.pid:
                    self._handle_exitstatus(sts)
        return self.returncode


    def wait(self):
        """
        This is borrowed from subprocess.Popen.

        Block until the child has terminated.

        @returns The return code of the child.
        @raises error.AutoservSubcommandError: if the child has failed.
        """
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, 0)
            self._handle_exitstatus(sts)
        return self.returncode


    def fork_waitfor(self, timeout=None):
        """
        Wait for the child started by fork_start().

        @param timeout: Number of seconds to wait; if None or 0 wait without
                a time limit.  The child is polled once per second.
        @returns The return code of the child, or None if the timeout
                expired, in which case utils.nuke_pid() is called on the
                child.
        @raises error.AutoservSubcommandError: if the child has failed.
        """
        if not timeout:
            return self.wait()

        end_time = time.time() + timeout
        while time.time() <= end_time:
            returncode = self.poll()
            if returncode is not None:
                return returncode
            time.sleep(1)

        utils.nuke_pid(self.pid)
        print("subcommand failed pid %d" % self.pid)
        print("%s" % (self.func,))
        print("timeout after %ds" % timeout)
        print("")
        return None