#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Host scheduler.

If run as a standalone service, the host scheduler ensures the following:
    1. Hosts will not be assigned to multiple hqes simultaneously. The process
       of assignment in this case refers to the modification of the host_id
       column of a row in the afe_host_queue_entries table, to reflect the host
       id of a leased host matching the dependencies of the job.
    2. Hosts that are not being used by active hqes or incomplete special tasks
       will be released back to the available hosts pool, for acquisition by
       subsequent hqes.
In addition to these guarantees, the host scheduler also confirms that no 2
active hqes/special tasks are assigned the same host, and sets the leased bit
for hosts needed by frontend special tasks. The need for the latter is only
apparent when viewed in the context of the job-scheduler (monitor_db), which
runs special tasks only after their hosts have been leased.
"""

import argparse
import collections
import datetime
import logging
import os
import signal
import sys
import time

import common
from autotest_lib.frontend import setup_django_environment

from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib.cros.graphite import stats
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import query_managers
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import rdb_utils
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.site_utils import job_overhead

# Database connection manager; created in initialize() and disconnected when
# main() exits.
_db_manager = None
# Set to True by handle_signal(); main() stops ticking once it is set.
_shutdown = False
# Seconds main() sleeps between two consecutive ticks.
_tick_pause_sec = global_config.global_config.get_config_value(
        'SCHEDULER', 'tick_pause_sec', type=int, default=5)
# When True, main() exits immediately with status 0 instead of starting the
# host scheduler loop.
_monitor_db_host_acquisition = global_config.global_config.get_config_value(
        'SCHEDULER', 'inline_host_acquisition', type=bool, default=True)


class BaseHostScheduler(object):
    """Base class containing host acquisition logic.

    This class contains all the core host acquisition logic needed by the
    scheduler to run jobs on hosts. It is only capable of releasing hosts
    back to the rdb through its tick, any other action must be instigated by
    the job scheduler.
    """


    _timer = stats.Timer('base_host_scheduler')
    # Pairs a host with the queue entry (job) it was acquired for.
    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])


    def __init__(self):
        self.host_query_manager = query_managers.AFEHostQueryManager()


    @_timer.decorate
    def _release_hosts(self):
        """Release hosts to the RDB.

        Release all hosts that are ready and are currently not being used by an
        active hqe, and don't have a new special task scheduled against them.
        """
        release_hostnames = [
                host.hostname for host in
                self.host_query_manager.find_unused_healty_hosts()]
        # Skip the update entirely when there is nothing to release.
        if release_hostnames:
            self.host_query_manager.set_leased(
                    False, hostname__in=release_hostnames)


    @classmethod
    def schedule_host_job(cls, host, queue_entry):
        """Schedule a job on a host.

        Scheduling a job involves:
            1. Setting the active bit on the queue_entry.
            2. Scheduling a special task on behalf of the queue_entry.
        Performing these actions will lead the job scheduler through a chain of
        events, culminating in running the test and collecting results from
        the host.

        @param host: The host against which to schedule the job.
        @param queue_entry: The queue_entry to schedule.

        @raises rdb_utils.RDBException: If the queue_entry already has a
                host_id that differs from the id of the given host.
        """
        if queue_entry.host_id is None:
            queue_entry.set_host(host)
        elif host.id != queue_entry.host_id:
            raise rdb_utils.RDBException('The rdb returned host: %s '
                    'but the job:%s was already assigned a host: %s. ' %
                    (host.hostname, queue_entry.job_id,
                     queue_entry.host.hostname))
        queue_entry.update_field('active', True)

        # TODO: crbug.com/373936. The host scheduler should only be assigning
        # jobs to hosts, but the criterion we use to release hosts depends
        # on it not being used by an active hqe. Since we're activating the
        # hqe here, we also need to schedule its first prejob task. OTOH,
        # we could converge to having the host scheduler manager all special
        # tasks, since their only use today is to verify/cleanup/reset a host.
        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
        queue_entry.schedule_pre_job_tasks()


    @classmethod
    def find_hosts_for_jobs(cls, host_jobs):
        """Find and verify hosts for a list of jobs.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
        @return: A list of tuples of the form (host, queue_entry) for each
            valid host-queue_entry assignment.
        """
        hosts = rdb_lib.acquire_hosts(host_jobs)
        # Hosts are paired with jobs by position; a falsy host means no host
        # was acquired for that job, so the pair is dropped.
        return [cls.host_assignment(host, job)
                for host, job in zip(hosts, host_jobs) if host]


    @_timer.decorate
    def tick(self):
        """Schedule core host management activities."""
        self._release_hosts()


class HostScheduler(BaseHostScheduler):
    """A scheduler capable of managing host acquisition for new jobs."""

    _timer = stats.Timer('host_scheduler')


    def __init__(self):
        super(HostScheduler, self).__init__()
        self.job_query_manager = query_managers.AFEJobQueryManager()


    @classmethod
    def _record_host_assignment(cls, host, queue_entry):
        """Record how long it takes to assign a host to a job in metadata db.

        The duration is the time elapsed between the creation of the job and
        now, recorded against the QUEUED status.

        @param host: A Host object.
        @param queue_entry: A HostQueueEntry object.
        """
        secs_in_queued = (datetime.datetime.now() -
                          queue_entry.job.created_on).total_seconds()
        job_overhead.record_state_duration(
                queue_entry.job_id, host.hostname,
                job_overhead.STATUS.QUEUED, secs_in_queued)


    @_timer.decorate
    def _schedule_jobs(self):
        """Schedule new jobs against hosts."""

        key = 'host_scheduler.jobs_per_tick'
        new_jobs_with_hosts = 0
        queue_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        unverified_host_jobs = [job for job in queue_entries
                                if not job.is_hostless()]
        # Nothing needs a host this tick; no stats are sent in this case.
        if not unverified_host_jobs:
            return
        for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
            self.schedule_host_job(acquisition.host, acquisition.job)
            self._record_host_assignment(acquisition.host, acquisition.job)
            new_jobs_with_hosts += 1
        stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
        stats.Gauge(key).send('new_jobs_without_hosts',
                              len(unverified_host_jobs) - new_jobs_with_hosts)


    @_timer.decorate
    def _lease_hosts_of_frontend_tasks(self):
        """Lease hosts of tasks scheduled through the frontend."""
        # We really don't need to get all the special tasks here, just the ones
        # without hqes, but reusing the method used by the scheduler ensures
        # we prioritize the same way.
        lease_hostnames = [
                task.host.hostname for task in
                self.job_query_manager.get_prioritized_special_tasks(
                        only_tasks_with_leased_hosts=False)
                if task.queue_entry_id is None and not task.host.leased]
        # Leasing a leased hosts here shouldn't be a problem:
        # 1. The only way a host can be leased is if it's been assigned to
        #    an active hqe or another similar frontend task, but doing so will
        #    have already precluded it from the list of tasks returned by the
        #    job_query_manager.
        # 2. The unleasing is done based on global conditions. Eg: Even if a
        #    task has already leased a host and we lease it again, the
        #    host scheduler won't release the host till both tasks are complete.
        if lease_hostnames:
            self.host_query_manager.set_leased(
                    True, hostname__in=lease_hostnames)


    @_timer.decorate
    def _check_host_assignments(self):
        """Sanity check the current host assignments.

        Enqueues a single notification email listing every overlapping job
        reported by the job query manager; nothing is enqueued when there
        are none.
        """
        # Move this into a periodic cleanup if pressed for performance.
        subject = 'Unexpected host assignments'
        # TODO: crbug.com/375551
        template = ('HQE %s is using a host in use by another job. This '
                    'could be because of a frontend special task, in which '
                    'case they will only use the host sequentially. ')
        offending_jobs = self.job_query_manager.get_overlapping_jobs()
        message = ''.join(template % job for job in offending_jobs)
        if message:
            email_manager.manager.enqueue_notify_email(subject, message)


    @_timer.decorate
    def tick(self):
        """Run one full host scheduling cycle.

        In order: lease hosts for frontend special tasks, assign hosts to
        new jobs, release unused hosts, check host assignments and send any
        queued emails.
        """
        logging.info('Calling new tick.')
        logging.info('Leasing hosts for frontend tasks.')
        self._lease_hosts_of_frontend_tasks()
        logging.info('Finding hosts for new jobs.')
        self._schedule_jobs()
        logging.info('Releasing unused hosts.')
        self._release_hosts()
        logging.info('Checking host assignments.')
        self._check_host_assignments()
        logging.info('Calling email_manager.')
        email_manager.manager.send_queued_emails()


class DummyHostScheduler(BaseHostScheduler):
    """A dummy host scheduler that doesn't acquire or release hosts."""

    def __init__(self):
        # Deliberately skips BaseHostScheduler.__init__, so no
        # host_query_manager is created for this scheduler.
        pass


    def tick(self):
        """Do nothing; in particular, no hosts are released."""
        pass


