blob: fa80ce0320a1836a12f8096cb6592d62f3ef0ae7 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
2#pylint: disable-msg=C0111
3
4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8"""Host scheduler.
9
10If run as a standalone service, the host scheduler ensures the following:
11 1. Hosts will not be assigned to multiple hqes simultaneously. The process
12 of assignment in this case refers to the modification of the host_id
13 column of a row in the afe_host_queue_entries table, to reflect the host
14 id of a leased host matching the dependencies of the job.
15 2. Hosts that are not being used by active hqes or incomplete special tasks
16 will be released back to the available hosts pool, for acquisition by
17 subsequent hqes.
18In addition to these guarantees, the host scheduler also confirms that no 2
19active hqes/special tasks are assigned the same host, and sets the leased bit
20for hosts needed by frontend special tasks. The need for the latter is only
21apparent when viewed in the context of the job-scheduler (monitor_db), which
22runs special tasks only after their hosts have been leased.
Dale Curtisaa513362011-03-01 17:27:44 -080023"""
24
Prashanth B4ec98672014-05-15 10:44:54 -070025import argparse
Prashanth Bf66d51b2014-05-06 12:42:25 -070026import collections
Fang Dengc9b1be82014-10-20 17:54:20 -070027import datetime
Dale Curtisaa513362011-03-01 17:27:44 -080028import logging
Prashanth B4ec98672014-05-15 10:44:54 -070029import os
30import signal
31import sys
32import time
Dale Curtisaa513362011-03-01 17:27:44 -080033
Prashanth B4ec98672014-05-15 10:44:54 -070034import common
35from autotest_lib.frontend import setup_django_environment
36
Fang Dengc9b1be82014-10-20 17:54:20 -070037from autotest_lib.client.common_lib import host_queue_entry_states
Prashanth B4ec98672014-05-15 10:44:54 -070038from autotest_lib.client.common_lib import global_config
Fang Dengc9b1be82014-10-20 17:54:20 -070039from autotest_lib.client.common_lib.cros.graphite import es_utils
Michael Liangda8c60a2014-06-03 13:24:51 -070040from autotest_lib.client.common_lib.cros.graphite import stats
Fang Dengc9b1be82014-10-20 17:54:20 -070041from autotest_lib.server import constants
Prashanth B4ec98672014-05-15 10:44:54 -070042from autotest_lib.scheduler import email_manager
Prashanth Bf66d51b2014-05-06 12:42:25 -070043from autotest_lib.scheduler import query_managers
44from autotest_lib.scheduler import rdb_lib
45from autotest_lib.scheduler import rdb_utils
Prashanth B4ec98672014-05-15 10:44:54 -070046from autotest_lib.scheduler import scheduler_lib
47from autotest_lib.scheduler import scheduler_models
Dale Curtisaa513362011-03-01 17:27:44 -080048
# Database connection manager; created by initialize() and disconnected in
# main()'s finally clause.
_db_manager = None
# Set to True by handle_signal(); main() stops ticking once it is True.
_shutdown = False
# Seconds main() sleeps between host scheduler ticks.
_tick_pause_sec = global_config.global_config.get_config_value(
        'SCHEDULER', 'tick_pause_sec', type=int, default=5)
# When True, hosts are acquired inline by the job scheduler and main() exits
# immediately (status 0) instead of running this service.
_monitor_db_host_acquisition = global_config.global_config.get_config_value(
        'SCHEDULER', 'inline_host_acquisition', type=bool, default=True)
