two new major features:
(1) added test and job dependencies
-added M2M relationship between tests and labels and between jobs and labels, for tracking the labels on which a test/job depends
-modified test_importer to read the DEPENDENCIES field and create the right M2M relationships
-modified generate_control_file() RPC to compute and return the union of test dependencies. since generate_control_file now returns four pieces of information, I converted its return type from tuple to dict, and changed clients accordingly.
-modified job creation clients (GWT and CLI) to pass this dependency list to the create_job() RPC
-modified the create_job() RPC to check that hosts satisfy job dependencies, and to create M2M relationships
-modified the scheduler to check dependencies when scheduling jobs
-modified JobDetailView to show a job's dependencies
(2) added "only_if_needed" bit to labels; if true, a machine with this label can only be used if the label is requested (either by job dependencies or by the metahost label)
-added boolean field to Labels
-modified CLI label creation/viewing to support this new field
-made create_job() RPC and scheduler check for hosts with such a label that was not requested, and reject such hosts
also did some slight refactoring of other code in create_job() to simplify it while I was changing things there.
a couple notes:
-an only_if_needed label can be used if either the job depends on the label or it's a metahost for that label. we assume that if the user specifically requests the label in a metahost, then it's OK, even if the job doesn't depend on that label.
-one-time-hosts are assumed to satisfy job dependencies.
Signed-off-by: Steve Howard <showard@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@2215 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/cli/job.py b/cli/job.py
index 1208789..7a7c30f 100755
--- a/cli/job.py
+++ b/cli/job.py
@@ -321,6 +321,10 @@
self.ctrl_file_data['do_push_packages'] = True
self.ctrl_file_data['use_container'] = options.container
+ # TODO: add support for manually specifying dependencies, when this is
+ # added to the frontend as well
+ self.data['dependencies'] = []
+
return (options, leftover)
@@ -330,20 +334,20 @@
socket.setdefaulttimeout(topic_common.UPLOAD_SOCKET_TIMEOUT)
print 'Uploading Kernel: this may take a while...',
- (ctrl_file, on_server,
- is_synch) = self.execute_rpc(op='generate_control_file',
- item=self.jobname,
- **self.ctrl_file_data)
+ cf_info = self.execute_rpc(op='generate_control_file',
+ item=self.jobname,
+ **self.ctrl_file_data)
if self.ctrl_file_data.has_key('kernel'):
print 'Done'
socket.setdefaulttimeout(topic_common.DEFAULT_SOCKET_TIMEOUT)
- self.data['control_file'] = ctrl_file
- self.data['is_synchronous'] = is_synch
- if on_server:
+ self.data['control_file'] = cf_info['control_file']
+ self.data['is_synchronous'] = cf_info['is_synchronous']
+ if cf_info['is_server']:
self.data['control_type'] = 'Server'
else:
self.data['control_type'] = 'Client'
+ self.data['dependencies'] = cf_info['dependencies']
return super(job_create, self).execute()
diff --git a/cli/job_unittest.py b/cli/job_unittest.py
index 3b2677b..4b3a62b 100755
--- a/cli/job_unittest.py
+++ b/cli/job_unittest.py
@@ -564,15 +564,19 @@
data = {'priority': 'Medium', 'control_file': ctrl_file, 'hosts': ['host0'],
'name': 'test_job0', 'control_type': 'Client',
- 'meta_hosts': [], 'is_synchronous': False}
+ 'meta_hosts': [], 'is_synchronous': False, 'dependencies': []}
def test_execute_create_job(self):
self.run_cmd(argv=['atest', 'job', 'create', '-t', 'sleeptest',
'test_job0', '-m', 'host0'],
- rpcs=[('generate_control_file', {'tests': ['sleeptest'],
- 'use_container': False},
- True, (self.ctrl_file, False, False)),
+ rpcs=[('generate_control_file',
+ {'tests': ['sleeptest'], 'use_container': False},
+ True,
+ {'control_file' : self.ctrl_file,
+ 'is_synchronous' : False,
+ 'is_server' : False,
+ 'dependencies' : []}),
('create_job', self.data, True, 180)],
out_words_ok=['test_job0', 'Created'],
out_words_no=['Uploading', 'Done'])
@@ -592,10 +596,14 @@
data['control_file'] = self.kernel_ctrl_file
self.run_cmd(argv=['atest', 'job', 'create', '-t', 'sleeptest',
'-k', 'kernel', 'test_job0', '-m', 'host0'],
- rpcs=[('generate_control_file', {'tests': ['sleeptest'],
- 'use_container': False, 'kernel': 'kernel',
- 'do_push_packages': True},
- True, (self.kernel_ctrl_file, False, False)),
+ rpcs=[('generate_control_file',
+ {'tests': ['sleeptest'], 'use_container': False,
+ 'kernel': 'kernel', 'do_push_packages': True},
+ True,
+ {'control_file' : self.kernel_ctrl_file,
+ 'is_synchronous' : False,
+ 'is_server' : False,
+ 'dependencies' : []}),
('create_job', data, True, 180)],
out_words_ok=['test_job0', 'Created',
'Uploading', 'Done'])
@@ -608,10 +616,14 @@
self.run_cmd(argv=['atest', 'job', 'create', '-t', 'sleeptest',
'-k', 'kernel', 'test job with spaces',
'-m', 'host0'],
- rpcs=[('generate_control_file', {'tests': ['sleeptest'],
- 'use_container': False, 'kernel': 'kernel',
- 'do_push_packages': True},
- True, (self.kernel_ctrl_file, False, False)),
+ rpcs=[('generate_control_file',
+ {'tests': ['sleeptest'], 'use_container': False,
+ 'kernel': 'kernel', 'do_push_packages': True},
+ True,
+ {'control_file' : self.kernel_ctrl_file,
+ 'is_synchronous' : False,
+ 'is_server' : False,
+ 'dependencies' : []}),
('create_job', data, True, 180)],
# This is actually 8 spaces,
# the tab has been converted by print.
@@ -713,9 +725,13 @@
data = self.data.copy()
self.run_cmd(argv=['atest', 'job', 'create', '-t', 'sleeptest',
'-c', 'test_job0', '-m', 'host0'],
- rpcs=[('generate_control_file', {'tests': ['sleeptest'],
- 'use_container': True}, True, (self.ctrl_file,
- False, False)),
+ rpcs=[('generate_control_file',
+ {'tests': ['sleeptest'], 'use_container': True},
+ True,
+ {'control_file' : self.ctrl_file,
+ 'is_synchronous' : False,
+ 'is_server' : False,
+ 'dependencies' : []}),
('create_job', data, True, 42)],
out_words_ok=['test_job0', 'Created'])
diff --git a/cli/label.py b/cli/label.py
index f8932e7..a4d6ac2 100755
--- a/cli/label.py
+++ b/cli/label.py
@@ -132,9 +132,9 @@
elif not self.all:
results = [label for label in results
if not label['platform']]
- keys = ['name', 'invalid']
+ keys = ['name', 'only_if_needed', 'invalid']
else:
- keys = ['name', 'platform', 'invalid']
+ keys = ['name', 'platform', 'only_if_needed', 'invalid']
super(label_list, self).output(results, keys)
@@ -147,12 +147,17 @@
help='To create this label as a platform',
default=False,
action='store_true')
+ self.parser.add_option('-o', '--only_if_needed',
+ help='To mark the label as "only use if needed"',
+ default=False,
+ action='store_true')
def parse(self):
(options, leftover) = super(label_create, self).parse()
self.data_item_key = 'name'
self.data['platform'] = options.platform
+ self.data['only_if_needed'] = options.only_if_needed
return (options, leftover)
diff --git a/cli/label_unittest.py b/cli/label_unittest.py
index d160435..fff879c 100755
--- a/cli/label_unittest.py
+++ b/cli/label_unittest.py
@@ -15,27 +15,32 @@
u'platform': 0,
u'name': u'label0',
u'invalid': 0,
- u'kernel_config': u''},
+ u'kernel_config': u'',
+ u'only_if_needed': 0},
{u'id': 338, # Valid label
u'platform': 0,
u'name': u'label1',
u'invalid': 0,
- u'kernel_config': u''},
+ u'kernel_config': u'',
+ u'only_if_needed': 0},
{u'id': 340, # Invalid label
u'platform': 0,
u'name': u'label2',
u'invalid': 1,
- u'kernel_config': u''},
+ u'kernel_config': u'',
+ u'only_if_needed': 0},
{u'id': 350, # Valid platform
u'platform': 1,
u'name': u'plat0',
u'invalid': 0,
- u'kernel_config': u''},
+ u'kernel_config': u'',
+ u'only_if_needed': 0},
{u'id': 420, # Invalid platform
u'platform': 1,
u'name': u'plat1',
u'invalid': 1,
- u'kernel_config': u''}]
+ u'kernel_config': u'',
+ u'only_if_needed': 0}]
def test_label_list_labels_only(self):
@@ -82,9 +87,13 @@
def test_execute_create_two_labels(self):
self.run_cmd(argv=['atest', 'label', 'create', 'label0', 'label1',
'--ignore_site_file'],
- rpcs=[('add_label', {'name': 'label0', 'platform': False},
+ rpcs=[('add_label',
+ {'name': 'label0', 'platform': False,
+ 'only_if_needed': False},
True, 42),
- ('add_label', {'name': 'label1', 'platform': False},
+ ('add_label',
+ {'name': 'label1', 'platform': False,
+ 'only_if_needed': False},
True, 43)],
out_words_ok=['Created', 'label0', 'label1'])
@@ -92,9 +101,13 @@
def test_execute_create_two_labels_bad(self):
self.run_cmd(argv=['atest', 'label', 'create', 'label0', 'label1',
'--ignore_site_file'],
- rpcs=[('add_label', {'name': 'label0', 'platform': False},
+ rpcs=[('add_label',
+ {'name': 'label0', 'platform': False,
+ 'only_if_needed': False},
True, 3),
- ('add_label', {'name': 'label1', 'platform': False},
+ ('add_label',
+ {'name': 'label1', 'platform': False,
+ 'only_if_needed': False},
False,
'''ValidationError: {'name':
'This value must be unique (label0)'}''')],
diff --git a/cli/topic_common.py b/cli/topic_common.py
index 03e670c..bbec3f6 100755
--- a/cli/topic_common.py
+++ b/cli/topic_common.py
@@ -89,6 +89,7 @@
'created_on': 'Created On',
'synch_type': 'Synch Type',
'control_file': 'Control File',
+ 'only_if_needed': 'Use only if needed',
}
# In the failure, tag that will replace the item.
@@ -116,8 +117,12 @@
return field
-KEYS_CONVERT = {'locked': lambda flag: str(bool(flag)),
+def _int_2_bool_string(value):
+ return str(bool(value))
+
+KEYS_CONVERT = {'locked': _int_2_bool_string,
'invalid': lambda flag: str(bool(not flag)),
+ 'only_if_needed': _int_2_bool_string,
'platform': __convert_platform,
'labels': lambda labels: ', '.join(labels)}
diff --git a/frontend/afe/doctests/001_rpc_test.txt b/frontend/afe/doctests/001_rpc_test.txt
index 0e869e8..84697b4 100644
--- a/frontend/afe/doctests/001_rpc_test.txt
+++ b/frontend/afe/doctests/001_rpc_test.txt
@@ -51,6 +51,7 @@
... 'name': 'test_label',
... 'platform': 1,
... 'kernel_config': '/my/kernel/config',
+... 'only_if_needed' : False,
... 'invalid': 0}]
True
@@ -63,9 +64,9 @@
4L
>>> data = rpc_interface.get_labels(platform=False)
>>> data == [{'id': 2L, 'name': 'label1', 'platform': 0, 'kernel_config': '',
-... 'invalid': 0},
+... 'only_if_needed': False, 'invalid': 0},
... {'id': 4L, 'name': 'label3', 'platform': 0, 'kernel_config': '',
-... 'invalid': 0}]
+... 'only_if_needed': False, 'invalid': 0}]
True
# delete_label takes an ID or a name as well
@@ -340,13 +341,13 @@
>>> rpc_interface.label_add_hosts(id='my_label', hosts=['my_label_host1', 'my_label_host2'])
# generate a control file
->>> control_file, is_server, is_synch = rpc_interface.generate_control_file(
+>>> cf_info = rpc_interface.generate_control_file(
... tests=['sleeptest', 'my_test'],
... kernel='1.2.3.4',
... label='my_label')
->>> control_file
+>>> cf_info['control_file']
"kernel = '1.2.3.4'\ndef step_init():\n job.next_step([step_test])\n testkernel = job.kernel('1.2.3.4')\n testkernel.config('my_kernel_config')\n testkernel.install()\n testkernel.boot(args='')\n\ndef step_test():\n job.next_step('step0')\n job.next_step('step1')\n\ndef step0():\n job.run_test('testname')\n\ndef step1():\n job.run_test('testname')"
->>> print control_file #doctest: +NORMALIZE_WHITESPACE
+>>> print cf_info['control_file'] #doctest: +NORMALIZE_WHITESPACE
kernel = '1.2.3.4'
def step_init():
job.next_step([step_test])
@@ -362,13 +363,13 @@
job.run_test('testname')
def step1():
job.run_test('testname')
->>> is_server, is_synch
-(False, False)
+>>> cf_info['is_server'], cf_info['is_synchronous'], cf_info['dependencies']
+(False, False, [])
# create a job to run on host1, host2, and any two machines in my_label
>>> rpc_interface.create_job(name='my_job',
... priority='Low',
-... control_file=control_file,
+... control_file=cf_info['control_file'],
... control_type='Client',
... hosts=['host1', 'host2'],
... meta_hosts=['my_label', 'my_label'])
@@ -379,7 +380,7 @@
>>> data = data[0]
>>> data['id'], data['owner'], data['name'], data['priority']
(1L, 'debug_user', 'my_job', 'Low')
->>> data['control_file'] == control_file
+>>> data['control_file'] == cf_info['control_file']
True
>>> data['control_type']
'Client'
@@ -401,7 +402,7 @@
# get_host_queue_entries returns full info about the job within each queue entry
>>> job = data[0]['job']
->>> job == {'control_file': control_file, # refer to the control file we used
+>>> job == {'control_file': cf_info['control_file'], # the control file we used
... 'control_type': 'Client',
... 'created_on': None,
... 'id': 1L,
@@ -493,7 +494,7 @@
# is_synchronous
>>> rpc_interface.create_job(name='my_job',
... priority='Low',
-... control_file=control_file,
+... control_file=cf_info['control_file'],
... control_type='Server',
... is_synchronous=False,
... hosts=['host1'])
diff --git a/frontend/afe/doctests/003_misc_rpc_features.txt b/frontend/afe/doctests/003_misc_rpc_features.txt
index f5e686d..abaac61 100644
--- a/frontend/afe/doctests/003_misc_rpc_features.txt
+++ b/frontend/afe/doctests/003_misc_rpc_features.txt
@@ -10,8 +10,8 @@
3L
# profiler support in control file generation
->>> control_file, is_server, is_synch = rpc_interface.generate_control_file(
+>>> cf_info = rpc_interface.generate_control_file(
... tests=['sleeptest'],
... profilers=['oprofile', 'iostat'])
->>> control_file
+>>> cf_info['control_file']
"def step_init():\n job.next_step('step0')\n job.next_step('step1')\n job.next_step('step2')\n job.next_step('step3')\n job.next_step('step4')\n\ndef step0():\n job.profilers.add('oprofile')\n\ndef step1():\n job.profilers.add('iostat')\n\ndef step2():\n job.run_test('testname')\n\ndef step3():\n job.profilers.delete('oprofile')\n\ndef step4():\n job.profilers.delete('iostat')"
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index 3451a50..2f79014 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -21,10 +21,13 @@
kernel_config: url/path to kernel config to use for jobs run on this
label
platform: if True, this is a platform label (defaults to False)
+ only_if_needed: if True, a machine with this label can only be used if
+ that label is requested by the job/test.
"""
name = dbmodels.CharField(maxlength=255, unique=True)
kernel_config = dbmodels.CharField(maxlength=255, blank=True)
platform = dbmodels.BooleanField(default=False)
+ only_if_needed = dbmodels.BooleanField(default=False)
invalid = dbmodels.BooleanField(default=False,
editable=settings.FULL_ADMIN)
@@ -262,6 +265,8 @@
machines.
Optional:
dependencies: What the test requires to run. Comma deliminated list
+ dependency_labels: many-to-many relationship with labels corresponding to
+ test dependencies.
experimental: If this is set to True production servers will ignore the test
run_verify: Whether or not the scheduler should run the verify stage
"""
@@ -286,6 +291,8 @@
synch_type = dbmodels.SmallIntegerField(choices=SynchType.choices(),
default=SynchType.ASYNCHRONOUS)
path = dbmodels.CharField(maxlength=255, unique=True)
+ dependency_labels = dbmodels.ManyToManyField(
+ Label, blank=True, filter_interface=dbmodels.HORIZONTAL)
name_field = 'name'
objects = model_logic.ExtendedManager()
@@ -472,6 +479,24 @@
return all_job_counts
+ def populate_dependencies(self, jobs):
+ job_ids = ','.join(str(job['id']) for job in jobs)
+ cursor = connection.cursor()
+ cursor.execute("""
+ SELECT jobs.id, GROUP_CONCAT(labels.name)
+ FROM jobs
+ INNER JOIN jobs_dependency_labels
+ ON jobs.id = jobs_dependency_labels.job_id
+ INNER JOIN labels ON jobs_dependency_labels.label_id = labels.id
+ WHERE jobs.id IN (%s)
+ GROUP BY jobs.id
+ """ % job_ids)
+ id_to_dependencies = dict((job_id, dependencies)
+ for job_id, dependencies in cursor.fetchall())
+ for job in jobs:
+ job['dependencies'] = id_to_dependencies.get(job['id'], '')
+
+
class Job(dbmodels.Model, model_logic.ModelExtensions):
"""\
owner: username of job owner
@@ -489,6 +514,8 @@
timeout: hours until job times out
email_list: list of people to email on completion delimited by any of:
white space, ',', ':', ';'
+ dependency_labels: many-to-many relationship with labels corresponding to
+ job dependencies
"""
Priority = enum.Enum('Low', 'Medium', 'High', 'Urgent')
ControlType = enum.Enum('Server', 'Client', start_value=1)
@@ -512,6 +539,8 @@
run_verify = dbmodels.BooleanField(default=True)
timeout = dbmodels.IntegerField()
email_list = dbmodels.CharField(maxlength=250, blank=True)
+ dependency_labels = dbmodels.ManyToManyField(
+ Label, blank=True, filter_interface=dbmodels.HORIZONTAL)
# custom manager
@@ -524,7 +553,8 @@
@classmethod
def create(cls, owner, name, priority, control_file, control_type,
- hosts, synch_type, timeout, run_verify, email_list):
+ hosts, synch_type, timeout, run_verify, email_list,
+ dependencies):
"""\
Creates a job by taking some information (the listed args)
and filling in the rest of the necessary information.
@@ -545,6 +575,7 @@
+ ' one host to run on'}
raise model_logic.ValidationError(errors)
job.save()
+ job.dependency_labels = dependencies
return job
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index c471a68..6beb3be 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -37,9 +37,10 @@
# labels
-def add_label(name, kernel_config=None, platform=None):
+def add_label(name, kernel_config=None, platform=None, only_if_needed=None):
return models.Label.add_object(name=name, kernel_config=kernel_config,
- platform=platform).id
+ platform=platform,
+ only_if_needed=only_if_needed).id
def modify_label(id, **data):
@@ -246,10 +247,11 @@
def generate_control_file(tests, kernel=None, label=None, profilers=[]):
"""\
Generates a client-side control file to load a kernel and run a set of
- tests. Returns a tuple (control_file, is_server, is_synchronous):
+ tests. Returns a dict with the following keys:
control_file - the control file text
is_server - is the control file a server-side control file?
is_synchronous - should the control file be run synchronously?
+ dependencies - a list of the names of labels on which the job depends
tests: list of tests to run
kernel: kernel to install in generated control file
@@ -257,21 +259,22 @@
profilers: list of profilers to activate during the job
"""
if not tests:
- return '', False, False
+ return dict(control_file='', is_server=False, is_synchronous=False,
+ dependencies=[])
- is_server, is_synchronous, test_objects, profiler_objects, label = (
+ cf_info, test_objects, profiler_objects, label = (
rpc_utils.prepare_generate_control_file(tests, kernel, label,
profilers))
- cf_text = control_file.generate_control(tests=test_objects, kernel=kernel,
- platform=label,
- profilers=profiler_objects,
- is_server=is_server)
- return cf_text, is_server, is_synchronous
+ cf_info['control_file'] = control_file.generate_control(
+ tests=test_objects, kernel=kernel, platform=label,
+ profilers=profiler_objects, is_server=cf_info['is_server'])
+ return cf_info
def create_job(name, priority, control_file, control_type, timeout=None,
is_synchronous=None, hosts=None, meta_hosts=None,
- run_verify=True, one_time_hosts=None, email_list=''):
+ run_verify=True, one_time_hosts=None, email_list='',
+ dependencies=[]):
"""\
Create and enqueue a job.
@@ -285,6 +288,7 @@
on.
timeout: hours until job times out
email_list: string containing emails to mail when the job is done
+ dependencies: list of label names on which this job depends
"""
if timeout is None:
@@ -299,33 +303,32 @@
"'meta_hosts', or 'one_time_hosts'"
})
- requested_host_counts = {}
+ labels_by_name = dict((label.name, label)
+ for label in models.Label.objects.all())
# convert hostnames & meta hosts to host/label objects
host_objects = []
+ metahost_objects = []
+ metahost_counts = {}
for host in hosts or []:
this_host = models.Host.smart_get(host)
host_objects.append(this_host)
for label in meta_hosts or []:
- this_label = models.Label.smart_get(label)
- host_objects.append(this_label)
- requested_host_counts.setdefault(this_label.name, 0)
- requested_host_counts[this_label.name] += 1
+ this_label = labels_by_name[label]
+ metahost_objects.append(this_label)
+ metahost_counts.setdefault(this_label, 0)
+ metahost_counts[this_label] += 1
for host in one_time_hosts or []:
this_host = models.Host.create_one_time_host(host)
host_objects.append(this_host)
# check that each metahost request has enough hosts under the label
- if meta_hosts:
- labels = models.Label.objects.filter(
- name__in=requested_host_counts.keys())
- for label in labels:
- count = label.host_set.count()
- if requested_host_counts[label.name] > count:
- error = ("You have requested %d %s's, but there are only %d."
- % (requested_host_counts[label.name],
- label.name, count))
- raise model_logic.ValidationError({'arguments' : error})
+ for label, requested_count in metahost_counts.iteritems():
+ available_count = label.host_set.count()
+ if requested_count > available_count:
+ error = ("You have requested %d %s's, but there are only %d."
+ % (requested_count, label.name, available_count))
+ raise model_logic.ValidationError({'meta_hosts' : error})
# default is_synchronous to some appropriate value
ControlType = models.Job.ControlType
@@ -338,15 +341,20 @@
else:
synch_type = models.Test.SynchType.ASYNCHRONOUS
+ rpc_utils.check_job_dependencies(host_objects, dependencies)
+ dependency_labels = [labels_by_name[label_name]
+ for label_name in dependencies]
+
job = models.Job.create(owner=owner, name=name, priority=priority,
control_file=control_file,
control_type=control_type,
synch_type=synch_type,
- hosts=host_objects,
+ hosts=host_objects + metahost_objects,
timeout=timeout,
run_verify=run_verify,
- email_list=email_list.strip())
- job.queue(host_objects)
+ email_list=email_list.strip(),
+ dependencies=dependency_labels)
+ job.queue(host_objects + metahost_objects)
return job.id
@@ -382,8 +390,9 @@
filter_data['extra_args'] = rpc_utils.extra_job_filters(not_yet_run,
running,
finished)
- return rpc_utils.prepare_for_serialization(
- models.Job.list_objects(filter_data))
+ jobs = models.Job.list_objects(filter_data)
+ models.Job.objects.populate_dependencies(jobs)
+ return rpc_utils.prepare_for_serialization(jobs)
def get_num_jobs(not_yet_run=False, running=False, finished=False,
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index ac175f0..fcdc0ba 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -142,4 +142,44 @@
if label:
label = models.Label.smart_get(label)
- return is_server, is_synchronous, test_objects, profiler_objects, label
+ dependencies = set(label.name for label
+ in models.Label.objects.filter(test__in=test_objects))
+
+ return (dict(is_server=is_server, is_synchronous=is_synchronous,
+ dependencies=list(dependencies)),
+ test_objects, profiler_objects, label)
+
+
+def check_job_dependencies(host_objects, job_dependencies):
+ """
+ Check that a set of machines satisfies a job's dependencies.
+ host_objects: list of models.Host objects
+ job_dependencies: list of names of labels
+ """
+ # check that hosts satisfy dependencies
+ host_ids = [host.id for host in host_objects]
+ hosts_in_job = models.Host.objects.filter(id__in=host_ids)
+ ok_hosts = hosts_in_job
+ for index, dependency in enumerate(job_dependencies):
+ ok_hosts &= models.Host.objects.filter_custom_join(
+ '_label%d' % index, labels__name=dependency)
+ failing_hosts = (set(host.hostname for host in host_objects) -
+ set(host.hostname for host in ok_hosts))
+ if failing_hosts:
+ raise model_logic.ValidationError(
+ {'hosts' : 'Host(s) failed to meet job dependencies: ' +
+ ', '.join(failing_hosts)})
+
+ # check for hosts that have only_if_needed labels that aren't requested
+ labels_not_requested = models.Label.objects.filter(only_if_needed=True,
+ host__id__in=host_ids)
+ labels_not_requested = labels_not_requested.exclude(
+ name__in=job_dependencies)
+ errors = []
+ for label in labels_not_requested:
+ hosts_in_label = hosts_in_job.filter(labels=label)
+ errors.append('Cannot use hosts with label "%s" unless requested: %s' %
+ (label.name,
+ ', '.join(host.hostname for host in hosts_in_label)))
+ if errors:
+ raise model_logic.ValidationError({'hosts' : '\n'.join(errors)})
diff --git a/frontend/client/src/autotest/afe/CreateJobView.java b/frontend/client/src/autotest/afe/CreateJobView.java
index 24f31c5..c615362 100644
--- a/frontend/client/src/autotest/afe/CreateJobView.java
+++ b/frontend/client/src/autotest/afe/CreateJobView.java
@@ -170,6 +170,7 @@
protected boolean controlEdited = false;
protected boolean controlReadyForSubmit = false;
+ private JSONArray dependencies = new JSONArray();
public CreateJobView(JobCreateListener listener) {
this.listener = listener;
@@ -316,10 +317,12 @@
rpcProxy.rpcCall("generate_control_file", params, new JsonRpcCallback() {
@Override
public void onSuccess(JSONValue result) {
- JSONArray results = result.isArray();
- String controlFileText = results.get(0).isString().stringValue();
- boolean isServer = results.get(1).isBoolean().booleanValue();
- boolean isSynchronous = results.get(2).isBoolean().booleanValue();
+ JSONObject controlInfo = result.isObject();
+ String controlFileText = controlInfo.get("control_file").isString().stringValue();
+ boolean isServer = controlInfo.get("is_server").isBoolean().booleanValue();
+ boolean isSynchronous =
+ controlInfo.get("is_synchronous").isBoolean().booleanValue();
+ dependencies = controlInfo.get("dependencies").isArray();
controlFile.setText(controlFileText);
controlTypeSelect.setControlType(isServer ? TestSelector.SERVER_TYPE :
TestSelector.CLIENT_TYPE);
@@ -523,6 +526,7 @@
controlFilePanel.setOpen(false);
editControlButton.setText(EDIT_CONTROL_STRING);
hostSelector.reset();
+ dependencies = new JSONArray();
}
protected void submitJob() {
@@ -565,6 +569,7 @@
args.put("meta_hosts", Utils.stringsToJSON(hosts.metaHosts));
args.put("one_time_hosts",
Utils.stringsToJSON(hosts.oneTimeHosts));
+ args.put("dependencies", dependencies);
rpcProxy.rpcCall("create_job", args, new JsonRpcCallback() {
@Override
diff --git a/frontend/client/src/autotest/afe/JobDetailView.java b/frontend/client/src/autotest/afe/JobDetailView.java
index 2adbdbd..c7acbb0 100644
--- a/frontend/client/src/autotest/afe/JobDetailView.java
+++ b/frontend/client/src/autotest/afe/JobDetailView.java
@@ -97,6 +97,7 @@
showField(jobObject, "email_list", "view_email_list");
showField(jobObject, "control_type", "view_control_type");
showField(jobObject, "control_file", "view_control_file");
+ showField(jobObject, "dependencies", "view_dependencies");
String synchType = jobObject.get("synch_type").isString().stringValue();
showText(synchType.toLowerCase(), "view_synch_type");
diff --git a/frontend/client/src/autotest/public/AfeClient.html b/frontend/client/src/autotest/public/AfeClient.html
index 8a50e78..fc4de49 100644
--- a/frontend/client/src/autotest/public/AfeClient.html
+++ b/frontend/client/src/autotest/public/AfeClient.html
@@ -54,6 +54,8 @@
<span id="view_timeout"></span> hours<br>
<span class="field-name">Email List:</span>
<span id="view_email_list"></span><br>
+ <span class="field-name">Dependencies:</span>
+ <span id="view_dependencies"></span><br>
<span class="field-name">Status:</span>
<span id="view_status"></span><br>
diff --git a/frontend/migrations/018_add_label_only_if_needed.py b/frontend/migrations/018_add_label_only_if_needed.py
new file mode 100644
index 0000000..790299a
--- /dev/null
+++ b/frontend/migrations/018_add_label_only_if_needed.py
@@ -0,0 +1,24 @@
+CREATE_MANY2MANY_TABLES = """
+CREATE TABLE `autotests_dependency_labels` (
+ `id` integer AUTO_INCREMENT NOT NULL PRIMARY KEY,
+ `test_id` integer NOT NULL REFERENCES `autotests` (`id`),
+ `label_id` integer NOT NULL REFERENCES `labels` (`id`),
+ UNIQUE (`test_id`, `label_id`)
+);
+CREATE TABLE `jobs_dependency_labels` (
+ `id` integer AUTO_INCREMENT NOT NULL PRIMARY KEY,
+ `job_id` integer NOT NULL REFERENCES `jobs` (`id`),
+ `label_id` integer NOT NULL REFERENCES `labels` (`id`),
+ UNIQUE (`job_id`, `label_id`)
+);
+"""
+
+def migrate_up(manager):
+ manager.execute('ALTER TABLE labels '
+ 'ADD COLUMN only_if_needed bool NOT NULL')
+ manager.execute_script(CREATE_MANY2MANY_TABLES)
+
+def migrate_down(manager):
+ manager.execute('ALTER TABLE labels DROP COLUMN only_if_needed')
+ manager.execute('DROP TABLE IF EXISTS `autotests_dependency_labels`')
+ manager.execute('DROP TABLE IF EXISTS `jobs_dependency_labels`')
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 922cf5a..94a92b0 100644
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -333,14 +333,21 @@
@classmethod
- def _get_many2many_dict(cls, query, id_list):
+ def _get_many2many_dict(cls, query, id_list, flip=False):
if not id_list:
return {}
query %= cls._get_sql_id_list(id_list)
rows = _db.execute(query)
+ return cls._process_many2many_dict(rows, flip)
+
+
+ @staticmethod
+ def _process_many2many_dict(rows, flip=False):
result = {}
for row in rows:
left_id, right_id = long(row[0]), long(row[1])
+ if flip:
+ left_id, right_id = right_id, left_id
result.setdefault(left_id, set()).add(right_id)
return result
@@ -368,6 +375,16 @@
@classmethod
+ def _get_job_dependencies(cls, job_ids):
+ query = """
+ SELECT job_id, label_id
+ FROM jobs_dependency_labels
+ WHERE job_id IN (%s)
+ """
+ return cls._get_many2many_dict(query, job_ids)
+
+
+ @classmethod
def _get_host_acls(cls, host_ids):
query = """
SELECT host_id, acl_group_id
@@ -383,8 +400,16 @@
SELECT label_id, host_id
FROM hosts_labels
WHERE host_id IN (%s)
- """
- return cls._get_many2many_dict(query, host_ids)
+ """ % cls._get_sql_id_list(host_ids)
+ rows = _db.execute(query)
+ labels_to_hosts = cls._process_many2many_dict(rows)
+ hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
+ return labels_to_hosts, hosts_to_labels
+
+
+ @classmethod
+ def _get_labels(cls):
+ return dict((label.id, label) for label in Label.fetch())
def refresh(self, pending_queue_entries):
@@ -394,10 +419,13 @@
for queue_entry in pending_queue_entries]
self._job_acls = self._get_job_acl_groups(relevant_jobs)
self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
+ self._job_dependencies = self._get_job_dependencies(relevant_jobs)
host_ids = self._hosts_available.keys()
self._host_acls = self._get_host_acls(host_ids)
- self._label_hosts = self._get_label_hosts(host_ids)
+ self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)
+
+ self._labels = self._get_labels()
def _is_acl_accessible(self, host_id, queue_entry):
@@ -406,8 +434,44 @@
return len(host_acls.intersection(job_acls)) > 0
+ def _check_job_dependencies(self, job_dependencies, host_labels):
+ missing = job_dependencies - host_labels
+ return len(job_dependencies - host_labels) == 0
+
+
+ def _check_only_if_needed_labels(self, job_dependencies, host_labels,
+ queue_entry):
+ for label_id in host_labels:
+ label = self._labels[label_id]
+ if not label.only_if_needed:
+ # we don't care about non-only_if_needed labels
+ continue
+ if queue_entry.meta_host == label_id:
+ # if the label was requested in a metahost it's OK
+ continue
+ if label_id not in job_dependencies:
+ return False
+ return True
+
+
+ def _is_host_eligible_for_job(self, host_id, queue_entry):
+ job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
+ host_labels = self._host_labels.get(host_id, set())
+ if not self._is_acl_accessible(host_id, queue_entry):
+ print 'no acl access'
+ if not self._check_job_dependencies(job_dependencies, host_labels):
+ print 'didnt meet deps'
+ if not self._check_only_if_needed_labels(job_dependencies, host_labels,
+ queue_entry):
+ print 'didnt request oin label'
+ return (self._is_acl_accessible(host_id, queue_entry) and
+ self._check_job_dependencies(job_dependencies, host_labels) and
+ self._check_only_if_needed_labels(job_dependencies, host_labels,
+ queue_entry))
+
+
def _schedule_non_metahost(self, queue_entry):
- if not self._is_acl_accessible(queue_entry.host_id, queue_entry):
+ if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
return None
return self._hosts_available.pop(queue_entry.host_id, None)
@@ -436,7 +500,7 @@
continue
if host_id in ineligible_host_ids:
continue
- if not self._is_acl_accessible(host_id, queue_entry):
+ if not self._is_host_eligible_for_job(host_id, queue_entry):
continue
hosts_in_label.remove(host_id)
@@ -1584,7 +1648,7 @@
@classmethod
- def fetch(cls, where, params=(), joins='', order_by=''):
+ def fetch(cls, where='', params=(), joins='', order_by=''):
table = cls._get_table()
order_by = cls._prefix_with(order_by, 'ORDER BY ')
where = cls._prefix_with(where, 'WHERE ')
@@ -1614,6 +1678,18 @@
return ['id', 'job_id', 'host_id']
class Label(DBObject):
    """In-memory representation of a row of the labels table."""
    @classmethod
    def _get_table(cls):
        return 'labels'


    @classmethod
    def _fields(cls):
        # must match the column order of the labels table
        return ['id', 'name', 'kernel_config', 'platform', 'invalid',
                'only_if_needed']
+
+
class Host(DBObject):
def __init__(self, id=None, row=None):
super(Host, self).__init__(id=id, row=row)
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 90880b7..7eab730 100644
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -25,6 +25,7 @@
-- create a label for each of two hosts
INSERT INTO labels (name) VALUES ('label1'), ('label2');
+INSERT INTO labels (name, only_if_needed) VALUES ('label3', true);
-- add hosts to labels
INSERT INTO hosts_labels (host_id, label_id) VALUES
@@ -297,6 +298,31 @@
self._check_for_extra_schedulings()
    def _test_only_if_needed_labels_helper(self, use_metahosts):
        """
        A host carrying an only_if_needed label must be scheduled only when
        the job requests that label -- either via a job dependency or, for
        metahost jobs, via the metahost label itself.
        """
        # apply only_if_needed label3 to host1
        self._do_query('INSERT INTO hosts_labels (host_id, label_id) '
                       'VALUES (1, 3)')
        self._create_job_simple([1], use_metahosts)
        # if the job doesn't depend on label3, there should be no scheduling
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()

        # now make the job depend on label3
        self._do_query('INSERT INTO jobs_dependency_labels (job_id, label_id) '
                       'VALUES (1, 3)')
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()

        if use_metahosts:
            # should also work if the metahost is the only_if_needed label
            self._do_query('DELETE FROM jobs_dependency_labels')
            self._create_job(metahosts=[3])
            self._dispatcher._schedule_new_jobs()
            self._assert_job_scheduled_on(2, 1)
            self._check_for_extra_schedulings()
+
+
def test_basic_scheduling(self):
self._test_basic_scheduling_helper(False)
@@ -317,6 +343,10 @@
self._test_obey_ACLs_helper(False)
+ def test_only_if_needed_labels(self):
+ self._test_only_if_needed_labels_helper(False)
+
+
def test_non_metahost_on_invalid_host(self):
"""
Non-metahost entries can get scheduled on invalid hosts (this is how
@@ -349,6 +379,10 @@
self._test_obey_ACLs_helper(True)
+ def test_metahost_only_if_needed_labels(self):
+ self._test_only_if_needed_labels_helper(True)
+
+
def test_nonmetahost_over_metahost(self):
"""
Non-metahost entries should take priority over metahost entries
diff --git a/utils/test_importer.py b/utils/test_importer.py
index 2cc6cf3..0fff018 100644
--- a/utils/test_importer.py
+++ b/utils/test_importer.py
@@ -27,7 +27,7 @@
# Global
DRY_RUN = False
-
+DEPENDENCIES_NOT_FOUND = set()
def main(argv):
"""Main function"""
@@ -161,16 +161,17 @@
connection=db_connect()
cursor = connection.cursor()
# Get tests
- sql = "SELECT path FROM autotests";
+ sql = "SELECT id, path FROM autotests";
cursor.execute(sql)
results = cursor.fetchall()
- for path in results:
- full_path = os.path.join(autotest_dir, path[0])
+ for test_id, path in results:
+ full_path = os.path.join(autotest_dir, path)
if not os.path.isfile(full_path):
if verbose:
- print "Removing " + path[0]
- sql = "DELETE FROM autotests WHERE path='%s'" % path[0]
- db_execute(cursor, sql)
+ print "Removing " + path
+ db_execute(cursor, "DELETE FROM autotests WHERE id=%s" % test_id)
+ db_execute(cursor, "DELETE FROM autotests_dependency_labels WHERE "
+ "test_id=%s" % test_id)
# Find profilers that are no longer present
profilers = []
@@ -230,6 +231,7 @@
"""Update or add each test to the database"""
connection=db_connect()
cursor = connection.cursor()
+ new_test_dicts = []
for test in tests:
new_test = {}
new_test['path'] = test.replace(autotest_dir, '').lstrip('/')
@@ -257,6 +259,7 @@
continue
# clean tests for insertion into db
new_test = dict_db_clean(new_test)
+ new_test_dicts.append(new_test)
sql = "SELECT name,path FROM autotests WHERE path='%s' LIMIT 1"
sql %= new_test['path']
cursor.execute(sql)
@@ -294,6 +297,8 @@
db_execute(cursor, sql)
+ add_label_dependencies(new_test_dicts, cursor)
+
connection.commit()
connection.close()
@@ -337,9 +342,61 @@
test['time'] = test_time[test['time'].lower()]
if str == type(test['test_type']):
test['test_type'] = test_type[test['test_type'].lower()]
+
return test
def add_label_dependencies(tests, cursor):
    """
    Look at the DEPENDENCIES field for each test and add the proper
    many-to-many relationships in autotests_dependency_labels.

    tests: list of test dicts; each must contain 'path' and 'dependencies'
        (a comma-separated string of label names) keys.
    cursor: open DB cursor used for all queries.
    """
    if not tests:
        # fix: an empty list used to generate malformed SQL
        # ("... WHERE test_id IN ()"), crashing the import
        return

    label_name_to_id = get_id_map(cursor, 'labels', 'name')
    test_path_to_id = get_id_map(cursor, 'autotests', 'path')

    # clear out old relationships
    test_ids = ','.join(str(test_path_to_id[test['path']])
                        for test in tests)
    db_execute(cursor,
               'DELETE FROM autotests_dependency_labels WHERE test_id IN (%s)' %
               test_ids)

    value_pairs = []
    for test in tests:
        test_id = test_path_to_id[test['path']]
        for label_name in test['dependencies'].split(','):
            label_name = label_name.strip().lower()
            if not label_name:
                continue
            if label_name not in label_name_to_id:
                # warn (once per label) rather than aborting the whole import
                log_dependency_not_found(label_name)
                continue
            label_id = label_name_to_id[label_name]
            value_pairs.append('(%s, %s)' % (test_id, label_id))

    if not value_pairs:
        return

    # single bulk insert instead of one statement per relationship
    query = ('INSERT INTO autotests_dependency_labels (test_id, label_id) '
             'VALUES ' + ','.join(value_pairs))
    db_execute(cursor, query)
+
+
+def log_dependency_not_found(label_name):
+ if label_name in DEPENDENCIES_NOT_FOUND:
+ return
+ print 'Dependency %s not found' % label_name
+ DEPENDENCIES_NOT_FOUND.add(label_name)
+
+
def get_id_map(cursor, table_name, name_field):
    """Return a dict mapping each row's name_field value to its id for the
    given table."""
    cursor.execute('SELECT id, %s FROM %s' % (name_field, table_name))
    # rows come back as (id, name); invert into name -> id
    return dict((item_name, item_id)
                for item_id, item_name in cursor.fetchall())
+
+
def get_tests_from_fs(parent_dir, control_pattern, add_noncompliant=False):
"""Find control jobs in location and create one big job
Returns: