blob: c9e8370ab2964debfa4a21f17e94399c7624f953 [file] [log] [blame]
mbligh63073c92008-03-31 16:49:32 +00001#!/usr/bin/python
2#
3# Copyright 2008 Google Inc. Released under the GPL v2
4
5import os, pickle, random, re, select, shutil, signal, StringIO, subprocess
mbligh0a297572008-04-02 00:13:45 +00006import sys, time, textwrap, urllib, urlparse
mblighc1cbc992008-05-27 20:01:45 +00007import error, barrier
mbligh6231cd62008-02-02 19:18:33 +00008
mblighde0d47e2008-03-28 14:37:18 +00009
def read_one_line(filename):
    """Return the first line of `filename` with surrounding whitespace stripped."""
    f = open(filename, 'r')
    try:
        return f.readline().strip()
    finally:
        # close explicitly instead of relying on garbage collection
        f.close()
12
13
def write_one_line(filename, str):
    """
    Overwrite `filename` with `str` as a single line: trailing whitespace
    is stripped and exactly one newline is appended.
    """
    # The parameter name shadows the builtin str; it is kept because
    # callers may pass it by keyword.
    f = open(filename, 'w')
    try:
        f.write(str.rstrip() + "\n")
    finally:
        # close (and thereby flush) explicitly instead of relying on GC
        f.close()
16
17
def read_keyval(path):
    """
    Read a key-value pair format file into a dictionary, and return it.
    Takes either a filename or directory name as input. If it's a
    directory name, we assume you want the file to be called keyval.

    Each non-blank line must look like "key=value" where key consists of
    word characters and dashes. Anything after a '#' is a comment; blank
    and comment-only lines are skipped. Values made only of digits become
    ints, values like "1.5" or "15" become floats/ints, and everything else
    stays a string.

    Raises:
        ValueError: a non-blank line is not in key=value format.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = {}
    keyval_file = open(path)
    try:
        for line in keyval_file:
            # Remove the comment first and strip afterwards, so that
            # "key=1  # note" yields "1" rather than "1  ".
            line = re.sub(r'#.*', '', line).rstrip()
            if not line:
                # blank or comment-only line
                continue
            if not re.search(r'^[-\w]+=', line):
                raise ValueError('Invalid format line: %s' % line)
            key, value = line.split('=', 1)
            if re.search(r'^\d+$', value):
                value = int(value)
            elif re.search(r'^(\d+\.)?\d+$', value):
                value = float(value)
            keyval[key] = value
    finally:
        keyval_file.close()
    return keyval
37 return keyval
38
39
def write_keyval(path, dictionary, type_tag=None):
    """
    Write a key-value pair format file out to a file. This uses append
    mode to open the file, so existing text will not be overwritten or
    reparsed.

    If type_tag is None, then the key must be composed of alphanumeric
    characters (or dashes+underscores). However, if type-tag is not
    null then the keys must also have "{type_tag}" as a suffix. At
    the moment the only valid values of type_tag are "attr" and "perf".

    Raises:
        ValueError: type_tag is not one of the accepted values, or a key
            does not match the required format. In either case the file
            is left untouched.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')

    if type_tag is None:
        key_regex = re.compile(r'^[-\w]+$')
    else:
        if type_tag not in ('attr', 'perf'):
            raise ValueError('Invalid type tag: %s' % type_tag)
        escaped_tag = re.escape(type_tag)
        key_regex = re.compile(r'^[-\w]+\{%s\}$' % escaped_tag)

    # Validate every key before opening the file, so a bad key neither
    # leaks a file handle nor leaves a partially written keyval behind.
    for key in dictionary:
        if not key_regex.search(key):
            raise ValueError('Invalid key: %s' % key)

    keyval = open(path, 'a')
    try:
        for key, value in dictionary.items():
            keyval.write('%s=%s\n' % (key, value))
    finally:
        keyval.close()
68 keyval.close()
69
70
def is_url(path):
    """
    Tell whether `path` should be treated as a remote URL.

    Only the 'http' and 'ftp' schemes are recognised for now; anything
    else (including plain filesystem paths) is reported as not a URL.
    """
    scheme = urlparse.urlparse(path)[0]
    return scheme in ('http', 'ftp')
