#!/usr/bin/python
#
# Copyright 2008 Google Inc. Released under the GPL v2

import os, pickle, random, re, resource, select, shutil, signal, StringIO
import socket, struct, subprocess, sys, time, textwrap, urllib, urlparse
import warnings
from autotest_lib.client.common_lib import error, barrier, debug


def deprecated(func):
    """This is a decorator which can be used to mark functions as deprecated.
    It will result in a warning being emitted when the function is used."""
    def new_func(*args, **dargs):
        warnings.warn("Call to deprecated function %s." % func.__name__,
                      category=DeprecationWarning)
        return func(*args, **dargs)
    new_func.__name__ = func.__name__
    new_func.__doc__ = func.__doc__
    new_func.__dict__.update(func.__dict__)
    return new_func
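
# Usage sketch: decorating a function with @deprecated keeps it callable but
# makes every call emit a DeprecationWarning (see run_bg() further down for
# an in-tree example):
#
#     @deprecated
#     def old_helper(arg):
#         return new_helper(arg)    # new_helper is a hypothetical replacement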


class BgJob(object):
    def __init__(self, command, stdout_tee=None, stderr_tee=None, verbose=True):
        self.command = command
        self.stdout_tee = stdout_tee
        self.stderr_tee = stderr_tee
        self.result = CmdResult(command)
        self.log = debug.get_logger()
        if verbose:
            self.log.debug("running: %s" % command)
        self.sp = subprocess.Popen(command, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   preexec_fn=self._reset_sigpipe, shell=True,
                                   executable="/bin/bash")


    def output_prepare(self, stdout_file=None, stderr_file=None):
        self.stdout_file = stdout_file
        self.stderr_file = stderr_file


    def process_output(self, stdout=True, final_read=False):
        """output_prepare must be called prior to calling this"""
        if stdout:
            pipe, buf, tee = self.sp.stdout, self.stdout_file, self.stdout_tee
        else:
            pipe, buf, tee = self.sp.stderr, self.stderr_file, self.stderr_tee

        if final_read:
            # read in all the data we can from pipe and then stop
            data = []
            while select.select([pipe], [], [], 0)[0]:
                data.append(os.read(pipe.fileno(), 1024))
                if len(data[-1]) == 0:
                    break
            data = "".join(data)
        else:
            # perform a single read
            data = os.read(pipe.fileno(), 1024)
        buf.write(data)
        if tee:
            tee.write(data)
            tee.flush()


    def cleanup(self):
        self.sp.stdout.close()
        self.sp.stderr.close()
        self.result.stdout = self.stdout_file.getvalue()
        self.result.stderr = self.stderr_file.getvalue()


    def _reset_sigpipe(self):
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)
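
# Rough usage sketch for BgJob: it is normally driven through run(),
# run_parallel() and join_bg_jobs() below rather than used directly.
#
#     job = BgJob('sleep 1 && echo done')
#     join_bg_jobs([job], timeout=30)
#     print job.result.exit_status, job.result.stdout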


def ip_to_long(ip):
    # !L is a long in network byte order
    return struct.unpack('!L', socket.inet_aton(ip))[0]


def long_to_ip(number):
    # See above comment.
    return socket.inet_ntoa(struct.pack('!L', number))


def create_subnet_mask(bits):
    # ~ does weird things in python...but this does work
    return (1 << 32) - (1 << 32 - bits)


def format_ip_with_mask(ip, mask_bits):
    masked_ip = ip_to_long(ip) & create_subnet_mask(mask_bits)
    return "%s/%s" % (long_to_ip(masked_ip), mask_bits)
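
# Worked example for the address helpers above:
#
#     create_subnet_mask(24)                  -> 4294967040 (i.e. 0xFFFFFF00)
#     format_ip_with_mask('192.168.1.5', 24)  -> '192.168.1.0/24'
#     long_to_ip(ip_to_long('10.0.0.1'))      -> '10.0.0.1'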


def normalize_hostname(alias):
    ip = socket.gethostbyname(alias)
    return socket.gethostbyaddr(ip)[0]


def get_ip_local_port_range():
    match = re.match(r'\s*(\d+)\s*(\d+)\s*$',
                     read_one_line('/proc/sys/net/ipv4/ip_local_port_range'))
    return (int(match.group(1)), int(match.group(2)))


def set_ip_local_port_range(lower, upper):
    write_one_line('/proc/sys/net/ipv4/ip_local_port_range',
                   '%d %d\n' % (lower, upper))


def read_one_line(filename):
    return open(filename, 'r').readline().rstrip('\n')


def write_one_line(filename, str):
    f = open(filename, 'w')
    f.write(str.rstrip('\n') + '\n')
    f.flush()
    f.close()


def read_keyval(path):
    """
    Read a key-value pair format file into a dictionary, and return it.
    Takes either a filename or directory name as input. If it's a
    directory name, we assume you want the file to be called keyval.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = {}
    for line in open(path):
        line = re.sub('#.*', '', line).rstrip()
        if not re.search(r'^[-\w]+=', line):
            raise ValueError('Invalid format line: %s' % line)
        key, value = line.split('=', 1)
        if re.search(r'^\d+$', value):
            value = int(value)
        elif re.search(r'^(\d+\.)?\d+$', value):
            value = float(value)
        keyval[key] = value
    return keyval


def write_keyval(path, dictionary, type_tag=None):
    """
    Write a key-value pair format file out to a file. This uses append
    mode to open the file, so existing text will not be overwritten or
    reparsed.

    If type_tag is None, then the key must be composed of alphanumeric
    characters (or dashes+underscores). However, if type_tag is not
    None then the keys must also have "{type_tag}" as a suffix. At
    the moment the only valid values of type_tag are "attr" and "perf".
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = open(path, 'a')

    if type_tag is None:
        key_regex = re.compile(r'^[-\w]+$')
    else:
        if type_tag not in ('attr', 'perf'):
            raise ValueError('Invalid type tag: %s' % type_tag)
        escaped_tag = re.escape(type_tag)
        key_regex = re.compile(r'^[-\w]+\{%s\}$' % escaped_tag)
    try:
        for key, value in dictionary.iteritems():
            if not key_regex.search(key):
                raise ValueError('Invalid key: %s' % key)
            keyval.write('%s=%s\n' % (key, value))
    finally:
        keyval.close()
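
# Keyval format sketch (the results directory is hypothetical): write_keyval()
# appends plain key=value lines, and read_keyval() parses them back, turning
# purely numeric values into int/float.
#
#     write_keyval('/tmp/results', {'kernel': '2.6.27', 'iterations': 3})
#     read_keyval('/tmp/results')   # -> {'kernel': '2.6.27', 'iterations': 3}
#
# With type_tag='perf' every key must carry the tag, e.g. 'throughput{perf}'.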


def is_url(path):
    """Return true if path looks like a URL"""
    # for now, just handle http and ftp
    url_parts = urlparse.urlparse(path)
    return (url_parts[0] in ('http', 'ftp'))


def urlopen(url, data=None, proxies=None, timeout=5):
    """Wrapper to urllib.urlopen with timeout addition."""

    # Save old timeout
    old_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib.urlopen(url, data=data, proxies=proxies)
    finally:
        socket.setdefaulttimeout(old_timeout)


def urlretrieve(url, filename=None, reporthook=None, data=None, timeout=300):
    """Wrapper to urllib.urlretrieve with timeout addition."""
    old_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib.urlretrieve(url, filename=filename,
                                  reporthook=reporthook, data=data)
    finally:
        socket.setdefaulttimeout(old_timeout)


def get_file(src, dest, permissions=None):
    """Get a file from src, which can be local or a remote URL"""
    if src == dest:
        return
    if is_url(src):
        print 'PWD: ' + os.getcwd()
        print 'Fetching \n\t', src, '\n\t->', dest
        try:
            urllib.urlretrieve(src, dest)
        except IOError, e:
            raise error.AutotestError('Unable to retrieve %s (to %s)'
                                      % (src, dest), e)
    else:
        shutil.copyfile(src, dest)
    if permissions:
        os.chmod(dest, permissions)
    return dest


def unmap_url(srcdir, src, destdir='.'):
    """
    Receives either a path to a local file or a URL.
    Returns either the path to the local file, or the fetched URL.

    unmap_url('/usr/src', 'foo.tar', '/tmp')
                            = '/usr/src/foo.tar'
    unmap_url('/usr/src', 'http://site/file', '/tmp')
                            = '/tmp/file'
                            (after retrieving it)
    """
    if is_url(src):
        url_parts = urlparse.urlparse(src)
        filename = os.path.basename(url_parts[2])
        dest = os.path.join(destdir, filename)
        return get_file(src, dest)
    else:
        return os.path.join(srcdir, src)


def update_version(srcdir, preserve_srcdir, new_version, install,
                   *args, **dargs):
    """
    Make sure srcdir is version new_version.

    If not, delete it and install() the new version.

    In the preserve_srcdir case, we just check it's up to date,
    and if not, we rerun install, without removing srcdir.
    """
    versionfile = os.path.join(srcdir, '.version')
    install_needed = True

    if os.path.exists(versionfile):
        old_version = pickle.load(open(versionfile))
        if old_version == new_version:
            install_needed = False

    if install_needed:
        if not preserve_srcdir and os.path.exists(srcdir):
            shutil.rmtree(srcdir)
        install(*args, **dargs)
        if os.path.exists(srcdir):
            pickle.dump(new_version, open(versionfile, 'w'))
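
# Rough usage sketch for update_version() (the installer callback and paths
# are hypothetical):
#
#     def install_foo(tarball, srcdir):
#         pass    # extract and build the tarball into srcdir
#
#     update_version('/tmp/src/foo', False, '1.2', install_foo,
#                    '/tmp/foo-1.2.tar.gz', '/tmp/src/foo')
#
# The version is pickled into <srcdir>/.version, so install_foo only runs
# again when the requested version changes.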


def run(command, timeout=None, ignore_status=False,
        stdout_tee=None, stderr_tee=None, verbose=True):
    """
    Run a command on the host.

    Args:
            command: the command line string
            timeout: time limit in seconds before attempting to
                    kill the running process. The run() function
                    will take a few seconds longer than 'timeout'
                    to complete if it has to kill the process.
            ignore_status: do not raise an exception, no matter what
                    the exit code of the command is.
            stdout_tee: optional file-like object to which stdout data
                        will be written as it is generated (data will still
                        be stored in result.stdout)
            stderr_tee: likewise for stderr
            verbose: if True, log the command being run at debug level

    Returns:
            a CmdResult object

    Raises:
            CmdError: the exit code of the command
                    execution was not 0
    """
    bg_job = join_bg_jobs((BgJob(command, stdout_tee, stderr_tee, verbose),),
                          timeout)[0]
    if not ignore_status and bg_job.result.exit_status:
        raise error.CmdError(command, bg_job.result,
                             "Command returned non-zero exit status")

    return bg_job.result
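
# Typical usage sketch for run() (commands are illustrative):
#
#     result = run('uname -r', timeout=30)
#     print result.stdout.strip()
#
#     # Tee output to the console while still capturing it, and do not
#     # raise CmdError on a non-zero exit:
#     result = run('make test', stdout_tee=sys.stdout, ignore_status=True)
#     if result.exit_status != 0:
#         print 'make test failed with status %d' % result.exit_status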


def run_parallel(commands, timeout=None, ignore_status=False,
                 stdout_tee=None, stderr_tee=None):
    """Behaves the same as run() with the following exceptions:

    - commands is a list of commands to run in parallel.
    - ignore_status toggles whether or not an exception should be raised
      on any error.

    Returns a list of CmdResult objects.
    """
    bg_jobs = []
    for command in commands:
        bg_jobs.append(BgJob(command, stdout_tee, stderr_tee))

    # Updates objects in bg_jobs list with their process information
    join_bg_jobs(bg_jobs, timeout)

    for bg_job in bg_jobs:
        if not ignore_status and bg_job.result.exit_status:
            raise error.CmdError(bg_job.command, bg_job.result,
                                 "Command returned non-zero exit status")

    return [bg_job.result for bg_job in bg_jobs]
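
# Usage sketch for run_parallel() (commands are illustrative); both commands
# are started at the same time and joined together:
#
#     results = run_parallel(['sleep 2; echo a', 'sleep 2; echo b'],
#                            timeout=60)
#     for result in results:
#         print result.command, result.exit_status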


@deprecated
def run_bg(command):
    """Function deprecated. Please use BgJob class instead."""
    bg_job = BgJob(command)
    return bg_job.sp, bg_job.result


def join_bg_jobs(bg_jobs, timeout=None):
    """Joins the bg_jobs with the current thread.

    Returns the same list of bg_jobs objects that was passed in.
    """
    for bg_job in bg_jobs:
        bg_job.output_prepare(StringIO.StringIO(), StringIO.StringIO())

    try:
        # We are holding ends to the stdout and stderr pipes,
        # hence we need to be sure to close those fds no matter what
        start_time = time.time()
        timeout_error = _wait_for_commands(bg_jobs, start_time, timeout)

        for bg_job in bg_jobs:
            # Process stdout and stderr
            bg_job.process_output(stdout=True, final_read=True)
            bg_job.process_output(stdout=False, final_read=True)
    finally:
        # close our ends of the pipes to the sp no matter what
        for bg_job in bg_jobs:
            bg_job.cleanup()

    if timeout_error:
        # TODO: This needs to be fixed to better represent what happens when
        # running in parallel. However this is backwards compatible, so it
        # will do for the time being.
        raise error.CmdError(bg_jobs[0].command, bg_jobs[0].result,
                             "Command(s) did not complete within %d seconds"
                             % timeout)

    return bg_jobs


def _wait_for_commands(bg_jobs, start_time, timeout):
    # This returns True if it must return due to a timeout, otherwise False.

    # To check for processes which terminate without producing any output
    # a 1 second timeout is used in select.
    SELECT_TIMEOUT = 1

    select_list = []
    reverse_dict = {}
    for bg_job in bg_jobs:
        select_list.append(bg_job.sp.stdout)
        select_list.append(bg_job.sp.stderr)
        reverse_dict[bg_job.sp.stdout] = (bg_job, True)
        reverse_dict[bg_job.sp.stderr] = (bg_job, False)

    if timeout:
        stop_time = start_time + timeout
        time_left = stop_time - time.time()
    else:
        time_left = None  # so that select never times out
    while not timeout or time_left > 0:
        # select will return when stdout is ready (including when it is
        # EOF, that is the process has terminated).
        ready, _, _ = select.select(select_list, [], [], SELECT_TIMEOUT)

        # os.read() has to be used instead of
        # subproc.stdout.read() which will otherwise block
        for fileno in ready:
            bg_job, stdout = reverse_dict[fileno]
            bg_job.process_output(stdout)

        remaining_jobs = [x for x in bg_jobs if x.result.exit_status is None]
        if len(remaining_jobs) == 0:
            return False
        for bg_job in remaining_jobs:
            bg_job.result.exit_status = bg_job.sp.poll()

        if timeout:
            time_left = stop_time - time.time()

    # Kill all processes which did not complete prior to timeout
    for bg_job in [x for x in bg_jobs if x.result.exit_status is None]:
        nuke_subprocess(bg_job.sp)
        bg_job.result.exit_status = bg_job.sp.poll()

    return True


def nuke_subprocess(subproc):
    # check if the subprocess is still alive, first
    if subproc.poll() is not None:
        return subproc.poll()

    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        try:
            os.kill(subproc.pid, sig)
        # The process may have died before we could kill it.
        except OSError:
            pass

        for i in range(5):
            rc = subproc.poll()
            if rc is not None:
                return rc
            time.sleep(1)


def nuke_pid(pid):
    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        try:
            os.kill(pid, sig)

        # The process may have died before we could kill it.
        except OSError:
            pass

        try:
            for i in range(5):
                status = os.waitpid(pid, os.WNOHANG)[0]
                if status == pid:
                    return
                time.sleep(1)

            if status != pid:
                raise error.AutoservRunError('Could not kill %d'
                                             % pid, None)

        # the process died before we join it.
        except OSError:
            pass


def system(command, timeout=None, ignore_status=False):
    """This function returns the exit status of command."""
    return run(command, timeout, ignore_status,
               stdout_tee=sys.stdout, stderr_tee=sys.stderr).exit_status


def system_parallel(commands, timeout=None, ignore_status=False):
    """This function returns a list of exit statuses for the respective
    list of commands."""
    return [bg_jobs.exit_status for bg_jobs in
            run_parallel(commands, timeout, ignore_status,
                         stdout_tee=sys.stdout, stderr_tee=sys.stderr)]


def system_output(command, timeout=None, ignore_status=False,
                  retain_output=False):
    if retain_output:
        out = run(command, timeout, ignore_status,
                  stdout_tee=sys.stdout, stderr_tee=sys.stderr).stdout
    else:
        out = run(command, timeout, ignore_status).stdout
    if out[-1:] == '\n':
        out = out[:-1]
    return out


def system_output_parallel(commands, timeout=None, ignore_status=False,
                           retain_output=False):
    if retain_output:
        out = [bg_job.stdout for bg_job in run_parallel(commands, timeout,
                                                        ignore_status,
                                                        stdout_tee=sys.stdout,
                                                        stderr_tee=sys.stderr)]
    else:
        out = [bg_job.stdout for bg_job in run_parallel(commands, timeout,
                                                        ignore_status)]
    # strip a single trailing newline from each command's output
    for i, x in enumerate(out):
        if x[-1:] == '\n':
            out[i] = x[:-1]
    return out


def get_cpu_percentage(function, *args, **dargs):
    """Returns a tuple containing the CPU% and return value from function call.

    This function calculates the usage time by taking the difference of
    the user and system times both before and after the function call.
    """
    child_pre = resource.getrusage(resource.RUSAGE_CHILDREN)
    self_pre = resource.getrusage(resource.RUSAGE_SELF)
    start = time.time()
    to_return = function(*args, **dargs)
    elapsed = time.time() - start
    self_post = resource.getrusage(resource.RUSAGE_SELF)
    child_post = resource.getrusage(resource.RUSAGE_CHILDREN)

    # Calculate CPU Percentage
    s_user, s_system = [a - b for a, b in zip(self_post, self_pre)[:2]]
    c_user, c_system = [a - b for a, b in zip(child_post, child_pre)[:2]]
    cpu_percent = (s_user + c_user + s_system + c_system) / elapsed

    return cpu_percent, to_return
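
# Usage sketch for get_cpu_percentage() (busy_loop is illustrative): the
# returned value is a fraction of one CPU, so roughly 1.0 means one CPU was
# kept fully busy for the whole wall-clock interval.
#
#     def busy_loop(n):
#         for _ in xrange(n):
#             pass
#
#     cpu_frac, _ = get_cpu_percentage(busy_loop, 10000000)
#     print 'used %.0f%% of one CPU' % (100 * cpu_frac)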


def get_sync_control_file(control, host_name, host_num,
                          instance, num_jobs, port_base=63100):
    """
    This function is used when there is a need to run more than one
    job simultaneously starting exactly at the same time. It basically returns
    a modified control file (containing the synchronization code prepended)
    whenever it is ready to run the control file. The synchronization
    is done using barriers to make sure that the jobs start at the same time.

    Here is how the synchronization is done to make sure that the tests
    start at exactly the same time on the client.
    sc_bar is a server barrier and s_bar, c_bar are the normal barriers:

                    Job1            Job2           ......      JobN
     Server:  |                     sc_bar
     Server:  |                     s_bar          ......      s_bar
     Server:  |     at.run()        at.run()       ......      at.run()
     ---------|------------------------------------------------------------
     Client   |     sc_bar
     Client   |     c_bar           c_bar          ......      c_bar
     Client   |     <run test>      <run test>     ......      <run test>

    PARAMS:
       control : The control file to which the above synchronization
                 code will be prepended
       host_name : The host name on which the job is going to run
       host_num (non-negative) : A number to identify the machine so that we
                 have different sets of s_bar_ports for each of the machines
       instance : The number of the job
       num_jobs : Total number of jobs that are going to run in parallel with
                 this job starting at the same time
       port_base : Port number that is used to derive the actual barrier ports

    RETURN VALUE:
        The modified control file.
    """
    sc_bar_port = port_base
    c_bar_port = port_base
    if host_num < 0:
        print "Please provide a non-negative number for the host"
        return None
    s_bar_port = port_base + 1 + host_num  # The set of s_bar_ports are
                                           # the same for a given machine

    sc_bar_timeout = 180
    s_bar_timeout = c_bar_timeout = 120

    # The barrier code snippet is prepended into the control file
    # dynamically before at.run() is called finally.
    control_new = []

    # jobid is the unique name used to identify the processes
    # trying to reach the barriers
    jobid = "%s#%d" % (host_name, instance)

    rendv = []
    # rendvstr is a temp holder for the rendezvous list of the processes
    for n in range(num_jobs):
        rendv.append("'%s#%d'" % (host_name, n))
    rendvstr = ",".join(rendv)

    if instance == 0:
        # Do the setup and wait at the server barrier
        # Clean up the tmp and the control dirs for the first instance
        control_new.append('if os.path.exists(job.tmpdir):')
        control_new.append("\t system('umount -f %s > /dev/null "
                           "2> /dev/null' % job.tmpdir, "
                           "ignore_status=True)")
        control_new.append("\t system('rm -rf ' + job.tmpdir)")
        control_new.append(
            'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
            % (jobid, sc_bar_timeout, sc_bar_port))
        control_new.append(
            'b0.rendevous_servers("PARALLEL_MASTER", "%s")'
            % jobid)

    elif instance == 1:
        # Wait at the server barrier to wait for instance=0
        # process to complete setup
        b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
                             port=sc_bar_port)
        b0.rendevous_servers("PARALLEL_MASTER", jobid)

        if num_jobs > 2:
            b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
                                 port=s_bar_port)
            b1.rendevous(rendvstr)

    else:
        # For the rest of the clients
        b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
        b2.rendevous(rendvstr)

    # Client side barrier for all the tests to start at the same time
    control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
                       % (jobid, c_bar_timeout, c_bar_port))
    control_new.append("b1.rendevous(%s)" % rendvstr)

    # Stick in the rest of the control file
    control_new.append(control)

    return "\n".join(control_new)
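
# Rough usage sketch (server side, values illustrative): to start num_jobs=3
# synchronized jobs, the caller would generate each job's control file with
#
#     synced_control = get_sync_control_file(open('control').read(),
#                                            machine, host_num, instance, 3)
#
# for instance 0, 1 and 2 (machine and host_num coming from the caller's own
# scheduling loop), then hand each result to at.run() as usual; the prepended
# barrier code makes the tests start together on the clients.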


def get_arch(run_function=run):
    """
    Get the hardware architecture of the machine.
    run_function is used to execute the commands. It defaults to
    utils.run() but a custom method (if provided) should be of the
    same schema as utils.run. It should return a CmdResult object and
    throw a CmdError exception.
    """
    arch = run_function('/bin/uname -m').stdout.rstrip()
    if re.match(r'i\d86$', arch):
        arch = 'i386'
    return arch


class CmdResult(object):
    """
    Command execution result.

    command:     String containing the command line itself
    exit_status: Integer exit code of the process
    stdout:      String containing stdout of the process
    stderr:      String containing stderr of the process
    duration:    Elapsed wall clock time running the process
    """


    def __init__(self, command=None, stdout="", stderr="",
                 exit_status=None, duration=0):
        self.command = command
        self.exit_status = exit_status
        self.stdout = stdout
        self.stderr = stderr
        self.duration = duration


    def __repr__(self):
        wrapper = textwrap.TextWrapper(width=78,
                                       initial_indent="\n    ",
                                       subsequent_indent="    ")

        stdout = self.stdout.rstrip()
        if stdout:
            stdout = "\nstdout:\n%s" % stdout

        stderr = self.stderr.rstrip()
        if stderr:
            stderr = "\nstderr:\n%s" % stderr

        return ("* Command: %s\n"
                "Exit status: %s\n"
                "Duration: %s\n"
                "%s"
                "%s"
                % (wrapper.fill(self.command), self.exit_status,
                   self.duration, stdout, stderr))


class run_randomly:
    def __init__(self, run_sequentially=False):
        # Run sequentially is for debugging control files
        self.test_list = []
        self.run_sequentially = run_sequentially


    def add(self, *args, **dargs):
        test = (args, dargs)
        self.test_list.append(test)


    def run(self, fn):
        while self.test_list:
            test_index = random.randint(0, len(self.test_list)-1)
            if self.run_sequentially:
                test_index = 0
            (args, dargs) = self.test_list.pop(test_index)
            fn(*args, **dargs)
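
# Usage sketch for run_randomly in a control file (test names illustrative):
#
#     r = run_randomly()
#     r.add('sleeptest', seconds=1)
#     r.add('dbench')
#     r.run(job.run_test)    # runs the queued tests in a random order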