blob: d1d078a2974b71b3c22baecfceae9c3945f2c8ad [file] [log] [blame]
Prashanth Bf66d51b2014-05-06 12:42:25 -07001#!/usr/bin/python
2#pylint: disable-msg=C0111
3
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
7
8import mock
Justin Giorgi67ad67d2016-06-29 14:41:04 -07009import unittest
Prashanth Bf66d51b2014-05-06 12:42:25 -070010
11import common
Prashanth Bf66d51b2014-05-06 12:42:25 -070012from autotest_lib.frontend import setup_django_environment
13from autotest_lib.frontend.afe import frontend_test_utils
14from autotest_lib.frontend.afe import models
Fang Deng522bc532014-11-20 17:48:34 -080015from autotest_lib.server.cros.dynamic_suite import constants
Prashanth B4ec98672014-05-15 10:44:54 -070016from autotest_lib.scheduler import host_scheduler
17from autotest_lib.scheduler import monitor_db
Prashanth Bf66d51b2014-05-06 12:42:25 -070018from autotest_lib.scheduler import rdb
19from autotest_lib.scheduler import rdb_lib
20from autotest_lib.scheduler import rdb_testing_utils
Fang Deng522bc532014-11-20 17:48:34 -080021from autotest_lib.scheduler import scheduler_models
Prashanth Bf66d51b2014-05-06 12:42:25 -070022
23
class QueryManagerTests(rdb_testing_utils.AbstractBaseRDBTester,
                        unittest.TestCase):
    """Verify scheduler behavior when pending jobs are already given hosts."""

    _config_section = 'AUTOTEST_WEB'


    def testPendingQueueEntries(self):
        """Test retrieval of pending queue entries."""
        job = self.create_job(deps=set(['a']))

        # Check that we don't pull the job we just created with only_hostless.
        hostless_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=True)
        self.assertEqual(len(hostless_entries), 0)

        # Check that only_hostless=False pulls new jobs, as always.
        pending_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        self.assertEqual(pending_entries[0].id, job.id)
        self.assertIsNone(pending_entries[0].host_id)


    def testPendingQueueEntriesForShard(self):
        """Test queue entries for shards aren't executed by master scheduler"""
        job1 = self.create_job(deps=set(['a']))
        job2 = self.create_job(deps=set(['b']))
        shard = models.Shard.objects.create()
        # Assign the job's label to a shard
        shard.labels.add(job1.dependency_labels.all()[0])

        # Check that we only pull jobs which are not assigned to a shard.
        pending_entries = self.job_query_manager.get_pending_queue_entries()
        self.assertEqual(len(pending_entries), 1)
        self.assertEqual(pending_entries[0].id, job2.id)


    def testHostQueries(self):
        """Verify the deps and acls reported by a JobQueryManager."""
        # Create a job, then read its info back through a JobQueryManager
        # built from the pending queue entries.
        self.create_job(
                deps=rdb_testing_utils.DEFAULT_DEPS,
                acls=rdb_testing_utils.DEFAULT_ACLS)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        job_manager = rdb_lib.JobQueryManager(queue_entries)
        job_info = job_manager.get_job_info(queue_entries[0])
        default_dep_ids = set(
                label.id for label in self.db_helper.get_labels(
                        name__in=rdb_testing_utils.DEFAULT_DEPS))
        default_acl_ids = set(
                acl.id for acl in self.db_helper.get_acls(
                        name__in=rdb_testing_utils.DEFAULT_ACLS))
        self.assertEqual(set(job_info['deps']), default_dep_ids)
        self.assertEqual(set(job_info['acls']), default_acl_ids)


    def testNewJobsWithHosts(self):
        """Test that we handle inactive hqes with unleased hosts correctly."""
        # Create a job and assign it an unleased host, then check that the
        # HQE becomes active and the host remains assigned to it.
        job = self.create_job(deps=['a'])
        host = self.db_helper.create_host('h1', deps=['a'])
        self.db_helper.add_host_to_job(host, job.id)

        self._dispatcher._refresh_pending_queue_entries()
        self._dispatcher._schedule_new_jobs()

        host = self.db_helper.get_host(hostname='h1')[0]
        self.assertEqual(host.leased, True)
        self.assertEqual(host.status, models.Host.Status.READY)
        hqes = list(self.db_helper.get_hqes(host_id=host.id))
        self.assertEqual(len(hqes), 1)
        self.assertTrue(hqes[0].active)
        self.assertEqual(hqes[0].status, models.HostQueueEntry.Status.QUEUED)


    def testNewJobsWithInvalidHost(self):
        """Test handling of inactive hqes assigned invalid, unleased hosts."""
        # Create a job and assign it an unleased host whose deps do not match,
        # then check that the HQE DOES NOT become active, because we validate
        # the assignment again.
        job = self.create_job(deps=['a'])
        host = self.db_helper.create_host('h1', deps=['b'])
        self.db_helper.add_host_to_job(host, job.id)

        self._dispatcher._refresh_pending_queue_entries()
        self._dispatcher._schedule_new_jobs()

        host = self.db_helper.get_host(hostname='h1')[0]
        self.assertEqual(host.leased, False)
        self.assertEqual(host.status, models.Host.Status.READY)
        hqes = list(self.db_helper.get_hqes(host_id=host.id))
        self.assertEqual(len(hqes), 1)
        self.assertFalse(hqes[0].active)
        self.assertEqual(hqes[0].status, models.HostQueueEntry.Status.QUEUED)


    def testNewJobsWithLeasedHost(self):
        """Test handling of inactive hqes assigned leased hosts."""
        # Create a job and assign it a leased host, then check that the
        # HQE does not become active through the scheduler, and that the
        # host gets released.
        job = self.create_job(deps=['a'])
        host = self.db_helper.create_host('h1', deps=['b'])
        self.db_helper.add_host_to_job(host, job.id)
        host.leased = 1
        host.save()

        # Patch through a context manager so the original function is put
        # back when the test ends; a bare assignment to the module attribute
        # would leave the mock in place for every later test in the process.
        with mock.patch.object(rdb, 'batch_acquire_hosts') as acquire_hosts:
            self._dispatcher._refresh_pending_queue_entries()
            self._dispatcher._schedule_new_jobs()
            self.assertEqual(acquire_hosts.call_count, 0)

            host = self.db_helper.get_host(hostname='h1')[0]
            self.assertEqual(host.leased, True)
            self.assertEqual(host.status, models.Host.Status.READY)
            hqes = list(self.db_helper.get_hqes(host_id=host.id))
            self.assertEqual(len(hqes), 1)
            self.assertFalse(hqes[0].active)
            self.assertEqual(hqes[0].status,
                             models.HostQueueEntry.Status.QUEUED)

            self.host_scheduler._release_hosts()
            released_host = self.db_helper.get_host(hostname='h1')[0]
            self.assertEqual(released_host.leased, 0)


    def testSpecialTaskOrdering(self):
        """Test priority ordering of special tasks."""

        # Create 2 special tasks, one with and one without an hqe.
        # Activate the hqe and make sure it gets scheduled before the other.
        host = self.db_helper.create_host('h1', deps=['a'])
        job1 = self.create_job(deps=['a'])
        self.db_helper.add_host_to_job(host, job1.id)
        task1 = self.db_helper.create_special_task(job1.id)
        hqe = self.db_helper.get_hqes(job=job1.id)[0]

        # This task has no queue entry.
        self.db_helper.create_special_task(host_id=host.id)

        # Since the hqe task isn't active we get both back, hqe task first.
        tasks = self.job_query_manager.get_prioritized_special_tasks()
        self.assertEqual(tasks[0].queue_entry_id, hqe.id)
        self.assertIsNone(tasks[1].queue_entry_id)

        # Activate the hqe and make sure the frontend task isn't returned
        # ahead of the hqe task.
        self.db_helper.update_hqe(hqe.id, active=True)
        tasks = self.job_query_manager.get_prioritized_special_tasks()
        self.assertEqual(tasks[0].id, task1.id)
