blob: 719b2f283624a11471e232e0481c7bd91da376bc [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
2#pylint: disable-msg=C0111
3
4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8"""Host scheduler.
9
10If run as a standalone service, the host scheduler ensures the following:
11 1. Hosts will not be assigned to multiple hqes simultaneously. The process
12 of assignment in this case refers to the modification of the host_id
13 column of a row in the afe_host_queue_entries table, to reflect the host
14 id of a leased host matching the dependencies of the job.
15 2. Hosts that are not being used by active hqes or incomplete special tasks
16 will be released back to the available hosts pool, for acquisition by
17 subsequent hqes.
18In addition to these guarantees, the host scheduler also confirms that no 2
19active hqes/special tasks are assigned the same host, and sets the leased bit
20for hosts needed by frontend special tasks. The need for the latter is only
21apparent when viewed in the context of the job-scheduler (monitor_db), which
22runs special tasks only after their hosts have been leased.
Fang Deng522bc532014-11-20 17:48:34 -080023
** Support minimum duts requirement for suites (non-inline mode) **
25
26Each suite can specify the minimum number of duts it requires by
27dropping a 'suite_min_duts' job keyval which defaults to 0.
28
When suites are competing for duts, if any suite has not got the minimum duts
it requires, the host scheduler will try to meet that requirement first,
even if other suites have higher priority or an earlier timestamp. Once
all suites' minimum duts requirements have been fulfilled, the host
scheduler will allocate the rest of the duts based on job priority and suite
job id. This is to prevent low priority suites from starving when sharing a
pool with high-priority suites.
36
37Note:
38 1. Prevent potential starvation:
39 We need to carefully choose |suite_min_duts| for both low and high
40 priority suites. If a high priority suite didn't specify it but a low
41 priority one does, the high priority suite can be starved!
42 2. Restart requirement:
43 Restart host scheduler if you manually released a host by setting
44 leased=0 in db. This is needed because host scheduler maintains internal
45 state of host assignment for suites.
46 3. Exchanging duts triggers provisioning:
47 TODO(fdeng): There is a chance two suites can exchange duts,
48 if the two suites are for different builds, the exchange
49 will trigger provisioning. This can be optimized by preferring getting
50 hosts with the same build.
Dale Curtisaa513362011-03-01 17:27:44 -080051"""
52
Prashanth B4ec98672014-05-15 10:44:54 -070053import argparse
Prashanth Bf66d51b2014-05-06 12:42:25 -070054import collections
Fang Dengc9b1be82014-10-20 17:54:20 -070055import datetime
Dale Curtisaa513362011-03-01 17:27:44 -080056import logging
Prashanth B4ec98672014-05-15 10:44:54 -070057import os
58import signal
59import sys
60import time
Dale Curtisaa513362011-03-01 17:27:44 -080061
Prashanth B4ec98672014-05-15 10:44:54 -070062import common
63from autotest_lib.frontend import setup_django_environment
64
65from autotest_lib.client.common_lib import global_config
Michael Liangda8c60a2014-06-03 13:24:51 -070066from autotest_lib.client.common_lib.cros.graphite import stats
Prashanth B4ec98672014-05-15 10:44:54 -070067from autotest_lib.scheduler import email_manager
Prashanth Bf66d51b2014-05-06 12:42:25 -070068from autotest_lib.scheduler import query_managers
69from autotest_lib.scheduler import rdb_lib
70from autotest_lib.scheduler import rdb_utils
Prashanth B4ec98672014-05-15 10:44:54 -070071from autotest_lib.scheduler import scheduler_lib
72from autotest_lib.scheduler import scheduler_models
Fang Deng042c1472014-10-23 13:56:41 -070073from autotest_lib.site_utils import job_overhead
Dan Shib9144a42014-12-01 16:09:32 -080074from autotest_lib.site_utils import server_manager_utils
Dale Curtisaa513362011-03-01 17:27:44 -080075
# Database connection manager; assigned in initialize() and disconnected at
# the end of main().
_db_manager = None

# Set to True by handle_signal(); main() checks it between ticks.
_shutdown = False

# Number of seconds main() sleeps between two ticks.
_tick_pause_sec = global_config.global_config.get_config_value(
        'SCHEDULER',
        'tick_pause_sec',
        type=int,
        default=5)

# When True, main() exits immediately with status 0 instead of running the
# host scheduler loop.
_monitor_db_host_acquisition = global_config.global_config.get_config_value(
        'SCHEDULER',
        'inline_host_acquisition',
        type=bool,
        default=True)
Prashanth B4ec98672014-05-15 10:44:54 -070082
Dale Curtisaa513362011-03-01 17:27:44 -080083
class SuiteRecorder(object):
    """Recording the host assignment for suites.

    The recorder holds two things:
        * suite_host_num, records how many duts a suite is holding,
          which is a map <suite_job_id -> num_of_hosts>
        * hosts_to_suites, records which host is assigned to which
          suite, it is a map <host_id -> suite_job_id>
    The two data structures are updated together whenever a host is assigned
    to or released by a job, so that every entry in hosts_to_suites is
    counted exactly once in suite_host_num.

    The reason to maintain hosts_to_suites is that, when a host is released,
    we need to know which suite it was leased to. Querying the db for the
    latest completed job that has run on a host is slow. Therefore, we go with
    an alternative: keeping a <host id, suite job id> map
    in memory (for 10K hosts, the map should take less than 1M memory on
    64-bit machine with python 2.7)

    """


    _timer = stats.Timer('suite_recorder')


    def __init__(self, job_query_manager):
        """Initialize the records from the current suite/host assignment.

        @param job_query_manager: A query manager object providing
                get_suite_host_assignment() and get_min_duts_of_suites().
                HostScheduler passes a query_managers.AFEJobQueryManager.
        """
        self.job_query_manager = job_query_manager
        self.suite_host_num, self.hosts_to_suites = (
                self.job_query_manager.get_suite_host_assignment())


    def _forget_host(self, host_id):
        """Remove a host from the records and decrement its suite's count.

        @param host_id: Id of the host to drop from hosts_to_suites.

        @return: A tuple (suite_job_id, remaining) where remaining is the
                 number of hosts the suite still holds after the removal,
                 or None if the host was not recorded against any suite.
        """
        if host_id not in self.hosts_to_suites:
            return None
        parent_job_id = self.hosts_to_suites.pop(host_id)
        # Use .get() so a suite missing from suite_host_num (the two maps
        # disagreeing) cannot raise KeyError in the middle of a tick.
        remaining = self.suite_host_num.get(parent_job_id, 0) - 1
        if remaining > 0:
            self.suite_host_num[parent_job_id] = remaining
        else:
            # Never keep zero or negative counts around.
            self.suite_host_num.pop(parent_job_id, None)
            remaining = 0
        return parent_job_id, remaining


    def record_assignment(self, queue_entry):
        """Record that the hqe has got a host.

        Entries whose job has no parent job are ignored. If the host is
        still recorded against a different suite, that stale record is
        dropped first (and the old suite's count decremented) so that the
        host is never counted for two suites at once.

        @param queue_entry: A scheduler_models.HostQueueEntry object which has
                            got a host.
        """
        parent_id = queue_entry.job.parent_job_id
        if not parent_id:
            return
        host_id = queue_entry.host_id
        if self.hosts_to_suites.get(host_id, None) == parent_id:
            logging.error('HQE (id: %d, parent_job_id: %d, host: %s) '
                          'seems already recorded', queue_entry.id,
                          parent_id, queue_entry.host.hostname)
            return
        # Without this, overwriting the mapping below would leave the
        # previous suite's count inflated forever, because the host -> suite
        # link needed to decrement it on release would be lost.
        stale = self._forget_host(host_id)
        if stale is not None:
            logging.warning('Host %s was still recorded against suite %d '
                            'when assigned to suite %d; dropped the stale '
                            'record.', queue_entry.host.hostname,
                            stale[0], parent_id)
        num_hosts = self.suite_host_num.get(parent_id, 0)
        self.suite_host_num[parent_id] = num_hosts + 1
        self.hosts_to_suites[host_id] = parent_id
        logging.debug('Suite %d got host %s, currently holding %d hosts',
                      parent_id, queue_entry.host.hostname,
                      self.suite_host_num[parent_id])


    def record_release(self, hosts):
        """Update the record with host releasing event.

        Hosts that are not recorded against any suite are skipped.

        @param hosts: A list of scheduler_models.Host objects.
        """
        for host in hosts:
            released = self._forget_host(host.id)
            if released is None:
                continue
            parent_job_id, count = released
            logging.debug(
                    'Suite %d releases host %s, currently holding %d hosts',
                    parent_job_id, host.hostname, count)


    def get_min_duts(self, suite_job_ids):
        """Figure out min duts to request.

        Given a set ids of suite jobs, figure out minimum duts to request for
        each suite. It is determined by two factors: min_duts specified
        for each suite in its job keyvals, and how many duts a suite is
        currently holding.

        @param suite_job_ids: A set of suite job ids.

        @returns: A dictionary, the key is suite_job_id, the value
                  is the minimum number of duts to request.
        """
        suite_min_duts = self.job_query_manager.get_min_duts_of_suites(
                suite_job_ids)
        for parent_id in suite_job_ids:
            min_duts = suite_min_duts.get(parent_id, 0)
            cur_duts = self.suite_host_num.get(parent_id, 0)
            # A suite already holding at least its minimum needs nothing
            # extra, hence the clamp at zero.
            suite_min_duts[parent_id] = max(0, min_duts - cur_duts)
        logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s',
                      suite_min_duts)
        return suite_min_duts
