blob: 85898a109f4e7472c7a9ba325c694036238b1fb0 [file] [log] [blame]
mblighb0fab822007-07-25 16:40:19 +00001__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
mbligh589bf322008-05-27 21:28:15 +00003import sys, os, subprocess, traceback, time, signal
jadmanskid93d7d22008-05-29 21:37:29 +00004
5from autotest_lib.client.common_lib import error, utils
mblighb0fab822007-07-25 16:40:19 +00006
7
def parallel(tasklist, timeout=None):
    """Run a set of predefined subcommands in parallel.

    Every task is started with fork_start() before any of them is waited
    for; the tasks are then waited for one by one, in order, with
    fork_waitfor().

    tasklist - iterable of objects providing fork_start() and
               fork_waitfor(timeout)
    timeout  - overall limit in seconds for the whole set; None (or any
               other false value) means wait without a limit

    Raises error.AutoservError if any task raised
    error.AutoservSubcommandError or returned a status other than 0.
    All tasks are still waited for before the error is raised.
    """
    # The tasks are traversed twice (start, then wait).  Materialise them
    # so a one-shot iterator does not silently skip the wait phase.
    tasks = list(tasklist)
    for task in tasks:
        task.fork_start()

    deadline = None
    if timeout:
        deadline = time.time() + timeout

    run_error = False
    for task in tasks:
        remaining_timeout = None
        if deadline is not None:
            # Even once the overall deadline has passed, each remaining
            # task is still given at least one second to be collected.
            remaining_timeout = max(deadline - time.time(), 1)
        try:
            status = task.fork_waitfor(remaining_timeout)
        except error.AutoservSubcommandError:
            run_error = True
        else:
            if status != 0:
                run_error = True

    if run_error:
        raise error.AutoservError('One or more subcommands failed')
mblighb0fab822007-07-25 16:40:19 +000032
33
def parallel_simple(function, arglist, log=True, timeout=None):
    """Call function once for each element of arglist, in parallel.

    Each element of the arglist is used to create a subcommand object,
    where that element is passed as the single argument to "function"
    and, when log is true, its str() is also used as the subdir name.
    The subcommand objects are then executed in parallel.

    function - callable taking one argument
    arglist  - sequence of arguments (must support len() and indexing)
    log      - if true, each subcommand logs into a subdir named after
               its argument; otherwise no subdir is used
    timeout  - passed through to parallel()

    When arglist holds exactly one element, function is called directly
    in the current process: nothing is forked, no subdir is created and
    log and timeout are not used.
    """
    # Bypass forking entirely if there is only one argument (machine).
    if len(arglist) == 1:
        function(arglist[0])
        return

    def subdir_for(arg):
        if log:
            return str(arg)
        return None

    subcommands = [subcommand(function, [arg], subdir_for(arg))
                   for arg in arglist]
    parallel(subcommands, timeout)
mblighb0fab822007-07-25 16:40:19 +000055
56
def _where_art_thy_filehandles():
    """Debugging aid: append an "ls -l" listing of /proc/<pid>/fd for the
    current process to /dev/tty."""
    listing_command = "ls -l /proc/%d/fd >> /dev/tty" % os.getpid()
    os.system(listing_command)
59
mblighdc735a22007-08-02 16:54:37 +000060
def _print_to_tty(string):
    """Debugging aid: write string plus a trailing newline to /dev/tty.

    Writing to /dev/tty rather than sys.stdout keeps the message visible
    even after stdout has been redirected.
    """
    tty = open('/dev/tty', 'w')
    try:
        tty.write(string + '\n')
    finally:
        # Close explicitly instead of relying on garbage collection.
        tty.close()