166
167
class HostSchedulerTests(rdb_testing_utils.AbstractBaseRDBTester,
                         unittest.TestCase):
    """Verify how the HostScheduler leases, assigns and releases hosts."""

    _config_section = 'AUTOTEST_WEB'


    def setUp(self):
        """Run the base setup, then create a fresh HostScheduler."""
        super(HostSchedulerTests, self).setUp()
        self.host_scheduler = host_scheduler.HostScheduler()


    def testSpecialTaskLocking(self):
        """Test that frontend special tasks lock hosts."""
        # Create multiple tasks with hosts and make sure the hosts get locked.
        host = self.db_helper.create_host('h')
        host1 = self.db_helper.create_host('h1')
        self.db_helper.create_special_task(host_id=host.id)
        self.db_helper.create_special_task(host_id=host1.id)
        self.host_scheduler._lease_hosts_of_frontend_tasks()
        self.assertEqual(self.db_helper.get_host(hostname='h')[0].leased, 1)
        self.assertEqual(self.db_helper.get_host(hostname='h1')[0].leased, 1)


    def testJobScheduling(self):
        """Test new host acquisitions."""
        # Create a job that will find a host through the host scheduler, and
        # make sure the hqe is activated, and a special task is created.
        job = self.create_job(deps=set(['a']))
        host = self.db_helper.create_host('h1', deps=set(['a']))
        self.host_scheduler._schedule_jobs()
        hqe = self.db_helper.get_hqes(job_id=job.id)[0]
        self.assertTrue(hqe.active)
        self.assertEqual(hqe.host_id, host.id)
        self.assertEqual(hqe.status, models.HostQueueEntry.Status.QUEUED)
        task = self.db_helper.get_tasks(queue_entry_id=hqe.id)[0]
        self.assertEqual(task.is_active, 0)
        self.assertEqual(task.host_id, host.id)


    def _check_agent_invariants(self, host, agent):
        """Check that the host has exactly one agent, running the given task.

        @param host: The host whose dispatcher agents are inspected.
        @param agent: Object whose id must equal the id of the task wrapped
                by the host's only agent. The callers in this class pass
                special tasks.

        @return The single agent the dispatcher holds for the host.
        """
        host_agents = list(self._dispatcher._host_agents[host.id])
        self.assertEqual(len(host_agents), 1)
        self.assertEqual(host_agents[0].task.task.id, agent.id)
        return host_agents[0]


    def testLeasedFrontendTaskHost(self):
        """Check that we don't schedule a special task on an unleased host."""
        # Create a special task without an hqe and make sure it isn't returned
        # for scheduling till its host is leased.
        host = self.db_helper.create_host('h1', deps=['a'])
        task = self.db_helper.create_special_task(host_id=host.id)

        tasks = self.job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=True)
        self.assertEqual(tasks, [])
        tasks = self.job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=False)
        self.assertEqual(tasks[0].id, task.id)
        self.host_scheduler._lease_hosts_of_frontend_tasks()
        tasks = self.job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=True)
        self.assertEqual(tasks[0].id, task.id)


    def testTickLockStep(self):
        """Check that a frontend task and an hqe never run simultaneously."""

        self.god.stub_with(monitor_db, '_inline_host_acquisition', False)

        # Create a frontend special task against a host.
        host = self.db_helper.create_host('h1', deps=set(['a']))
        frontend_task = self.db_helper.create_special_task(host_id=host.id)
        self._dispatcher._schedule_special_tasks()
        # The frontend special task shouldn't get scheduled on the host till
        # the host is leased.
        self.assertFalse(self._dispatcher.host_has_agent(host))

        # Create a job for the same host and make the host scheduler lease the
        # host out to that job.
        job = self.create_job(deps=set(['a']))
        self.host_scheduler._schedule_jobs()
        hqe = self.db_helper.get_hqes(job_id=job.id)[0]
        tasks = self.job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=True)
        # We should not find the frontend special task, even though its host is
        # now leased, because it's leased by an active hqe.
        self.assertEqual(len(tasks), 1)
        self.assertEqual(tasks[0].queue_entry_id, hqe.id)
        self._dispatcher._schedule_special_tasks()
        self.assertTrue(self._dispatcher.host_has_agent(host))

        # Deactivate the hqe task and make sure the frontend task gets the host.
        task = tasks[0]
        self._dispatcher.remove_agent(self._check_agent_invariants(host, task))
        task.is_complete = 1
        task.is_active = 0
        task.save()
        self.db_helper.update_hqe(hqe.id, active=False)
        self._dispatcher._schedule_special_tasks()
        self.assertTrue(self._dispatcher.host_has_agent(host))
        self._check_agent_invariants(host, frontend_task)

        # Make sure we don't release the host being used by the incomplete task.
        self.host_scheduler._release_hosts()
        host = self.db_helper.get_host(hostname='h1')[0]
        self.assertEqual(host.leased, True)
