#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Host scheduler.

If run as a standalone service, the host scheduler ensures the following:
    1. Hosts will not be assigned to multiple hqes simultaneously. The process
       of assignment in this case refers to the modification of the host_id
       column of a row in the afe_host_queue_entries table, to reflect the host
       id of a leased host matching the dependencies of the job.
    2. Hosts that are not being used by active hqes or incomplete special tasks
       will be released back to the available hosts pool, for acquisition by
       subsequent hqes.
In addition to these guarantees, the host scheduler also confirms that no two
active hqes/special tasks are assigned the same host, and sets the leased bit
for hosts needed by frontend special tasks. The need for the latter is only
apparent when viewed in the context of the job-scheduler (monitor_db), which
runs special tasks only after their hosts have been leased.
"""
24
Prashanth B4ec98672014-05-15 10:44:54 -070025import argparse
Prashanth Bf66d51b2014-05-06 12:42:25 -070026import collections
Dale Curtisaa513362011-03-01 17:27:44 -080027import logging
Prashanth B4ec98672014-05-15 10:44:54 -070028import os
29import signal
30import sys
31import time
Dale Curtisaa513362011-03-01 17:27:44 -080032
Prashanth B4ec98672014-05-15 10:44:54 -070033import common
34from autotest_lib.frontend import setup_django_environment
35
36from autotest_lib.client.common_lib import global_config
Michael Liangda8c60a2014-06-03 13:24:51 -070037from autotest_lib.client.common_lib.cros.graphite import stats
Prashanth B4ec98672014-05-15 10:44:54 -070038from autotest_lib.scheduler import email_manager
Prashanth Bf66d51b2014-05-06 12:42:25 -070039from autotest_lib.scheduler import query_managers
40from autotest_lib.scheduler import rdb_lib
41from autotest_lib.scheduler import rdb_utils
Prashanth B4ec98672014-05-15 10:44:54 -070042from autotest_lib.scheduler import scheduler_lib
43from autotest_lib.scheduler import scheduler_models
Dale Curtisaa513362011-03-01 17:27:44 -080044
# Connection manager for the scheduler database; created in initialize().
_db_manager = None
# Set to True by handle_signal() so that main()'s loop exits once the
# current tick has finished.
_shutdown = False
# Number of seconds main() sleeps between two ticks.
_tick_pause_sec = global_config.global_config.get_config_value(
        'SCHEDULER', 'tick_pause_sec', type=int, default=5)
# When True, main() exits immediately instead of running the host scheduler
# (it asks for inline_host_acquisition=False in the shadow config).
_monitor_db_host_acquisition = global_config.global_config.get_config_value(
        'SCHEDULER', 'inline_host_acquisition', type=bool, default=True)
Prashanth B4ec98672014-05-15 10:44:54 -070051
Dale Curtisaa513362011-03-01 17:27:44 -080052
class BaseHostScheduler(object):
    """Base class containing host acquisition logic.

    This class contains all the core host acquisition logic needed by the
    scheduler to run jobs on hosts. It is only capable of releasing hosts
    back to the rdb through its tick, any other action must be instigated by
    the job scheduler.
    """


    _timer = stats.Timer('base_host_scheduler')
    # Pairing of an acquired host with the queue entry it was acquired for.
    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])


    def __init__(self):
        self.host_query_manager = query_managers.AFEHostQueryManager()


    @_timer.decorate
    def _release_hosts(self):
        """Release hosts to the RDB.

        Release all hosts that are ready and are currently not being used by an
        active hqe, and don't have a new special task scheduled against them.
        """
        # NOTE: 'healty' (sic) is the spelling of the query manager method
        # this code calls; it must not be "corrected" here.
        release_hostnames = [
                host.hostname for host in
                self.host_query_manager.find_unused_healty_hosts()]
        # Skip the update entirely when there is nothing to release.
        if release_hostnames:
            self.host_query_manager.set_leased(
                    False, hostname__in=release_hostnames)


    @classmethod
    def schedule_host_job(cls, host, queue_entry):
        """Schedule a job on a host.

        Scheduling a job involves:
            1. Setting the active bit on the queue_entry.
            2. Scheduling a special task on behalf of the queue_entry.
        Performing these actions will lead the job scheduler through a chain of
        events, culminating in running the test and collecting results from
        the host.

        @param host: The host against which to schedule the job.
        @param queue_entry: The queue_entry to schedule.

        @raises rdb_utils.RDBException: If the queue_entry already has a host
                assigned and it is not the given host.
        """
        if queue_entry.host_id is None:
            queue_entry.set_host(host)
        elif host.id != queue_entry.host_id:
            raise rdb_utils.RDBException('The rdb returned host: %s '
                    'but the job:%s was already assigned a host: %s. ' %
                    (host.hostname, queue_entry.job_id,
                     queue_entry.host.hostname))
        queue_entry.update_field('active', True)

        # TODO: crbug.com/373936. The host scheduler should only be assigning
        # jobs to hosts, but the criterion we use to release hosts depends
        # on it not being used by an active hqe. Since we're activating the
        # hqe here, we also need to schedule its first prejob task. OTOH,
        # we could converge to having the host scheduler manage all special
        # tasks, since their only use today is to verify/cleanup/reset a host.
        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
        queue_entry.schedule_pre_job_tasks()


    @classmethod
    def find_hosts_for_jobs(cls, host_jobs):
        """Find and verify hosts for a list of jobs.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
        @return: A list of host_assignment tuples of the form
            (host, queue_entry) for each valid host-queue_entry assignment.
        """
        hosts = rdb_lib.acquire_hosts(host_jobs)
        # Acquired hosts are paired positionally with the requesting entries;
        # entries whose paired host is falsy are left out of the result.
        return [cls.host_assignment(host, job)
                for host, job in zip(hosts, host_jobs) if host]


    @_timer.decorate
    def tick(self):
        """Schedule core host management activities."""
        self._release_hosts()
Dale Curtisaa513362011-03-01 17:27:44 -0800139
140
class HostScheduler(BaseHostScheduler):
    """A scheduler capable of managing host acquisition for new jobs."""


    _timer = stats.Timer('host_scheduler')


    def __init__(self):
        super(HostScheduler, self).__init__()
        self.job_query_manager = query_managers.AFEJobQueryManager()


    @_timer.decorate
    def _schedule_jobs(self):
        """Schedule new jobs against hosts."""
        queue_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        # Only entries that are not hostless take part in host acquisition.
        unverified_host_jobs = [job for job in queue_entries
                                if not job.is_hostless()]
        if not unverified_host_jobs:
            return
        for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
            self.schedule_host_job(acquisition.host, acquisition.job)


    @_timer.decorate
    def _lease_hosts_of_frontend_tasks(self):
        """Lease hosts of tasks scheduled through the frontend."""
        # We really don't need to get all the special tasks here, just the ones
        # without hqes, but reusing the method used by the scheduler ensures
        # we prioritize the same way.
        lease_hostnames = [
                task.host.hostname for task in
                self.job_query_manager.get_prioritized_special_tasks(
                        only_tasks_with_leased_hosts=False)
                if task.queue_entry_id is None and not task.host.leased]
        # Leasing an already leased host here shouldn't be a problem:
        # 1. The only way a host can be leased is if it's been assigned to
        # an active hqe or another similar frontend task, but doing so will
        # have already precluded it from the list of tasks returned by the
        # job_query_manager.
        # 2. The unleasing is done based on global conditions. Eg: Even if a
        # task has already leased a host and we lease it again, the
        # host scheduler won't release the host till both tasks are complete.
        if lease_hostnames:
            self.host_query_manager.set_leased(
                    True, hostname__in=lease_hostnames)


    @_timer.decorate
    def _check_host_assignments(self):
        """Sanity check the current host assignments.

        Enqueues a single notification email listing every overlapping job
        reported by the job query manager; enqueues nothing if there are none.
        """
        # Move this into a periodic cleanup if pressed for performance.
        subject = 'Unexpected host assignments'
        # TODO: crbug.com/375551
        template = ('HQE %s is using a host in use by another job. This '
                    'could be because of a frontend special task, in which '
                    'case they will only use the host sequentially. ')
        # Each entry already ends in '. ', so the pieces are joined without
        # any extra separator.
        message = ''.join(
                template % offending_job for offending_job in
                self.job_query_manager.get_overlapping_jobs())
        if message:
            email_manager.manager.enqueue_notify_email(subject, message)


    @_timer.decorate
    def tick(self):
        """Run one scheduling pass.

        In order: lease hosts for frontend special tasks, assign hosts to new
        jobs, release unused hosts, check the host assignments and finally
        send any queued emails.
        """
        logging.info('Calling new tick.')
        logging.info('Leasing hosts for frontend tasks.')
        self._lease_hosts_of_frontend_tasks()
        logging.info('Finding hosts for new jobs.')
        self._schedule_jobs()
        logging.info('Releasing unused hosts.')
        self._release_hosts()
        logging.info('Checking host assignments.')
        self._check_host_assignments()
        logging.info('Calling email_manager.')
        email_manager.manager.send_queued_emails()
218
219
class DummyHostScheduler(BaseHostScheduler):
    """A dummy host scheduler that doesn't acquire or release hosts."""


    def __init__(self):
        # Deliberately does not call the base initializer, so no host query
        # manager is created for this scheduler.
        pass


    def tick(self):
        """Do nothing; in particular, no hosts are released."""
        pass