Prashanth B4ec98672014-05-15 10:44:54 -070055
Dale Curtisaa513362011-03-01 17:27:44 -080056
class BaseHostScheduler(object):
    """Core host-acquisition logic shared by the host schedulers.

    On its own tick this class only hands idle hosts back to the rdb. Every
    other action (verifying host assignments, activating queue entries) must
    be driven by the job scheduler through the classmethods below.
    """


    _timer = stats.Timer('base_host_scheduler')
    # A (host, job) pair describing one successful rdb acquisition.
    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])


    def __init__(self):
        self.host_query_manager = query_managers.AFEHostQueryManager()


    @_timer.decorate
    def _release_hosts(self):
        """Give hosts back to the RDB by clearing their leased bit.

        Only hosts the query manager reports as unused and healthy are
        released, i.e. hosts not held by an active hqe and without a new
        special task scheduled against them.
        """
        manager = self.host_query_manager
        idle_names = [idle.hostname
                      for idle in manager.find_unused_healty_hosts()]
        if not idle_names:
            return
        manager.set_leased(False, hostname__in=idle_names)


    @classmethod
    def schedule_host_job(cls, host, queue_entry):
        """Activate a queue entry on a host and queue its pre-job tasks.

        If the entry has no host yet, it is assigned `host`; if it already
        has a different host, the rdb's answer is inconsistent and an error
        is raised. The entry is then marked active and its first pre-job
        special task is scheduled, which sets the job scheduler's chain of
        events (test run, result collection) in motion.

        @param host: The host against which to schedule the job.
        @param queue_entry: The queue_entry to schedule.

        @raises rdb_utils.RDBException: If queue_entry is already assigned to
            a host other than `host`.
        """
        current_host_id = queue_entry.host_id
        if current_host_id is None:
            queue_entry.set_host(host)
        elif current_host_id != host.id:
            raise rdb_utils.RDBException(
                    'The rdb returned host: %s but the job:%s was already '
                    'assigned a host: %s. ' % (host.hostname,
                                               queue_entry.job_id,
                                               queue_entry.host.hostname))
        queue_entry.update_field('active', True)

        # TODO: crbug.com/373936. The host scheduler should only be assigning
        # jobs to hosts, but the criterion used to release hosts is that no
        # active hqe is using them. Because the hqe is activated here, its
        # first prejob task must be scheduled here as well. Alternatively,
        # the host scheduler could manage all special tasks, since today they
        # only verify/cleanup/reset hosts.
        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
        queue_entry.schedule_pre_job_tasks()


    @classmethod
    def find_hosts_for_jobs(cls, host_jobs):
        """Ask the rdb for hosts and keep only the successful acquisitions.

        @param host_jobs: Queue entries that either need a host or need their
            existing host assignment validated by the rdb.
        @return: A list of host_assignment(host, job) tuples, one per entry
            for which the rdb returned a host.
        """
        acquired = rdb_lib.acquire_hosts(host_jobs)
        return [cls.host_assignment(host, job)
                for host, job in zip(acquired, host_jobs) if host]


    @_timer.decorate
    def tick(self):
        """Run one round of core host management (release idle hosts)."""
        self._release_hosts()
