"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py.

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

__author__ = """
Martin J. Bligh <mbligh@google.com>
Andy Whitcroft <apw@shadowen.org>
"""

import os, sys, re, time, select, subprocess, traceback

from autotest_lib.client.bin import fd_stack
from autotest_lib.client.common_lib import error
from autotest_lib.server import test, subcommand
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
from autotest_lib.server.utils import *


# load up a control segment
# these are all stored in <server_dir>/control_segments
def load_control_segment(name):
    server_dir = os.path.dirname(os.path.abspath(__file__))
    script_file = os.path.join(server_dir, "control_segments", name)
    if os.path.exists(script_file):
        return file(script_file).read()
    else:
        return ""

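# e.g. (illustrative): load_control_segment("verify") returns the contents
# of <server_dir>/control_segments/verify, or "" if no such file exists;
# the verify and repair segments below are assembled this way.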

preamble = """\
import os, sys

from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
from autotest_lib.server import git_kernel
from autotest_lib.server.subcommand import *
from autotest_lib.server.utils import run, get_tmp_dir, sh_escape

from autotest_lib.client.common_lib.error import *
from autotest_lib.client.common_lib import barrier

autotest.Autotest.job = job
hosts.SSHHost.job = job
barrier = barrier.barrier

if len(machines) > 1:
    open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""

client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
    host = hosts.SSHHost(machine)
    at.run(control, host=host)

job.parallel_simple(run_client, machines)
"""

crashdumps = """
def crashdumps(machine):
    host = hosts.SSHHost(machine, initialize=False)
    host.get_crashdumps(test_start_time)

job.parallel_simple(crashdumps, machines, log=False)
"""

reboot_segment = """\
def reboot(machine):
    host = hosts.SSHHost(machine, initialize=False)
    host.reboot()

job.parallel_simple(reboot, machines, log=False)
"""

install = """\
def install(machine):
    host = hosts.SSHHost(machine, initialize=False)
    host.machine_install()

job.parallel_simple(install, machines, log=False)
"""

# load up the verify control segment, with an optional site-specific hook
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")


# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}

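# If a site_job module is present, it is assumed to provide a function of
# this shape -- an illustrative sketch, not a real module:
#
#   def get_site_job_data(job):
#       return {'site_build': 'example-build-id'}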

class base_server_job:
    """The actual job against which we do everything.

    Properties:
        autodir
            The top level autotest directory (/usr/local/autotest).
        serverdir
            <autodir>/server/
        clientdir
            <autodir>/client/
        conmuxdir
            <autodir>/conmux/
        testdir
            <autodir>/server/tests/
        control
            the control file for this job
    """

    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job=""):
        """
        control
            The control file (pathname of)
        args
            args to pass to the control file
        resultdir
            where to throw the results
        label
            label for the job
        user
            Username for the job (email address)
        machines
            list of machine names the job will run on
        client
            True if a client-side control file
        parse_job
            job tag to use for continuous parsing, if any
        """
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir = os.path.join(self.serverdir, 'tests')
        self.tmpdir = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        if control:
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        self.record_prefix = ''
        self.warning_loggers = set()

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label': label, 'user': user,
                    'hostname': ','.join(machines)}
        job_data.update(get_site_job_data(self))
        write_keyval(self.resultdir, job_data)

        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False


    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(0)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job, self.job_model)
        else:
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)"""
        if not self.using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.results_db.insert_test(self.job_model, test)
        self.using_parser = False


    def verify(self):
        if not self.machines:
            raise error.AutoservError(
                'No machines specified to verify')
        try:
            namespace = {'machines': self.machines, 'job': self}
            exec(preamble + verify, namespace, namespace)
        except Exception, e:
            msg = 'Verify failed\n' + str(e) + '\n' + traceback.format_exc()
            self.record('ABORT', None, None, msg)
            raise


    def repair(self):
        if not self.machines:
            raise error.AutoservError(
                'No machines specified to repair')
        namespace = {'machines': self.machines, 'job': self}
        # no matter what happens during repair, go on to try to reverify
        try:
            exec(preamble + repair, namespace, namespace)
        except Exception, exc:
            print 'Exception occurred during repair'
            traceback.print_exc()
        self.verify()


    def enable_external_logging(self):
        """Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """Pause or stop external logging mechanism.
        """
        pass


    def use_external_logging(self):
        """Return True if external logging should be used.
        """
        return False

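    # The three hooks above are deliberate no-ops; a site extension might
    # override them in site_server_job (see the bottom of this file), e.g.
    # (an illustrative sketch):
    #
    #   class site_server_job(base_server_job):
    #       def use_external_logging(self):
    #           return True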

    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking:
            def wrapper(machine):
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir, machine)
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)


    def run(self, reboot=False, install_before=False,
            install_after=False, collect_crashdumps=True,
            namespace={}):
        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        test_start_time = int(time.time())

        os.chdir(self.resultdir)

        self.enable_external_logging()
        status_log = os.path.join(self.resultdir, 'status.log')
        try:
            if install_before and machines:
                exec(preamble + install, namespace, namespace)
            if self.client:
                namespace['control'] = self.control
                open('control', 'w').write(self.control)
                open('control.srv', 'w').write(client_wrapper)
                server_control = client_wrapper
            else:
                open('control.srv', 'w').write(self.control)
                server_control = self.control
            exec(preamble + server_control, namespace, namespace)

        finally:
            if machines and collect_crashdumps:
                namespace['test_start_time'] = test_start_time
                exec(preamble + crashdumps,
                     namespace, namespace)
            self.disable_external_logging()
            if reboot and machines:
                exec(preamble + reboot_segment,
                     namespace, namespace)
            if install_after and machines:
                exec(preamble + install, namespace, namespace)


    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        url
            url of the test to run
        tag
            tag to add to testname
        """

        (group, testname) = test.testname(url)
        tag = None
        subdir = testname

        if dargs.has_key('tag'):
            tag = dargs['tag']
            del dargs['tag']
        if tag:
            subdir += '.' + tag

        try:
            test.runtest(self, url, tag, args, dargs)
            self.record('GOOD', subdir, testname, 'completed successfully')
        except Exception, detail:
            self.record('FAIL', subdir, testname,
                        str(detail) + "\n" + traceback.format_exc())


    def run_group(self, function, *args, **dargs):
        """\
        function:
            subroutine to run
        *args:
            arguments for the function
        """

        result = None
        name = function.__name__

        # Allow the tag for the group to be specified.
        if dargs.has_key('tag'):
            tag = dargs['tag']
            del dargs['tag']
            if tag:
                name = tag

        old_record_prefix = self.record_prefix
        try:
            try:
                self.record('START', None, name)
                self.record_prefix += '\t'
                result = function(*args, **dargs)
                self.record_prefix = old_record_prefix
                self.record('END GOOD', None, name)
            except:
                self.record_prefix = old_record_prefix
                self.record('END FAIL', None, name, traceback.format_exc())
        # We don't want to raise up an error higher if it's just
        # a TestError - we want to carry on to other tests. Hence
        # this outer try/except block.
        except error.TestError:
            pass
        except:
            raise error.TestError(name + ' failed\n' +
                                  traceback.format_exc())

        return result


    def record(self, status_code, subdir, operation, status=''):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: (GOOD|WARN|FAIL|ABORT)
            or START
            or END (GOOD|WARN|FAIL|ABORT)

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping and are
        governed by self.record_prefix

        Multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
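        # e.g. (illustrative) a fully rendered line, in the tab-separated
        # field order produced by _render_record below:
        # "GOOD\t----\tdbench\ttimestamp=1199142000\tlocaltime=Jan 01 00:00:00\tcompleted successfully"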
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self.__record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self.__record(status_code, subdir, operation, status)


    def _read_warnings(self):
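        # Each warning logger is expected to emit tab-separated lines of
        # the form "<epoch timestamp>\t<message>", e.g. (illustrative):
        # "1199142000\tmachine reboot detected"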
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers,
                                          [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings


    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None):
        """
        Internal function to generate a record to be written into a
        status log. For use by server_job.* classes only.
        """
        if subdir:
            if re.search(r'[\n\t]', subdir):
                raise ValueError(
                    'Invalid character in subdir string')
            substr = subdir
        else:
            substr = '----'

        if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$',
                        status_code):
            raise ValueError('Invalid status code supplied: %s' %
                             status_code)
        if not operation:
            operation = '----'
        if re.search(r'[\n\t]', operation):
            raise ValueError(
                'Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", " ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        epoch_time_str = "timestamp=%d" % (epoch_time,)
        local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
                                       local_time)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in (status_code, substr, operation,
                                         epoch_time_str, local_time_str,
                                         status))
        return record_prefix + msg + '\n'


    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike __record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        self.__parse_status(lines)


    def __record(self, status_code, subdir, operation, status='',
                 epoch_time=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time)

        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            if not os.path.exists(test_dir):
                os.mkdir(test_dir)
            status_file = os.path.join(test_dir, 'status')
            open(status_file, "a").write(msg)
        self.__parse_status([msg])


    def __parse_status(self, new_lines):
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.results_db.insert_test(self.job_model, test)


# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file. We only implement those methods
    utils.run() actually calls.
    """
    parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    extract_indent = re.compile(r"^(\t*).*$")
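
    # An incoming status line from the client looks like (illustrative):
    # "AUTOTEST_STATUS:1.2:GOOD\t----\tsleeptest\tcompleted successfully"
    # where "1.2" is the dotted subcommand tag (possibly empty) and the
    # remainder is a pre-rendered status log line.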

    def __init__(self, job):
        self.job = job
        self.leftover = ""
        self.last_line = ""
        self.logs = {}


    def _process_log_dict(self, log_dict):
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
        """Go through the accumulated logs in self.logs and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired, it can be reconstructed from the
        status log by looking at the timestamp lines."""
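        # e.g. (illustrative): lines buffered under tags "2", "1.1" and "1"
        # are flushed in the order "1", "1.1", "2" -- parents before their
        # children, lower-numbered siblings first, never interleaved.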
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        match = self.parser.search(line)
        if match:
            tag, line = match.groups()
            self._process_quoted_line(tag, line)
        else:
            print line


    def _format_warnings(self, last_line, warnings):
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)


    def write(self, data):
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        sys.stdout.flush()


    def close(self):
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()

# site_server_job.py may be non-existent or empty; make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    class site_server_job(base_server_job):
        pass


class server_job(site_server_job):
    pass