75 return (url_parts[0] in ('http', 'ftp'))
76
77
def get_file(src, dest, permissions=None):
    """
    Get a file from src, which can be local or a remote URL, into dest.

    Sources for which is_url() is true are downloaded with urllib; anything
    else is copied as a local path. If permissions is given (and non-zero),
    dest is chmod-ed to it afterwards.

    Returns dest. When src == dest nothing is fetched, copied or chmod-ed,
    but dest is still returned, so callers such as unmap_url always get a
    path back.

    Raises:
        error.AutotestError: a URL could not be retrieved.
    """
    if src == dest:
        return dest
    if is_url(src):
        print('PWD: ' + os.getcwd())
        print('Fetching \n\t%s \n\t-> %s' % (src, dest))
        try:
            urllib.urlretrieve(src, dest)
        except IOError:
            # sys.exc_info() (rather than "except IOError, e" / "as e")
            # keeps this clause valid under every Python syntax version.
            e = sys.exc_info()[1]
            raise error.AutotestError('Unable to retrieve %s (to %s)'
                                      % (src, dest), e)
    else:
        shutil.copyfile(src, dest)
    if permissions:
        os.chmod(dest, permissions)
    return dest
94 return dest
95
96
def unmap_url(srcdir, src, destdir='.'):
    """
    Resolve `src` to a local file path, downloading it first if needed.

    A non-URL `src` is simply joined onto `srcdir`. A URL is fetched into
    `destdir` under the last component of its path, and get_file()'s
    result is returned.

        unmap_url('/usr/src', 'foo.tar', '/tmp')
            = '/usr/src/foo.tar'
        unmap_url('/usr/src', 'http://site/file', '/tmp')
            = '/tmp/file'    (after retrieving it)
    """
    if not is_url(src):
        return os.path.join(srcdir, src)

    # the path component of the URL names the local copy
    remote_path = urlparse.urlparse(src)[2]
    local_name = os.path.basename(remote_path)
    return get_file(src, os.path.join(destdir, local_name))
114 return os.path.join(srcdir, src)
115
116
def update_version(srcdir, preserve_srcdir, new_version, install,
                   *args, **dargs):
    """
    Make sure srcdir is version new_version

    If not, delete it and install() the new version.

    In the preserve_srcdir case, we just check it's up to date,
    and if not, we rerun install, without removing srcdir

    The installed version is recorded as a pickle in srcdir/.version; it is
    only written when srcdir exists after install(*args, **dargs) returns.
    """
    versionfile = os.path.join(srcdir, '.version')
    install_needed = True

    if os.path.exists(versionfile):
        # NOTE: unpickling is only safe because .version is written by this
        # function itself; never point this at untrusted files.
        version_fp = open(versionfile, 'rb')
        try:
            old_version = pickle.load(version_fp)
        finally:
            version_fp.close()
        if old_version == new_version:
            install_needed = False

    if install_needed:
        if not preserve_srcdir and os.path.exists(srcdir):
            shutil.rmtree(srcdir)
        install(*args, **dargs)
        if os.path.exists(srcdir):
            # close explicitly so the version stamp is flushed to disk now
            version_fp = open(versionfile, 'wb')
            try:
                pickle.dump(new_version, version_fp)
            finally:
                version_fp.close()
140 pickle.dump(new_version, open(versionfile, 'w'))
mbligh462c0152008-03-13 15:37:10 +0000141
142
def run(command, timeout=None, ignore_status=False,
        stdout_tee=None, stderr_tee=None):
    """
    Run a command on the host and wait for it to finish.

    Args:
        command: the command line string (run through /bin/bash)
        timeout: time limit in seconds before attempting to kill the
                running process. The run() function will take a few
                seconds longer than 'timeout' to complete if it has to
                kill the process.
        ignore_status: do not raise an exception for a non-zero exit
                code of the command.
        stdout_tee: optional file-like object to which stdout data will be
                written as it is generated (data will still be stored in
                result.stdout)
        stderr_tee: likewise for stderr

    Returns:
        a CmdResult object

    Raises:
        CmdError: the command exited with a non-zero status (unless
                ignore_status), or it was killed after exceeding
                'timeout' (even when ignore_status is set).
    """
    bg_job = run_bg(command)
    return join_bg_job(bg_job, command, timeout, ignore_status,
                       stdout_tee, stderr_tee)
