blob: c235ba23dfa4d9fca434a1110ca2fcc6b273761b [file] [log] [blame]
mbligh63073c92008-03-31 16:49:32 +00001#!/usr/bin/python
2#
3# Copyright 2008 Google Inc. Released under the GPL v2
4
mbligh849a0f62008-08-28 20:12:19 +00005import os, pickle, random, re, resource, select, shutil, signal, StringIO
6import socket, struct, subprocess, sys, time, textwrap, urllib, urlparse
7import warnings
mbligh81edd792008-08-26 16:54:02 +00008from autotest_lib.client.common_lib import error, barrier
9
def deprecated(func):
    """Decorator which marks a function as deprecated.

    Calling the wrapped function emits a DeprecationWarning (attributed
    to the caller via stacklevel=2) and then invokes the original
    function unchanged.  The wrapped function's metadata (__name__,
    __doc__, __dict__) is copied over manually.
    """
    def new_func(*args, **dargs):
        warnings.warn("Call to deprecated function %s." % func.__name__,
                      category=DeprecationWarning, stacklevel=2)
        return func(*args, **dargs)
    # manually propagate metadata (pre-functools.wraps idiom)
    new_func.__name__ = func.__name__
    new_func.__doc__ = func.__doc__
    new_func.__dict__.update(func.__dict__)
    return new_func
21
22
class BgJob(object):
    """Runs a shell command in the background, capturing its output.

    The subprocess is started immediately in __init__ (via
    /bin/bash, shell=True).  self.result is a CmdResult that gets filled
    in as output is collected; see join_bg_jobs() in this module for the
    loop that drives output_prepare()/process_output()/cleanup().
    """
    def __init__(self, command, stdout_tee=None, stderr_tee=None):
        # stdout_tee/stderr_tee: optional file-like objects that receive
        # a live copy of the output in addition to the buffers installed
        # later by output_prepare()
        self.command = command
        self.stdout_tee = stdout_tee
        self.stderr_tee = stderr_tee
        self.result = CmdResult(command)
        print "running: %s" % command
        # _reset_sigpipe runs in the child between fork and exec so that
        # pipelines inside `command` get default SIGPIPE behavior
        self.sp = subprocess.Popen(command, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   preexec_fn=self._reset_sigpipe, shell=True,
                                   executable="/bin/bash")


    def output_prepare(self, stdout_file=None, stderr_file=None):
        # Install the buffers that process_output() and cleanup() use.
        # cleanup() calls .getvalue() on them, so they are expected to be
        # StringIO-like objects.
        self.stdout_file = stdout_file
        self.stderr_file = stderr_file

    def process_output(self, stdout=True, final_read=False):
        """output_prepare must be called prior to calling this.

        Reads from the child's stdout pipe (stdout=True) or stderr pipe
        (stdout=False) into the corresponding buffer, teeing the data to
        the matching *_tee object if one was supplied.  With
        final_read=True everything still pending on the pipe is drained;
        otherwise a single read of up to 1024 bytes is performed.
        """
        if stdout:
            pipe, buf, tee = self.sp.stdout, self.stdout_file, self.stdout_tee
        else:
            pipe, buf, tee = self.sp.stderr, self.stderr_file, self.stderr_tee

        if final_read:
            # read in all the data we can from pipe and then stop
            data = []
            while select.select([pipe], [], [], 0)[0]:
                data.append(os.read(pipe.fileno(), 1024))
                # a zero-length read means EOF: the child closed the pipe
                if len(data[-1]) == 0:
                    break
            data = "".join(data)
        else:
            # perform a single read; os.read on the raw fd avoids the
            # blocking behavior of pipe.read()
            data = os.read(pipe.fileno(), 1024)
        buf.write(data)
        if tee:
            tee.write(data)
            tee.flush()


    def cleanup(self):
        # Close our ends of the child's pipes and snapshot the collected
        # output into the CmdResult.
        self.sp.stdout.close()
        self.sp.stderr.close()
        self.result.stdout = self.stdout_file.getvalue()
        self.result.stderr = self.stderr_file.getvalue()


    def _reset_sigpipe(self):
        # Executed in the child process before exec: restore the default
        # SIGPIPE action (the Python runtime ignores SIGPIPE by default).
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)
73
mbligh81edd792008-08-26 16:54:02 +000074
def ip_to_long(ip):
    """Convert a dotted-quad IP address string to its 32-bit integer value."""
    # inet_aton yields the packed network-byte-order bytes; '!L' decodes
    # them as one unsigned 32-bit big-endian integer.
    packed = socket.inet_aton(ip)
    (value,) = struct.unpack('!L', packed)
    return value
