"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py.

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

__author__ = """
Martin J. Bligh <mbligh@google.com>
Andy Whitcroft <apw@shadowen.org>
"""

import os, sys, re, time, select
import test
from utils import *
from common.error import *

# this magic incantation should give us access to a client library
server_dir = os.path.dirname(__file__)
client_dir = os.path.join(server_dir, "..", "client", "bin")
sys.path.append(client_dir)
import fd_stack
sys.path.pop()

# load up a control segment
# these are all stored in <server_dir>/control_segments
def load_control_segment(name):
    server_dir = os.path.dirname(os.path.abspath(__file__))
    script_file = os.path.join(server_dir, "control_segments", name)
    if os.path.exists(script_file):
        return file(script_file).read()
    else:
        return ""

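# Illustrative use (hypothetical segment name): site hooks concatenate
# cleanly because a missing segment simply yields '':
#
#     cleanup = load_control_segment("site_cleanup")
#     cleanup += load_control_segment("cleanup")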

preamble = """\
import os, sys

import hosts, autotest, kvm, git, standalone_profiler
import source_kernel, rpm_kernel, deb_kernel, git_kernel
from common.error import *
from common import barrier
from subcommand import *
from utils import run, get_tmp_dir, sh_escape

autotest.Autotest.job = job
hosts.SSHHost.job = job
barrier = barrier.barrier

if len(machines) > 1:
    open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""
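
# Illustrative execution (hypothetical machine list): each control segment
# below runs with this preamble prepended, in a namespace that already
# contains 'job' and 'machines'; see server_job.verify() and run():
#
#     ns = {'machines' : ['host1'], 'job' : job}
#     exec(preamble + verify, ns, ns)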

client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
    host = hosts.SSHHost(machine)
    at.run(control, host=host)

parallel_simple(run_client, machines)
"""

crashdumps = """
def crashdumps(machine):
    host = hosts.SSHHost(machine, initialize=False)
    host.get_crashdumps(test_start_time)

parallel_simple(crashdumps, machines, log=False)
"""

reboot_segment = """\
def reboot(machine):
    host = hosts.SSHHost(machine, initialize=False)
    host.reboot()

parallel_simple(reboot, machines, log=False)
"""

install = """\
def install(machine):
    host = hosts.SSHHost(machine, initialize=False)
    host.machine_install()

parallel_simple(install, machines, log=False)
"""

# load up the verifier control segment, with an optional site-specific hook
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")


# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}

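# Illustrative site_job module (hypothetical keys): a real site_job.py only
# needs to define get_site_job_data(job) returning a dict, which gets merged
# into the job keyval file:
#
#     def get_site_job_data(job):
#         return {'site_pool' : 'regression', 'site_build' : 'r1234'}
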
class server_job:
    """The actual job against which we do everything.

    Properties:
        autodir
            The top level autotest directory (/usr/local/autotest).
        serverdir
            <autodir>/server/
        clientdir
            <autodir>/client/
        conmuxdir
            <autodir>/conmux/
        testdir
            <autodir>/server/tests/
        control
            the control file for this job
    """

    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False):
        """
        control
            The control file (pathname of)
        args
            args to pass to the control file
        resultdir
            where to throw the results
        label
            label for the job
        user
            Username for the job (email address)
        machines
            list of hostnames the job will run against
        client
            True if a client-side control file
        """
        path = os.path.dirname(sys.modules['server_job'].__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        if control:
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        self.record_prefix = ''
        self.warning_loggers = set()

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines)}
        job_data.update(get_site_job_data(self))
        write_keyval(self.resultdir, job_data)

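    # Illustrative construction (hypothetical paths and values; in practice
    # the autoserv entry point builds this object):
    #
    #     job = server_job('/path/to/control', [], '/tmp/results',
    #                      'smoke-test', 'user@example.com',
    #                      ['host1.example.com'])
    #     job.verify()
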
    def verify(self):
        if not self.machines:
            raise AutoservError('No machines specified to verify')
        try:
            namespace = {'machines' : self.machines, 'job' : self}
            exec(preamble + verify, namespace, namespace)
        except Exception, e:
            msg = 'Verify failed\n' + str(e) + '\n' + format_error()
            self.record('ABORT', None, None, msg)
            raise


    def repair(self):
        if not self.machines:
            raise AutoservError('No machines specified to repair')
        namespace = {'machines' : self.machines, 'job' : self}
        exec(preamble + repair, namespace, namespace)
        self.verify()


    def run(self, reboot=False, install_before=False,
            install_after=False, namespace={}):
        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        test_start_time = int(time.time())

        os.chdir(self.resultdir)

        status_log = os.path.join(self.resultdir, 'status.log')
        try:
            if install_before and machines:
                exec(preamble + install, namespace, namespace)
            if self.client:
                namespace['control'] = self.control
                open('control', 'w').write(self.control)
                open('control.srv', 'w').write(client_wrapper)
                server_control = client_wrapper
            else:
                open('control.srv', 'w').write(self.control)
                server_control = self.control
            exec(preamble + server_control, namespace, namespace)

        finally:
            if machines:
                namespace['test_start_time'] = test_start_time
                exec(preamble + crashdumps,
                     namespace, namespace)
            if reboot and machines:
                exec(preamble + reboot_segment,
                     namespace, namespace)
            if install_after and machines:
                exec(preamble + install, namespace, namespace)

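    # Illustrative server-side control file (hypothetical command): run()
    # exec's the preamble plus the control file, so a minimal control file
    # might read:
    #
    #     host = hosts.SSHHost(machines[0])
    #     print host.run('uname -a').stdout
    #
    # Client-side control files (client=True) are instead wrapped in
    # client_wrapper and pushed to each machine via autotest.Autotest.
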
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
            tag to add to testname
        url
            url of the test to run
        """
        (group, testname) = test.testname(url)
        tag = None
        subdir = testname

        if dargs.has_key('tag'):
            tag = dargs['tag']
            del dargs['tag']
            if tag:
                subdir += '.' + tag

        try:
            test.runtest(self, url, tag, args, dargs)
            self.record('GOOD', subdir, testname, 'completed successfully')
        except Exception, detail:
            self.record('FAIL', subdir, testname, format_error())


    def run_group(self, function, *args, **dargs):
        """\
        function:
            subroutine to run
        *args:
            arguments for the function
        """
        result = None
        name = function.__name__

        # Allow the tag for the group to be specified.
        if dargs.has_key('tag'):
            tag = dargs['tag']
            del dargs['tag']
            if tag:
                name = tag

        old_record_prefix = self.record_prefix
        try:
            try:
                self.record('START', None, name)
                self.record_prefix += '\t'
                result = function(*args, **dargs)
                self.record_prefix = old_record_prefix
                self.record('END GOOD', None, name)
            except:
                self.record_prefix = old_record_prefix
                self.record('END FAIL', None, name, format_error())
                # re-raise so the outer handlers can decide what propagates
                raise
        # We don't want to raise up an error higher if it's just
        # a TestError - we want to carry on to other tests. Hence
        # this outer try/except block.
        except TestError:
            pass
        except:
            raise TestError(name + ' failed\n' + format_error())

        return result

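    # Illustrative usage (hypothetical function): run_group() brackets a
    # call with START/END records, so
    #
    #     def build_phase():
    #         job.run_test('kernbench')
    #     job.run_group(build_phase, tag='build')
    #
    # yields START and END GOOD (or END FAIL) records named 'build'.
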
    def record(self, status_code, subdir, operation, status=''):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: (GOOD|WARN|FAIL|ABORT)
                or   START
                or   END (GOOD|WARN|FAIL|ABORT)

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and are
        governed by self.record_prefix

        multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self.__record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self.__record(status_code, subdir, operation, status)

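    # Illustrative warning-channel payload (hypothetical message): each
    # warning logger is a pipe of "<epoch>\t<message>" lines, e.g.
    #
    #     1204000042\tconsole: machine went quiet
    #
    # which _read_warnings() parses into (1204000042, 'console: ...').
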
    def _read_warnings(self):
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers,
                                          [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings

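    # Illustrative rendered records (hypothetical timestamps; '\t' marks the
    # literal tab separators, and leading tabs come from record_prefix):
    #
    #     START\t----\tdbench\ttimestamp=1204000000\tlocaltime=Feb 26 05:06:40\t
    #     \tGOOD\tdbench\tdbench\ttimestamp=1204000100\tlocaltime=Feb 26 05:08:20\tcompleted successfully
    #     END GOOD\t----\tdbench\ttimestamp=1204000101\tlocaltime=Feb 26 05:08:21\t
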
    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None):
        """
        Internal function to generate a record to be written into a
        status log. For use by server_job.* classes only.
        """
        if subdir:
            if re.search(r'[\n\t]', subdir):
                raise ValueError('Invalid character in subdir string')
            substr = subdir
        else:
            substr = '----'

        if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$',
                        status_code):
            raise ValueError('Invalid status code supplied: %s' %
                             status_code)
        if not operation:
            operation = '----'
        if re.search(r'[\n\t]', operation):
            raise ValueError('Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        epoch_time_str = "timestamp=%d" % (epoch_time,)
        local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
                                       local_time)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in (status_code, substr, operation,
                                         epoch_time_str, local_time_str,
                                         status))
        return record_prefix + msg + '\n'

    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike __record, this does not write the message
        to standard output.
        """
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            status_log.write(line)
        status_log.close()

    def __record(self, status_code, subdir, operation, status='',
                 epoch_time=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record,
        since calling it directly would bypass the console monitor
        logging.
        """
        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time)

        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            if not os.path.exists(test_dir):
                os.mkdir(test_dir)
            status_file = os.path.join(test_dir, 'status')
            open(status_file, "a").write(msg)

# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.
    """
    parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    extract_indent = re.compile(r"^(\t*).*$")

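    # Illustrative quoted line (hypothetical test): the client emits status
    # lines over stderr as
    #
    #     AUTOTEST_STATUS:2.1:\tGOOD\t----\tsleeptest\tcompleted successfully
    #
    # where '2.1' is the dotted nesting tag parsed by _process_quoted_line;
    # lines without the prefix are passed through to stderr as ssh noise.
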
    def __init__(self, job):
        self.job = job
        self.leftover = ""
        self.last_line = ""
        self.logs = {}


    def _process_log_dict(self, log_dict):
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
        """Go through the accumulated logs in self.logs and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a
        chronological ordering is desired, it can be reconstructed
        from the status log by looking at the timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]

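    # Illustrative flush order (hypothetical tags): entries buffered under
    # self.logs for the tags '1', '1.1', '1.2' and '2' are emitted in the
    # order 1, 1.1, 1.2, 2 (depth-first, sorted by tag), not in arrival
    # order.
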
    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for
        handling later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        match = self.parser.search(line)
        if match:
            tag, line = match.groups()
            self._process_quoted_line(tag, line)
        else:
            print >> sys.stderr, line


    def _format_warnings(self, last_line, warnings):
        indent = self.extract_indent.match(last_line).group(1)
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)


    def write(self, data):
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]

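    # Illustrative buffering (hypothetical data): write('abc\ndef')
    # processes 'abc' immediately and keeps 'def' in self.leftover until a
    # later write() or close() completes the line.
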

    def flush(self):
        sys.stdout.flush()
        sys.stderr.flush()


    def close(self):
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()