blob: ff95312b4127901c87006a875a08154738d4f66d [file] [log] [blame]
Prashanth Bf66d51b2014-05-06 12:42:25 -07001#!/usr/bin/python
2#pylint: disable-msg=C0111
3
4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8import mock
9
10import common
11
12from autotest_lib.client.common_lib.test_utils import unittest
13from autotest_lib.frontend import setup_django_environment
14from autotest_lib.frontend.afe import frontend_test_utils
15from autotest_lib.frontend.afe import models
Fang Deng522bc532014-11-20 17:48:34 -080016from autotest_lib.server.cros.dynamic_suite import constants
Prashanth B4ec98672014-05-15 10:44:54 -070017from autotest_lib.scheduler import host_scheduler
18from autotest_lib.scheduler import monitor_db
Prashanth Bf66d51b2014-05-06 12:42:25 -070019from autotest_lib.scheduler import rdb
20from autotest_lib.scheduler import rdb_lib
21from autotest_lib.scheduler import rdb_testing_utils
Fang Deng522bc532014-11-20 17:48:34 -080022from autotest_lib.scheduler import scheduler_models
Prashanth Bf66d51b2014-05-06 12:42:25 -070023
24
25class QueryManagerTests(rdb_testing_utils.AbstractBaseRDBTester,
26 unittest.TestCase):
27 """Verify scheduler behavior when pending jobs are already given hosts."""
28
29 _config_section = 'AUTOTEST_WEB'
30
31
32 def testPendingQueueEntries(self):
33 """Test retrieval of pending queue entries."""
34 job = self.create_job(deps=set(['a']))
35
36 # Check that we don't pull the job we just created with only_hostless.
37 jobs_with_hosts = self.job_query_manager.get_pending_queue_entries(
38 only_hostless=True)
39 self.assertTrue(len(jobs_with_hosts) == 0)
40
41 # Check that only_hostless=False pulls new jobs, as always.
42 jobs_without_hosts = self.job_query_manager.get_pending_queue_entries(
43 only_hostless=False)
44 self.assertTrue(jobs_without_hosts[0].id == job.id and
45 jobs_without_hosts[0].host_id is None)
46
47
Jakob Juelichefa95312014-08-27 18:29:52 -070048 def testPendingQueueEntriesForShard(self):
49 """Test queue entries for shards aren't executed by master scheduler"""
50 job1 = self.create_job(deps=set(['a']))
51 job2 = self.create_job(deps=set(['b']))
52 shard = models.Shard.objects.create()
53 # Assign the job's label to a shard
54 shard.labels.add(job1.dependency_labels.all()[0])
55
56 # Check that we only pull jobs which are not assigned to a shard.
57 jobs_with_hosts = self.job_query_manager.get_pending_queue_entries()
58 self.assertTrue(len(jobs_with_hosts) == 1)
59 self.assertEqual(jobs_with_hosts[0].id, job2.id)
60
61
Prashanth Bf66d51b2014-05-06 12:42:25 -070062 def testHostQueries(self):
63 """Verify that the host query manager maintains its data structures."""
64 # Create a job and use the host_query_managers internal datastructures
65 # to retrieve its job info.
66 job = self.create_job(
67 deps=rdb_testing_utils.DEFAULT_DEPS,
68 acls=rdb_testing_utils.DEFAULT_ACLS)
69 queue_entries = self._dispatcher._refresh_pending_queue_entries()
70 job_manager = rdb_lib.JobQueryManager(queue_entries)
71 job_info = job_manager.get_job_info(queue_entries[0])
72 default_dep_ids = set([label.id for label in self.db_helper.get_labels(
73 name__in=rdb_testing_utils.DEFAULT_DEPS)])
74 default_acl_ids = set([acl.id for acl in self.db_helper.get_acls(
75 name__in=rdb_testing_utils.DEFAULT_ACLS)])
76 self.assertTrue(set(job_info['deps']) == default_dep_ids)
77 self.assertTrue(set(job_info['acls']) == default_acl_ids)
78
79
80 def testNewJobsWithHosts(self):
81 """Test that we handle inactive hqes with unleased hosts correctly."""
82 # Create a job and assign it an unleased host, then check that the
83 # HQE becomes active and the host remains assigned to it.
84 job = self.create_job(deps=['a'])
85 host = self.db_helper.create_host('h1', deps=['a'])
86 self.db_helper.add_host_to_job(host, job.id)
87
88 queue_entries = self._dispatcher._refresh_pending_queue_entries()
89 self._dispatcher._schedule_new_jobs()
90
91 host = self.db_helper.get_host(hostname='h1')[0]
92 self.assertTrue(host.leased == True and
93 host.status == models.Host.Status.READY)
94 hqes = list(self.db_helper.get_hqes(host_id=host.id))
95 self.assertTrue(len(hqes) == 1 and hqes[0].active and
96 hqes[0].status == models.HostQueueEntry.Status.QUEUED)
97
98
99 def testNewJobsWithInvalidHost(self):
100 """Test handling of inactive hqes assigned invalid, unleased hosts."""
101 # Create a job and assign it an unleased host, then check that the
102 # HQE becomes DOES NOT become active, because we validate the
103 # assignment again.
104 job = self.create_job(deps=['a'])
105 host = self.db_helper.create_host('h1', deps=['b'])
106 self.db_helper.add_host_to_job(host, job.id)
107
108 queue_entries = self._dispatcher._refresh_pending_queue_entries()
109 self._dispatcher._schedule_new_jobs()
110
111 host = self.db_helper.get_host(hostname='h1')[0]
112 self.assertTrue(host.leased == False and
113 host.status == models.Host.Status.READY)
114 hqes = list(self.db_helper.get_hqes(host_id=host.id))
115 self.assertTrue(len(hqes) == 1 and not hqes[0].active and
116 hqes[0].status == models.HostQueueEntry.Status.QUEUED)
117
118
119 def testNewJobsWithLeasedHost(self):
120 """Test handling of inactive hqes assigned leased hosts."""
121 # Create a job and assign it a leased host, then check that the
122 # HQE does not become active through the scheduler, and that the
123 # host gets released.
124 job = self.create_job(deps=['a'])
125 host = self.db_helper.create_host('h1', deps=['b'])
126 self.db_helper.add_host_to_job(host, job.id)
127 host.leased = 1
128 host.save()
129
130 rdb.batch_acquire_hosts = mock.MagicMock()
131 queue_entries = self._dispatcher._refresh_pending_queue_entries()
132 self._dispatcher._schedule_new_jobs()
133 self.assertTrue(rdb.batch_acquire_hosts.call_count == 0)
134 host = self.db_helper.get_host(hostname='h1')[0]
135 self.assertTrue(host.leased == True and
136 host.status == models.Host.Status.READY)
137 hqes = list(self.db_helper.get_hqes(host_id=host.id))
138 self.assertTrue(len(hqes) == 1 and not hqes[0].active and
139 hqes[0].status == models.HostQueueEntry.Status.QUEUED)
140 self.host_scheduler._release_hosts()
141 self.assertTrue(self.db_helper.get_host(hostname='h1')[0].leased == 0)
142
143
144 def testSpecialTaskOrdering(self):
145 """Test priority ordering of special tasks."""
146
147 # Create 2 special tasks, one with and one without an hqe.
Prashanth B4ec98672014-05-15 10:44:54 -0700148 # Activate the hqe and make sure it gets scheduled before the other.
Prashanth Bf66d51b2014-05-06 12:42:25 -0700149 host = self.db_helper.create_host('h1', deps=['a'])
Prashanth B4ec98672014-05-15 10:44:54 -0700150 job1 = self.create_job(deps=['a'])
151 self.db_helper.add_host_to_job(host, job1.id)
152 task1 = self.db_helper.create_special_task(job1.id)
153 hqe = self.db_helper.get_hqes(job=job1.id)[0]
Prashanth Bf66d51b2014-05-06 12:42:25 -0700154
Prashanth B4ec98672014-05-15 10:44:54 -0700155 # This task has no queue entry.
156 task2 = self.db_helper.create_special_task(host_id=host.id)
157
158 # Since the hqe task isn't active we get both back.
Prashanth Bf66d51b2014-05-06 12:42:25 -0700159 tasks = self.job_query_manager.get_prioritized_special_tasks()
Prashanth B4ec98672014-05-15 10:44:54 -0700160 self.assertTrue(tasks[1].queue_entry_id is None and
161 tasks[0].queue_entry_id == hqe.id)
162
163 # Activate the hqe and make sure the frontned task isn't returned.
164 self.db_helper.update_hqe(hqe.id, active=True)
165 tasks = self.job_query_manager.get_prioritized_special_tasks()
166 self.assertTrue(tasks[0].id == task1.id)
167
168
169class HostSchedulerTests(rdb_testing_utils.AbstractBaseRDBTester,
170 unittest.TestCase):
171 """Verify scheduler behavior when pending jobs are already given hosts."""
172
173 _config_section = 'AUTOTEST_WEB'
174
175
176 def setUp(self):
177 super(HostSchedulerTests, self).setUp()
178 self.host_scheduler = host_scheduler.HostScheduler()
179
180
181 def testSpecialTaskLocking(self):
182 """Test that frontend special tasks lock hosts."""
183 # Create multiple tasks with hosts and make sure the hosts get locked.
184 host = self.db_helper.create_host('h')
185 host1 = self.db_helper.create_host('h1')
186 task = self.db_helper.create_special_task(host_id=host.id)
187 task1 = self.db_helper.create_special_task(host_id=host1.id)
188 self.host_scheduler._lease_hosts_of_frontend_tasks()
189 self.assertTrue(self.db_helper.get_host(hostname='h')[0].leased == 1 and
190 self.db_helper.get_host(hostname='h1')[0].leased == 1)
191
192
193 def testJobScheduling(self):
194 """Test new host acquisitions."""
195 # Create a job that will find a host through the host scheduler, and
196 # make sure the hqe is activated, and a special task is created.
197 job = self.create_job(deps=set(['a']))
198 host = self.db_helper.create_host('h1', deps=set(['a']))
199 self.host_scheduler._schedule_jobs()
200 hqe = self.db_helper.get_hqes(job_id=job.id)[0]
201 self.assertTrue(hqe.active and hqe.host_id == host.id and
202 hqe.status == models.HostQueueEntry.Status.QUEUED)
203 task = self.db_helper.get_tasks(queue_entry_id=hqe.id)[0]
204 self.assertTrue(task.is_active == 0 and task.host_id == host.id)
205
206
Prashanth B4ec98672014-05-15 10:44:54 -0700207 def _check_agent_invariants(self, host, agent):
208 host_agents = list(self._dispatcher._host_agents[host.id])
209 self.assertTrue(len(host_agents) == 1)
210 self.assertTrue(host_agents[0].task.task.id == agent.id)
211 return host_agents[0]
212
213
214 def testLeasedFrontendTaskHost(self):
215 """Check that we don't scheduler a special task on an unleased host."""
216 # Create a special task without an hqe and make sure it isn't returned
217 # for scheduling till its host is leased.
218 host = self.db_helper.create_host('h1', deps=['a'])
219 task = self.db_helper.create_special_task(host_id=host.id)
220
221 tasks = self.job_query_manager.get_prioritized_special_tasks(
222 only_tasks_with_leased_hosts=True)
Prashanth Bf66d51b2014-05-06 12:42:25 -0700223 self.assertTrue(tasks == [])
Prashanth B4ec98672014-05-15 10:44:54 -0700224 tasks = self.job_query_manager.get_prioritized_special_tasks(
225 only_tasks_with_leased_hosts=False)
226 self.assertTrue(tasks[0].id == task.id)
227 self.host_scheduler._lease_hosts_of_frontend_tasks()
228 tasks = self.job_query_manager.get_prioritized_special_tasks(
229 only_tasks_with_leased_hosts=True)
230 self.assertTrue(tasks[0].id == task.id)
Prashanth Bf66d51b2014-05-06 12:42:25 -0700231
232
Prashanth B4ec98672014-05-15 10:44:54 -0700233 def testTickLockStep(self):
234 """Check that a frontend task and an hqe never run simultaneously."""
235
236 self.god.stub_with(monitor_db, '_inline_host_acquisition', False)
237
238 # Create a frontend special task against a host.
239 host = self.db_helper.create_host('h1', deps=set(['a']))
240 frontend_task = self.db_helper.create_special_task(host_id=host.id)
241 self._dispatcher._schedule_special_tasks()
242 # The frontend special task shouldn't get scheduled on the host till
243 # the host is leased.
244 self.assertFalse(self._dispatcher.host_has_agent(host))
245
246 # Create a job for the same host and make the host scheduler lease the
247 # host out to that job.
248 job = self.create_job(deps=set(['a']))
249 self.host_scheduler._schedule_jobs()
250 hqe = self.db_helper.get_hqes(job_id=job.id)[0]
251 tasks = self.job_query_manager.get_prioritized_special_tasks(
252 only_tasks_with_leased_hosts=True)
253 # We should not find the frontend special task, even though its host is
254 # now leased, because its leased by an active hqe.
255 self.assertTrue(len(tasks) == 1 and tasks[0].queue_entry_id == hqe.id)
256 self._dispatcher._schedule_special_tasks()
257 self.assertTrue(self._dispatcher.host_has_agent(host))
258
259 # Deactivate the hqe task and make sure the frontend task gets the host.
260 task = tasks[0]
261 self._dispatcher.remove_agent(self._check_agent_invariants(host, task))
262 task.is_complete = 1
263 task.is_active = 0
264 task.save()
265 self.db_helper.update_hqe(hqe.id, active=False)
266 self._dispatcher._schedule_special_tasks()
267 self.assertTrue(self._dispatcher.host_has_agent(host))
268 self._check_agent_invariants(host, frontend_task)
269
270 # Make sure we don't release the host being used by the incomplete task.
271 self.host_scheduler._release_hosts()
272 host = self.db_helper.get_host(hostname='h1')[0]
273 self.assertTrue(host.leased == True)
274
Fang Deng522bc532014-11-20 17:48:34 -0800275
276class SuiteRecorderTest(rdb_testing_utils.AbstractBaseRDBTester,
277 unittest.TestCase):
278 """Test the functionality of SuiteRecorder"""
279
280 _config_section = 'AUTOTEST_WEB'
281
282 def testGetSuiteHostAssignment(self):
283 """Test the initialization of SuiteRecord."""
284 hosts = []
285 num = 4
286 for i in range (0, num):
287 hosts.append(self.db_helper.create_host(
288 'h%d' % i, deps=set(['board:lumpy'])))
289 single_job = self.create_job(deps=set(['a']))
290 jobs_1 = self.create_suite(num=2, board='board:lumpy')
291 jobs_2 = self.create_suite(num=2, board='board:lumpy')
292 # We have 4 hosts, 5 jobs, one job in the second suite won't
293 # get a host.
294 all_jobs = ([single_job] +
295 [jobs_1[k] for k in jobs_1 if k !='parent_job'] +
296 [jobs_2[k] for k in jobs_2 if k !='parent_job'])
297 for i in range(0, num):
298 self.db_helper.add_host_to_job(hosts[i], all_jobs[i].id,
299 activate=True)
300 r = host_scheduler.SuiteRecorder(self.job_query_manager)
301 self.assertEqual(r.suite_host_num,
302 {jobs_1['parent_job'].id:2,
303 jobs_2['parent_job'].id:1})
304 self.assertEqual(r.hosts_to_suites,
305 {hosts[1].id: jobs_1['parent_job'].id,
306 hosts[2].id: jobs_1['parent_job'].id,
307 hosts[3].id: jobs_2['parent_job'].id})
308
309
310 def verify_state(self, recorder, suite_host_num, hosts_to_suites):
311 """Verify the suite, host information held by SuiteRecorder.
312
313 @param recorder: A SuiteRecorder object.
314 @param suite_host_num: a dict, expected value of suite_host_num.
315 @param hosts_to_suites: a dict, expected value of hosts_to_suites.
316 """
317 self.assertEqual(recorder.suite_host_num, suite_host_num)
318 self.assertEqual(recorder.hosts_to_suites, hosts_to_suites)
319
320
321 def assign_host_to_job(self, host, job, recorder=None):
322 """A helper function that adds a host to a job and record it.
323
324 @param host: A Host object.
325 @param job: A Job object.
326 @param recorder: A SuiteRecorder object to record the assignment.
327
328 @return a HostQueueEntry object that binds the host and job together.
329 """
330 self.db_helper.add_host_to_job(host, job)
331 hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%s',
332 params=(job.id,))[0]
333 if recorder:
334 recorder.record_assignment(hqe)
335 return hqe
336
337
338 def testRecordAssignmentAndRelease(self):
339 """Test when a host is assigned to suite"""
340 r = host_scheduler.SuiteRecorder(self.job_query_manager)
341 self.verify_state(r, {}, {})
342 host1 = self.db_helper.create_host('h1')
343 host2 = self.db_helper.create_host('h2')
344 jobs = self.create_suite(num=2)
345 hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%s',
346 params=(jobs[0].id,))[0]
347 # HQE got a host.
348 hqe = self.assign_host_to_job(host1, jobs[0], r)
349 self.verify_state(r, {jobs['parent_job'].id:1},
350 {host1.id: jobs['parent_job'].id})
351 # Tried to call record_assignment again, nothing should happen.
352 r.record_assignment(hqe)
353 self.verify_state(r, {jobs['parent_job'].id:1},
354 {host1.id: jobs['parent_job'].id})
355 # Second hqe got a host
356 self.assign_host_to_job(host2, jobs[1], r)
357 self.verify_state(r, {jobs['parent_job'].id:2},
358 {host1.id: jobs['parent_job'].id,
359 host2.id: jobs['parent_job'].id})
360 # Release host1
361 r.record_release([host1])
362 self.verify_state(r, {jobs['parent_job'].id:1},
363 {host2.id: jobs['parent_job'].id})
364 # Release host2
365 r.record_release([host2])
366 self.verify_state(r, {}, {})
367
368
369 def testGetMinDuts(self):
370 """Test get min dut for suite."""
371 host1 = self.db_helper.create_host('h1')
372 host2 = self.db_helper.create_host('h2')
373 host3 = self.db_helper.create_host('h3')
374 jobs = self.create_suite(num=3)
375 pid = jobs['parent_job'].id
376 # Set min_dut=1 for the suite as a job keyval.
377 keyval = models.JobKeyval(
378 job_id=pid, key=constants.SUITE_MIN_DUTS_KEY, value=2)
379 keyval.save()
380 r = host_scheduler.SuiteRecorder(self.job_query_manager)
381 # Not job has got any host, min dut to request should equal to what's
382 # specified in the job keyval.
383 self.assertEqual(r.get_min_duts([pid]), {pid: 2})
384 self.assign_host_to_job(host1, jobs[0], r)
385 self.assertEqual(r.get_min_duts([pid]), {pid: 1})
386 self.assign_host_to_job(host2, jobs[1], r)
387 self.assertEqual(r.get_min_duts([pid]), {pid: 0})
388 self.assign_host_to_job(host3, jobs[2], r)
389 self.assertEqual(r.get_min_duts([pid]), {pid: 0})
390 r.record_release([host1])
391 self.assertEqual(r.get_min_duts([pid]), {pid: 0})
392 r.record_release([host2])
393 self.assertEqual(r.get_min_duts([pid]), {pid: 1})
394 r.record_release([host3])
395 self.assertEqual(r.get_min_duts([pid]), {pid: 2})
396
397if __name__ == '__main__':
398 unittest.main()
399