blob: 166846a343a09b09a111b8d8fbff17901562c258 [file] [log] [blame]
mblighf1c52842007-10-16 15:21:38 +00001"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
9__author__ = """
10Martin J. Bligh <mbligh@google.com>
11Andy Whitcroft <apw@shadowen.org>
12"""
13
mblighdbdac6c2008-03-05 15:49:58 +000014import os, sys, re, time, select, subprocess, traceback
mblighf1c52842007-10-16 15:21:38 +000015
mbligh6437ff52008-04-17 15:24:38 +000016from autotest_lib.client.bin import fd_stack
mbligh302482e2008-05-01 20:06:16 +000017from autotest_lib.client.common_lib import error, logging
mbligh6437ff52008-04-17 15:24:38 +000018from autotest_lib.server import test, subcommand
19from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
20from autotest_lib.server.utils import *
21
mbligh3f4bced2007-11-05 17:55:53 +000022
# load up a control segment
# these are all stored in <server_dir>/control_segments
def load_control_segment(name):
    """Return the text of the named control segment, or "" if absent.

    Control segments live in <server_dir>/control_segments. A missing
    segment is not an error -- it simply contributes no code, which is
    how the optional site_* hooks work.
    """
    server_dir = os.path.dirname(os.path.abspath(__file__))
    script_file = os.path.join(server_dir, "control_segments", name)
    if os.path.exists(script_file):
        # use open() with an explicit close instead of the bare
        # file(...).read() so the descriptor is released promptly
        f = open(script_file)
        try:
            return f.read()
        finally:
            f.close()
    else:
        return ""
32
33
# The preamble is prepended to every control segment / control file that
# gets exec'd by this job.  It supplies the standard server-side imports
# plus bindings that control files expect: the 'job' and 'machines'
# globals are patched onto Autotest/SSHHost, and a .machines file is
# written when the job spans multiple hosts.
preamble = """\
import os, sys

from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
from autotest_lib.server import git_kernel
from autotest_lib.server.subcommand import *
from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
from autotest_lib.server.utils import parse_machine
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.common_lib import barrier

autotest.Autotest.job = job
hosts.SSHHost.job = job
barrier = barrier.barrier
if len(machines) > 1:
        open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""
52
# Control segment used when the supplied control file is client-side:
# it pushes the control file to each machine via Autotest.run(), one
# parallel_simple() subprocess per machine.
client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
        hostname, user, password, port = parse_machine(machine,
                ssh_user, ssh_port, ssh_pass)

        host = hosts.SSHHost(hostname, user, port, password=password)
        at.run(control, host=host)

job.parallel_simple(run_client, machines)
"""
65
# Control segment that pulls back any crashdumps produced since
# test_start_time (set by run()); executed with logging disabled so it
# does not pollute the status log.
crashdumps = """
def crashdumps(machine):
        hostname, user, password, port = parse_machine(machine,
                ssh_user, ssh_port, ssh_pass)

        host = hosts.SSHHost(hostname, user, port, initialize=False, \
                password=password)
        host.get_crashdumps(test_start_time)

job.parallel_simple(crashdumps, machines, log=False)
"""
77
# Control segment that reboots every machine at the end of the job
# (when run(reboot=True)); executed with logging disabled.
reboot_segment="""\
def reboot(machine):
        hostname, user, password, port = parse_machine(machine,
                ssh_user, ssh_port, ssh_pass)

        host = hosts.SSHHost(hostname, user, port, initialize=False, \
                password=password)
        host.reboot()

job.parallel_simple(reboot, machines, log=False)
"""
89
# Control segment that reinstalls each machine (used for
# install_before / install_after in run()); executed with logging
# disabled.
install="""\
def install(machine):
        hostname, user, password, port = parse_machine(machine,
                ssh_user, ssh_port, ssh_pass)

        host = hosts.SSHHost(hostname, user, port, initialize=False, \
                password=password)
        host.machine_install()

job.parallel_simple(install, machines, log=False)
"""
101
# load up the verifier control segment, with an optional site-specific hook
# (the site_* segment is empty unless the file exists -- see
# load_control_segment)
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")


# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    # drop the module reference; only the function is needed
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}
121
class base_server_job:
    """The actual job against which we do everything.

    Properties:
        autodir
            The top level autotest directory (/usr/local/autotest).
        serverdir
            <autodir>/server/
        clientdir
            <autodir>/client/
        conmuxdir
            <autodir>/conmux/
        testdir
            <autodir>/server/tests/
        control
            the control file for this job
    """

    # version of the status log format this job writes; recorded in the
    # job keyvals so the parser can pick the matching parsing scheme
    STATUS_VERSION = 1
141
142
    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job="",
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
        control
            The control file (pathname of)
        args
            args to pass to the control file
        resultdir
            where to throw the results
        label
            label for the job
        user
            Username for the job (email address)
        machines
            list of hostnames the job will run against
        client
            True if a client-side control file
        parse_job
            job tag for continuous results parsing; parsing is only
            enabled when this is non-empty AND there is exactly one
            machine
        ssh_user, ssh_port, ssh_pass
            ssh credentials handed through to the control segments
        """
        # derive the autotest directory layout relative to this file
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        if control:
            # read the control file, normalizing CRLF line endings
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        self.record_prefix = ''
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass

        # capture stdout/stderr so they can be redirected later
        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        # start each run with a fresh status file
        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION)}
        # merge in any site-specific keyvals
        job_data.update(get_site_job_data(self))
        write_keyval(self.resultdir, job_data)

        self.parse_job = parse_job
        # continuous parsing only works for single-machine jobs
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
207
208
    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log (unbuffered)
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job,
                                       self.job_model)
        else:
            # job exists -- attach the existing job/machine indices
            # to our model instead of inserting a duplicate
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx
233
234
mblighfe0af112008-04-17 15:27:47 +0000235 def cleanup_parser(self):
mbligh6437ff52008-04-17 15:24:38 +0000236 """This should be called after the server job is finished
237 to carry out any remaining cleanup (e.g. flushing any
238 remaining test results to the results db)"""
239 if not self.using_parser:
240 return
241 final_tests = self.parser.end()
242 for test in final_tests:
jadmanski28816c22008-05-21 18:18:05 +0000243 self.__insert_test(test)
mblighfe0af112008-04-17 15:27:47 +0000244 self.using_parser = False
mbligh6437ff52008-04-17 15:24:38 +0000245
mblighf1c52842007-10-16 15:21:38 +0000246
mblighe25fd5b2008-01-22 17:23:37 +0000247 def verify(self):
248 if not self.machines:
mbligh6437ff52008-04-17 15:24:38 +0000249 raise error.AutoservError(
250 'No machines specified to verify')
mblighe25fd5b2008-01-22 17:23:37 +0000251 try:
mbligh1965dfa2008-06-04 19:58:37 +0000252 namespace = {'machines' : self.machines, 'job' : self, \
mbligh0ab8fee2008-06-06 16:08:03 +0000253 'ssh_user' : self.ssh_user, \
254 'ssh_port' : self.ssh_port, \
255 'ssh_pass' : self.ssh_pass}
mblighe25fd5b2008-01-22 17:23:37 +0000256 exec(preamble + verify, namespace, namespace)
257 except Exception, e:
mbligh302482e2008-05-01 20:06:16 +0000258 msg = ('Verify failed\n' + str(e) + '\n'
259 + traceback.format_exc())
mblighe25fd5b2008-01-22 17:23:37 +0000260 self.record('ABORT', None, None, msg)
261 raise
262
263
264 def repair(self):
265 if not self.machines:
mbligh6437ff52008-04-17 15:24:38 +0000266 raise error.AutoservError(
267 'No machines specified to repair')
mbligh1965dfa2008-06-04 19:58:37 +0000268 namespace = {'machines' : self.machines, 'job' : self, \
mbligh0ab8fee2008-06-06 16:08:03 +0000269 'ssh_user' : self.ssh_user, \
270 'ssh_port' : self.ssh_port, \
271 'ssh_pass' : self.ssh_pass}
mbligh16c722d2008-03-05 00:58:44 +0000272 # no matter what happens during repair, go on to try to reverify
273 try:
274 exec(preamble + repair, namespace, namespace)
275 except Exception, exc:
276 print 'Exception occured during repair'
277 traceback.print_exc()
mbligh8141f862008-01-25 17:20:40 +0000278 self.verify()
mblighe25fd5b2008-01-22 17:23:37 +0000279
280
    # The three methods below are hooks for subclasses (e.g. a
    # site_server_job) that want to tee job output into a site-specific
    # logging system; the base implementations deliberately do nothing.
    def enable_external_logging(self):
        """Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """ Pause or stop external logging mechanism.
        """
        pass


    def use_external_logging(self):
        """Return True if external logging should be used.
        """
        return False