Dale Curtisaa513362011-03-01 17:27:44 -0800143
144
class HostScheduler(BaseHostScheduler):
    """A scheduler capable of managing host acquisition for new jobs.

    Each tick leases hosts for frontend special tasks, assigns hosts to
    pending queue entries, releases unused hosts, and emails about queue
    entries that share a host with another job.
    """

    _timer = stats.Timer('host_scheduler')


    def __init__(self):
        super(HostScheduler, self).__init__()
        self.job_query_manager = query_managers.AFEJobQueryManager()


    @classmethod
    def _record_host_assignment(cls, host, queue_entry):
        """Record how long it takes to assign a host to a job in metadata db.

        The duration is the time from the job's creation until now, posted
        with status QUEUED under constants.JOB_TIME_BREAKDOWN_KEY.

        @param host: A Host object.
        @param queue_entry: A HostQueueEntry object.
        """
        secs_in_queued = (datetime.datetime.now() -
                          queue_entry.job.created_on).total_seconds()
        metadata = {
                'job_id': queue_entry.job_id,
                'hostname': host.hostname,
                'status': host_queue_entry_states.Status.QUEUED,
                'duration': secs_in_queued}
        es_utils.ESMetadata().post(
                type_str=constants.JOB_TIME_BREAKDOWN_KEY, metadata=metadata)


    @_timer.decorate
    def _schedule_jobs(self):
        """Schedule new jobs against hosts.

        Pending queue entries that are not hostless are sent to the rdb.
        Every entry that gets a host is activated and has its pre-job tasks
        scheduled. The per-tick counts of entries with and without hosts are
        reported as gauges (nothing is reported when there are no entries).
        """
        queue_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        unverified_host_jobs = [job for job in queue_entries
                                if not job.is_hostless()]
        if not unverified_host_jobs:
            return
        new_jobs_with_hosts = 0
        for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
            self.schedule_host_job(acquisition.host, acquisition.job)
            # The timing metadata is telemetry only. Letting a failure to post
            # it escape would skip the remaining acquisitions of this tick and
            # propagate out of tick() into main(), terminating the host
            # scheduler; log it and keep scheduling instead.
            try:
                self._record_host_assignment(acquisition.host,
                                             acquisition.job)
            except Exception:
                logging.exception('Failed to record host assignment for '
                                  'entry: %s', acquisition.job)
            new_jobs_with_hosts += 1
        gauge = stats.Gauge('host_scheduler.jobs_per_tick')
        gauge.send('new_jobs_with_hosts', new_jobs_with_hosts)
        gauge.send('new_jobs_without_hosts',
                   len(unverified_host_jobs) - new_jobs_with_hosts)


    @_timer.decorate
    def _lease_hosts_of_frontend_tasks(self):
        """Lease hosts of tasks scheduled through the frontend.

        Frontend tasks are special tasks with no queue entry; their hosts are
        leased so the job scheduler (which only runs special tasks on leased
        hosts) will pick them up.
        """
        # We really don't need to get all the special tasks here, just the ones
        # without hqes, but reusing the method used by the scheduler ensures
        # we prioritize the same way.
        lease_hostnames = [
                task.host.hostname for task in
                self.job_query_manager.get_prioritized_special_tasks(
                    only_tasks_with_leased_hosts=False)
                if task.queue_entry_id is None and not task.host.leased]
        # Leasing an already-leased host here shouldn't be a problem:
        # 1. The only way a host can be leased is if it's been assigned to
        #    an active hqe or another similar frontend task, but doing so will
        #    have already precluded it from the list of tasks returned by the
        #    job_query_manager.
        # 2. The unleasing is done based on global conditions. Eg: Even if a
        #    task has already leased a host and we lease it again, the
        #    host scheduler won't release the host till both tasks are complete.
        if lease_hostnames:
            self.host_query_manager.set_leased(
                    True, hostname__in=lease_hostnames)


    @_timer.decorate
    def _check_host_assignments(self):
        """Sanity check the current host assignments.

        Queues a single notification email listing every queue entry the job
        query manager reports as overlapping with another job's host.
        """
        # Move this into a periodic cleanup if pressed for performance.
        subject = 'Unexpected host assignments'
        parts = []
        for offending_job in self.job_query_manager.get_overlapping_jobs():
            # TODO: crbug.com/375551
            parts.append('HQE %s is using a host in use by another job. This '
                         'could be because of a frontend special task, in '
                         'which case they will only use the host '
                         'sequentially. ' % offending_job)
        message = ''.join(parts)
        if message:
            email_manager.manager.enqueue_notify_email(subject, message)


    @_timer.decorate
    def tick(self):
        """Run one full round of host scheduling activities."""
        logging.info('Calling new tick.')
        logging.info('Leasing hosts for frontend tasks.')
        self._lease_hosts_of_frontend_tasks()
        logging.info('Finding hosts for new jobs.')
        self._schedule_jobs()
        logging.info('Releasing unused hosts.')
        self._release_hosts()
        logging.info('Checking host assignments.')
        self._check_host_assignments()
        logging.info('Calling email_manager.')
        email_manager.manager.send_queued_emails()
247 email_manager.manager.send_queued_emails()
248
249
class DummyHostScheduler(BaseHostScheduler):
    """A no-op host scheduler: it neither acquires nor releases hosts.

    Its constructor deliberately does not call BaseHostScheduler.__init__,
    so no host query manager is created.
    """

    def __init__(self):
        # Intentionally skips the base initializer; see the class docstring.
        return None


    def tick(self):
        """Do nothing; this scheduler has no periodic work."""
        return None
