#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Scheduler library classes.
"""

import collections
import logging

import common

from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import models
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import scheduler_lib

from chromite.lib import metrics

_job_timer_name = 'chromeos/autotest/scheduler/job_query_durations/%s'
class AFEJobQueryManager(object):
    """Query manager for AFE Jobs."""
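    # Typical usage (illustrative sketch, not from the original source):
    #   query_manager = AFEJobQueryManager()
    #   new_hqes = query_manager.get_pending_queue_entries()
    #   special_tasks = query_manager.get_prioritized_special_tasks()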

    # A subquery to only get inactive hostless jobs.
    hostless_query = 'host_id IS NULL AND meta_host IS NULL'


    @metrics.SecondsTimerDecorator(
            _job_timer_name % 'get_pending_queue_entries')
    def get_pending_queue_entries(self, only_hostless=False):
        """
        Fetch a list of new host queue entries.

        The ordering of this list is important, as every new agent
        we schedule can potentially contribute to the process count
        on the drone, which has a static limit. The sort order
        prioritizes jobs as follows:
        1. High priority jobs: Based on the afe_job's priority.
        2. With hosts and metahosts: This will only happen if we don't
           activate the hqe after assigning a host to it in
           schedule_new_jobs.
        3. With hosts but without metahosts: When tests are scheduled
           through the frontend the owner of the job would have chosen
           a host for it.
        4. Without hosts but with metahosts: This is the common case of
           a new test that needs a DUT. We assign a host and set it to
           active so it shouldn't show up in case 2 on the next tick.
        5. Without hosts and without metahosts: Hostless suite jobs that
           will result in new jobs that fall under category 4.

        A note about the ordering of cases 3 and 4:
        Prioritizing one case above the other leads to earlier acquisition
        of the following resources: 1. process slots on the drone, 2. machines.
        - When a user schedules a job through the afe they choose a specific
          host for it. Jobs with a metahost can utilize any host that satisfies
          the metahost criterion. This means that if we had scheduled 4 before
          3, there is a good chance that a job which could've used another host
          will now use the host assigned to a metahost-less job. Given the
          availability of machines in pool:suites, this almost guarantees
          starvation for jobs scheduled through the frontend.
        - Scheduling 4 before 3 also has its pros, however, since a suite
          has the concept of a timeout, whereas users can wait. If we hit the
          process count on the drone a suite can time out waiting on the test,
          but a user job generally has a much longer timeout, and relatively
          harmless consequences.
        The current ordering was chosen because it is more likely that we will
        run out of machines in pool:suites than processes on the drone.

        @returns A list of HQEs ordered according to sort_order.
        """
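        # Note: in MySQL, ISNULL(col) evaluates to 0 when col is non-NULL and
        # to 1 when it is NULL, so sorting ascending on ISNULL(host_id) and
        # ISNULL(meta_host) places entries that already have a host or
        # metahost ahead of those that do not.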
        sort_order = ('afe_jobs.priority DESC, '
                      'ISNULL(host_id), '
                      'ISNULL(meta_host), '
                      'parent_job_id, '
                      'job_id')
        # Don't execute jobs that should be executed by a shard in the global
        # scheduler.
        # This won't prevent the shard scheduler from running these jobs, as
        # the shard db doesn't have an entry in afe_shards_labels.
        query = ('NOT complete AND NOT active AND status="Queued" '
                 'AND NOT aborted AND afe_shards_labels.id IS NULL')

        # TODO(jakobjuelich, beeps): Optimize this query. Details:
        # Compressed output of EXPLAIN <query>:
        # +-------------------------+--------+-------------------------+-------+
        # | table                   | type   | key                     | rows  |
        # +-------------------------+--------+-------------------------+-------+
        # | afe_host_queue_entries  | ref    | host_queue_entry_status | 30536 |
        # | afe_shards_labels       | ref    | shard_label_id_fk       | 1     |
        # | afe_jobs                | eq_ref | PRIMARY                 | 1     |
        # +-------------------------+--------+-------------------------+-------+
        # This shows that the first part of the query fetches a lot of
        # objects that are then filtered. The joins are comparably fast:
        # there's usually just one shard mapping (or none), which can be
        # answered fully using an index (shard_label_id_fk); a similar thing
        # applies to the job.
        #
        # This works for now, but once O(#Jobs in shard) << O(#Jobs in Queued),
        # it might be more efficient to filter on the meta_host first, instead
        # of the status.
        if only_hostless:
            query = '%s AND (%s)' % (query, self.hostless_query)
        return list(scheduler_models.HostQueueEntry.fetch(
            joins=('INNER JOIN afe_jobs ON (job_id=afe_jobs.id) '
                   'LEFT JOIN afe_shards_labels ON ('
                   'meta_host=afe_shards_labels.label_id)'),
            where=query, order_by=sort_order))


    @metrics.SecondsTimerDecorator(
            _job_timer_name % 'get_prioritized_special_tasks')
    def get_prioritized_special_tasks(self, only_tasks_with_leased_hosts=False):
        """
        Returns all queued SpecialTasks, prioritized in the order repair,
        cleanup, verify, reset, provision.

        @param only_tasks_with_leased_hosts: If true, this method only returns
            tasks with leased hosts.

        @return: list of afe.models.SpecialTasks sorted according to priority.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # Exclude hosts with active queue entries unless the SpecialTask is
        # for that queue entry.
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])
        if only_tasks_with_leased_hosts:
            queued_tasks = queued_tasks.filter(host__leased=True)

        # Reorder tasks by priority.
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET,
                               models.SpecialTask.Task.PROVISION]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    @classmethod
    def get_overlapping_jobs(cls):
        """A helper method to get all active jobs using the same host.

        @return: A list of dictionaries with the hqe id, job_id and host_id
            of the currently overlapping jobs.
        """
        # Filter all active hqes and stand-alone special tasks to make sure
        # a host isn't being used by two jobs at the same time. An incomplete
        # stand-alone special task can share a host with an active hqe; an
        # example of this is the cleanup scheduled in gathering.
        hqe_hosts = list(models.HostQueueEntry.objects.filter(
                active=1, complete=0, host_id__isnull=False).values_list(
                'host_id', flat=True))
        special_task_hosts = list(models.SpecialTask.objects.filter(
                is_active=1, is_complete=0, host_id__isnull=False,
                queue_entry_id__isnull=True).values_list('host_id', flat=True))
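        # Illustrative example: Counter([3, 3, 5]).most_common() returns
        # [(3, 2), (5, 1)], so multiple_hosts below would be [3].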
        host_counts = collections.Counter(
                hqe_hosts + special_task_hosts).most_common()
        multiple_hosts = [count[0] for count in host_counts if count[1] > 1]
        return list(models.HostQueueEntry.objects.filter(
                host_id__in=multiple_hosts, active=True).values(
                'id', 'job_id', 'host_id'))


    @metrics.SecondsTimerDecorator(
            _job_timer_name % 'get_suite_host_assignment')
    def get_suite_host_assignment(self):
        """A helper method to get how many hosts each suite is holding.

        @return: Two dictionaries (suite_host_num, hosts_to_suites).
            suite_host_num maps a suite job id to the number of hosts
            held by its child jobs.
            hosts_to_suites contains the hosts currently held by any
            suite, mapping each host id to its parent_job_id.
        """
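        # Illustrative example: if suite job 100 has two child jobs holding
        # hosts 7 and 8, this returns ({100: 2}, {7: 100, 8: 100}).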
        query = models.HostQueueEntry.objects.filter(
                host_id__isnull=False, complete=0, active=1,
                job__parent_job_id__isnull=False)
        suite_host_num = {}
        hosts_to_suites = {}
        for hqe in query:
            host_id = hqe.host_id
            parent_job_id = hqe.job.parent_job_id
            count = suite_host_num.get(parent_job_id, 0)
            suite_host_num[parent_job_id] = count + 1
            hosts_to_suites[host_id] = parent_job_id
        return suite_host_num, hosts_to_suites


    @metrics.SecondsTimerDecorator(_job_timer_name % 'get_min_duts_of_suites')
    def get_min_duts_of_suites(self, suite_job_ids):
        """Load the suite_min_duts job keyval for a set of suites.

        @param suite_job_ids: A set of suite job ids.

        @return: A dictionary where the key is a suite job id and the value
            is the value of 'suite_min_duts'.
        """
        query = models.JobKeyval.objects.filter(
                job_id__in=suite_job_ids,
                key=constants.SUITE_MIN_DUTS_KEY, value__isnull=False)
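        # Each matching keyval row contributes one entry, mapping its suite
        # job id to int(value) of 'suite_min_duts', e.g. {100: 4, 101: 6}
        # (illustrative values).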
        return dict((keyval.job_id, int(keyval.value)) for keyval in query)


_host_timer_name = 'chromeos/autotest/scheduler/host_query_durations/%s'
class AFEHostQueryManager(object):
    """Query manager for AFE Hosts."""
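    # Typical per-tick usage (illustrative sketch, not from the original
    # source):
    #   host_manager = AFEHostQueryManager()
    #   host_manager.refresh(pending_queue_entries)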

    def __init__(self):
        """Create an AFEHostQueryManager.

        The connection to the database with the afe_hosts table is obtained
        through scheduler_lib.ConnectionManager.
        """
        self._db = scheduler_lib.ConnectionManager().get_connection()


    def _process_many2many_dict(self, rows, flip=False):
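        # Illustrative example: rows [(1, 10), (1, 11), (2, 10)] become
        # {1: {10, 11}, 2: {10}}; with flip=True the mapping is reversed,
        # i.e. {10: {1, 2}, 11: {1}}.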
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    def _get_sql_id_list(self, id_list):
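        # e.g. [1, 2, 3] -> '1,2,3' (illustrative).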
        return ','.join(str(item_id) for item_id in id_list)


    def _get_many2many_dict(self, query, id_list, flip=False):
        if not id_list:
            return {}
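        # 'query' is expected to contain a single '%s' placeholder, which is
        # substituted with a comma-separated id list below, e.g. producing
        # 'WHERE id IN (1,2,3)' (illustrative).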
        query %= self._get_sql_id_list(id_list)
        rows = self._db.execute(query)
        return self._process_many2many_dict(rows, flip)

    def _get_ready_hosts(self):
        # We don't lose anything by re-doing these checks
        # even though we release hosts on the same conditions.
        # In the future we might have multiple clients that
        # release_hosts and/or lock them independent of the
        # scheduler tick.
        hosts = scheduler_models.Host.fetch(
                where="NOT afe_hosts.leased "
                      "AND NOT afe_hosts.locked "
                      "AND (afe_hosts.status IS NULL "
                      "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)

    @metrics.SecondsTimerDecorator(_host_timer_name % 'get_job_acl_groups')
    def _get_job_acl_groups(self, job_ids):
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)

    def _get_job_ineligible_hosts(self, job_ids):
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)

    @metrics.SecondsTimerDecorator(_host_timer_name % 'get_job_dependencies')
    def _get_job_dependencies(self, job_ids):
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)

    def _get_host_acls(self, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return self._get_many2many_dict(query, host_ids)

    def _get_label_hosts(self, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % self._get_sql_id_list(host_ids)
        rows = self._db.execute(query)
        labels_to_hosts = self._process_many2many_dict(rows)
        hosts_to_labels = self._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels

    @classmethod
    def find_unused_healty_hosts(cls):
        """Get hosts that are currently unused and in the READY state.

        @return: A list of host objects, one for each unused healthy host.
        """
        # Avoid any host with a currently active queue entry against it.
        hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '
                    'ON (afe_hosts.id = active_hqe.host_id AND '
                    'active_hqe.active)')

        # Avoid any host with a new special task against it. There are 2 cases
        # when an inactive but incomplete special task will not use the host
        # this tick: 1. When the host is locked 2. When an active hqe already
        # has special tasks for the same host. In both these cases this host
        # will not be in the ready hosts list anyway. In all other cases,
        # an incomplete special task will grab the host before a new job does
        # by assigning an agent to it.
        special_task_join = ('LEFT JOIN afe_special_tasks as new_tasks '
                             'ON (afe_hosts.id = new_tasks.host_id AND '
                             'new_tasks.is_complete=0)')

        return scheduler_models.Host.fetch(
            joins='%s %s' % (hqe_join, special_task_join),
            where="active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL "
                  "AND afe_hosts.leased "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")

    @metrics.SecondsTimerDecorator(_host_timer_name % 'set_leased')
    def set_leased(self, leased_value, **kwargs):
        """Modify the leased bit on the hosts that match the given filters.

        @param leased_value: The True/False value of the leased column for
            the hosts matched by kwargs.
        @param kwargs: The args to use in finding matching hosts.
        """
        logging.info('Setting leased = %s for the hosts that match %s',
                     leased_value, kwargs)
        models.Host.objects.filter(**kwargs).update(leased=leased_value)


    @metrics.SecondsTimerDecorator(_host_timer_name % 'get_labels')
    def _get_labels(self, job_dependencies):
        """
        Calculate a dict mapping label id to label object so that we don't
        frequently round trip to the database every time we need a label.

        @param job_dependencies: A dict mapping an integer job id to a list of
            integer label ids, i.e. {job_id: [label_id]}.
        @return: A dict mapping an integer label id to a scheduler model label
            object, i.e. {label_id: label_object}.

        """
        id_to_label = dict()
        # Pull all the labels on hosts we might look at
        host_labels = scheduler_models.Label.fetch(
                where="id IN (SELECT label_id FROM afe_hosts_labels)")
        id_to_label.update([(label.id, label) for label in host_labels])
        # and pull all the labels on jobs we might look at.
        job_label_set = set()
        for job_deps in job_dependencies.values():
            job_label_set.update(job_deps)
        # On the rare/impossible chance that no jobs have any labels, we
        # can skip this.
        if job_label_set:
            job_string_label_list = ','.join([str(x) for x in job_label_set])
            job_labels = scheduler_models.Label.fetch(
                    where="id IN (%s)" % job_string_label_list)
            id_to_label.update([(label.id, label) for label in job_labels])
        return id_to_label


    def refresh(self, pending_queue_entries):
        """Update the query manager.

        Cache information about a list of queue entries and eligible hosts
        from the database so clients can avoid expensive round trips during
        host acquisition.

        @param pending_queue_entries: A list of queue entries about which we
            need information.
        """
        self._hosts_available = self._get_ready_hosts()
        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)
        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = (
                self._get_label_hosts(host_ids))
        self._labels = self._get_labels(self._job_dependencies)