78
79
def long_to_ip(number):
    """Convert a 32-bit integer back to a dotted-quad IP address string."""
    # inverse of ip_to_long: pack as big-endian unsigned long, then format
    packed = struct.pack('!L', number)
    return socket.inet_ntoa(packed)
83
84
def create_subnet_mask(bits):
    """Return a 32-bit netmask integer with the top `bits` bits set.

    Computed by subtraction rather than ~, because Python's ~ operator
    produces negative numbers on unbounded integers.
    """
    all_ones = (1 << 32) - 1
    host_part = (1 << (32 - bits)) - 1
    return all_ones - host_part
88
89
def format_ip_with_mask(ip, mask_bits):
    """Return `ip` masked down to its network address, in CIDR
    "a.b.c.d/bits" notation."""
    mask = create_subnet_mask(mask_bits)
    network_long = ip_to_long(ip) & mask
    return "%s/%s" % (long_to_ip(network_long), mask_bits)
mbligh6231cd62008-02-02 19:18:33 +000093
mblighde0d47e2008-03-28 14:37:18 +000094
def read_one_line(filename):
    """Return the first line of filename, without the trailing newline.

    The file handle is closed explicitly rather than left to garbage
    collection (which is not prompt on non-refcounting interpreters).
    """
    f = open(filename, 'r')
    try:
        return f.readline().rstrip('\n')
    finally:
        f.close()
jadmanski5182e162008-05-13 21:48:16 +000097
98
def write_one_line(filename, str):
    """Overwrite filename with str as a single line.

    Any trailing newlines on str are stripped and exactly one newline is
    appended.  NOTE: the parameter name 'str' shadows the builtin; it is
    kept for backward compatibility with existing callers.
    """
    f = open(filename, 'w')
    try:
        # explicit close (instead of relying on GC) guarantees the data
        # is flushed to disk before this function returns
        f.write(str.rstrip('\n') + '\n')
    finally:
        f.close()
jadmanski5182e162008-05-13 21:48:16 +0000101
102
def read_keyval(path):
    """
    Read a key-value pair format file into a dictionary, and return it.
    Takes either a filename or directory name as input. If it's a
    directory name, we assume you want the file to be called keyval.

    Values that look like integers or floats are converted; everything
    else is kept as a string.  Raises ValueError on malformed lines.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = {}
    for line in open(path):
        # strip inline comments and trailing whitespace
        line = re.sub('#.*', '', line).rstrip()
        if not line:
            # bug fix: blank and comment-only lines used to raise
            # ValueError; they are now skipped
            continue
        if not re.search(r'^[-\w]+=', line):
            raise ValueError('Invalid format line: %s' % line)
        key, value = line.split('=', 1)
        if re.search(r'^\d+$', value):
            value = int(value)
        elif re.search(r'^(\d+\.)?\d+$', value):
            value = float(value)
        keyval[key] = value
    return keyval
mblighde0d47e2008-03-28 14:37:18 +0000123
124
def write_keyval(path, dictionary, type_tag=None):
    """
    Write a key-value pair format file out to a file. This uses append
    mode to open the file, so existing text will not be overwritten or
    reparsed.

    If type_tag is None, then the key must be composed of alphanumeric
    characters (or dashes+underscores). However, if type-tag is not
    null then the keys must also have "{type_tag}" as a suffix. At
    the moment the only valid values of type_tag are "attr" and "perf".
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = open(path, 'a')

    # choose the key-validation pattern based on the type tag
    if type_tag is None:
        key_regex = re.compile(r'^[-\w]+$')
    else:
        if type_tag not in ('attr', 'perf'):
            raise ValueError('Invalid type tag: %s' % type_tag)
        key_regex = re.compile(r'^[-\w]+\{%s\}$' % re.escape(type_tag))

    try:
        for key, value in dictionary.iteritems():
            if not key_regex.search(key):
                raise ValueError('Invalid key: %s' % key)
            keyval.write('%s=%s\n' % (key, value))
    finally:
        keyval.close()
