# blob: ca6aaf5dc2047a06f336a1b76f9bb82694c96c11 [file] [log] [blame]
#
# Copyright 2008 Google Inc. All Rights Reserved.
"""
The job module contains the objects and methods used to
manage jobs in Autotest.
The valid actions are:
list: lists job(s)
create: create a job
abort: abort job(s)
stat: detailed listing of job(s)
The common options are:
See topic_common.py for a High Level Design and Algorithm.
"""
import getpass, os, pwd, re, socket, sys
from autotest_lib.cli import topic_common, action_common
class job(topic_common.atest):
    """Job class
    atest job [create|list|stat|abort] <options>

    Base class of every 'atest job' action; holds the topic-wide settings
    and shared formatting helpers.
    """
    usage_action = '[create|list|stat|abort]'
    topic = msg_topic = 'job'
    msg_items = '<job_ids>'

    def _convert_status(self, results):
        """Turn each result's 'status_counts' dict into a summary string.

        Every status is rendered as 'STATUS=COUNT(PCT%)' where PCT is the
        share of that status among all counted entries, with one decimal.
        The rendered entries are sorted, joined with ', ' and stored back
        under 'status_counts', i.e. `results` is modified in place.
        """
        for result in results:
            counts = result['status_counts']
            total = sum(counts.values())
            entries = []
            for status_name, count in counts.iteritems():
                percent = 100.0 * float(count) / total
                entries.append('%s=%s(%.1f%%)' % (status_name, count, percent))
            result['status_counts'] = ', '.join(sorted(entries))
class job_help(job):
    """Just here to get the atest logic working.
    Usage is set by its parent.

    Adds nothing of its own: every attribute and method comes from job.
    """
class job_list_stat(action_common.atest_list, job):
    """Shared base of 'atest job list' and 'atest job stat'.

    The leftover command-line arguments are collected into self.jobs; each
    one is either a numeric job ID or a job name.
    """
    def __init__(self):
        super(job_list_stat, self).__init__()

        # Positional (leftover) arguments name the jobs to act on.
        self.topic_parse_info = topic_common.item_parse_info(
            attribute_name='jobs',
            use_leftover=True)

    def __split_jobs_between_ids_names(self):
        """Return (job_ids, job_names) from self.jobs.

        Entries made only of digits are treated as IDs, all others as names.
        """
        job_ids = []
        job_names = []

        # Sort between job IDs and names
        for job_id in self.jobs:
            if job_id.isdigit():
                job_ids.append(job_id)
            else:
                job_names.append(job_id)
        return (job_ids, job_names)

    def execute_on_ids_and_names(self, op, filters=None,
                                 check_results=None,
                                 tag_id='id__in', tag_name='name__in'):
        """Run the list RPC `op`, restricted to the jobs in self.jobs.

        With no jobs on the command line the base class execute() is run
        once with `filters` as given (check_results is not passed).
        Otherwise it is run once for the numeric job IDs (filter key
        `tag_id`) and once for the job names (filter key `tag_name`),
        skipping whichever list is empty, and the results are concatenated.

        Args:
            op: name of the RPC to run.
            filters: extra filter arguments; copied before the job filter
                is added for the ID/name queries.  Defaults to no filters.
            check_results: passed through to the base class execute() for
                the ID/name queries.  Defaults to
                {'id__in': 'id', 'name__in': 'id'}.
            tag_id: filter key used for numeric job IDs.
            tag_name: filter key used for job names.

        Returns:
            The list of results from the base class execute().
        """
        # Build fresh dicts on every call instead of using mutable default
        # arguments, which would be shared (and could be modified by the
        # base class) across calls.
        if filters is None:
            filters = {}
        if check_results is None:
            check_results = {'id__in': 'id',
                             'name__in': 'id'}

        if not self.jobs:
            # Want everything
            return super(job_list_stat, self).execute(op=op, filters=filters)

        all_jobs = []
        (job_ids, job_names) = self.__split_jobs_between_ids_names()

        for items, tag in [(job_ids, tag_id),
                           (job_names, tag_name)]:
            if items:
                new_filters = filters.copy()
                new_filters[tag] = items
                jobs = super(job_list_stat,
                             self).execute(op=op,
                                           filters=new_filters,
                                           check_results=check_results)
                all_jobs.extend(jobs)

        return all_jobs
