blob: 0b5ee3051b419cd04343819392be526f07a6dc36 [file] [log] [blame]
"""The main job wrapper
This is the core infrastructure.
"""
__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""
# standard stuff
import os, sys, re, pickle, shutil, time, traceback, types, copy
# autotest stuff
from autotest_utils import *
from parallel import *
from common.error import *
from common import barrier
import kernel, xen, test, profilers, filesystem, fd_stack, boottool
import harness, config
import sysinfo
import cpuset
JOB_PREAMBLE = """
from common.error import *
from autotest_utils import *
"""
class StepError(AutotestError):
pass
class base_job:
"""The actual job against which we do everything.
Properties:
autodir
The top level autotest directory (/usr/local/autotest).
Comes from os.environ['AUTODIR'].
bindir
<autodir>/bin/
libdir
<autodir>/lib/
testdir
<autodir>/tests/
site_testdir
<autodir>/site_tests/
profdir
<autodir>/profilers/
tmpdir
<autodir>/tmp/
resultdir
<autodir>/results/<jobtag>
stdout
fd_stack object for stdout
stderr
fd_stack object for stderr
profilers
the profilers object for this job
harness
the server harness object for this job
config
the job configuration for this job
"""
DEFAULT_LOG_FILENAME = "status"
def __init__(self, control, jobtag, cont, harness_type=None,
use_external_logging = False):
"""
control
The control file (pathname of)
jobtag
The job tag string (eg "default")
cont
If this is the continuation of this job
harness_type
An alternative server harness
"""
self.autodir = os.environ['AUTODIR']
self.bindir = os.path.join(self.autodir, 'bin')
self.libdir = os.path.join(self.autodir, 'lib')
self.testdir = os.path.join(self.autodir, 'tests')
self.site_testdir = os.path.join(self.autodir, 'site_tests')
self.profdir = os.path.join(self.autodir, 'profilers')
self.tmpdir = os.path.join(self.autodir, 'tmp')
self.resultdir = os.path.join(self.autodir, 'results', jobtag)
self.sysinfodir = os.path.join(self.resultdir, 'sysinfo')
self.control = os.path.abspath(control)
self.state_file = self.control + '.state'
self.state = None
if not cont:
if os.path.exists(self.tmpdir):
system('umount -f %s > /dev/null 2> /dev/null'%\
self.tmpdir, ignorestatus=True)
system('rm -rf ' + self.tmpdir)
os.mkdir(self.tmpdir)
results = os.path.join(self.autodir, 'results')
if not os.path.exists(results):
os.mkdir(results)
download = os.path.join(self.testdir, 'download')
if os.path.exists(download):
system('rm -rf ' + download)
os.mkdir(download)
if os.path.exists(self.resultdir):
system('rm -rf ' + self.resultdir)
os.mkdir(self.resultdir)
os.mkdir(self.sysinfodir)
os.mkdir(os.path.join(self.resultdir, 'debug'))
os.mkdir(os.path.join(self.resultdir, 'analysis'))
shutil.copyfile(self.control,
os.path.join(self.resultdir, 'control'))
self.control = control
self.jobtag = jobtag
self.log_filename = self.DEFAULT_LOG_FILENAME
self.container = None
self.stdout = fd_stack.fd_stack(1, sys.stdout)
self.stderr = fd_stack.fd_stack(2, sys.stderr)
self.group_level = 0
self.config = config.config(self)
self.harness = harness.select(harness_type, self)
self.profilers = profilers.profilers(self)
try:
tool = self.config_get('boottool.executable')
self.bootloader = boottool.boottool(tool)
except:
pass
sysinfo.log_per_reboot_data(self.sysinfodir)
if not cont:
self.record('START', None, None)
self.group_level = 1
self.harness.run_start()
if use_external_logging:
self.enable_external_logging()
def relative_path(self, path):
"""\
Return a patch relative to the job results directory
"""
head = len(self.resultdir) + 1 # remove the / inbetween
return path[head:]
def control_get(self):
return self.control
def control_set(self, control):
self.control = os.path.abspath(control)
def harness_select(self, which):
self.harness = harness.select(which, self)
def config_set(self, name, value):
self.config.set(name, value)
def config_get(self, name):
return self.config.get(name)
def setup_dirs(self, results_dir, tmp_dir):
if not tmp_dir:
tmp_dir = os.path.join(self.tmpdir, 'build')
if not os.path.exists(tmp_dir):
os.mkdir(tmp_dir)
if not os.path.isdir(tmp_dir):
e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir
raise ValueError(e_msg)
# We label the first build "build" and then subsequent ones
# as "build.2", "build.3", etc. Whilst this is a little bit
# inconsistent, 99.9% of jobs will only have one build
# (that's not done as kernbench, sparse, or buildtest),
# so it works out much cleaner. One of life's comprimises.
if not results_dir:
results_dir = os.path.join(self.resultdir, 'build')
i = 2
while os.path.exists(results_dir):
results_dir = os.path.join(self.resultdir, 'build.%d' % i)
i += 1
if not os.path.exists(results_dir):
os.mkdir(results_dir)
return (results_dir, tmp_dir)
def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
kjob = None ):
"""Summon a xen object"""
(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
build_dir = 'xen'
return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob)
def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
"""Summon a kernel object"""
(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
build_dir = 'linux'
return kernel.auto_kernel(self, base_tree, results_dir,
tmp_dir, build_dir, leave)
def barrier(self, *args, **kwds):
"""Create a barrier object"""
return barrier.barrier(*args, **kwds)
def setup_dep(self, deps):
"""Set up the dependencies for this test.
deps is a list of libraries required for this test.
"""
for dep in deps:
try:
os.chdir(os.path.join(self.autodir, 'deps', dep))
system('./' + dep + '.py')
except:
error = "setting up dependency " + dep + "\n"
raise UnhandledError(error)
def __runtest(self, url, tag, args, dargs):
try:
l = lambda : test.runtest(self, url, tag, args, dargs)
pid = fork_start(self.resultdir, l)
fork_waitfor(self.resultdir, pid)
except AutotestError:
raise
except:
raise UnhandledError('running test ' + \
self.__class__.__name__ + "\n")
def run_test(self, url, *args, **dargs):
"""Summon a test object and run it.
tag
tag to add to testname
url
url of the test to run
"""
if not url:
raise TypeError("Test name is invalid. Switched arguments?")
(group, testname) = test.testname(url)
tag = dargs.pop('tag', None)
container = dargs.pop('container', None)
subdir = testname
if tag:
subdir += '.' + tag
if container:
cname = container.get('name', None)
if not cname: # get old name
cname = container.get('container_name', None)
mbytes = container.get('mbytes', None)
if not mbytes: # get old name
mbytes = container.get('mem', None)
cpus = container.get('cpus', None)
if not cpus: # get old name
cpus = container.get('cpu', None)
root = container.get('root', '')
self.new_container(mbytes=mbytes, cpus=cpus,
root=root, name=cname)
# We are running in a container now...
def group_func():
try:
self.__runtest(url, tag, args, dargs)
except Exception, detail:
self.record('FAIL', subdir, testname,
str(detail))
raise
else:
self.record('GOOD', subdir, testname,
'completed successfully')
result, exc_info = self.__rungroup(subdir, group_func)
if container:
self.release_container()
if exc_info and isinstance(exc_info[1], TestError):
return False
elif exc_info:
raise exc_info[0], exc_info[1], exc_info[2]
else:
return True
def __rungroup(self, name, function, *args, **dargs):
"""\
name:
name of the group
function:
subroutine to run
*args:
arguments for the function
Returns a 2-tuple (result, exc_info) where result
is the return value of function, and exc_info is
the sys.exc_info() of the exception thrown by the
function (which may be None).
"""
result, exc_info = None, None
try:
self.record('START', None, name)
self.group_level += 1
result = function(*args, **dargs)
self.group_level -= 1
self.record('END GOOD', None, name)
except Exception, e:
exc_info = sys.exc_info()
self.group_level -= 1
err_msg = str(e) + '\n' + format_error()
self.record('END FAIL', None, name, err_msg)
return result, exc_info
def run_group(self, function, *args, **dargs):
"""\
function:
subroutine to run
*args:
arguments for the function
"""
# Allow the tag for the group to be specified
name = function.__name__
tag = dargs.pop('tag', None)
if tag:
name = tag
result, exc_info = self.__rungroup(name, function,
*args, **dargs)
# if there was a non-TestError exception, raise it
if exc_info and not isinstance(exc_info[1], TestError):
err = ''.join(traceback.format_exception(*exc_info))
raise TestError(name + ' failed\n' + err)
# pass back the actual return value from the function
return result
def new_container(self, mbytes=None, cpus=None, root='', name=None):
if not grep('cpuset', '/proc/filesystems'):
print "Containers not enabled by latest reboot"
return # containers weren't enabled in this kernel boot
pid = os.getpid()
if not name:
name = 'test%d' % pid # make arbitrary unique name
self.container = cpuset.cpuset(name, job_size=mbytes,
job_pid=pid, cpus=cpus, root=root)
# This job's python shell is now running in the new container
# and all forked test processes will inherit that container
def release_container(self):
if self.container:
self.container.release()
self.container = None
def cpu_count(self):
if self.container:
return len(self.container.cpus)
return count_cpus() # use total system count
# Check the passed kernel identifier against the command line
# and the running kernel, abort the job on missmatch.
def kernel_check_ident(self, expected_when, expected_id, subdir,
type = 'src'):
print (("POST BOOT: checking booted kernel " +
"mark=%d identity='%s' type='%s'") %
(expected_when, expected_id, type))
running_id = running_os_ident()
cmdline = read_one_line("/proc/cmdline")
find_sum = re.compile(r'.*IDENT=(\d+)')
m = find_sum.match(cmdline)
cmdline_when = -1
if m:
cmdline_when = int(m.groups()[0])
# We have all the facts, see if they indicate we
# booted the requested kernel or not.
bad = False
if (type == 'src' and expected_id != running_id or
type == 'rpm' and not running_id.startswith(expected_id + '::')):
print "check_kernel_ident: kernel identifier mismatch"
bad = True
if expected_when != cmdline_when:
print "check_kernel_ident: kernel command line mismatch"
bad = True
if bad:
print " Expected Ident: " + expected_id
print " Running Ident: " + running_id
print " Expected Mark: %d" % (expected_when)
print "Command Line Mark: %d" % (cmdline_when)
print " Command Line: " + cmdline
raise JobError("boot failure", "reboot.verify")
self.record('GOOD', subdir, 'reboot.verify', expected_id)
def filesystem(self, device, mountpoint = None, loop_size = 0):
if not mountpoint:
mountpoint = self.tmpdir
return filesystem.filesystem(self, device, mountpoint,loop_size)
def enable_external_logging(self):
pass
def disable_external_logging(self):
pass
def reboot_setup(self):
pass
def reboot(self, tag='autotest'):
self.reboot_setup()
self.record('GOOD', None, 'reboot.start')
self.harness.run_reboot()
default = self.config_get('boot.set_default')
if default:
self.bootloader.set_default(tag)
else:
self.bootloader.boot_once(tag)
system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
self.quit()
def noop(self, text):
print "job: noop: " + text
def parallel(self, *tasklist):
"""Run tasks in parallel"""
pids = []
old_log_filename = self.log_filename
for i, task in enumerate(tasklist):
self.log_filename = old_log_filename + (".%d" % i)
task_func = lambda: task[0](*task[1:])
pids.append(fork_start(self.resultdir, task_func))
old_log_path = os.path.join(self.resultdir, old_log_filename)
old_log = open(old_log_path, "a")
exceptions = []
for i, pid in enumerate(pids):
# wait for the task to finish
try:
fork_waitfor(self.resultdir, pid)
except Exception, e:
exceptions.append(e)
# copy the logs from the subtask into the main log
new_log_path = old_log_path + (".%d" % i)
if os.path.exists(new_log_path):
new_log = open(new_log_path)
old_log.write(new_log.read())
new_log.close()
old_log.flush()
os.remove(new_log_path)
old_log.close()
self.log_filename = old_log_filename
# handle any exceptions raised by the parallel tasks
if exceptions:
msg = "%d task(s) failed" % len(exceptions)
raise JobError(msg, str(exceptions), exceptions)
def quit(self):
# XXX: should have a better name.
self.harness.run_pause()
raise JobContinue("more to come")
def complete(self, status):
"""Clean up and exit"""
# We are about to exit 'complete' so clean up the control file.
try:
os.unlink(self.state_file)
except:
pass
self.harness.run_complete()
self.disable_external_logging()
sys.exit(status)
def set_state(self, var, val):
# Deep copies make sure that the state can't be altered
# without it being re-written. Perf wise, deep copies
# are overshadowed by pickling/loading.
self.state[var] = copy.deepcopy(val)
pickle.dump(self.state, open(self.state_file, 'w'))
def __load_state(self):
assert(self.state == None)
try:
self.state = pickle.load(open(self.state_file, 'r'))
return True
except Exception:
print "Initializing the state engine."
self.state = {}
self.set_state('steps', []) # writes pickle file
return False
def get_state(self, var, default=None):
if var in self.state or default == None:
val = self.state[var]
else:
val = default
return copy.deepcopy(val)
def __create_step_tuple(self, fn, args, dargs):
# Legacy code passes in an array where the first arg is
# the function or its name.
if isinstance(fn, list):
assert(len(args) == 0)
assert(len(dargs) == 0)
args = fn[1:]
fn = fn[0]
# Pickling actual functions is harry, thus we have to call
# them by name. Unfortunately, this means only functions
# defined globally can be used as a next step.
if isinstance(fn, types.FunctionType):
fn = fn.__name__
if not isinstance(fn, types.StringTypes):
raise StepError("Next steps must be functions or "
"strings containing the function name")
return (fn, args, dargs)
def next_step(self, fn, *args, **dargs):
"""Define the next step"""
steps = self.get_state('steps')
steps.append(self.__create_step_tuple(fn, args, dargs))
self.set_state('steps', steps)
def next_step_prepend(self, fn, *args, **dargs):
"""Insert a new step, executing first"""
steps = self.get_state('steps')
steps.insert(0, self.__create_step_tuple(fn, args, dargs))
self.set_state('steps', steps)
def step_engine(self):
"""the stepping engine -- if the control file defines
step_init we will be using this engine to drive multiple runs.
"""
"""Do the next step"""
# Set up the environment and then interpret the control file.
# Some control files will have code outside of functions,
# which means we need to have our state engine initialized
# before reading in the file.
state_existed = self.__load_state()
lcl = {'job': self}
exec(JOB_PREAMBLE, lcl, lcl)
execfile(self.control, lcl, lcl)
# If we loaded in a mid-job state file, then we presumably
# know what steps we have yet to run.
if not state_existed:
if lcl.has_key('step_init'):
self.next_step([lcl['step_init']])
# Iterate through the steps. If we reboot, we'll simply
# continue iterating on the next step.
while len(self.get_state('steps')) > 0:
steps = self.get_state('steps')
(fn, args, dargs) = steps.pop(0)
self.set_state('steps', steps)
lcl['__args'] = args
lcl['__dargs'] = dargs
exec(fn + "(*__args, **__dargs)", lcl, lcl)
def record(self, status_code, subdir, operation, status = ''):
"""
Record job-level status
The intent is to make this file both machine parseable and
human readable. That involves a little more complexity, but
really isn't all that bad ;-)
Format is <status code>\t<subdir>\t<operation>\t<status>
status code: (GOOD|WARN|FAIL|ABORT)
or START
or END (GOOD|WARN|FAIL|ABORT)
subdir: MUST be a relevant subdirectory in the results,
or None, which will be represented as '----'
operation: description of what you ran (e.g. "dbench", or
"mkfs -t foobar /dev/sda9")
status: error message or "completed sucessfully"
------------------------------------------------------------
Initial tabs indicate indent levels for grouping, and is
governed by self.group_level
multiline messages have secondary lines prefaced by a double
space (' ')
"""
if subdir:
if re.match(r'[\n\t]', subdir):
raise ValueError("Invalid character in subdir string")
substr = subdir
else:
substr = '----'
if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \
status_code):
raise ValueError("Invalid status code supplied: %s" % status_code)
if not operation:
operation = '----'
if re.match(r'[\n\t]', operation):
raise ValueError("Invalid character in operation string")
operation = operation.rstrip()
status = status.rstrip()
status = re.sub(r"\t", " ", status)
# Ensure any continuation lines are marked so we can
# detect them in the status file to ensure it is parsable.
status = re.sub(r"\n", "\n" + "\t" * self.group_level + " ", status)
# Generate timestamps for inclusion in the logs
epoch_time = int(time.time()) # seconds since epoch, in UTC
local_time = time.localtime(epoch_time)
epoch_time_str = "timestamp=%d" % (epoch_time,)
local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
local_time)
msg = '\t'.join(str(x) for x in (status_code, substr, operation,
epoch_time_str, local_time_str,
status))
msg = '\t' * self.group_level + msg
msg_tag = ""
if "." in self.log_filename:
msg_tag = self.log_filename.split(".", 1)[1]
self.harness.test_status_detail(status_code, substr, operation,
status, msg_tag)
self.harness.test_status(msg, msg_tag)
# log to stdout (if enabled)
#if self.log_filename == self.DEFAULT_LOG_FILENAME:
print msg
# log to the "root" status log
status_file = os.path.join(self.resultdir, self.log_filename)
open(status_file, "a").write(msg + "\n")
# log to the subdir status log (if subdir is set)
if subdir:
dir = os.path.join(self.resultdir, subdir)
if not os.path.exists(dir):
os.mkdir(dir)
status_file = os.path.join(dir,
self.DEFAULT_LOG_FILENAME)
open(status_file, "a").write(msg + "\n")
def runjob(control, cont = False, tag = "default", harness_type = '',
use_external_logging = False):
"""The main interface to this module
control
The control file to use for this job.
cont
Whether this is the continuation of a previously started job
"""
control = os.path.abspath(control)
state = control + '.state'
# instantiate the job object ready for the control file.
myjob = None
try:
# Check that the control file is valid
if not os.path.exists(control):
raise JobError(control + ": control file not found")
# When continuing, the job is complete when there is no
# state file, ensure we don't try and continue.
if cont and not os.path.exists(state):
raise JobComplete("all done")
if cont == False and os.path.exists(state):
os.unlink(state)
myjob = job(control, tag, cont, harness_type,
use_external_logging)
# Load in the users control file, may do any one of:
# 1) execute in toto
# 2) define steps, and select the first via next_step()
myjob.step_engine()
except JobContinue:
sys.exit(5)
except JobComplete:
sys.exit(1)
except JobError, instance:
print "JOB ERROR: " + instance.args[0]
if myjob:
command = None
if len(instance.args) > 1:
command = instance.args[1]
myjob.group_level = 0
myjob.record('ABORT', None, command, instance.args[0])
myjob.record('END ABORT', None, None)
myjob.complete(1)
else:
sys.exit(1)
except Exception, e:
msg = str(e) + '\n' + format_error()
print "JOB ERROR: " + msg
if myjob:
myjob.group_level = 0
myjob.record('ABORT', None, None, msg)
myjob.record('END ABORT', None, None)
myjob.complete(1)
else:
sys.exit(1)
# If we get here, then we assume the job is complete and good.
myjob.group_level = 0
myjob.record('END GOOD', None, None)
myjob.complete(0)
# site_job.py may be non-existant or empty, make sure that an appropriate
# site_job class is created nevertheless
try:
from site_job import site_job
except ImportError:
class site_job(base_job):
pass
class job(site_job):
pass