297
298
    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
        # forking is needed unless this is a single-machine call over
        # exactly the job's own machine list
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking:
            def wrapper(machine):
                # re-point this job at the single machine, with a
                # per-machine result subdir and its own parser
                # NOTE(review): the mutations of self below only make
                # sense if subcommand.parallel_simple runs the wrapper
                # in a forked child -- confirm against subcommand
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir,
                                              machine)
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)
320
321
    def run(self, reboot = False, install_before = False,
            install_after = False, collect_crashdumps = True,
            namespace = {}):
        """Execute this job's control file, plus auxiliary segments.

        reboot: reboot all machines after the run
        install_before / install_after: run the machine-install
            segment before / after the main control file
        collect_crashdumps: gather crashdumps created after
            test_start_time, even when the control file fails
        namespace: extra globals exposed to the control file; it is
            copied immediately, so the caller's dict is never mutated
            (which also makes the mutable {} default harmless here)
        """
        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self.ssh_user
        namespace['ssh_port'] = self.ssh_port
        namespace['ssh_pass'] = self.ssh_pass
        # remembered so the crashdump segment can limit itself to
        # dumps produced during this run
        test_start_time = int(time.time())

        os.chdir(self.resultdir)

        self.enable_external_logging()
        status_log = os.path.join(self.resultdir, 'status.log')
        try:
            if install_before and machines:
                exec(preamble + install, namespace, namespace)
            if self.client:
                # client-side control file: write it out and run it
                # through the client_wrapper segment
                namespace['control'] = self.control
                open('control', 'w').write(self.control)
                open('control.srv', 'w').write(client_wrapper)
                server_control = client_wrapper
            else:
                open('control.srv', 'w').write(self.control)
                server_control = self.control
            exec(preamble + server_control, namespace, namespace)

        finally:
            # post-run segments always execute, even on failure
            if machines and collect_crashdumps:
                namespace['test_start_time'] = test_start_time
                exec(preamble + crashdumps,
                     namespace, namespace)
            self.disable_external_logging()
            if reboot and machines:
                exec(preamble + reboot_segment,
                     namespace, namespace)
            if install_after and machines:
                exec(preamble + install, namespace, namespace)
mblighf1c52842007-10-16 15:21:38 +0000366
367
368 def run_test(self, url, *args, **dargs):
369 """Summon a test object and run it.
370
371 tag
372 tag to add to testname
373 url
374 url of the test to run
375 """
376
mblighf1c52842007-10-16 15:21:38 +0000377 (group, testname) = test.testname(url)
378 tag = None
379 subdir = testname
mbligh43ac5222007-10-16 15:55:01 +0000380
mblighf1c52842007-10-16 15:21:38 +0000381 if dargs.has_key('tag'):
382 tag = dargs['tag']
383 del dargs['tag']
384 if tag:
385 subdir += '.' + tag
mblighf1c52842007-10-16 15:21:38 +0000386
mblighd660afe2008-06-05 22:17:53 +0000387 outputdir = os.path.join(self.resultdir, subdir)
388 if os.path.exists(outputdir):
389 msg = ("%s already exists, test <%s> may have"
390 " already run with tag <%s>"
391 % (outputdir, testname, tag) )
392 raise error.TestError(msg)
393 os.mkdir(outputdir)
394
mbligh43ac5222007-10-16 15:55:01 +0000395 try:
396 test.runtest(self, url, tag, args, dargs)
397 self.record('GOOD', subdir, testname, 'completed successfully')
mbligh302482e2008-05-01 20:06:16 +0000398 except error.TestNAError, detail:
mblighd660afe2008-06-05 22:17:53 +0000399 self.record('TEST_NA', subdir, testname, str(detail))
mbligh43ac5222007-10-16 15:55:01 +0000400 except Exception, detail:
mbligh302482e2008-05-01 20:06:16 +0000401 info = str(detail) + "\n" + traceback.format_exc()
402 self.record('FAIL', subdir, testname, info)
mblighf1c52842007-10-16 15:21:38 +0000403
404
    def run_group(self, function, *args, **dargs):
        """\
        Run 'function' as a named, indented group in the status log.

        function:
            subroutine to run
        *args:
            arguments for the function

        The group is named after the function, unless a 'tag' keyword
        argument overrides it. Returns the function's result (or None
        if it raised).
        """

        result = None
        name = function.__name__

        # Allow the tag for the group to be specified.
        if dargs.has_key('tag'):
            tag = dargs['tag']
            del dargs['tag']
            if tag:
                name = tag

        old_record_prefix = self.record_prefix
        try:
            try:
                self.record('START', None, name)
                self.record_prefix += '\t'
                result = function(*args, **dargs)
            # the inner except/else restore the indent and write the
            # matching END line; note the inner except does NOT
            # re-raise, so the outer handlers only see exceptions
            # raised by the record() calls themselves
            except Exception, e:
                self.record_prefix = old_record_prefix
                err_msg = str(e) + '\n'
                err_msg += traceback.format_exc()
                self.record('END FAIL', None, name, err_msg)
            else:
                self.record_prefix = old_record_prefix
                self.record('END GOOD', None, name)

        # We don't want to raise up an error higher if it's just
        # a TestError - we want to carry on to other tests. Hence
        # this outer try/except block.
        except error.TestError:
            pass
        except:
            raise error.TestError(name + ' failed\n' +
                                  traceback.format_exc())

        return result