63
64
def _redirect_stream(fd, output):
    """Redirect file descriptor fd to the file named output.

    The file is created if needed and truncated, so a re-used log never
    keeps stale bytes from an earlier, longer run.  When fd is 1 or 2 the
    matching sys.stdout / sys.stderr object is replaced as well, so both
    the descriptor and the Python-level stream point at the new file.

    fd     - descriptor number to redirect
    output - path of the file to redirect into
    """
    newfd = os.open(output, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
    # If fd was closed beforehand, os.open() may hand back fd itself; in
    # that case it already points at output and must not be closed.
    if newfd != fd:
        try:
            os.dup2(newfd, fd)
        finally:
            os.close(newfd)
    if fd == 1:
        sys.stdout = os.fdopen(fd, 'w')
    if fd == 2:
        sys.stderr = os.fdopen(fd, 'w')
73
74
def _redirect_stream_tee(fd, output, tag):
    """Redirect fd into a pipe whose reader tees each line to a log file
    and to stdout.

    Use the low-level fork & pipe operations here to get a fd, not a
    filehandle.  This ensures that we get both the filehandle and fd for
    stdout/stderr redirected correctly.

    The calling process gets fd pointed at the write end of the pipe (and
    a line-buffered sys.stdout / sys.stderr when fd is 1 or 2) and
    returns.  A forked helper process reads the pipe until EOF and then
    exits; it never returns to the caller's code.

    fd     - descriptor number to redirect
    output - path of the log file the helper writes every line to
    tag    - prefix put in front of every line echoed to stdout
    """
    r, w = os.pipe()
    pid = os.fork()
    if pid: # Parent
        os.dup2(w, fd)
        os.close(r)
        os.close(w)
        if fd == 1:
            sys.stdout = os.fdopen(fd, 'w', 1)
        if fd == 2:
            sys.stderr = os.fdopen(fd, 'w', 1)
        return

    # Child.  Whatever happens, leave through os._exit(): letting an
    # exception unwind would resume the caller's code in a second process.
    exit_status = 1
    try:
        try:
            os.close(w)
            _tee_pipe_lines(r, output, tag)
            exit_status = 0
        except:
            # Bare except on purpose: nothing may propagate out of the
            # forked helper.
            traceback.print_exc()
            sys.stderr.flush()
    finally:
        os._exit(exit_status)


def _tee_pipe_lines(r, output, tag):
    """Copy lines from pipe descriptor r into the file output and, with
    tag prepended, to stdout, until EOF on the pipe."""
    log = open(output, 'w')
    try:
        f = os.fdopen(r, 'r')
        for line in iter(f.readline, ''):
            # Tee straight to file
            log.write(line)
            log.flush()
            # Prepend stdout with the tag.  The line keeps its own
            # newline, so write() is used and nothing is appended.
            sys.stdout.write(tag + ' : ' + line)
            sys.stdout.flush()
    finally:
        log.close()
103
104
class subcommand:
    """A function call that is executed in a forked child process.

    Typical use is fork_start() followed by fork_waitfor(); parallel()
    drives a list of these objects that way.
    """

    def __init__(self, func, args, subdir=None, stdprint=True):
        """Prepare (but do not start) the subcommand.

        func     - the function to run; it is called as func(*args)
        args     - sequence of positional arguments for func
        subdir   - the subdirectory to log results in; it and its
                   'debug' subdirectory are created here if missing.
                   Without a subdir all log paths are '/dev/null'.
        stdprint - whether to print results to stdout/stderr
        """
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
            self.stdout = os.path.join(self.debug, 'stdout')
            self.stderr = os.path.join(self.debug, 'stderr')
        else:
            self.subdir = None
            self.debug = '/dev/null'
            self.stdout = '/dev/null'
            self.stderr = '/dev/null'

        self.func = func
        self.args = args
        self.lambda_function = lambda: func(*args)
        # pid of the forked child; set by fork_start()
        self.pid = None
        self.stdprint = stdprint


    def redirect_output(self):
        """Set up stdout/stderr for the child process.

        stdprint with a subdir:    tee both streams to the log files and
                                   echo them tagged with the subdir's
                                   basename.
        stdprint without a subdir: leave both streams untouched.
        no stdprint:               send both streams to self.stdout and
                                   self.stderr only.
        """
        if self.stdprint:
            if self.subdir:
                tag = os.path.basename(self.subdir)
                _redirect_stream_tee(1, self.stdout, tag)
                _redirect_stream_tee(2, self.stderr, tag)
        else:
            _redirect_stream(1, self.stdout)
            _redirect_stream(2, self.stderr)


    def fork_start(self):
        """Fork a child that runs the subcommand; return in the parent.

        The parent returns immediately with self.pid set to the child's
        pid.  The child changes into subdir (if any), redirects its
        output, calls the function and exits with status 0, or with
        status 1 after printing a traceback if anything raised.
        """
        # Flush first so buffered output is not duplicated into the child.
        sys.stdout.flush()
        sys.stderr.flush()
        self.pid = os.fork()

        if self.pid: # I am the parent
            return

        # We are the child from this point on. Never return.  The setup
        # steps are inside the try as well: an exception escaping here
        # would resume the parent's code in a second process.
        exit_status = 1
        try:
            try:
                signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
                if self.subdir:
                    os.chdir(self.subdir)
                self.redirect_output()
                self.lambda_function()
                exit_status = 0
            except:
                # Bare except on purpose: nothing may propagate out of
                # the child.
                traceback.print_exc()
            sys.stdout.flush()
            sys.stderr.flush()
        finally:
            os._exit(exit_status)


    def fork_waitfor(self, timeout=None):
        """Wait for the child started by fork_start() and report failure.

        timeout - seconds to wait; None (or any other false value) blocks
                  until the child exits.  Otherwise the child is polled
                  about once per second and, if it has not exited in
                  time, it is killed with utils.nuke_pid().

        Returns the status from os.waitpid() (i.e. 0) on success, or None
        after a timeout.  Raises error.AutoservSubcommandError when the
        status is not 0; status is the raw value from os.waitpid(), not a
        decoded exit code.
        """
        if not timeout:
            (pid, status) = os.waitpid(self.pid, 0)
        else:
            pid = None
            deadline = time.time() + timeout
            while time.time() <= deadline:
                (pid, status) = os.waitpid(self.pid, os.WNOHANG)
                if pid:
                    break
                time.sleep(1)

        if not pid:
            utils.nuke_pid(self.pid)
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("timeout after %ds" % timeout)
            print("")
            return None

        if status != 0:
            print("subcommand failed pid %d" % pid)
            print("%s" % (self.func,))
            print("rc=%d" % status)
            print("")
            if os.path.exists(self.stderr):
                stderr_log = open(self.stderr)
                try:
                    # Lines keep their own newline, so write them as-is.
                    for line in stderr_log:
                        sys.stdout.write(line)
                finally:
                    stderr_log.close()
            print("\n--------------------------------------------\n")
            raise error.AutoservSubcommandError(self.func, status)
        return status