class job_list(job_list_stat):
    """atest job list [<jobs>] [--all] [--running] [--user <username>]"""
    def __init__(self):
        super(job_list, self).__init__()
        self.parser.add_option('-a', '--all',
                               help='List jobs for all users.',
                               action='store_true', default=False)
        self.parser.add_option('-r', '--running',
                               help='List only running jobs',
                               action='store_true')
        self.parser.add_option('-u', '--user',
                               help='List jobs for given user',
                               type='string')

    def parse(self):
        """Translate --all/--running/--user into RPC filters in self.data.

        --user and --all are mutually exclusive.  Without either of them and
        without explicit jobs, the listing is restricted to the jobs of the
        user running the command.
        """
        options, leftover = super(job_list, self).parse()
        self.all = options.all
        self.data['running'] = options.running

        if options.user and options.all:
            self.invalid_syntax('Only specify --all or --user, not both.')
        elif options.user:
            self.data['owner'] = options.user
        elif not (options.all or self.jobs):
            # Default to the current user's jobs.
            self.data['owner'] = getpass.getuser()

        return options, leftover

    def execute(self):
        """Fetch the job summaries matching self.data and self.jobs."""
        return self.execute_on_ids_and_names(op='get_jobs_summary',
                                             filters=self.data)

    def output(self, results):
        """Print the summaries; --verbose adds a few extra columns."""
        keys = ['id', 'owner', 'name', 'status_counts']
        if self.verbose:
            keys += ['priority', 'control_type', 'created_on']
        self._convert_status(results)
        super(job_list, self).output(results, keys)
class job_stat(job_list_stat):
    """atest job stat <job>"""
    usage_action = 'stat'

    def __init__(self):
        super(job_stat, self).__init__()
        self.parser.add_option('-f', '--control-file',
                               help='Display the control file',
                               action='store_true', default=False)

    def parse(self):
        """Require at least one job and record the --control-file flag."""
        options, leftover = super(job_stat, self).parse(req_items='jobs')
        if not self.jobs:
            self.invalid_syntax('Must specify at least one job.')

        self.show_control_file = options.control_file

        return options, leftover

    def _merge_results(self, summary, qes):
        """Add a 'hosts_status' string to every job summary.

        Host queue entries that have a host are grouped per job ID and per
        status.  Each job gets 'STATUS=host1,host2, STATUS2=host3' (groups
        sorted for a stable output, as _convert_status does for counts), or
        '' when none of its entries has a host.

        Args:
            summary: job summary dicts (each with an 'id'); modified in place.
            qes: host queue entry dicts with 'host', 'job' and 'status'.

        Returns:
            `summary`.
        """
        hosts_status = {}
        for qe in qes:
            if qe['host']:
                job_id = qe['job']['id']
                hostname = qe['host']['hostname']
                hosts_status.setdefault(job_id,
                                        {}).setdefault(qe['status'],
                                                       []).append(hostname)

        # 'job_summary' rather than 'job' so the module-level class is not
        # shadowed.
        for job_summary in summary:
            statuses = hosts_status.get(job_summary['id'], {})
            host_per_status = ['%s=%s' % (status, ','.join(hosts))
                               for status, hosts in statuses.iteritems()]
            host_per_status.sort()
            job_summary['hosts_status'] = ', '.join(host_per_status)

        return summary

    def execute(self):
        """Fetch job summaries and their host queue entries, then merge."""
        summary = self.execute_on_ids_and_names(op='get_jobs_summary')

        # Get the real hostnames
        qes = self.execute_on_ids_and_names(op='get_host_queue_entries',
                                            check_results={},
                                            tag_id='job__in',
                                            tag_name='job__name__in')

        self._convert_status(summary)

        return self._merge_results(summary, qes)

    def output(self, results):
        """Print the merged results; --verbose and -f add columns."""
        if not self.verbose:
            keys = ['id', 'name', 'priority', 'status_counts', 'hosts_status']
        else:
            keys = ['id', 'name', 'priority', 'status_counts', 'hosts_status',
                    'owner', 'control_type', 'synch_count', 'created_on',
                    'run_verify', 'reboot_before', 'reboot_after']

        if self.show_control_file:
            keys.append('control_file')

        super(job_stat, self).output(results, keys)