448
449
    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            # reboot failed: restore the indent and log END FAIL with
            # the traceback (the exception is not re-raised)
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            # only query the kernel version when the reboot succeeded
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})
476
477
    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.logging.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        optional_fields: dict of extra key=value fields to append to
        the log line (e.g. the kernel version after a reboot)

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and is
        governed by self.record_prefix

        multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self.__record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self.__record(status_code, subdir, operation, status,
                      optional_fields=optional_fields)
mblighdab39662008-02-27 16:47:55 +0000519
520
    def _read_warnings(self):
        """Drain all pending warnings from the console loggers.

        Returns a list of (epoch_timestamp, message) tuples sorted by
        timestamp. Loggers whose pipes have closed are removed from
        self.warning_loggers as a side effect.
        """
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers,
                                          [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # each warning line is "<timestamp>\t<message>"
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings
mblighf4e04152008-02-21 16:05:53 +0000548
549
    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None,
                       optional_fields=None):
        """
        Internal Function to generate a record to be written into a
        status log. For use by server_job.* classes only.

        Returns the fully rendered line (with trailing newline).
        Raises ValueError on an invalid status code or on tab/newline
        characters in subdir/operation, which would break the
        tab-separated format.
        """
        if subdir:
            if re.match(r'[\n\t]', subdir):
                raise ValueError(
                    'Invalid character in subdir string')
            substr = subdir
        else:
            # None subdir is rendered as '----'
            substr = '----'

        if not logging.is_valid_status(status_code):
            raise ValueError('Invalid status code supplied: %s' %
                             status_code)
        if not operation:
            operation = '----'
        if re.match(r'[\n\t]', operation):
            raise ValueError(
                'Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        if not optional_fields:
            optional_fields = {}

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        # line layout: code, subdir, operation, key=value..., status
        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in fields)

        return record_prefix + msg + '\n'
601
602
603 def _record_prerendered(self, msg):
604 """
605 Record a pre-rendered msg into the status logs. The only
606 change this makes to the message is to add on the local
607 indentation. Should not be called outside of server_job.*
608 classes. Unlike __record, this does not write the message
609 to standard output.
610 """
mbligh6437ff52008-04-17 15:24:38 +0000611 lines = []
mblighdab39662008-02-27 16:47:55 +0000612 status_file = os.path.join(self.resultdir, 'status.log')
613 status_log = open(status_file, 'a')
614 for line in msg.splitlines():
615 line = self.record_prefix + line + '\n'
mbligh6437ff52008-04-17 15:24:38 +0000616 lines.append(line)
mblighdab39662008-02-27 16:47:55 +0000617 status_log.write(line)
618 status_log.close()
mbligh6437ff52008-04-17 15:24:38 +0000619 self.__parse_status(lines)
mblighdab39662008-02-27 16:47:55 +0000620
621
    def __record(self, status_code, subdir, operation, status='',
                 epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)


        # echo the line to stdout and append it to the job status log
        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        # also append to the per-test status file when a subdir is given
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status')
            open(status_file, "a").write(msg)
        # keep the continuous parser in sync with what was written
        self.__parse_status(msg.splitlines())
