Add support for atomic groups to the frontend RPC interface.
Signed-off-by: Gregory Smith <gps@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@2967 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index e394eb1..65f9989 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -37,9 +37,9 @@
# labels
def add_label(name, kernel_config=None, platform=None, only_if_needed=None):
- return models.Label.add_object(name=name, kernel_config=kernel_config,
- platform=platform,
- only_if_needed=only_if_needed).id
+ return models.Label.add_object(
+ name=name, kernel_config=kernel_config, platform=platform,
+ only_if_needed=only_if_needed).id
def modify_label(id, **data):
@@ -61,8 +61,43 @@
def get_labels(**filter_data):
+ """\
+ @returns A sequence of nested dictionaries of label information.
+ """
+ return rpc_utils.prepare_rows_as_nested_dicts(
+ models.Label.query_objects(filter_data),
+ ('atomic_group',))
+
+
+# atomic groups
+
+def add_atomic_group(name, max_number_of_machines, description=None):
+ return models.AtomicGroup.add_object(
+ name=name, max_number_of_machines=max_number_of_machines,
+ description=description).id
+
+
+def modify_atomic_group(id, **data):
+ models.AtomicGroup.smart_get(id).update_object(data)
+
+
+def delete_atomic_group(id):
+ models.AtomicGroup.smart_get(id).delete()
+
+
+def atomic_group_add_labels(id, labels):
+ label_objs = models.Label.smart_get_bulk(labels)
+ models.AtomicGroup.smart_get(id).label_set.add(*label_objs)
+
+
+def atomic_group_remove_labels(id, labels):
+ label_objs = models.Label.smart_get_bulk(labels)
+ models.AtomicGroup.smart_get(id).label_set.remove(*label_objs)
+
+
+def get_atomic_groups(**filter_data):
return rpc_utils.prepare_for_serialization(
- models.Label.list_objects(filter_data))
+ models.AtomicGroup.list_objects(filter_data))
# hosts
@@ -290,26 +325,33 @@
def create_job(name, priority, control_file, control_type, timeout=None,
- synch_count=None, hosts=None, meta_hosts=None,
- run_verify=True, one_time_hosts=None, email_list='',
- dependencies=[], reboot_before=None, reboot_after=None):
+ synch_count=None, hosts=(), meta_hosts=(),
+ run_verify=True, one_time_hosts=(), email_list='',
+ dependencies=(), reboot_before=None, reboot_after=None,
+ atomic_group_name=None):
"""\
Create and enqueue a job.
priority: Low, Medium, High, Urgent
- control_file: contents of control file
- control_type: type of control file, Client or Server
- synch_count: how many machines the job uses per autoserv execution.
- synch_count == 1 means the job is asynchronous.
- hosts: list of hosts to run job on
- meta_hosts: list where each entry is a label name, and for each entry
+ control_file: String contents of the control file.
+ control_type: Type of control file, Client or Server.
+ timeout: Hours after this call returns until the job times out.
+ synch_count: How many machines the job uses per autoserv execution.
+ synch_count == 1 means the job is asynchronous. If an
+ atomic group is given, this value is treated as a minimum.
+ hosts: List of hosts to run job on.
+ meta_hosts: List where each entry is a label name, and for each entry
one host will be chosen from that label to run the job
on.
- timeout: hours until job times out
- email_list: string containing emails to mail when the job is done
- dependencies: list of label names on which this job depends
+ run_verify: Should the host be verified before running the test?
+ one_time_hosts: List of hosts not in the database to run the job on.
+ email_list: String containing emails to mail when the job is done.
+ dependencies: List of label names on which this job depends.
reboot_before: Never, If dirty, or Always
reboot_after: Never, If all tests passed, or Always
+ atomic_group_name: The name of an atomic group to schedule the job on.
+
+ @returns The created Job id number.
"""
if timeout is None:
@@ -318,17 +360,34 @@
owner = thread_local.get_user().login
# input validation
- if not hosts and not meta_hosts and not one_time_hosts:
+ if not (hosts or meta_hosts or one_time_hosts or atomic_group_name):
raise model_logic.ValidationError({
'arguments' : "You must pass at least one of 'hosts', "
- "'meta_hosts', or 'one_time_hosts'"
+ "'meta_hosts', 'one_time_hosts', "
+ "or 'atomic_group_name'"
})
+ # Create and sanity check an AtomicGroup object if requested.
+ if atomic_group_name:
+ if one_time_hosts:
+ raise model_logic.ValidationError(
+ {'one_time_hosts':
+ 'One time hosts cannot be used with an Atomic Group.'})
+ atomic_group = models.AtomicGroup.smart_get(atomic_group_name)
+ if synch_count and synch_count > atomic_group.max_number_of_machines:
+ raise model_logic.ValidationError(
+ {'atomic_group_name' :
+ 'You have requested a synch_count (%d) greater than the '
+ 'maximum machines in the requested Atomic Group (%d).' %
+ (synch_count, atomic_group.max_number_of_machines)})
+ else:
+ atomic_group = None
+
labels_by_name = dict((label.name, label)
for label in models.Label.objects.all())
# convert hostnames & meta hosts to host/label objects
- host_objects = models.Host.smart_get_bulk(hosts or [])
+ host_objects = models.Host.smart_get_bulk(hosts)
metahost_objects = []
metahost_counts = {}
for label in meta_hosts or []:
@@ -353,10 +412,16 @@
% (requested_count, label.name, available_count))
raise model_logic.ValidationError({'meta_hosts' : error})
- if synch_count is not None and synch_count > len(all_host_objects):
- raise model_logic.ValidationError(
- {'hosts': 'only %d hosts provided for job with synch_count = %d'
- % (len(all_host_objects), synch_count)})
+ if atomic_group:
+ rpc_utils.check_atomic_group_create_job(
+ synch_count, host_objects, metahost_objects,
+ dependencies, atomic_group, labels_by_name)
+ else:
+ if synch_count is not None and synch_count > len(all_host_objects):
+ raise model_logic.ValidationError(
+ {'hosts':
+ 'only %d hosts provided for job with synch_count = %d' %
+ (len(all_host_objects), synch_count)})
rpc_utils.check_job_dependencies(host_objects, dependencies)
dependency_labels = [labels_by_name[label_name]
@@ -373,7 +438,7 @@
dependencies=dependency_labels,
reboot_before=reboot_before,
reboot_after=reboot_after)
- job.queue(all_host_objects)
+ job.queue(all_host_objects, atomic_group=atomic_group)
return job.id
@@ -445,6 +510,7 @@
hosts = []
meta_hosts = []
+ atomic_group_name = None
# For each queue entry, if the entry contains a host, add the entry into the
# hosts list if either:
@@ -460,6 +526,13 @@
hosts.append(queue_entry.host)
else:
meta_hosts.append(queue_entry.meta_host.name)
+ if atomic_group_name is None:
+ atomic_group_name = queue_entry.atomic_group.name
+ else:
+ assert atomic_group_name == queue_entry.atomic_group.name, (
+ 'DB inconsistency. HostQueueEntries with multiple atomic'
+ ' groups on job %s: %s != %s' % (
+ id, atomic_group_name, queue_entry.atomic_group.name))
host_dicts = []
@@ -489,6 +562,7 @@
in job.dependency_labels.all()]
info['meta_host_counts'] = meta_host_counts
info['hosts'] = host_dicts
+ info['atomic_group_name'] = atomic_group_name
return rpc_utils.prepare_for_serialization(info)
@@ -497,17 +571,11 @@
def get_host_queue_entries(**filter_data):
"""\
- TODO
+ @returns A sequence of nested dictionaries of host and job information.
"""
- query = models.HostQueueEntry.query_objects(filter_data)
- all_dicts = []
- for queue_entry in query.select_related():
- entry_dict = queue_entry.get_object_dict()
- if entry_dict['host'] is not None:
- entry_dict['host'] = queue_entry.host.get_object_dict()
- entry_dict['job'] = queue_entry.job.get_object_dict()
- all_dicts.append(entry_dict)
- return rpc_utils.prepare_for_serialization(all_dicts)
+ return rpc_utils.prepare_rows_as_nested_dicts(
+ models.HostQueueEntry.query_objects(filter_data),
+ ('host', 'atomic_group', 'job'))
def get_num_host_queue_entries(**filter_data):
@@ -519,7 +587,7 @@
def get_hqe_percentage_complete(**filter_data):
"""
- Computes the percentage of host queue entries matching the given filter data
+ Computes the fraction of host queue entries matching the given filter data
that are complete.
"""
query = models.HostQueueEntry.query_objects(filter_data)
@@ -544,15 +612,23 @@
"""\
Returns a dictionary containing a bunch of data that shouldn't change
often and is otherwise inaccessible. This includes:
- priorities: list of job priority choices
- default_priority: default priority value for new jobs
- users: sorted list of all users
- labels: sorted list of all labels
- tests: sorted list of all tests
- profilers: sorted list of all profilers
- user_login: logged-in username
- host_statuses: sorted list of possible Host statuses
- job_statuses: sorted list of possible HostQueueEntry statuses
+
+ priorities: List of job priority choices.
+ default_priority: Default priority value for new jobs.
+ users: Sorted list of all users.
+ labels: Sorted list of all labels.
+ atomic_groups: Sorted list of all atomic groups.
+ tests: Sorted list of all tests.
+ profilers: Sorted list of all profilers.
+ current_user: Logged-in username.
+ host_statuses: Sorted list of possible Host statuses.
+ job_statuses: Sorted list of possible HostQueueEntry statuses.
+ job_timeout_default: The default job timeout length in hours.
+ reboot_before_options: A list of valid RebootBefore string enums.
+ reboot_after_options: A list of valid RebootAfter string enums.
+ motd: Server's message of the day.
+ status_dictionary: A mapping from one-word job status names to a more
+ informative description.
"""
job_fields = models.Job.get_field_dict()
@@ -564,6 +640,7 @@
result['default_priority'] = default_string
result['users'] = get_users(sort_by=['login'])
result['labels'] = get_labels(sort_by=['-platform', 'name'])
+ result['atomic_groups'] = get_atomic_groups(sort_by=['name'])
result['tests'] = get_tests(sort_by=['name'])
result['profilers'] = get_profilers(sort_by=['name'])
result['current_user'] = rpc_utils.prepare_for_serialization(