Dale Curtis | aa51336 | 2011-03-01 17:27:44 -0800 | [diff] [blame] | 1 | """ |
| 2 | Autotest scheduling utility. |
| 3 | """ |
| 4 | |
| 5 | |
| 6 | import logging |
| 7 | |
| 8 | from autotest_lib.client.common_lib import global_config, utils |
| 9 | from autotest_lib.frontend.afe import models |
| 10 | from autotest_lib.scheduler import metahost_scheduler, scheduler_config |
| 11 | from autotest_lib.scheduler import scheduler_models |
| 12 | |
| 13 | |
def _no_site_schedulers():
    """Default used when no site-specific metahost scheduler module exists."""
    return ()


# Resolve the site-specific hook, falling back to an empty scheduler tuple.
get_site_metahost_schedulers = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', _no_site_schedulers)
| 17 | |
| 18 | |
class SchedulerError(Exception):
    """Error raised by HostScheduler upon detecting an inconsistent state."""
    pass
| 21 | |
| 22 | |
class BaseHostScheduler(metahost_scheduler.HostSchedulingUtility):
    """Handles the logic for choosing when to run jobs and on which hosts.

    This class makes several queries to the database on each tick, building up
    some auxiliary data structures and using them to determine which hosts are
    eligible to run which jobs, taking into account all the various factors
    that affect that.

    In the past this was done with one or two very large, complex database
    queries.  It has proven much simpler and faster to build these auxiliary
    data structures and perform the logic in Python.
    """
    def __init__(self, db):
        """@param db: database connection object exposing an execute() method."""
        self._db = db
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # Load site-specific schedulers selected in global_config.  An empty
        # config value splits to {''}, which cannot match a real class name,
        # so it is harmless.
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))


    def _get_ready_hosts(self):
        """Return a dict of host id -> Host for every currently usable host.

        A host qualifies when it is unlocked, has no active queue entry
        against it, and its status is NULL or 'Ready'.
        """
        # avoid any host with a currently active queue entry against it
        hosts = scheduler_models.Host.fetch(
                joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                      'ON (afe_hosts.id = active_hqe.host_id AND '
                      'active_hqe.active)',
                where="active_hqe.host_id IS NULL "
                      "AND NOT afe_hosts.locked "
                      "AND (afe_hosts.status IS NULL "
                      "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    def _get_sql_id_list(self, id_list):
        """Render an iterable of numeric ids as a comma-separated SQL list."""
        return ','.join(str(item_id) for item_id in id_list)


    def _get_many2many_dict(self, query, id_list, flip=False):
        """Run a two-column query over id_list and group the result rows.

        @param query: SQL selecting (left_id, right_id) rows, containing a
                single '%s' placeholder for the rendered IN-list.
        @param id_list: ids to interpolate; an empty list short-circuits to {}.
        @param flip: if True, swap the two columns before grouping.

        @returns dict mapping left_id -> set of right_ids.
        """
        if not id_list:
            return {}
        # String interpolation is safe here: the ids are integers rendered by
        # _get_sql_id_list, never user-supplied strings.
        query %= self._get_sql_id_list(id_list)
        rows = self._db.execute(query)
        return self._process_many2many_dict(rows, flip)


    def _process_many2many_dict(self, rows, flip=False):
        """Group (left, right) id pairs into {left_id: set(right_ids)}."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    def _get_job_acl_groups(self, job_ids):
        """Map each job id to the set of ACL group ids of the job's owner."""
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    def _get_job_ineligible_hosts(self, job_ids):
        """Map each job id to the set of host ids it may not run on."""
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    def _get_job_dependencies(self, job_ids):
        """Map each job id to the set of label ids it depends on."""
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    def _get_host_acls(self, host_ids):
        """Map each host id to the set of ACL group ids containing it."""
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return self._get_many2many_dict(query, host_ids)


    def _get_label_hosts(self, host_ids):
        """Load the label<->host relation for the given hosts.

        @returns a (labels_to_hosts, hosts_to_labels) pair of dicts:
                {label_id: set(host_ids)} and {host_id: set(label_ids)}.
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % self._get_sql_id_list(host_ids)
        # NOTE(review): rows is iterated twice below, so this relies on
        # execute() returning a re-iterable sequence, not a one-shot cursor.
        rows = self._db.execute(query)
        labels_to_hosts = self._process_many2many_dict(rows)
        hosts_to_labels = self._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    def _get_labels(self):
        """Return a dict mapping label id -> Label for all labels."""
        return dict((label.id, label) for label
                    in scheduler_models.Label.fetch())


    def recovery_on_startup(self):
        """Give each metahost scheduler a chance to recover state on startup."""
        # The loop variable is deliberately not named 'metahost_scheduler':
        # that would shadow the imported module of the same name.
        for scheduler in self._metahost_schedulers:
            scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
        """Rebuild the per-cycle caches used by the eligibility checks.

        @param pending_queue_entries: HostQueueEntries awaiting scheduling
                this cycle; only data for their jobs is loaded.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def tick(self):
        """Called once per scheduler cycle; forwarded to metahost schedulers."""
        # Loop variable renamed from 'metahost_scheduler' to avoid shadowing
        # the imported module of that name.
        for scheduler in self._metahost_schedulers:
            scheduler.tick()


    def hosts_in_label(self, label_id):
        """Return a copy of the set of host ids carrying label_id."""
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        """Drop host_id from the cached host set of label_id.

        Raises KeyError if the label has no cached hosts or the host is not
        among them (preserving the original strict behavior).
        """
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        """Remove and return an available host; raises KeyError if unavailable."""
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        """Return a copy of the host ids ineligible for the entry's job."""
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job owner shares at least one ACL group with the host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        # isdisjoint avoids materializing the intersection just to test it.
        return not host_acls.isdisjoint(job_acls)


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels satisfy every job dependency label."""
        return job_dependencies.issubset(host_labels)


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """Enforce only_if_needed labels for metahost entries.

        A host carrying an only_if_needed label may only run jobs that asked
        for that label, either as a dependency or as the metahost label.

        @returns True if the entry may use a host with host_labels.
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Logs an error (but does not raise)
        if more than one atomic group is found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        # Normally exactly one id is present; pop() returns an arbitrary
        # member in the (logged) multi-group case.
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        # 'label_id' rather than 'id' to avoid shadowing the builtin.
        return (label_id for label_id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


    def is_host_eligible_for_job(self, host_id, queue_entry):
        """True if host_id satisfies all of the entry's scheduling constraints."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                        job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """Truthy if the host is in the available cache and marked invalid.

        Returns None (falsy) when the host is not in the cache at all.
        """
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if eligible; None otherwise."""
        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def schedule_entry(self, queue_entry):
        """Schedule a single queue entry.

        Non-metahost entries are matched against their requested host and the
        claimed Host (or None) is returned; metahost entries are delegated to
        the first metahost scheduler that claims them (returning None).

        @raises SchedulerError if no metahost scheduler can handle the entry.
        """
        if queue_entry.host_id is not None:
            return self._schedule_non_metahost(queue_entry)

        for scheduler in self._metahost_schedulers:
            if scheduler.can_schedule_metahost(queue_entry):
                scheduler.schedule_metahost(queue_entry, self)
                return None

        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

        # Look in each label associated with atomic_group until we find one
        # with enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self.hosts_in_label(atomic_label_id))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # 'host_id' rather than 'id' to avoid shadowing the builtin.
            eligible_hosts_in_group = [self._hosts_available[host_id]
                                       for host_id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
| 416 | |
| 417 | |
# Let a site-specific module override the scheduler implementation; fall back
# to BaseHostScheduler when no override exists.
site_host_scheduler = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_host_scheduler',
        'site_host_scheduler', BaseHostScheduler)


class HostScheduler(site_host_scheduler):
    """Concrete scheduler class: the site override when present, else the base."""