mbligh6231cd62008-02-02 19:18:33 +0000154
155
def is_url(path):
    """Return true if path looks like a URL"""
    # for now, just handle http and ftp
    scheme = urlparse.urlparse(path)[0]
    return scheme in ('http', 'ftp')
mbligh6231cd62008-02-02 19:18:33 +0000161
162
def urlopen(url, data=None, proxies=None, timeout=300):
    """Wrapper to urllib.urlopen with timeout addition."""
    # The global socket default timeout is the only way to bound
    # urllib.urlopen; restore the previous value no matter how the
    # call exits.
    previous_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib.urlopen(url, data=data, proxies=proxies)
    finally:
        socket.setdefaulttimeout(previous_timeout)
mbligh02ff2d52008-06-03 15:00:21 +0000173
174
def urlretrieve(url, filename=None, reporthook=None, data=None, timeout=300):
    """Wrapper to urllib.urlretrieve with timeout addition."""
    # see urlopen() above: bound the transfer via the process-wide
    # socket timeout, restoring it afterwards
    previous_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib.urlretrieve(url, filename=filename,
                                  reporthook=reporthook, data=data)
    finally:
        socket.setdefaulttimeout(previous_timeout)
184
mbligh02ff2d52008-06-03 15:00:21 +0000185
mbligh6231cd62008-02-02 19:18:33 +0000186def get_file(src, dest, permissions=None):
jadmanski0afbb632008-06-06 21:10:57 +0000187 """Get a file from src, which can be local or a remote URL"""
188 if (src == dest):
189 return
190 if (is_url(src)):
191 print 'PWD: ' + os.getcwd()
192 print 'Fetching \n\t', src, '\n\t->', dest
193 try:
194 urllib.urlretrieve(src, dest)
195 except IOError, e:
196 raise error.AutotestError('Unable to retrieve %s (to %s)'
197 % (src, dest), e)
198 else:
199 shutil.copyfile(src, dest)
200 if permissions:
201 os.chmod(dest, permissions)
202 return dest
mbligh6231cd62008-02-02 19:18:33 +0000203
204
def unmap_url(srcdir, src, destdir='.'):
    """
    Receives either a path to a local file or a URL.
    returns either the path to the local file, or the fetched URL

    unmap_url('/usr/src', 'foo.tar', '/tmp')
                            = '/usr/src/foo.tar'
    unmap_url('/usr/src', 'http://site/file', '/tmp')
                            = '/tmp/file'
                            (after retrieving it)
    """
    # local paths are simply resolved against srcdir
    if not is_url(src):
        return os.path.join(srcdir, src)
    # URLs: fetch into destdir using the basename of the URL's path
    filename = os.path.basename(urlparse.urlparse(src)[2])
    dest = os.path.join(destdir, filename)
    return get_file(src, dest)
