blob: 71cac1e7605377d8663809d8b6d621128f1a37c9 [file] [log] [blame]
mbligh63073c92008-03-31 16:49:32 +00001#!/usr/bin/python
2#
3# Copyright 2008 Google Inc. Released under the GPL v2
4
5import os, pickle, random, re, select, shutil, signal, StringIO, subprocess
mbligh0a297572008-04-02 00:13:45 +00006import sys, time, textwrap, urllib, urlparse
mblighc1cbc992008-05-27 20:01:45 +00007import error, barrier
mbligh6231cd62008-02-02 19:18:33 +00008
mblighde0d47e2008-03-28 14:37:18 +00009
def read_one_line(filename):
    """
    Return the first line of a file with surrounding whitespace removed.

    Args:
        filename: path of the file to read.

    Returns:
        The first line, stripped; an empty string if the file is empty.
    """
    infile = open(filename, 'r')
    try:
        return infile.readline().strip()
    finally:
        # close explicitly instead of leaving the handle to the
        # garbage collector
        infile.close()
12
13
def write_one_line(filename, str):
    """
    Overwrite a file with a single line of text.

    Trailing whitespace (including newlines) is stripped from the text
    and exactly one newline is appended.

    Args:
        filename: path of the file to (re)create.
        str: the text to write. The name shadows the builtin but is
            kept because it is part of the signature.
    """
    outfile = open(filename, 'w')
    try:
        outfile.write(str.rstrip() + "\n")
    finally:
        # closing here guarantees the data is flushed before we return
        outfile.close()
16
17
def read_keyval(path):
    """
    Read a key-value pair format file into a dictionary, and return it.
    Takes either a filename or directory name as input. If it's a
    directory name, we assume you want the file to be called keyval.

    Everything from a '#' to the end of the line is treated as a comment.
    Blank lines and comment-only lines are skipped. Values made up only
    of digits are returned as int, values of the form digits.digits as
    float, and anything else as a string.

    Raises:
        ValueError: a non-empty line is not of the form key=value, where
            key consists of word characters and dashes.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = {}
    keyval_file = open(path)
    try:
        for line in keyval_file:
            # Remove the comment first and the whitespace in front of it
            # second, so that "count=12  # note" still parses as an int.
            line = re.sub(r'#.*', '', line).rstrip()
            if not line:
                continue
            if not re.search(r'^[-\w]+=', line):
                raise ValueError('Invalid format line: %s' % line)
            # split on the first '=' only; the value may contain more
            key, value = line.split('=', 1)
            if re.search(r'^\d+$', value):
                value = int(value)
            elif re.search(r'^(\d+\.)?\d+$', value):
                value = float(value)
            keyval[key] = value
    finally:
        keyval_file.close()
    return keyval
38
39
def write_keyval(path, dictionary, type_tag=None):
    """
    Write a key-value pair format file out to a file. This uses append
    mode to open the file, so existing text will not be overwritten or
    reparsed.

    If type_tag is None, then the key must be composed of alphanumeric
    characters (or dashes+underscores). However, if type-tag is not
    null then the keys must also have "{type_tag}" as a suffix. At
    the moment the only valid values of type_tag are "attr" and "perf".

    The type tag and every key are validated before the file is opened,
    so a failed call leaves the file untouched. Entries are written in
    sorted key order so the output is deterministic.

    Args:
        path: file to append to, or a directory (in which case the file
            named 'keyval' inside it is used).
        dictionary: mapping of keys to values; each is written as
            'key=value' on its own line.
        type_tag: None, 'attr' or 'perf'.

    Raises:
        ValueError: type_tag is not an accepted value, or a key does not
            match the required format.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')

    if type_tag is None:
        key_regex = re.compile(r'^[-\w]+$')
    else:
        if type_tag not in ('attr', 'perf'):
            raise ValueError('Invalid type tag: %s' % type_tag)
        escaped_tag = re.escape(type_tag)
        key_regex = re.compile(r'^[-\w]+\{%s\}$' % escaped_tag)

    # Check everything up front: nothing is opened or written unless
    # the whole dictionary is acceptable.
    for key in dictionary:
        if not key_regex.search(key):
            raise ValueError('Invalid key: %s' % key)

    keyval = open(path, 'a')
    try:
        for key in sorted(dictionary):
            keyval.write('%s=%s\n' % (key, dictionary[key]))
    finally:
        keyval.close()
69
70
def is_url(path):
    """Tell whether path carries a URL scheme this module can fetch."""
    # only the http and ftp schemes are recognised for the time being
    scheme = urlparse.urlparse(path)[0]
    return scheme in ('http', 'ftp')
