blob: 234c81a83b82e5bf1bc4f866e9c3c2eb6192baa2 [file] [log] [blame]
Derek Beckett63e1c442020-08-11 14:49:47 -07001# Lint as: python2, python3
2from __future__ import absolute_import
3from __future__ import division
4from __future__ import print_function
5
mblighb0fab822007-07-25 16:40:19 +00006__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
7
Derek Beckett63e1c442020-08-11 14:49:47 -07008import sys, os, signal, time, six.moves.cPickle, logging
jadmanskid93d7d22008-05-29 21:37:29 +00009
10from autotest_lib.client.common_lib import error, utils
David James3f638af2014-10-25 18:51:26 -070011from autotest_lib.client.common_lib.cros import retry
Derek Beckett63e1c442020-08-11 14:49:47 -070012from six.moves import zip
mblighb0fab822007-07-25 16:40:19 +000013
14
# Entry points that use subcommand must set this to their logging manager
# to get log redirection for subcommands.  While it is left as None,
# subcommand.redirect_output() does nothing.
logging_manager_object = None
18
19
def parallel(tasklist, timeout=None, return_results=False):
    """
    Run a set of predefined subcommands in parallel.

    Every task is forked first; the tasks are then waited on one at a time,
    in list order, and the pickled result (or exception) of each task is
    read back from its result pipe.

    @param tasklist: A list of subcommand instances to execute.
    @param timeout: Number of seconds after which the commands should timeout.
            The deadline is shared by all tasks, but every individual wait
            is given at least one second.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the tasks is
            returned.  [default: False]

    @returns None, or, when return_results is True, a list with one entry
            per task in tasklist order.
    @raises error.AutoservError: if return_results is False and at least
            one task failed.
    """
    for task in tasklist:
        task.fork_start()

    endtime = time.time() + timeout if timeout else None

    run_error = False
    results = []
    for task in tasklist:
        remaining_timeout = None
        if timeout:
            # Never hand a task less than one second, even when the shared
            # deadline has already passed.
            remaining_timeout = max(endtime - time.time(), 1)

        wait_failure = None
        try:
            status = task.fork_waitfor(timeout=remaining_timeout)
        except error.AutoservSubcommandError as e:
            run_error = True
            wait_failure = e
        else:
            if status != 0:
                run_error = True

        try:
            results.append(six.moves.cPickle.load(task.result_pickle))
        except EOFError as e:
            # The child ended without writing anything to its pipe (for
            # example it was killed after timing out).  Record the failure
            # as this task's result instead of aborting the whole run, so
            # the remaining tasks are still reaped and reported.
            run_error = True
            results.append(wait_failure if wait_failure is not None else e)
        finally:
            task.result_pickle.close()

    if return_results:
        return results
    if run_error:
        details = ''.join(
                'task: %s returned/raised: %r\n' % (task, result)
                for task, result in zip(tasklist, results))
        raise error.AutoservError(
                'One or more subcommands failed:\n' + details)
    return None
mblighb0fab822007-07-25 16:40:19 +000060
61
def parallel_simple(function, arglist, subdir_name_constructor=lambda x: str(x),
                    log=True, timeout=None, return_results=False):
    """
    Call function once for every element of arglist, in parallel.

    One subcommand object is built per element; the element is the single
    argument handed to "function" and, when logging is enabled, is also
    used to derive the name of the subcommand's result sub-directory.
    The subcommands are then executed in parallel.

    NOTE: As an optimization, if len(arglist) == 1 a subcommand is not used
    and function is called directly in the current process.

    @param function: A callable to run in parallel once per arg in arglist.
    @param arglist: A list of arguments to be used one per subcommand
    @param subdir_name_constructor: A function that returns a name for the
            result sub-directory created per subcommand.
            Signature is:
                subdir_name_constructor(arg)
            where arg is the argument passed to function.
    @param log: If True, output will be written to output in a subdirectory
            named after each subcommand's arg.
    @param timeout: Number of seconds after which the commands should timeout.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the function
            called on each arg is returned.  [default: False]

    @returns None or a list of results/exceptions.
    """
    if not arglist:
        logging.warning('parallel_simple was called with an empty arglist, '
                        'did you forget to pass in a list of machines?')

    if len(arglist) != 1:
        # General case: one forked subcommand per element.
        tasks = [
                subcommand(function,
                           [item],
                           subdir_name_constructor(item) if log else None)
                for item in arglist]
        return parallel(tasks, timeout, return_results=return_results)

    # Exactly one element: skip forking and call the function in-process.
    only_arg = arglist[0]
    if not return_results:
        function(only_arg)
        return None

    try:
        outcome = function(only_arg)
    except Exception as e:
        return [e]
    return [outcome]