mblighb03ba642008-03-13 17:37:17 +0000643
644
mbligh6437ff52008-04-17 15:24:38 +0000645 def __parse_status(self, new_lines):
646 if not self.using_parser:
647 return
648 new_tests = self.parser.process_lines(new_lines)
649 for test in new_tests:
jadmanski28816c22008-05-21 18:18:05 +0000650 self.__insert_test(test)
651
652
653 def __insert_test(self, test):
654 """ An internal method to insert a new test result into the
655 database. This method will not raise an exception, even if an
656 error occurs during the insert, to avoid failing a test
657 simply because of unexpected database issues."""
658 try:
mbligh6437ff52008-04-17 15:24:38 +0000659 self.results_db.insert_test(self.job_model, test)
jadmanski28816c22008-05-21 18:18:05 +0000660 except Exception:
661 msg = ("WARNING: An unexpected error occured while "
662 "inserting test results into the database. "
663 "Ignoring error.\n" + traceback.format_exc())
664 print >> sys.stderr, msg
mblighdab39662008-02-27 16:47:55 +0000665
666
# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.
    """
    # matches lines the client wrapped as "AUTOTEST_STATUS:<tag>:<line>"
    parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    # captures the leading tab indentation of a rendered status line
    extract_indent = re.compile(r"^(\t*).*$")

    def __init__(self, job):
        self.job = job
        # partial last line not yet terminated by a newline
        self.leftover = ""
        # most recent complete line, used for warning indentation
        self.last_line = ""
        # tag-tree of buffered log lines; see _process_logs for ordering
        self.logs = {}
682
683
684 def _process_log_dict(self, log_dict):
685 log_list = log_dict.pop("logs", [])
686 for key in sorted(log_dict.iterkeys()):
687 log_list += self._process_log_dict(log_dict.pop(key))
688 return log_list
689
690
    def _process_logs(self):
        """Go through the accumulated logs in self.logs and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired that one can be reconstructed from the
        status log by looking at timestamp lines."""
        # flatten the tag tree (this empties self.logs as a side effect)
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        # remember the last emitted line for warning indentation
        if log_list:
            self.last_line = log_list[-1]
709
710
    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        # always echo the raw line to stdout immediately
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            # tags look like "1.2.3"; walk/create the matching branch
            # of the self.logs tree and buffer the line there
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)
729
730
731 def _process_line(self, line):
732 """Write out a line of data to the appropriate stream. Status
733 lines sent by autotest will be prepended with
734 "AUTOTEST_STATUS", and all other lines are ssh error
735 messages."""
736 match = self.parser.search(line)
737 if match:
738 tag, line = match.groups()
739 self._process_quoted_line(tag, line)
740 else:
mblighfe749d22008-03-07 18:14:46 +0000741 print line
mblighdab39662008-02-27 16:47:55 +0000742
743
    def _format_warnings(self, last_line, warnings):
        """Render (timestamp, msg) warnings as WARN status lines,
        indented to match the context of last_line."""
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]
753
754
    def _process_warnings(self, last_line, log_dict, warnings):
        """Insert rendered warnings into the deepest active sub-jobs
        of the buffered log tree (or at this level if there are none),
        echoing each rendered warning to stdout."""
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)
773
774
775 def write(self, data):
776 # first check for any new console warnings
777 warnings = self.job._read_warnings()
778 self._process_warnings(self.last_line, self.logs, warnings)
779 # now process the newest data written out
780 data = self.leftover + data
781 lines = data.split("\n")
782 # process every line but the last one
783 for line in lines[:-1]:
784 self._process_line(line)
785 # save the last line for later processing
786 # since we may not have the whole line yet
787 self.leftover = lines[-1]
788
789
    def flush(self):
        # we only mirror output to stdout, so flushing it is all
        # utils.run() needs from a file object here
        sys.stdout.flush()
mblighdab39662008-02-27 16:47:55 +0000792
793
    def close(self):
        # push through any unterminated final line, then drain all
        # buffered tagged logs and flush (file-object close semantics)
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()
mblighcaa62c22008-04-07 21:51:17 +0000799
# site_server_job.py may be non-existant or empty, make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    # no site customizations: fall straight through to the base class
    class site_server_job(base_server_job):
        pass

# the canonical job class: base behaviour plus any site extensions
class server_job(site_server_job):
    pass