mbligh6231cd62008-02-02 19:18:33 +0000223
224
def update_version(srcdir, preserve_srcdir, new_version, install,
                   *args, **dargs):
    """
    Make sure srcdir is version new_version

    If not, delete it and install() the new version.

    In the preserve_srcdir case, we just check it's up to date,
    and if not, we rerun install, without removing srcdir
    """
    versionfile = os.path.join(srcdir, '.version')

    # an absent or stale .version file means a (re)install is required
    needs_install = True
    if os.path.exists(versionfile):
        if pickle.load(open(versionfile)) == new_version:
            needs_install = False

    if not needs_install:
        return

    if not preserve_srcdir and os.path.exists(srcdir):
        shutil.rmtree(srcdir)
    install(*args, **dargs)
    # record the installed version (install() may not have created srcdir)
    if os.path.exists(srcdir):
        pickle.dump(new_version, open(versionfile, 'w'))
mbligh462c0152008-03-13 15:37:10 +0000249
250
def run(command, timeout=None, ignore_status=False,
        stdout_tee=None, stderr_tee=None):
    """
    Run a command on the host.

    Args:
            command: the command line string
            timeout: time limit in seconds before attempting to
                    kill the running process. The run() function
                    will take a few seconds longer than 'timeout'
                    to complete if it has to kill the process.
            ignore_status: do not raise an exception, no matter what
                    the exit code of the command is.
            stdout_tee: optional file-like object to which stdout data
                        will be written as it is generated (data will still
                        be stored in result.stdout)
            stderr_tee: likewise for stderr

    Returns:
            a CmdResult object

    Raises:
            CmdError: the exit code of the command
                    execution was not 0
    """
    job = BgJob(command, stdout_tee, stderr_tee)
    finished_job = join_bg_jobs([job], timeout)[0]
    if finished_job.result.exit_status and not ignore_status:
        raise error.CmdError(command, finished_job.result,
                             "Command returned non-zero exit status")
    return finished_job.result
mbligh63073c92008-03-31 16:49:32 +0000283
def run_parallel(commands, timeout=None, ignore_status=False,
                 stdout_tee=None, stderr_tee=None):
    """Behaves the same as run with the following exceptions:

    - commands is a list of commands to run in parallel.
    - ignore_status toggles whether or not an exception should be raised
      on any error.

    returns a list of CmdResult objects
    """
    bg_jobs = []
    for command in commands:
        bg_jobs.append(BgJob(command, stdout_tee, stderr_tee))

    # Updates objects in bg_jobs list with their process information
    join_bg_jobs(bg_jobs, timeout)

    for bg_job in bg_jobs:
        if not ignore_status and bg_job.result.exit_status:
            # bug fix: report the command of the job that failed, not the
            # stale 'command' loop variable (which was always the last
            # command started)
            raise error.CmdError(bg_job.command, bg_job.result,
                                 "Command returned non-zero exit status")

    return [bg_job.result for bg_job in bg_jobs]
307
308
@deprecated
def run_bg(command):
    """Function deprecated. Please use BgJob class instead."""
    job = BgJob(command)
    return job.sp, job.result
mbligh63073c92008-03-31 16:49:32 +0000314
315
def join_bg_jobs(bg_jobs, timeout=None):
    """Joins the bg_jobs with the current thread.

    Waits (up to `timeout` seconds, if given) for every job to finish,
    collecting its output into in-memory buffers.  Returns the same list
    of bg_jobs objects that was passed in.

    Raises error.CmdError if the jobs did not all complete within the
    timeout (the surviving processes are killed by _wait_for_commands).
    """
    # removed dead code: 'ret, timeouterr = 0, False' was assigned but
    # never used (the real flag is timeout_error below)
    for bg_job in bg_jobs:
        bg_job.output_prepare(StringIO.StringIO(), StringIO.StringIO())

    try:
        # We are holding ends to stdin, stdout pipes
        # hence we need to be sure to close those fds no matter what
        start_time = time.time()
        timeout_error = _wait_for_commands(bg_jobs, start_time, timeout)

        for bg_job in bg_jobs:
            # Process stdout and stderr
            bg_job.process_output(stdout=True, final_read=True)
            bg_job.process_output(stdout=False, final_read=True)
    finally:
        # close our ends of the pipes to the sp no matter what
        for bg_job in bg_jobs:
            bg_job.cleanup()

    if timeout_error:
        # TODO: This needs to be fixed to better represent what happens when
        # running in parallel. However this is backwards compatable, so it will
        # do for the time being.
        raise error.CmdError(bg_jobs[0].command, bg_jobs[0].result,
                             "Command(s) did not complete within %d seconds"
                             % timeout)

    return bg_jobs