169 stdout_tee, stderr_tee)
mbligh63073c92008-03-31 16:49:32 +0000170
171
def run_bg(command):
    """
    Start `command` under /bin/bash without waiting for it.

    stdout and stderr are piped back to this process. Returns a
    (Popen object, CmdResult) pair suitable for passing to join_bg_job().
    """
    pending_result = CmdResult(command)
    child = subprocess.Popen(command,
                             shell=True,
                             executable="/bin/bash",
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    return child, pending_result
178 return sp, result
179
180
def join_bg_job(bg_job, command, timeout=None, ignore_status=False,
                stdout_tee=None, stderr_tee=None):
    """
    Wait for a job started by run_bg() and fill in its CmdResult.

    bg_job is the (subprocess, CmdResult) pair returned by run_bg(). The
    remaining arguments have the same meaning as for run(). Returns the
    CmdResult with exit_status, duration, stdout and stderr set.

    Raises:
        error.CmdError: the job was killed after exceeding timeout, or it
            exited non-zero and ignore_status is false.
    """
    subproc, result = bg_job
    out_buf = StringIO.StringIO()
    err_buf = StringIO.StringIO()
    timed_out = False

    try:
        # We hold the parent ends of the stdout/stderr pipes, so they
        # must be closed no matter how we leave this block.
        started = time.time()
        result.exit_status, timed_out = _wait_for_command(
            subproc, started, timeout, out_buf, err_buf,
            stdout_tee, stderr_tee)
        result.duration = time.time() - started
        # Blocking reads (no os.read) now, to collect all remaining output.
        _process_output(subproc.stdout, out_buf, stdout_tee,
                        use_os_read=False)
        _process_output(subproc.stderr, err_buf, stderr_tee,
                        use_os_read=False)
    finally:
        subproc.stdout.close()
        subproc.stderr.close()

    result.stdout = out_buf.getvalue()
    result.stderr = err_buf.getvalue()

    if result.exit_status == 0:
        return result
    if timed_out:
        raise error.CmdError(command, result,
                             "Command did not complete within %d seconds"
                             % timeout)
    if not ignore_status:
        raise error.CmdError(command, result,
                             "Command returned non-zero exit status")
    return result
219 return result
220
mbligh8ea61e22008-05-09 18:09:37 +0000221# this returns a tuple with the return code and a flag to specify if the error
222# is due to the process not terminating within timeout
mbligh63073c92008-03-31 16:49:32 +0000223def _wait_for_command(subproc, start_time, timeout, stdout_file, stderr_file,
224 stdout_tee, stderr_tee):
225 if timeout:
226 stop_time = start_time + timeout
227 time_left = stop_time - time.time()
228 else:
229 time_left = None # so that select never times out
230 while not timeout or time_left > 0:
231 # select will return when stdout is ready (including when it is
232 # EOF, that is the process has terminated).
233 ready, _, _ = select.select([subproc.stdout, subproc.stderr],
234 [], [], time_left)
235 # os.read() has to be used instead of
236 # subproc.stdout.read() which will otherwise block
237 if subproc.stdout in ready:
238 _process_output(subproc.stdout, stdout_file,
239 stdout_tee)
240 if subproc.stderr in ready:
241 _process_output(subproc.stderr, stderr_file,
242 stderr_tee)
243
244 exit_status_indication = subproc.poll()
245
246 if exit_status_indication is not None:
mbligh8ea61e22008-05-09 18:09:37 +0000247 return (exit_status_indication, False)
248
mbligh63073c92008-03-31 16:49:32 +0000249 if timeout:
250 time_left = stop_time - time.time()
251
252 # the process has not terminated within timeout,
253 # kill it via an escalating series of signals.
254 if exit_status_indication is None:
mbligh8ea61e22008-05-09 18:09:37 +0000255 exit_status_indication = nuke_subprocess(subproc)
256
257 return (exit_status_indication, True)
mbligh63073c92008-03-31 16:49:32 +0000258
259
260def _process_output(pipe, fbuffer, teefile=None, use_os_read=True):
261 if use_os_read:
262 data = os.read(pipe.fileno(), 1024)
263 else:
264 data = pipe.read()
265 fbuffer.write(data)
266 if teefile:
267 teefile.write(data)
268 teefile.flush()
269
270
271def nuke_subprocess(subproc):
272 # the process has not terminated within timeout,
273 # kill it via an escalating series of signals.
274 signal_queue = [signal.SIGTERM, signal.SIGKILL]
275 for sig in signal_queue:
276 try:
277 os.kill(subproc.pid, sig)
278 # The process may have died before we could kill it.
279 except OSError:
280 pass
281
282 for i in range(5):
283 rc = subproc.poll()
284 if rc != None:
mbligh8ea61e22008-05-09 18:09:37 +0000285 return rc
mbligh63073c92008-03-31 16:49:32 +0000286 time.sleep(1)
287
288
289def nuke_pid(pid):
290 # the process has not terminated within timeout,
291 # kill it via an escalating series of signals.
292 signal_queue = [signal.SIGTERM, signal.SIGKILL]
293 for sig in signal_queue:
294 try:
295 os.kill(pid, sig)
296
297 # The process may have died before we could kill it.
298 except OSError:
299 pass
300
301 try:
302 for i in range(5):
303 status = os.waitpid(pid, os.WNOHANG)[0]
304 if status == pid:
305 return
306 time.sleep(1)
307
308 if status != pid:
309 raise error.AutoservRunError('Could not kill %d'
310 % pid, None)
311
312 # the process died before we join it.
313 except OSError:
314 pass
315
316
317def _process_output(pipe, fbuffer, teefile=None, use_os_read=True):
318 if use_os_read:
319 data = os.read(pipe.fileno(), 1024)
320 else:
321 data = pipe.read()
322 fbuffer.write(data)
323 if teefile:
324 teefile.write(data)
325 teefile.flush()
326
327
328def system(command, timeout=None, ignore_status=False):
329 return run(command, timeout, ignore_status,
330 stdout_tee=sys.stdout, stderr_tee=sys.stderr).exit_status
331
332
mbligh8ea61e22008-05-09 18:09:37 +0000333def system_output(command, timeout=None, ignore_status=False,
334 retain_output=False):
335 if retain_output:
336 out = run(command, timeout, ignore_status,
337 stdout_tee=sys.stdout, stderr_tee=sys.stderr).stdout
338 else:
339 out = run(command, timeout, ignore_status).stdout
mbligh63073c92008-03-31 16:49:32 +0000340 if out[-1:] == '\n': out = out[:-1]
341 return out
342
def get_sync_control_file(control, host_name, host_num,
                          instance, num_jobs, port_base=63100):
    """
    Prepend barrier synchronization code to a control file.

    This function is used when more than one job has to run simultaneously,
    starting at exactly the same time. It returns a modified control file
    (the synchronization code followed by `control`) once it is ready to be
    run. The synchronization uses barriers to make sure that the jobs start
    at the same time.

    Here is how the synchronization is done to make sure that the tests
    start at exactly the same time on the client. sc_bar is a server
    barrier and s_bar, c_bar are the normal barriers:

                   Job1          Job2          ......  JobN
        Server:  | sc_bar
        Server:  | s_bar         s_bar         ......  s_bar
        Server:  | at.run()      at.run()      ......  at.run()
        ---------|-----------------------------------------------------
        Client   | sc_bar
        Client   | c_bar         c_bar         ......  c_bar
        Client   | <run test>    <run test>    ......  <run test>

    For instance 1 and above, this call itself takes part in the
    server-side barriers (sc_bar and/or s_bar) before returning.

    Args:
        control: the control file to which the above synchronization code
                is prepended.
        host_name: the host name on which the job is going to run.
        host_num: non-negative number identifying the machine, so that
                each machine gets its own s_bar port.
        instance: the number of this job.
        num_jobs: total number of jobs that are going to run in parallel
                with this job, all starting at the same time.
        port_base: port number from which the actual barrier ports are
                derived.

    Returns:
        The modified control file, or None if host_num is negative.
    """
    sc_bar_port = port_base
    c_bar_port = port_base
    if host_num < 0:
        print("Please provide a non negative number for the host")
        return None
    # The set of s_bar_ports are the same for a given machine
    s_bar_port = port_base + 1 + host_num

    sc_bar_timeout = 180
    s_bar_timeout = c_bar_timeout = 120

    # The barrier code snippet is prepended into the control file
    # dynamically before at.run() is called finally.
    control_new = []

    # jobid is the unique name used to identify the processes
    # trying to reach the barriers
    jobid = "%s#%d" % (host_name, instance)

    # rendvstr is a temp holder for the rendezvous list of the processes
    rendv = []
    for n in range(num_jobs):
        rendv.append("'%s#%d'" % (host_name, n))
    rendvstr = ",".join(rendv)

    # NOTE(review): the generated client code below splices rendvstr in as
    # separate arguments (rendevous('h#0','h#1')), while the server-side
    # calls pass it as one string; confirm barrier.rendevous() accepts both.

    if instance == 0:
        # Do the setup and wait at the server barrier.
        # Clean up job.tmpdir for the first instance.
        control_new.append('if os.path.exists(job.tmpdir):')
        # The space before "2>" matters: without it the generated shell
        # line sends stdout to a file called "/dev/null2".
        control_new.append("\t system('umount -f %s > /dev/null "
                           "2> /dev/null' % job.tmpdir,"
                           "ignore_status=True)")
        control_new.append("\t system('rm -rf ' + job.tmpdir)")
        control_new.append(
            'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
            % (jobid, sc_bar_timeout, sc_bar_port))
        control_new.append(
            'b0.rendevous_servers("PARALLEL_MASTER", "%s")'
            % jobid)

    elif instance == 1:
        # Wait at the server barrier for the instance 0 process to
        # complete its setup.
        b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
                             port=sc_bar_port)
        b0.rendevous_servers("PARALLEL_MASTER", jobid)

        if num_jobs > 2:
            b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
                                 port=s_bar_port)
            b1.rendevous(rendvstr)

    else:
        # For the rest of the clients
        b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
        b2.rendevous(rendvstr)

    # Client side barrier for all the tests to start at the same time
    control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
                       % (jobid, c_bar_timeout, c_bar_port))
    control_new.append("b1.rendevous(%s)" % rendvstr)

    # Stick in the rest of the control file
    control_new.append(control)

    return "\n".join(control_new)