180
181
class BaseHostScheduler(object):
    """Base class containing host acquisition logic.

    This class holds the core host acquisition logic the scheduler needs in
    order to run jobs on hosts. Its own tick only releases hosts back to
    the rdb; every other action has to be triggered by the job scheduler.
    """


    _timer = stats.Timer('base_host_scheduler')
    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])


    def __init__(self):
        """Create the host query manager used to find and (un)lease hosts."""
        self.host_query_manager = query_managers.AFEHostQueryManager()


    @_timer.decorate
    def _release_hosts(self):
        """Release hosts to the RDB.

        Asks the host query manager for the unused healthy hosts and, if
        there are any, clears their leased flag in one call.

        @return: The hosts reported by the host query manager, i.e. the
                 hosts that have been released.
        """
        unused_hosts = self.host_query_manager.find_unused_healty_hosts()
        hostnames = [unused.hostname for unused in unused_hosts]
        if hostnames:
            self.host_query_manager.set_leased(False, hostname__in=hostnames)
        return unused_hosts


    @classmethod
    def schedule_host_job(cls, host, queue_entry):
        """Schedule a job on a host.

        Scheduling a job involves:
            1. Setting the active bit on the queue_entry.
            2. Scheduling a special task on behalf of the queue_entry.
        Performing these actions will lead the job scheduler through a chain of
        events, culminating in running the test and collecting results from
        the host.

        @param host: The host against which to schedule the job.
        @param queue_entry: The queue_entry to schedule.

        @raises rdb_utils.RDBException: If the queue_entry already has a
                host_id and it differs from the id of |host|.
        """
        assigned_host_id = queue_entry.host_id
        if assigned_host_id is None:
            queue_entry.set_host(host)
        elif host.id != assigned_host_id:
            message = ('The rdb returned host: %s '
                       'but the job:%s was already assigned a host: %s. ' %
                       (host.hostname, queue_entry.job_id,
                        queue_entry.host.hostname))
            raise rdb_utils.RDBException(message)
        queue_entry.update_field('active', True)

        # TODO: crbug.com/373936. Ideally the host scheduler would only
        # assign jobs to hosts. However, hosts are released based on whether
        # an active hqe is using them, and the hqe has just been activated
        # above, so its first prejob task has to be scheduled here as well.
        # OTOH, we could converge to having the host scheduler manage all
        # special tasks, since their only use today is to
        # verify/cleanup/reset a host.
        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def acquire_hosts(self, host_jobs):
        """Acquire hosts for given jobs.

        This method sends jobs that need hosts to rdb.
        Child class can override this method to pipe more args
        to rdb.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.

        @return: A generator that yields an rdb_hosts.RDBClientHostWrapper
                 for each host acquired on behalf of a queue_entry,
                 or None if a host wasn't found.
        """
        return rdb_lib.acquire_hosts(host_jobs)


    def find_hosts_for_jobs(self, host_jobs):
        """Find and verify hosts for a list of jobs.

        Pairs each queue entry with the result acquire_hosts() produced for
        it and keeps only the pairs where a host was actually found.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
        @return: A list of host_assignment tuples (host, job), one for each
            valid host-queue_entry assignment.
        """
        acquired = self.acquire_hosts(host_jobs)
        return [self.host_assignment(host, job)
                for host, job in zip(acquired, host_jobs)
                if host]


    @_timer.decorate
    def tick(self):
        """Schedule core host management activities."""
        self._release_hosts()
