blob: e143fd1079beb004d39b8fc52b5aa9329129dce0 [file] [log] [blame]
mblighf1c52842007-10-16 15:21:38 +00001"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
9__author__ = """
10Martin J. Bligh <mbligh@google.com>
11Andy Whitcroft <apw@shadowen.org>
12"""
13
mblighdbdac6c2008-03-05 15:49:58 +000014import os, sys, re, time, select, subprocess, traceback
mblighf1c52842007-10-16 15:21:38 +000015
mbligh6437ff52008-04-17 15:24:38 +000016from autotest_lib.client.bin import fd_stack
mbligh302482e2008-05-01 20:06:16 +000017from autotest_lib.client.common_lib import error, logging
mbligh6437ff52008-04-17 15:24:38 +000018from autotest_lib.server import test, subcommand
19from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
20from autotest_lib.server.utils import *
21
mbligh3f4bced2007-11-05 17:55:53 +000022
mblighed5a4102007-11-20 00:46:41 +000023# load up a control segment
24# these are all stored in <server_dir>/control_segments
25def load_control_segment(name):
26 server_dir = os.path.dirname(os.path.abspath(__file__))
mbligh7f86e0b2007-11-24 19:45:07 +000027 script_file = os.path.join(server_dir, "control_segments", name)
mblighed5a4102007-11-20 00:46:41 +000028 if os.path.exists(script_file):
29 return file(script_file).read()
30 else:
31 return ""
32
33
# Code prepended to every control segment before it is exec'd by this
# job: it imports the names control files expect, points the Autotest
# and SSHHost classes back at this job object, and (for multi-machine
# jobs) writes the machine list to a .machines file.
preamble = """\
import os, sys

from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
from autotest_lib.server import git_kernel
from autotest_lib.server.subcommand import *
from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
from autotest_lib.server.utils import parse_machine
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.common_lib import barrier

autotest.Autotest.job = job
hosts.SSHHost.job = job
barrier = barrier.barrier
if len(machines) > 1:
    open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""
52
# Server-side wrapper exec'd when the job was given a client-side
# control file: it pushes the client control file (bound to the
# 'control' variable) to each machine and runs it there via Autotest.
client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
    hostname, user, password, port = parse_machine(machine,
        ssh_user, ssh_port, ssh_pass)

    host = hosts.SSHHost(hostname, user, port, password=password)
    at.run(control, host=host)

job.parallel_simple(run_client, machines)
"""
65
# Control segment that collects from each machine any crashdumps
# produced since test_start_time (bound into the exec namespace by
# run()).
crashdumps = """
def crashdumps(machine):
    hostname, user, password, port = parse_machine(machine,
        ssh_user, ssh_port, ssh_pass)

    host = hosts.SSHHost(hostname, user, port, initialize=False, \
        password=password)
    host.get_crashdumps(test_start_time)

job.parallel_simple(crashdumps, machines, log=False)
"""
77
# Control segment that reboots every machine; run after the job when
# run(reboot=True) was requested.
reboot_segment="""\
def reboot(machine):
    hostname, user, password, port = parse_machine(machine,
        ssh_user, ssh_port, ssh_pass)

    host = hosts.SSHHost(hostname, user, port, initialize=False, \
        password=password)
    host.reboot()

job.parallel_simple(reboot, machines, log=False)
"""
89
# Control segment that (re)installs the OS on every machine; used by
# run() for install_before/install_after.
install="""\
def install(machine):
    hostname, user, password, port = parse_machine(machine,
        ssh_user, ssh_port, ssh_pass)

    host = hosts.SSHHost(hostname, user, port, initialize=False, \
        password=password)
    host.machine_install()

job.parallel_simple(install, machines, log=False)
"""
101
# load up the verifier control segment, with an optional site-specific hook
# (the site_* segment runs first if present; missing segments load as "")
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")
109
mbligh1d42d4e2007-11-05 22:42:00 +0000110
# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    # drop the module reference; only the function is needed
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}
120
121
mblighcaa62c22008-04-07 21:51:17 +0000122class base_server_job:
mblighf1c52842007-10-16 15:21:38 +0000123 """The actual job against which we do everything.
124
125 Properties:
126 autodir
127 The top level autotest directory (/usr/local/autotest).
128 serverdir
129 <autodir>/server/
130 clientdir
131 <autodir>/client/
132 conmuxdir
133 <autodir>/conmux/
134 testdir
135 <autodir>/server/tests/
136 control
137 the control file for this job
138 """
139
jadmanski6e8bf752008-05-14 00:17:48 +0000140 STATUS_VERSION = 1
141
142
mblighe8b37a92007-12-19 15:54:11 +0000143 def __init__(self, control, args, resultdir, label, user, machines,
mbligh1965dfa2008-06-04 19:58:37 +0000144 client=False, parse_job="",
145 ssh_user='root', ssh_port=22, ssh_pass=''):
mblighf1c52842007-10-16 15:21:38 +0000146 """
147 control
148 The control file (pathname of)
149 args
150 args to pass to the control file
151 resultdir
152 where to throw the results
mbligh18420c22007-10-16 22:27:14 +0000153 label
154 label for the job
mblighf1c52842007-10-16 15:21:38 +0000155 user
156 Username for the job (email address)
157 client
158 True if a client-side control file
159 """
mblighf5427bb2008-04-09 15:55:57 +0000160 path = os.path.dirname(__file__)
mblighf1c52842007-10-16 15:21:38 +0000161 self.autodir = os.path.abspath(os.path.join(path, '..'))
162 self.serverdir = os.path.join(self.autodir, 'server')
mbligh05269362007-10-16 16:58:11 +0000163 self.testdir = os.path.join(self.serverdir, 'tests')
164 self.tmpdir = os.path.join(self.serverdir, 'tmp')
mblighf1c52842007-10-16 15:21:38 +0000165 self.conmuxdir = os.path.join(self.autodir, 'conmux')
166 self.clientdir = os.path.join(self.autodir, 'client')
mblighe25fd5b2008-01-22 17:23:37 +0000167 if control:
168 self.control = open(control, 'r').read()
169 self.control = re.sub('\r', '', self.control)
170 else:
171 self.control = None
mblighf1c52842007-10-16 15:21:38 +0000172 self.resultdir = resultdir
173 if not os.path.exists(resultdir):
174 os.mkdir(resultdir)
mbligh3ccb8592007-11-05 18:13:40 +0000175 self.debugdir = os.path.join(resultdir, 'debug')
176 if not os.path.exists(self.debugdir):
177 os.mkdir(self.debugdir)
mbligh3dcf2c92007-10-16 22:24:00 +0000178 self.status = os.path.join(resultdir, 'status')
mbligh18420c22007-10-16 22:27:14 +0000179 self.label = label
mblighf1c52842007-10-16 15:21:38 +0000180 self.user = user
181 self.args = args
mblighe8b37a92007-12-19 15:54:11 +0000182 self.machines = machines
mblighf1c52842007-10-16 15:21:38 +0000183 self.client = client
184 self.record_prefix = ''
mblighf4e04152008-02-21 16:05:53 +0000185 self.warning_loggers = set()
mbligh1965dfa2008-06-04 19:58:37 +0000186 self.ssh_user = ssh_user
187 self.ssh_port = ssh_port
188 self.ssh_pass = ssh_pass
mblighf1c52842007-10-16 15:21:38 +0000189
mbligh3f4bced2007-11-05 17:55:53 +0000190 self.stdout = fd_stack.fd_stack(1, sys.stdout)
191 self.stderr = fd_stack.fd_stack(2, sys.stderr)
192
mbligh3dcf2c92007-10-16 22:24:00 +0000193 if os.path.exists(self.status):
194 os.unlink(self.status)
jadmanski6e8bf752008-05-14 00:17:48 +0000195 job_data = {'label' : label, 'user' : user,
196 'hostname' : ','.join(machines),
197 'status_version' : str(self.STATUS_VERSION)}
mbligh970b94e2008-01-24 16:29:34 +0000198 job_data.update(get_site_job_data(self))
mblighf1c52842007-10-16 15:21:38 +0000199 write_keyval(self.resultdir, job_data)
200
mbligh6437ff52008-04-17 15:24:38 +0000201 self.parse_job = parse_job
202 if self.parse_job and len(machines) == 1:
203 self.using_parser = True
204 self.init_parser(resultdir)
205 else:
206 self.using_parser = False
207
208
209 def init_parser(self, resultdir):
210 """Start the continuous parsing of resultdir. This sets up
211 the database connection and inserts the basic job object into
212 the database if necessary."""
213 # redirect parser debugging to .parse.log
214 parse_log = os.path.join(resultdir, '.parse.log')
215 parse_log = open(parse_log, 'w', 0)
216 tko_utils.redirect_parser_debugging(parse_log)
217 # create a job model object and set up the db
218 self.results_db = tko_db.db(autocommit=True)
jadmanski6e8bf752008-05-14 00:17:48 +0000219 self.parser = status_lib.parser(self.STATUS_VERSION)
mbligh6437ff52008-04-17 15:24:38 +0000220 self.job_model = self.parser.make_job(resultdir)
221 self.parser.start(self.job_model)
222 # check if a job already exists in the db and insert it if
223 # it does not
224 job_idx = self.results_db.find_job(self.parse_job)
225 if job_idx is None:
226 self.results_db.insert_job(self.parse_job,
227 self.job_model)
228 else:
229 machine_idx = self.results_db.lookup_machine(
230 self.job_model.machine)
231 self.job_model.index = job_idx
232 self.job_model.machine_idx = machine_idx
233
234
mblighfe0af112008-04-17 15:27:47 +0000235 def cleanup_parser(self):
mbligh6437ff52008-04-17 15:24:38 +0000236 """This should be called after the server job is finished
237 to carry out any remaining cleanup (e.g. flushing any
238 remaining test results to the results db)"""
239 if not self.using_parser:
240 return
241 final_tests = self.parser.end()
242 for test in final_tests:
jadmanski28816c22008-05-21 18:18:05 +0000243 self.__insert_test(test)
mblighfe0af112008-04-17 15:27:47 +0000244 self.using_parser = False
mbligh6437ff52008-04-17 15:24:38 +0000245
mblighf1c52842007-10-16 15:21:38 +0000246
mblighe25fd5b2008-01-22 17:23:37 +0000247 def verify(self):
248 if not self.machines:
mbligh6437ff52008-04-17 15:24:38 +0000249 raise error.AutoservError(
250 'No machines specified to verify')
mblighe25fd5b2008-01-22 17:23:37 +0000251 try:
mbligh1965dfa2008-06-04 19:58:37 +0000252 namespace = {'machines' : self.machines, 'job' : self, \
253 'ssh_user' : self.ssh_user, 'ssh_port' : ssh_port, \
254 'ssh_pass' : ssh_pass}
mblighe25fd5b2008-01-22 17:23:37 +0000255 exec(preamble + verify, namespace, namespace)
256 except Exception, e:
mbligh302482e2008-05-01 20:06:16 +0000257 msg = ('Verify failed\n' + str(e) + '\n'
258 + traceback.format_exc())
mblighe25fd5b2008-01-22 17:23:37 +0000259 self.record('ABORT', None, None, msg)
260 raise
261
262
263 def repair(self):
264 if not self.machines:
mbligh6437ff52008-04-17 15:24:38 +0000265 raise error.AutoservError(
266 'No machines specified to repair')
mbligh1965dfa2008-06-04 19:58:37 +0000267 namespace = {'machines' : self.machines, 'job' : self, \
268 'ssh_user' : self.ssh_user, 'ssh_port' : ssh_port, \
269 'ssh_pass' : ssh_pass}
mbligh16c722d2008-03-05 00:58:44 +0000270 # no matter what happens during repair, go on to try to reverify
271 try:
272 exec(preamble + repair, namespace, namespace)
273 except Exception, exc:
274 print 'Exception occured during repair'
275 traceback.print_exc()
mbligh8141f862008-01-25 17:20:40 +0000276 self.verify()
mblighe25fd5b2008-01-22 17:23:37 +0000277
278
mblighcaa62c22008-04-07 21:51:17 +0000279 def enable_external_logging(self):
280 """Start or restart external logging mechanism.
281 """
282 pass
283
284
285 def disable_external_logging(self):
286 """ Pause or stop external logging mechanism.
287 """
288 pass
289
290
291 def use_external_logging(self):
292 """Return True if external logging should be used.
293 """
294 return False
295
296
mbligh6437ff52008-04-17 15:24:38 +0000297 def parallel_simple(self, function, machines, log=True, timeout=None):
298 """Run 'function' using parallel_simple, with an extra
299 wrapper to handle the necessary setup for continuous parsing,
300 if possible. If continuous parsing is already properly
301 initialized then this should just work."""
302 is_forking = not (len(machines) == 1 and
303 self.machines == machines)
304 if self.parse_job and is_forking:
305 def wrapper(machine):
306 self.parse_job += "/" + machine
307 self.using_parser = True
308 self.machines = [machine]
309 self.resultdir = os.path.join(self.resultdir,
310 machine)
311 self.init_parser(self.resultdir)
312 result = function(machine)
mblighfe0af112008-04-17 15:27:47 +0000313 self.cleanup_parser()
mbligh6437ff52008-04-17 15:24:38 +0000314 return result
315 else:
316 wrapper = function
317 subcommand.parallel_simple(wrapper, machines, log, timeout)
318
319
mblighe8b37a92007-12-19 15:54:11 +0000320 def run(self, reboot = False, install_before = False,
mblighddd54332008-03-07 18:14:06 +0000321 install_after = False, collect_crashdumps = True,
322 namespace = {}):
mbligh60dbd502007-10-26 14:59:31 +0000323 # use a copy so changes don't affect the original dictionary
324 namespace = namespace.copy()
mblighe8b37a92007-12-19 15:54:11 +0000325 machines = self.machines
mbligh60dbd502007-10-26 14:59:31 +0000326
mblighfaf0cd42007-11-19 16:00:24 +0000327 self.aborted = False
mblighf1c52842007-10-16 15:21:38 +0000328 namespace['machines'] = machines
329 namespace['args'] = self.args
330 namespace['job'] = self
mbligh1965dfa2008-06-04 19:58:37 +0000331 namespace['ssh_user'] = self.ssh_user
332 namespace['ssh_port'] = self.ssh_port
333 namespace['ssh_pass'] = self.ssh_pass
mbligh6e294382007-11-05 18:11:29 +0000334 test_start_time = int(time.time())
mblighf1c52842007-10-16 15:21:38 +0000335
mbligh87c5d882007-10-29 17:07:24 +0000336 os.chdir(self.resultdir)
mblighcaa62c22008-04-07 21:51:17 +0000337
338 self.enable_external_logging()
mbligh87c5d882007-10-29 17:07:24 +0000339 status_log = os.path.join(self.resultdir, 'status.log')
mblighf1c52842007-10-16 15:21:38 +0000340 try:
mblighf36243d2007-10-30 15:36:16 +0000341 if install_before and machines:
342 exec(preamble + install, namespace, namespace)
mblighf1c52842007-10-16 15:21:38 +0000343 if self.client:
344 namespace['control'] = self.control
345 open('control', 'w').write(self.control)
346 open('control.srv', 'w').write(client_wrapper)
347 server_control = client_wrapper
348 else:
349 open('control.srv', 'w').write(self.control)
350 server_control = self.control
mblighf1c52842007-10-16 15:21:38 +0000351 exec(preamble + server_control, namespace, namespace)
352
353 finally:
mblighddd54332008-03-07 18:14:06 +0000354 if machines and collect_crashdumps:
mbligh6e294382007-11-05 18:11:29 +0000355 namespace['test_start_time'] = test_start_time
mbligh98ff1462007-12-19 16:27:55 +0000356 exec(preamble + crashdumps,
357 namespace, namespace)
mblighcaa62c22008-04-07 21:51:17 +0000358 self.disable_external_logging()
mblighf1c52842007-10-16 15:21:38 +0000359 if reboot and machines:
mbligh98ff1462007-12-19 16:27:55 +0000360 exec(preamble + reboot_segment,
361 namespace, namespace)
mblighf36243d2007-10-30 15:36:16 +0000362 if install_after and machines:
363 exec(preamble + install, namespace, namespace)
mblighf1c52842007-10-16 15:21:38 +0000364
365
366 def run_test(self, url, *args, **dargs):
367 """Summon a test object and run it.
368
369 tag
370 tag to add to testname
371 url
372 url of the test to run
373 """
374
mblighf1c52842007-10-16 15:21:38 +0000375 (group, testname) = test.testname(url)
376 tag = None
377 subdir = testname
mbligh43ac5222007-10-16 15:55:01 +0000378
mblighf1c52842007-10-16 15:21:38 +0000379 if dargs.has_key('tag'):
380 tag = dargs['tag']
381 del dargs['tag']
382 if tag:
383 subdir += '.' + tag
mblighf1c52842007-10-16 15:21:38 +0000384
mblighd660afe2008-06-05 22:17:53 +0000385 outputdir = os.path.join(self.resultdir, subdir)
386 if os.path.exists(outputdir):
387 msg = ("%s already exists, test <%s> may have"
388 " already run with tag <%s>"
389 % (outputdir, testname, tag) )
390 raise error.TestError(msg)
391 os.mkdir(outputdir)
392
mbligh43ac5222007-10-16 15:55:01 +0000393 try:
394 test.runtest(self, url, tag, args, dargs)
395 self.record('GOOD', subdir, testname, 'completed successfully')
mbligh302482e2008-05-01 20:06:16 +0000396 except error.TestNAError, detail:
mblighd660afe2008-06-05 22:17:53 +0000397 self.record('TEST_NA', subdir, testname, str(detail))
mbligh43ac5222007-10-16 15:55:01 +0000398 except Exception, detail:
mbligh302482e2008-05-01 20:06:16 +0000399 info = str(detail) + "\n" + traceback.format_exc()
400 self.record('FAIL', subdir, testname, info)
mblighf1c52842007-10-16 15:21:38 +0000401
402
403 def run_group(self, function, *args, **dargs):
404 """\
405 function:
406 subroutine to run
407 *args:
408 arguments for the function
409 """
410
411 result = None
412 name = function.__name__
413
414 # Allow the tag for the group to be specified.
415 if dargs.has_key('tag'):
416 tag = dargs['tag']
417 del dargs['tag']
418 if tag:
419 name = tag
420
mblighf1c52842007-10-16 15:21:38 +0000421 old_record_prefix = self.record_prefix
422 try:
423 try:
424 self.record('START', None, name)
425 self.record_prefix += '\t'
426 result = function(*args, **dargs)
jadmanski0c109552008-06-02 18:02:29 +0000427 except Exception, e:
mblighf1c52842007-10-16 15:21:38 +0000428 self.record_prefix = old_record_prefix
jadmanski0c109552008-06-02 18:02:29 +0000429 err_msg = str(e) + '\n'
430 err_msg += traceback.format_exc()
431 self.record('END FAIL', None, name, err_msg)
jadmanskif35bbb62008-05-29 21:36:04 +0000432 else:
433 self.record_prefix = old_record_prefix
434 self.record('END GOOD', None, name)
mbligh302482e2008-05-01 20:06:16 +0000435
mblighf1c52842007-10-16 15:21:38 +0000436 # We don't want to raise up an error higher if it's just
437 # a TestError - we want to carry on to other tests. Hence
438 # this outer try/except block.
mbligh6437ff52008-04-17 15:24:38 +0000439 except error.TestError:
mblighf1c52842007-10-16 15:21:38 +0000440 pass
441 except:
mbligh6437ff52008-04-17 15:24:38 +0000442 raise error.TestError(name + ' failed\n' +
mbligh0a1727a2008-04-21 18:10:07 +0000443 traceback.format_exc())
mblighf1c52842007-10-16 15:21:38 +0000444
445 return result
446
447
jadmanskif35bbb62008-05-29 21:36:04 +0000448 def run_reboot(self, reboot_func, get_kernel_func):
449 """\
450 A specialization of run_group meant specifically for handling
451 a reboot. Includes support for capturing the kernel version
452 after the reboot.
453
454 reboot_func: a function that carries out the reboot
455
456 get_kernel_func: a function that returns a string
457 representing the kernel version.
458 """
459
460 old_record_prefix = self.record_prefix
461 try:
462 self.record('START', None, 'reboot')
463 self.record_prefix += '\t'
464 reboot_func()
jadmanski0c109552008-06-02 18:02:29 +0000465 except Exception, e:
jadmanskif35bbb62008-05-29 21:36:04 +0000466 self.record_prefix = old_record_prefix
jadmanski0c109552008-06-02 18:02:29 +0000467 err_msg = str(e) + '\n' + traceback.format_exc()
468 self.record('END FAIL', None, 'reboot', err_msg)
jadmanskif35bbb62008-05-29 21:36:04 +0000469 else:
470 kernel = get_kernel_func()
471 self.record_prefix = old_record_prefix
472 self.record('END GOOD', None, 'reboot',
473 optional_fields={"kernel": kernel})
474
475
476 def record(self, status_code, subdir, operation, status='',
477 optional_fields=None):
mblighf1c52842007-10-16 15:21:38 +0000478 """
479 Record job-level status
480
481 The intent is to make this file both machine parseable and
482 human readable. That involves a little more complexity, but
483 really isn't all that bad ;-)
484
485 Format is <status code>\t<subdir>\t<operation>\t<status>
486
mbligh302482e2008-05-01 20:06:16 +0000487 status code: see common_lib.logging.is_valid_status()
488 for valid status definition
mblighf1c52842007-10-16 15:21:38 +0000489
490 subdir: MUST be a relevant subdirectory in the results,
491 or None, which will be represented as '----'
492
493 operation: description of what you ran (e.g. "dbench", or
494 "mkfs -t foobar /dev/sda9")
495
496 status: error message or "completed sucessfully"
497
498 ------------------------------------------------------------
499
500 Initial tabs indicate indent levels for grouping, and is
501 governed by self.record_prefix
502
503 multiline messages have secondary lines prefaced by a double
504 space (' ')
mblighf4e04152008-02-21 16:05:53 +0000505
506 Executing this method will trigger the logging of all new
507 warnings to date from the various console loggers.
508 """
mblighdab39662008-02-27 16:47:55 +0000509 # poll all our warning loggers for new warnings
510 warnings = self._read_warnings()
511 for timestamp, msg in warnings:
512 self.__record("WARN", None, None, msg, timestamp)
513
514 # write out the actual status log line
jadmanskif35bbb62008-05-29 21:36:04 +0000515 self.__record(status_code, subdir, operation, status,
516 optional_fields=optional_fields)
mblighdab39662008-02-27 16:47:55 +0000517
518
519 def _read_warnings(self):
mblighf4e04152008-02-21 16:05:53 +0000520 warnings = []
521 while True:
522 # pull in a line of output from every logger that has
523 # output ready to be read
524 loggers, _, _ = select.select(self.warning_loggers,
525 [], [], 0)
526 closed_loggers = set()
527 for logger in loggers:
528 line = logger.readline()
529 # record any broken pipes (aka line == empty)
530 if len(line) == 0:
531 closed_loggers.add(logger)
532 continue
533 timestamp, msg = line.split('\t', 1)
534 warnings.append((int(timestamp), msg.strip()))
535
536 # stop listening to loggers that are closed
537 self.warning_loggers -= closed_loggers
538
539 # stop if none of the loggers have any output left
540 if not loggers:
541 break
542
mblighdab39662008-02-27 16:47:55 +0000543 # sort into timestamp order
544 warnings.sort()
545 return warnings
mblighf4e04152008-02-21 16:05:53 +0000546
547
mblighdab39662008-02-27 16:47:55 +0000548 def _render_record(self, status_code, subdir, operation, status='',
jadmanskif35bbb62008-05-29 21:36:04 +0000549 epoch_time=None, record_prefix=None,
550 optional_fields=None):
mblighf4e04152008-02-21 16:05:53 +0000551 """
mblighdab39662008-02-27 16:47:55 +0000552 Internal Function to generate a record to be written into a
553 status log. For use by server_job.* classes only.
mblighf1c52842007-10-16 15:21:38 +0000554 """
mblighf1c52842007-10-16 15:21:38 +0000555 if subdir:
556 if re.match(r'[\n\t]', subdir):
mbligh6437ff52008-04-17 15:24:38 +0000557 raise ValueError(
558 'Invalid character in subdir string')
mblighf1c52842007-10-16 15:21:38 +0000559 substr = subdir
560 else:
561 substr = '----'
mbligh6437ff52008-04-17 15:24:38 +0000562
mbligh302482e2008-05-01 20:06:16 +0000563 if not logging.is_valid_status(status_code):
mbligh6437ff52008-04-17 15:24:38 +0000564 raise ValueError('Invalid status code supplied: %s' %
565 status_code)
mblighe25fd5b2008-01-22 17:23:37 +0000566 if not operation:
567 operation = '----'
mblighf1c52842007-10-16 15:21:38 +0000568 if re.match(r'[\n\t]', operation):
mbligh6437ff52008-04-17 15:24:38 +0000569 raise ValueError(
570 'Invalid character in operation string')
mblighf1c52842007-10-16 15:21:38 +0000571 operation = operation.rstrip()
572 status = status.rstrip()
573 status = re.sub(r"\t", " ", status)
574 # Ensure any continuation lines are marked so we can
575 # detect them in the status file to ensure it is parsable.
576 status = re.sub(r"\n", "\n" + self.record_prefix + " ", status)
577
jadmanskif35bbb62008-05-29 21:36:04 +0000578 if not optional_fields:
579 optional_fields = {}
580
mbligh30270302007-11-05 20:33:52 +0000581 # Generate timestamps for inclusion in the logs
mblighf4e04152008-02-21 16:05:53 +0000582 if epoch_time is None:
583 epoch_time = int(time.time())
mbligh30270302007-11-05 20:33:52 +0000584 local_time = time.localtime(epoch_time)
jadmanskif35bbb62008-05-29 21:36:04 +0000585 optional_fields["timestamp"] = str(epoch_time)
586 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
587 local_time)
588
589 fields = [status_code, substr, operation]
590 fields += ["%s=%s" % x for x in optional_fields.iteritems()]
591 fields.append(status)
mbligh30270302007-11-05 20:33:52 +0000592
mblighdab39662008-02-27 16:47:55 +0000593 if record_prefix is None:
594 record_prefix = self.record_prefix
595
jadmanskif35bbb62008-05-29 21:36:04 +0000596 msg = '\t'.join(str(x) for x in fields)
597
mblighdab39662008-02-27 16:47:55 +0000598 return record_prefix + msg + '\n'
599
600
601 def _record_prerendered(self, msg):
602 """
603 Record a pre-rendered msg into the status logs. The only
604 change this makes to the message is to add on the local
605 indentation. Should not be called outside of server_job.*
606 classes. Unlike __record, this does not write the message
607 to standard output.
608 """
mbligh6437ff52008-04-17 15:24:38 +0000609 lines = []
mblighdab39662008-02-27 16:47:55 +0000610 status_file = os.path.join(self.resultdir, 'status.log')
611 status_log = open(status_file, 'a')
612 for line in msg.splitlines():
613 line = self.record_prefix + line + '\n'
mbligh6437ff52008-04-17 15:24:38 +0000614 lines.append(line)
mblighdab39662008-02-27 16:47:55 +0000615 status_log.write(line)
616 status_log.close()
mbligh6437ff52008-04-17 15:24:38 +0000617 self.__parse_status(lines)
mblighdab39662008-02-27 16:47:55 +0000618
619
620 def __record(self, status_code, subdir, operation, status='',
jadmanskif35bbb62008-05-29 21:36:04 +0000621 epoch_time=None, optional_fields=None):
mblighdab39662008-02-27 16:47:55 +0000622 """
623 Actual function for recording a single line into the status
624 logs. Should never be called directly, only by job.record as
625 this would bypass the console monitor logging.
626 """
627
628 msg = self._render_record(status_code, subdir, operation,
jadmanskif35bbb62008-05-29 21:36:04 +0000629 status, epoch_time,
630 optional_fields=optional_fields)
mblighdab39662008-02-27 16:47:55 +0000631
mblighf1c52842007-10-16 15:21:38 +0000632
mbligh31a49de2007-11-05 18:41:19 +0000633 status_file = os.path.join(self.resultdir, 'status.log')
mblighdab39662008-02-27 16:47:55 +0000634 sys.stdout.write(msg)
635 open(status_file, "a").write(msg)
mblighf1c52842007-10-16 15:21:38 +0000636 if subdir:
mblighd56eb592008-01-22 16:36:34 +0000637 test_dir = os.path.join(self.resultdir, subdir)
mblighd56eb592008-01-22 16:36:34 +0000638 status_file = os.path.join(test_dir, 'status')
mblighdab39662008-02-27 16:47:55 +0000639 open(status_file, "a").write(msg)
jadmanski96bb7642008-05-15 17:58:16 +0000640 self.__parse_status(msg.splitlines())
mblighb03ba642008-03-13 17:37:17 +0000641
642
mbligh6437ff52008-04-17 15:24:38 +0000643 def __parse_status(self, new_lines):
644 if not self.using_parser:
645 return
646 new_tests = self.parser.process_lines(new_lines)
647 for test in new_tests:
jadmanski28816c22008-05-21 18:18:05 +0000648 self.__insert_test(test)
649
650
651 def __insert_test(self, test):
652 """ An internal method to insert a new test result into the
653 database. This method will not raise an exception, even if an
654 error occurs during the insert, to avoid failing a test
655 simply because of unexpected database issues."""
656 try:
mbligh6437ff52008-04-17 15:24:38 +0000657 self.results_db.insert_test(self.job_model, test)
jadmanski28816c22008-05-21 18:18:05 +0000658 except Exception:
659 msg = ("WARNING: An unexpected error occured while "
660 "inserting test results into the database. "
661 "Ignoring error.\n" + traceback.format_exc())
662 print >> sys.stderr, msg
mblighdab39662008-02-27 16:47:55 +0000663
664
# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.
    """
    # client-quoted status lines look like "AUTOTEST_STATUS:<tag>:<line>"
    parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    # captures the leading run of tabs (the indent level) of a line
    extract_indent = re.compile(r"^(\t*).*$")

    def __init__(self, job):
        self.job = job
        # trailing partial line left over from the previous write()
        self.leftover = ""
        # the last complete line pushed out (used to indent warnings)
        self.last_line = ""
        # nested dict of buffered log lines keyed by numeric tag parts,
        # with the actual lines stored under the "logs" key at each level
        self.logs = {}


    def _process_log_dict(self, log_dict):
        """Flatten a (possibly nested) log dict into one ordered list
        of lines, emptying the dict as it goes."""
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
        """Go through the accumulated logs in self.log and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired that one can be reconstructed from the
        status log by looking at timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            # descend/create the nested dict level for each tag part
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        match = self.parser.search(line)
        if match:
            tag, line = match.groups()
            self._process_quoted_line(tag, line)
        else:
            print line


    def _format_warnings(self, last_line, warnings):
        """Render (timestamp, msg) warning tuples as WARN status lines
        indented to match last_line."""
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        """Insert rendered warnings at the deepest appropriate level(s)
        of the buffered log tree."""
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)


    def write(self, data):
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        sys.stdout.flush()


    def close(self):
        # flush any remaining partial line and all buffered logs
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()
mblighcaa62c22008-04-07 21:51:17 +0000797
# site_server_job.py may be non-existent or empty, make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    # no site customization available: fall back to the stock job class
    class site_server_job(base_server_job):
        pass

# the class the rest of the code instantiates; picks up site hooks if any
class server_job(site_server_job):
    pass