Prashanth B4ec98672014-05-15 10:44:54 -0700229
230
def handle_signal(signum, frame):
    """Signal handler that requests a shutdown so we don't crash mid-tick.

    Registered for both SIGINT and SIGTERM by initialize(). It only sets the
    module-level _shutdown flag, which main()'s loop checks between ticks.

    @param signum: The number of the received signal (unused).
    @param frame: The stack frame active when the signal arrived (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
236
237
def initialize(testing=False):
    """Initialize the host scheduler.

    In order: optionally prepares the testing database, creates the
    module-level database connection manager, sets up logging, installs the
    SIGINT/SIGTERM handlers and initializes the scheduler models.

    @param testing: If True, initialize the database for testing through
            rdb_testing_utils.FileDatabaseHelper before connecting.
    """
    global _db_manager
    if testing:
        # Don't import testing utilities unless we're in testing mode,
        # as the database imports have side effects.
        from autotest_lib.scheduler import rdb_testing_utils
        rdb_testing_utils.FileDatabaseHelper().initialize_database_for_testing(
                db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE)
    _db_manager = scheduler_lib.ConnectionManager()
    # The log directory comes from the environment; None is passed through
    # when the variable is unset.
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR'),
            None, timestamped_logfile_prefix='host_scheduler')
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)
    scheduler_models.initialize()
255
256
def parse_arguments(argv):
    """Parse command line arguments.

    @param argv: Argument list to parse (without the program name).
    @returns: The parsed arguments; its `testing` attribute is True only if
            --testing was passed.
    """
    parser = argparse.ArgumentParser(description='Host scheduler.')
    parser.add_argument('--testing', action='store_true', default=False,
                        help='Start the host scheduler in testing mode.')
    return parser.parse_args(argv)
268
269
def main():
    """Run the host scheduler until a shutdown is requested.

    Exits immediately with status 0 when inline host acquisition is enabled.
    Otherwise initializes the scheduler and ticks it, sleeping
    _tick_pause_sec seconds between ticks, until handle_signal() sets the
    shutdown flag. An uncaught exception is logged through the email manager
    and ends the loop; it is not re-raised.
    """
    if _monitor_db_host_acquisition:
        logging.info('Please set inline_host_acquisition=False in the shadow '
                     'config before starting the host scheduler.')
        # The upstart job for the host scheduler understands exit(0) to mean
        # 'don't respawn'. This is desirable when the job scheduler is acquiring
        # hosts inline.
        sys.exit(0)
    try:
        initialize(parse_arguments(sys.argv[1:]).testing)
        host_scheduler = HostScheduler()
        while not _shutdown:
            host_scheduler.tick()
            time.sleep(_tick_pause_sec)
    except Exception:
        email_manager.manager.log_stacktrace(
                'Uncaught exception; terminating host_scheduler.')
    finally:
        # Cleanup runs on every exit path, and the database is disconnected
        # even if sending the queued emails fails.
        try:
            email_manager.manager.send_queued_emails()
        finally:
            # initialize() may have failed before the connection manager was
            # created, in which case there is nothing to disconnect.
            if _db_manager is not None:
                _db_manager.disconnect()
289
290
# Only start the scheduler when this file is executed as a script.
if __name__ == '__main__':
    main()