blob: 92e53f733efe8b77ea0f00c41dd46596d975c24a [file] [log] [blame]
mbligh63073c92008-03-31 16:49:32 +00001#!/usr/bin/python
2#
3# Copyright 2008 Google Inc. Released under the GPL v2
4
5import os, pickle, random, re, select, shutil, signal, StringIO, subprocess
mbligh02ff2d52008-06-03 15:00:21 +00006import socket, sys, time, textwrap, urllib, urlparse
mblighc1cbc992008-05-27 20:01:45 +00007import error, barrier
mbligh6231cd62008-02-02 19:18:33 +00008
mblighde0d47e2008-03-28 14:37:18 +00009
jadmanski5182e162008-05-13 21:48:16 +000010def read_one_line(filename):
11 return open(filename, 'r').readline().strip()
12
13
def write_one_line(filename, str):
    """Overwrite filename with str, trailing whitespace replaced by one newline.

    NOTE: the parameter name 'str' shadows the builtin; it is kept so any
    keyword callers (write_one_line(f, str=...)) remain compatible.
    """
    f = open(filename, 'w')
    try:
        f.write(str.rstrip() + "\n")
    finally:
        # Close explicitly instead of relying on refcounting.
        f.close()
16
17
mblighde0d47e2008-03-28 14:37:18 +000018def read_keyval(path):
19 """
20 Read a key-value pair format file into a dictionary, and return it.
21 Takes either a filename or directory name as input. If it's a
22 directory name, we assume you want the file to be called keyval.
23 """
24 if os.path.isdir(path):
25 path = os.path.join(path, 'keyval')
26 keyval = {}
27 for line in open(path):
28 line = re.sub('#.*', '', line.rstrip())
mblighcaa62c22008-04-07 21:51:17 +000029 if not re.search(r'^[-\w]+=', line):
mblighde0d47e2008-03-28 14:37:18 +000030 raise ValueError('Invalid format line: %s' % line)
31 key, value = line.split('=', 1)
32 if re.search('^\d+$', value):
33 value = int(value)
34 elif re.search('^(\d+\.)?\d+$', value):
35 value = float(value)
36 keyval[key] = value
37 return keyval
38
39
jadmanskicc549172008-05-21 18:11:51 +000040def write_keyval(path, dictionary, type_tag=None):
41 """
42 Write a key-value pair format file out to a file. This uses append
43 mode to open the file, so existing text will not be overwritten or
44 reparsed.
45
46 If type_tag is None, then the key must be composed of alphanumeric
47 characters (or dashes+underscores). However, if type-tag is not
48 null then the keys must also have "{type_tag}" as a suffix. At
49 the moment the only valid values of type_tag are "attr" and "perf".
50 """
mbligh6231cd62008-02-02 19:18:33 +000051 if os.path.isdir(path):
52 path = os.path.join(path, 'keyval')
53 keyval = open(path, 'a')
jadmanskicc549172008-05-21 18:11:51 +000054
55 if type_tag is None:
56 key_regex = re.compile(r'^[-\w]+$')
57 else:
58 if type_tag not in ('attr', 'perf'):
59 raise ValueError('Invalid type tag: %s' % type_tag)
60 escaped_tag = re.escape(type_tag)
61 key_regex = re.compile(r'^[-\w]+\{%s\}$' % escaped_tag)
mbligh6231cd62008-02-02 19:18:33 +000062 try:
63 for key, value in dictionary.iteritems():
jadmanskicc549172008-05-21 18:11:51 +000064 if not key_regex.search(key):
mbligh6231cd62008-02-02 19:18:33 +000065 raise ValueError('Invalid key: %s' % key)
66 keyval.write('%s=%s\n' % (key, value))
67 finally:
68 keyval.close()
69
70
def is_url(path):
    """Return true if path looks like a URL"""
    # Only http and ftp schemes are recognized for now.
    scheme = urlparse.urlparse(path)[0]
    return scheme in ('http', 'ftp')
76
77
mbligh02ff2d52008-06-03 15:00:21 +000078def urlopen(url, data=None, proxies=None, timeout=300):
79 """Wrapper to urllib.urlopen with timeout addition."""
80
81 # Save old timeout
82 old_timeout = socket.getdefaulttimeout()
83 socket.setdefaulttimeout(timeout)
84 try:
85 return urllib.urlopen(url, data=data, proxies=proxies)
86 finally:
87 socket.setdefaulttimeout(old_timeout)
88
89
def urlretrieve(url, filename=None, reporthook=None, data=None, timeout=300):
    """Wrapper to urllib.urlretrieve with timeout addition."""
    # Swap in our timeout via the socket-module default, restoring the
    # previous value no matter how the download ends.
    saved_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib.urlretrieve(url, filename=filename,
                                  reporthook=reporthook, data=data)
    finally:
        socket.setdefaulttimeout(saved_timeout)