76
77
def get_file(src, dest, permissions=None):
    """
    Get a file from src, which can be local or a remote URL.

    Args:
        src: a local path, or a URL accepted by is_url().
        dest: local path to copy or download to.
        permissions: if given (and non-zero), mode bits applied to dest
            with os.chmod after the transfer.

    Returns:
        dest. When src and dest are the same nothing is transferred and
        permissions are not applied, but dest is still returned so that
        callers always receive a path.

    Raises:
        error.AutotestError: the URL could not be retrieved.
    """
    if src == dest:
        return dest
    if is_url(src):
        print('PWD: ' + os.getcwd())
        print('Fetching \n\t%s \n\t-> %s' % (src, dest))
        try:
            urllib.urlretrieve(src, dest)
        except IOError:
            # sys.exc_info() rather than "except ..., e" / "except ... as e"
            # so the syntax is accepted by old and new interpreters alike
            e = sys.exc_info()[1]
            raise error.AutotestError('Unable to retrieve %s (to %s)'
                                      % (src, dest), e)
    else:
        shutil.copyfile(src, dest)
    if permissions:
        os.chmod(dest, permissions)
    return dest
95
96
def unmap_url(srcdir, src, destdir='.'):
    """
    Resolve src, which is either a file name or a URL, to a local path.

    A plain file name is joined onto srcdir and nothing is copied. A URL
    is fetched with get_file() into destdir, under the last component of
    the URL's path, and the result of get_file() is returned.

    unmap_url('/usr/src', 'foo.tar', '/tmp')
                            = '/usr/src/foo.tar'
    unmap_url('/usr/src', 'http://site/file', '/tmp')
                            = '/tmp/file'
                            (after retrieving it)
    """
    if not is_url(src):
        return os.path.join(srcdir, src)

    remote_path = urlparse.urlparse(src)[2]
    local_copy = os.path.join(destdir, os.path.basename(remote_path))
    return get_file(src, local_copy)
115
116
def update_version(srcdir, preserve_srcdir, new_version, install,
                   *args, **dargs):
    """
    Make sure srcdir is version new_version

    If not, delete it and install() the new version.

    In the preserve_srcdir case, we just check it's up to date,
    and if not, we rerun install, without removing srcdir

    The installed version is kept pickled in srcdir/.version. It is
    rewritten after install() only if srcdir exists at that point.

    Args:
        srcdir: directory whose version is being tracked.
        preserve_srcdir: when true, srcdir is not removed before
            reinstalling.
        new_version: the wanted version; compared with == against the
            stored one.
        install: callable invoked as install(*args, **dargs) when an
            install is needed.
    """
    versionfile = os.path.join(srcdir, '.version')
    install_needed = True

    if os.path.exists(versionfile):
        # NOTE(review): unpickling is only safe as long as .version is
        # written by this function alone - confirm nothing else feeds it.
        version_in = open(versionfile, 'rb')
        try:
            old_version = pickle.load(version_in)
        finally:
            version_in.close()
        if old_version == new_version:
            install_needed = False

    if install_needed:
        if not preserve_srcdir and os.path.exists(srcdir):
            shutil.rmtree(srcdir)
        install(*args, **dargs)
        if os.path.exists(srcdir):
            # binary mode: pickle data is bytes, and closing the handle
            # makes sure the marker is on disk before we return
            version_out = open(versionfile, 'wb')
            try:
                pickle.dump(new_version, version_out)
            finally:
                version_out.close()
mbligh462c0152008-03-13 15:37:10 +0000141
142
def run(command, timeout=None, ignore_status=False,
        stdout_tee=None, stderr_tee=None):
    """
    Start a command with run_bg() and wait for it with join_bg_job().

    Args:
        command: the command line string
        timeout: time limit in seconds before attempting to
            kill the running process. The run() function
            will take a few seconds longer than 'timeout'
            to complete if it has to kill the process.
        ignore_status: do not raise an exception when the command
            finishes with a non-zero exit code.
        stdout_tee: optional file-like object to which stdout data
            will be written as it is generated (data will still
            be stored in result.stdout)
        stderr_tee: likewise for stderr

    Returns:
        a CmdResult object

    Raises:
        CmdError: the exit code of the command
            execution was not 0
    """
    background_job = run_bg(command)
    return join_bg_job(background_job, timeout, ignore_status,
                       stdout_tee, stderr_tee)
