#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Scheduler library classes.
"""

10import collections
11import logging
12
13import common
14
15from autotest_lib.frontend import setup_django_environment
16from autotest_lib.frontend.afe import models
17from autotest_lib.scheduler import scheduler_models
18from autotest_lib.site_utils.graphite import stats
19from autotest_lib.scheduler import scheduler_lib
20
21
class AFEJobQueryManager(object):
    """Query manager for AFE Jobs."""

    # A subquery to only get inactive hostless jobs.
    hostless_query = 'host_id IS NULL AND meta_host IS NULL'


    def get_pending_queue_entries(self, only_hostless=False):
        """
        Fetch a list of new host queue entries.

        The ordering of this list is important, as every new agent
        we schedule can potentially contribute to the process count
        on the drone, which has a static limit. The sort order
        prioritizes jobs as follows:
        1. High priority jobs: Based on the afe_job's priority
        2. With hosts and metahosts: This will only happen if we don't
            activate the hqe after assigning a host to it in
            schedule_new_jobs.
        3. With hosts but without metahosts: When tests are scheduled
            through the frontend the owner of the job would have chosen
            a host for it.
        4. Without hosts but with metahosts: This is the common case of
            a new test that needs a DUT. We assign a host and set it to
            active so it shouldn't show up in case 2 on the next tick.
        5. Without hosts and without metahosts: Hostless suite jobs, that
            will result in new jobs that fall under category 4.

        A note about the ordering of cases 3 and 4:
        Prioritizing one case above the other leads to earlier acquisition
        of the following resources: 1. process slots on the drone 2. machines.
        - When a user schedules a job through the afe they choose a specific
          host for it. Jobs with metahost can utilize any host that satisfies
          the metahost criterion. This means that if we had scheduled 4 before
          3 there is a good chance that a job which could've used another host,
          will now use the host assigned to a metahost-less job. Given the
          availability of machines in pool:suites, this almost guarantees
          starvation for jobs scheduled through the frontend.
        - Scheduling 4 before 3 also has its pros however, since a suite
          has the concept of a time out, whereas users can wait. If we hit the
          process count on the drone a suite can timeout waiting on the test,
          but a user job generally has a much longer timeout, and relatively
          harmless consequences.
        The current ordering was chosen because it is more likely that we will
        run out of machines in pool:suites than processes on the drone.

        @param only_hostless: If True, restrict the result to inactive,
                hostless jobs (no host_id and no meta_host).
        @returns A list of HQEs ordered according to sort_order.
        """
        sort_order = ('afe_jobs.priority DESC, '
                      'ISNULL(host_id), '
                      'ISNULL(meta_host), '
                      'parent_job_id, '
                      'job_id')
        # Each fragment ends with a space so the implicitly concatenated
        # pieces form well-separated SQL tokens.
        query = ('NOT complete AND NOT active AND status="Queued" '
                 'AND NOT aborted')
        if only_hostless:
            query = '%s AND (%s)' % (query, self.hostless_query)
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where=query, order_by=sort_order))


    def get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.

        @return: list of afe.models.SpecialTasks sorted according to priority.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # Exclude hosts with active queue entries unless the SpecialTask is
        # for that queue entry.
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # Reorder tasks by priority: repair comes first, provision last.
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET,
                               models.SpecialTask.Task.PROVISION]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    @classmethod
    def get_overlapping_jobs(cls):
        """A helper method to get all active jobs using the same host.

        @return: A list of dictionaries with the hqe id, job_id and host_id
            of the currently overlapping jobs.
        """
        # Filter all active hqes and stand alone special tasks to make sure
        # a host isn't being used by two jobs at the same time. An incomplete
        # stand alone special task can share a host with an active hqe, an
        # example of this is the cleanup scheduled in gathering.
        hqe_hosts = list(models.HostQueueEntry.objects.filter(
                active=1, complete=0, host_id__isnull=False).values_list(
                'host_id', flat=True))
        special_task_hosts = list(models.SpecialTask.objects.filter(
                is_active=1, is_complete=0, host_id__isnull=False,
                queue_entry_id__isnull=True).values_list('host_id', flat=True))
        # Counter.most_common() yields (host_id, count) pairs; any count > 1
        # means two pieces of active work share the same host.
        host_counts = collections.Counter(
                hqe_hosts + special_task_hosts).most_common()
        multiple_hosts = [host_id for host_id, count in host_counts
                          if count > 1]
        return list(models.HostQueueEntry.objects.filter(
                host_id__in=multiple_hosts, active=True).values(
                'id', 'job_id', 'host_id'))
139
140
_timer = stats.Timer('scheduler.host_query_manager')
class AFEHostQueryManager(object):
    """Query manager for AFE Hosts."""

    def __init__(self):
        """Create an AFEHostQueryManager.

        Acquires a connection to the database containing the afe_hosts
        table through the scheduler's connection manager.
        """
        self._db = scheduler_lib.ConnectionManager().get_connection()


    def _process_many2many_dict(self, rows, flip=False):
        """Fold (left_id, right_id) rows into a many-to-many mapping.

        @param rows: An iterable of rows whose first two columns are the
                related integer ids.
        @param flip: If True, map right ids to sets of left ids instead.

        @return: A dict {left_id: set(right_ids)} (or the flipped
                equivalent when flip=True).
        """
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    def _get_sql_id_list(self, id_list):
        """Format ids as a comma separated string for a SQL IN clause."""
        return ','.join(str(item_id) for item_id in id_list)


    def _get_many2many_dict(self, query, id_list, flip=False):
        """Run a 2-column query against id_list and build a m2m mapping.

        @param query: A SQL query with a single '%s' placeholder for the
                comma separated id list.
        @param id_list: Ids to substitute into the query; an empty list
                short-circuits to an empty dict (avoids 'IN ()' SQL).
        @param flip: Passed through to _process_many2many_dict.

        @return: A dict mapping ids in the first column to sets of ids in
                the second (or flipped).
        """
        if not id_list:
            return {}
        query %= self._get_sql_id_list(id_list)
        rows = self._db.execute(query)
        return self._process_many2many_dict(rows, flip)


    @_timer.decorate
    def _get_ready_hosts(self):
        """Fetch all unleased, unlocked hosts in the Ready (or NULL) state.

        We don't lose anything by re-doing these checks
        even though we release hosts on the same conditions.
        In the future we might have multiple clients that
        release_hosts and/or lock them independent of the
        scheduler tick.

        @return: A dict mapping host id to its scheduler Host object.
        """
        hosts = scheduler_models.Host.fetch(
                where="NOT afe_hosts.leased "
                      "AND NOT afe_hosts.locked "
                      "AND (afe_hosts.status IS NULL "
                      "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @_timer.decorate
    def _get_job_acl_groups(self, job_ids):
        """Map each job id to the acl group ids of the job's owner."""
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_timer.decorate
    def _get_job_ineligible_hosts(self, job_ids):
        """Map each job id to the host ids it may not run on."""
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_timer.decorate
    def _get_job_dependencies(self, job_ids):
        """Map each job id to its dependency label ids."""
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_timer.decorate
    def _get_host_acls(self, host_ids):
        """Map each host id to the acl group ids it belongs to."""
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return self._get_many2many_dict(query, host_ids)


    @_timer.decorate
    def _get_label_hosts(self, host_ids):
        """Fetch the label<->host relation for the given hosts.

        @param host_ids: Host ids to look up; an empty list short-circuits.

        @return: A pair of dicts (labels_to_hosts, hosts_to_labels) built
                from the same rows, mapped in each direction.
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % self._get_sql_id_list(host_ids)
        rows = self._db.execute(query)
        labels_to_hosts = self._process_many2many_dict(rows)
        hosts_to_labels = self._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def find_unused_healty_hosts(cls):
        """Get hosts that are currently unused and in the READY state.

        NOTE(review): the where clause requires afe_hosts.leased, so this
        actually matches hosts whose leased bit is still set but that have
        no active hqe and no incomplete special task -- presumably so the
        caller can release the stale lease; confirm against callers.

        @return: A list of host objects, one for each unused healthy host.
        """
        # Avoid any host with a currently active queue entry against it.
        hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '
                    'ON (afe_hosts.id = active_hqe.host_id AND '
                    'active_hqe.active)')

        # Avoid any host with a new special task against it. There are 2 cases
        # when an inactive but incomplete special task will not use the host
        # this tick: 1. When the host is locked 2. When an active hqe already
        # has special tasks for the same host. In both these cases this host
        # will not be in the ready hosts list anyway. In all other cases,
        # an incomplete special task will grab the host before a new job does
        # by assigning an agent to it.
        special_task_join = ('LEFT JOIN afe_special_tasks as new_tasks '
                             'ON (afe_hosts.id = new_tasks.host_id AND '
                             'new_tasks.is_complete=0)')

        return scheduler_models.Host.fetch(
            joins='%s %s' % (hqe_join, special_task_join),
            where="active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL "
                  "AND afe_hosts.leased "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")


    @_timer.decorate
    def set_leased(self, leased_value, **kwargs):
        """Modify the leased bit on the hosts with ids in host_ids.

        @param leased_value: The True/False value of the leased column for
            the hosts with ids in host_ids.
        @param kwargs: The args to use in finding matching hosts.
        """
        logging.info('Setting leased = %s for the hosts that match %s',
                     leased_value, kwargs)
        models.Host.objects.filter(**kwargs).update(leased=leased_value)


    @_timer.decorate
    def _get_labels(self, job_dependencies):
        """
        Calculate a dict mapping label id to label object so that we don't
        frequently round trip to the database every time we need a label.

        @param job_dependencies: A dict mapping an integer job id to a list of
            integer label id's. ie. {job_id: [label_id]}
        @return: A dict mapping an integer label id to a scheduler model label
            object. ie. {label_id: label_object}
        """
        id_to_label = dict()
        # Pull all the labels on hosts we might look at.
        host_labels = scheduler_models.Label.fetch(
                where="id IN (SELECT label_id FROM afe_hosts_labels)")
        id_to_label.update([(label.id, label) for label in host_labels])
        # ... and pull all the labels on jobs we might look at.
        job_label_set = set()
        for job_deps in job_dependencies.values():
            job_label_set.update(job_deps)
        # On the rare/impossible chance that no jobs have any labels, we
        # can skip this (and avoid emitting an invalid 'IN ()' clause).
        if job_label_set:
            job_string_label_list = ','.join([str(x) for x in job_label_set])
            job_labels = scheduler_models.Label.fetch(
                    where="id IN (%s)" % job_string_label_list)
            id_to_label.update([(label.id, label) for label in job_labels])
        return id_to_label


    @_timer.decorate
    def refresh(self, pending_queue_entries):
        """Update the query manager.

        Cache information about a list of queue entries and eligible hosts
        from the database so clients can avoid expensive round trips during
        host acquisition.

        @param pending_queue_entries: A list of queue entries about which we
            need information.
        """
        self._hosts_available = self._get_ready_hosts()
        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)
        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = (
                self._get_label_hosts(host_ids))
        self._labels = self._get_labels(self._job_dependencies)
344