blob: 95053d11ef1ac331775c619fd8854e8ff59a6539 [file] [log] [blame]
mblighb0fab822007-07-25 16:40:19 +00001__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
David James3f638af2014-10-25 18:51:26 -07003import sys, os, signal, time, cPickle, logging
jadmanskid93d7d22008-05-29 21:37:29 +00004
5from autotest_lib.client.common_lib import error, utils
David James3f638af2014-10-25 18:51:26 -07006from autotest_lib.client.common_lib.cros import retry
mblighb0fab822007-07-25 16:40:19 +00007
8
# Logging manager used to redirect the output of forked subcommands.
# Entry points that use subcommand must assign their logging manager here;
# while it is None, subcommand.redirect_output() does nothing.
logging_manager_object = None
12
13
def parallel(tasklist, timeout=None, return_results=False):
    """
    Run a set of predefined subcommands in parallel.

    Every task is forked first; the tasks are then waited on in list order
    and the pickled result (or exception) of each one is read back from its
    child process.

    @param tasklist: A list of subcommand instances to execute.
    @param timeout: Number of seconds after which the commands should timeout.
            The deadline is shared by all tasks, but each individual wait
            is always given at least one second.
    @param return_results: If True instead of an AutoservError being raised
            on any error a list of the results|exceptions from the tasks is
            returned.  [default: False]

    @returns The list of results/exceptions (one per task, in tasklist
            order) when return_results is True, otherwise None.
    @raises error.AutoservError: if return_results is False and at least
            one task failed.
    """
    run_error = False
    for task in tasklist:
        task.fork_start()

    remaining_timeout = None
    if timeout:
        endtime = time.time() + timeout

    results = []
    for task in tasklist:
        if timeout:
            remaining_timeout = max(endtime - time.time(), 1)

        wait_failure = None
        try:
            status = task.fork_waitfor(timeout=remaining_timeout)
        except error.AutoservSubcommandError as e:
            run_error = True
            wait_failure = e
        else:
            if status != 0:
                run_error = True

        # NOTE(review): the child writes its whole pickle before exiting and
        # we only read it after waiting, so a result larger than the pipe
        # capacity could block both sides - confirm result sizes stay small.
        try:
            result = cPickle.load(task.result_pickle)
        except Exception as e:
            # A child that was killed (e.g. on timeout) or crashed before
            # writing leaves an empty or unreadable pipe.  Record that as
            # this task's failure instead of aborting the loop, so the
            # remaining children are still reaped and reported.
            run_error = True
            result = wait_failure if wait_failure is not None else e
        finally:
            task.result_pickle.close()
        results.append(result)

    if return_results:
        return results
    elif run_error:
        details = ['task: %s returned/raised: %r\n' % (task, result)
                   for task, result in zip(tasklist, results)]
        raise error.AutoservError('One or more subcommands failed:\n' +
                                  ''.join(details))
mblighb0fab822007-07-25 16:40:19 +000054
55
def parallel_simple(function, arglist, subdir_name_constructor=lambda x: str(x),
                    log=True, timeout=None, return_results=False):
    """
    Each element in the arglist used to create a subcommand object,
    where that arg is used both as a subdir name, and a single argument
    to pass to "function".

    We create a subcommand object for each element in the list,
    then execute those subcommand objects in parallel.

    NOTE: As an optimization, if len(arglist) == 1 a subcommand is not used:
    function is called directly in the current process, so no result
    sub-directory is created and timeout is not applied.

    @param function: A callable to run in parallel once per arg in arglist.
    @param arglist: A list of arguments to be used one per subcommand
    @param subdir_name_constructor: A function that returns a name for the
            result sub-directory created per subcommand.
            Signature is:
                subdir_name_constructor(arg)
            where arg is the argument passed to function.
    @param log: If True, output will be written to output in a subdirectory
            named after each subcommand's arg.
    @param timeout: Number of seconds after which the commands should timeout.
    @param return_results: If True instead of an AutoservError being raised
            on any error a list of the results|exceptions from the function
            called on each arg is returned.  [default: False]

    @returns None or a list of results/exceptions.
    """
    if not arglist:
        logging.warning('parallel_simple was called with an empty arglist, '
                        'did you forget to pass in a list of machines?')

    # Bypass forking entirely if there is only one argument.
    if len(arglist) == 1:
        arg = arglist[0]
        if return_results:
            try:
                result = function(arg)
            except Exception as e:
                return [e]
            return [result]
        else:
            # Without return_results any exception propagates unchanged.
            function(arg)
            return

    subcommands = []
    for arg in arglist:
        subdir = subdir_name_constructor(arg) if log else None
        subcommands.append(subcommand(function, [arg], subdir))
    return parallel(subcommands, timeout, return_results=return_results)