170
171
def run_bg(command):
    """
    Launch command through /bin/bash without waiting for it.

    Returns a (subprocess, CmdResult) tuple; stdout and stderr of the
    subprocess are pipes.
    """
    result = CmdResult(command)
    popen_options = {
        'stdout': subprocess.PIPE,
        'stderr': subprocess.PIPE,
        'shell': True,
        'executable': "/bin/bash",
    }
    child = subprocess.Popen(command, **popen_options)
    return child, result
179
180
def join_bg_job(bg_job, timeout=None, ignore_status=False,
                stdout_tee=None, stderr_tee=None):
    """
    Wait for a job started by run_bg() and fill in its CmdResult.

    Args:
        bg_job: the (subprocess, CmdResult) tuple returned by run_bg().
        timeout, ignore_status, stdout_tee, stderr_tee: see run().

    Returns:
        The CmdResult from bg_job with exit_status, duration, stdout
        and stderr filled in.

    Raises:
        CmdError: the exit status is non-zero and either the command
            timed out or ignore_status is false. A timeout is reported
            even when ignore_status is true.
    """
    sp, result = bg_job
    captured_out = StringIO.StringIO()
    captured_err = StringIO.StringIO()
    timed_out = False

    try:
        began = time.time()
        exit_code, timed_out = _wait_for_command(
                sp, began, timeout, captured_out, captured_err,
                stdout_tee, stderr_tee)
        result.exit_status = exit_code
        result.duration = time.time() - began
        # Drain what is left with a plain read() (not os.read) so that
        # everything up to EOF is collected.
        leftovers = ((sp.stdout, captured_out, stdout_tee),
                     (sp.stderr, captured_err, stderr_tee))
        for pipe, sink, tee in leftovers:
            _process_output(pipe, sink, tee, use_os_read=False)
    finally:
        # our ends of the pipes to the subprocess are closed whatever
        # happened above
        sp.stdout.close()
        sp.stderr.close()

    result.stdout = captured_out.getvalue()
    result.stderr = captured_err.getvalue()

    if result.exit_status == 0:
        return result
    if timed_out:
        raise error.CmdError('Command not complete within %s seconds'
                             % timeout, result)
    if not ignore_status:
        raise error.CmdError("command execution error", result)
    return result
219
mbligh8ea61e22008-05-09 18:09:37 +0000220# this returns a tuple with the return code and a flag to specify if the error
221# is due to the process not terminating within timeout
mbligh63073c92008-03-31 16:49:32 +0000222def _wait_for_command(subproc, start_time, timeout, stdout_file, stderr_file,
223 stdout_tee, stderr_tee):
224 if timeout:
225 stop_time = start_time + timeout
226 time_left = stop_time - time.time()
227 else:
228 time_left = None # so that select never times out
229 while not timeout or time_left > 0:
230 # select will return when stdout is ready (including when it is
231 # EOF, that is the process has terminated).
232 ready, _, _ = select.select([subproc.stdout, subproc.stderr],
233 [], [], time_left)
234 # os.read() has to be used instead of
235 # subproc.stdout.read() which will otherwise block
236 if subproc.stdout in ready:
237 _process_output(subproc.stdout, stdout_file,
238 stdout_tee)
239 if subproc.stderr in ready:
240 _process_output(subproc.stderr, stderr_file,
241 stderr_tee)
242
243 exit_status_indication = subproc.poll()
244
245 if exit_status_indication is not None:
mbligh8ea61e22008-05-09 18:09:37 +0000246 return (exit_status_indication, False)
247
mbligh63073c92008-03-31 16:49:32 +0000248 if timeout:
249 time_left = stop_time - time.time()
250
251 # the process has not terminated within timeout,
252 # kill it via an escalating series of signals.
253 if exit_status_indication is None:
mbligh8ea61e22008-05-09 18:09:37 +0000254 exit_status_indication = nuke_subprocess(subproc)
255
256 return (exit_status_indication, True)
mbligh63073c92008-03-31 16:49:32 +0000257
258
259def _process_output(pipe, fbuffer, teefile=None, use_os_read=True):
260 if use_os_read:
261 data = os.read(pipe.fileno(), 1024)
262 else:
263 data = pipe.read()
264 fbuffer.write(data)
265 if teefile:
266 teefile.write(data)
267 teefile.flush()
268
269
def nuke_subprocess(subproc):
    """
    Kill a subprocess via an escalating series of signals.

    SIGTERM is sent first, then SIGKILL. After each signal the
    subprocess is polled for up to about five seconds; polling is done
    in short steps so the function returns soon after the process dies.

    Args:
        subproc: object with a pid attribute and a poll() method, such
            as a subprocess.Popen instance.

    Returns:
        The value of subproc.poll() once it is no longer None, or None
        if the process was still running after both signals.
    """
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    poll_interval = 0.1    # seconds between two polls
    polls_per_signal = 50  # 50 * 0.1s = 5s grace period per signal
    for sig in signal_queue:
        try:
            os.kill(subproc.pid, sig)
        # The process may have died before we could kill it.
        except OSError:
            pass

        for _ in range(polls_per_signal):
            rc = subproc.poll()
            if rc is not None:
                return rc
            time.sleep(poll_interval)
    return None
