blob: 77ca0e51d1f7b6ad56cd8718f0e67cc0bdf9a129 [file] [log] [blame]
Prashanth Bf66d51b2014-05-06 12:42:25 -07001#!/usr/bin/python
2#pylint: disable-msg=C0111
3
4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8import mock
9
10import common
11
12from autotest_lib.client.common_lib.test_utils import unittest
13from autotest_lib.frontend import setup_django_environment
14from autotest_lib.frontend.afe import frontend_test_utils
15from autotest_lib.frontend.afe import models
Fang Deng522bc532014-11-20 17:48:34 -080016from autotest_lib.server.cros.dynamic_suite import constants
Prashanth B4ec98672014-05-15 10:44:54 -070017from autotest_lib.scheduler import email_manager
18from autotest_lib.scheduler import host_scheduler
19from autotest_lib.scheduler import monitor_db
Prashanth Bf66d51b2014-05-06 12:42:25 -070020from autotest_lib.scheduler import rdb
21from autotest_lib.scheduler import rdb_lib
22from autotest_lib.scheduler import rdb_testing_utils
Fang Deng522bc532014-11-20 17:48:34 -080023from autotest_lib.scheduler import scheduler_models
Prashanth Bf66d51b2014-05-06 12:42:25 -070024
25
26class QueryManagerTests(rdb_testing_utils.AbstractBaseRDBTester,
27 unittest.TestCase):
28 """Verify scheduler behavior when pending jobs are already given hosts."""
29
30 _config_section = 'AUTOTEST_WEB'
31
32
33 def testPendingQueueEntries(self):
34 """Test retrieval of pending queue entries."""
35 job = self.create_job(deps=set(['a']))
36
37 # Check that we don't pull the job we just created with only_hostless.
38 jobs_with_hosts = self.job_query_manager.get_pending_queue_entries(
39 only_hostless=True)
40 self.assertTrue(len(jobs_with_hosts) == 0)
41
42 # Check that only_hostless=False pulls new jobs, as always.
43 jobs_without_hosts = self.job_query_manager.get_pending_queue_entries(
44 only_hostless=False)
45 self.assertTrue(jobs_without_hosts[0].id == job.id and
46 jobs_without_hosts[0].host_id is None)
47
48
Jakob Juelichefa95312014-08-27 18:29:52 -070049 def testPendingQueueEntriesForShard(self):
50 """Test queue entries for shards aren't executed by master scheduler"""
51 job1 = self.create_job(deps=set(['a']))
52 job2 = self.create_job(deps=set(['b']))
53 shard = models.Shard.objects.create()
54 # Assign the job's label to a shard
55 shard.labels.add(job1.dependency_labels.all()[0])
56
57 # Check that we only pull jobs which are not assigned to a shard.
58 jobs_with_hosts = self.job_query_manager.get_pending_queue_entries()
59 self.assertTrue(len(jobs_with_hosts) == 1)
60 self.assertEqual(jobs_with_hosts[0].id, job2.id)
61
62
Prashanth Bf66d51b2014-05-06 12:42:25 -070063 def testHostQueries(self):
64 """Verify that the host query manager maintains its data structures."""
65 # Create a job and use the host_query_managers internal datastructures
66 # to retrieve its job info.
67 job = self.create_job(
68 deps=rdb_testing_utils.DEFAULT_DEPS,
69 acls=rdb_testing_utils.DEFAULT_ACLS)
70 queue_entries = self._dispatcher._refresh_pending_queue_entries()
71 job_manager = rdb_lib.JobQueryManager(queue_entries)
72 job_info = job_manager.get_job_info(queue_entries[0])
73 default_dep_ids = set([label.id for label in self.db_helper.get_labels(
74 name__in=rdb_testing_utils.DEFAULT_DEPS)])
75 default_acl_ids = set([acl.id for acl in self.db_helper.get_acls(
76 name__in=rdb_testing_utils.DEFAULT_ACLS)])
77 self.assertTrue(set(job_info['deps']) == default_dep_ids)
78 self.assertTrue(set(job_info['acls']) == default_acl_ids)
79
80
81 def testNewJobsWithHosts(self):
82 """Test that we handle inactive hqes with unleased hosts correctly."""
83 # Create a job and assign it an unleased host, then check that the
84 # HQE becomes active and the host remains assigned to it.
85 job = self.create_job(deps=['a'])
86 host = self.db_helper.create_host('h1', deps=['a'])
87 self.db_helper.add_host_to_job(host, job.id)
88
89 queue_entries = self._dispatcher._refresh_pending_queue_entries()
90 self._dispatcher._schedule_new_jobs()
91
92 host = self.db_helper.get_host(hostname='h1')[0]
93 self.assertTrue(host.leased == True and
94 host.status == models.Host.Status.READY)
95 hqes = list(self.db_helper.get_hqes(host_id=host.id))
96 self.assertTrue(len(hqes) == 1 and hqes[0].active and
97 hqes[0].status == models.HostQueueEntry.Status.QUEUED)
98
99
100 def testNewJobsWithInvalidHost(self):
101 """Test handling of inactive hqes assigned invalid, unleased hosts."""
102 # Create a job and assign it an unleased host, then check that the
103 # HQE becomes DOES NOT become active, because we validate the
104 # assignment again.
105 job = self.create_job(deps=['a'])
106 host = self.db_helper.create_host('h1', deps=['b'])
107 self.db_helper.add_host_to_job(host, job.id)
108
109 queue_entries = self._dispatcher._refresh_pending_queue_entries()
110 self._dispatcher._schedule_new_jobs()
111
112 host = self.db_helper.get_host(hostname='h1')[0]
113 self.assertTrue(host.leased == False and
114 host.status == models.Host.Status.READY)
115 hqes = list(self.db_helper.get_hqes(host_id=host.id))
116 self.assertTrue(len(hqes) == 1 and not hqes[0].active and
117 hqes[0].status == models.HostQueueEntry.Status.QUEUED)
118
119
120 def testNewJobsWithLeasedHost(self):
121 """Test handling of inactive hqes assigned leased hosts."""
122 # Create a job and assign it a leased host, then check that the
123 # HQE does not become active through the scheduler, and that the
124 # host gets released.
125 job = self.create_job(deps=['a'])
126 host = self.db_helper.create_host('h1', deps=['b'])
127 self.db_helper.add_host_to_job(host, job.id)
128 host.leased = 1
129 host.save()
130
131 rdb.batch_acquire_hosts = mock.MagicMock()
132 queue_entries = self._dispatcher._refresh_pending_queue_entries()
133 self._dispatcher._schedule_new_jobs()
134 self.assertTrue(rdb.batch_acquire_hosts.call_count == 0)
135 host = self.db_helper.get_host(hostname='h1')[0]
136 self.assertTrue(host.leased == True and
137 host.status == models.Host.Status.READY)
138 hqes = list(self.db_helper.get_hqes(host_id=host.id))
139 self.assertTrue(len(hqes) == 1 and not hqes[0].active and
140 hqes[0].status == models.HostQueueEntry.Status.QUEUED)
141 self.host_scheduler._release_hosts()
142 self.assertTrue(self.db_helper.get_host(hostname='h1')[0].leased == 0)
143
144
145 def testSpecialTaskOrdering(self):
146 """Test priority ordering of special tasks."""
147
148 # Create 2 special tasks, one with and one without an hqe.
Prashanth B4ec98672014-05-15 10:44:54 -0700149 # Activate the hqe and make sure it gets scheduled before the other.
Prashanth Bf66d51b2014-05-06 12:42:25 -0700150 host = self.db_helper.create_host('h1', deps=['a'])
Prashanth B4ec98672014-05-15 10:44:54 -0700151 job1 = self.create_job(deps=['a'])
152 self.db_helper.add_host_to_job(host, job1.id)
153 task1 = self.db_helper.create_special_task(job1.id)
154 hqe = self.db_helper.get_hqes(job=job1.id)[0]
Prashanth Bf66d51b2014-05-06 12:42:25 -0700155
Prashanth B4ec98672014-05-15 10:44:54 -0700156 # This task has no queue entry.
157 task2 = self.db_helper.create_special_task(host_id=host.id)
158
159 # Since the hqe task isn't active we get both back.
Prashanth Bf66d51b2014-05-06 12:42:25 -0700160 tasks = self.job_query_manager.get_prioritized_special_tasks()
Prashanth B4ec98672014-05-15 10:44:54 -0700161 self.assertTrue(tasks[1].queue_entry_id is None and
162 tasks[0].queue_entry_id == hqe.id)
163
164 # Activate the hqe and make sure the frontned task isn't returned.
165 self.db_helper.update_hqe(hqe.id, active=True)
166 tasks = self.job_query_manager.get_prioritized_special_tasks()
167 self.assertTrue(tasks[0].id == task1.id)
168
169
170class HostSchedulerTests(rdb_testing_utils.AbstractBaseRDBTester,
171 unittest.TestCase):
172 """Verify scheduler behavior when pending jobs are already given hosts."""
173
174 _config_section = 'AUTOTEST_WEB'
175
176
177 def setUp(self):
178 super(HostSchedulerTests, self).setUp()
179 self.host_scheduler = host_scheduler.HostScheduler()
180
181
182 def testSpecialTaskLocking(self):
183 """Test that frontend special tasks lock hosts."""
184 # Create multiple tasks with hosts and make sure the hosts get locked.
185 host = self.db_helper.create_host('h')
186 host1 = self.db_helper.create_host('h1')
187 task = self.db_helper.create_special_task(host_id=host.id)
188 task1 = self.db_helper.create_special_task(host_id=host1.id)
189 self.host_scheduler._lease_hosts_of_frontend_tasks()
190 self.assertTrue(self.db_helper.get_host(hostname='h')[0].leased == 1 and
191 self.db_helper.get_host(hostname='h1')[0].leased == 1)
192
193
194 def testJobScheduling(self):
195 """Test new host acquisitions."""
196 # Create a job that will find a host through the host scheduler, and
197 # make sure the hqe is activated, and a special task is created.
198 job = self.create_job(deps=set(['a']))
199 host = self.db_helper.create_host('h1', deps=set(['a']))
200 self.host_scheduler._schedule_jobs()
201 hqe = self.db_helper.get_hqes(job_id=job.id)[0]
202 self.assertTrue(hqe.active and hqe.host_id == host.id and
203 hqe.status == models.HostQueueEntry.Status.QUEUED)
204 task = self.db_helper.get_tasks(queue_entry_id=hqe.id)[0]
205 self.assertTrue(task.is_active == 0 and task.host_id == host.id)
206
207
208 def testOverlappingJobs(self):
209 """Check that we catch all cases of an overlapping job."""
210 job_1 = self.create_job(deps=set(['a']))
211 job_2 = self.create_job(deps=set(['a']))
212 host = self.db_helper.create_host('h1', deps=set(['a']))
213 self.db_helper.add_host_to_job(host, job_1.id, activate=True)
214 self.db_helper.add_host_to_job(host, job_2.id, activate=True)
215 jobs = (self.host_scheduler.job_query_manager.get_overlapping_jobs())
216 self.assertTrue(len(jobs) == 2 and
217 jobs[0]['host_id'] == jobs[1]['host_id'])
218 email_manager.manager.enqueue_notify_email = mock.MagicMock()
219 self.host_scheduler._check_host_assignments()
220 self.assertTrue(
221 email_manager.manager.enqueue_notify_email.call_count == 1)
222
223
224 def _check_agent_invariants(self, host, agent):
225 host_agents = list(self._dispatcher._host_agents[host.id])
226 self.assertTrue(len(host_agents) == 1)
227 self.assertTrue(host_agents[0].task.task.id == agent.id)
228 return host_agents[0]
229
230
231 def testLeasedFrontendTaskHost(self):
232 """Check that we don't scheduler a special task on an unleased host."""
233 # Create a special task without an hqe and make sure it isn't returned
234 # for scheduling till its host is leased.
235 host = self.db_helper.create_host('h1', deps=['a'])
236 task = self.db_helper.create_special_task(host_id=host.id)
237
238 tasks = self.job_query_manager.get_prioritized_special_tasks(
239 only_tasks_with_leased_hosts=True)
Prashanth Bf66d51b2014-05-06 12:42:25 -0700240 self.assertTrue(tasks == [])
Prashanth B4ec98672014-05-15 10:44:54 -0700241 tasks = self.job_query_manager.get_prioritized_special_tasks(
242 only_tasks_with_leased_hosts=False)
243 self.assertTrue(tasks[0].id == task.id)
244 self.host_scheduler._lease_hosts_of_frontend_tasks()
245 tasks = self.job_query_manager.get_prioritized_special_tasks(
246 only_tasks_with_leased_hosts=True)
247 self.assertTrue(tasks[0].id == task.id)
Prashanth Bf66d51b2014-05-06 12:42:25 -0700248
249
Prashanth B4ec98672014-05-15 10:44:54 -0700250 def testTickLockStep(self):
251 """Check that a frontend task and an hqe never run simultaneously."""
252
253 self.god.stub_with(monitor_db, '_inline_host_acquisition', False)
254
255 # Create a frontend special task against a host.
256 host = self.db_helper.create_host('h1', deps=set(['a']))
257 frontend_task = self.db_helper.create_special_task(host_id=host.id)
258 self._dispatcher._schedule_special_tasks()
259 # The frontend special task shouldn't get scheduled on the host till
260 # the host is leased.
261 self.assertFalse(self._dispatcher.host_has_agent(host))
262
263 # Create a job for the same host and make the host scheduler lease the
264 # host out to that job.
265 job = self.create_job(deps=set(['a']))
266 self.host_scheduler._schedule_jobs()
267 hqe = self.db_helper.get_hqes(job_id=job.id)[0]
268 tasks = self.job_query_manager.get_prioritized_special_tasks(
269 only_tasks_with_leased_hosts=True)
270 # We should not find the frontend special task, even though its host is
271 # now leased, because its leased by an active hqe.
272 self.assertTrue(len(tasks) == 1 and tasks[0].queue_entry_id == hqe.id)
273 self._dispatcher._schedule_special_tasks()
274 self.assertTrue(self._dispatcher.host_has_agent(host))
275
276 # Deactivate the hqe task and make sure the frontend task gets the host.
277 task = tasks[0]
278 self._dispatcher.remove_agent(self._check_agent_invariants(host, task))
279 task.is_complete = 1
280 task.is_active = 0
281 task.save()
282 self.db_helper.update_hqe(hqe.id, active=False)
283 self._dispatcher._schedule_special_tasks()
284 self.assertTrue(self._dispatcher.host_has_agent(host))
285 self._check_agent_invariants(host, frontend_task)
286
287 # Make sure we don't release the host being used by the incomplete task.
288 self.host_scheduler._release_hosts()
289 host = self.db_helper.get_host(hostname='h1')[0]
290 self.assertTrue(host.leased == True)
291
Fang Deng522bc532014-11-20 17:48:34 -0800292
293class SuiteRecorderTest(rdb_testing_utils.AbstractBaseRDBTester,
294 unittest.TestCase):
295 """Test the functionality of SuiteRecorder"""
296
297 _config_section = 'AUTOTEST_WEB'
298
299 def testGetSuiteHostAssignment(self):
300 """Test the initialization of SuiteRecord."""
301 hosts = []
302 num = 4
303 for i in range (0, num):
304 hosts.append(self.db_helper.create_host(
305 'h%d' % i, deps=set(['board:lumpy'])))
306 single_job = self.create_job(deps=set(['a']))
307 jobs_1 = self.create_suite(num=2, board='board:lumpy')
308 jobs_2 = self.create_suite(num=2, board='board:lumpy')
309 # We have 4 hosts, 5 jobs, one job in the second suite won't
310 # get a host.
311 all_jobs = ([single_job] +
312 [jobs_1[k] for k in jobs_1 if k !='parent_job'] +
313 [jobs_2[k] for k in jobs_2 if k !='parent_job'])
314 for i in range(0, num):
315 self.db_helper.add_host_to_job(hosts[i], all_jobs[i].id,
316 activate=True)
317 r = host_scheduler.SuiteRecorder(self.job_query_manager)
318 self.assertEqual(r.suite_host_num,
319 {jobs_1['parent_job'].id:2,
320 jobs_2['parent_job'].id:1})
321 self.assertEqual(r.hosts_to_suites,
322 {hosts[1].id: jobs_1['parent_job'].id,
323 hosts[2].id: jobs_1['parent_job'].id,
324 hosts[3].id: jobs_2['parent_job'].id})
325
326
327 def verify_state(self, recorder, suite_host_num, hosts_to_suites):
328 """Verify the suite, host information held by SuiteRecorder.
329
330 @param recorder: A SuiteRecorder object.
331 @param suite_host_num: a dict, expected value of suite_host_num.
332 @param hosts_to_suites: a dict, expected value of hosts_to_suites.
333 """
334 self.assertEqual(recorder.suite_host_num, suite_host_num)
335 self.assertEqual(recorder.hosts_to_suites, hosts_to_suites)
336
337
338 def assign_host_to_job(self, host, job, recorder=None):
339 """A helper function that adds a host to a job and record it.
340
341 @param host: A Host object.
342 @param job: A Job object.
343 @param recorder: A SuiteRecorder object to record the assignment.
344
345 @return a HostQueueEntry object that binds the host and job together.
346 """
347 self.db_helper.add_host_to_job(host, job)
348 hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%s',
349 params=(job.id,))[0]
350 if recorder:
351 recorder.record_assignment(hqe)
352 return hqe
353
354
355 def testRecordAssignmentAndRelease(self):
356 """Test when a host is assigned to suite"""
357 r = host_scheduler.SuiteRecorder(self.job_query_manager)
358 self.verify_state(r, {}, {})
359 host1 = self.db_helper.create_host('h1')
360 host2 = self.db_helper.create_host('h2')
361 jobs = self.create_suite(num=2)
362 hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%s',
363 params=(jobs[0].id,))[0]
364 # HQE got a host.
365 hqe = self.assign_host_to_job(host1, jobs[0], r)
366 self.verify_state(r, {jobs['parent_job'].id:1},
367 {host1.id: jobs['parent_job'].id})
368 # Tried to call record_assignment again, nothing should happen.
369 r.record_assignment(hqe)
370 self.verify_state(r, {jobs['parent_job'].id:1},
371 {host1.id: jobs['parent_job'].id})
372 # Second hqe got a host
373 self.assign_host_to_job(host2, jobs[1], r)
374 self.verify_state(r, {jobs['parent_job'].id:2},
375 {host1.id: jobs['parent_job'].id,
376 host2.id: jobs['parent_job'].id})
377 # Release host1
378 r.record_release([host1])
379 self.verify_state(r, {jobs['parent_job'].id:1},
380 {host2.id: jobs['parent_job'].id})
381 # Release host2
382 r.record_release([host2])
383 self.verify_state(r, {}, {})
384
385
386 def testGetMinDuts(self):
387 """Test get min dut for suite."""
388 host1 = self.db_helper.create_host('h1')
389 host2 = self.db_helper.create_host('h2')
390 host3 = self.db_helper.create_host('h3')
391 jobs = self.create_suite(num=3)
392 pid = jobs['parent_job'].id
393 # Set min_dut=1 for the suite as a job keyval.
394 keyval = models.JobKeyval(
395 job_id=pid, key=constants.SUITE_MIN_DUTS_KEY, value=2)
396 keyval.save()
397 r = host_scheduler.SuiteRecorder(self.job_query_manager)
398 # Not job has got any host, min dut to request should equal to what's
399 # specified in the job keyval.
400 self.assertEqual(r.get_min_duts([pid]), {pid: 2})
401 self.assign_host_to_job(host1, jobs[0], r)
402 self.assertEqual(r.get_min_duts([pid]), {pid: 1})
403 self.assign_host_to_job(host2, jobs[1], r)
404 self.assertEqual(r.get_min_duts([pid]), {pid: 0})
405 self.assign_host_to_job(host3, jobs[2], r)
406 self.assertEqual(r.get_min_duts([pid]), {pid: 0})
407 r.record_release([host1])
408 self.assertEqual(r.get_min_duts([pid]), {pid: 0})
409 r.record_release([host2])
410 self.assertEqual(r.get_min_duts([pid]), {pid: 1})
411 r.record_release([host3])
412 self.assertEqual(r.get_min_duts([pid]), {pid: 2})
413
414if __name__ == '__main__':
415 unittest.main()
416