258 pass
Prashanth B4ec98672014-05-15 10:44:54 -0700259
260
def handle_signal(signum, frame):
    """Request a graceful shutdown instead of dying mid-tick.

    initialize() installs this for SIGINT and SIGTERM; it only flips the
    module-level _shutdown flag, which main() checks between ticks.

    @param signum: The signal number received (unused).
    @param frame: The interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
265 logging.info("Shutdown request received.")
266
267
def initialize(testing=False):
    """Initialize the host scheduler process.

    Sets up (optionally) the testing database, the db connection manager,
    logging, the shutdown signal handlers and the scheduler models, in that
    order.

    @param testing: If True, first initialize the file-backed testing
        database via rdb_testing_utils.
    """
    global _db_manager
    if testing:
        # Imported lazily: the testing utilities' database imports have side
        # effects that must only happen in testing mode.
        from autotest_lib.scheduler import rdb_testing_utils
        helper_cls = rdb_testing_utils.FileDatabaseHelper
        helper_cls().initialize_database_for_testing(
                db_file_path=helper_cls.DB_FILE)
    _db_manager = scheduler_lib.ConnectionManager()
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    scheduler_lib.setup_logging(
            log_dir, None, timestamped_logfile_prefix='host_scheduler')
    logging.info("Setting signal handler")
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, handle_signal)
    scheduler_models.initialize()
284 scheduler_models.initialize()
285
286
def parse_arguments(argv):
    """Parse the host scheduler's command line.

    @param argv: List of argument strings, excluding the program name.
    @returns: An argparse.Namespace with boolean `testing` and `production`
        attributes, both False unless the matching flag is given.
    """
    production_help = ('Indicate that scheduler is running in production'
                       ' environment and it can use database that is not'
                       ' hosted in localhost. If it is set to False, '
                       'scheduler will fail if database is not in '
                       'localhost.')
    parser = argparse.ArgumentParser(description='Host scheduler.')
    parser.add_argument('--testing', action='store_true', default=False,
                        help='Start the host scheduler in testing mode.')
    parser.add_argument('--production', action='store_true', default=False,
                        help=production_help)
    return parser.parse_args(argv)
306 return options
Prashanth B4ec98672014-05-15 10:44:54 -0700307
308
def main():
    """Run host scheduler ticks until a shutdown signal is received.

    Exits immediately with status 0 when inline host acquisition is enabled,
    because the job scheduler is then acquiring hosts itself. Any uncaught
    exception is reported via email_manager and re-raised; queued emails are
    always flushed and the db connection closed on the way out.
    """
    if _monitor_db_host_acquisition:
        # initialize() has not configured logging yet, so an INFO record would
        # be dropped by the root logger's default WARNING level (unless an
        # earlier import configured logging). Use WARNING so the operator can
        # see why the service exited.
        logging.warning('Please set inline_host_acquisition=False in the '
                        'shadow config before starting the host scheduler.')
        # The upstart job for the host scheduler understands exit(0) to mean
        # 'don't respawn'. This is desirable when the job scheduler is
        # acquiring hosts inline.
        sys.exit(0)
    try:
        options = parse_arguments(sys.argv[1:])
        scheduler_lib.check_production_settings(options)

        initialize(options.testing)

        host_scheduler = HostScheduler()
        while not _shutdown:
            host_scheduler.tick()
            time.sleep(_tick_pause_sec)
    except Exception:
        email_manager.manager.log_stacktrace(
                'Uncaught exception; terminating host_scheduler.')
        raise
    finally:
        email_manager.manager.send_queued_emails()
        if _db_manager:
            _db_manager.disconnect()
334 _db_manager.disconnect()
Prashanth B4ec98672014-05-15 10:44:54 -0700335
336
if __name__ == '__main__':
    # Start the scheduler loop only when run as a script, not on import.
    main()