"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

__author__ = """
Martin J. Bligh <mbligh@google.com>
Andy Whitcroft <apw@shadowen.org>
"""

# tempfile is needed by log_collector._prepare_for_copying_logs below
import os, sys, re, time, select, subprocess, traceback, tempfile

from autotest_lib.client.bin import fd_stack
from autotest_lib.client.common_lib import error, logging
from autotest_lib.server import test, subcommand
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
from autotest_lib.client.common_lib import utils, packages


# load up a control segment
# these are all stored in <server_dir>/control_segments
def load_control_segment(name):
    server_dir = os.path.dirname(os.path.abspath(__file__))
    script_file = os.path.join(server_dir, "control_segments", name)
    if os.path.exists(script_file):
        return file(script_file).read()
    else:
        return ""


preamble = """\
import os, sys

from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
from autotest_lib.server import git_kernel
from autotest_lib.server.subcommand import *
from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
from autotest_lib.server.utils import parse_machine
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.common_lib import barrier

autotest.Autotest.job = job
hosts.Host.job = job
barrier = barrier.barrier
if len(machines) > 1:
    open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""

client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
    hostname, user, passwd, port = parse_machine(
        machine, ssh_user, ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port, password=passwd)
    host.log_kernel()
    at.run(control, host=host)

job.parallel_simple(run_client, machines)
"""

crashdumps = """
def crashdumps(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.get_crashdumps(test_start_time)

job.parallel_simple(crashdumps, machines, log=False)
"""

reboot_segment = """\
def reboot(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.reboot()

job.parallel_simple(reboot, machines, log=False)
"""

install = """\
def install(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.machine_install()

job.parallel_simple(install, machines, log=False)
"""

# load up the verifier control segment, with an optional site-specific hook
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")
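
# Note (illustrative): a runnable control script is assembled by plain
# string concatenation; e.g. verify() below executes preamble + verify
# via _execute_code(), and run() does the same with the job's own
# control file.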


# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}


class base_server_job(object):
    """The actual job against which we do everything.

    Properties:
        autodir
            The top level autotest directory (/usr/local/autotest).
        serverdir
            <autodir>/server/
        clientdir
            <autodir>/client/
        conmuxdir
            <autodir>/conmux/
        testdir
            <autodir>/server/tests/
        site_testdir
            <autodir>/server/site_tests/
        control
            the control file for this job
    """

    STATUS_VERSION = 1


    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
        control
            The control file (pathname of)
        args
            args to pass to the control file
        resultdir
            where to throw the results
        label
            label for the job
        user
            Username for the job (email address)
        machines
            list of machine names to run the job against
        client
            True if a client-side control file
        parse_job
            results database tag for this job; if set (and the job has
            exactly one machine) continuous parsing is enabled
        ssh_user, ssh_port, ssh_pass
            ssh connection parameters for the test machines
        """
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        self.record_prefix = ''
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass
        self.run_test_cleanup = True

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label': label, 'user': user,
                    'hostname': ','.join(machines),
                    'status_version': str(self.STATUS_VERSION)}
        job_data.update(get_site_job_data(self))
        utils.write_keyval(self.resultdir, job_data)

        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout': 600})
        self.pkgdir = os.path.join(self.autodir, 'packages')


    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job, self.job_model)
        else:
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)"""
        if not self.using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self.using_parser = False


    def verify(self):
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        try:
            namespace = {'machines': self.machines, 'job': self,
                         'ssh_user': self.ssh_user,
                         'ssh_port': self.ssh_port,
                         'ssh_pass': self.ssh_pass}
            self._execute_code(preamble + verify, namespace, namespace)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n'
                   + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise


    def repair(self, host_protection):
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
                     'ssh_pass': self.ssh_pass,
                     'protection_level': host_protection}
        # no matter what happens during repair, go on to try to reverify
        try:
            self._execute_code(preamble + repair, namespace, namespace)
        except Exception, exc:
            print 'Exception occurred during repair'
            traceback.print_exc()
        self.verify()


    def precheck(self):
        """
        Perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """Pause or stop external logging mechanism.
        """
        pass


    def enable_test_cleanup(self):
        """Make tests run their cleanup phase (test.cleanup).
        This is the default behaviour."""
        self.run_test_cleanup = True


    def disable_test_cleanup(self):
        """Make tests skip their cleanup phase (test.cleanup)."""
        self.run_test_cleanup = False


    def use_external_logging(self):
        """Return True if external logging should be used.
        """
        return False


    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
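        # Illustrative usage (this is how the generated control
        # segments above invoke it):
        #   job.parallel_simple(install, machines, log=False)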
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking and log:
            def wrapper(machine):
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)


    def run(self, reboot=False, install_before=False,
            install_after=False, collect_crashdumps=True,
            namespace={}):
        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self.ssh_user
        namespace['ssh_port'] = self.ssh_port
        namespace['ssh_pass'] = self.ssh_pass
        test_start_time = int(time.time())

        os.chdir(self.resultdir)

        self.enable_external_logging()
        status_log = os.path.join(self.resultdir, 'status.log')
        try:
            if install_before and machines:
                self._execute_code(preamble + install, namespace, namespace)
            if self.client:
                namespace['control'] = self.control
                open('control', 'w').write(self.control)
                open('control.srv', 'w').write(client_wrapper)
                server_control = client_wrapper
            else:
                open('control.srv', 'w').write(self.control)
                server_control = self.control
            self._execute_code(preamble + server_control, namespace,
                               namespace)

        finally:
            if machines and collect_crashdumps:
                namespace['test_start_time'] = test_start_time
                self._execute_code(preamble + crashdumps, namespace,
                                   namespace)
            self.disable_external_logging()
            if reboot and machines:
                self._execute_code(preamble + reboot_segment, namespace,
                                   namespace)
            if install_after and machines:
                self._execute_code(preamble + install, namespace, namespace)


    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        url
            url of the test to run
        tag
            optional tag (passed via dargs) appended to the test name
        """

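        # Illustrative usage from a server-side control file:
        #   job.run_test('sleeptest', tag='smoke')
        # which records results under <resultdir>/sleeptest.smoke/.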
        (group, testname) = self.pkgmgr.get_package_name(url, 'test')

        tag = dargs.pop('tag', None)
        if tag:
            testname += '.' + tag
        subdir = testname

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag))
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True


    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name, str(e))
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
            subroutine to run
        *args:
            arguments for the function
        tag (in dargs):
            optional alternative name for the group
        """

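        # Illustrative usage:
        #   def update_kernel(): ...
        #   job.run_group(update_kernel, tag='kernel_update')
        # records START/END lines named 'kernel_update' around the call.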
        name = function.__name__

        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        return self._run_group(name, None, function, *args, **dargs)[0]


    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})


    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status.

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.logging.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
                or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                   "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping and are
        governed by self.record_prefix.

        Multiline messages have secondary lines prefaced by a double
        space ('  ').

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self._record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self._record(status_code, subdir, operation, status,
                     optional_fields=optional_fields)


    def _read_warnings(self):
        warnings = []
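        # Each warning logger emits lines of the form
        # "<epoch timestamp>\t<message>"; see the split('\t', 1) below.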
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings


    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None,
                       optional_fields=None):
        """
        Internal function to generate a record to be written into a
        status log. For use by server_job.* classes only.
        """
        if subdir:
            # use search, not match: the forbidden characters can
            # appear anywhere in the string, not just at the start
            if re.search(r'[\n\t]', subdir):
                raise ValueError('Invalid character in subdir string')
            substr = subdir
        else:
            substr = '----'

        if not logging.is_valid_status(status_code):
            raise ValueError('Invalid status code supplied: %s' %
                             status_code)
        if not operation:
            operation = '----'
        if re.search(r'[\n\t]', operation):
            raise ValueError('Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", " ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        if not optional_fields:
            optional_fields = {}

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in fields)

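        # For reference, a rendered line looks like (illustrative values):
        #   GOOD\t----\treboot.verify\ttimestamp=1221512400\t
        #   localtime=Sep 15 13:00:00\tcompleted successfully
        # all on one tab-separated line, prefixed by record_prefix.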
        return record_prefix + msg + '\n'


    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike _record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        self.__parse_status(lines)


    def _execute_code(self, code, global_scope, local_scope):
        exec(code, global_scope, local_scope)


    def _record(self, status_code, subdir, operation, status='',
                epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)

        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status.log')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())


    def __parse_status(self, new_lines):
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occurred while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg


class log_collector(object):
    def __init__(self, host, client_tag, results_dir):
        self.host = host
        if not client_tag:
            client_tag = "default"
        self.client_results_dir = os.path.join(host.get_autodir(), "results",
                                               client_tag)
        self.server_results_dir = results_dir


    def collect_client_job_results(self):
        """Collect all the current results of a running client job
        from the remote host into the server-side results dir."""

        # make an effort to wait for the machine to come up
        try:
            self.host.wait_up(timeout=30)
        except error.AutoservError:
            # don't worry about any errors, we'll try and
            # get the results anyway
            pass

        # Copy all dirs in default to results_dir
        keyval_path = self._prepare_for_copying_logs()
        self.host.get_file(self.client_results_dir + '/',
                           self.server_results_dir)
        self._process_copied_logs(keyval_path)
        self._postprocess_copied_logs()


    def _prepare_for_copying_logs(self):
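        # The dance below: if the server side already has a keyval file,
        # the client-side keyval is fetched to a local temp file (merged
        # later by _process_copied_logs) and moved aside on the client so
        # the bulk get_file() in collect_client_job_results does not
        # clobber the server copy.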
        server_keyval = os.path.join(self.server_results_dir, 'keyval')
        if not os.path.exists(server_keyval):
            # Client-side keyval file can be copied directly
            return

        # Copy client-side keyval to temporary location
        suffix = '.keyval_%s' % self.host.hostname
        fd, keyval_path = tempfile.mkstemp(suffix)
        os.close(fd)
        try:
            client_keyval = os.path.join(self.client_results_dir, 'keyval')
            try:
                self.host.get_file(client_keyval, keyval_path)
            finally:
                # We squirrel the client-side keyval away and move
                # it back when we are done
                remote_temp_dir = self.host.get_tmp_dir()
                self.temp_keyval_path = os.path.join(remote_temp_dir,
                                                     "keyval")
                self.host.run('mv %s %s' % (client_keyval,
                                            self.temp_keyval_path))
        except (error.AutoservRunError, error.AutoservSSHTimeout):
            print "Prepare for copying logs failed"
        return keyval_path


    def _process_copied_logs(self, keyval_path):
        if not keyval_path:
            # Client-side keyval file was copied directly
            return

        # Append contents of keyval_<host> file to keyval file
        try:
            # Read in new and old keyval files
            new_keyval = utils.read_keyval(keyval_path)
            old_keyval = utils.read_keyval(self.server_results_dir)
            # 'Delete' from new keyval entries that are in both
            tmp_keyval = {}
            for key, val in new_keyval.iteritems():
                if key not in old_keyval:
                    tmp_keyval[key] = val
            # Append new info to keyval file
            utils.write_keyval(self.server_results_dir, tmp_keyval)
            # Delete keyval_<host> file
            os.remove(keyval_path)
        except IOError:
            print "Process copied logs failed"


    def _postprocess_copied_logs(self):
        # we can now put our keyval file back
        client_keyval = os.path.join(self.client_results_dir, 'keyval')
        try:
            self.host.run('mv %s %s' % (self.temp_keyval_path,
                                        client_keyval))
        except Exception:
            pass


# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.
    """
    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
    extract_indent = re.compile(r"^(\t*).*$")
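    # Illustrative examples (assumed payloads) of the client lines
    # these patterns match:
    #   AUTOTEST_STATUS:0.1:GOOD\t----\tsleeptest\tcompleted successfully
    #   AUTOTEST_TEST_COMPLETE:/path/to/fifo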

    def __init__(self, host, tag, server_results_dir):
        self.host = host
        self.job = host.job
        self.log_collector = log_collector(host, tag, server_results_dir)
        self.leftover = ""
        self.last_line = ""
        self.logs = {}


    def _process_log_dict(self, log_dict):
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
        """Go through the accumulated logs in self.logs and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired it can be reconstructed from the status
        log by looking at timestamp lines."""
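        # Illustrative example: entries tagged "1", "1.1", "1.2" and "2"
        # are flushed in exactly that order, regardless of arrival time.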
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        status_match = self.status_parser.search(line)
        test_complete_match = self.test_complete_parser.search(line)
        if status_match:
            tag, line = status_match.groups()
            self._process_quoted_line(tag, line)
        elif test_complete_match:
            fifo_path, = test_complete_match.groups()
            self.log_collector.collect_client_job_results()
            self.host.run("echo A > %s" % fifo_path)
        else:
            print line


    def _format_warnings(self, last_line, warnings):
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)


    def write(self, data):
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        sys.stdout.flush()


    def close(self):
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()


# site_server_job.py may be non-existent or empty; make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    class site_server_job(object):
        pass


class server_job(site_server_job, base_server_job):
    pass