#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Scheduler library classes."""

import collections
import logging

import common

from autotest_lib.client.common_lib.cros.graphite import stats
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import scheduler_lib


class AFEJobQueryManager(object):
    """Query manager for AFE Jobs."""

    # A subquery to only get inactive hostless jobs.
    hostless_query = 'host_id IS NULL AND meta_host IS NULL'


    def get_pending_queue_entries(self, only_hostless=False):
        """
        Fetch a list of new host queue entries.

        The ordering of this list is important, as every new agent
        we schedule can potentially contribute to the process count
        on the drone, which has a static limit. The sort order
        prioritizes jobs as follows:
        1. High priority jobs: Based on the afe_job's priority.
        2. With hosts and metahosts: This will only happen if we don't
            activate the hqe after assigning a host to it in
            schedule_new_jobs.
        3. With hosts but without metahosts: When tests are scheduled
            through the frontend the owner of the job would have chosen
            a host for it.
        4. Without hosts but with metahosts: This is the common case of
            a new test that needs a DUT. We assign a host and set it to
            active so it shouldn't show up in case 2 on the next tick.
        5. Without hosts and without metahosts: Hostless suite jobs that
            will result in new jobs that fall under category 4.

        A note about the ordering of cases 3 and 4:
        Prioritizing one case above the other leads to earlier acquisition
        of the following resources: 1. process slots on the drone,
        2. machines.
        - When a user schedules a job through the afe they choose a specific
          host for it. Jobs with a metahost can utilize any host that
          satisfies the metahost criterion. This means that if we had
          scheduled 4 before 3, there is a good chance that a job which
          could've used another host will now use the host assigned to a
          metahost-less job. Given the availability of machines in
          pool:suites, this almost guarantees starvation for jobs scheduled
          through the frontend.
        - Scheduling 4 before 3 also has its pros, however, since a suite
          has the concept of a timeout, whereas users can wait. If we hit the
          process count limit on the drone, a suite can time out waiting on
          the test, but a user job generally has a much longer timeout, and
          relatively harmless consequences.
        The current ordering was chosen because it is more likely that we
        will run out of machines in pool:suites than processes on the drone.

        @returns A list of HQEs ordered according to sort_order.
        """
        sort_order = ('afe_jobs.priority DESC, '
                      'ISNULL(host_id), '
                      'ISNULL(meta_host), '
                      'parent_job_id, '
                      'job_id')
        # The trailing space in the first literal keeps 'status="Queued"' and
        # 'AND' separated once the two strings are concatenated.
        query = ('NOT complete AND NOT active AND status="Queued" '
                 'AND NOT aborted')
        if only_hostless:
            query = '%s AND (%s)' % (query, self.hostless_query)
        return list(scheduler_models.HostQueueEntry.fetch(
                joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
                where=query, order_by=sort_order))
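
    # A minimal usage sketch (illustrative only; the variable names below are
    # hypothetical):
    #
    #   job_query_manager = AFEJobQueryManager()
    #   hostless_jobs = job_query_manager.get_pending_queue_entries(
    #           only_hostless=True)
    #   # The returned HQEs are sorted by job priority (descending), then by
    #   # whether they already have a host/metahost, then by parent_job_id
    #   # and job_id.
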
    def get_prioritized_special_tasks(self, only_tasks_with_leased_hosts=False):
        """
        Returns all queued SpecialTasks prioritized by task type: repair
        first, then cleanup, verify, reset and provision.

        @param only_tasks_with_leased_hosts: If True, this method only returns
            tasks with leased hosts.

        @return: list of afe.models.SpecialTasks sorted according to priority.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # Exclude hosts with active queue entries unless the SpecialTask is
        # for that queue entry.
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])
        if only_tasks_with_leased_hosts:
            queued_tasks = queued_tasks.filter(host__leased=True)

        # Reorder tasks by priority.
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET,
                               models.SpecialTask.Task.PROVISION]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)
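
    # Illustrative note: with the ordering above, a queued REPAIR task always
    # sorts ahead of a queued PROVISION task, e.g.
    #
    #   task_query_manager = AFEJobQueryManager()
    #   tasks = task_query_manager.get_prioritized_special_tasks()
    #   # tasks[0], if any, belongs to the highest-priority task type present.
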
    @classmethod
    def get_overlapping_jobs(cls):
        """A helper method to get all active jobs using the same host.

        @return: A list of dictionaries with the hqe id, job_id and host_id
            of the currently overlapping jobs.
        """
        # Filter all active hqes and stand-alone special tasks to make sure
        # a host isn't being used by two jobs at the same time. An incomplete
        # stand-alone special task can share a host with an active hqe; an
        # example of this is the cleanup scheduled in gathering.
        hqe_hosts = list(models.HostQueueEntry.objects.filter(
                active=1, complete=0, host_id__isnull=False).values_list(
                'host_id', flat=True))
        special_task_hosts = list(models.SpecialTask.objects.filter(
                is_active=1, is_complete=0, host_id__isnull=False,
                queue_entry_id__isnull=True).values_list('host_id', flat=True))
        host_counts = collections.Counter(
                hqe_hosts + special_task_hosts).most_common()
        multiple_hosts = [count[0] for count in host_counts if count[1] > 1]
        return list(models.HostQueueEntry.objects.filter(
                host_id__in=multiple_hosts, active=True).values(
                'id', 'job_id', 'host_id'))
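
    # An illustrative example of the Counter logic above (hypothetical ids):
    # if the active HQEs use hosts [3, 4, 4] and the stand-alone special
    # tasks use hosts [5, 3], then host_counts is [(3, 2), (4, 2), (5, 1)],
    # multiple_hosts is [3, 4], and only the active HQEs on hosts 3 and 4
    # are returned.
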
_timer = stats.Timer('scheduler.host_query_manager')
class AFEHostQueryManager(object):
    """Query manager for AFE Hosts."""

    def __init__(self):
        """Create an AFEHostQueryManager.

        The manager opens its own connection, through
        scheduler_lib.ConnectionManager, to the database that holds the
        afe_hosts table.
        """
        self._db = scheduler_lib.ConnectionManager().get_connection()


    def _process_many2many_dict(self, rows, flip=False):
        """Convert (left, right) id rows into {left_id: set(right_ids)}."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    def _get_sql_id_list(self, id_list):
        """Format a list of ids as a comma-separated string for SQL IN()."""
        return ','.join(str(item_id) for item_id in id_list)


    def _get_many2many_dict(self, query, id_list, flip=False):
        """Run a query over id_list and group the resulting id pairs."""
        if not id_list:
            return {}
        query %= self._get_sql_id_list(id_list)
        rows = self._db.execute(query)
        return self._process_many2many_dict(rows, flip)
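
    # Illustrative example of the helpers above (hypothetical data): given
    # rows such as ((1, 10), (1, 11), (2, 10)), _process_many2many_dict
    # returns {1: set([10, 11]), 2: set([10])}; with flip=True it returns
    # {10: set([1, 2]), 11: set([1])}.
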
    @_timer.decorate
    def _get_ready_hosts(self):
        # We don't lose anything by re-doing these checks
        # even though we release hosts on the same conditions.
        # In the future we might have multiple clients that
        # release_hosts and/or lock them independent of the
        # scheduler tick.
        hosts = scheduler_models.Host.fetch(
                where="NOT afe_hosts.leased "
                      "AND NOT afe_hosts.locked "
                      "AND (afe_hosts.status IS NULL "
                      "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @_timer.decorate
    def _get_job_acl_groups(self, job_ids):
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_timer.decorate
    def _get_job_ineligible_hosts(self, job_ids):
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_timer.decorate
    def _get_job_dependencies(self, job_ids):
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_timer.decorate
    def _get_host_acls(self, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return self._get_many2many_dict(query, host_ids)


    @_timer.decorate
    def _get_label_hosts(self, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % self._get_sql_id_list(host_ids)
        rows = self._db.execute(query)
        labels_to_hosts = self._process_many2many_dict(rows)
        hosts_to_labels = self._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def find_unused_healty_hosts(cls):
        """Get leased hosts that are currently unused and in the READY state.

        @return: A list of host objects, one for each unused healthy host.
        """
        # Avoid any host with a currently active queue entry against it.
        hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '
                    'ON (afe_hosts.id = active_hqe.host_id AND '
                    'active_hqe.active)')

        # Avoid any host with a new special task against it. There are 2 cases
        # when an inactive but incomplete special task will not use the host
        # this tick: 1. When the host is locked. 2. When an active hqe already
        # has special tasks for the same host. In both these cases this host
        # will not be in the ready hosts list anyway. In all other cases,
        # an incomplete special task will grab the host before a new job does
        # by assigning an agent to it.
        special_task_join = ('LEFT JOIN afe_special_tasks as new_tasks '
                             'ON (afe_hosts.id = new_tasks.host_id AND '
                             'new_tasks.is_complete=0)')

        return scheduler_models.Host.fetch(
                joins='%s %s' % (hqe_join, special_task_join),
                where="active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL "
                      "AND afe_hosts.leased "
                      "AND NOT afe_hosts.locked "
                      "AND (afe_hosts.status IS NULL "
                      "OR afe_hosts.status = 'Ready')")
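
    # Illustrative sketch (variable names are hypothetical): every host
    # returned above is leased but has no active host queue entry and no
    # incomplete special task against it, e.g.
    #
    #   idle_hosts = AFEHostQueryManager.find_unused_healty_hosts()
    #   idle_host_ids = [host.id for host in idle_hosts]
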
    @_timer.decorate
    def set_leased(self, leased_value, **kwargs):
        """Modify the leased bit on the hosts that match the given filters.

        @param leased_value: The True/False value to store in the leased
            column of every matching host.
        @param kwargs: The args to use in finding matching hosts.
        """
        logging.info('Setting leased = %s for the hosts that match %s',
                     leased_value, kwargs)
        models.Host.objects.filter(**kwargs).update(leased=leased_value)
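
    # A minimal usage sketch (illustrative only; kwargs are Django filter
    # keyword arguments and the host ids below are hypothetical):
    #
    #   host_query_manager = AFEHostQueryManager()
    #   host_query_manager.set_leased(True, id__in=[10, 11, 12])
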
    @_timer.decorate
    def _get_labels(self, job_dependencies):
        """
        Calculate a dict mapping label id to label object so that we don't
        have to round trip to the database every time we need a label.

        @param job_dependencies: A dict mapping an integer job id to a list of
            integer label ids. ie. {job_id: [label_id]}
        @return: A dict mapping an integer label id to a scheduler model label
            object. ie. {label_id: label_object}
        """
        id_to_label = dict()
        # Pull all the labels on hosts we might look at
        host_labels = scheduler_models.Label.fetch(
                where="id IN (SELECT label_id FROM afe_hosts_labels)")
        id_to_label.update([(label.id, label) for label in host_labels])
        # and pull all the labels on jobs we might look at.
        job_label_set = set()
        for job_deps in job_dependencies.values():
            job_label_set.update(job_deps)
        # On the rare/impossible chance that no jobs have any labels, we
        # can skip this.
        if job_label_set:
            job_string_label_list = ','.join([str(x) for x in job_label_set])
            job_labels = scheduler_models.Label.fetch(
                    where="id IN (%s)" % job_string_label_list)
            id_to_label.update([(label.id, label) for label in job_labels])
        return id_to_label


    @_timer.decorate
    def refresh(self, pending_queue_entries):
        """Update the query manager.

        Cache information about a list of queue entries and eligible hosts
        from the database so clients can avoid expensive round trips during
        host acquisition.

        @param pending_queue_entries: A list of queue entries about which we
            need information.
        """
        self._hosts_available = self._get_ready_hosts()
        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)
        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = (
                self._get_label_hosts(host_ids))
        self._labels = self._get_labels(self._job_dependencies)
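
    # A minimal usage sketch of the two query managers together (illustrative
    # only; callers and variable names here are hypothetical):
    #
    #   job_query_manager = AFEJobQueryManager()
    #   host_query_manager = AFEHostQueryManager()
    #   queue_entries = job_query_manager.get_pending_queue_entries()
    #   host_query_manager.refresh(queue_entries)
    #   # refresh() populates the cached _hosts_available, _job_acls,
    #   # _ineligible_hosts, _job_dependencies, _host_acls, _label_hosts,
    #   # _host_labels and _labels attributes for later host acquisition.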