288
class HostScheduler(BaseHostScheduler):
    """A scheduler capable of managing host acquisition for new jobs."""


    _timer = stats.Timer('host_scheduler')


    def __init__(self):
        """Set up the job query manager and the per-suite host recorder."""
        super(HostScheduler, self).__init__()
        self.job_query_manager = query_managers.AFEJobQueryManager()
        # Tracks how many hosts each suite is holding and which suite each
        # host is assigned to.
        self._suite_recorder = SuiteRecorder(self.job_query_manager)


    def _record_host_assignment(self, host, queue_entry):
        """Record that |host| is assigned to |queue_entry|.

        Two things are recorded:
            1. The time between job creation and now, reported through
               job_overhead as the QUEUED state duration.
            2. The host assignment of the suite, via the suite recorder.

        @param host: A Host object.
        @param queue_entry: A HostQueueEntry object.
        """
        time_in_queue = datetime.datetime.now() - queue_entry.job.created_on
        secs_in_queued = time_in_queue.total_seconds()
        job_overhead.record_state_duration(
                queue_entry.job_id, host.hostname,
                job_overhead.STATUS.QUEUED, secs_in_queued)
        self._suite_recorder.record_assignment(queue_entry)


    @_timer.decorate
    def _schedule_jobs(self):
        """Schedule new jobs against hosts.

        Does nothing (and sends no stats) when no pending queue entry needs
        a host.
        """
        pending_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        entries_needing_hosts = [entry for entry in pending_entries
                                 if not entry.is_hostless()]
        if not entries_needing_hosts:
            return

        scheduled = 0
        for assignment in self.find_hosts_for_jobs(entries_needing_hosts):
            self.schedule_host_job(assignment.host, assignment.job)
            self._record_host_assignment(assignment.host, assignment.job)
            scheduled += 1

        gauge_key = 'host_scheduler.jobs_per_tick'
        stats.Gauge(gauge_key).send('new_jobs_with_hosts', scheduled)
        stats.Gauge(gauge_key).send('new_jobs_without_hosts',
                                    len(entries_needing_hosts) - scheduled)


    @_timer.decorate
    def _lease_hosts_of_frontend_tasks(self):
        """Lease hosts of tasks scheduled through the frontend."""
        # Only the special tasks without hqes are needed here, but fetching
        # them through the same method the scheduler uses ensures both
        # prioritize the same way.
        special_tasks = self.job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=False)
        lease_hostnames = []
        for task in special_tasks:
            if task.queue_entry_id is not None or task.host.leased:
                continue
            lease_hostnames.append(task.host.hostname)
        # Leasing an already leased host here shouldn't be a problem:
        # 1. The only way a host can be leased is if it's been assigned to
        #    an active hqe or another similar frontend task, but doing so will
        #    have already precluded it from the list of tasks returned by the
        #    job_query_manager.
        # 2. The unleasing is done based on global conditions. Eg: Even if a
        #    task has already leased a host and we lease it again, the
        #    host scheduler won't release the host till both tasks are
        #    complete.
        if lease_hostnames:
            self.host_query_manager.set_leased(
                    True, hostname__in=lease_hostnames)


    def acquire_hosts(self, host_jobs):
        """Override acquire_hosts.

        This method overrides the method in parent class.
        It figures out the set of suites that |host_jobs| belong to,
        gets the min_duts requirement for each of those suites and
        pipes it to rdb together with the jobs.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.

        @return: The result of rdb_lib.acquire_hosts for |host_jobs| and the
            per-suite minimum duts.
        """
        parent_job_ids = set()
        for entry in host_jobs:
            parent_id = entry.job.parent_job_id
            if parent_id:
                parent_job_ids.add(parent_id)
        suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids)
        return rdb_lib.acquire_hosts(host_jobs, suite_min_duts)


    @_timer.decorate
    def tick(self):
        """Run one full scheduling cycle.

        In order: lease hosts for frontend tasks, schedule new jobs, release
        unused hosts, update the suite records with the released hosts and
        send any queued emails.
        """
        logging.info('Calling new tick.')

        logging.info('Leasing hosts for frontend tasks.')
        self._lease_hosts_of_frontend_tasks()

        logging.info('Finding hosts for new jobs.')
        self._schedule_jobs()

        logging.info('Releasing unused hosts.')
        freed_hosts = self._release_hosts()

        logging.info('Updating suite assignment with released hosts')
        self._suite_recorder.record_release(freed_hosts)

        logging.info('Calling email_manager.')
        email_manager.manager.send_queued_emails()