def handle_signal(signum, frame):
    """Signal handler that requests a shutdown so we don't crash mid-tick.

    Registered for both SIGINT and SIGTERM by initialize(). It only sets the
    module level _shutdown flag, which the main loop checks between ticks.

    @param signum: The number of the received signal (unused).
    @param frame: The stack frame at the time of the signal (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize(testing=False):
    """Initialize the host scheduler.

    Sets up the database connection manager, logging, the SIGINT/SIGTERM
    handlers and the scheduler models, in that order.

    @param testing: If True, first initialize a file backed database for
        testing through rdb_testing_utils.
    """
    if testing:
        # Don't import testing utilities unless we're in testing mode,
        # as the database imports have side effects.
        from autotest_lib.scheduler import rdb_testing_utils
        rdb_testing_utils.FileDatabaseHelper().initialize_database_for_testing(
                db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE)
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='host_scheduler')
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)
    scheduler_models.initialize()


def parse_arguments(argv):
    """
    Parse command line arguments

    Both supported options, --testing and --production, are boolean flags
    that default to False.

    @param argv: argument list to parse
    @returns:    parsed arguments.
    """
    parser = argparse.ArgumentParser(description='Host scheduler.')
    parser.add_argument('--testing', action='store_true', default=False,
                        help='Start the host scheduler in testing mode.')
    parser.add_argument('--production',
                        help=('Indicate that scheduler is running in production'
                              ' environment and it can use database that is not'
                              ' hosted in localhost. If it is set to False, '
                              'scheduler will fail if database is not in '
                              'localhost.'),
                        action='store_true', default=False)
    options = parser.parse_args(argv)

    return options


def main():
    """Run the host scheduler until a shutdown is requested.

    Exits with status 0 without doing anything when inline host acquisition
    is enabled in the config. Otherwise parses the command line, initializes
    the scheduler and ticks in a loop, sleeping _tick_pause_sec seconds
    between ticks, until the _shutdown flag is set. Uncaught exceptions are
    logged through the email manager and re-raised; queued emails are sent
    and the database connection is closed on the way out in every case.
    """
    if _monitor_db_host_acquisition:
        # NOTE(review): this runs before initialize() calls setup_logging(),
        # so the message may not reach the host scheduler log - confirm.
        logging.info('Please set inline_host_acquisition=False in the shadow '
                     'config before starting the host scheduler.')
        # The upstart job for the host scheduler understands exit(0) to mean
        # 'don't respawn'. This is desirable when the job scheduler is acquiring
        # hosts inline.
        sys.exit(0)
    try:
        options = parse_arguments(sys.argv[1:])
        scheduler_lib.check_production_settings(options)

        initialize(options.testing)

        host_scheduler = HostScheduler()
        while not _shutdown:
            host_scheduler.tick()
            time.sleep(_tick_pause_sec)
    except Exception:
        email_manager.manager.log_stacktrace(
                'Uncaught exception; terminating host_scheduler.')
        raise
    finally:
        email_manager.manager.send_queued_emails()
        # _db_manager is still None if we failed before initialize() set it.
        if _db_manager:
            _db_manager.disconnect()


# Only start the scheduler when executed as a script, not when imported.
if __name__ == '__main__':
    main()