mbligh63073c92008-03-31 16:49:32 +0000350
mbligh849a0f62008-08-28 20:12:19 +0000351
def _wait_for_commands(bg_jobs, start_time, timeout):
    # Waits for every job in bg_jobs to terminate, pumping their
    # stdout/stderr pipes as data arrives so the children never block on
    # a full pipe.  Returns True if it gave up due to `timeout` (any
    # surviving processes are killed via nuke_subprocess), False if all
    # jobs completed.

    # To check for processes which terminate without producing any output
    # a 1 second timeout is used in select.
    SELECT_TIMEOUT = 1

    # map each pipe object back to (job, is_stdout) so ready pipes can be
    # routed to the right BgJob.process_output() call
    select_list = []
    reverse_dict = {}
    for bg_job in bg_jobs:
        select_list.append(bg_job.sp.stdout)
        select_list.append(bg_job.sp.stderr)
        reverse_dict[bg_job.sp.stdout] = (bg_job,True)
        reverse_dict[bg_job.sp.stderr] = (bg_job,False)

    if timeout:
        stop_time = start_time + timeout
        time_left = stop_time - time.time()
    else:
        time_left = None  # no deadline: loop until every job has exited
    while not timeout or time_left > 0:
        # select will return when stdout is ready (including when it is
        # EOF, that is the process has terminated).
        ready, _, _ = select.select(select_list, [], [], SELECT_TIMEOUT)

        # os.read() has to be used instead of
        # subproc.stdout.read() which will otherwise block
        for fileno in ready:  # NOTE: entries are file objects, not fd ints
            bg_job,stdout = reverse_dict[fileno]
            bg_job.process_output(stdout)

        # refresh exit statuses; a job is done once exit_status is set
        remaining_jobs = [x for x in bg_jobs if x.result.exit_status is None]
        if len(remaining_jobs) == 0:
            return False
        for bg_job in remaining_jobs:
            bg_job.result.exit_status = bg_job.sp.poll()

        if timeout:
            time_left = stop_time - time.time()

    # Kill all processes which did not complete prior to timeout
    for bg_job in [x for x in bg_jobs if x.result.exit_status is None]:
        nuke_subprocess(bg_job.sp)

    return True
mbligh63073c92008-03-31 16:49:32 +0000397
398
def nuke_subprocess(subproc):
    """Forcibly terminate subproc (a subprocess.Popen object).

    Sends an escalating series of signals (SIGTERM then SIGKILL), then
    polls for up to ~5 seconds for the process to exit.

    Returns the exit code if the process terminated, or None if it was
    still alive after the polling window.
    """
    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        try:
            os.kill(subproc.pid, sig)
        # The process may have died before we could kill it.
        except OSError:
            pass

    for _ in range(5):
        rc = subproc.poll()
        # idiom fix: identity comparison with None, not '!= None'
        if rc is not None:
            return rc
        time.sleep(1)