99
100
mbligh6231cd62008-02-02 19:18:33 +0000101def get_file(src, dest, permissions=None):
102 """Get a file from src, which can be local or a remote URL"""
103 if (src == dest):
104 return
105 if (is_url(src)):
106 print 'PWD: ' + os.getcwd()
107 print 'Fetching \n\t', src, '\n\t->', dest
108 try:
109 urllib.urlretrieve(src, dest)
110 except IOError, e:
mbligh63073c92008-03-31 16:49:32 +0000111 raise error.AutotestError('Unable to retrieve %s (to %s)'
mbligh6231cd62008-02-02 19:18:33 +0000112 % (src, dest), e)
113 else:
114 shutil.copyfile(src, dest)
115 if permissions:
116 os.chmod(dest, permissions)
117 return dest
118
119
def unmap_url(srcdir, src, destdir='.'):
    """
    Receives either a path to a local file or a URL.
    Returns either the path to the local file, or the fetched URL.

    unmap_url('/usr/src', 'foo.tar', '/tmp')
                            = '/usr/src/foo.tar'
    unmap_url('/usr/src', 'http://site/file', '/tmp')
                            = '/tmp/file'
                            (after retrieving it)
    """
    if not is_url(src):
        # Plain local file: just resolve it relative to srcdir.
        return os.path.join(srcdir, src)
    # URL: download into destdir, keeping the basename of the URL path.
    url_path = urlparse.urlparse(src)[2]
    dest = os.path.join(destdir, os.path.basename(url_path))
    return get_file(src, dest)
138
139
def update_version(srcdir, preserve_srcdir, new_version, install,
                   *args, **dargs):
    """
    Make sure srcdir is version new_version.

    If not, delete it and install() the new version (install is called
    with *args/**dargs).

    In the preserve_srcdir case, we just check it's up to date,
    and if not, we rerun install, without removing srcdir.

    The installed version is recorded as a pickle in srcdir/.version
    (only when srcdir exists after install()).
    """
    versionfile = os.path.join(srcdir, '.version')
    install_needed = True

    if os.path.exists(versionfile):
        # Binary mode: pickle streams are byte data, not text.
        f = open(versionfile, 'rb')
        try:
            old_version = pickle.load(f)
        finally:
            f.close()
        if old_version == new_version:
            install_needed = False

    if install_needed:
        if not preserve_srcdir and os.path.exists(srcdir):
            shutil.rmtree(srcdir)
        install(*args, **dargs)
        if os.path.exists(srcdir):
            f = open(versionfile, 'wb')
            try:
                pickle.dump(new_version, f)
            finally:
                f.close()
mbligh462c0152008-03-13 15:37:10 +0000164
165
mbligh63073c92008-03-31 16:49:32 +0000166def run(command, timeout=None, ignore_status=False,
167 stdout_tee=None, stderr_tee=None):
168 """
169 Run a command on the host.
170
171 Args:
172 command: the command line string
173 timeout: time limit in seconds before attempting to
174 kill the running process. The run() function
175 will take a few seconds longer than 'timeout'
176 to complete if it has to kill the process.
177 ignore_status: do not raise an exception, no matter what
178 the exit code of the command is.
179 stdout_tee: optional file-like object to which stdout data
180 will be written as it is generated (data will still
181 be stored in result.stdout)
182 stderr_tee: likewise for stderr
183
184 Returns:
185 a CmdResult object
186
187 Raises:
mbligh8ea61e22008-05-09 18:09:37 +0000188 CmdError: the exit code of the command
mbligh63073c92008-03-31 16:49:32 +0000189 execution was not 0
190 """
jadmanskid93d7d22008-05-29 21:37:29 +0000191 return join_bg_job(run_bg(command), command, timeout, ignore_status,
192 stdout_tee, stderr_tee)
mbligh63073c92008-03-31 16:49:32 +0000193
194
def run_bg(command):
    """Launch command via bash in a subprocess; return (Popen, CmdResult)."""
    result = CmdResult(command)
    subproc = subprocess.Popen(command,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               shell=True, executable="/bin/bash")
    return subproc, result