446 return "\n".join(control_new)
447
mbligh63073c92008-03-31 16:49:32 +0000448
class CmdResult(object):
    """
    Command execution result.

    command: String containing the command line itself (None if unknown)
    exit_status: Integer exit code of the process; None until it is set
    stdout: String containing stdout of the process
    stderr: String containing stderr of the process
    duration: Elapsed wall clock time running the process
    """


    def __init__(self, command=None):
        self.command = command
        self.exit_status = None
        self.stdout = ""
        self.stderr = ""
        self.duration = 0


    def __repr__(self):
        wrapper = textwrap.TextWrapper(width=78,
                                       initial_indent="\n ",
                                       subsequent_indent=" ")

        # CmdResult() may be built without a command; TextWrapper.fill()
        # cannot wrap None, so show it explicitly instead of raising.
        command = self.command
        if command is None:
            command = "None"

        stdout = self.stdout.rstrip()
        if stdout:
            stdout = "\nstdout:\n%s" % stdout

        stderr = self.stderr.rstrip()
        if stderr:
            stderr = "\nstderr:\n%s" % stderr

        return ("* Command: %s\n"
                "Exit status: %s\n"
                "Duration: %s\n"
                "%s"
                "%s"
                % (wrapper.fill(command), self.exit_status,
                   self.duration, stdout, stderr))
488 self.duration, stdout, stderr))
489
490
class run_randomly:
    """Collect test invocations and later run them all in random order."""


    def __init__(self):
        # queued invocations, each an (args, dargs) pair
        self.test_list = []


    def add(self, *args, **dargs):
        """Queue one invocation; args/dargs are passed to fn by run()."""
        self.test_list.append((args, dargs))


    def run(self, fn):
        """Call fn(*args, **dargs) for each queued invocation in random
        order, removing each one from the queue as it is run."""
        while len(self.test_list) > 0:
            picked = random.randrange(len(self.test_list))
            args, dargs = self.test_list.pop(picked)
            fn(*args, **dargs)
507 fn(*args, **dargs)