mbligh63073c92008-03-31 16:49:32 +0000415
416
def nuke_pid(pid):
    """Forcibly terminate process `pid` and reap it.

    Sends SIGTERM then SIGKILL, then waits (non-blocking, up to ~5
    seconds) for the process to be reaped.  Raises
    error.AutoservRunError if it could not be confirmed dead.
    """
    # escalate: ask politely first, then kill outright
    for sig in (signal.SIGTERM, signal.SIGKILL):
        try:
            os.kill(pid, sig)

        # The process may have died before we could kill it.
        except OSError:
            pass

    try:
        status = None
        for _ in range(5):
            status = os.waitpid(pid, os.WNOHANG)[0]
            if status == pid:
                return
            time.sleep(1)

        if status != pid:
            raise error.AutoservRunError('Could not kill %d'
                                         % pid, None)

    # the process died before we join it.
    except OSError:
        pass
mbligh63073c92008-03-31 16:49:32 +0000443
444
mbligh63073c92008-03-31 16:49:32 +0000445
def system(command, timeout=None, ignore_status=False):
    """This function returns the exit status of command."""
    result = run(command, timeout, ignore_status,
                 stdout_tee=sys.stdout, stderr_tee=sys.stderr)
    return result.exit_status
mbligh63073c92008-03-31 16:49:32 +0000450
451
def system_parallel(commands, timeout=None, ignore_status=False):
    """This function returns a list of exit statuses for the respective
    list of commands."""
    # note: run_parallel returns CmdResult objects, not BgJobs
    results = run_parallel(commands, timeout, ignore_status,
                           stdout_tee=sys.stdout, stderr_tee=sys.stderr)
    return [result.exit_status for result in results]
mbligh849a0f62008-08-28 20:12:19 +0000458
459
def system_output(command, timeout=None, ignore_status=False,
                  retain_output=False):
    """Run command and return its stdout, with one trailing newline
    (if present) removed.  When retain_output is True the output is
    also teed live to this process's stdout/stderr."""
    if retain_output:
        result = run(command, timeout, ignore_status,
                     stdout_tee=sys.stdout, stderr_tee=sys.stderr)
    else:
        result = run(command, timeout, ignore_status)
    out = result.stdout
    if out.endswith('\n'):
        out = out[:-1]
    return out
mbligh63073c92008-03-31 16:49:32 +0000469
mbligh849a0f62008-08-28 20:12:19 +0000470
def system_output_parallel(commands, timeout=None, ignore_status=False,
                           retain_output=False):
    """Run commands in parallel and return a list of their stdout
    strings, each with one trailing newline (if present) removed.

    When retain_output is True the output is also teed live to this
    process's stdout/stderr.
    """
    if retain_output:
        out = [bg_job.stdout for bg_job in run_parallel(commands, timeout,
                                                        ignore_status,
                                                        stdout_tee=sys.stdout,
                                                        stderr_tee=sys.stderr)]
    else:
        out = [bg_job.stdout for bg_job in run_parallel(commands, timeout,
                                                        ignore_status)]
    # bug fix: the old loop tested out[-1:] (the last element of the
    # LIST) and truncated the list itself; strip each string instead
    return [x[:-1] if x[-1:] == '\n' else x for x in out]
484
485
def get_cpu_percentage(function, *args, **dargs):
    """Returns a tuple containing the CPU% and return value from function call.

    This function calculates the usage time by taking the difference of
    the user and system times both before and after the function call.
    """
    child_pre = resource.getrusage(resource.RUSAGE_CHILDREN)
    self_pre = resource.getrusage(resource.RUSAGE_SELF)
    start = time.time()
    to_return = function(*args, **dargs)
    elapsed = time.time() - start
    self_post = resource.getrusage(resource.RUSAGE_SELF)
    child_post = resource.getrusage(resource.RUSAGE_CHILDREN)

    # Calculate CPU Percentage.  The first two rusage fields are ru_utime
    # (user time) and ru_stime (system time); list() makes the slice work
    # even on interpreters where zip() returns an iterator.
    s_user, s_system = [a - b for a, b in list(zip(self_post, self_pre))[:2]]
    c_user, c_system = [a - b for a, b in list(zip(child_post, child_pre))[:2]]
    cpu_percent = (s_user + c_user + s_system + c_system) / elapsed

    return cpu_percent, to_return