202
203
jadmanskid93d7d22008-05-29 21:37:29 +0000204def join_bg_job(bg_job, command, timeout=None, ignore_status=False,
mbligh63073c92008-03-31 16:49:32 +0000205 stdout_tee=None, stderr_tee=None):
206 """Join the subprocess with the current thread. See run description."""
207 sp, result = bg_job
208 stdout_file = StringIO.StringIO()
209 stderr_file = StringIO.StringIO()
mbligh8ea61e22008-05-09 18:09:37 +0000210 (ret, timeouterr) = (0, False)
mbligh63073c92008-03-31 16:49:32 +0000211
212 try:
213 # We are holding ends to stdin, stdout pipes
214 # hence we need to be sure to close those fds no mater what
215 start_time = time.time()
mbligh8ea61e22008-05-09 18:09:37 +0000216 (ret, timeouterr) = _wait_for_command(sp, start_time,
217 timeout, stdout_file, stderr_file,
218 stdout_tee, stderr_tee)
mbligh63073c92008-03-31 16:49:32 +0000219 result.exit_status = ret
mbligh63073c92008-03-31 16:49:32 +0000220 result.duration = time.time() - start_time
221 # don't use os.read now, so we get all the rest of the output
222 _process_output(sp.stdout, stdout_file, stdout_tee,
223 use_os_read=False)
224 _process_output(sp.stderr, stderr_file, stderr_tee,
225 use_os_read=False)
226 finally:
227 # close our ends of the pipes to the sp no matter what
228 sp.stdout.close()
229 sp.stderr.close()
230
231 result.stdout = stdout_file.getvalue()
232 result.stderr = stderr_file.getvalue()
233
mbligh8ea61e22008-05-09 18:09:37 +0000234 if result.exit_status != 0:
235 if timeouterr:
jadmanskid93d7d22008-05-29 21:37:29 +0000236 raise error.CmdError(command, result, "Command did not "
237 "complete within %d seconds" % timeout)
mbligh8ea61e22008-05-09 18:09:37 +0000238 elif not ignore_status:
jadmanskid93d7d22008-05-29 21:37:29 +0000239 raise error.CmdError(command, result,
240 "Command returned non-zero exit status")
mbligh63073c92008-03-31 16:49:32 +0000241
242 return result
243
mbligh8ea61e22008-05-09 18:09:37 +0000244# this returns a tuple with the return code and a flag to specify if the error
245# is due to the process not terminating within timeout
mbligh63073c92008-03-31 16:49:32 +0000246def _wait_for_command(subproc, start_time, timeout, stdout_file, stderr_file,
247 stdout_tee, stderr_tee):
248 if timeout:
249 stop_time = start_time + timeout
250 time_left = stop_time - time.time()
251 else:
252 time_left = None # so that select never times out
253 while not timeout or time_left > 0:
254 # select will return when stdout is ready (including when it is
255 # EOF, that is the process has terminated).
256 ready, _, _ = select.select([subproc.stdout, subproc.stderr],
257 [], [], time_left)
258 # os.read() has to be used instead of
259 # subproc.stdout.read() which will otherwise block
260 if subproc.stdout in ready:
261 _process_output(subproc.stdout, stdout_file,
262 stdout_tee)
263 if subproc.stderr in ready:
264 _process_output(subproc.stderr, stderr_file,
265 stderr_tee)
266
267 exit_status_indication = subproc.poll()
268
269 if exit_status_indication is not None:
mbligh8ea61e22008-05-09 18:09:37 +0000270 return (exit_status_indication, False)
271
mbligh63073c92008-03-31 16:49:32 +0000272 if timeout:
273 time_left = stop_time - time.time()
274
275 # the process has not terminated within timeout,
276 # kill it via an escalating series of signals.
277 if exit_status_indication is None:
mbligh8ea61e22008-05-09 18:09:37 +0000278 exit_status_indication = nuke_subprocess(subproc)
279
280 return (exit_status_indication, True)
mbligh63073c92008-03-31 16:49:32 +0000281
282
283def _process_output(pipe, fbuffer, teefile=None, use_os_read=True):
284 if use_os_read:
285 data = os.read(pipe.fileno(), 1024)
286 else:
287 data = pipe.read()
288 fbuffer.write(data)
289 if teefile:
290 teefile.write(data)
291 teefile.flush()
292
293
def nuke_subprocess(subproc):
    """Kill subproc via an escalating series of signals.

    Sends SIGTERM then SIGKILL, then polls for up to ~5 seconds.
    Returns the exit code, or None if the process never died.
    """
    for sig in (signal.SIGTERM, signal.SIGKILL):
        try:
            os.kill(subproc.pid, sig)
        except OSError:
            # The process may have died before we could kill it.
            pass

    for _ in range(5):
        rc = subproc.poll()
        if rc != None:
            return rc
        time.sleep(1)
    # Falls through (returns None) if the process refused to die.
310
311
def nuke_pid(pid):
    """
    Kill pid via an escalating series of signals, then reap it.

    Raises error.AutoservRunError if the process is still alive after
    roughly five seconds of polling.
    NOTE(review): os.waitpid only reaps direct children of this
    process -- confirm callers only ever pass child pids.
    """
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        try:
            os.kill(pid, sig)

        # The process may have died before we could kill it.
        except OSError:
            pass

    try:
        for i in range(5):
            # WNOHANG: poll without blocking; waitpid returns pid 0
            # while the child is still running.
            status = os.waitpid(pid, os.WNOHANG)[0]
            if status == pid:
                return
            time.sleep(1)

        if status != pid:
            raise error.AutoservRunError('Could not kill %d'
                    % pid, None)

    # the process died before we could join it (waitpid raised OSError);
    # NOTE(review): this also silently swallows any other OSError from
    # waitpid above -- confirm that is intended.
    except OSError:
        pass
338
339
# NOTE(review): duplicate definition -- _process_output is already
# defined earlier in this file with an identical body; being later,
# this one wins at import time.  One of the two should be removed.
def _process_output(pipe, fbuffer, teefile=None, use_os_read=True):
    """Copy one chunk of data from pipe into fbuffer (and teefile, if given)."""
    if use_os_read:
        # Single read on the raw fd: returns whatever is available
        # (up to 1024 bytes) without blocking for a full buffer.
        data = os.read(pipe.fileno(), 1024)
    else:
        data = pipe.read()
    fbuffer.write(data)
    if teefile:
        teefile.write(data)
        teefile.flush()
349
350
def system(command, timeout=None, ignore_status=False):
    """Run command, teeing its output to this process's stdout/stderr.

    Returns the command's exit status.  See run() for the meaning of
    timeout and ignore_status.
    """
    result = run(command, timeout, ignore_status,
                 stdout_tee=sys.stdout, stderr_tee=sys.stderr)
    return result.exit_status