273
Fang Deng522bc532014-11-20 17:48:34 -0800274
class SuiteRecorderTest(rdb_testing_utils.AbstractBaseRDBTester,
                        unittest.TestCase):
    """Test the functionality of SuiteRecorder"""

    _config_section = 'AUTOTEST_WEB'

    def testGetSuiteHostAssignment(self):
        """Test that a new SuiteRecorder reflects existing assignments."""
        num = 4
        hosts = [self.db_helper.create_host('h%d' % i,
                                            deps=set(['board:lumpy']))
                 for i in range(num)]
        single_job = self.create_job(deps=set(['a']))
        jobs_1 = self.create_suite(num=2, board='board:lumpy')
        jobs_2 = self.create_suite(num=2, board='board:lumpy')
        # We have 4 hosts, 5 jobs, one job in the second suite won't
        # get a host.
        all_jobs = ([single_job] +
                    [jobs_1[k] for k in jobs_1 if k != 'parent_job'] +
                    [jobs_2[k] for k in jobs_2 if k != 'parent_job'])
        for i, host in enumerate(hosts):
            self.db_helper.add_host_to_job(host, all_jobs[i].id,
                                           activate=True)
        r = host_scheduler.SuiteRecorder(self.job_query_manager)
        # The first host went to the standalone job, so only the remaining
        # three hosts are expected to be attributed to suites.
        self.assertEqual(r.suite_host_num,
                         {jobs_1['parent_job'].id: 2,
                          jobs_2['parent_job'].id: 1})
        self.assertEqual(r.hosts_to_suites,
                         {hosts[1].id: jobs_1['parent_job'].id,
                          hosts[2].id: jobs_1['parent_job'].id,
                          hosts[3].id: jobs_2['parent_job'].id})


    def verify_state(self, recorder, suite_host_num, hosts_to_suites):
        """Verify the suite, host information held by SuiteRecorder.

        @param recorder: A SuiteRecorder object.
        @param suite_host_num: a dict, expected value of suite_host_num.
        @param hosts_to_suites: a dict, expected value of hosts_to_suites.
        """
        self.assertEqual(recorder.suite_host_num, suite_host_num)
        self.assertEqual(recorder.hosts_to_suites, hosts_to_suites)


    def assign_host_to_job(self, host, job, recorder=None):
        """A helper function that adds a host to a job and records it.

        @param host: A Host object.
        @param job: A Job object.
        @param recorder: A SuiteRecorder object to record the assignment.
                If omitted, the assignment is not recorded.

        @return a HostQueueEntry object that binds the host and job together.
        """
        # Pass the job id, like every other add_host_to_job call in this file.
        self.db_helper.add_host_to_job(host, job.id)
        hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%s',
                                                    params=(job.id,))[0]
        if recorder:
            recorder.record_assignment(hqe)
        return hqe


    def testRecordAssignmentAndRelease(self):
        """Test recording of host assignments to, and releases from, a suite"""
        r = host_scheduler.SuiteRecorder(self.job_query_manager)
        self.verify_state(r, {}, {})
        host1 = self.db_helper.create_host('h1')
        host2 = self.db_helper.create_host('h2')
        jobs = self.create_suite(num=2)
        pid = jobs['parent_job'].id
        # HQE got a host.
        hqe = self.assign_host_to_job(host1, jobs[0], r)
        self.verify_state(r, {pid: 1}, {host1.id: pid})
        # Tried to call record_assignment again, nothing should happen.
        r.record_assignment(hqe)
        self.verify_state(r, {pid: 1}, {host1.id: pid})
        # Second hqe got a host
        self.assign_host_to_job(host2, jobs[1], r)
        self.verify_state(r, {pid: 2}, {host1.id: pid, host2.id: pid})
        # Release host1
        r.record_release([host1])
        self.verify_state(r, {pid: 1}, {host2.id: pid})
        # Release host2
        r.record_release([host2])
        self.verify_state(r, {}, {})


    def testGetMinDuts(self):
        """Test get min dut for suite."""
        host1 = self.db_helper.create_host('h1')
        host2 = self.db_helper.create_host('h2')
        host3 = self.db_helper.create_host('h3')
        jobs = self.create_suite(num=3)
        pid = jobs['parent_job'].id
        # Set min_dut=2 for the suite as a job keyval.
        keyval = models.JobKeyval(
                job_id=pid, key=constants.SUITE_MIN_DUTS_KEY, value=2)
        keyval.save()
        r = host_scheduler.SuiteRecorder(self.job_query_manager)
        # No job has got any host, min dut to request should equal to what's
        # specified in the job keyval.
        self.assertEqual(r.get_min_duts([pid]), {pid: 2})
        # Each recorded assignment is expected to lower the remaining request,
        # which should never drop below zero.
        self.assign_host_to_job(host1, jobs[0], r)
        self.assertEqual(r.get_min_duts([pid]), {pid: 1})
        self.assign_host_to_job(host2, jobs[1], r)
        self.assertEqual(r.get_min_duts([pid]), {pid: 0})
        self.assign_host_to_job(host3, jobs[2], r)
        self.assertEqual(r.get_min_duts([pid]), {pid: 0})
        # Releasing hosts is expected to raise the request back up.
        r.record_release([host1])
        self.assertEqual(r.get_min_duts([pid]), {pid: 0})
        r.record_release([host2])
        self.assertEqual(r.get_min_duts([pid]), {pid: 1})
        r.record_release([host3])
        self.assertEqual(r.get_min_duts([pid]), {pid: 2})
395
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
398