mblighb0fab822007-07-25 16:40:19 +0000113
114
class subcommand(object):
    """
    A function call that is executed in a forked child process.

    The child calls func(*args) and sends the pickled return value, or the
    pickled exception that was raised, back to the parent through a pipe.
    The parent exposes the read end of that pipe as self.result_pickle.
    """
    # Hooks shared by every subcommand instance; filled in through
    # register_fork_hook() and register_join_hook().
    fork_hooks, join_hooks = [], []

    def __init__(self, func, args, subdir=None):
        """
        @param func: The callable to run in the child, called as func(*args).
        @param args: A sequence of positional arguments for func.
        @param subdir: The subdirectory to log results in.  When given, it
                and a 'debug' directory inside it are created if missing.
        """
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
        else:
            self.subdir = None
            self.debug = None

        self.func = func
        self.args = args
        self.pid = None
        self.returncode = None
        # Set by fork_start() in the parent process.
        self.result_pickle = None


    def __str__(self):
        return str('subcommand(func=%s, args=%s, subdir=%s)' %
                   (self.func, self.args, self.subdir))


    @classmethod
    def register_fork_hook(cls, hook):
        """ Register a function to be called from the child process after
        forking. """
        cls.fork_hooks.append(hook)


    @classmethod
    def register_join_hook(cls, hook):
        """ Register a function to be called from the child process
        just before the child process terminates (joins to the parent). """
        cls.join_hooks.append(hook)


    def redirect_output(self):
        """
        Redirect logging into this subcommand's debug directory.

        Does nothing unless a subdir was given and the module-level
        logging_manager_object has been set.
        """
        if self.subdir and logging_manager_object:
            tag = os.path.basename(self.subdir)
            logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag)


    def fork_start(self):
        """
        Fork a child process that runs self.func(*self.args).

        In the parent this stores the child's pid in self.pid, opens the
        read end of the result pipe as self.result_pickle and returns.

        The child never returns from this method.  It runs the fork hooks
        and the function, writes the pickled result (or the pickled
        exception) to the pipe, runs the join hooks and then terminates
        with os._exit(): exit code 0 if the function returned and its
        result was written, 1 otherwise.
        """
        sys.stdout.flush()
        sys.stderr.flush()
        r, w = os.pipe()
        self.returncode = None
        try:
            self.pid = os.fork()
        except OSError:
            # No child was created; do not leak the pipe descriptors.
            os.close(r)
            os.close(w)
            raise

        if self.pid:                            # I am the parent
            os.close(w)
            # Pickle data are bytes, so the pipe has to be read in binary
            # mode for the load to work on Python 3.
            self.result_pickle = os.fdopen(r, 'rb')
            return

        # We are the child from this point on. Never return.  Everything
        # is wrapped so that no failure can let the child fall back into
        # the code of the parent process.
        exit_code = 1
        try:
            try:
                os.close(r)
                signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
                if self.subdir:
                    os.chdir(self.subdir)
                self.redirect_output()

                for hook in self.fork_hooks:
                    hook(self)
                result = self.func(*self.args)
                os.write(w, six.moves.cPickle.dumps(
                        result, six.moves.cPickle.HIGHEST_PROTOCOL))
                exit_code = 0
            except Exception as e:
                logging.exception('function failed')
                exit_code = 1
                os.write(w, six.moves.cPickle.dumps(
                        e, six.moves.cPickle.HIGHEST_PROTOCOL))

            os.close(w)

            for hook in self.join_hooks:
                hook(self)
        finally:
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(exit_code)


    def _handle_exitstatus(self, sts):
        """
        This is partially borrowed from subprocess.Popen.

        Decode a waitpid() status into self.returncode: the negated signal
        number if the child was killed by a signal, else its exit status.

        @param sts: The status value returned by os.waitpid().
        @raises RuntimeError: if the status is neither a signal termination
                nor a normal exit.
        @raises error.AutoservSubcommandError: if the return code is not 0.
        """
        if os.WIFSIGNALED(sts):
            self.returncode = -os.WTERMSIG(sts)
        elif os.WIFEXITED(sts):
            self.returncode = os.WEXITSTATUS(sts)
        else:
            # Should never happen
            raise RuntimeError("Unknown child exit status!")

        if self.returncode != 0:
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("rc=%d" % self.returncode)
            print()
            if self.debug:
                stderr_file = os.path.join(self.debug, 'autoserv.stderr')
                if os.path.exists(stderr_file):
                    # Echo the captured stderr of the child.
                    with open(stderr_file) as stderr_log:
                        for line in stderr_log:
                            print(line, end=' ')
            print("\n--------------------------------------------\n")
            raise error.AutoservSubcommandError(self.func, self.returncode)


    def poll(self):
        """
        This is borrowed from subprocess.Popen.

        Check, without blocking, whether the child has terminated.

        @returns The return code, or None if the child has not been reaped.
        """
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    self._handle_exitstatus(sts)
            except os.error:
                # Deliberate best effort: a failing waitpid() leaves
                # returncode as None.
                pass
        return self.returncode


    def wait(self):
        """
        This is borrowed from subprocess.Popen.

        Block until the child terminates.

        @returns The return code of the child.
        """
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, 0)
            self._handle_exitstatus(sts)
        return self.returncode


    def fork_waitfor(self, timeout=None):
        """
        Wait for the child, optionally giving up after a timeout.

        @param timeout: Number of seconds to wait; a false value waits
                without limit.
        @returns The return code of the child.
        """
        if not timeout:
            return self.wait()
        else:
            _, result = retry.timeout(self.wait, timeout_sec=timeout)

            # NOTE(review): a None result is taken to mean that self.wait
            # did not finish within the timeout -- confirm against
            # retry.timeout.
            if result is None:
                utils.nuke_pid(self.pid)
                print("subcommand failed pid %d" % self.pid)
                print("%s" % (self.func,))
                print("timeout after %ds" % timeout)
                print()
                result = self.wait()

            return result