blob: a9a854ce72ba1d3f336b66e1c657ae9d4210fef4 [file] [log] [blame]
Prashanth Bf66d51b2014-05-06 12:42:25 -07001#!/usr/bin/python
2#pylint: disable-msg=C0111
3
4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8import mock
9
10import common
11
12from autotest_lib.client.common_lib.test_utils import unittest
13from autotest_lib.frontend import setup_django_environment
14from autotest_lib.frontend.afe import frontend_test_utils
15from autotest_lib.frontend.afe import models
Prashanth B4ec98672014-05-15 10:44:54 -070016from autotest_lib.scheduler import email_manager
17from autotest_lib.scheduler import host_scheduler
18from autotest_lib.scheduler import monitor_db
Prashanth Bf66d51b2014-05-06 12:42:25 -070019from autotest_lib.scheduler import rdb
20from autotest_lib.scheduler import rdb_lib
21from autotest_lib.scheduler import rdb_testing_utils
22
23
24class QueryManagerTests(rdb_testing_utils.AbstractBaseRDBTester,
25 unittest.TestCase):
26 """Verify scheduler behavior when pending jobs are already given hosts."""
27
28 _config_section = 'AUTOTEST_WEB'
29
30
31 def testPendingQueueEntries(self):
32 """Test retrieval of pending queue entries."""
33 job = self.create_job(deps=set(['a']))
34
35 # Check that we don't pull the job we just created with only_hostless.
36 jobs_with_hosts = self.job_query_manager.get_pending_queue_entries(
37 only_hostless=True)
38 self.assertTrue(len(jobs_with_hosts) == 0)
39
40 # Check that only_hostless=False pulls new jobs, as always.
41 jobs_without_hosts = self.job_query_manager.get_pending_queue_entries(
42 only_hostless=False)
43 self.assertTrue(jobs_without_hosts[0].id == job.id and
44 jobs_without_hosts[0].host_id is None)
45
46
47 def testHostQueries(self):
48 """Verify that the host query manager maintains its data structures."""
49 # Create a job and use the host_query_managers internal datastructures
50 # to retrieve its job info.
51 job = self.create_job(
52 deps=rdb_testing_utils.DEFAULT_DEPS,
53 acls=rdb_testing_utils.DEFAULT_ACLS)
54 queue_entries = self._dispatcher._refresh_pending_queue_entries()
55 job_manager = rdb_lib.JobQueryManager(queue_entries)
56 job_info = job_manager.get_job_info(queue_entries[0])
57 default_dep_ids = set([label.id for label in self.db_helper.get_labels(
58 name__in=rdb_testing_utils.DEFAULT_DEPS)])
59 default_acl_ids = set([acl.id for acl in self.db_helper.get_acls(
60 name__in=rdb_testing_utils.DEFAULT_ACLS)])
61 self.assertTrue(set(job_info['deps']) == default_dep_ids)
62 self.assertTrue(set(job_info['acls']) == default_acl_ids)
63
64
65 def testNewJobsWithHosts(self):
66 """Test that we handle inactive hqes with unleased hosts correctly."""
67 # Create a job and assign it an unleased host, then check that the
68 # HQE becomes active and the host remains assigned to it.
69 job = self.create_job(deps=['a'])
70 host = self.db_helper.create_host('h1', deps=['a'])
71 self.db_helper.add_host_to_job(host, job.id)
72
73 queue_entries = self._dispatcher._refresh_pending_queue_entries()
74 self._dispatcher._schedule_new_jobs()
75
76 host = self.db_helper.get_host(hostname='h1')[0]
77 self.assertTrue(host.leased == True and
78 host.status == models.Host.Status.READY)
79 hqes = list(self.db_helper.get_hqes(host_id=host.id))
80 self.assertTrue(len(hqes) == 1 and hqes[0].active and
81 hqes[0].status == models.HostQueueEntry.Status.QUEUED)
82
83
84 def testNewJobsWithInvalidHost(self):
85 """Test handling of inactive hqes assigned invalid, unleased hosts."""
86 # Create a job and assign it an unleased host, then check that the
87 # HQE becomes DOES NOT become active, because we validate the
88 # assignment again.
89 job = self.create_job(deps=['a'])
90 host = self.db_helper.create_host('h1', deps=['b'])
91 self.db_helper.add_host_to_job(host, job.id)
92
93 queue_entries = self._dispatcher._refresh_pending_queue_entries()
94 self._dispatcher._schedule_new_jobs()
95
96 host = self.db_helper.get_host(hostname='h1')[0]
97 self.assertTrue(host.leased == False and
98 host.status == models.Host.Status.READY)
99 hqes = list(self.db_helper.get_hqes(host_id=host.id))
100 self.assertTrue(len(hqes) == 1 and not hqes[0].active and
101 hqes[0].status == models.HostQueueEntry.Status.QUEUED)
102
103
104 def testNewJobsWithLeasedHost(self):
105 """Test handling of inactive hqes assigned leased hosts."""
106 # Create a job and assign it a leased host, then check that the
107 # HQE does not become active through the scheduler, and that the
108 # host gets released.
109 job = self.create_job(deps=['a'])
110 host = self.db_helper.create_host('h1', deps=['b'])
111 self.db_helper.add_host_to_job(host, job.id)
112 host.leased = 1
113 host.save()
114
115 rdb.batch_acquire_hosts = mock.MagicMock()
116 queue_entries = self._dispatcher._refresh_pending_queue_entries()
117 self._dispatcher._schedule_new_jobs()
118 self.assertTrue(rdb.batch_acquire_hosts.call_count == 0)
119 host = self.db_helper.get_host(hostname='h1')[0]
120 self.assertTrue(host.leased == True and
121 host.status == models.Host.Status.READY)
122 hqes = list(self.db_helper.get_hqes(host_id=host.id))
123 self.assertTrue(len(hqes) == 1 and not hqes[0].active and
124 hqes[0].status == models.HostQueueEntry.Status.QUEUED)
125 self.host_scheduler._release_hosts()
126 self.assertTrue(self.db_helper.get_host(hostname='h1')[0].leased == 0)
127
128
129 def testSpecialTaskOrdering(self):
130 """Test priority ordering of special tasks."""
131
132 # Create 2 special tasks, one with and one without an hqe.
Prashanth B4ec98672014-05-15 10:44:54 -0700133 # Activate the hqe and make sure it gets scheduled before the other.
Prashanth Bf66d51b2014-05-06 12:42:25 -0700134 host = self.db_helper.create_host('h1', deps=['a'])
Prashanth B4ec98672014-05-15 10:44:54 -0700135 job1 = self.create_job(deps=['a'])
136 self.db_helper.add_host_to_job(host, job1.id)
137 task1 = self.db_helper.create_special_task(job1.id)
138 hqe = self.db_helper.get_hqes(job=job1.id)[0]
Prashanth Bf66d51b2014-05-06 12:42:25 -0700139
Prashanth B4ec98672014-05-15 10:44:54 -0700140 # This task has no queue entry.
141 task2 = self.db_helper.create_special_task(host_id=host.id)
142
143 # Since the hqe task isn't active we get both back.
Prashanth Bf66d51b2014-05-06 12:42:25 -0700144 tasks = self.job_query_manager.get_prioritized_special_tasks()
Prashanth B4ec98672014-05-15 10:44:54 -0700145 self.assertTrue(tasks[1].queue_entry_id is None and
146 tasks[0].queue_entry_id == hqe.id)
147
148 # Activate the hqe and make sure the frontned task isn't returned.
149 self.db_helper.update_hqe(hqe.id, active=True)
150 tasks = self.job_query_manager.get_prioritized_special_tasks()
151 self.assertTrue(tasks[0].id == task1.id)
152
153
154class HostSchedulerTests(rdb_testing_utils.AbstractBaseRDBTester,
155 unittest.TestCase):
156 """Verify scheduler behavior when pending jobs are already given hosts."""
157
158 _config_section = 'AUTOTEST_WEB'
159
160
161 def setUp(self):
162 super(HostSchedulerTests, self).setUp()
163 self.host_scheduler = host_scheduler.HostScheduler()
164
165
166 def testSpecialTaskLocking(self):
167 """Test that frontend special tasks lock hosts."""
168 # Create multiple tasks with hosts and make sure the hosts get locked.
169 host = self.db_helper.create_host('h')
170 host1 = self.db_helper.create_host('h1')
171 task = self.db_helper.create_special_task(host_id=host.id)
172 task1 = self.db_helper.create_special_task(host_id=host1.id)
173 self.host_scheduler._lease_hosts_of_frontend_tasks()
174 self.assertTrue(self.db_helper.get_host(hostname='h')[0].leased == 1 and
175 self.db_helper.get_host(hostname='h1')[0].leased == 1)
176
177
178 def testJobScheduling(self):
179 """Test new host acquisitions."""
180 # Create a job that will find a host through the host scheduler, and
181 # make sure the hqe is activated, and a special task is created.
182 job = self.create_job(deps=set(['a']))
183 host = self.db_helper.create_host('h1', deps=set(['a']))
184 self.host_scheduler._schedule_jobs()
185 hqe = self.db_helper.get_hqes(job_id=job.id)[0]
186 self.assertTrue(hqe.active and hqe.host_id == host.id and
187 hqe.status == models.HostQueueEntry.Status.QUEUED)
188 task = self.db_helper.get_tasks(queue_entry_id=hqe.id)[0]
189 self.assertTrue(task.is_active == 0 and task.host_id == host.id)
190
191
192 def testOverlappingJobs(self):
193 """Check that we catch all cases of an overlapping job."""
194 job_1 = self.create_job(deps=set(['a']))
195 job_2 = self.create_job(deps=set(['a']))
196 host = self.db_helper.create_host('h1', deps=set(['a']))
197 self.db_helper.add_host_to_job(host, job_1.id, activate=True)
198 self.db_helper.add_host_to_job(host, job_2.id, activate=True)
199 jobs = (self.host_scheduler.job_query_manager.get_overlapping_jobs())
200 self.assertTrue(len(jobs) == 2 and
201 jobs[0]['host_id'] == jobs[1]['host_id'])
202 email_manager.manager.enqueue_notify_email = mock.MagicMock()
203 self.host_scheduler._check_host_assignments()
204 self.assertTrue(
205 email_manager.manager.enqueue_notify_email.call_count == 1)
206
207
208 def _check_agent_invariants(self, host, agent):
209 host_agents = list(self._dispatcher._host_agents[host.id])
210 self.assertTrue(len(host_agents) == 1)
211 self.assertTrue(host_agents[0].task.task.id == agent.id)
212 return host_agents[0]
213
214
215 def testLeasedFrontendTaskHost(self):
216 """Check that we don't scheduler a special task on an unleased host."""
217 # Create a special task without an hqe and make sure it isn't returned
218 # for scheduling till its host is leased.
219 host = self.db_helper.create_host('h1', deps=['a'])
220 task = self.db_helper.create_special_task(host_id=host.id)
221
222 tasks = self.job_query_manager.get_prioritized_special_tasks(
223 only_tasks_with_leased_hosts=True)
Prashanth Bf66d51b2014-05-06 12:42:25 -0700224 self.assertTrue(tasks == [])
Prashanth B4ec98672014-05-15 10:44:54 -0700225 tasks = self.job_query_manager.get_prioritized_special_tasks(
226 only_tasks_with_leased_hosts=False)
227 self.assertTrue(tasks[0].id == task.id)
228 self.host_scheduler._lease_hosts_of_frontend_tasks()
229 tasks = self.job_query_manager.get_prioritized_special_tasks(
230 only_tasks_with_leased_hosts=True)
231 self.assertTrue(tasks[0].id == task.id)
Prashanth Bf66d51b2014-05-06 12:42:25 -0700232
233
Prashanth B4ec98672014-05-15 10:44:54 -0700234 def testTickLockStep(self):
235 """Check that a frontend task and an hqe never run simultaneously."""
236
237 self.god.stub_with(monitor_db, '_inline_host_acquisition', False)
238
239 # Create a frontend special task against a host.
240 host = self.db_helper.create_host('h1', deps=set(['a']))
241 frontend_task = self.db_helper.create_special_task(host_id=host.id)
242 self._dispatcher._schedule_special_tasks()
243 # The frontend special task shouldn't get scheduled on the host till
244 # the host is leased.
245 self.assertFalse(self._dispatcher.host_has_agent(host))
246
247 # Create a job for the same host and make the host scheduler lease the
248 # host out to that job.
249 job = self.create_job(deps=set(['a']))
250 self.host_scheduler._schedule_jobs()
251 hqe = self.db_helper.get_hqes(job_id=job.id)[0]
252 tasks = self.job_query_manager.get_prioritized_special_tasks(
253 only_tasks_with_leased_hosts=True)
254 # We should not find the frontend special task, even though its host is
255 # now leased, because its leased by an active hqe.
256 self.assertTrue(len(tasks) == 1 and tasks[0].queue_entry_id == hqe.id)
257 self._dispatcher._schedule_special_tasks()
258 self.assertTrue(self._dispatcher.host_has_agent(host))
259
260 # Deactivate the hqe task and make sure the frontend task gets the host.
261 task = tasks[0]
262 self._dispatcher.remove_agent(self._check_agent_invariants(host, task))
263 task.is_complete = 1
264 task.is_active = 0
265 task.save()
266 self.db_helper.update_hqe(hqe.id, active=False)
267 self._dispatcher._schedule_special_tasks()
268 self.assertTrue(self._dispatcher.host_has_agent(host))
269 self._check_agent_invariants(host, frontend_task)
270
271 # Make sure we don't release the host being used by the incomplete task.
272 self.host_scheduler._release_hosts()
273 host = self.db_helper.get_host(hostname='h1')[0]
274 self.assertTrue(host.leased == True)
275