class job_create(action_common.atest_create, job):
"""atest job create [--priority <Low|Medium|High|Urgent>]
[--synch_count] [--control-file </path/to/cfile>]
[--on-server] [--test <test1,test2>] [--kernel <http://kernel>]
[--mlist </path/to/machinelist>] [--machine <host1 host2 host3>]
[--labels <list of labels of machines to run on>]
[--reboot_before <option>] [--reboot_after <option>]
[--noverify] [--timeout <timeout>] [--one-time-hosts <hosts>]
[--email <email>] [--dependencies <labels this job is dependent on>]
[--atomic_group <atomic group name>]
job_name
Creating a job is rather different from the other create operations,
so it only uses the __init__() and output() from its superclass.
"""
op_action = 'create'
msg_items = 'job_name'
def __init__(self):
super(job_create, self).__init__()
self.hosts = []
self.ctrl_file_data = {}
self.data_item_key = 'name'
self.parser.add_option('-p', '--priority', help='Job priority (low, '
'medium, high, urgent), default=medium',
type='choice', choices=('low', 'medium', 'high',
'urgent'), default='medium')
self.parser.add_option('-y', '--synch_count', type=int,
help='Number of machines to use per autoserv '
'execution')
self.parser.add_option('-f', '--control-file',
help='use this control file', metavar='FILE')
self.parser.add_option('-s', '--server',
help='This is server-side job',
action='store_true', default=False)
self.parser.add_option('-t', '--test',
help='List of tests to run')
self.parser.add_option('-k', '--kernel', help='Install kernel from this'
' URL before beginning job')
self.parser.add_option('-d', '--dependencies', help='Comma separated '
'list of labels this job is dependent on.',
default='')
self.parser.add_option('-b', '--labels', help='Comma separated list of '
'labels to get machine list from.', default='')
self.parser.add_option('-G', '--atomic_group', help='Name of an Atomic '
'Group to schedule this job on.',
default='')
self.parser.add_option('-m', '--machine', help='List of machines to '
'run on')
self.parser.add_option('-M', '--mlist',
help='File listing machines to use',
type='string', metavar='MACHINE_FLIST')
self.parser.add_option('--one-time-hosts',
help='List of one time hosts')
self.parser.add_option('-e', '--email', help='A comma seperated list '
'of email addresses to notify of job completion',
default='')
self.parser.add_option('-B', '--reboot_before',
help='Whether or not to reboot the machine '
'before the job (never/if dirty/always)',
type='choice',
choices=('never', 'if dirty', 'always'))
self.parser.add_option('-a', '--reboot_after',
help='Whether or not to reboot the machine '
'after the job (never/if all tests passed/'
'always)',
type='choice',
choices=('never', 'if all tests passed',
'always'))
self.parser.add_option('-l', '--clone', help='Clone an existing job. '
'This will discard all other options except '
'--reuse-hosts.', default=False,
metavar='JOB_ID')
self.parser.add_option('-r', '--reuse-hosts', help='Use the exact same '
'hosts as cloned job. Only for use with '
'--clone.', action='store_true', default=False)
self.parser.add_option('-n', '--noverify',
help='Do not run verify for job',
default=False, action='store_true')
self.parser.add_option('-o', '--timeout', help='Job timeout in hours.',
metavar='TIMEOUT')
def parse(self):
host_info = topic_common.item_parse_info(attribute_name='hosts',
inline_option='machine',
filename_option='mlist')
job_info = topic_common.item_parse_info(attribute_name='jobname',
use_leftover=True)
oth_info = topic_common.item_parse_info(attribute_name='one_time_hosts',
inline_option='one_time_hosts')
options, leftover = super(job_create,
self).parse([host_info, job_info, oth_info],
req_items='jobname')
self.data = {}
if len(self.jobname) > 1:
self.invalid_syntax('Too many arguments specified, only expected '
'to receive job name: %s' % self.jobname)
self.jobname = self.jobname[0]
if options.reuse_hosts and not options.clone:
self.invalid_syntax('--reuse-hosts only to be used with --clone.')
# If cloning skip parse, parsing is done in execute
self.clone_id = options.clone
if options.clone:
self.op_action = 'clone'
self.msg_items = 'jobid'
self.reuse_hosts = options.reuse_hosts
return options, leftover
if (len(self.hosts) == 0 and not self.one_time_hosts
and not options.labels and not options.atomic_group):
self.invalid_syntax('Must specify at least one machine '
'or an atomic group '
'(-m, -M, -b, -G or --one-time-hosts).')
if not options.control_file and not options.test:
self.invalid_syntax('Must specify either --test or --control-file'
' to create a job.')
if options.control_file and options.test:
self.invalid_syntax('Can only specify one of --control-file or '
'--test, not both.')
if options.kernel:
self.ctrl_file_data['kernel'] = options.kernel
self.ctrl_file_data['do_push_packages'] = True
if options.control_file:
try:
control_file_f = open(options.control_file)
try:
control_file_data = control_file_f.read()
finally:
control_file_f.close()
except IOError:
self.generic_error('Unable to read from specified '
'control-file: %s' % options.control_file)
if options.kernel:
if options.server:
self.invalid_syntax(
'A control file and a kernel may only be specified'
' together on client side jobs.')
# execute() will pass this to the AFE server to wrap this
# control file up to include the kernel installation steps.
self.ctrl_file_data['client_control_file'] = control_file_data
else:
self.data['control_file'] = control_file_data
if options.test:
if options.server:
self.invalid_syntax('If you specify tests, then the '
'client/server setting is implicit and '
'cannot be overriden.')
tests = [t.strip() for t in options.test.split(',') if t.strip()]
self.ctrl_file_data['tests'] = tests
if options.priority:
self.data['priority'] = options.priority.capitalize()
if options.reboot_before:
self.data['reboot_before'] = options.reboot_before.capitalize()
if options.reboot_after:
self.data['reboot_after'] = options.reboot_after.capitalize()
if options.noverify:
self.data['run_verify'] = False
if options.timeout:
self.data['timeout'] = options.timeout
if self.one_time_hosts:
self.data['one_time_hosts'] = self.one_time_hosts
if options.labels:
labels = options.labels.split(',')
labels = [label.strip() for label in labels if label.strip()]
label_hosts = self.execute_rpc(op='get_hosts',
multiple_labels=labels)
for host in label_hosts:
self.hosts.append(host['hostname'])
self.data['name'] = self.jobname
(self.data['hosts'],
self.data['meta_hosts']) = self.parse_hosts(self.hosts)
if options.atomic_group:
self.data['atomic_group_name'] = options.atomic_group
deps = options.dependencies.split(',')
deps = [dep.strip() for dep in deps if dep.strip()]
self.data['dependencies'] = deps
self.data['email_list'] = options.email
if options.synch_count:
self.data['synch_count'] = options.synch_count
if options.server:
self.data['control_type'] = 'Server'
else:
self.data['control_type'] = 'Client'
return options, leftover
def execute(self):
if self.ctrl_file_data:
uploading_kernel = 'kernel' in self.ctrl_file_data
if uploading_kernel:
default_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(topic_common.UPLOAD_SOCKET_TIMEOUT)
print 'Uploading Kernel: this may take a while...',
sys.stdout.flush()
try:
cf_info = self.execute_rpc(op='generate_control_file',
item=self.jobname,
**self.ctrl_file_data)
finally:
if uploading_kernel:
socket.setdefaulttimeout(default_timeout)
if uploading_kernel:
print 'Done'
self.data['control_file'] = cf_info['control_file']
if 'synch_count' not in self.data:
self.data['synch_count'] = cf_info['synch_count']
if cf_info['is_server']:
self.data['control_type'] = 'Server'
else:
self.data['control_type'] = 'Client'
# Get the union of the 2 sets of dependencies
deps = set(self.data['dependencies'])
deps = sorted(deps.union(cf_info['dependencies']))
self.data['dependencies'] = list(deps)
if 'synch_count' not in self.data:
self.data['synch_count'] = 1
if self.clone_id:
clone_info = self.execute_rpc(op='get_info_for_clone',
id=self.clone_id,
preserve_metahosts=self.reuse_hosts)
self.data = clone_info['job']
# Remove fields from clone data that cannot be reused
unused_fields = ('name', 'created_on', 'id', 'owner')
for field in unused_fields:
del self.data[field]
# Keyword args cannot be unicode strings
for key, val in self.data.iteritems():
del self.data[key]
self.data[str(key)] = val
# Convert host list from clone info that can be used for job_create
host_list = []
if clone_info['meta_host_counts']:
# Creates a dictionary of meta_hosts, e.g.
# {u'label1': 3, u'label2': 2, u'label3': 5}
meta_hosts = clone_info['meta_host_counts']
# Create a list of formatted metahosts, e.g.
# [u'3*label1', u'2*label2', u'5*label3']
meta_host_list = ['%s*%s' % (str(val), key) for key,val in
meta_hosts.items()]
host_list.extend(meta_host_list)
if clone_info['hosts']:
# Creates a list of hosts, e.g. [u'host1', u'host2']
hosts = [host['hostname'] for host in clone_info['hosts']]
host_list.extend(hosts)
(self.data['hosts'],
self.data['meta_hosts']) = self.parse_hosts(host_list)
self.data['name'] = self.jobname
job_id = self.execute_rpc(op='create_job', **self.data)
return ['%s (id %s)' % (self.jobname, job_id)]
def get_items(self):
return [self.jobname]
class job_abort(job, action_common.atest_delete):
"""atest job abort <job(s)>"""
usage_action = op_action = 'abort'
msg_done = 'Aborted'
def parse(self):
job_info = topic_common.item_parse_info(attribute_name='jobids',
use_leftover=True)
options, leftover = super(job_abort, self).parse([job_info],
req_items='jobids')
def execute(self):
data = {'job__id__in': self.jobids}
self.execute_rpc(op='abort_host_queue_entries', **data)
print 'Aborting jobs: %s' % ', '.join(self.jobids)
def get_items(self):
return self.jobids