354
355
mbligh8ea61e22008-05-09 18:09:37 +0000356def system_output(command, timeout=None, ignore_status=False,
357 retain_output=False):
358 if retain_output:
359 out = run(command, timeout, ignore_status,
360 stdout_tee=sys.stdout, stderr_tee=sys.stderr).stdout
361 else:
362 out = run(command, timeout, ignore_status).stdout
mbligh63073c92008-03-31 16:49:32 +0000363 if out[-1:] == '\n': out = out[:-1]
364 return out
365
mblighc1cbc992008-05-27 20:01:45 +0000366"""
367This function is used when there is a need to run more than one
368job simultaneously starting exactly at the same time. It basically returns
369a modified control file (containing the synchronization code prepended)
370whenever it is ready to run the control file. The synchronization
371is done using barriers to make sure that the jobs start at the same time.
372
373Here is how the synchronization is done to make sure that the tests
374start at exactly the same time on the client.
375sc_bar is a server barrier and s_bar, c_bar are the normal barriers
376
377 Job1 Job2 ...... JobN
378 Server: | sc_bar
379 Server: | s_bar ...... s_bar
380 Server: | at.run() at.run() ...... at.run()
381 ----------|------------------------------------------------------
382 Client | sc_bar
383 Client | c_bar c_bar ...... c_bar
384 Client | <run test> <run test> ...... <run test>
385
386
387PARAMS:
388 control_file : The control file which to which the above synchronization
389 code would be prepended to
390 host_name : The host name on which the job is going to run
391 host_num (non negative) : A number to identify the machine so that we have
392 different sets of s_bar_ports for each of the machines.
393 instance : The number of the job
394 num_jobs : Total number of jobs that are going to run in parallel with
395 this job starting at the same time
396 port_base : Port number that is used to derive the actual barrier ports.
397
398RETURN VALUE:
399 The modified control file.
400
401"""
402def get_sync_control_file(control, host_name, host_num,
403 instance, num_jobs, port_base=63100):
404 sc_bar_port = port_base
405 c_bar_port = port_base
406 if host_num < 0:
407 print "Please provide a non negative number for the host"
408 return None
409 s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are
410 # the same for a given machine
411
412 sc_bar_timeout = 180
413 s_bar_timeout = c_bar_timeout = 120
414
415 # The barrier code snippet is prepended into the conrol file
416 # dynamically before at.run() is called finally.
417 control_new = []
418
419 # jobid is the unique name used to identify the processes
420 # trying to reach the barriers
421 jobid = "%s#%d" % (host_name, instance)
422
423 rendv = []
424 # rendvstr is a temp holder for the rendezvous list of the processes
425 for n in range(num_jobs):
426 rendv.append("'%s#%d'" % (host_name, n))
427 rendvstr = ",".join(rendv)
428
429 if instance == 0:
430 # Do the setup and wait at the server barrier
431 # Clean up the tmp and the control dirs for the first instance
432 control_new.append('if os.path.exists(job.tmpdir):')
433 control_new.append("\t system('umount -f %s > /dev/null"
434 "2> /dev/null' % job.tmpdir,"
435 "ignore_status=True)")
436 control_new.append("\t system('rm -rf ' + job.tmpdir)")
437 control_new.append(
438 'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
439 % (jobid, sc_bar_timeout, sc_bar_port))
440 control_new.append(
441 'b0.rendevous_servers("PARALLEL_MASTER", "%s")'
442 % jobid)
443
444 elif instance == 1:
445 # Wait at the server barrier to wait for instance=0
446 # process to complete setup
447 b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
448 port=sc_bar_port)
449 b0.rendevous_servers("PARALLEL_MASTER", jobid)
450
451 if(num_jobs > 2):
452 b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
453 port=s_bar_port)
454 b1.rendevous(rendvstr)
455
456 else:
457 # For the rest of the clients
458 b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
459 b2.rendevous(rendvstr)
460
461 # Client side barrier for all the tests to start at the same time
462 control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
463 % (jobid, c_bar_timeout, c_bar_port))
464 control_new.append("b1.rendevous(%s)" % rendvstr)
465
466 # Stick in the rest of the control file
467 control_new.append(control)
468
469 return "\n".join(control_new)
470
mbligh63073c92008-03-31 16:49:32 +0000471
class CmdResult(object):
    """
    Command execution result.

    command: String containing the command line itself
    exit_status: Integer exit code of the process
    stdout: String containing stdout of the process
    stderr: String containing stderr of the process
    duration: Elapsed wall clock time running the process
    """


    def __init__(self, command=None, stdout="", stderr="",
                 exit_status=None, duration=0):
        self.command = command
        self.exit_status = exit_status
        self.stdout = stdout
        self.stderr = stderr
        self.duration = duration


    def __repr__(self):
        wrapper = textwrap.TextWrapper(width = 78,
                                       initial_indent="\n    ",
                                       subsequent_indent="    ")

        stdout = self.stdout.rstrip()
        if stdout:
            stdout = "\nstdout:\n%s" % stdout

        stderr = self.stderr.rstrip()
        if stderr:
            stderr = "\nstderr:\n%s" % stderr

        # str() guards against command=None (the default): TextWrapper.fill
        # requires a string and would otherwise raise here.
        return ("* Command: %s\n"
                "Exit status: %s\n"
                "Duration: %s\n"
                "%s"
                "%s"
                % (wrapper.fill(str(self.command)), self.exit_status,
                   self.duration, stdout, stderr))
513
514
mbligh462c0152008-03-13 15:37:10 +0000515class run_randomly:
mbligh8445dcb2008-06-04 15:05:29 +0000516 def __init__(self, run_sequentially=False):
517 # Run sequentially is for debugging control files
mbligh462c0152008-03-13 15:37:10 +0000518 self.test_list = []
mbligh8445dcb2008-06-04 15:05:29 +0000519 self.run_sequentially = run_sequentially
mbligh462c0152008-03-13 15:37:10 +0000520
521
522 def add(self, *args, **dargs):
523 test = (args, dargs)
524 self.test_list.append(test)
525
526
527 def run(self, fn):
528 while self.test_list:
529 test_index = random.randint(0, len(self.test_list)-1)
mbligh8445dcb2008-06-04 15:05:29 +0000530 if self.run_sequentially:
531 test_index = 0
mbligh462c0152008-03-13 15:37:10 +0000532 (args, dargs) = self.test_list.pop(test_index)
533 fn(*args, **dargs)