blob: aca504e4bb3b1ab92786f3318eabb8d95011d09e [file] [log] [blame]
Prashanth Bf66d51b2014-05-06 12:42:25 -07001#pylint: disable-msg=C0111
2
3# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Scheduler library classes.
8"""
9
10import collections
11import logging
12
13import common
14
Michael Liangda8c60a2014-06-03 13:24:51 -070015from autotest_lib.client.common_lib.cros.graphite import stats
Prashanth Bf66d51b2014-05-06 12:42:25 -070016from autotest_lib.frontend import setup_django_environment
17from autotest_lib.frontend.afe import models
18from autotest_lib.scheduler import scheduler_models
Prashanth Bf66d51b2014-05-06 12:42:25 -070019from autotest_lib.scheduler import scheduler_lib
20
21
# Timer applied to the AFEJobQueryManager query methods below via
# @_job_timer.decorate.
_job_timer = stats.Timer('scheduler.job_query_manager')
class AFEJobQueryManager(object):
    """Query manager for AFE Jobs."""

    # A WHERE-clause fragment matching hostless queue entries, i.e. entries
    # that have neither a host nor a metahost assigned.
    hostless_query = 'host_id IS NULL AND meta_host IS NULL'


    @_job_timer.decorate
    def get_pending_queue_entries(self, only_hostless=False):
        """
        Fetch a list of new host queue entries.

        The ordering of this list is important, as every new agent
        we schedule can potentially contribute to the process count
        on the drone, which has a static limit. The sort order
        prioritizes jobs as follows:
        1. High priority jobs: Based on the afe_job's priority
        2. With hosts and metahosts: This will only happen if we don't
            activate the hqe after assigning a host to it in
            schedule_new_jobs.
        3. With hosts but without metahosts: When tests are scheduled
            through the frontend the owner of the job would have chosen
            a host for it.
        4. Without hosts but with metahosts: This is the common case of
            a new test that needs a DUT. We assign a host and set it to
            active so it shouldn't show up in case 2 on the next tick.
        5. Without hosts and without metahosts: Hostless suite jobs, that
            will result in new jobs that fall under category 4.

        A note about the ordering of cases 3 and 4:
        Prioritizing one case above the other leads to earlier acquisition
        of the following resources: 1. process slots on the drone 2. machines.
        - When a user schedules a job through the afe they choose a specific
          host for it. Jobs with metahost can utilize any host that satisfies
          the metahost criterion. This means that if we had scheduled 4 before
          3 there is a good chance that a job which could've used another host,
          will now use the host assigned to a metahost-less job. Given the
          availability of machines in pool:suites, this almost guarantees
          starvation for jobs scheduled through the frontend.
        - Scheduling 4 before 3 also has its pros however, since a suite
          has the concept of a time out, whereas users can wait. If we hit the
          process count on the drone a suite can timeout waiting on the test,
          but a user job generally has a much longer timeout, and relatively
          harmless consequences.
        The current ordering was chosen because it is more likely that we will
        run out of machines in pool:suites than processes on the drone.

        @param only_hostless: If True, additionally restrict the result to
                entries matching hostless_query (no host and no metahost).

        @returns A list of HQEs ordered according to sort_order.
        """
        sort_order = ('afe_jobs.priority DESC, '
                      'ISNULL(host_id), '
                      'ISNULL(meta_host), '
                      'parent_job_id, '
                      'job_id')
        # Don't execute jobs that should be executed by a shard in the global
        # scheduler.
        # This won't prevent the shard scheduler from running this, as the
        # shard db doesn't have an entry in afe_shards_labels.
        # The two literals below are concatenated by Python, so the trailing
        # space after "Queued" is required to separate it from the next AND.
        query = ('NOT complete AND NOT active AND status="Queued" '
                 'AND NOT aborted AND afe_shards_labels.id IS NULL')

        # TODO(jakobjuelich, beeps): Optimize this query. Details:
        # Compressed output of EXPLAIN <query>:
        # +------------------------+--------+-------------------------+-------+
        # | table                  | type   | key                     | rows  |
        # +------------------------+--------+-------------------------+-------+
        # | afe_host_queue_entries | ref    | host_queue_entry_status | 30536 |
        # | afe_shards_labels      | ref    | shard_label_id_fk       | 1     |
        # | afe_jobs               | eq_ref | PRIMARY                 | 1     |
        # +------------------------+--------+-------------------------+-------+
        # This shows the first part of the query fetches a lot of objects, that
        # are then filtered. The joins are comparably fast: There's usually just
        # one or none shard mapping that can be answered fully using an index
        # (shard_label_id_fk), similar thing applies to the job.
        #
        # This works for now, but once O(#Jobs in shard) << O(#Jobs in Queued),
        # it might be more efficient to filter on the meta_host first, instead
        # of the status.
        if only_hostless:
            query = '%s AND (%s)' % (query, self.hostless_query)
        return list(scheduler_models.HostQueueEntry.fetch(
                joins=('INNER JOIN afe_jobs ON (job_id=afe_jobs.id) '
                       'LEFT JOIN afe_shards_labels ON ('
                       'meta_host=afe_shards_labels.label_id)'),
                where=query, order_by=sort_order))


    @_job_timer.decorate
    def get_prioritized_special_tasks(self, only_tasks_with_leased_hosts=False):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify, then reset, then provision.

        Only inactive, incomplete tasks on unlocked hosts are considered, and
        hosts with an active queue entry are skipped unless the task belongs
        to that very queue entry.

        @param only_tasks_with_leased_hosts: If true, this method only returns
            tasks with leased hosts.

        @return: list of afe.models.SpecialTasks sorted according to priority.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])
        if only_tasks_with_leased_hosts:
            queued_tasks = queued_tasks.filter(host__leased=True)

        # reorder tasks by priority; earlier in this list == scheduled first.
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET,
                               models.SpecialTask.Task.PROVISION]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    @classmethod
    def get_overlapping_jobs(cls):
        """A helper method to get all active jobs using the same host.

        @return: A list of dictionaries with the hqe id, job_id and host_id
            of the currently overlapping jobs.
        """
        # Filter all active hqes and stand alone special tasks to make sure
        # a host isn't being used by two jobs at the same time. An incomplete
        # stand alone special task can share a host with an active hqe, an
        # example of this is the cleanup scheduled in gathering.
        hqe_hosts = list(models.HostQueueEntry.objects.filter(
                active=1, complete=0, host_id__isnull=False).values_list(
                'host_id', flat=True))
        special_task_hosts = list(models.SpecialTask.objects.filter(
                is_active=1, is_complete=0, host_id__isnull=False,
                queue_entry_id__isnull=True).values_list('host_id', flat=True))
        host_counts = collections.Counter(hqe_hosts + special_task_hosts)
        # A host counted more than once is claimed by more than one user.
        multiple_hosts = [host_id for host_id, count in host_counts.items()
                          if count > 1]
        return list(models.HostQueueEntry.objects.filter(
                host_id__in=multiple_hosts, active=True).values(
                'id', 'job_id', 'host_id'))
171
172
# Timer applied to the AFEHostQueryManager methods below via
# @_host_timer.decorate.
_host_timer = stats.Timer('scheduler.host_query_manager')
class AFEHostQueryManager(object):
    """Query manager for AFE Hosts."""

    def __init__(self):
        """Create an AFEHostQueryManager.

        The database connection (which must expose the afe_hosts and related
        tables) is obtained from scheduler_lib.ConnectionManager; it is not
        passed in by the caller.
        """
        self._db = scheduler_lib.ConnectionManager().get_connection()


    def _process_many2many_dict(self, rows, flip=False):
        """Group two-column id rows into a one-to-many mapping.

        @param rows: An iterable of rows; the first two columns of each row
                are interpreted as integer ids.
        @param flip: If True, key the result on the second column instead of
                the first.
        @return: A dict mapping an integer id to a set of integer ids.
        """
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    def _get_sql_id_list(self, id_list):
        """Render ids as a comma separated string for a SQL IN (...) clause.

        @param id_list: An iterable of ids.
        @return: A string such as '1,2,3'.
        """
        return ','.join(str(item_id) for item_id in id_list)


    def _get_many2many_dict(self, query, id_list, flip=False):
        """Run a two-column id query over id_list and group the results.

        @param query: A SQL query with a single %s placeholder for the id list.
        @param id_list: The ids to substitute into the query.
        @param flip: Passed through to _process_many2many_dict.
        @return: A dict mapping an integer id to a set of integer ids, or an
                empty dict if id_list is empty (no query is issued).
        """
        if not id_list:
            return {}
        query %= self._get_sql_id_list(id_list)
        rows = self._db.execute(query)
        return self._process_many2many_dict(rows, flip)


    @_host_timer.decorate
    def _get_ready_hosts(self):
        """Fetch unleased, unlocked hosts whose status is NULL or 'Ready'.

        @return: A dict mapping host id to scheduler_models.Host.
        """
        # We don't lose anything by re-doing these checks
        # even though we release hosts on the same conditions.
        # In the future we might have multiple clients that
        # release_hosts and/or lock them independent of the
        # scheduler tick.
        hosts = scheduler_models.Host.fetch(
            where="NOT afe_hosts.leased "
            "AND NOT afe_hosts.locked "
            "AND (afe_hosts.status IS NULL "
                "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @_host_timer.decorate
    def _get_job_acl_groups(self, job_ids):
        """Map each job id to the ACL group ids of the job's owner.

        @param job_ids: A list of job ids.
        @return: A dict {job_id: set(aclgroup_id)}.
        """
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_host_timer.decorate
    def _get_job_ineligible_hosts(self, job_ids):
        """Map each job id to the host ids listed as ineligible for it.

        @param job_ids: A list of job ids.
        @return: A dict {job_id: set(host_id)}.
        """
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_host_timer.decorate
    def _get_job_dependencies(self, job_ids):
        """Map each job id to its dependency label ids.

        @param job_ids: A list of job ids.
        @return: A dict {job_id: set(label_id)}.
        """
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_host_timer.decorate
    def _get_host_acls(self, host_ids):
        """Map each host id to the ACL group ids it belongs to.

        @param host_ids: A list of host ids.
        @return: A dict {host_id: set(aclgroup_id)}.
        """
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return self._get_many2many_dict(query, host_ids)


    @_host_timer.decorate
    def _get_label_hosts(self, host_ids):
        """Fetch the host/label mapping in both directions.

        @param host_ids: A list of host ids.
        @return: A tuple (labels_to_hosts, hosts_to_labels) of dicts mapping
                ids to sets of ids; two empty dicts if host_ids is empty.
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % self._get_sql_id_list(host_ids)
        # The rows are consumed twice below; materialize them so the second
        # pass is not empty if execute() ever returns a one-shot iterator.
        rows = list(self._db.execute(query))
        labels_to_hosts = self._process_many2many_dict(rows)
        hosts_to_labels = self._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def find_unused_healty_hosts(cls):
        """Get hosts that are currently unused and in the READY state.

        Only hosts that are leased, unlocked, have a NULL or 'Ready' status,
        have no active queue entry and no incomplete special task are
        returned. (The misspelled method name is kept for compatibility.)

        @return: A list of host objects, one for each unused healthy host.
        """
        # Avoid any host with a currently active queue entry against it.
        hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '
                    'ON (afe_hosts.id = active_hqe.host_id AND '
                    'active_hqe.active)')

        # Avoid any host with a new special task against it. There are 2 cases
        # when an inactive but incomplete special task will not use the host
        # this tick: 1. When the host is locked 2. When an active hqe already
        # has special tasks for the same host. In both these cases this host
        # will not be in the ready hosts list anyway. In all other cases,
        # an incomplete special task will grab the host before a new job does
        # by assigning an agent to it.
        special_task_join = ('LEFT JOIN afe_special_tasks as new_tasks '
                             'ON (afe_hosts.id = new_tasks.host_id AND '
                             'new_tasks.is_complete=0)')

        return scheduler_models.Host.fetch(
            joins='%s %s' % (hqe_join, special_task_join),
            where="active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL "
                  "AND afe_hosts.leased "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                          "OR afe_hosts.status = 'Ready')")


    @_host_timer.decorate
    def set_leased(self, leased_value, **kwargs):
        """Modify the leased bit on the hosts matching kwargs.

        Note that the kwargs are passed straight to
        models.Host.objects.filter(); with no kwargs every host matches.

        @param leased_value: The True/False value of the leased column for
            the matching hosts.
        @param kwargs: The args to use in finding matching hosts.
        """
        logging.info('Setting leased = %s for the hosts that match %s',
                     leased_value, kwargs)
        models.Host.objects.filter(**kwargs).update(leased=leased_value)


    @_host_timer.decorate
    def _get_labels(self, job_dependencies):
        """
        Calculate a dict mapping label id to label object so that we don't
        frequently round trip to the database every time we need a label.

        @param job_dependencies: A dict mapping an integer job id to a list of
            integer label id's. ie. {job_id: [label_id]}
        @return: A dict mapping an integer label id to a scheduler model label
            object. ie. {label_id: label_object}

        """
        id_to_label = dict()
        # Pull all the labels on hosts we might look at
        host_labels = scheduler_models.Label.fetch(
                where="id IN (SELECT label_id FROM afe_hosts_labels)")
        id_to_label.update([(label.id, label) for label in host_labels])
        # and pull all the labels on jobs we might look at.
        job_label_set = set()
        for job_deps in job_dependencies.values():
            job_label_set.update(job_deps)
        # On the rare/impossible chance that no jobs have any labels, we
        # can skip this.
        if job_label_set:
            job_labels = scheduler_models.Label.fetch(
                    where="id IN (%s)" % self._get_sql_id_list(job_label_set))
            id_to_label.update([(label.id, label) for label in job_labels])
        return id_to_label


    @_host_timer.decorate
    def refresh(self, pending_queue_entries):
        """Update the query manager.

        Cache information about a list of queue entries and eligible hosts
        from the database so clients can avoid expensive round trips during
        host acquisition. Populates _hosts_available, _job_acls,
        _ineligible_hosts, _job_dependencies, _host_acls, _label_hosts,
        _host_labels and _labels.

        @param pending_queue_entries: A list of queue entries about which we
            need information.
        """
        self._hosts_available = self._get_ready_hosts()
        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)
        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = (
                self._get_label_hosts(host_ids))
        self._labels = self._get_labels(self._job_dependencies)
375 self._labels = self._get_labels(self._job_dependencies)