286
287
def nuke_pid(pid):
    """
    Kill the process pid via an escalating series of signals.

    SIGTERM is sent first. Only if the process has not been reaped
    within about five seconds is SIGKILL sent, followed by another wait
    of the same length. The process is reaped with os.waitpid().

    If os.waitpid() fails with OSError the exit cannot be observed; the
    next signal is then sent without further waiting and no error is
    reported.

    Args:
        pid: id of the process to kill.

    Raises:
        error.AutoservRunError: the process was still not reaped after
            the wait that followed the last signal.
    """
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    poll_interval = 0.1    # seconds between two waitpid() attempts
    polls_per_signal = 50  # 50 * 0.1s = 5s grace period per signal
    wait_expired = False
    for sig in signal_queue:
        try:
            os.kill(pid, sig)
        # The process may have died before we could kill it.
        except OSError:
            pass

        try:
            for _ in range(polls_per_signal):
                # with WNOHANG, waitpid returns pid only once it has
                # reaped the process
                if os.waitpid(pid, os.WNOHANG)[0] == pid:
                    return
                time.sleep(poll_interval)
            wait_expired = True
        # the process died before we join it.
        except OSError:
            wait_expired = False

    # Give up only after every signal, SIGKILL included, has had its turn.
    if wait_expired:
        raise error.AutoservRunError('Could not kill %d'
                                     % pid, None)
313 pass
314
315
316def _process_output(pipe, fbuffer, teefile=None, use_os_read=True):
317 if use_os_read:
318 data = os.read(pipe.fileno(), 1024)
319 else:
320 data = pipe.read()
321 fbuffer.write(data)
322 if teefile:
323 teefile.write(data)
324 teefile.flush()
325
326
def system(command, timeout=None, ignore_status=False):
    """
    Run command via run(), echoing its output to this process's stdout
    and stderr, and return the exit status.
    """
    result = run(command, timeout, ignore_status,
                 stdout_tee=sys.stdout, stderr_tee=sys.stderr)
    return result.exit_status
330
331
def system_output(command, timeout=None, ignore_status=False,
                  retain_output=False):
    """
    Run command via run() and return what it wrote to stdout, minus one
    trailing newline if there is one.

    With retain_output the command's stdout and stderr are also echoed
    to this process's stdout and stderr while it runs.
    """
    tee_options = {}
    if retain_output:
        tee_options = {'stdout_tee': sys.stdout,
                       'stderr_tee': sys.stderr}
    out = run(command, timeout, ignore_status, **tee_options).stdout
    if out.endswith('\n'):
        out = out[:-1]
    return out