506
507
mblighc1cbc992008-05-27 20:01:45 +0000508"""
509This function is used when there is a need to run more than one
510job simultaneously starting exactly at the same time. It basically returns
511a modified control file (containing the synchronization code prepended)
512whenever it is ready to run the control file. The synchronization
513is done using barriers to make sure that the jobs start at the same time.
514
515Here is how the synchronization is done to make sure that the tests
516start at exactly the same time on the client.
517sc_bar is a server barrier and s_bar, c_bar are the normal barriers
518
519 Job1 Job2 ...... JobN
520 Server: | sc_bar
521 Server: | s_bar ...... s_bar
522 Server: | at.run() at.run() ...... at.run()
523 ----------|------------------------------------------------------
524 Client | sc_bar
525 Client | c_bar c_bar ...... c_bar
526 Client | <run test> <run test> ...... <run test>
527
528
529PARAMS:
530 control_file : The control file which to which the above synchronization
531 code would be prepended to
532 host_name : The host name on which the job is going to run
533 host_num (non negative) : A number to identify the machine so that we have
534 different sets of s_bar_ports for each of the machines.
535 instance : The number of the job
536 num_jobs : Total number of jobs that are going to run in parallel with
537 this job starting at the same time
538 port_base : Port number that is used to derive the actual barrier ports.
539
540RETURN VALUE:
541 The modified control file.
542
543"""
544def get_sync_control_file(control, host_name, host_num,
jadmanski0afbb632008-06-06 21:10:57 +0000545 instance, num_jobs, port_base=63100):
546 sc_bar_port = port_base
547 c_bar_port = port_base
548 if host_num < 0:
549 print "Please provide a non negative number for the host"
550 return None
551 s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are
552 # the same for a given machine
mblighc1cbc992008-05-27 20:01:45 +0000553
jadmanski0afbb632008-06-06 21:10:57 +0000554 sc_bar_timeout = 180
555 s_bar_timeout = c_bar_timeout = 120
mblighc1cbc992008-05-27 20:01:45 +0000556
jadmanski0afbb632008-06-06 21:10:57 +0000557 # The barrier code snippet is prepended into the conrol file
558 # dynamically before at.run() is called finally.
559 control_new = []
mblighc1cbc992008-05-27 20:01:45 +0000560
jadmanski0afbb632008-06-06 21:10:57 +0000561 # jobid is the unique name used to identify the processes
562 # trying to reach the barriers
563 jobid = "%s#%d" % (host_name, instance)
mblighc1cbc992008-05-27 20:01:45 +0000564
jadmanski0afbb632008-06-06 21:10:57 +0000565 rendv = []
566 # rendvstr is a temp holder for the rendezvous list of the processes
567 for n in range(num_jobs):
568 rendv.append("'%s#%d'" % (host_name, n))
569 rendvstr = ",".join(rendv)
mblighc1cbc992008-05-27 20:01:45 +0000570
jadmanski0afbb632008-06-06 21:10:57 +0000571 if instance == 0:
572 # Do the setup and wait at the server barrier
573 # Clean up the tmp and the control dirs for the first instance
574 control_new.append('if os.path.exists(job.tmpdir):')
575 control_new.append("\t system('umount -f %s > /dev/null"
576 "2> /dev/null' % job.tmpdir,"
577 "ignore_status=True)")
578 control_new.append("\t system('rm -rf ' + job.tmpdir)")
579 control_new.append(
580 'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
581 % (jobid, sc_bar_timeout, sc_bar_port))
582 control_new.append(
583 'b0.rendevous_servers("PARALLEL_MASTER", "%s")'
584 % jobid)
mblighc1cbc992008-05-27 20:01:45 +0000585
jadmanski0afbb632008-06-06 21:10:57 +0000586 elif instance == 1:
587 # Wait at the server barrier to wait for instance=0
588 # process to complete setup
589 b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
590 port=sc_bar_port)
591 b0.rendevous_servers("PARALLEL_MASTER", jobid)
mblighc1cbc992008-05-27 20:01:45 +0000592
jadmanski0afbb632008-06-06 21:10:57 +0000593 if(num_jobs > 2):
594 b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
595 port=s_bar_port)
596 b1.rendevous(rendvstr)
mblighc1cbc992008-05-27 20:01:45 +0000597
jadmanski0afbb632008-06-06 21:10:57 +0000598 else:
599 # For the rest of the clients
600 b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
601 b2.rendevous(rendvstr)
mblighc1cbc992008-05-27 20:01:45 +0000602
jadmanski0afbb632008-06-06 21:10:57 +0000603 # Client side barrier for all the tests to start at the same time
604 control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
605 % (jobid, c_bar_timeout, c_bar_port))
606 control_new.append("b1.rendevous(%s)" % rendvstr)
mblighc1cbc992008-05-27 20:01:45 +0000607
jadmanski0afbb632008-06-06 21:10:57 +0000608 # Stick in the rest of the control file
609 control_new.append(control)
mblighc1cbc992008-05-27 20:01:45 +0000610
jadmanski0afbb632008-06-06 21:10:57 +0000611 return "\n".join(control_new)
mblighc1cbc992008-05-27 20:01:45 +0000612
mbligh63073c92008-03-31 16:49:32 +0000613
def get_arch(run_function=run):
    """
    Get the hardware architecture of the machine.
    run_function is used to execute the commands. It defaults to
    utils.run() but a custom method (if provided) should be of the
    same schema as utils.run. It should return a CmdResult object and
    throw a CmdError exception.
    """
    arch = run_function('/bin/uname -m').stdout.rstrip()
    # normalize the ix86 family (i386/i486/i586/i686) to plain 'i386'
    return 'i386' if re.match(r'i\d86$', arch) else arch
