"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

__author__ = """
Martin J. Bligh <mbligh@google.com>
Andy Whitcroft <apw@shadowen.org>
"""

import os, sys, re, time, select, subprocess, traceback

from autotest_lib.client.bin import fd_stack
from autotest_lib.client.common_lib import error, logging
from autotest_lib.server import test, subcommand
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
from autotest_lib.server.utils import *


# load up a control segment
# these are all stored in <server_dir>/control_segments
def load_control_segment(name):
	server_dir = os.path.dirname(os.path.abspath(__file__))
	script_file = os.path.join(server_dir, "control_segments", name)
	if os.path.exists(script_file):
		return file(script_file).read()
	else:
		return ""
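

# For illustration: load_control_segment("verify") returns the text of
# <server_dir>/control_segments/verify, or "" if no such file exists; the
# optional site hooks loaded below (e.g. "site_verify") resolve the same
# way, so a missing site file simply contributes an empty string.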

preamble = """\
import os, sys

from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
from autotest_lib.server import git_kernel
from autotest_lib.server.subcommand import *
from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
from autotest_lib.server.utils import parse_machine
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.common_lib import barrier

autotest.Autotest.job = job
hosts.SSHHost.job = job
barrier = barrier.barrier
if len(machines) > 1:
	open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""

client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
	hostname, user, password, port = parse_machine(machine,
		ssh_user, ssh_port, ssh_pass)

	host = hosts.SSHHost(hostname, user, port, password=password)
	at.run(control, host=host)

job.parallel_simple(run_client, machines)
"""

crashdumps = """
def crashdumps(machine):
	hostname, user, password, port = parse_machine(machine,
		ssh_user, ssh_port, ssh_pass)

	host = hosts.SSHHost(hostname, user, port, initialize=False, \
		password=password)
	host.get_crashdumps(test_start_time)

job.parallel_simple(crashdumps, machines, log=False)
"""

reboot_segment = """\
def reboot(machine):
	hostname, user, password, port = parse_machine(machine,
		ssh_user, ssh_port, ssh_pass)

	host = hosts.SSHHost(hostname, user, port, initialize=False, \
		password=password)
	host.reboot()

job.parallel_simple(reboot, machines, log=False)
"""

install = """\
def install(machine):
	hostname, user, password, port = parse_machine(machine,
		ssh_user, ssh_port, ssh_pass)

	host = hosts.SSHHost(hostname, user, port, initialize=False, \
		password=password)
	host.machine_install()

job.parallel_simple(install, machines, log=False)
"""

# load up the verifier control segment, with an optional site-specific hook
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")


# load up site-specific code for generating site-specific job data
try:
	import site_job
	get_site_job_data = site_job.get_site_job_data
	del site_job
except ImportError:
	# by default provide a stub that generates no site data
	def get_site_job_data(job):
		return {}
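

# Illustrative sketch (hypothetical; site_job is deliberately optional):
# a site_job module could supply extra per-site keyvals like
#
#	def get_site_job_data(job):
#		return {'build': os.environ.get('BUILD_ID', 'unknown')}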

class base_server_job:
	"""The actual job against which we do everything.

	Properties:
		autodir
			The top level autotest directory (/usr/local/autotest).
		serverdir
			<autodir>/server/
		clientdir
			<autodir>/client/
		conmuxdir
			<autodir>/conmux/
		testdir
			<autodir>/server/tests/
		control
			the control file for this job
	"""

	STATUS_VERSION = 1


	def __init__(self, control, args, resultdir, label, user, machines,
		     client=False, parse_job="",
		     ssh_user='root', ssh_port=22, ssh_pass=''):
		"""
		control
			The control file (pathname of)
		args
			args to pass to the control file
		resultdir
			where to throw the results
		label
			label for the job
		user
			Username for the job (email address)
		machines
			a list of hostnames to run the job on
		client
			True if a client-side control file
		parse_job
			if not empty, the job key under which results are
			continuously parsed into the tko results database
		ssh_user, ssh_port, ssh_pass
			ssh credentials used when contacting the machines
		"""
		path = os.path.dirname(__file__)
		self.autodir = os.path.abspath(os.path.join(path, '..'))
		self.serverdir = os.path.join(self.autodir, 'server')
		self.testdir = os.path.join(self.serverdir, 'tests')
		self.tmpdir = os.path.join(self.serverdir, 'tmp')
		self.conmuxdir = os.path.join(self.autodir, 'conmux')
		self.clientdir = os.path.join(self.autodir, 'client')
		if control:
			self.control = open(control, 'r').read()
			self.control = re.sub('\r', '', self.control)
		else:
			self.control = None
		self.resultdir = resultdir
		if not os.path.exists(resultdir):
			os.mkdir(resultdir)
		self.debugdir = os.path.join(resultdir, 'debug')
		if not os.path.exists(self.debugdir):
			os.mkdir(self.debugdir)
		self.status = os.path.join(resultdir, 'status')
		self.label = label
		self.user = user
		self.args = args
		self.machines = machines
		self.client = client
		self.record_prefix = ''
		self.warning_loggers = set()
		self.ssh_user = ssh_user
		self.ssh_port = ssh_port
		self.ssh_pass = ssh_pass

		self.stdout = fd_stack.fd_stack(1, sys.stdout)
		self.stderr = fd_stack.fd_stack(2, sys.stderr)

		if os.path.exists(self.status):
			os.unlink(self.status)
		job_data = {'label' : label, 'user' : user,
			    'hostname' : ','.join(machines),
			    'status_version' : str(self.STATUS_VERSION)}
		job_data.update(get_site_job_data(self))
		write_keyval(self.resultdir, job_data)

		self.parse_job = parse_job
		if self.parse_job and len(machines) == 1:
			self.using_parser = True
			self.init_parser(resultdir)
		else:
			self.using_parser = False


	def init_parser(self, resultdir):
		"""Start the continuous parsing of resultdir. This sets up
		the database connection and inserts the basic job object into
		the database if necessary."""
		# redirect parser debugging to .parse.log
		parse_log = os.path.join(resultdir, '.parse.log')
		parse_log = open(parse_log, 'w', 0)
		tko_utils.redirect_parser_debugging(parse_log)
		# create a job model object and set up the db
		self.results_db = tko_db.db(autocommit=True)
		self.parser = status_lib.parser(self.STATUS_VERSION)
		self.job_model = self.parser.make_job(resultdir)
		self.parser.start(self.job_model)
		# check if a job already exists in the db and insert it if
		# it does not
		job_idx = self.results_db.find_job(self.parse_job)
		if job_idx is None:
			self.results_db.insert_job(self.parse_job,
						   self.job_model)
		else:
			machine_idx = self.results_db.lookup_machine(
			    self.job_model.machine)
			self.job_model.index = job_idx
			self.job_model.machine_idx = machine_idx


	def cleanup_parser(self):
		"""This should be called after the server job is finished
		to carry out any remaining cleanup (e.g. flushing any
		remaining test results to the results db)"""
		if not self.using_parser:
			return
		final_tests = self.parser.end()
		for test in final_tests:
			self.__insert_test(test)
		self.using_parser = False


	def verify(self):
		if not self.machines:
			raise error.AutoservError(
			    'No machines specified to verify')
		try:
			namespace = {'machines' : self.machines, 'job' : self,
				     'ssh_user' : self.ssh_user,
				     'ssh_port' : self.ssh_port,
				     'ssh_pass' : self.ssh_pass}
			exec(preamble + verify, namespace, namespace)
		except Exception, e:
			msg = ('Verify failed\n' + str(e) + '\n'
			       + traceback.format_exc())
			self.record('ABORT', None, None, msg)
			raise


	def repair(self):
		if not self.machines:
			raise error.AutoservError(
			    'No machines specified to repair')
		namespace = {'machines' : self.machines, 'job' : self,
			     'ssh_user' : self.ssh_user,
			     'ssh_port' : self.ssh_port,
			     'ssh_pass' : self.ssh_pass}
		# no matter what happens during repair, go on to try to reverify
		try:
			exec(preamble + repair, namespace, namespace)
		except Exception, exc:
			print 'Exception occurred during repair'
			traceback.print_exc()
		self.verify()


	def enable_external_logging(self):
		"""Start or restart external logging mechanism.
		"""
		pass


	def disable_external_logging(self):
		"""Pause or stop external logging mechanism.
		"""
		pass


	def use_external_logging(self):
		"""Return True if external logging should be used.
		"""
		return False


	def parallel_simple(self, function, machines, log=True, timeout=None):
		"""Run 'function' using parallel_simple, with an extra
		wrapper to handle the necessary setup for continuous parsing,
		if possible. If continuous parsing is already properly
		initialized then this should just work."""
		is_forking = not (len(machines) == 1 and
				  self.machines == machines)
		if self.parse_job and is_forking:
			def wrapper(machine):
				self.parse_job += "/" + machine
				self.using_parser = True
				self.machines = [machine]
				self.resultdir = os.path.join(self.resultdir,
							      machine)
				self.init_parser(self.resultdir)
				result = function(machine)
				self.cleanup_parser()
				return result
		else:
			wrapper = function
		subcommand.parallel_simple(wrapper, machines, log, timeout)
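
	# Illustrative usage (hypothetical control-file snippet): the
	# function receives a single machine name, e.g.
	#
	#	def run(machine):
	#		host = hosts.SSHHost(machine)
	#		host.reboot()
	#
	#	job.parallel_simple(run, machines)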

	def run(self, reboot = False, install_before = False,
		install_after = False, collect_crashdumps = True,
		namespace = {}):
		# use a copy so changes don't affect the original dictionary
		namespace = namespace.copy()
		machines = self.machines

		self.aborted = False
		namespace['machines'] = machines
		namespace['args'] = self.args
		namespace['job'] = self
		namespace['ssh_user'] = self.ssh_user
		namespace['ssh_port'] = self.ssh_port
		namespace['ssh_pass'] = self.ssh_pass
		test_start_time = int(time.time())

		os.chdir(self.resultdir)

		self.enable_external_logging()
		status_log = os.path.join(self.resultdir, 'status.log')
		try:
			if install_before and machines:
				exec(preamble + install, namespace, namespace)
			if self.client:
				namespace['control'] = self.control
				open('control', 'w').write(self.control)
				open('control.srv', 'w').write(client_wrapper)
				server_control = client_wrapper
			else:
				open('control.srv', 'w').write(self.control)
				server_control = self.control
			exec(preamble + server_control, namespace, namespace)

		finally:
			if machines and collect_crashdumps:
				namespace['test_start_time'] = test_start_time
				exec(preamble + crashdumps,
				     namespace, namespace)
			self.disable_external_logging()
			if reboot and machines:
				exec(preamble + reboot_segment,
				     namespace, namespace)
			if install_after and machines:
				exec(preamble + install, namespace, namespace)


	def run_test(self, url, *args, **dargs):
		"""Summon a test object and run it.

		tag
			tag to add to testname
		url
			url of the test to run
		"""

		(group, testname) = test.testname(url)
		tag = None
		subdir = testname

		if dargs.has_key('tag'):
			tag = dargs['tag']
			del dargs['tag']
			if tag:
				subdir += '.' + tag

		try:
			test.runtest(self, url, tag, args, dargs)
			self.record('GOOD', subdir, testname, 'completed successfully')
		except error.TestNAError, detail:
			self.record('TEST_NA', subdir, testname, str(detail))
		except Exception, detail:
			info = str(detail) + "\n" + traceback.format_exc()
			self.record('FAIL', subdir, testname, info)


	def run_group(self, function, *args, **dargs):
		"""\
		function:
			subroutine to run
		*args:
			arguments for the function
		**dargs:
			keyword arguments for the function; a 'tag' entry,
			if present, overrides the group name
		"""

		result = None
		name = function.__name__

		# Allow the tag for the group to be specified.
		if dargs.has_key('tag'):
			tag = dargs['tag']
			del dargs['tag']
			if tag:
				name = tag

		# if tag:
		#	name += '.' + tag
		old_record_prefix = self.record_prefix
		try:
			try:
				self.record('START', None, name)
				self.record_prefix += '\t'
				result = function(*args, **dargs)
			except Exception, e:
				self.record_prefix = old_record_prefix
				err_msg = str(e) + '\n'
				err_msg += traceback.format_exc()
				self.record('END FAIL', None, name, err_msg)
			else:
				self.record_prefix = old_record_prefix
				self.record('END GOOD', None, name)

		# We don't want to raise up an error higher if it's just
		# a TestError - we want to carry on to other tests. Hence
		# this outer try/except block.
		except error.TestError:
			pass
		except:
			raise error.TestError(name + ' failed\n' +
					      traceback.format_exc())

		return result
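
	# Illustrative usage (hypothetical control-file snippet):
	#
	#	def update_kernel():
	#		...
	#	job.run_group(update_kernel, tag='kernel-update')
	#
	# which brackets the call with START and END GOOD/END FAIL lines
	# in the status log.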

	def run_reboot(self, reboot_func, get_kernel_func):
		"""\
		A specialization of run_group meant specifically for handling
		a reboot. Includes support for capturing the kernel version
		after the reboot.

		reboot_func: a function that carries out the reboot

		get_kernel_func: a function that returns a string
		representing the kernel version.
		"""

		old_record_prefix = self.record_prefix
		try:
			self.record('START', None, 'reboot')
			self.record_prefix += '\t'
			reboot_func()
		except Exception, e:
			self.record_prefix = old_record_prefix
			err_msg = str(e) + '\n' + traceback.format_exc()
			self.record('END FAIL', None, 'reboot', err_msg)
		else:
			kernel = get_kernel_func()
			self.record_prefix = old_record_prefix
			self.record('END GOOD', None, 'reboot',
				    optional_fields={"kernel": kernel})


	def record(self, status_code, subdir, operation, status='',
		   optional_fields=None):
		"""
		Record job-level status

		The intent is to make this file both machine parseable and
		human readable. That involves a little more complexity, but
		really isn't all that bad ;-)

		Format is <status code>\t<subdir>\t<operation>\t<status>

		status code: see common_lib.logging.is_valid_status()
			     for valid status definition

		subdir: MUST be a relevant subdirectory in the results,
			or None, which will be represented as '----'

		operation: description of what you ran (e.g. "dbench", or
			   "mkfs -t foobar /dev/sda9")

		status: error message or "completed successfully"

		------------------------------------------------------------

		Initial tabs indicate indent levels for grouping, and are
		governed by self.record_prefix

		multiline messages have secondary lines prefaced by a double
		space ('  ')

		Executing this method will trigger the logging of all new
		warnings to date from the various console loggers.
		"""

		# poll all our warning loggers for new warnings
		warnings = self._read_warnings()
		for timestamp, msg in warnings:
			self.__record("WARN", None, None, msg, timestamp)

		# write out the actual status log line
		self.__record(status_code, subdir, operation, status,
			      optional_fields=optional_fields)


	def _read_warnings(self):
		warnings = []
		while True:
			# pull in a line of output from every logger that has
			# output ready to be read
			loggers, _, _ = select.select(self.warning_loggers,
						      [], [], 0)
			closed_loggers = set()
			for logger in loggers:
				line = logger.readline()
				# record any broken pipes (aka line == empty)
				if len(line) == 0:
					closed_loggers.add(logger)
					continue
				timestamp, msg = line.split('\t', 1)
				warnings.append((int(timestamp), msg.strip()))

			# stop listening to loggers that are closed
			self.warning_loggers -= closed_loggers

			# stop if none of the loggers have any output left
			if not loggers:
				break

		# sort into timestamp order
		warnings.sort()
		return warnings
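
	# Note: each warning logger is a pipe whose lines have the form
	# "<epoch seconds>\t<message>\n", so a logger might emit e.g.
	# "1213000000\tWARNING: console soft lockup" (hypothetical values).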


	def _render_record(self, status_code, subdir, operation, status='',
			   epoch_time=None, record_prefix=None,
			   optional_fields=None):
		"""
		Internal function to generate a record to be written into a
		status log. For use by server_job.* classes only.
		"""
		if subdir:
			if re.search(r'[\n\t]', subdir):
				raise ValueError(
				    'Invalid character in subdir string')
			substr = subdir
		else:
			substr = '----'

		if not logging.is_valid_status(status_code):
			raise ValueError('Invalid status code supplied: %s' %
					 status_code)
		if not operation:
			operation = '----'
		if re.search(r'[\n\t]', operation):
			raise ValueError(
			    'Invalid character in operation string')
		operation = operation.rstrip()
		status = status.rstrip()
		status = re.sub(r"\t", " ", status)
		# Ensure any continuation lines are marked so we can
		# detect them in the status file to ensure it is parsable.
		status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

		if not optional_fields:
			optional_fields = {}

		# Generate timestamps for inclusion in the logs
		if epoch_time is None:
			epoch_time = int(time.time())
		local_time = time.localtime(epoch_time)
		optional_fields["timestamp"] = str(epoch_time)
		optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
							     local_time)

		fields = [status_code, substr, operation]
		fields += ["%s=%s" % x for x in optional_fields.iteritems()]
		fields.append(status)

		if record_prefix is None:
			record_prefix = self.record_prefix

		msg = '\t'.join(str(x) for x in fields)

		return record_prefix + msg + '\n'


	def _record_prerendered(self, msg):
		"""
		Record a pre-rendered msg into the status logs. The only
		change this makes to the message is to add on the local
		indentation. Should not be called outside of server_job.*
		classes. Unlike __record, this does not write the message
		to standard output.
		"""
		lines = []
		status_file = os.path.join(self.resultdir, 'status.log')
		status_log = open(status_file, 'a')
		for line in msg.splitlines():
			line = self.record_prefix + line + '\n'
			lines.append(line)
			status_log.write(line)
		status_log.close()
		self.__parse_status(lines)


	def __record(self, status_code, subdir, operation, status='',
		     epoch_time=None, optional_fields=None):
		"""
		Actual function for recording a single line into the status
		logs. Should never be called directly, only by job.record,
		since calling it directly would bypass the console monitor
		logging.
		"""

		msg = self._render_record(status_code, subdir, operation,
					  status, epoch_time,
					  optional_fields=optional_fields)

		status_file = os.path.join(self.resultdir, 'status.log')
		sys.stdout.write(msg)
		open(status_file, "a").write(msg)
		if subdir:
			test_dir = os.path.join(self.resultdir, subdir)
			if not os.path.exists(test_dir):
				os.mkdir(test_dir)
			status_file = os.path.join(test_dir, 'status')
			open(status_file, "a").write(msg)
		self.__parse_status(msg.splitlines())


	def __parse_status(self, new_lines):
		if not self.using_parser:
			return
		new_tests = self.parser.process_lines(new_lines)
		for test in new_tests:
			self.__insert_test(test)


	def __insert_test(self, test):
		"""An internal method to insert a new test result into the
		database. This method will not raise an exception, even if an
		error occurs during the insert, to avoid failing a test
		simply because of unexpected database issues."""
		try:
			self.results_db.insert_test(self.job_model, test)
		except Exception:
			msg = ("WARNING: An unexpected error occurred while "
656 "inserting test results into the database. "
657 "Ignoring error.\n" + traceback.format_exc())
658 print >> sys.stderr, msg
mblighdab39662008-02-27 16:47:55 +0000659
660
661# a file-like object for catching stderr from an autotest client and
662# extracting status logs from it
663class client_logger(object):
664 """Partial file object to write to both stdout and
665 the status log file. We only implement those methods
666 utils.run() actually calls.
667 """
668 parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
669 extract_indent = re.compile(r"^(\t*).*$")
670
671 def __init__(self, job):
672 self.job = job
673 self.leftover = ""
674 self.last_line = ""
675 self.logs = {}
676
677
678 def _process_log_dict(self, log_dict):
679 log_list = log_dict.pop("logs", [])
680 for key in sorted(log_dict.iterkeys()):
681 log_list += self._process_log_dict(log_dict.pop(key))
682 return log_list
683
684


	def _process_logs(self):
		"""Go through the accumulated logs in self.logs and print them
		out to stdout and the status log. Note that this processes
		logs in an ordering where:

		1) logs to different tags are never interleaved
		2) logs to x.y come before logs to x.y.z for all z
		3) logs to x.y come before x.z whenever y < z

		Note that this will in general not be the same as the
		chronological ordering of the logs. However, if a chronological
		ordering is desired, it can be reconstructed from the
		status log by looking at the timestamp lines."""
		log_list = self._process_log_dict(self.logs)
		for line in log_list:
			self.job._record_prerendered(line + '\n')
		if log_list:
			self.last_line = log_list[-1]
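
	# For example (illustrative): with lines buffered under tags "1",
	# "1.1" and "2", everything under "1" (then "1.1") is flushed
	# before anything under "2", regardless of arrival order.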


	def _process_quoted_line(self, tag, line):
		"""Process a line quoted with an AUTOTEST_STATUS flag. If the
		tag is blank then we want to push out all the data we've been
		building up in self.logs, and then the newest line. If the
		tag is not blank, then push the line into the logs for handling
		later."""
		print line
		if tag == "":
			self._process_logs()
			self.job._record_prerendered(line + '\n')
			self.last_line = line
		else:
			tag_parts = [int(x) for x in tag.split(".")]
			log_dict = self.logs
			for part in tag_parts:
				log_dict = log_dict.setdefault(part, {})
			log_list = log_dict.setdefault("logs", [])
			log_list.append(line)
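
	# For illustration (hypothetical payload): a client line such as
	#
	#	AUTOTEST_STATUS:1.2:	GOOD	----	sleeptest	completed successfully
	#
	# carries tag "1.2" and is buffered under self.logs[1][2]["logs"];
	# an empty tag flushes everything buffered so far.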


	def _process_line(self, line):
		"""Write out a line of data to the appropriate stream. Status
		lines sent by autotest will be prepended with
		"AUTOTEST_STATUS", and all other lines are ssh error
		messages."""
		match = self.parser.search(line)
		if match:
			tag, line = match.groups()
			self._process_quoted_line(tag, line)
		else:
			print line


	def _format_warnings(self, last_line, warnings):
		# use the indentation of whatever the last log line was
		indent = self.extract_indent.match(last_line).group(1)
		# if the last line starts a new group, add an extra indent
		if last_line.lstrip('\t').startswith("START\t"):
			indent += '\t'
		return [self.job._render_record("WARN", None, None, msg,
						timestamp, indent).rstrip('\n')
			for timestamp, msg in warnings]


	def _process_warnings(self, last_line, log_dict, warnings):
		if log_dict.keys() in ([], ["logs"]):
			# there are no sub-jobs, just append the warnings here
			warnings = self._format_warnings(last_line, warnings)
			log_list = log_dict.setdefault("logs", [])
			log_list += warnings
			for warning in warnings:
				sys.stdout.write(warning + '\n')
		else:
			# there are sub-jobs, so put the warnings in there
			log_list = log_dict.get("logs", [])
			if log_list:
				last_line = log_list[-1]
			for key in sorted(log_dict.iterkeys()):
				if key != "logs":
					self._process_warnings(last_line,
							       log_dict[key],
							       warnings)


	def write(self, data):
		# first check for any new console warnings
		warnings = self.job._read_warnings()
		self._process_warnings(self.last_line, self.logs, warnings)
		# now process the newest data written out
		data = self.leftover + data
		lines = data.split("\n")
		# process every line but the last one
		for line in lines[:-1]:
			self._process_line(line)
		# save the last line for later processing
		# since we may not have the whole line yet
		self.leftover = lines[-1]


	def flush(self):
		sys.stdout.flush()


	def close(self):
		if self.leftover:
			self._process_line(self.leftover)
		self._process_logs()
		self.flush()


# site_server_job.py may be non-existent or empty; make sure that an
# appropriate site_server_job class is created nevertheless
try:
	from autotest_lib.server.site_server_job import site_server_job
except ImportError:
	class site_server_job(base_server_job):
		pass

class server_job(site_server_job):
	pass