mblighb0fab822007-07-25 16:40:19 +0000107
108
class subcommand(object):
    """
    A function call that is executed in a forked child process.

    The child pickles the function's return value (or the exception it
    raised) into a pipe; the parent exposes the read end of that pipe as
    self.result_pickle after fork_start() has been called.
    """
    # Hooks are stored on the class, so they are shared by every subcommand.
    fork_hooks, join_hooks = [], []

    def __init__(self, func, args, subdir = None):
        """
        @param func: The callable to run in the child; invoked as func(*args).
        @param args: Sequence of positional arguments for func.
        @param subdir: Optional directory to log results in.  It and a
                'debug' directory inside it are created if missing.
        """
        # func(args) - the subcommand to run
        # subdir     - the subdirectory to log results in
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
        else:
            self.subdir = None
            self.debug = None

        self.func = func
        self.args = args
        self.pid = None
        self.returncode = None


    def __str__(self):
        return str('subcommand(func=%s, args=%s, subdir=%s)' %
                   (self.func, self.args, self.subdir))


    @classmethod
    def register_fork_hook(cls, hook):
        """ Register a function to be called from the child process after
        forking. """
        cls.fork_hooks.append(hook)


    @classmethod
    def register_join_hook(cls, hook):
        """ Register a function to be called from the child process
        just before the child process terminates (joins to the parent). """
        cls.join_hooks.append(hook)


    def redirect_output(self):
        """
        Tee logging into this subcommand's debug directory.

        Does nothing unless a subdir was given and the module-level
        logging_manager_object has been set.
        """
        if self.subdir and logging_manager_object:
            tag = os.path.basename(self.subdir)
            logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag)


    @staticmethod
    def _pickle_exception(exc):
        """ Serialize exc, substituting a plain Exception if it cannot be
        pickled. """
        try:
            return cPickle.dumps(exc, cPickle.HIGHEST_PROTOCOL)
        except Exception:
            stand_in = Exception('unpicklable exception: %r' % (exc,))
            return cPickle.dumps(stand_in, cPickle.HIGHEST_PROTOCOL)


    def fork_start(self):
        """
        Fork a child process that runs func(*args).

        In the parent this sets self.pid and self.result_pickle (the read
        end of the result pipe) and returns.  The child runs the fork hooks
        and the function, writes the pickled result or exception to the
        pipe, runs the join hooks and exits with status 0 on success or 1
        on failure.
        """
        sys.stdout.flush()
        sys.stderr.flush()
        r, w = os.pipe()
        self.returncode = None
        self.pid = os.fork()

        if self.pid:                            # I am the parent
            os.close(w)
            # The pipe carries binary pickle data, so read it in binary mode.
            self.result_pickle = os.fdopen(r, 'rb')
            return
        else:
            os.close(r)

        # We are the child from this point on. Never return.
        signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
        if self.subdir:
            os.chdir(self.subdir)
        self.redirect_output()

        try:
            for hook in self.fork_hooks:
                hook(self)
            result = self.func(*self.args)
            os.write(w, cPickle.dumps(result, cPickle.HIGHEST_PROTOCOL))
            exit_code = 0
        except Exception as e:
            logging.exception('function failed')
            exit_code = 1
            # An unpicklable exception must not raise out of this handler:
            # that would let the child return into the caller's code.
            os.write(w, self._pickle_exception(e))

        os.close(w)

        try:
            for hook in self.join_hooks:
                hook(self)
        finally:
            sys.stdout.flush()
            sys.stderr.flush()
            # os._exit skips normal interpreter shutdown in the child.
            os._exit(exit_code)


    def _handle_exitstatus(self, sts):
        """
        This is partially borrowed from subprocess.Popen.

        Decode a waitpid() status into self.returncode (negative signal
        number if the child was killed by a signal, exit status otherwise).

        @param sts: Status value as returned by os.waitpid().
        @raises RuntimeError: if the status is neither an exit nor a signal.
        @raises error.AutoservSubcommandError: if the return code is not 0;
                a failure report, including the child's autoserv.stderr file
                when present, is printed first.
        """
        if os.WIFSIGNALED(sts):
            self.returncode = -os.WTERMSIG(sts)
        elif os.WIFEXITED(sts):
            self.returncode = os.WEXITSTATUS(sts)
        else:
            # Should never happen
            raise RuntimeError("Unknown child exit status!")

        if self.returncode != 0:
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("rc=%d" % self.returncode)
            print("")
            if self.debug:
                stderr_file = os.path.join(self.debug, 'autoserv.stderr')
                if os.path.exists(stderr_file):
                    # Use a context manager so the log file is always closed.
                    with open(stderr_file) as stderr_log:
                        for line in stderr_log:
                            sys.stdout.write(line)
            print("\n--------------------------------------------\n")
            raise error.AutoservSubcommandError(self.func, self.returncode)


    def poll(self):
        """
        This is borrowed from subprocess.Popen.

        Check, without blocking, whether the child has finished.

        @returns The return code, or None if the child is still running.
        """
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    self._handle_exitstatus(sts)
            except os.error:
                # Deliberately best-effort: a failing waitpid() leaves
                # returncode as None.
                pass
        return self.returncode


    def wait(self):
        """
        This is borrowed from subprocess.Popen.

        Block until the child has finished.

        @returns The return code of the child.
        """
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, 0)
            self._handle_exitstatus(sts)
        return self.returncode


    def fork_waitfor(self, timeout=None):
        """
        Wait for the child, killing it if it does not finish in time.

        @param timeout: Number of seconds to wait; if None or 0 wait
                without any limit.

        @returns The return code of the child.
        """
        if not timeout:
            return self.wait()
        else:
            # NOTE(review): assumes retry.timeout() returns a pair whose
            # second item is None when the wait timed out - confirm.
            _, result = retry.timeout(self.wait, timeout_sec=timeout)

            if result is None:
                utils.nuke_pid(self.pid)
                print("subcommand failed pid %d" % self.pid)
                print("%s" % (self.func,))
                print("timeout after %ds" % timeout)
                print("")
                # Reap the killed child to collect its exit status.
                result = self.wait()

            return result