626
627
class CmdResult(object):
    """
    Command execution result.

    command: String containing the command line itself
    exit_status: Integer exit code of the process
    stdout: String containing stdout of the process
    stderr: String containing stderr of the process
    duration: Elapsed wall clock time running the process
    """


    def __init__(self, command=None, stdout="", stderr="",
                 exit_status=None, duration=0):
        self.command = command
        self.exit_status = exit_status
        self.stdout = stdout
        self.stderr = stderr
        self.duration = duration


    def __repr__(self):
        # wrap the command line at 78 columns; continuation lines indented
        wrapper = textwrap.TextWrapper(width=78,
                                       initial_indent="\n    ",
                                       subsequent_indent="    ")

        # build the optional stdout/stderr sections; empty streams
        # contribute nothing to the output
        sections = []
        for label, text in (("stdout", self.stdout), ("stderr", self.stderr)):
            text = text.rstrip()
            sections.append("\n%s:\n%s" % (label, text) if text else "")

        return ("* Command: %s\n"
                "Exit status: %s\n"
                "Duration: %s\n"
                "%s"
                "%s"
                % (wrapper.fill(self.command), self.exit_status,
                   self.duration, sections[0], sections[1]))
mbligh63073c92008-03-31 16:49:32 +0000669
670
class run_randomly:
    """Collects (args, kwargs) test tuples via add() and runs them in a
    random order.  With run_sequentially=True they run in insertion
    order instead, which is useful for debugging control files."""

    def __init__(self, run_sequentially=False):
        self.test_list = []
        self.run_sequentially = run_sequentially


    def add(self, *args, **dargs):
        # queue the call for a later run(); arguments are passed through
        self.test_list.append((args, dargs))


    def run(self, fn):
        # pop tests one at a time until the queue is empty
        while self.test_list:
            pick = random.randint(0, len(self.test_list) - 1)
            if self.run_sequentially:
                pick = 0
            args, dargs = self.test_list.pop(pick)
            fn(*args, **dargs)