341
def get_sync_control_file(control, host_name, host_num,
                          instance, num_jobs, port_base=63100):
    """
    This function is used when there is a need to run more than one
    job simultaneously starting exactly at the same time. It basically
    returns a modified control file (containing the synchronization code
    prepended) whenever it is ready to run the control file. The
    synchronization is done using barriers to make sure that the jobs
    start at the same time.

    Here is how the synchronization is done to make sure that the tests
    start at exactly the same time on the client.
    sc_bar is a server barrier and s_bar, c_bar are the normal barriers

                Job1          Job2       ......     JobN
     Server:   |              sc_bar
     Server:   |              s_bar      ......     s_bar
     Server:   |  at.run()    at.run()   ......     at.run()
     ----------|-------------------------------------------------
     Client    |  sc_bar
     Client    |  c_bar       c_bar      ......     c_bar
     Client    |  <run test>  <run test> ......     <run test>

    PARAMS:
       control : The control file to which the above synchronization
                 code is prepended
       host_name : The host name on which the job is going to run
       host_num (non negative) : A number to identify the machine so
                 that we have different sets of s_bar_ports for each of
                 the machines.
       instance : The number of the job
       num_jobs : Total number of jobs that are going to run in parallel
                 with this job starting at the same time
       port_base : Port number that is used to derive the actual barrier
                 ports.

    RETURN VALUE:
        The modified control file, or None (after printing a message)
        if host_num is negative.
    """
    sc_bar_port = port_base
    c_bar_port = port_base
    if host_num < 0:
        print("Please provide a non negative number for the host")
        return None
    s_bar_port = port_base + 1 + host_num  # The set of s_bar_ports are
                                           # the same for a given machine

    sc_bar_timeout = 180
    s_bar_timeout = c_bar_timeout = 120

    # The barrier code snippet is prepended into the control file
    # dynamically before at.run() is called finally.
    control_new = []

    # jobid is the unique name used to identify the processes
    # trying to reach the barriers
    jobid = "%s#%d" % (host_name, instance)

    # rendvstr holds the quoted, comma separated names of all the
    # processes taking part in the rendezvous
    rendv = ["'%s#%d'" % (host_name, n) for n in range(num_jobs)]
    rendvstr = ",".join(rendv)

    if instance == 0:
        # Do the setup and wait at the server barrier
        # Clean up the tmp and the control dirs for the first instance
        control_new.append('if os.path.exists(job.tmpdir):')
        # The space in front of "2>" matters: without it the shell sees
        # "> /dev/null2" and sends stdout to a file of that name.
        control_new.append("\t system('umount -f %s > /dev/null "
                           "2> /dev/null' % job.tmpdir,"
                           "ignore_status=True)")
        control_new.append("\t system('rm -rf ' + job.tmpdir)")
        control_new.append(
            'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
            % (jobid, sc_bar_timeout, sc_bar_port))
        control_new.append(
            'b0.rendevous_servers("PARALLEL_MASTER", "%s")'
            % jobid)

    elif instance == 1:
        # Wait at the server barrier to wait for instance=0
        # process to complete setup
        b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
                             port=sc_bar_port)
        b0.rendevous_servers("PARALLEL_MASTER", jobid)

        if num_jobs > 2:
            b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
                                 port=s_bar_port)
            # NOTE(review): rendvstr is passed here as one string that
            # itself contains quotes and commas, whereas the generated
            # client code below expands it into separate arguments -
            # confirm this is what barrier.rendevous() expects.
            b1.rendevous(rendvstr)

    else:
        # For the rest of the clients
        b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
        b2.rendevous(rendvstr)

    # Client side barrier for all the tests to start at the same time
    control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
                       % (jobid, c_bar_timeout, c_bar_port))
    control_new.append("b1.rendevous(%s)" % rendvstr)

    # Stick in the rest of the control file
    control_new.append(control)

    return "\n".join(control_new)
446
mbligh63073c92008-03-31 16:49:32 +0000447
class CmdResult(object):
    """
    Command execution result.

    command:     String containing the command line itself
    exit_status: Integer exit code of the process
    stdout:      String containing stdout of the process
    stderr:      String containing stderr of the process
    duration:    Elapsed wall clock time running the process
    """


    def __init__(self, command = None):
        self.command = command
        self.exit_status = None
        self.stdout = ""
        self.stderr = ""
        self.duration = 0


    def __repr__(self):
        """
        Return a multi-line summary: the wrapped command line, the exit
        status, the duration and - only when non-empty - the stdout and
        stderr text with trailing whitespace removed.
        """
        wrapper = textwrap.TextWrapper(width = 78,
                                       initial_indent="\n ",
                                       subsequent_indent=" ")

        # command defaults to None, which TextWrapper.fill() cannot
        # handle; show it as an empty command line instead of failing
        command = self.command
        if command is None:
            command = ""

        stdout = self.stdout.rstrip()
        if stdout:
            stdout = "\nstdout:\n%s" % stdout

        stderr = self.stderr.rstrip()
        if stderr:
            stderr = "\nstderr:\n%s" % stderr

        return ("* Command: %s\n"
                "Exit status: %s\n"
                "Duration: %s\n"
                "%s"
                "%s"
                % (wrapper.fill(command), self.exit_status,
                   self.duration, stdout, stderr))
488
489
class run_randomly:
    """
    Queue up argument sets with add(), then hand each of them to a
    function, in random order, with run().
    """


    def __init__(self):
        # pending (args, dargs) pairs, in the order they were added
        self.test_list = []


    def add(self, *args, **dargs):
        """Remember one set of positional and keyword arguments."""
        self.test_list.append((args, dargs))


    def run(self, fn):
        """
        Call fn once per queued argument set, picking the next set at
        random each time; the queue is empty afterwards.
        """
        while self.test_list:
            last_index = len(self.test_list) - 1
            chosen = random.randint(0, last_index)
            args, dargs = self.test_list.pop(chosen)
            fn(*args, **dargs)
506 fn(*args, **dargs)