394
395
class DummyHostScheduler(BaseHostScheduler):
    """A dummy host scheduler that doesn't acquire or release hosts."""


    def __init__(self):
        """Skip the base class setup; no host query manager is created."""


    def tick(self):
        """Do nothing; in particular, no hosts are released."""
Prashanth B4ec98672014-05-15 10:44:54 -0700405
406
def handle_signal(signum, frame):
    """Signal handler that requests a shutdown instead of crashing mid-tick.

    Registered for SIGINT and SIGTERM by initialize(). It only raises the
    module level _shutdown flag; main() checks that flag between ticks.

    @param signum: Number of the received signal (unused).
    @param frame: Stack frame that was interrupted (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
412
413
def initialize(testing=False):
    """Initialize the host scheduler.

    Sets up, in order: the testing database (only if |testing|), the
    module level database connection manager, logging, the SIGINT/SIGTERM
    handlers and the scheduler models.

    @param testing: If True, initialize a file database for testing before
                    anything else.
    """
    global _db_manager
    if testing:
        # Imported lazily and only in testing mode, as the database imports
        # of the testing utilities have side effects.
        from autotest_lib.scheduler import rdb_testing_utils
        db_helper = rdb_testing_utils.FileDatabaseHelper()
        db_helper.initialize_database_for_testing(
                db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE)

    _db_manager = scheduler_lib.ConnectionManager()

    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    scheduler_lib.setup_logging(
            log_dir, None, timestamped_logfile_prefix='host_scheduler')

    logging.info("Setting signal handler")
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, handle_signal)

    scheduler_models.initialize()
431
432
def parse_arguments(argv):
    """Parse the command line arguments of the host scheduler.

    Both supported options, --testing and --production, are boolean flags
    that default to False.

    @param argv: argument list to parse
    @returns: parsed arguments.
    """
    testing_help = 'Start the host scheduler in testing mode.'
    production_help = ('Indicate that scheduler is running in production'
                       ' environment and it can use database that is not'
                       ' hosted in localhost. If it is set to False, '
                       'scheduler will fail if database is not in '
                       'localhost.')

    parser = argparse.ArgumentParser(description='Host scheduler.')
    parser.add_argument('--testing', help=testing_help,
                        action='store_true', default=False)
    parser.add_argument('--production', help=production_help,
                        action='store_true', default=False)
    return parser.parse_args(argv)
Prashanth B4ec98672014-05-15 10:44:54 -0700453
454
def main():
    """Run the host scheduler until a shutdown is requested.

    Exits right away with status 0 when inline host acquisition is enabled
    in the config. Otherwise parses the command line, checks the production
    settings and (if the server database is in use) the server role,
    initializes the scheduler and ticks in a loop, sleeping _tick_pause_sec
    seconds after each tick, until _shutdown is set.

    Any exception is logged through the email manager and re-raised. Queued
    emails are always sent and the database connection, if one was created,
    is always disconnected on the way out.
    """
    if _monitor_db_host_acquisition:
        # NOTE(review): this module only sets up logging in initialize(),
        # which has not run yet, so this INFO message may not be emitted
        # anywhere -- confirm.
        logging.info('Please set inline_host_acquisition=False in the shadow '
                     'config before starting the host scheduler.')
        # The upstart job for the host scheduler takes exit(0) to mean
        # 'don't respawn', which is what we want while the job scheduler is
        # acquiring hosts inline.
        sys.exit(0)

    try:
        cli_args = sys.argv[1:]
        options = parse_arguments(cli_args)
        scheduler_lib.check_production_settings(options)

        # With the server database enabled, this server must have the
        # `host_scheduler` role. If it does not, an exception is raised and
        # the host scheduler does not continue to run.
        if server_manager_utils.use_server_db():
            server_manager_utils.confirm_server_has_role(
                    hostname='localhost', role='host_scheduler')

        initialize(options.testing)

        scheduler = HostScheduler()
        while not _shutdown:
            scheduler.tick()
            time.sleep(_tick_pause_sec)
    except Exception:
        email_manager.manager.log_stacktrace(
                'Uncaught exception; terminating host_scheduler.')
        raise
    finally:
        email_manager.manager.send_queued_emails()
        if _db_manager:
            _db_manager.disconnect()
Prashanth B4ec98672014-05-15 10:44:54 -0700488
489
# Start the scheduler only when this file is executed as a script.
if __name__ == '__main__':
    main()